
How to process a simple data stream and consume it with Lambda

I built a serverless architecture for my simulated credit card complaints stream using:


  • AWS S3

  • AWS Lambda

  • AWS Kinesis

The picture above gives a high-level view of the data flow. I treat the CSV file upload as the data producer: once you upload a file, S3 generates an object-created event and the first Lambda function is invoked asynchronously. The file content is written to the Kinesis stream as a record (record = data + partition key), which triggers another Lambda function that persists the data to S3.


Prerequisites


If you work in Python like me, get familiar with this package:

boto3

(https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html)


Boto3 makes it easy to integrate your Python applications, libraries, or scripts with AWS services such as S3 and Kinesis.



The links below help you set up access from your Jupyter notebooks:

https://realpython.com/python-boto3-aws-s3/

https://towardsdatascience.com/aws-and-python-the-boto3-package-df495bb29cb3


Terminology


Batch Processing: With traditional analytics, you gather the data, load it periodically into a database, and analyze it hours, days, or weeks later.


Stream Processing: Analyzing real-time data requires a different approach. Instead of running database queries over stored data, stream processing applications process data continuously, in real time, even before it is stored.


Producer: A producer puts data records into Amazon Kinesis Data Streams.


Consumer: A consumer processes the data records.
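To make "record = data + partition key" concrete, here is a hedged sketch of how a producer assembles a record for `put_record` (the stream name and key choice below are illustrative):

```python
import json

def make_record(payload, partition_key):
    """A Kinesis record pairs a data blob with a partition key; the
    key's hash decides which shard the record lands on, so records
    sharing a key stay ordered relative to each other."""
    return {
        "Data": json.dumps(payload).encode("utf-8"),
        "PartitionKey": partition_key,
    }

# A producer would then call:
# kinesis.put_record(StreamName="complaints-stream", **make_record(row, row_id))
```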


Preparing S3


Create an S3 bucket from the management console.

I need both a source and a target bucket in S3; the bucket list shows the two buckets.








Configure Kinesis data stream


Create a Kinesis stream using the management console. The shard calculator helps you decide the shard count.

NOTE: keep an eye on billing, as this service is not covered by the free tier.


Lambda function


I used the AWS management console to create the functions. It's a self-explanatory GUI where you choose a runtime (Python, in my case) and a permission role.

IAM (Identity and Access Management) is the service where you create roles.

For example,

My requirement: the Lambda function needs read/write/list access to S3.

I created a role, lamda_to_s3, where I picked the predefined policy AWSLambdaExecute (shown in pic 1 below) and created a customized policy (shown in pic 2 below).
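The actual custom policy is in the screenshot; since AWSLambdaExecute already covers basic S3 object access and CloudWatch logging, my guess is that the customized policy adds Kinesis write permission for the first function. A hypothetical version (the account ID and stream name are placeholders):

```python
import json

# Hypothetical customized policy: AWSLambdaExecute already grants
# s3:GetObject/s3:PutObject and CloudWatch Logs access, so the extra
# piece the first function needs is permission to write to the stream.
kinesis_write_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kinesis:PutRecord", "kinesis:PutRecords"],
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/complaints-stream",
        }
    ],
}

# Attach it to the role, e.g.:
# iam.create_policy(PolicyName="kinesis_write",
#                   PolicyDocument=json.dumps(kinesis_write_policy))
```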



Trigger


I added an S3 trigger to invoke the first Lambda function on the object-created event.




Handler function 1


Below is the generic format of a Lambda handler function:

def lambda_handler(event, context):
    # event carries the trigger payload; context carries runtime info
    return {"statusCode": 200}

We customize the function to our requirements using these two parameters.

https://github.com/vijaykothareddy/Data-Engineering/blob/master/s3_to_kinesis.py

The code reads the file content from S3 and writes it to the Kinesis stream.


Trigger


I added a Kinesis trigger to invoke the second Lambda function when the latest data is available.



Handler function 2


https://github.com/vijaykothareddy/Data-Engineering/blob/master/kinesis_to_s3.py

The code reads the stream records and writes them to S3.


Logs and Monitoring


Finally, CloudWatch Logs is a great source for monitoring the workflow and troubleshooting errors.

I tried to cover the parts of the architecture that I feel are relevant and required. Let me know in the comments below if any topic needs more explanation.


Follow me @ linkedin.com/in/kvbr

Git @ https://github.com/vijaykothareddy/Data-Engineering



© 2020 Team Data Science - Andreas Kretz
