Integrating Apache Airflow and Databricks: Building ETL pipelines with Apache Spark

This is one of a series of blogs on integrating Databricks with commonly used software packages. See the “What’s Next” section at the end to read others in the series, which include how-tos for AWS Lambda, Kinesis, and more.


Apache Airflow Overview

Airflow is a platform to programmatically author, schedule, and monitor workflows. It can be used to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies.

There are two key concepts in Airflow: DAGs describe how to run a workflow, while Operators determine what actually gets done by defining a task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators.

Airflow is a heterogeneous workflow management system that can glue together multiple systems, both in the cloud and on-premises. Where Databricks is one component of a larger system, e.g., an ETL or machine learning pipeline, Airflow can be used for scheduling and management.
Airflow already works with some commonly used systems like S3, MySQL, and HTTP endpoints, and its base modules can easily be extended to support other systems.

This blog assumes there is an instance of Airflow up and running already. See the “References” section for readings on how to set up Airflow.

How to use Airflow with Databricks

The Databricks REST API enables programmatic access to Databricks instead of going through the web UI. It can be used to create and run jobs automatically, productionize a data flow, and much more. It also lets us integrate Airflow with Databricks through Airflow operators.

Airflow provides operators for many common tasks, and you can use the BashOperator and sensor operators to solve many typical ETL use cases, e.g., triggering a daily ETL job that posts updates to AWS S3 or inserts rows into a database.

The BashOperator

The BashOperator executes a bash command. It can be used to integrate with Databricks via the Databricks API to start a preconfigured Spark job, for example:

t0 = BashOperator(
     task_id='dbjob',
     depends_on_past=False,
     bash_command='curl -X POST -u username:password https://.cloud.databricks.com/api/2.0/jobs/run-now -d \'{\"job_id\":}\'',
     dag=dag)

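You can test this task from the command line. The airflow test command runs a single task instance for the given execution date without checking dependencies or recording its state in the database: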

%airflow test tutorial dbjob 2016-10-01

In the above example, the operator starts a job in Databricks; the JSON payload is a single key/value pair (job_id and the ID of the job to run).
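Hand-escaping the JSON inside bash_command is error-prone. A minimal sketch, assuming you would rather generate the command string in Python: json.dumps builds the payload and shlex.quote makes each argument safe for the shell. The helper name build_run_now_command and the host, job ID, and credential values are hypothetical placeholders, not values from this pipeline.

```python
import json
import shlex


def build_run_now_command(host: str, job_id: int, user: str, password: str) -> str:
    """Build a curl command that calls the Databricks Jobs API run-now endpoint.

    Hypothetical helper: host, job_id, user and password are placeholders
    to be replaced with your own workspace values.
    """
    url = f"https://{host}/api/2.0/jobs/run-now"
    # The request body is a single key/value pair: the ID of the job to run.
    payload = json.dumps({"job_id": job_id})
    parts = ["curl", "-X", "POST", "-u", f"{user}:{password}", url, "-d", payload]
    # Quote every argument so spaces, quotes and braces reach curl intact.
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    print(build_run_now_command("example.cloud.databricks.com", 42, "user", "secret"))
    # curl -X POST -u user:secret https://example.cloud.databricks.com/api/2.0/jobs/run-now -d '{"job_id": 42}'
```

The returned string could then be passed as the bash_command argument of the BashOperator.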

Note: Instead of using curl with the BashOperator, you can also use the SimpleHttpOperator to achieve the same results.
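Whether the call comes from curl or from an HTTP operator, Databricks receives an ordinary HTTP POST with a JSON body and basic-auth credentials. As a hedged illustration, the sketch below builds (but does not send) such a request with Python's standard urllib.request, including the notebook_params field that the pipeline later in this post passes to the notebook. The function name, host, bucket paths, and credentials are all hypothetical.

```python
import base64
import json
import urllib.request
from typing import Dict, Optional


def build_run_now_request(host: str, job_id: int, user: str, password: str,
                          notebook_params: Optional[Dict[str, str]] = None) -> urllib.request.Request:
    """Build (but do not send) a POST to the Jobs API run-now endpoint.

    Hypothetical helper; every argument is a placeholder.
    """
    body: Dict[str, object] = {"job_id": job_id}
    if notebook_params:
        # Values the notebook reads with getArgument(...)
        body["notebook_params"] = notebook_params
    # HTTP basic auth, the same thing curl's -u user:password sends
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=f"https://{host}/api/2.0/jobs/run-now",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Basic {token}"},
        method="POST",
    )


# Hypothetical host, credentials and S3 paths.
req = build_run_now_request(
    "example.cloud.databricks.com", 42, "user", "secret",
    notebook_params={"inputPath": "s3a://bucket/input/test.json",
                     "outputPath": "s3a://bucket/output/sample_parquet_data"},
)
print(req.get_method(), req.full_url)
# Sending it would be urllib.request.urlopen(req); omitted here on purpose.
```

In practice the credentials would come from somewhere other than the DAG file itself, such as environment variables or an Airflow connection.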

The Sensor Operator

A sensor operator keeps running until a criterion is met. Examples include a certain file landing in an S3 bucket (S3KeySensor) or an HTTP GET request to an endpoint succeeding (HttpSensor). It is important to set the right time interval between checks, ‘poke_interval’. Because sensors poll rather than receive events, a sensor with a ‘hook’ to the persistence layer is what stands in for a push notification in an ETL workflow.
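To make the poke_interval and timeout trade-off concrete, here is a simplified model of a sensor's polling loop. It is a sketch, not Airflow's implementation; the names poke_until, SensorTimeout, and FakeClock are hypothetical, and the clock and sleep functions are injectable so the loop can be exercised without actually waiting.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still false once `timeout` seconds have passed."""


def poke_until(condition: Callable[[], bool], poke_interval: float, timeout: float,
               clock: Callable[[], float] = time.monotonic,
               sleep: Callable[[float], None] = time.sleep) -> int:
    """Check `condition` every `poke_interval` seconds until it returns True.

    Returns the number of checks ("pokes") that were needed.
    """
    started = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        # Give up instead of sleeping again once the time budget is spent.
        if clock() - started >= timeout:
            raise SensorTimeout(f"condition not met after {pokes} pokes")
        sleep(poke_interval)


class FakeClock:
    """Simulated time, so the example runs instantly."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


if __name__ == "__main__":
    clock = FakeClock()
    # Pretend the file lands between the second and third check.
    answers = iter([False, False, True])
    pokes = poke_until(lambda: next(answers), poke_interval=120, timeout=18 * 60 * 60,
                       clock=clock.time, sleep=clock.sleep)
    print(pokes, clock.now)  # 3 240.0
```

A shorter poke_interval notices new data sooner but issues more requests against the storage layer; the pipeline's DAG file uses poke_interval=120 and timeout=18*60*60, i.e. a check every two minutes for up to 18 hours.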

JSON File to Parquet Processing Example

Below is an example of setting up a pipeline that processes JSON files and converts them to Parquet on a daily basis using Databricks. Airflow orchestrates the pipeline: it detects when the daily files are ready for processing, uses an “S3 sensor” to detect the output of the daily job, and sends a final email notification.

Setup of the pipeline:

Screenshot showing the setup of the pipeline in Airflow

As shown above, this pipeline has five steps:

  1. Input S3 Sensor (check_s3_for_file_in_s3) checks that the input data exists:
s3:///input-airflow/input-*
  2. Databricks REST API (dbjob): a BashOperator makes a REST API call to Databricks, dynamically passing the input and output file arguments. To illustrate the point in this blog, we use the command below; if putting your S3 secret key in the Airflow Python configuration file is a concern for your own workloads, there are many ways to keep it secure:
curl -X POST -u : \
     https://.cloud.databricks.com/api/2.0/jobs/run-now \
     -d'{"job_id":, "notebook_params":{"inputPath": "s3a://:@/input/test.json","outputPath": "s3a://:@/output/sample_parquet_data"}}'

The nightly-etl-job in Databricks

Above is a screenshot of the job in Databricks that is called from Airflow. Read the Databricks jobs documentation to learn more.

  3. Databricks action: the job reads an input JSON file and converts it to Parquet:
// Notebook parameters passed in by the run-now call; "test" is used if a parameter is missing
val inputPath = getArgument("inputPath", "test")
// Load the JSON input as a DataFrame (Spark infers the schema)
val testJsonData = sqlContext.read.json(inputPath)
val outPath = getArgument("outputPath", "test")
// Write the data as Parquet, replacing any previous output at outPath
testJsonData.write.format("parquet").mode("overwrite").save(outPath)
  4. Output S3 Sensor (check_s3_output_for_file_in_s3) checks that the output data exists:
s3:///output-airflow/sample_parquet_data/_SUCCESS
  5. Email Notification (email_notification) sends an email alert when the job succeeds.
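Steps 1 and 4 both reduce to asking whether a key exists in a bucket. A simplified model of the two checks, run against a plain list of key names standing in for an S3 listing (it is not the S3KeySensor implementation): the input check matches a shell-style wildcard, here with Python's fnmatch, and the output check looks for the exact _SUCCESS key, the marker file Spark writes into the output directory when a write completes successfully. The function names and the sample bucket contents are hypothetical.

```python
import fnmatch
import re
from typing import Iterable


def wildcard_key_exists(keys: Iterable[str], pattern: str) -> bool:
    """True if any key matches a shell-style pattern such as 'input-airflow/input-*'."""
    # Only keys under the literal prefix (text before the first wildcard) can match;
    # against a real bucket, that prefix would narrow the listing request.
    prefix = re.split(r"[*?\[]", pattern, maxsplit=1)[0]
    return any(fnmatch.fnmatchcase(key, pattern) for key in keys if key.startswith(prefix))


def key_exists(keys: Iterable[str], key: str) -> bool:
    """True if the exact key is present (no wildcard matching)."""
    return key in set(keys)


# Hypothetical bucket contents after the Databricks job has run.
bucket_keys = [
    "input-airflow/input-airflow.json",
    "output-airflow/sample_parquet_data/_SUCCESS",
]

print(wildcard_key_exists(bucket_keys, "input-airflow/input-*"))  # True
print(key_exists(bucket_keys, "output-airflow/sample_parquet_data/_SUCCESS"))  # True
```

Waiting for _SUCCESS rather than for any file under the output prefix avoids reacting to a write that is still in progress.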

Below is the Python configuration file for this Airflow pipeline:

from airflow import DAG
from airflow.operators import BashOperator, S3KeySensor, EmailOperator
from datetime import datetime, timedelta


today_date = datetime.today()


default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'start_date': today_date,
   'email': ['<>@databricks.com'],
   'email_on_failure': False,
   'email_on_retry': False,
   'retries': 1,
   'retry_delay': timedelta(minutes=5),
}


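# '@once' runs this DAG a single time. For a daily run, use '@daily' together with a
# fixed start_date; Airflow advises against dynamic start dates such as datetime.today().
dag = DAG('tutorial', default_args=default_args, schedule_interval='@once')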


inputsensor = S3KeySensor(
   task_id='check_s3_for_file_in_s3',
   bucket_key='input-airflow/input-*',
   wildcard_match=True,
   bucket_name='peyman-datapipeline',
   s3_conn_id='S3Connection',
   timeout=18*60*60,
   poke_interval=120,
   dag=dag)


dbtask = BashOperator(
    task_id='dbjob',
    depends_on_past=False,
    bash_command='curl -X POST -u : https://demo.cloud.databricks.com/api/2.0/jobs/run-now -d \'{\"job_id\":, \"notebook_params\":{\"inputPath\": \":<secret-key@/input-airflow/input-airflow.json\",\"outputPath\": \":<secret-key@/output-airflow/sample_parquet_data\"}}\'',
    dag=dag)


outputsensor = S3KeySensor(
   task_id='check_s3_output_for_file_in_s3',
   bucket_key='output-airflow/sample_parquet_data/_SUCCESS',
   wildcard_match=False,
   bucket_name='peyman-datapipeline',
   s3_conn_id='S3Connection',
   timeout=18*60*60,
   poke_interval=120,
   dag=dag)


emailNotify = EmailOperator(
   task_id='email_notification',
   to = '',
   subject = 'ETL Job Done',
   html_content = 'Airflow ETL Job Done',
   dag=dag)


dbtask.set_upstream(inputsensor)
outputsensor.set_upstream(dbtask)
emailNotify.set_upstream(outputsensor)
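The three set_upstream calls turn the four Airflow tasks into a linear chain. As a rough illustration of how declared dependencies determine an execution order, the sketch below feeds the same edges to Python's graphlib module (Python 3.9+). It is a stand-in for the idea, not a description of how Airflow's scheduler works internally, and it also shows that a cycle is rejected, which is exactly what the “acyclic” in DAG rules out.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks),
# mirroring the set_upstream calls in the DAG file.
upstream = {
    "dbjob": {"check_s3_for_file_in_s3"},
    "check_s3_output_for_file_in_s3": {"dbjob"},
    "email_notification": {"check_s3_output_for_file_in_s3"},
}

order = list(TopologicalSorter(upstream).static_order())
print(order)
# ['check_s3_for_file_in_s3', 'dbjob', 'check_s3_output_for_file_in_s3', 'email_notification']

# Making the first task depend on the last one would create a cycle,
# which is not a valid DAG.
cyclic = dict(upstream, check_s3_for_file_in_s3={"email_notification"})
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError as err:
    print("cycle detected:", err.args[1])
```

Because this pipeline is a straight chain, there is only one valid order; with branching dependencies, independent tasks could run in parallel on Airflow's workers.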

What’s Next

We hope this simple example shows how you can use Airflow and Databricks to solve your data processing problems. To try Databricks, sign up for a free trial or contact us.

Read other blogs in the series to learn how to integrate Databricks with your existing architecture:

References
