Learn Data Engineering
at LearnDataEngineering.com

How to process a simple data stream and consume it with Lambda

Updated: Apr 21, 2022

I built a serverless architecture for my simulated credit card complaints stream using:


  • AWS S3

  • AWS Lambda

  • AWS Kinesis

The above picture gives a high-level view of the data flow. I treat uploading a CSV file as the data producer: once you upload a file, S3 generates an object-created event and the first Lambda function is invoked asynchronously. The file content is written to the Kinesis stream as records (record = data + partition key), which triggers a second Lambda function that persists the data to S3.


Prerequisites


If you are a Python person like me, go through this package:

boto3


The links below help you set up access from your Jupyter notebooks.


Terminology


Batch Processing: With traditional analytics, you gather the data, load it periodically into a database, and analyze it hours, days, or weeks later.


Stream Processing: Analyzing real-time data requires a different approach. Instead of running database queries over stored data, stream processing applications process data continuously in real time, even before it is stored.


Producer: A producer puts data records into Amazon Kinesis Data Streams.


Consumer: A consumer reads the data records from the stream and processes them.


Preparing S3


Create an S3 bucket from the management console.

I need S3 as both the source and the target, so the bucket list shows two buckets.








Configure Kinesis data stream


Create a Kinesis stream using the management console. The calculator helps you decide on the number of shards.

NOTE: keep an eye on billing, as this service is not covered by the free tier.
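
The shard count follows from the per-shard limits of a provisioned stream: each shard takes up to 1 MB/s or 1,000 records/s of writes and serves up to 2 MB/s of reads. Here is a minimal sketch of that arithmetic. The function name and the traffic figures are my own assumptions for illustration, not values from my stream.

```python
import math


def estimate_shards(avg_record_kb, records_per_sec, consumers=1):
    """Rough shard estimate from the per-shard limits (hypothetical helper)."""
    write_kb_per_sec = avg_record_kb * records_per_sec
    read_kb_per_sec = write_kb_per_sec * consumers

    by_write_bandwidth = write_kb_per_sec / 1000   # 1 MB/s of writes per shard
    by_write_count = records_per_sec / 1000        # 1,000 records/s per shard
    by_read_bandwidth = read_kb_per_sec / 2000     # 2 MB/s of reads per shard

    needed = max(by_write_bandwidth, by_write_count, by_read_bandwidth)
    return max(1, math.ceil(needed))


# Example with made-up traffic: 2 KB records at 100 records/s, one consumer.
shards_for_small_stream = estimate_shards(2, 100)
```

A small simulated stream like mine fits comfortably in a single shard, which also keeps the bill low.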


Lambda function


I used the AWS management console to create a function. It is a self-explanatory GUI where you choose a runtime (in my case, Python) and a permission role.

IAM (Identity and Access Management) is the service where you create roles.

For example,

My requirement: the Lambda function needs read/write/list access and write access to S3.

I created a role, lamda_to_s3, to which I attached a predefined policy called AWSLambdaExecute (shown in pic1 below) and a customized policy (shown in pic2 below).
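
To give an idea of what a customized policy for stream access can look like, here is a rough sketch that builds a policy document as JSON. This is not the exact policy from my role: the helper name, the list of actions and the example ARN are assumptions, so trim or extend them to match what your functions really do.

```python
import json


def build_stream_policy(stream_arn):
    """Return an IAM policy document (JSON string) for one Kinesis stream.

    Hypothetical helper; the action list is an assumption, not the
    author's actual policy.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:ListShards",
                    "kinesis:PutRecord",
                    "kinesis:PutRecords",
                ],
                "Resource": stream_arn,
            }
        ],
    }
    return json.dumps(policy, indent=2)


# Placeholder ARN with a made-up account id and stream name.
example_policy = build_stream_policy(
    "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream"
)
```

You can paste the resulting JSON into the policy editor in the IAM console.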



Trigger


I added an S3 trigger to invoke the Lambda function on the object-created event.
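
When the trigger fires, the function receives an event that lists the created objects under a Records key. A minimal sketch of pulling the bucket name and object key out of that event is below. The helper name is hypothetical and the sample event is trimmed down to the fields that are read.

```python
from urllib.parse import unquote_plus


def created_objects(event):
    """Return (bucket, key) pairs for object-created records in an S3 event."""
    objects = []
    for record in event.get("Records", []):
        # Skip anything that is not an ObjectCreated:* notification.
        if not record.get("eventName", "").startswith("ObjectCreated"):
            continue
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded (a space shows up as '+'), so decode them.
        key = unquote_plus(record["s3"]["object"]["key"])
        objects.append((bucket, key))
    return objects


# Trimmed sample event with made-up bucket and file names.
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-source-bucket"},
                "object": {"key": "incoming/complaints+2022.csv"},
            },
        }
    ]
}
found = created_objects(sample_event)
```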




Handler function 1


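Below is the generic format of a Lambda function:

def lambda_handler(event, context):
    # event: payload from the trigger; context: runtime information
    return {"statusCode": 200}  # placeholder, return whatever your caller expects

We customize the function for our requirement, using the function parameters.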

The handler code reads the file content from S3 and writes it to the Kinesis stream.
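
Here is a minimal sketch of that step, not my exact handler. It assumes the CSV has a header row, sends one record per row, and uses a column as the partition key. The stream name, the column name and the function names are placeholders. The clients are passed in as parameters so the logic can be tried without AWS.

```python
import csv
import io
import json

STREAM_NAME = "example-complaints-stream"  # placeholder stream name


def build_records(csv_text, key_column):
    """Turn each CSV row into a Kinesis record (data + partition key)."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {
            "Data": json.dumps(row).encode("utf-8"),
            "PartitionKey": str(row[key_column]),
        }
        for row in reader
    ]


def forward_file(s3_client, kinesis_client, bucket, key, key_column):
    """Read one CSV object from S3 and write its rows to the stream."""
    obj = s3_client.get_object(Bucket=bucket, Key=key)
    csv_text = obj["Body"].read().decode("utf-8")
    records = build_records(csv_text, key_column)
    # put_records accepts at most 500 records per call, so send in chunks.
    for start in range(0, len(records), 500):
        kinesis_client.put_records(
            StreamName=STREAM_NAME, Records=records[start:start + 500]
        )
    return len(records)
```

Inside a real handler you would create the clients with boto3.client("s3") and boto3.client("kinesis") and take the bucket and key from the event. The sketch does not retry failed records, so check FailedRecordCount in the put_records response before relying on it.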


Trigger


I added a Kinesis trigger to invoke the Lambda function on the latest available data.
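
With this trigger the function receives a batch of records, and each payload arrives base64-encoded under the kinesis key. A small sketch of decoding such a batch follows. The helper name is hypothetical and the sample event keeps only the fields that are read.

```python
import base64


def decode_kinesis_event(event):
    """Return (partition_key, payload_text) pairs from a Kinesis trigger event."""
    decoded = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # The record data is base64-encoded in the Lambda event.
        payload = base64.b64decode(kinesis["data"]).decode("utf-8")
        decoded.append((kinesis["partitionKey"], payload))
    return decoded


# Trimmed sample event with a made-up payload.
sample_event = {
    "Records": [
        {
            "kinesis": {
                "partitionKey": "1",
                "data": base64.b64encode(b'{"complaint_id": "1"}').decode("ascii"),
            }
        }
    ]
}
pairs = decode_kinesis_event(sample_event)
```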



Handler function 2


The handler code reads the records from the stream and writes them to S3.
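
As a rough sketch of this step (again, not my exact handler), the function below decodes one batch and stores it as a single newline-delimited object. The key prefix, the file naming by sequence number and the function name are assumptions. The S3 client is passed in so the logic can be tried without AWS.

```python
import base64


def persist_batch(s3_client, event, bucket, prefix="complaints/"):
    """Write one Kinesis batch to S3 as a newline-delimited text object."""
    records = event.get("Records", [])
    if not records:
        return None
    lines = [
        base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        for record in records
    ]
    # Name the object after the first sequence number in the batch (assumption).
    key = f'{prefix}{records[0]["kinesis"]["sequenceNumber"]}.jsonl'
    body = ("\n".join(lines) + "\n").encode("utf-8")
    s3_client.put_object(Bucket=bucket, Key=key, Body=body)
    return key
```

In a real handler you would pass boto3.client("s3") and the name of your target bucket.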


Logs and Monitoring


Finally, CloudWatch Logs is a great source for monitoring the workflow and troubleshooting errors.

I tried to cover the parts of the architecture that I feel are relevant and required. Let me know in the comments below if any topic needs more explanation.
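
In the Python runtime, anything written through the standard logging module ends up in the function's CloudWatch log group. A minimal sketch of a handler that logs its progress is below. The process_record function is a hypothetical placeholder for the real work.

```python
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def process_record(record):
    """Hypothetical placeholder for the real per-record work."""
    if "s3" not in record and "kinesis" not in record:
        raise ValueError("unsupported record type")


def lambda_handler(event, context):
    records = event.get("Records", [])
    logger.info("Received %d record(s)", len(records))
    for index, record in enumerate(records):
        try:
            process_record(record)
        except Exception:
            # logger.exception also writes the traceback to the log.
            logger.exception("Failed on record %d", index)
            raise  # re-raise so the invocation is reported as failed
    logger.info("Processed %d record(s)", len(records))
    return {"processed": len(records)}
```

Logging the record count at the start and the end makes it easier to spot in the logs where a batch went wrong.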


