Skip to main content

In this blog, we explore how to leverage Databricks’ powerful jobs API with Amazon Managed Apache Airflow (MWAA) and integrate with Cloudwatch to monitor Directed Acyclic Graphs (DAG) with Databricks-based tasks. Additionally, we will show how to create alerts based on DAG performance metrics.

Before we get into the how-to section of this guidance, let’s quickly understand what are Databricks job orchestration and Amazon Managed Airflow (MWAA)?

Databricks orchestration and alerting

Job orchestration in Databricks is a fully integrated feature. Customers can use the Jobs API or UI to create and manage jobs and features, such as email alerts for monitoring. With this powerful API-driven approach, Databricks jobs can orchestrate anything that has an API ( e.g., pull data from a CRM). Databricks orchestration can support jobs with single or multi-task option, as well as newly added jobs with Delta Live Tables.

Amazon Managed Airflow

Amazon Managed Workflows for Apache Airflow (MWAA) is a managed orchestration service for Apache Airflow. MWAA manages the open-source Apache Airflow platform on the customers’ behalf with the security, availability, and scalability of AWS. MWAA gives customers additional benefits of easy integration with AWS Services and a variety of third-party services via pre-existing plugins, allowing customers to create complex data processing pipelines.

High-Level architecture diagram

We will create a simple DAG that launches a Databricks Cluster and executes a notebook. MWAA monitors the execution. Note: we have a simple job definition, but MWAA can orchestrate a variety of complex workloads.

High-Level architecture diagram for creating a simple DAG that launches a Databricks Cluster and executes a notebook.

Setting up the environment

The blog assumes you have access to Databricks workspace. Sign up for a free one here and configure a Databricks cluster. Additionally, create an API token to be used to configure connection in MWAA.

Databricks users can create an Amazon Managed Workflows for Apache Airflow (MWAA) directly from their dashboard.

To create an MWAA environment follow these instructions.

How to create a Databricks connection

The first step is to configure the Databricks connection in MWAA.

The first step in creating an MWAA on Databricks is establishing a connection between MWAA and the Databricks Workspace.

Example DAG

Next upload your DAG into the S3 bucket folder you specified when creating the MWAA environment. Your DAG will automatically appear on the MWAA UI.

Example Airflow DAG

Here’s an example of an Airflow DAG, which creates configuration for a new Databricks jobs cluster, Databricks notebook task, and submits the notebook task for execution in Databricks.

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator, DatabricksRunNowOperator
from datetime import datetime, timedelta 

#Define params for Submit Run Operator
new_cluster = {
    'spark_version': '7.3.x-scala2.12',
    'num_workers': 2,
    'node_type_id': 'i3.xlarge',
     "aws_attributes": {
        "instance_profile_arn": "arn:aws:iam::XXXXXXX:instance-profile/databricks-data-role"
    }
}

notebook_task = {
    'notebook_path': '/Users/[email protected]/test',
}

#Define params for Run Now Operator
notebook_params = {
    "Variable":5
}

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2)
}

with DAG('databricks_dag',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args
    ) as dag:

    opr_submit_run = DatabricksSubmitRunOperator(
        task_id='submit_run',
        databricks_conn_id='databricks_default',
        new_cluster=new_cluster,
        notebook_task=notebook_task
    )
    opr_submit_run

Get the latest version of the file from the GitHub link.

Trigger the DAG in MWAA.

Triggering the Airflow DAG via the MWAA UI.

Once triggered you can see the job cluster on the Databricks cluster UI page.

Once an Airflow DAG is triggered, the respective job cluster is displayed on the Databricks cluster UI page.

Troubleshooting

Amazon MWAA uses Amazon CloudWatch for all Airflow logs. These are helpful troubleshooting DAG failures.

Amazon MWAA uses Amazon CloudWatch for all Airflow logs.

CloudWatch metrics and alerts

Next, we create a metric to monitor the successful completion of the DAG. Amazon MWAA supports many metrics.

Databricks creates a metric to monitor the successful completion of the Airflow DAG.

We use TaskInstanceFailures to create an alarm.

Databricks uses TaskInstanceFailures to create alarms once an Airflow DAG has run to, for example, notify if any failures are recorded over a specific period of time.

For threshold we select zero ( i.e., we want to be notified when there are any failures over a period of one hour).

Lastly, we select an Email notification as the action.

Databricks’ UI makes it easy to configure the notification action, e.g., email, for issues uncovered by the Airflow DAG run.

Here’s an example of the Cloudwatch Email notification generated when the DAG fails.

You are receiving this email because your Amazon CloudWatch Alarm "DatabricksDAGFailure" in the US East (N. Virginia) region has entered the ALARM state, because "Threshold Crossed

Example of the Cloudwatch alert generated when the DAG fails.

Conclusion

In this blog, we showed how to create an Airflow DAG that creates, configures, and submits a new Databricks jobs cluster, Databricks notebook task, and the notebook task for execution in Databricks. We leverage MWAA’s out-of-the-box integration with CloudWatch to monitor our example workflow and receive notifications when there are failures.

What’s next

Code Repo
MWAA-DATABRICKS Sample DAG Code

Try Databricks for free

Related posts

See all Partners posts