Protobuf transformation in AWS Kinesis with Lambda

I recently came across a scenario where Protobuf-formatted real-time events were ingested into a Kinesis stream from an external third-party source. Passing those events as-is to Datadog via a Kinesis Firehose delivery stream, however, left the data on the other side of the fence in a corrupted format, similar to the scenario described here. To fix the format and get the data into the right shape in Datadog, I needed to transform those events using the Lambda function attached to the Kinesis Firehose stream.

1- What is Protobuf?

Protocol Buffers, or Protobuf, is an open-source binary data serialization mechanism made by Google. It has been gaining a lot of popularity recently since it has proven to perform better than JSON, which is the most common serialization format used today. If you want to learn more about Protobuf, there is no better resource than the official documentation by its makers here.

2- Scenario

In this scenario I’m assuming that messages/events arrive in the Kinesis stream in Protobuf format; the source of these events could be any AWS service or an external service provider. The Kinesis stream is connected to a Kinesis Firehose delivery stream that has Datadog as its destination. However, since Datadog does not support the Protobuf format, a transformation is needed, which is done via a Lambda function.

3- Kinesis firehose configuration

In AWS you can forward Kinesis data stream events to a Kinesis Firehose delivery stream for additional processing and then on to the final destination; you can read more about it here.
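As a rough sketch of what that wiring looks like, here is the fragment of the Firehose delivery stream configuration that attaches a transformation Lambda (the function ARN below is a placeholder; the rest follows the shape of the Firehose `ProcessingConfiguration` structure):

```json
{
  "ProcessingConfiguration": {
    "Enabled": true,
    "Processors": [
      {
        "Type": "Lambda",
        "Parameters": [
          {
            "ParameterName": "LambdaArn",
            "ParameterValue": "arn:aws:lambda:us-east-1:123456789012:function:metricsTransform"
          }
        ]
      }
    ]
  }
}
```

The same structure can be set from the console under the delivery stream's "Transform source records" option, or passed to the CreateDeliveryStream / UpdateDestination APIs.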

4- The Lambda Transformation

We’ve seen in the previous step that you can configure a Lambda function that applies transformation logic to the data and hands it back to Kinesis Firehose, which then delivers it to the final destination. This is a really powerful option that will also allow you to do other things like filtering out unneeded data, applying enrichment, etc. Please check this amazing article and here if you want to learn more.

console.log("Loading function");
const protobuf = require("protobufjs");

// Load and parse the .proto schema once, outside the handler,
// so it is reused across invocations
const root = new protobuf.Root().loadSync("searchrequest.proto", {
  keepCase: true,
});
// The fully qualified name must match the package and message
// declared in the .proto file
const SearchRequest = root.lookupType("Search.SearchRequest");

exports.metricsTransform = async (event, context) => {
  /* Process the list of records and transform them */
  const output = event.records.map((record) => {
    // Kinesis data is base64 encoded, so decode it here
    const buf = Buffer.from(record.data, "base64");
    // Decode the protobuf payload into a plain JSON object;
    // you can also access its properties here for additional enrichment
    const message = SearchRequest.decode(buf).toJSON();
    return {
      recordId: record.recordId,
      result: "Ok",
      data: Buffer.from(JSON.stringify(message)).toString("base64"),
    };
  });
  return { records: output };
};
And here is the searchrequest.proto schema referenced above (note that proto3 does not allow the required label or explicit field defaults; fields are optional with zero-value defaults by design):

syntax = "proto3";
package Search;

message SearchRequest {
  string query = 1;
  int32 page_number = 2;
  int32 result_per_page = 3;
  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
  }
  Corpus corpus = 4;
}
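To see the record contract Firehose expects in isolation, here is a minimal local sketch of the same transformation flow. It is an assumption-laden stand-in: a JSON payload takes the place of the protobuf one so the example runs with Node's standard library alone, without protobufjs or AWS; the real handler above would decode with the schema instead.

```javascript
// Firehose hands the Lambda an event whose records carry base64-encoded
// data; the function must return exactly one result per record, echoing
// the same recordId and a result status ("Ok", "Dropped", or
// "ProcessingFailed").
const transform = (event) => {
  const records = event.records.map((record) => {
    // Decode the base64 payload and parse it (JSON here, protobuf in the
    // real pipeline)
    const decoded = JSON.parse(
      Buffer.from(record.data, "base64").toString("utf8")
    );
    // Re-encode the transformed message as base64 for Firehose
    return {
      recordId: record.recordId,
      result: "Ok",
      data: Buffer.from(JSON.stringify(decoded)).toString("base64"),
    };
  });
  return { records };
};

// Simulate a single incoming Firehose record
const payload = Buffer.from(
  JSON.stringify({ query: "protobuf" })
).toString("base64");
const out = transform({ records: [{ recordId: "1", data: payload }] });
console.log(out.records[0].result); // "Ok"
```

The key invariant is the one-to-one mapping between input and output records; if a recordId is missing from the response, Firehose treats the whole batch as failed.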
