Scale your Prefect + Dask Workflows to the Cloud

Richard Pelgrim September 22, 2021


Prefect is a popular open-source Python library for automated workflow orchestration. When you run Prefect out-of-the-box, orchestration of your workflow happens in Prefect Cloud, while the actual computation of your code runs locally on your machine. This means that your Prefect Cloud workflows are limited by the resources of that machine.

This blog post will show you:

  1. How to leverage parallelism in your local Prefect workflow by using the DaskExecutor.
  2. When and how to connect the DaskExecutor to a cloud-computing cluster to transcend the limitations of your local machine.
  3. How to build a workflow that delegates computations to a cluster in the cloud only when necessary, using the Prefect ResourceManager.

You can also jump straight into the code with this Prefect Dask Python script.

Speed up your work with parallel computing

Prefect is an open-source Python library for automated data workflow orchestration that provides native Dask integration. A Prefect workflow consists of a Flow and Tasks, which are orchestrated by an Executor. Prefect’s default Executor runs tasks sequentially. This is fine for simple workflows, but it means your computations may be running slower than they need to because they underutilise the available resources.

As a data scientist or engineer, you’ll want to consider optimising performance by switching to the DaskExecutor. This leverages the multiple cores of your local machine, speeding up compute-heavy tasks like joins, shuffles, and machine learning jobs.

from prefect.executors import DaskExecutor

# create a temporary local Dask Executor
executor = DaskExecutor()
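
In Prefect 1.x (the API used in this post), you can attach the executor to an existing Flow or pass it in when you run the Flow; a minimal sketch, assuming a Flow object named flow already exists:

# attach the executor to a Flow you have already defined...
flow.executor = executor

# ...or pass it in at run time
flow.run(executor=executor)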

When you hit the memory limits of your machine, you can connect the DaskExecutor to cloud-computing resources to distribute work across multiple machines. One way to do so is by spinning up a Coiled Cluster and running your Prefect Flows there.

There are three different approaches you can take to set this up. You can:

– run your entire Prefect data workflow on Coiled by using a Coiled cluster as your DaskExecutor,

– run only specific Prefect tasks on Coiled by spinning up a Coiled cluster within specific compute-heavy tasks, or

– run specific Prefect tasks on Coiled only when the size of the dataset passes a certain threshold, by using a Coiled cluster within a Prefect ResourceManager object.

The links to the Coiled documentation above include examples of the first two methods; the section below will show you how to code a Prefect script that delegates computations to a Dask cluster when the size of your dataset passes a certain threshold.
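
For reference, here is a minimal sketch of the first approach with Prefect 1.x, assuming a Coiled account and the coiled-examples/prefect software environment used later in this post (swap in your own names and worker count):

import coiled
from prefect.executors import DaskExecutor

# run the entire Flow on a temporary Coiled cluster
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "n_workers": 10,
        "software": "coiled-examples/prefect",
    },
)
flow.run(executor=executor)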

Automated Prefect ETL Pipelines with Dask

The code example below converts a Github Archive dataset from JSON to Parquet and writes it to cloud storage, leveraging the compute resources of Coiled whenever the dataset becomes too large to handle locally. This means you can convert the entire 75GB dataset in the cloud without ever having to download it to your machine. You can find the complete Python script here.

The ETL workflow will look something like this:

[Diagram: the ETL data workflow]

Define your Prefect Task Runs

Let’s begin by defining the tasks we want to run. If you’re comfortable defining a Prefect Task, feel free to scroll down to the next section to connect your Prefect workflow to Coiled.

Let’s define the tasks in the order they will be run. We’ll start with the Prefect Task that will create the list of filenames we want to fetch from the Github Archive. For more context on how we’re building this code, have a look at this Jupyter Notebook.

import datetime

from prefect import task

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def create_list(start_date, end_date, format="%d-%m-%Y"):
    start = datetime.datetime.strptime(start_date, format)
    end = datetime.datetime.strptime(end_date, format)
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end-start).days)]
    prefix = "https://data.gharchive.org/"
    filenames = []
    for date in date_generated:
        for hour in range(1, 24):
            filenames.append(prefix + date.strftime("%Y-%m-%d") + '-' + str(hour) + '.json.gz')
    return filenames

Next, let’s define the Task that will determine the type of cluster our Flow will spin up. We’ll use len(filenames) as a proxy for the dataset size; you could also use other methods to estimate the size of the data in memory (see the sketch after this Task).

@task
def determine_cluster_type(filenames):
    if len(filenames) > 100:
        return "coiled"
    return "local"

We’ll also need a Task that will fetch the data as specified in the filenames list…

import dask.bag as db
import ujson

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_github_data(filenames):
    records = db.read_text(filenames).map(ujson.loads)
    push_events = records.filter(lambda record: record["type"] == "PushEvent")
    return push_events

…a Task to transform the raw JSON data into tabular DataFrame format…

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_dataframe(push_events):
    def process(record):
        try:
            for commit in record["payload"]["commits"]:
                yield {
                    "user": record["actor"]["login"],
                    "repo": record["repo"]["name"],
                    "created_at": record["created_at"],
                    "message": commit["message"],
                    "author": commit["author"]["name"],
                }
        except KeyError:
            pass

    processed = push_events.map(process)
    df = processed.flatten().to_dataframe()
    return df

…and a Task to write the flattened DataFrame to our Amazon S3 bucket as a Parquet file.

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_parquet(df, path):
    df.to_parquet(
        path,
        engine='fastparquet',
        compression='lz4',
    )
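
Writing to S3 requires the s3fs package and AWS credentials that are visible to the Dask workers. If you need to pass credentials explicitly, Dask’s to_parquet accepts a storage_options dictionary; here is a hypothetical variant of the task above (the key and secret values are placeholders):

# hypothetical variant of the task above with explicit AWS credentials
@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_parquet_with_creds(df, path):
    df.to_parquet(
        path,
        engine='fastparquet',
        compression='lz4',
        storage_options={"key": "YOUR_AWS_KEY", "secret": "YOUR_AWS_SECRET"},
    )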

Note: when running Dask commands within Prefect Tasks, you’ll need to ensure that you have fewer active Prefect tasks than n_workers * n_threads, or wrap your compute calls in a with worker_client() context manager (sketched below). This avoids Prefect tasks waiting on work that can’t start because there are no open threads to run it in.
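
As a rough sketch of the second option (the task and its computation are illustrative, not part of the ETL flow above):

from distributed import worker_client
from prefect import task

@task
def summarise(df):
    # hand the Dask work back to the scheduler so this task does not
    # block a worker thread while waiting for its own computation
    with worker_client() as client:
        return client.compute(df.describe()).result()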

Dask Cluster Setup with Prefect Resource Manager

Fantastic, we’ve defined all of the Tasks in our Flow. The next step is to define the two cluster types our Flow can use: local for when the dataset is small enough to process locally and coiled otherwise.  

When using temporary cloud compute resources, you need to make sure that they are properly instantiated, used, and cleaned up in order to avoid errors and unnecessary costs. We’ll use the Prefect ResourceManager object to do this by defining the required __init__, setup, and cleanup blocks. Make sure to include any keyword arguments you’ll want to pass to the cluster in the __init__ definition. This block creates the Dask cluster, including the Dask scheduler, and connects it to a Dask Client.

import coiled
from dask.distributed import Client
from prefect import resource_manager

# Define a ResourceManager object
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)

    def cleanup(self, client):
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()

Construct your Prefect Dask Data Engineering Flow

Now that you’ve defined your Tasks and ResourceManager, the next step is to tell Prefect how these tasks relate to each other and how you’ll need them to be run. We’ll also define a number of Parameters that can be tweaked per Flow run by the user.

from prefect import Flow, Parameter

# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default='coiled-examples/prefect')
    account = Parameter("account", default=None)
    name = Parameter("name", default='cluster-name')
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df, path="s3://your-bucket/github-archive/")  # replace with your own S3 path

Great job! You’re now ready to run your Prefect Flow. Go ahead and experiment with various values for the end_date parameter to see the conditional Coiled cluster spin-up in action: setting end_date to anything after “06-01-2015” will delegate your computations to a cluster.  

You can also customize your Coiled cluster by passing alternative Parameter values to flow.run(), such as n_workers and cluster name. If you want to tweak other features of your Dask cluster such as Dask worker and Dask scheduler memory or the idle timeout, you’ll have to include the corresponding keyword arguments in the __init__ block of DaskCluster.

# Run flow with parameters
flow.run(
    parameters=dict(
        end_date="10-01-2015",  # past the 100-file threshold, so this run spins up a Coiled cluster
        n_workers=15,
        name="prefect-on-coiled",
    )
)
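
If you do want to expose extra cluster options such as worker or scheduler memory, here is a rough sketch of how the DaskCluster resource manager could be extended; the worker_memory and scheduler_memory argument names are assumed from the coiled.Cluster API, so check the Coiled documentation for your version:

import coiled
from dask.distributed import Client
from prefect import resource_manager

# hypothetical extension of DaskCluster that also exposes memory settings
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None,
                 account=None, name=None, worker_memory="8 GiB", scheduler_memory="8 GiB"):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name
        self.worker_memory = worker_memory
        self.scheduler_memory = scheduler_memory

    def setup(self):
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
                worker_memory=self.worker_memory,
                scheduler_memory=self.scheduler_memory,
            )
            return Client(cluster)

    def cleanup(self, client):
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()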

Security with Prefect and Coiled

Prefect Cloud is built on a hybrid model in which only the metadata about your data workflow is communicated to Prefect; the actual code you’re running never leaves your machine. To match these security standards, Coiled provides end-to-end network security using both cloud networking policies and SSL/TLS encryption. For additional control, Coiled can be deployed within your own cloud account, where you can specify and manage data access controls directly. You can refer to our documentation on Security & Privacy for more information.

Prefect Dask Workflow Orchestration Summary 

In this post, we discussed when and how to use Dask and Coiled to leverage parallelism in your Prefect workflows, either locally or in the cloud. We then built a Prefect data engineering workflow that delegates computation to a Coiled cluster whenever the size of the dataset exceeds a certain threshold.

Main takeaways:

  • You can use Dask and Coiled to extend your Prefect data workflows to larger-than-memory datasets
  • You can write your code so that a Dask cluster is only spun up when it’s really necessary
  • Coiled provides tools to meet strict security requirements

We’d love to see any Prefect Dask workflows you’ve built! Drop us a line at support@coiled.io or in our Coiled Community Slack channel to get in touch.

Thanks for reading! If you’re interested in taking Coiled Cloud for a spin, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Try Coiled Cloud

