Activate Your Data Streams Using RudderStack: A Use-Case with AWS Lambda and Amazon Kinesis


Amey Varangaonkar

Content Manager at RudderStack

May 19, 2020

This blog presents an approach for routing data to RudderStack using Amazon Kinesis and AWS Lambda Functions.

Introduction

Many organizations today rely on streaming event data from their applications and websites, and they collect these data streams with tools like Amazon Kinesis. But how can these businesses turn the data streams into actionable insights? A popular approach is a process called activation: we transform the raw data and then route it to the applications and services that can act on it. For example, we can send signup events to our CRM so that the sales team can work with new leads and establish business opportunities.

In this post, we present an architecture that uses readily available services to achieve these tasks. We combine Amazon Kinesis with AWS Lambda functions and RudderStack, an open-source and flexible Customer Data Infrastructure that performs the activation we are looking for. The Lambda functions read the Kinesis data streams and pass them on to RudderStack, which performs the necessary data mapping. RudderStack then forwards the mapped data to analytics platforms such as Google Analytics and Amplitude.

Note: In this post, we use the AWS stack as an example. However, it is possible to substitute Kinesis with Kafka and AWS with any other cloud provider. The stack will still work seamlessly with RudderStack.

How AWS Lambda Integrates with RudderStack

As mentioned above, we use AWS Lambda functions as an intermediary for processing and routing data streams for analytics. Because Lambda functions can be written in Node.js, integrating them with data routing tools such as RudderStack is straightforward: RudderStack provides a Node.js SDK that we can call from the Lambda code.

Quick Overview of the Data Flow

For the purpose of this blog, we devise a simple application flow:

  • Use AWS Kinesis Agent to:
    • Monitor the file system for specific file patterns in a specified location
    • Upload the newly arrived files to the pre-defined Kinesis stream
  • Use AWS Kinesis Data Streams Consumer to read and process the data
  • The Consumer triggers an AWS Lambda Function
  • The Lambda function maps the Kinesis data to the RudderStack API arguments
  • The Lambda function invokes the RudderStack API
  • RudderStack routes the data to two destinations – AWS S3 and Google Analytics

Setting up the AWS Kinesis Agent

We can configure the AWS Kinesis Agent using the following lines of code:

JSON
{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",
  "flows": [
    {
      "filePattern": "/tmp/*.csv",
      "kinesisStream": "lambda-integration-poc",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "CSVTOJSON",
          "customFieldNames": ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"],
          "delimiter": ","
        }
      ]
    }
  ]
}

Some important points to note here:

  • We configure the AWS Kinesis Agent to monitor CSV files in the /tmp directory.
  • The Kinesis Agent passes the data to the lambda-integration-poc Kinesis data stream.
  • The data processing options stipulate that the CSV data be converted to JSON. The field names for the generated JSON are also mentioned.

Here is a sample CSV file, followed by the corresponding Kinesis records that the agent generates:

CSV
testuser1,0001,0001,sample product 1,001,51.00
testuser1,0001,0002,sample product 2,002,23.50

JSON
{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0001", "itemName": "sample product 1",
"qty": "001", "unitPrice": "51.00" }
{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0002", "itemName": "sample product 2",
"qty": "002", "unitPrice": "23.50" }

Note: We use this transformation as an example to demonstrate the operation of AWS Kinesis Agent. Some businesses already have their own programs that write to Kinesis in a format that suits their business requirements. There is no need to change such programs and/or formats.
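
The CSVTOJSON step is easier to picture with a small example. The sketch below is a rough approximation in Node.js, assuming simple, unquoted CSV lines. The csvLineToJson helper is hypothetical and written only for illustration; it is not the Kinesis Agent's actual implementation.

```javascript
// Hypothetical helper: approximates the agent's CSVTOJSON option for simple,
// unquoted CSV lines. It is an illustration, not the agent's real code.
function csvLineToJson(line, fieldNames, delimiter = ",") {
  const values = line.split(delimiter);
  const record = {};
  fieldNames.forEach((name, i) => {
    // Values are kept as strings, just like in the sample records above.
    record[name] = values[i];
  });
  return record;
}

// Field names taken from the customFieldNames setting in the agent configuration.
const fieldNames = ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"];
const record = csvLineToJson("testuser1,0001,0001,sample product 1,001,51.00", fieldNames);
console.log(JSON.stringify(record));
```

Note that every value, including qty and unitPrice, stays a string, so any arithmetic on these fields needs a numeric conversion later in the pipeline.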

Setting up AWS Lambda

Before we proceed to the Lambda Function code, it is imperative that we review the overall setup. For this blog, we set up a Docker version of the RudderStack server in an EC2 instance. You can find more instructions on the setup here.

Note: You also need to have the AWS CLI installed in your development environment.

Integrating RudderStack with AWS Lambda

As mentioned previously, we can integrate AWS Lambda with third-party libraries such as the RudderStack Node.js SDK. The Lambda function calls this SDK to perform the necessary data mappings and route the data streams to the specified analytics destinations. Integrating the RudderStack Node.js SDK with the Lambda function takes the following steps:

  • Install the RudderStack Node.js SDK in the directory that holds the Lambda function artifacts in the development environment, as shown:
SH
[ec2-user@ip-172-31-44-230 ~]$ npm install --prefix=~/lambda-apps @rudderstack/rudder-sdk-node
  • Archive all the contents of the Lambda function development directory in a ZIP file.
SH
[ec2-user@ip-172-31-44-230 lambda-apps]$ zip -r function.zip .
  • Update the lambda function deployment, as shown:
SH
[ec2-user@ip-172-31-44-230 lambda-apps]$ aws lambda update-function-code --function-name lambda-apps-dev-helloWorld --zip-file fileb://~/lambda-apps/function.zip

Using the Lambda function

The following snippet shows the Lambda code. The response that the function returns can be used to test that the function is reachable at the AWS-designated web endpoint, which is created when the function is first deployed.

In the following code snippet, the following actions occur:

  • The function initializes some of the variables used for constructing the RudderStack canonical object
  • The lambda function iterates over every record in the Kinesis event
  • The function parses the records, which are in JSON format
  • The function then uses the attribute values of the JSON object as the values for the RudderStack object attributes
  • In some cases, RudderStack object attribute values are derived by aggregating the JSON object attribute values, as in the case of revenue
  • Each record is used to create a product object. Multiple product objects are collected into a products array. An order object is constructed using the products array, the order_id from the records, and the revenue

After this, the order object is nested under the properties key when invoking the track API of RudderStack.

JAVASCRIPT
'use strict';
const Analytics = require("@rudderstack/rudder-sdk-node");

module.exports.helloWorld = (event, context, callback) => {
  // HTTP-style response returned to the caller once processing is done
  const response = {
    statusCode: 200,
    headers: {
      'Access-Control-Allow-Origin': '*', // Required for CORS support to work
    },
    body: JSON.stringify({
      message: 'Go Serverless v1.0! Your function executed successfully!',
      input: event,
    }),
  };

  const order = { products: [] };
  let revenue = 0;
  let anonymousId = "dummy";

  event.Records.forEach(function (record) {
    // Kinesis data is base64 encoded, so decode it here
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    console.log('Decoded payload:', payload);

    // Construct the order line item as expected by GA from the Kinesis record
    const orderLine = JSON.parse(payload);
    order.products.push({
      product_id: orderLine.itemId,
      name: orderLine.itemName,
    });
    // qty and unitPrice arrive as strings, so convert them before multiplying
    revenue += Number(orderLine.qty) * Number(orderLine.unitPrice);
    order.order_id = orderLine.orderId; // keeping it simple: all line items belong to the same order
    anonymousId = orderLine.anonymousId; // keeping it simple again, as above
  });
  order.revenue = revenue;
  console.log("Order : ", JSON.stringify(order));

  // We need the batch endpoint of the Rudder server you are running
  const client = new Analytics("1ZINZh5pUNcKwgVGccCuSE4hi7K", "Data Plane URL");

  // Remember to handle errors and allow processing to continue
  try {
    client.track({ event: "Order Completed", anonymousId: anonymousId, properties: { order } });
    console.log("Rudder Success");
  } catch (err) {
    console.log("Rudder Error", err);
  }
  callback(null, response);
};
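
To try the mapping logic locally before deploying, it can help to separate it from the SDK call. The following is a minimal sketch under that assumption: buildOrder and toRecord are hypothetical helpers that mirror the handler's mapping step, and the fake event reuses the two sample records shown earlier plus one deliberately malformed record. Only the kinesis.data field of each record is populated, since that is the only field the mapping reads.

```javascript
// Hypothetical helper mirroring the handler's mapping logic, without the SDK call.
function buildOrder(event) {
  const order = { products: [] };
  let revenue = 0;
  let anonymousId = "dummy";
  for (const record of event.Records) {
    let orderLine;
    try {
      // Kinesis delivers each record's data base64-encoded.
      orderLine = JSON.parse(Buffer.from(record.kinesis.data, "base64").toString("utf8"));
    } catch (err) {
      // Skip malformed records so the rest of the batch is still processed.
      console.log("Skipping malformed record:", err.message);
      continue;
    }
    order.products.push({ product_id: orderLine.itemId, name: orderLine.itemName });
    revenue += Number(orderLine.qty) * Number(orderLine.unitPrice);
    order.order_id = orderLine.orderId;
    anonymousId = orderLine.anonymousId;
  }
  order.revenue = revenue;
  return { anonymousId, order };
}

// Hypothetical helper: wraps an object the way a Kinesis record carries it.
const toRecord = (obj) => ({
  kinesis: { data: Buffer.from(JSON.stringify(obj)).toString("base64") },
});

const sampleEvent = {
  Records: [
    toRecord({ anonymousId: "testuser1", orderId: "0001", itemId: "0001",
               itemName: "sample product 1", qty: "001", unitPrice: "51.00" }),
    toRecord({ anonymousId: "testuser1", orderId: "0001", itemId: "0002",
               itemName: "sample product 2", qty: "002", unitPrice: "23.50" }),
    // A malformed record, to show that processing continues past it.
    { kinesis: { data: Buffer.from("not json").toString("base64") } },
  ],
};

const result = buildOrder(sampleEvent);
console.log(JSON.stringify(result));
```

With the sample records, the revenue works out to 1 × 51.00 + 2 × 23.50 = 98. Keeping the mapping in a pure function like this also makes it easier to decide, per record, whether a parsing failure should be skipped or reported.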

The write_key and the RudderStack endpoint are used to initialize the Rudder client. In this particular case, we configure RudderStack to deliver the event to both Amazon S3 and Google Analytics. Learn more on configuring the sources and destinations in RudderStack here:

RudderStack Connections Configuration
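
Since the write key and the endpoint are deployment-specific values, one option is to read them from Lambda environment variables instead of hard-coding them in the handler. The sketch below assumes this approach; the loadRudderConfig helper and the variable names RUDDERSTACK_WRITE_KEY and RUDDERSTACK_DATA_PLANE_URL are hypothetical and not part of the RudderStack SDK.

```javascript
// Hypothetical helper: the environment variable names are assumptions of this
// sketch, not names defined by RudderStack or AWS.
function loadRudderConfig(env = process.env) {
  const writeKey = env.RUDDERSTACK_WRITE_KEY;
  const dataPlaneUrl = env.RUDDERSTACK_DATA_PLANE_URL;
  if (!writeKey || !dataPlaneUrl) {
    // Fail fast so a misconfigured deployment is noticed immediately.
    throw new Error("RUDDERSTACK_WRITE_KEY and RUDDERSTACK_DATA_PLANE_URL must be set");
  }
  return { writeKey, dataPlaneUrl };
}

// Example with an explicit object instead of process.env (placeholder values):
const config = loadRudderConfig({
  RUDDERSTACK_WRITE_KEY: "example-write-key",
  RUDDERSTACK_DATA_PLANE_URL: "https://rudder.example.com",
});
console.log(config.dataPlaneUrl);
```

The returned values could then be passed to the Analytics constructor in place of the literals used in the handler above.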

The event delivered to Amazon S3 looks like the following:

JSON
{
  "type": "track",
  "event": "Order Completed",
  "sentAt": "2020-04-15T09:59:50.246Z",
  "context": {
    "library": {
      "name": "analytics-node",
      "version": "0.0.1"
    }
  },
  "_metadata": {
    "nodeVersion": "12.16.1"
  },
  "messageId": "node-5306d64b863bdf7c95cce1442c70f3ac-1345b9b5-c5a9-4c1b-8338-64762ff2de8d",
  "timestamp": "2020-04-15T09:59:50.27Z",
  "properties": {
    "order": {
      "revenue": 98,
      "order_id": "0001",
      "products": [
        { "name": "sample product 1", "product_id": "0001" },
        { "name": "sample product 2", "product_id": "0002" }
      ]
    }
  },
  "receivedAt": "2020-04-15T09:59:50.271Z",
  "request_ip": "34.205.171.63:54764",
  "anonymousId": "testuser1",
  "originalTimestamp": "2020-04-15T09:59:50.245Z"
}

The screenshot below shows the delivered event in Google Analytics:

The RudderStack event as seen in Google Analytics

Summary

In this post, we saw how to combine data streams with RudderStack and AWS Lambda functions to create a flexible, real-time activation data flow for your event data. Combining infrastructure like AWS Kinesis and AWS Lambda with RudderStack results in a lean and scalable data infrastructure that lets you extract value from your event data quickly.

Moreover, RudderStack is an open and flexible Customer Data Infrastructure, which means that it can be combined with most common data platforms. In this post, we use the AWS stack as an example, but it is possible to substitute Kinesis with Kafka and AWS with any other cloud provider.
