3 Powerful Airflow DAG Examples with Dask

Richard Pelgrim January 7, 2022


You can run specific Apache Airflow tasks on a Dask cluster to leverage additional computing resources in the cloud whenever you need them. This post gives you 3 Apache Airflow examples in Python to get you started. We will walk you through one of them in detail.


Jump straight into the code with these Python scripts.

Airflow DAG Examples for Big Data

Apache Airflow is one of the most popular tools for orchestrating data engineering, machine learning, and DevOps workflows, developed by the Apache Software Foundation. Out of the box, Airflow runs your computations locally, which means you can only process datasets that fit within the memory resources of your machine.

To use an Apache Airflow DAG (directed acyclic graph) for data pipelines with larger-than-memory datasets, you can scale out the specific Airflow tasks containing heavy data science workloads to a Dask cluster using Coiled. And since Coiled clusters are charged by the second, you’ll only be paying for the extra resources when you’re actually using them.

Airflow DAG examples can be hard to find. This blog will show you how to construct your own Apache Airflow DAG for larger-than-memory datasets with only minimal changes to your existing Python code.

We’ll walk through one example in detail; you can find the other Airflow DAG examples in this dedicated repository.

How to Scale an Airflow Pipeline using Dask

Airflow workflows are defined using Tasks and DAGs, which are orchestrated by Executors. DAG stands for Directed Acyclic Graph, and each DAG is defined in a Python script that describes your Airflow workflow.

To delegate heavy workflows to Dask, we’ll spin up a Coiled cluster within a Task that contains heavy computation and bring back the result, in this case a .value_counts() over a column of interest. Since this result will fit into memory easily, we can shut down the cluster immediately to limit cost and continue using the result in our next tasks locally.

Airflow ETL DAG Definition

The DAG will contain the following 3 Tasks:

  1. A task that spins up a Coiled cluster, performs heavy computations over the entire dataset, then shuts the cluster down;
  2. A task that uses the result to calculate summary statistics over the data and saves these statistics to a CSV file;
  3. A task that uses the result to find the top 100 most active GitHub users and saves these to a CSV file.

Let’s start by defining an Airflow DAG using the @dag decorator, passing it the default_args defined earlier in the script as well as a number of other arguments you can tweak.
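For context, the default_args and the imports this script relies on sit at the top of the file. A minimal sketch of that section might look like the following; the specific default_args values and the storage_directory path (used by later Tasks) are placeholder assumptions you would adapt:

# Sketch of the top of the DAG script -- values here are placeholders
from datetime import datetime, timedelta

import coiled
import dask.dataframe as dd
from airflow.decorators import dag, task
from dask.distributed import Client

# Local folder where later Tasks will write their CSV output (placeholder path)
storage_directory = '/tmp/airflow-outputs/'

# Default arguments applied to every Task in the DAG (placeholder values)
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}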

# define DAG as a function with the @dag decorator
@dag(
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['coiled-demo'],
)
def airflow_on_coiled():

Launch your Dask Cluster

Let’s define our first Task.

This spins up a Coiled cluster named “airflow-task” consisting of 20 Dask workers, each running a specified Coiled software environment to ensure that they have all the right dependencies.

    # define Airflow task that runs a computation on a Coiled cluster
    @task()
    def transform():
        # Create and connect to Coiled cluster using a Coiled software environment
        cluster = coiled.Cluster(
            name="airflow-task",
            n_workers=20,
            software="rrpelgrim/airflow",
        )
        client = Client(cluster)

We can then read our dataset stored in an Amazon S3 bucket into a Dask DataFrame and calculate the result we’re interested in. Here, we load in the GitHub Archive data for 2015 (subsetted to include only PushEvents) and calculate the number of PushEvents per user for the entire year using a call to .value_counts(). Dask also supports reading from Google Cloud, Microsoft Azure, and other cloud platforms.

        # Read Parquet data from S3
        ddf = dd.read_parquet(
            's3://coiled-datasets/github-archive/github-archive-2015.parq/',
            storage_options={"anon": True, "use_ssl": True},
            blocksize="16 MiB",
        )

        # Compute the number of entries (PushEvents) per user
        result = ddf.user.value_counts().compute()
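As an aside, reading the same data from a different object store only changes the protocol prefix and the credentials you pass. A hypothetical Google Cloud Storage version of the read above might look like this (the bucket path is a placeholder, and gcsfs needs to be installed):

# Hypothetical sketch: the same read, but from a Google Cloud Storage bucket
ddf = dd.read_parquet(
    'gs://your-bucket/github-archive-2015.parq/',   # placeholder bucket path
    storage_options={'token': 'anon'},              # anonymous access via gcsfs
)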

Since we now have our result locally, we can shut the cluster down to limit our costs. This is really just a formality because Coiled will shut down your cluster automatically after 20 minutes of inactivity.

        # Shutdown Coiled cluster
        cluster.close()
        return result

Going over to the Coiled Cloud dashboard, we can see that the computation performed during this Task cost us 5 cents. And no, that’s not a typo 😉 That means you could execute this DAG run up to 200 times a month for free using the Coiled free tier.


Use Result Locally

We’ve leveraged cloud resources to get the result we’re interested in, and now we can proceed with the following Tasks locally. Because these Tasks run on your own machine, reading and writing to local disk is straightforward.

The next Task will generate summary statistics over the result pandas Series and save those to a CSV file:

   @task()
   def summarize(series):
       # Get summary statistics
       sum_stats = series.describe()
       # Save to CSV
       sum_stats.to_csv(f'{storage_directory}usercounts_summary_statistics.csv')
       return sum_stats

And the final Task will fetch the usernames and number of PushEvents for the top 100 most active users:

   @task()
   def get_top_users(series):
       # Get top 100 most active users
       top_100 = series.head(100)
       # Store user + number of events to CSV
       top_100.to_csv(f'{storage_directory}top_100_users.csv')
       return top_100

Finally, we’ll specify the order in which we want each Airflow Task to run and instantiate the DAG by calling the DAG function airflow_on_coiled.

   # Call task functions in order
   series = transform()
   sum_stats = summarize(series)
   top_100 = get_top_users(series)
 
# Instantiate the DAG
airflow_on_coiled()

You’re all set! You can now add your DAG file to your dags folder (by default: ~/airflow/dags) and execute each DAG run as needed using the Airflow UI. Read the Airflow documentation to see how you can set a schedule interval, tweak Airflow scheduler settings, and customise each DAG run.

Important: Airflow disables pickling by default. You will have to enable it in order to run these Dask tasks. You can do this by editing your airflow.cfg file or by setting the corresponding environment variable with export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Do this before launching your Airflow webserver.

If you’re working on an Apple M1 machine, you may want to check out this blog on installing PyData libraries using conda. Specifically, make sure that neither the blosc nor python-blosc libraries are installed in your local or cluster software environments.

More Airflow DAG Examples

In our dedicated airflow-with-coiled repository, you will find two more Airflow DAG examples using Dask. The examples include common ETL pipeline operations.

Note that:

  • The JSON-to-Parquet conversion DAG example requires you to connect Airflow to Amazon S3. You can find the instructions for doing so in the Airflow docs. You will also need to pass your AWS secrets to the to_parquet() call using the storage_options keyword argument, as sketched after this list.
  • The XGBoost DAG example works with only a ~250MB subset of the >20GB ARCOS dataset. To run it on all of the ARCOS data, check out this tutorial.
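For the JSON-to-Parquet example, passing those AWS secrets might look roughly like the sketch below; the bucket name and the way the credentials are looked up are assumptions you would adapt to your own setup:

import os

# Hypothetical sketch: write a Dask DataFrame to S3 with explicit AWS credentials
ddf.to_parquet(
    's3://your-bucket/github-archive.parq',            # placeholder bucket
    storage_options={
        'key': os.environ['AWS_ACCESS_KEY_ID'],        # however you store your secrets
        'secret': os.environ['AWS_SECRET_ACCESS_KEY'],
    },
)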

Run all Airflow Tasks with the DaskExecutor

The Airflow DAG examples in the repo above launch Coiled clusters from within an Airflow task. You can also opt for a different architecture and run all of the tasks in an Airflow DAG on a Coiled cluster. You can then use Coiled’s adaptive scaling capabilities to scale the number of workers up and down depending on the workload.

To do this, switch from using Airflow’s default SequentialExecutor to the DaskExecutor. Using any Airflow executor other than the default SequentialExecutor also requires setting up a dedicated database backend where Airflow can store the metadata related to your workflow. Once that’s done, point the DaskExecutor to a Coiled cluster that is already running.

You can do this by making the following changes in your airflow.cfg file, by default stored in ~/airflow/.

  1. Set executor = DaskExecutor
  2. Set cluster_address = <cluster_IP_address/cluster_port>. You can access this address using cluster.scheduler_address.
  3. Set the cluster’s TLS settings: tls_cert, tls_key, and tls_ca. You can access these using client.security.tls_key and client.security.tls_cert, as sketched below. Note that the value for tls_ca is the same as tls_cert.
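A rough sketch of how you might read those values off a running Coiled cluster follows; the cluster name and size are illustrative:

import coiled
from dask.distributed import Client

# Spin up (or connect to) the cluster the DaskExecutor will point at
cluster = coiled.Cluster(name="airflow-executor", n_workers=10)   # illustrative name/size
client = Client(cluster)

print(cluster.scheduler_address)   # value for cluster_address
print(client.security.tls_cert)    # value for tls_cert (and tls_ca)
print(client.security.tls_key)     # value for tls_key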

You can then run the entire workflow on Coiled.

Including a cluster.adapt(minimum=1, maximum=20) in the script that spins up your Coiled cluster will ensure that the cluster adaptively scales between a set minimum and maximum number of workers (in this case between 1 and 20) depending on the workload. 
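A minimal sketch of that, reusing the illustrative cluster name from above:

import coiled

# Start with one worker and let Coiled scale between 1 and 20 depending on load
cluster = coiled.Cluster(name="airflow-executor", n_workers=1)
cluster.adapt(minimum=1, maximum=20)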

Note that this architecture means that your cluster is managed outside of your Airflow DAG data pipeline; i.e. either manually or through a deployment tool. We are currently exploring upgrading the DaskExecutor to improve performance and expand its capabilities, including cluster lifecycle management. 

Thanks for reading! And if you’re interested in trying out Coiled, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Try Coiled

