Activate Your Data Streams Using RudderStack: A Use-Case with AWS Lambda and Amazon Kinesis
Amey Varangaonkar
Content Manager at RudderStack
May 19, 2020
This blog presents an approach for routing data to RudderStack using Amazon Kinesis and AWS Lambda Functions.
Introduction
Many organizations today make use of streaming event data from their applications and websites. For collecting the data streams, they use tools like Amazon Kinesis. But how can these businesses turn the data streams into actionable insights? A popular approach to do this is through a process that is called activation. In this process, we transform the raw data and then route it to different applications and services for insights. For example, we can send signup events to our CRM so that the sales team can work with new leads and establish business opportunities.
In this post, we present an architecture that uses readily available services to achieve the above tasks. We combine Amazon Kinesis with AWS Lambda functions and RudderStack, an open-source and flexible Customer Data Infrastructure that performs the activation we are looking for. The Lambda functions read the Kinesis data streams and pass them on to RudderStack for the necessary data mapping. RudderStack then routes the mapped data to analytics platforms (Google Analytics, Amplitude, etc.).
Note: In this post, we use the AWS stack as an example. However, it is possible to substitute Kinesis with Kafka and AWS with any other cloud provider. The stack will still work seamlessly with RudderStack.
How AWS Lambda Integrates with RudderStack
As mentioned above, we use AWS Lambda functions as an intermediary for processing and routing data streams for analytics. Because Lambda functions can be written in Node.js, integrating them with data routing tools such as RudderStack is straightforward: RudderStack provides a Node.js SDK that we can call from the Lambda code.
Quick Overview of the Data Flow
For the purpose of this blog, we devise a simple application flow:
- Use AWS Kinesis Agent to:
  - Monitor the file system for specific file patterns in a specified location
  - Upload the newly arrived files to the pre-defined Kinesis stream
- Use AWS Kinesis Data Streams Consumer to read and process the data
- The Consumer triggers an AWS Lambda Function
- The Lambda function maps the Kinesis data to the RudderStack API arguments
- The Lambda function invokes the RudderStack API
- RudderStack routes the data to two destinations – AWS S3 and Google Analytics
Setting up the AWS Kinesis Agent
We can configure the AWS Kinesis Agent using the following lines of code:
JSON
{
  "cloudwatch.emitMetrics": true,
  "kinesis.endpoint": "",
  "firehose.endpoint": "",
  "flows": [
    {
      "filePattern": "/tmp/*.csv",
      "kinesisStream": "lambda-integration-poc",
      "partitionKeyOption": "RANDOM",
      "dataProcessingOptions": [
        {
          "optionName": "CSVTOJSON",
          "customFieldNames": ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"],
          "delimiter": ","
        }
      ]
    }
  ]
}
Some important points to note here:
- We configure the AWS Kinesis Agent to monitor CSV files in the `/tmp` directory.
- The Kinesis Agent passes the data to the `lambda-integration-poc` Kinesis data stream.
- The data processing options stipulate that the CSV data be converted to JSON. The field names for the generated JSON are also mentioned.
Some examples of a sample CSV file and the corresponding generated Kinesis records follow:
CSV
testuser1,0001,0001,sample product 1,001,51.00
testuser1,0001,0002,sample product 2,002,23.50
JSON
{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0001", "itemName": "sample product 1", "qty": "001", "unitPrice": "51.00" }
{ "anonymousId": "testuser1", "orderId": "0001", "itemId": "0002", "itemName": "sample product 2", "qty": "002", "unitPrice": "23.50" }
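The CSVTOJSON option pairs each delimited value with the field name at the same position in `customFieldNames`. The sketch below mimics that pairing in plain Node.js, so you can preview the record a given CSV line should produce. `csvLineToRecord` is a hypothetical helper written for illustration, not part of the Kinesis Agent, and it assumes the values contain no quoted or embedded delimiters.

```javascript
// Hypothetical helper: mimics the positional field mapping configured above.
// Not the Kinesis Agent's implementation; assumes no quoted/escaped delimiters.
const customFieldNames = ["anonymousId", "orderId", "itemId", "itemName", "qty", "unitPrice"];

function csvLineToRecord(line, fieldNames, delimiter = ",") {
  const values = line.split(delimiter);
  const record = {};
  fieldNames.forEach((name, index) => {
    // Values are kept as strings, matching the sample records in this post.
    record[name] = values[index];
  });
  return record;
}

const sampleLine = "testuser1,0001,0001,sample product 1,001,51.00";
console.log(JSON.stringify(csvLineToRecord(sampleLine, customFieldNames)));
```

Because every value stays a string, downstream code that does arithmetic on `qty` or `unitPrice` has to convert them to numbers first.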
Note: We use this transformation as an example to demonstrate the operation of AWS Kinesis Agent. Some businesses already have their own programs that write to Kinesis in a format that suits their business requirements. There is no need to change such programs and/or formats.
Setting up AWS Lambda
Before we proceed to the Lambda Function code, it is imperative that we review the overall setup. For this blog, we set up a Docker version of the RudderStack server in an EC2 instance. You can find more instructions on the setup here.
Note: You also need to have the AWS CLI installed in your development environment.
Integrating RudderStack with AWS Lambda
As mentioned previously, AWS Lambda works with third-party libraries such as the RudderStack Node.js SDK. The Lambda function calls this SDK to perform the necessary data mappings and route the data streams to the specified analytics destinations. The following steps integrate the RudderStack Node.js SDK with the Lambda function:
- You should install the RudderStack Node.js SDK at the location where we maintain the Lambda function artifacts in the development environment, as shown:
SH
[ec2-user@ip-172-31-44-230 ~]$ npm install --prefix=~/lambda-apps @rudderstack/rudder-sdk-node
- Archive all the contents of the Lambda function development directory in a ZIP file.
SH
[ec2-user@ip-172-31-44-230 lambda-apps]$ zip -r function.zip .
- Update the lambda function deployment, as shown:
SH
[ec2-user@ip-172-31-44-230 lambda-apps]$ aws lambda update-function-code --function-name lambda-apps-dev-helloWorld --zip-file fileb://~/lambda-apps/function.zip
Using the Lambda function
The following snippet shows the Lambda code. You can use the response it returns to test the availability of the function at the AWS-designated web endpoint, which is created when the function is first deployed.
In the following code snippet, the following actions occur:
- The function initializes some of the variables used for constructing the RudderStack canonical object
- The lambda function iterates over every record in the Kinesis event
- The function parses the records, which are in JSON format
- The function then uses the attribute values of the JSON object as the values for the RudderStack object attributes
- In some cases, RudderStack object attribute values are derived by aggregating the JSON object attribute values, as in the case of `revenue`
- Each record is used to create a `product` object. Multiple `product` objects are collected into a `products` array. An `order` object is constructed using the `products` array, the `order_id` from the records, and the `revenue`
After this, the `order` object is used as the value for the `properties` key while invoking the `track` API of RudderStack.
JAVASCRIPT
'use strict';

const Analytics = require("@rudderstack/rudder-sdk-node");

module.exports.helloWorld = (event, context, callback) => {
  const response = {
    statusCode: 200,
    headers: {
      'Access-Control-Allow-Origin': '*', // Required for CORS support to work
    },
    body: JSON.stringify({
      message: 'Go Serverless v1.0! Your function executed successfully!',
      input: event,
    }),
  };

  var order = {};
  var revenue = 0;
  var anonymousId = "dummy";
  order["products"] = [];

  event.Records.forEach(function(record) {
    // Kinesis data is base64 encoded so decode here
    var payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    console.log('Decoded payload:', payload);

    // Construct order line item as expected by GA from Kinesis record
    var orderLine = JSON.parse(payload);
    var product = {};
    product["product_id"] = orderLine.itemId;
    product["name"] = orderLine.itemName;
    // qty and unitPrice arrive as strings, so convert them before multiplying
    revenue += Number(orderLine.qty) * Number(orderLine.unitPrice);
    order["products"].push(product);
    order["order_id"] = orderLine.orderId; // keeping it simple, all line items from same order
    anonymousId = orderLine.anonymousId; // keeping simple again, as above
  });

  order["revenue"] = revenue;
  console.log("Order : ", JSON.stringify(order));

  // we need the batch endpoint of the Rudder server you are running
  const client = new Analytics("1ZINZh5pUNcKwgVGccCuSE4hi7K", "Data Plane URL");

  // remember to handle error and allow for processing to continue
  try {
    // {order} is shorthand for {order: order}, so the order is sent as properties.order
    client.track({"event" : "Order Completed", "anonymousId" : anonymousId, "properties" : {order}});
    console.log("Rudder Success");
  } catch(err) {
    console.log("Rudder Error");
  }

  callback(null, response);
};
The `write_key` and the RudderStack endpoint are used to initialize the Rudder client. In this particular case, we configure RudderStack to deliver the event to both Amazon S3 and Google Analytics. Learn more on configuring the sources and destinations in RudderStack here:
RudderStack Connections Configuration
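The `try`/`catch` around `track` in the function only sees errors thrown synchronously by the call. One possible way to also surface errors reported later is to wrap the call in a Promise, as sketched below. This assumes that the SDK's `track` accepts a Node-style callback as its second argument, so check that against the SDK version you install. `trackAsync` and `stubClient` are hypothetical names, and the stub stands in for the real client so that the snippet runs on its own.

```javascript
// Hypothetical wrapper: settles once the client reports back on the event.
// ASSUMPTION: client.track(message, callback) invokes callback(err) Node-style.
function trackAsync(client, message) {
  return new Promise((resolve, reject) => {
    try {
      client.track(message, (err) => (err ? reject(err) : resolve(message)));
    } catch (err) {
      // Errors thrown synchronously by track() land here.
      reject(err);
    }
  });
}

// Stub standing in for the real SDK client so this sketch runs without it.
const stubClient = {
  calls: [],
  track(message, callback) {
    this.calls.push(message);
    callback(null);
  },
};

trackAsync(stubClient, { event: "Order Completed", anonymousId: "testuser1", properties: {} })
  .then(() => console.log("Rudder Success"))
  .catch((err) => console.log("Rudder Error", err.message));
```

In the Lambda handler, the same wrapper could be awaited before invoking `callback`, so that a failure is logged before the function returns.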
The delivered event dumped to Amazon S3 looks like the following:
JSON
{
  "type": "track",
  "event": "Order Completed",
  "sentAt": "2020-04-15T09:59:50.246Z",
  "context": {"library": {"name": "analytics-node", "version": "0.0.1"}},
  "_metadata": {"nodeVersion": "12.16.1"},
  "messageId": "node-5306d64b863bdf7c95cce1442c70f3ac-1345b9b5-c5a9-4c1b-8338-64762ff2de8d",
  "timestamp": "2020-04-15T09:59:50.27Z",
  "properties": {
    "order": {
      "revenue": 98,
      "order_id": "0001",
      "products": [
        {"name": "sample product 1", "product_id": "0001"},
        {"name": "sample product 2", "product_id": "0002"}
      ]
    }
  },
  "receivedAt": "2020-04-15T09:59:50.271Z",
  "request_ip": "34.205.171.63:54764",
  "anonymousId": "testuser1",
  "originalTimestamp": "2020-04-15T09:59:50.245Z"
}
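To check the mapping without deploying anything, the sketch below rebuilds a Kinesis-style event from the two sample records and applies the same aggregation as the Lambda function. `toKinesisEvent` and `buildTrackMessage` are hypothetical helpers introduced here for illustration. Only the `Records[].kinesis.data` field that the function reads is modeled, not the full event that AWS delivers.

```javascript
// Hypothetical local harness; models only Records[].kinesis.data.
const sampleRecords = [
  { anonymousId: "testuser1", orderId: "0001", itemId: "0001", itemName: "sample product 1", qty: "001", unitPrice: "51.00" },
  { anonymousId: "testuser1", orderId: "0001", itemId: "0002", itemName: "sample product 2", qty: "002", unitPrice: "23.50" },
];

// Kinesis hands each record's data to Lambda base64-encoded.
function toKinesisEvent(records) {
  return {
    Records: records.map((r) => ({
      kinesis: { data: Buffer.from(JSON.stringify(r), "utf8").toString("base64") },
    })),
  };
}

// Same mapping as the Lambda function in this post, minus the SDK call.
function buildTrackMessage(event) {
  const order = { products: [] };
  let revenue = 0;
  let anonymousId = "dummy";
  for (const record of event.Records) {
    const line = JSON.parse(Buffer.from(record.kinesis.data, "base64").toString("utf8"));
    order.products.push({ product_id: line.itemId, name: line.itemName });
    revenue += Number(line.qty) * Number(line.unitPrice);
    order.order_id = line.orderId; // all line items belong to the same order
    anonymousId = line.anonymousId;
  }
  order.revenue = revenue;
  // { order } is shorthand for { order: order }, hence properties.order in the event.
  return { event: "Order Completed", anonymousId, properties: { order } };
}

const message = buildTrackMessage(toKinesisEvent(sampleRecords));
console.log(JSON.stringify(message, null, 2));
```

With the sample data, the revenue works out to 1 × 51.00 + 2 × 23.50 = 98, which matches the `revenue` field in the delivered event.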
The screenshot below shows the delivered event in Google Analytics:
The RudderStack event as seen in Google Analytics
Summary
In this post, we saw how to combine data streams with RudderStack and AWS Lambda functions to create a flexible, real-time activation flow for your event data. Combining infrastructure like Amazon Kinesis and AWS Lambda with RudderStack results in a lean and scalable data infrastructure that lets you extract value from your data quickly.
Moreover, RudderStack is an open and flexible Customer Data Infrastructure which means that it can be combined with any of the common data platforms you can find. In this post, we use the AWS stack as an example, but it is possible to substitute Kinesis with Kafka and AWS with any other cloud provider.