Learn Data Engineering

How to process a simple data stream and consume it with Lambda

Updated: Apr 21, 2022

I built a serverless architecture for my simulated credit card complaints stream using:

  • AWS S3

  • AWS Lambda

  • AWS Kinesis

The picture above gives a high-level view of the data flow. I treat uploading a CSV file as the data producer: once you upload a file, S3 generates an object-created event and the first Lambda function is invoked asynchronously. That function writes the file content to the Kinesis stream as a record (record = data + partition key), which triggers a second Lambda function that persists the data to S3.


If you are a Python guy like me, go through these packages:


The links below help you set up access from your Jupyter notebooks:


Batch Processing: With traditional analytics, you gather the data, load it periodically into a database, and analyze it hours, days, or weeks later.

Stream Processing: Analyzing real-time data requires a different approach. Instead of running database queries over stored data, stream processing applications process data continuously in real time, even before it is stored.
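To make the difference concrete, here is a small illustrative sketch in plain Python (no AWS involved). The batch function can only answer once all the data has been collected, while the streaming function updates its answer as each record arrives and never stores the history. The hourly complaint counts are made-up numbers.

```python
from typing import Iterable, Iterator


def batch_average(counts: list[float]) -> float:
    """Batch style: collect everything first, then analyze it in one go."""
    return sum(counts) / len(counts)


def streaming_average(counts: Iterable[float]) -> Iterator[float]:
    """Stream style: keep a running result and emit it after every record."""
    seen = 0
    total = 0.0
    for count in counts:
        seen += 1
        total += count
        yield total / seen  # up-to-date answer without keeping past records


if __name__ == "__main__":
    hourly_complaints = [12, 8, 4]  # hypothetical sample data
    print(batch_average(hourly_complaints))            # 8.0, one answer at the end
    print(list(streaming_average(hourly_complaints)))  # [12.0, 10.0, 8.0], one per record
```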

Producer: A producer puts data records into Amazon Kinesis Data Streams.

Consumer: A consumer processes the data records.
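A minimal sketch of both roles with a boto3 Kinesis client, assuming the client is created elsewhere (for example `boto3.client("kinesis")`) and passed in. `put_record`, `get_shard_iterator` and `get_records` are real Kinesis API calls; the stream name, shard ID and helper names are hypothetical. A real consumer keeps calling `get_records` with the returned `NextShardIterator` in a loop, since a single call may return no records even when data exists.

```python
def produce(kinesis, stream_name: str, data: bytes, partition_key: str) -> str:
    """Producer: put one record (data + partition key) into the stream."""
    response = kinesis.put_record(
        StreamName=stream_name, Data=data, PartitionKey=partition_key
    )
    return response["SequenceNumber"]


def consume_once(kinesis, stream_name: str, shard_id: str, limit: int = 100) -> list[bytes]:
    """Consumer: read one batch from a single shard, starting at the oldest retained record."""
    iterator = kinesis.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
    # boto3 hands back Data as raw bytes (already base64-decoded)
    return [record["Data"] for record in response["Records"]]
```

Usage, against a real stream: `produce(client, "complaints-stream", csv_bytes, "complaints.csv")`, then `consume_once(client, "complaints-stream", "shardId-000000000000")`.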

Preparing S3

Create an S3 bucket from the management console.

I need S3 as both the source and the target, so the list shows two buckets.
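If you prefer code to the console, the two buckets can also be created with boto3. This is a minimal sketch, assuming an S3 client is passed in and credentials are configured; the `-source`/`-target` naming is hypothetical, and S3 bucket names must be globally unique. `us-east-1` is a special case: `create_bucket` rejects it as a `LocationConstraint`, so the call omits the configuration there.

```python
def create_bucket(s3, name: str, region: str) -> None:
    """Create one S3 bucket in the given region."""
    if region == "us-east-1":
        # us-east-1 is the default region and must not be passed as a LocationConstraint
        s3.create_bucket(Bucket=name)
    else:
        s3.create_bucket(
            Bucket=name,
            CreateBucketConfiguration={"LocationConstraint": region},
        )


def create_source_and_target(s3, prefix: str, region: str) -> tuple[str, str]:
    """Create the source and target buckets (hypothetical naming scheme)."""
    source, target = f"{prefix}-source", f"{prefix}-target"
    for name in (source, target):
        create_bucket(s3, name, region)
    return source, target
```

Usage: `create_source_and_target(boto3.client("s3", region_name="eu-west-1"), "my-complaints", "eu-west-1")`.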

Configure Kinesis data stream

Create a Kinesis stream using the management console. The calculator helps you decide the number of shards.
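The arithmetic behind such an estimate can be sketched as follows, assuming provisioned capacity mode and the documented per-shard limits: writes up to 1 MB/s or 1,000 records/s, and reads up to 2 MB/s shared by all standard (non enhanced fan-out) consumers. The function and parameter names are hypothetical; treat the result as a rough lower bound, not a replacement for the console's calculator.

```python
import math

# Per-shard limits in provisioned mode
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0  # shared by all standard consumers of the shard


def estimate_shards(avg_record_kb: float, records_per_second: float, consumers: int) -> int:
    """Return the smallest shard count that satisfies every throughput limit."""
    write_mb = avg_record_kb * records_per_second / 1024
    read_mb = write_mb * consumers  # each standard consumer reads the full stream
    return max(
        1,
        math.ceil(write_mb / WRITE_MB_PER_SHARD),
        math.ceil(records_per_second / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb / READ_MB_PER_SHARD),
    )
```

For example, 5 KB records at 2,000 records/s read by 3 consumers gives about 9.8 MB/s in and 29.3 MB/s out, so reads dominate and `estimate_shards(5, 2000, 3)` returns 15.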

NOTE: keep an eye on billing, as this service is not covered by the free tier.

Lambda function

I used the AWS management console to create a function. It's a self-explanatory GUI where you choose a runtime (Python, in my case) and a permission role.

IAM (Identity and Access Management) is the service where you create roles.

For example,

My requirement: the Lambda function needs read/write/list access, plus write access to S3.

I created a role, lamda_to_s3, to which I attached a predefined policy called AWSLambdaExecute (shown in pic 1 below) and a customized policy (shown in pic 2 below).
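For readers without the screenshots, here is a hypothetical example of what a customized Kinesis policy for this pipeline could contain; it is not the author's actual policy. It assumes one stream used by both functions: `PutRecord`/`PutRecords` for the S3-to-Kinesis function, and the read actions a Kinesis trigger needs for the Kinesis-to-S3 function. The region, account ID and stream name are placeholders.

```python
import json


def kinesis_policy(region: str, account_id: str, stream_name: str) -> str:
    """Build a hypothetical IAM policy document granting write and read access to one stream."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {   # producer side: the S3-triggered function writes records
                "Sid": "WriteToStream",
                "Effect": "Allow",
                "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
                "Resource": stream_arn,
            },
            {   # consumer side: actions used when Lambda polls the stream
                "Sid": "ReadFromStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                ],
                "Resource": stream_arn,
            },
            {   # ListStreams is account-level, so it cannot be scoped to one stream
                "Sid": "ListStreams",
                "Effect": "Allow",
                "Action": "kinesis:ListStreams",
                "Resource": "*",
            },
        ],
    }
    return json.dumps(policy, indent=2)
```

The JSON string can be pasted into the console's policy editor or passed as `PolicyDocument` to boto3's `iam.create_policy`.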


I added an S3 trigger to invoke the Lambda function on the object-created event.
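The same trigger can be configured from code with S3's `put_bucket_notification_configuration`. This is a sketch under a few assumptions: the S3 client is passed in, the function ARN is a placeholder, and the `.csv` suffix filter is an added choice, not something from the original setup. Two caveats: this call replaces the bucket's entire notification configuration, and unlike the console it does not grant S3 permission to invoke the function (that is a separate `lambda add_permission` call).

```python
def add_s3_trigger(s3, bucket: str, function_arn: str, suffix: str = ".csv") -> dict:
    """Invoke a Lambda function whenever an object with the given suffix is created."""
    config = {
        "LambdaFunctionConfigurations": [
            {
                "LambdaFunctionArn": function_arn,
                "Events": ["s3:ObjectCreated:*"],  # every kind of object-created event
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "suffix", "Value": suffix}]}
                },
            }
        ]
    }
    # Overwrites any existing notification configuration on the bucket
    s3.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )
    return config
```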

Handler function 1

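Below is the generic format of a Lambda handler:

def lambda_handler(event, context):
    # event: the trigger payload (S3, Kinesis, ...) as a dict
    # context: runtime info such as the request ID and remaining time
    return {"status": "ok"}  # any JSON-serializable value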

We customize the function to our requirements, using the function parameters.
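As a small illustration of both parameters, the sketch below reads real attributes of the Lambda context object (`aws_request_id`, `function_name`, `get_remaining_time_in_millis()`) and counts the items in the event. It assumes the event, like S3 and Kinesis notifications, lists its items under `"Records"`.

```python
import json


def lambda_handler(event, context):
    # context: metadata about this invocation, supplied by the Lambda runtime
    print(
        f"request {context.aws_request_id} in {context.function_name}: "
        f"{context.get_remaining_time_in_millis()} ms left"
    )
    # event: the trigger payload as a dict
    records = event.get("Records", [])
    summary = {"record_count": len(records)}
    print(json.dumps(summary))  # print() output is captured in CloudWatch Logs
    return summary
```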

The code reads the file content from S3 and writes it to the Kinesis stream.
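A rough sketch of what such a handler might look like (not the original code). It assumes a hypothetical `STREAM_NAME` environment variable, sends each file as one record with the object key as partition key, and keeps the S3-to-Kinesis logic in a helper that takes the clients as arguments so it can be exercised without AWS. Kinesis limits the size of a single record, so the whole-file approach only suits small files.

```python
import os
import urllib.parse

# Hypothetical environment variable holding the target stream name
STREAM_NAME = os.environ.get("STREAM_NAME", "complaints-stream")


def s3_to_kinesis(event, s3, kinesis, stream_name):
    """Read each newly created S3 object and write its content to Kinesis as one record."""
    sequence_numbers = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded in S3 notifications (a space becomes '+')
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        data = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        # record = data + partition key; partition keys are capped at 256 characters
        response = kinesis.put_record(
            StreamName=stream_name, Data=data, PartitionKey=key[:256]
        )
        sequence_numbers.append(response["SequenceNumber"])
    return sequence_numbers


def lambda_handler(event, context):
    import boto3  # bundled with the AWS Lambda Python runtime

    return {
        "sequence_numbers": s3_to_kinesis(
            event, boto3.client("s3"), boto3.client("kinesis"), STREAM_NAME
        )
    }
```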


I added a Kinesis trigger to invoke the second Lambda function as the latest data becomes available.
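Behind the scenes, a Kinesis trigger is an event source mapping. A minimal sketch with boto3's `create_event_source_mapping`, assuming a Lambda client is passed in; the stream ARN, function name and batch size are placeholders. `StartingPosition="LATEST"` matches reading only newly arriving data.

```python
def add_kinesis_trigger(lambda_client, stream_arn: str, function_name: str,
                        batch_size: int = 100) -> str:
    """Map a Kinesis stream to a Lambda function, starting from the newest records."""
    response = lambda_client.create_event_source_mapping(
        EventSourceArn=stream_arn,
        FunctionName=function_name,
        StartingPosition="LATEST",  # "TRIM_HORIZON" would start from the oldest retained record
        BatchSize=batch_size,       # maximum number of records per invocation
    )
    return response["UUID"]  # identifier of the new mapping
```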

Handler function 2

The code reads the stream and writes the data to S3.
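A rough sketch of such a consumer (not the original code), assuming a hypothetical `TARGET_BUCKET` environment variable and one S3 object per Kinesis record. In the Lambda event, each record's `kinesis.data` field is base64-encoded, and `eventID` combines the shard ID and sequence number, which makes it a convenient unique object key.

```python
import base64
import os

# Hypothetical environment variable holding the target bucket name
TARGET_BUCKET = os.environ.get("TARGET_BUCKET", "complaints-target")


def kinesis_to_s3(event, s3, bucket):
    """Decode each Kinesis record from the Lambda event and store it as an S3 object."""
    keys = []
    for record in event["Records"]:
        # Lambda delivers the record payload base64-encoded
        data = base64.b64decode(record["kinesis"]["data"])
        # eventID looks like "shardId-000000000000:<sequence number>"
        key = f"processed/{record['eventID'].replace(':', '_')}.csv"
        s3.put_object(Bucket=bucket, Key=key, Body=data)
        keys.append(key)
    return keys


def lambda_handler(event, context):
    import boto3  # bundled with the AWS Lambda Python runtime

    return {"keys": kinesis_to_s3(event, boto3.client("s3"), TARGET_BUCKET)}
```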

Logs and Monitoring

Finally, CloudWatch Logs are a great source for monitoring the workflow and troubleshooting errors.
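Each Lambda function writes to a log group named `/aws/lambda/<function-name>`, which you can also search from a notebook. A minimal sketch using the CloudWatch Logs `filter_log_events` call with manual `nextToken` pagination, assuming a `logs` client is passed in; the `"ERROR"` filter pattern is an assumption you may need to adapt to your log format.

```python
def recent_errors(logs, function_name: str, start_time_ms: int, pattern: str = "ERROR") -> list[str]:
    """Collect log messages matching a pattern from a Lambda function's log group."""
    kwargs = {
        "logGroupName": f"/aws/lambda/{function_name}",
        "filterPattern": pattern,
        "startTime": start_time_ms,  # milliseconds since the Unix epoch
    }
    messages = []
    while True:
        response = logs.filter_log_events(**kwargs)
        messages.extend(event["message"] for event in response["events"])
        token = response.get("nextToken")
        if not token:  # no more pages
            return messages
        kwargs["nextToken"] = token
```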

I tried to include the content on the architecture that I feel is relevant and required. Let me know in the comments below if any topic needs more explanation.
