3 Airflow DAG Examples with Dask

Richard Pelgrim January 7, 2022



tl;dr

You can run specific Airflow tasks on a Dask cluster using Coiled to leverage additional computing resources in the cloud whenever you need them. This post gives you 3 Airflow DAG examples to get you started. We will walk through one of them in detail.

Jump straight into the code with these Python scripts.

Airflow DAG Examples for (Really!) Big Data

Apache Airflow is one of the most popular tools for orchestrating data engineering, machine learning, and DevOps workflows. Out of the box, Airflow runs your computations locally, which means you can only process datasets that fit within the resources of your machine.

To use Airflow for computations on larger-than-memory datasets, you can scale out the specific Airflow tasks containing heavy workloads to a Dask cluster using Coiled. And since Coiled clusters are charged by the second, you’ll only be paying for the extra resources when you’re actually using them.

Airflow DAG examples can be hard to find. This blog will show you how to construct your Airflow DAG for larger-than-memory datasets with only minimal changes to your existing Python code. 

We’ll walk through one example in detail; you can find the other Airflow DAG examples in this dedicated repository.

How to Scale Airflow ETL Tasks using Dask

Airflow workflows are defined using Tasks and DAGs and orchestrated by Executors. To delegate heavy workflows to Dask, we’ll spin up a Coiled cluster within a Task that contains heavy computation and bring back the result, in this case a .value_counts() over a column of interest. Since this result will fit into memory easily, we can shut down the cluster immediately to limit cost and continue using the result in our next tasks locally. 

Define your Airflow ETL DAG

The DAG will contain the following 3 Tasks:

  1. Spin up a Coiled cluster, perform heavy computations over the entire dataset, then shut the cluster down;
  2. Use the result to calculate summary statistics and save these to a CSV file;
  3. Use the result to find the top 100 most active GitHub users and save these to a CSV file.

Let’s start by defining an Airflow DAG using the @dag decorator, passing it the default_args defined earlier in the script as well as a number of other arguments you can tweak.

# define DAG as a function with the @dag decorator
@dag(
   default_args=default_args,
   schedule_interval=None,
   start_date=datetime(2021, 1, 1),
   catchup=False,
   tags=['coiled-demo'],
   )
def airflow_on_coiled():
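
For reference, the imports and the default_args dictionary that this script assumes were defined earlier might look something like the following sketch; the exact values (and the storage_directory used further below) are illustrative, so adjust them to your own setup:

# Assumed setup earlier in the script (illustrative values)
from datetime import datetime, timedelta

import coiled
import dask.dataframe as dd
from airflow.decorators import dag, task
from dask.distributed import Client

# Example default_args; tweak owner, retries, etc. as needed
default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Hypothetical local folder where later tasks will write their CSV output
storage_directory = '/tmp/airflow-coiled-demo/'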

Launch your Dask Cluster

Let’s define our first Task.

This spins up a Coiled cluster named “airflow-task” consisting of 20 Dask workers, each running a specified Coiled software environment to ensure that they have all the right dependencies. 

    # define Airflow task that runs a computation on a Coiled cluster
    @task()
    def transform():
        # Create and connect to a Coiled cluster using a specified software environment
        cluster = coiled.Cluster(
            name="airflow-task",
            n_workers=20,
            software="rrpelgrim/airflow",
        )
        client = Client(cluster)

We can then read our dataset stored on S3 into a Dask DataFrame and calculate the result we’re interested in. Here, we load in the GitHub Archive data for 2015 (subsetted to include only PushEvents) and calculate the number of PushEvents per user for the entire year using a call to .value_counts().

        # Read Parquet data from S3
        ddf = dd.read_parquet(
            's3://coiled-datasets/github-archive/github-archive-2015.parq/',
            storage_options={"anon": True, 'use_ssl': True},
            blocksize="16 MiB",
        )

        # Compute the number of entries (PushEvents) per user
        result = ddf.user.value_counts().compute()

Since we now have our result locally, we can shut the cluster down to limit our costs. Note that this is really just a formality because Coiled will shut down your cluster automatically after 20 minutes of inactivity.

        # Shut down the Coiled cluster
        cluster.close()
        return result

Going over to the Coiled Cloud dashboard, we can see that this computation cost us 5 cents. And no, that’s not a typo 😉 That means you could run this Airflow DAG example up to 200 times a month for free using the Coiled Free Tier.

Use Result Locally

We’ve leveraged cloud resources to get the result we’re interested in, and now we can proceed with our following Tasks locally. Because these Tasks run on your own machine, reading and writing to local disk is straightforward.

We’ll generate summary statistics over the result pandas Series and save those to a CSV file:

   @task()
   def summarize(series):
       # Get summary statistics
       sum_stats = series.describe()
       # Save to CSV
       sum_stats.to_csv(f'{storage_directory}usercounts_summary_statistics.csv')
       return sum_stats

And get the usernames and number of PushEvents for the top 100 most active users:

   @task()
   def get_top_users(series):
       # Get top 100 most active users
       top_100 = series.head(100)
       # Store user + number of events to CSV
       top_100.to_csv(f'{storage_directory}top_100_users.csv')
       return top_100

Last but not least, we’ll specify the order in which we want the Airflow Tasks to run and call the DAG function so that Airflow registers the workflow.

   # Call task functions in order
   series = transform()
   sum_stats = summarize(series)
   top_100 = get_top_users(series)
 
# Instantiate the DAG
airflow_on_coiled()

Great job, you’re all set! You can now add your Airflow DAG Python script to your dags folder (by default: ~/airflow/dags) and run or schedule it as needed.

Important: Airflow disables XCom pickling by default. You will have to enable it in order to pass results like the pandas Series between tasks. You can do this by editing your airflow.cfg file or by setting the corresponding environment variable using export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True. Do this before launching your Airflow webserver.

If you’re working on an Apple M1 machine, you may want to check out this blog on installing PyData libraries using conda. Specifically, make sure that neither the blosc nor the python-blosc library is installed in your local or cluster software environments.

More Airflow DAG Examples

In our dedicated airflow-with-coiled repository, you will find two more Airflow DAG examples using Dask. The examples include common Airflow ETL operations.

Note that:

  • The JSON-to-Parquet conversion DAG example requires you to connect Airflow to Amazon S3. You can find the instructions for doing so in the Airflow docs here. You will also need to pass your AWS secrets to the to_parquet() call using the storage_options keyword argument (see the sketch after this list).
  • The XGBoost DAG example works with only a ~250MB subset of the >20GB ARCOS dataset. To run it on the entire dataset, check out this tutorial.
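
On the first point, here is a minimal, hypothetical sketch of what passing AWS credentials through storage_options can look like; the bucket path and credential values are placeholders, and Dask forwards storage_options to the underlying s3fs filesystem:

# Hypothetical sketch: writing Parquet to S3 with explicit AWS credentials
import dask.dataframe as dd
import pandas as pd

# Small stand-in DataFrame; in the DAG example this is the data converted from JSON
ddf = dd.from_pandas(pd.DataFrame({"user": ["alice", "bob"]}), npartitions=1)

ddf.to_parquet(
    "s3://your-bucket/output.parq/",          # placeholder destination
    storage_options={
        "key": "YOUR_AWS_ACCESS_KEY_ID",      # placeholder credentials
        "secret": "YOUR_AWS_SECRET_ACCESS_KEY",
    },
)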

Run all Airflow ETL Tasks with the DaskExecutor

The Airflow DAG examples in the repo above launch Coiled clusters from within an Airflow task. You can also opt for a different architecture and run all of the tasks in an Airflow DAG on a Coiled cluster. You can then use Coiled’s adaptive scaling capabilities to scale the number of workers up and down depending on the workload.

To do this, switch from using Airflow’s default SequentialExecutor to the DaskExecutor. Using any Airflow executor other than the default SequentialExecutor also requires setting up a dedicated database backend where Airflow can store the metadata related to your workflow. Once that’s done, point the DaskExecutor to a Coiled cluster that is already running.

You can do this by making the following changes in your airflow.cfg file, by default stored in ~/airflow/.

  1. Set executor = DaskExecutor
  2. Set cluster_address = <cluster_IP_address/cluster_port>. You can access this address using cluster.scheduler_address
  3. Set the cluster’s TLS settings: tls_cert, tls_key, and tls_ca.  You can access these using client.security.tls_key and client.security.tls_cert. Note that the value for tls_ca is the same as tls_cert.
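
As a rough sketch (with an illustrative cluster name and worker count), the separate script that launches the cluster could print exactly these values for you to paste into airflow.cfg:

# Launch a Coiled cluster outside of Airflow and print the DaskExecutor settings
import coiled
from dask.distributed import Client

cluster = coiled.Cluster(
    name="airflow-dask-executor",         # illustrative cluster name
    n_workers=1,
)
cluster.adapt(minimum=1, maximum=20)      # optional: scale with the workload (see below)
client = Client(cluster)

print("cluster_address:", cluster.scheduler_address)
print("tls_key:", client.security.tls_key)
print("tls_cert:", client.security.tls_cert)   # also use this value for tls_ca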

You can then run the entire Airflow DAG on Coiled.

Including a cluster.adapt(minimum=1, maximum=20) in the script that spins up your Coiled cluster will ensure that the cluster adaptively scales between a set minimum and maximum number of workers (in this case between 1 and 20) depending on the workload. 

Note that this architecture means that your cluster is managed outside of your Airflow DAG; i.e. either manually or through a deployment tool. We are currently exploring upgrading the DaskExecutor to improve performance and expand its capabilities, including cluster lifecycle management. 

Get in touch

We’d love to see any Airflow DAGs you build that use Coiled. Please feel free to share them in the Coiled Community Slack. And if you’re interested in a new CoiledExecutor for Airflow, reach out to us via support@coiled.io.

Thanks for reading! And if you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today.
