Learn Data Engineering
at LearnDataEngineering.com

How to quickly set up Airflow to invoke Lambda

Hello! This blog post gives you a quick way to set up Airflow on your desktop and get going.


What is Airflow?


I once had to run a task on a Unix system and, upon its completion, trigger another task on Windows. I had to install several tools and integrate them to accomplish the workflow.

Scheduling and managing such tasks becomes even more complex.


Now suppose a single tool could handle all of the above and offer more capabilities.


Apache Airflow is one such tool: you define and schedule tasks programmatically, and it offers a rich UI to monitor and manipulate the workflow.


In this blog, I am sharing my experience of:

Setting up Airflow

Creating workflow

Deploying a DAG to invoke AWS Lambda.


Analogy1


Problem: Job2 has to execute after Job1

Solution: In SQL, I can write logic that checks the status of Job1 and runs Job2 only if Job1 has completed.


Analogy2


Now suppose your tasks depend on cloud services, Python scripts, and shell scripts. The status check from Analogy1 is no longer enough; what you need is workflow management.

Apache Airflow is one such tool. It represents the dependencies between tasks as a directed acyclic graph, and each task can be defined using one of a wide variety of operators.



Apache Airflow deployment



I configured Airflow on Windows by enabling the Windows Subsystem for Linux (WSL).

For detailed configuration, visit the link below,




Config file


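The airflow.cfg file is located in the directory that the AIRFLOW_HOME environment variable points to (~/airflow by default).

A few configuration settings that I want to talk about (expose_config belongs to the [webserver] section; dags_folder and load_examples belong to [core]):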



expose_config = True # set this to True to view config from UI

dags_folder = <absolute path> # path to your DAG's folder

load_examples = False # if you don't want examples to be loaded
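
To double-check these values outside the UI, the config file can be read with Python's standard configparser module. The snippet below is only a sketch: it parses a small sample string instead of a real airflow.cfg, and the dags_folder path in it is a made-up placeholder.

```python
import configparser

# Sample fragment mirroring the three settings above.
# The dags_folder path is a hypothetical placeholder, not a real location.
SAMPLE_CFG = """
[core]
dags_folder = /home/user/airflow/dags
load_examples = False

[webserver]
expose_config = True
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CFG)

# Plain string value
dags_folder = parser.get("core", "dags_folder")
# getboolean understands True/False, yes/no, on/off and 1/0
load_examples = parser.getboolean("core", "load_examples")
expose_config = parser.getboolean("webserver", "expose_config")

print(dags_folder, load_examples, expose_config)
```

To inspect your own setup, you could swap read_string for parser.read() pointed at the airflow.cfg inside your AIRFLOW_HOME directory.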


How to create a DAG?


I created a DAG (Directed Acyclic Graph) to manually invoke a Lambda function.

Python template captured in a picture,




Write a Python script that includes:


  • Default arguments dictionary: prepare a dictionary of default arguments that apply to all tasks

  • Initialize DAG class: instantiate the DAG class with its parameters

  • Define tasks: define each task with an operator class such as PythonOperator, BashOperator, etc.

  • Task dependency: set the dependencies with the simple >> notation, for example:

          simple workflow
           t1 >> t2 >> t3
          extended workflow
           start >> [t1, t2]
           t2 >> t3 >> end
           t2 >> t4 >> t5
           t1 >> t6
          

  • Place the script in the DAG folder defined by dags_folder in the config file.
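
To make the >> notation more concrete, here is a toy model in plain Python. It is not Airflow's implementation and does not import Airflow; the Task class and the run_order helper are hypothetical names invented for this illustration. It only shows how overloading >> can record the dependency edges of the extended workflow above and how a valid run order follows from them.

```python
# Toy model of the ">>" dependency notation (NOT Airflow's own code).
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # tasks that may run only after this one

    def __rshift__(self, other):
        # "a >> b" or "a >> [b, c]": record each target as downstream of a.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            if target not in self.downstream:
                self.downstream.append(target)
        # Returning the right-hand side lets "a >> b >> c" chain.
        return other


def run_order(all_tasks):
    """Return task ids in an order that respects every dependency."""
    # Count how many upstream tasks each task is still waiting for.
    waiting = {t.task_id: 0 for t in all_tasks}
    for t in all_tasks:
        for d in t.downstream:
            waiting[d.task_id] += 1
    ready = [t for t in all_tasks if waiting[t.task_id] == 0]
    order = []
    while ready:
        task = ready.pop(0)
        order.append(task.task_id)
        for d in task.downstream:
            waiting[d.task_id] -= 1
            if waiting[d.task_id] == 0:
                ready.append(d)
    if len(order) != len(all_tasks):
        raise ValueError("cycle detected: the graph is not acyclic")
    return order


names = ["start", "t1", "t2", "t3", "t4", "t5", "t6", "end"]
tasks = {name: Task(name) for name in names}
start, t1, t2, t3, t4, t5, t6, end = (tasks[n] for n in names)

# The extended workflow from the list above.
start >> [t1, t2]
t2 >> t3 >> end
t2 >> t4 >> t5
t1 >> t6

order = run_order(list(tasks.values()))
print(order)
```

In a real DAG file the tasks would be operator instances attached to a DAG object, and the Airflow scheduler, not a helper like run_order, decides when each task runs.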


What are Hooks?


Hooks are the interface to external platforms and databases.


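import boto3

def lambda1(ds, **kwargs):
    # Create a Lambda client; fill in your region and credentials
    lambda_client = boto3.client('lambda',
                                 region_name='<>',
                                 aws_access_key_id='<>',
                                 aws_secret_access_key='<>')
    # Invoke the function synchronously and wait for its result
    response_1 = lambda_client.invoke(FunctionName='myAirflowTest',
                                      InvocationType='RequestResponse')
    print('Response--->', response_1)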

The same example can also be written with a hook, as below:


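from airflow.contrib.hooks.aws_lambda_hook import AwsLambdaHook  # import path in Airflow 1.10.x

def lambda1(ds, **kwargs):
    # Credentials come from the Airflow connection named by aws_conn_id
    hook = AwsLambdaHook('myAirflowTest',
                         region_name='<>',
                         log_type='None', qualifier='$LATEST',
                         invocation_type='RequestResponse',
                         config=None, aws_conn_id='my_lambda')
    response_1 = hook.invoke_lambda(payload='null')
    print('Response--->', response_1)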
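
Both versions of lambda1 above only print the raw response. As a rough sketch of what could be done with it, the helper below checks the status code and decodes the payload. The function name parse_lambda_response is made up for this post, and the snippet uses a stand-in dictionary with an in-memory stream rather than a real AWS call. The keys it reads (StatusCode, FunctionError, Payload) follow the shape of the response returned by boto3's Lambda invoke call.

```python
import io
import json


def parse_lambda_response(response):
    """Return the decoded JSON payload, or raise if the invocation failed."""
    status = response.get("StatusCode")
    # Payload is a stream-like object; read it once and decode it.
    payload_text = response["Payload"].read().decode("utf-8")
    # For synchronous (RequestResponse) calls, 200 means the invoke call succeeded.
    if status != 200:
        raise RuntimeError("Unexpected status code: %s" % status)
    # "FunctionError" is present only when the function itself reported an error.
    if "FunctionError" in response:
        raise RuntimeError("Lambda function error: %s" % payload_text)
    return json.loads(payload_text) if payload_text else None


# Stand-in response with hypothetical values, shaped like a real one.
fake_response = {
    "StatusCode": 200,
    "ExecutedVersion": "$LATEST",
    "Payload": io.BytesIO(b'{"message": "hello from myAirflowTest"}'),
}

result = parse_lambda_response(fake_response)
print(result)
```

Raising an exception inside the task's Python callable makes Airflow mark the task as failed, so a check like this lets a failed Lambda call show up in the UI instead of passing silently.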



The aws_conn_id used by the hook ('my_lambda' here) is the name of a connection defined in Airflow.




Note: install cryptography so that your passwords are stored encrypted (pip install cryptography).

To run the Airflow processes in the background


airflow webserver starts the web application; the UI is then available at http://localhost:8080

airflow scheduler monitors the DAGs and initiates any DAG runs that are due

# To start services
nohup airflow webserver  >> $AIRFLOW_HOME/airflow/logs/webserver.logs &
nohup airflow scheduler >> $AIRFLOW_HOME/airflow/logs/scheduler.logs &

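# To shut down services (grep -v grep keeps the grep process itself out of the PID list)
kill $(ps -ef | grep "airflow webserver" | grep -v grep | awk '{print $2}')
kill $(ps -ef | grep "airflow scheduler" | grep -v grep | awk '{print $2}')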

I used the above commands in an Ubuntu bash terminal; they should work on most Linux distributions.


Conclusion


Thanks for making it this far.

By now you should have Airflow set up and invoking AWS Lambda. Please let me know in the comments below if you run into any issues or have recommendations.

There is a lot more to explore about Airflow at https://airflow.apache.org/docs/stable/

