
How to process a simple data stream and consume it with Lambda

Updated: Apr 21, 2022

I built a serverless architecture for my simulated credit card complaints stream using:


  • AWS S3

  • AWS Lambda

  • AWS Kinesis

The picture above gives a high-level view of the data flow. I play the role of the data producer by uploading a CSV file: once a file is uploaded, S3 generates an object-created event and the first Lambda function is invoked asynchronously. The function writes the file content to the Kinesis stream as records (record = data + partition key), which triggers a second Lambda function that persists the data to S3.


Prerequisites


If you work in Python like me, go through these packages:

boto3


The links below help you set up access from your Jupyter notebooks.
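
As a quick illustration, here is a minimal boto3 session you might create in a notebook; the profile name, region, and bucket sanity check are placeholders of mine, not part of the original setup links.

import boto3

# Create a session from a named profile (credentials stored in ~/.aws/credentials)
session = boto3.Session(profile_name="default", region_name="us-east-1")

# Clients used throughout this walkthrough
s3_client = session.client("s3")
kinesis_client = session.client("kinesis")

# Sanity check: list the buckets the credentials can see
print([b["Name"] for b in s3_client.list_buckets()["Buckets"]])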


Terminology


Batch Processing: With traditional analytics, you gather the data, load it periodically into a database, and analyze it hours, days, or weeks later.


Stream Processing: Analyzing real-time data requires a different approach. Instead of running database queries over stored data, stream processing applications process data continuously in real time, even before it is stored.


Producer: A producer puts data records into Amazon Kinesis Data Streams.


Consumer: A consumer processes the data records.


Preparing S3


Create an S3 bucket from the management console.

I need both a source and a target bucket in S3, so the bucket list shows two buckets.
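
If you prefer to script this step instead of using the console, a minimal sketch with boto3 could look like the following; the bucket names and region are placeholders of mine.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# Source bucket receives the uploaded CSV, target bucket stores the consumed records
for bucket_name in ["cc-complaints-source", "cc-complaints-target"]:
    s3.create_bucket(Bucket=bucket_name)  # outside us-east-1, also pass CreateBucketConfiguration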








Configure Kinesis data stream


Create a Kinesis data stream using the management console. The shard calculator helps you decide how many shards you need.

NOTE: keep an eye on billing, as this service is not covered by the free tier.
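
The same stream can also be created with boto3; this is a minimal sketch, and the stream name and shard count are assumptions on my part.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# One shard is enough for a small simulated stream; use the shard calculator for real workloads
kinesis.create_stream(StreamName="cc-complaints-stream", ShardCount=1)

# Wait until the stream is ACTIVE before writing records
kinesis.get_waiter("stream_exists").wait(StreamName="cc-complaints-stream")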


Lambda function


I used the AWS management console to create a function. It's a self-explanatory GUI where you choose a runtime (Python, in my case) and a permission role.

IAM (Identity and Access Management) is the service where you create roles.

For example,

My requirement: the Lambda functions need read/write/list access to the Kinesis stream and write access to S3.

I created a role, lamda_to_s3, to which I attached the predefined policy AWSLambdaExecute (shown in pic 1 below) and a customized policy (shown in pic 2 below).
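
For illustration, the customized policy might grant the role access to the stream along the lines of the sketch below; the policy name, account ID, stream ARN, and exact actions are my assumptions, so compare them with pic 2.

import json
import boto3

iam = boto3.client("iam")

# Allow the Lambda role to read from and write to the Kinesis stream
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:GetRecords",
                "kinesis:GetShardIterator",
                "kinesis:DescribeStream",
                "kinesis:ListStreams",
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/cc-complaints-stream",
        }
    ],
}

iam.create_policy(
    PolicyName="lambda_kinesis_access",
    PolicyDocument=json.dumps(policy_document),
)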



Trigger


I added an S3 trigger to invoke the Lambda function on the object-created event.




Handler function 1


Below is the generic format of a Lambda handler function:

def lambda_handler(event, context):
    # event: trigger payload (S3 or Kinesis), context: runtime information
    return {"statusCode": 200}

We customize the function body to our requirements, making use of the event and context parameters.

The code reads the file content from S3 and writes it to the Kinesis stream.
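
A minimal sketch of such a handler is shown below; the stream name and the choice of the object key as partition key are assumptions of mine, not the exact code from the post.

import boto3

s3 = boto3.client("s3")
kinesis = boto3.client("kinesis")

STREAM_NAME = "cc-complaints-stream"  # placeholder

def lambda_handler(event, context):
    # The S3 object-created event carries the bucket and key of the uploaded file
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]

        # Read the uploaded CSV content
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")

        # Write each line to Kinesis as a record (record = data + partition key)
        for line in body.splitlines():
            if line.strip():
                kinesis.put_record(
                    StreamName=STREAM_NAME,
                    Data=line.encode("utf-8"),
                    PartitionKey=key,
                )
    return {"statusCode": 200}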


Trigger


I added a Kinesis trigger to invoke the Lambda function when the latest data is available.



Handler function 2


The code reads the stream records and writes them to S3.
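
Again, a minimal sketch; the target bucket name and the key naming scheme are placeholders. Note that Kinesis delivers the record data base64-encoded inside the trigger event.

import base64
import boto3

s3 = boto3.client("s3")

TARGET_BUCKET = "cc-complaints-target"  # placeholder

def lambda_handler(event, context):
    for record in event["Records"]:
        # Decode the payload written by the first Lambda function
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")

        # Persist each record to the target bucket, keyed by its sequence number
        key = "complaints/{}.txt".format(record["kinesis"]["sequenceNumber"])
        s3.put_object(Bucket=TARGET_BUCKET, Key=key, Body=payload.encode("utf-8"))
    return {"statusCode": 200}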


Logs and Monitoring


Finally, CloudWatch Logs are a great resource for monitoring the workflow and troubleshooting errors.
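
Anything the handlers print or log ends up in the function's CloudWatch log group (/aws/lambda/<function-name>). A minimal logging sketch, for illustration:

import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    # The record count gives a quick signal that the trigger fired as expected
    logger.info("Received %d records", len(event.get("Records", [])))
    return {"statusCode": 200}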

I tried to cover the parts of the architecture that I feel are relevant and required. Let me know in the comments below if any topic needs more explanation.


