Protobuf transformation in AWS Kinesis with Lambda

Fouad Roumieh
Oct 25, 2020

I recently came across a scenario where Protobuf-formatted real-time events from an external third-party source were being ingested into a Kinesis data stream. Passing those events as-is to Datadog through a Kinesis Firehose delivery stream left the data in a corrupt format on the other side of the fence, similar to the scenario described here. To get the data into the right shape in Datadog, I needed to transform those events using the Lambda function attached to the Kinesis Firehose delivery stream.

1- What is Protobuf?

Protocol Buffers (Protobuf) is an open-source binary data serialization mechanism developed by Google. It has been gaining a lot of popularity because its messages are typically smaller and faster to serialize and parse than JSON, which is the most common serialization format in use today. If you want to learn more about Protobuf, there is no better resource than the official documentation by its makers here.
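To make "binary serialization" a bit more concrete, here is a minimal hand-rolled sketch of the Protobuf wire format for two field types. In practice a library such as protobufjs does this for you; this is only an illustration. It assumes field numbers 1 and 2 for query and page_number, as in the SearchRequest example later in this post. encodeVarint, fieldKey and encodeSearchRequest are hypothetical helpers, and only non-negative integers below 2^32 are handled.

```javascript
// Illustration only: hand-encoding two Protobuf field types (no library).

// Encode a non-negative integer (< 2^32) as a base-128 varint:
// 7 bits per byte, with the high bit set on every byte except the last.
function encodeVarint(value) {
  const bytes = [];
  while (value > 0x7f) {
    bytes.push((value & 0x7f) | 0x80);
    value >>>= 7;
  }
  bytes.push(value);
  return bytes;
}

// Every field starts with a key: (field_number << 3) | wire_type.
function fieldKey(fieldNumber, wireType) {
  return encodeVarint((fieldNumber << 3) | wireType);
}

// Hypothetical helper: encodes { query, page_number } as fields 1 and 2.
function encodeSearchRequest({ query, page_number }) {
  const text = Buffer.from(query, "utf8");
  const out = [];
  // Wire type 2 (length-delimited): key, byte length, then the UTF-8 bytes
  out.push(...fieldKey(1, 2), ...encodeVarint(text.length), ...text);
  // Wire type 0 (varint): key, then the value itself
  out.push(...fieldKey(2, 0), ...encodeVarint(page_number));
  return Buffer.from(out);
}

const msg = { query: "hello", page_number: 2 };
const proto = encodeSearchRequest(msg);
const json = Buffer.from(JSON.stringify(msg), "utf8");
console.log(proto.length, json.length); // 9 33
```

For this message the Protobuf bytes are 9 bytes versus 33 bytes of JSON, mostly because field names are replaced by small numeric tags. That is also why a Protobuf payload is unreadable to a consumer that does not have the schema.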

Because Protobuf is still not supported by many tools and services, in some scenarios you may need to capture Protobuf-formatted messages and transform them into another format, such as JSON.

2- Scenario

In this scenario, I’m assuming that messages/events arrive in a Kinesis data stream in Protobuf format; the source of these events could be any AWS service or an external service provider. The Kinesis stream is connected to a Kinesis Firehose delivery stream that has Datadog as its destination. Since Datadog does not support the Protobuf format, a transformation is needed, and it is done via a Lambda function.

3- Kinesis Firehose configuration

In AWS, you can forward Kinesis data stream events to a Kinesis Firehose delivery stream for additional processing before they are delivered to the final destination; you can read more about it here.

The first thing to notice in that configuration is the source, which is a pretty straightforward step: just select the Kinesis data stream whose events should be sent to the Kinesis Firehose delivery stream.

The other important setting to notice is “Transform source records with AWS Lambda”, which does just what the title says; we will discuss what you can do with the Lambda function in the next section.

Finally, notice the destination configuration, i.e. where the data will be sent. Kinesis Firehose has integrations with AWS services such as S3, in addition to external service providers such as Splunk and Datadog.

In my scenario the destination is Datadog and you can read more about that setup in this detailed article by the Datadog team.
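If you prefer to script this setup instead of clicking through the console, the three settings above (source, Lambda transformation, destination) map roughly onto the request parameters of Firehose's CreateDeliveryStream API. This is a sketch under assumptions: it assumes Datadog is configured through Firehose's HTTP endpoint destination (HttpEndpointDestinationConfiguration), and every stream name, ARN, endpoint URL and API key below is a placeholder. The real Datadog intake URL should come from Datadog's setup article. buildDeliveryStreamParams is a hypothetical helper.

```javascript
// Sketch of CreateDeliveryStream request parameters for the setup described above.
// All names, ARNs, the endpoint URL and the API key are placeholders supplied by the caller.
function buildDeliveryStreamParams({
  streamName,
  streamArn,
  sourceRoleArn,
  lambdaArn,
  endpointUrl,
  apiKey,
  deliveryRoleArn,
  backupBucketArn,
}) {
  return {
    DeliveryStreamName: streamName,
    // Source: an existing Kinesis data stream
    DeliveryStreamType: "KinesisStreamAsSource",
    KinesisStreamSourceConfiguration: {
      KinesisStreamARN: streamArn,
      RoleARN: sourceRoleArn, // role Firehose assumes to read from the stream
    },
    // Destination: Datadog, assumed to be reached through the HTTP endpoint destination
    HttpEndpointDestinationConfiguration: {
      EndpointConfiguration: { Url: endpointUrl, Name: "Datadog", AccessKey: apiKey },
      // API equivalent of "Transform source records with AWS Lambda"
      ProcessingConfiguration: {
        Enabled: true,
        Processors: [
          {
            Type: "Lambda",
            Parameters: [{ ParameterName: "LambdaArn", ParameterValue: lambdaArn }],
          },
        ],
      },
      RoleARN: deliveryRoleArn,
      // Back up only failed records to S3
      S3BackupMode: "FailedDataOnly",
      S3Configuration: { RoleARN: deliveryRoleArn, BucketARN: backupBucketArn },
    },
  };
}

// Placeholder values for illustration only
const params = buildDeliveryStreamParams({
  streamName: "protobuf-events-to-datadog",
  streamArn: "arn:aws:kinesis:us-east-1:123456789012:stream/protobuf-events",
  sourceRoleArn: "arn:aws:iam::123456789012:role/firehose-source-role",
  lambdaArn: "arn:aws:lambda:us-east-1:123456789012:function:metricsTransform",
  endpointUrl: "https://datadog-intake.example/v1/input",
  apiKey: "DATADOG_API_KEY",
  deliveryRoleArn: "arn:aws:iam::123456789012:role/firehose-delivery-role",
  backupBucketArn: "arn:aws:s3:::firehose-failed-records",
});
console.log(JSON.stringify(params, null, 2));
```

With the AWS SDK for JavaScript v3, an object like this would be sent as new FirehoseClient({}).send(new CreateDeliveryStreamCommand(params)) from @aws-sdk/client-firehose. Keeping the builder as a plain function makes it easy to inspect or test without AWS credentials.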

4- The Lambda Transformation

We’ve seen in the previous section that you can configure a Lambda function that applies transformation logic to the data and hands it back to Kinesis Firehose, which then delivers it to the final destination. This is a really powerful option that also allows you to do other things, like filtering out unneeded data or applying enrichment; please check this amazing article, and also here, if you want to learn more.
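The filtering and enrichment options can be sketched as follows, assuming the record contract Firehose uses for Lambda transformations: each output record echoes its input recordId and reports a result of "Ok", "Dropped" or "ProcessingFailed". transformRecords, keep and enrich are hypothetical names. In the usage example a JSON decoder stands in for the Protobuf decoder so the snippet runs without a .proto file.

```javascript
// Sketch of a Firehose transformation that filters and enriches records.
// decode is supplied by the caller (e.g. a Protobuf decode + conversion to a plain object).
function transformRecords(event, decode, { keep = () => true, enrich = (m) => m } = {}) {
  const records = event.records.map((record) => {
    try {
      const message = decode(Buffer.from(record.data, "base64"));
      if (!keep(message)) {
        // Intentionally filtered out: acknowledged, but not delivered to the destination
        return { recordId: record.recordId, result: "Dropped", data: record.data };
      }
      return {
        recordId: record.recordId,
        result: "Ok",
        data: Buffer.from(JSON.stringify(enrich(message))).toString("base64"),
      };
    } catch (err) {
      // Decoding (or enrichment) failed for this record only
      return { recordId: record.recordId, result: "ProcessingFailed", data: record.data };
    }
  });
  return { records };
}

// Usage with JSON standing in for Protobuf, purely for illustration
const event = {
  records: [
    { recordId: "1", data: Buffer.from('{"query":"hello"}').toString("base64") },
    { recordId: "2", data: Buffer.from('{"query":""}').toString("base64") },
    { recordId: "3", data: Buffer.from("not json").toString("base64") },
  ],
};
const out = transformRecords(event, (buf) => JSON.parse(buf.toString("utf8")), {
  keep: (m) => m.query !== "",
  enrich: (m) => ({ ...m, source: "kinesis" }),
});
console.log(out.records.map((r) => r.result)); // [ 'Ok', 'Dropped', 'ProcessingFailed' ]
```

Returning "Dropped" or "ProcessingFailed" per record, rather than throwing from the handler, keeps one bad or unwanted record from failing the whole batch.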

As I mentioned initially, the data arrives in Protobuf format, so we want to convert it to JSON before forwarding it to Datadog.

In the code below I’m using the npm package protobufjs, which helps us decode each incoming message (after base64-decoding it) based on a specific schema written in a .proto file.

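console.log("Loading function");
const protobuf = require("protobufjs");

// Load the schema once, outside the handler, so warm invocations reuse it.
// keepCase: true keeps field names exactly as written in the .proto file (e.g. page_number).
const root = new protobuf.Root().loadSync("searchrequest.proto", {
  keepCase: true,
});

// The lookup path is "<package>.<message>" from the .proto file and is case-sensitive.
const SearchRequest = root.lookupType("Search.SearchRequest");

exports.metricsTransform = async (event, context) => {
  /* Process the list of records and transform them */
  const output = event.records.map((record) => {
    // Kinesis data is base64 encoded, so decode it into raw Protobuf bytes first
    const buf = Buffer.from(record.data, "base64");
    try {
      // Decode the Protobuf bytes and convert them to a plain object
      // (enums as their names, 64-bit integers as strings)
      const message = SearchRequest.toObject(SearchRequest.decode(buf), {
        enums: String,
        longs: String,
      });
      // You can also add or modify properties on `message` here for additional enrichment
      return {
        recordId: record.recordId,
        result: "Ok",
        data: Buffer.from(JSON.stringify(message)).toString("base64"),
      };
    } catch (err) {
      // Report undecodable records as failed instead of failing the whole batch
      console.error(`Failed to decode record ${record.recordId}: ${err.message}`);
      return { recordId: record.recordId, result: "ProcessingFailed", data: record.data };
    }
  });
  return { records: output };
};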
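To try a handler like this before wiring it to Firehose, a small local harness can build a test event and read back the response. This is a sketch: makeFirehoseEvent and readFirehoseResponse are hypothetical helpers, the event includes only the fields the handler reads plus a couple of the others Firehose sends, and the module path in the usage comments is an assumption.

```javascript
// Hypothetical helpers for exercising a Firehose transformation handler locally.
// Real events also carry deliveryStreamArn, region and, for Kinesis sources,
// per-record kinesisRecordMetadata; the handler above does not read those.
function makeFirehoseEvent(payloads) {
  return {
    invocationId: "local-test", // placeholder
    records: payloads.map((bytes, i) => ({
      recordId: String(i + 1),
      approximateArrivalTimestamp: Date.now(),
      data: Buffer.from(bytes).toString("base64"),
    })),
  };
}

// Turn the handler's response back into readable objects for inspection.
function readFirehoseResponse(response) {
  return response.records.map((r) => ({
    recordId: r.recordId,
    result: r.result,
    body: r.result === "Ok" ? JSON.parse(Buffer.from(r.data, "base64").toString("utf8")) : null,
  }));
}

// Example usage (assumes protobufjs, searchrequest.proto and the handler in ./index.js):
//   const protobuf = require("protobufjs");
//   const Type = protobuf.loadSync("searchrequest.proto").lookupType("Search.SearchRequest");
//   const { metricsTransform } = require("./index");
//   const event = makeFirehoseEvent([Type.encode({ query: "hello" }).finish()]);
//   metricsTransform(event).then((res) => console.log(readFirehoseResponse(res)));
```

Feeding in a deliberately malformed payload (for example a few random bytes) is a quick way to check how the handler reports decode failures.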

Here is an example message definition based on the one in the official documentation. It would sit in the searchrequest.proto file in the root folder of your Lambda deployment package, or at another path of your choice (in which case, update the path passed to loadSync).

// proto2 is needed here: proto3 does not allow "required" or [default = ...]
syntax = "proto2";
package Search;
message SearchRequest {
  required string query = 1;
  optional int32 page_number = 2;
  optional int32 result_per_page = 3 [default = 10];
  enum Corpus {
    UNIVERSAL = 0;
    WEB = 1;
    IMAGES = 2;
    LOCAL = 3;
    NEWS = 4;
    PRODUCTS = 5;
    VIDEO = 6;
  }
  optional Corpus corpus = 4 [default = UNIVERSAL];
}
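To see roughly what decode does for this schema, here is a minimal hand-written decoder for SearchRequest, for illustration only; protobufjs does this, and much more, in the Lambda above. It handles only the two wire types this schema uses, fills unset fields with proto2 defaults (the declared [default = ...] values, otherwise empty string or zero), skips unknown fields, and rejects messages without the required query field, much as protobufjs does for proto2. decodeSearchRequest and readVarint are hypothetical names, and negative int32 values are not handled.

```javascript
// Illustration only: decodes the SearchRequest schema by hand.
const CORPUS = ["UNIVERSAL", "WEB", "IMAGES", "LOCAL", "NEWS", "PRODUCTS", "VIDEO"];

// Read a base-128 varint starting at pos; returns [value, nextPos].
function readVarint(buf, pos) {
  let result = 0;
  let shift = 0;
  let byte;
  do {
    if (pos >= buf.length) throw new Error("Truncated varint");
    byte = buf[pos++];
    result += (byte & 0x7f) * 2 ** shift; // multiply rather than << to avoid 32-bit overflow
    shift += 7;
  } while (byte & 0x80);
  return [result, pos];
}

function decodeSearchRequest(buf) {
  // Unset fields report their proto2 defaults
  const msg = { query: "", page_number: 0, result_per_page: 10, corpus: "UNIVERSAL" };
  let sawQuery = false;
  let pos = 0;
  while (pos < buf.length) {
    let key;
    [key, pos] = readVarint(buf, pos);
    const fieldNumber = Math.floor(key / 8); // key = (field_number << 3) | wire_type
    const wireType = key % 8;
    if (wireType === 0) {
      let value;
      [value, pos] = readVarint(buf, pos);
      if (fieldNumber === 2) msg.page_number = value;
      else if (fieldNumber === 3) msg.result_per_page = value;
      else if (fieldNumber === 4) msg.corpus = CORPUS[value] ?? value;
    } else if (wireType === 2) {
      let len;
      [len, pos] = readVarint(buf, pos);
      if (pos + len > buf.length) throw new Error("Truncated field");
      if (fieldNumber === 1) {
        msg.query = buf.toString("utf8", pos, pos + len);
        sawQuery = true;
      }
      pos += len;
    } else {
      throw new Error(`Unsupported wire type ${wireType}`);
    }
  }
  if (!sawQuery) throw new Error("missing required 'query'");
  return msg;
}

console.log(decodeSearchRequest(Buffer.from([10, 5, 104, 101, 108, 108, 111, 16, 2, 32, 1])));
// { query: 'hello', page_number: 2, result_per_page: 10, corpus: 'WEB' }
```

The example bytes set query, page_number and corpus; result_per_page never appears on the wire, so it comes back as its declared default of 10.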

This Lambda is a really basic one, but I just wanted to explain the idea behind transforming Protobuf messages in Kinesis; I hope that helps.
