Big Data Workflow Automation with Prefect and Coiled

Richard Pelgrim September 22, 2021



tl;dr

When running Prefect out-of-the-box, orchestration of your workflow is done in the cloud, while the actual computation of your code is done locally on your machine. This means that your Prefect workflows are limited by the resources of your machine. You can transcend these local machine limitations by delegating compute-intensive workloads to a Dask cluster on Coiled.

This blog post will show you how to:

  1. Leverage parallelism in your local Prefect workflow by using the DaskExecutor.
  2. Connect the DaskExecutor to a Coiled cluster to transcend the limitations of your local machine.
  3. Build a workflow that delegates computations to a Coiled cluster only when necessary, using the Prefect ResourceManager.

You can also jump straight into the code with this Prefect-on-Coiled Python script.

Leveraging Parallel Computing with Dask

Prefect is an open-source Python library for automated workflow orchestration that provides native Dask integration. A Prefect workflow consists of a Flow and Tasks, which are orchestrated by an Executor. Prefect’s default Executor runs tasks sequentially. This is fine for simple workflows, but it means your flows may take longer than necessary by underutilizing available computing power. You can take advantage of parallelism by switching to the DaskExecutor, which will leverage the multiple cores of your local machine.

from prefect.executors import DaskExecutor

# create a temporary local Dask executor
executor = DaskExecutor()
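
Here is a minimal, illustrative sketch of how the executor gets attached to a flow run; the flow and task below are made up for demonstration purposes:

from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def double(x):
    return 2 * x

with Flow("daskexecutor-demo") as flow:
    doubled = double.map(list(range(10)))  # mapped tasks can run in parallel

# run the flow on a temporary local Dask cluster
flow.run(executor=DaskExecutor())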

When you hit the memory limits of your machine, you can connect the DaskExecutor to cloud-computing resources. One way to do so is to spin up a Coiled cluster and run your Prefect Flow there by:

– running your entire Prefect workflow on Coiled, using a Coiled cluster as your DaskExecutor,

– running specific Prefect tasks on Coiled, by spinning up a Coiled cluster within compute-heavy tasks, or

– running specific Prefect tasks on Coiled only when the size of the dataset passes a certain threshold, by using a Coiled cluster within a Prefect ResourceManager object.

The Coiled documentation includes examples of the first two methods (the first is sketched below); the rest of this post shows how to build a Prefect workflow that delegates computations to a Coiled cluster when the size of your dataset passes a certain threshold.
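
For reference, here is a sketch of the first method: pointing the DaskExecutor at coiled.Cluster so the whole flow runs on Coiled. The worker count and cluster name are placeholder values; the software environment matches the one used later in this post.

import coiled
from prefect.executors import DaskExecutor

# execute every task in the flow on a Coiled cluster
coiled_executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "n_workers": 10,                        # placeholder value
        "software": "coiled-examples/prefect",  # software environment used in this post
        "name": "prefect-executor",             # placeholder name
    },
)

flow.run(executor=coiled_executor)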

Automated Data Engineering Pipeline using Prefect on Coiled

The code example below converts a Github Archive dataset from JSON to Parquet and writes it to cloud storage, leveraging the compute resources of Coiled to do so whenever the dataset becomes too large to handle locally. This means you can convert the entire 75GB dataset on the cloud without ever having to download it to your machine. You can find the complete Python script here.

The ETL workflow looks like this: build the list of Github Archive filenames, decide whether to process them locally or on Coiled, fetch the raw JSON data, transform it into a tabular DataFrame, and write the result to S3 as Parquet.

Define your Data Engineering Tasks in Prefect

Let’s begin by defining the separate tasks that our Prefect Flow will orchestrate. If you’re comfortable defining Prefect Tasks, feel free to scroll down to the next section to connect your Prefect workflow to Coiled.

Let’s define the tasks in the order they will be run. We’ll start with the Task that will create the list of filenames we want to fetch from the Github Archive. For more context on how we’re building this code, have a look at this Jupyter Notebook.
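
The snippets that follow assume a handful of imports at the top of the script. This list is reconstructed from the names used below, so double-check it against the complete script linked above:

import datetime

import coiled
import dask.bag as db
import ujson
from dask.distributed import Client
from prefect import Flow, Parameter, task, resource_manager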

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def create_list(start_date, end_date, format="%d-%m-%Y"):
    start = datetime.datetime.strptime(start_date, format)
    end = datetime.datetime.strptime(end_date, format)
    date_generated = [start + datetime.timedelta(days=x) for x in range(0, (end - start).days)]
    prefix = "https://data.gharchive.org/"
    filenames = []
    for date in date_generated:
        for hour in range(1, 24):
            filenames.append(prefix + date.strftime("%Y-%m-%d") + '-' + str(hour) + '.json.gz')
    return filenames
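
Because the @task decorator keeps the original function available via .run(), you can sanity-check this outside of a Flow:

# call the wrapped function directly, outside of any Flow context
filenames = create_list.run("01-01-2015", "02-01-2015")
print(len(filenames))   # 23: one file per hour (1-23) for a single day
print(filenames[0])     # https://data.gharchive.org/2015-01-01-1.json.gz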

Next, let’s define the Task that will determine the type of cluster our Flow will spin up. We’ll use len(filenames) as a proxy for the dataset size; you could also use other methods to estimate the size of the data in memory (one alternative is sketched after the code block below).

@task
def determine_cluster_type(filenames):
    if len(filenames) > 100:
        return "coiled"
    return "local"

We’ll also need a Task that will fetch the data as specified in the filenames list…

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def get_github_data(filenames):
    # lazily read the compressed JSON files into a Dask Bag of records
    records = db.read_text(filenames).map(ujson.loads)
    # keep only the push events
    push_events = records.filter(lambda record: record["type"] == "PushEvent")
    return push_events

…a Task to transform the raw JSON data into tabular DataFrame format…

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_dataframe(push_events):
    def process(record):
        # yield one flat record per commit; skip malformed records
        try:
            for commit in record["payload"]["commits"]:
                yield {
                    "user": record["actor"]["login"],
                    "repo": record["repo"]["name"],
                    "created_at": record["created_at"],
                    "message": commit["message"],
                    "author": commit["author"]["name"],
                }
        except KeyError:
            pass

    processed = push_events.map(process)
    df = processed.flatten().to_dataframe()
    return df

…and a Task to write the flattened DataFrame to our S3 bucket as a Parquet file.

@task(max_retries=3, retry_delay=datetime.timedelta(seconds=5))
def to_parquet(df, path):
    df.to_parquet(
        path,
        engine='fastparquet',
        compression='lz4',
    )

Note: When running Dask commands within Prefect Tasks, you’ll need to ensure either that you have fewer active Prefect tasks than n_workers * n_threads, or that you wrap your compute calls in a with worker_client() context manager (sketched below). This avoids Prefect tasks waiting on work that can’t start because there are no open threads to run it in.
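
Here is a sketch of that second option, using dask.distributed’s worker_client. The row-count task below is hypothetical and only applies when the flow is running on a Dask worker (i.e. with a DaskExecutor):

from distributed import worker_client
from prefect import task

@task
def count_rows(df):
    # secede from the worker's thread pool while blocking on a Dask
    # computation, so other Prefect tasks can still be scheduled
    with worker_client() as client:
        return client.compute(df.shape[0]).result()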

Cluster Setup with Prefect Resource Manager

Fantastic, we’ve defined all of the Tasks in our Flow. The next step is to define the two cluster types our Flow can use: local for when the dataset is small enough to process locally and coiled otherwise.  

When using temporary cloud computing resources, you’ll want to ensure that they are properly set up, used, and cleaned up in order to avoid potential errors and unnecessary costs. We’ll use the Prefect ResourceManager object to do this by defining the required __init__, setup, and cleanup methods. Make sure to include any keyword arguments you’ll want to pass to the cluster in the __init__ definition.

# Define a ResourceManager object
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        # return a Dask client connected to either a local or a Coiled cluster
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)

    def cleanup(self, client):
        # close the client, and shut down the Coiled cluster if we started one
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()

Construct your Data Engineering Flow in Prefect

Now that you’ve defined your Tasks and ResourceManager, the next step is to tell Prefect how these Tasks relate to each other and how they should be run. We’ll also define a number of Parameters that the user can tweak per Flow run.

# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default='coiled-examples/prefect')
    account = Parameter("account", default=None)
    name = Parameter("name", default='cluster-name')
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df, path="s3://<your-bucket>/github-archive.parq")  # replace with your own bucket path

Great job! You’re now ready to run your Prefect Flow. Go ahead and experiment with various values for the end_date parameter to see the conditional Coiled cluster spin-up in action: setting end_date to “06-01-2015” or later will delegate your computations to a Coiled cluster.

You can also customize your Coiled cluster by passing alternative Parameter values to flow.run(), such as n_workers and the cluster name. If you want to tweak other features of your Coiled cluster, such as worker and scheduler memory or the idle timeout, you’ll have to include the corresponding keyword arguments in the __init__ block of DaskCluster (see the sketch after the next code block).

# Run flow with parameters
flow.run(
    parameters=dict(
        end_date="02-01-2015",
        n_workers=15,
        name="prefect-on-coiled",
    )
)
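
For example, to expose worker memory as a tunable option, you could extend DaskCluster along these lines. This is only a sketch: worker_memory is assumed to be an accepted coiled.Cluster keyword, so check the Coiled API reference for the exact argument names before relying on it.

@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None,
                 account=None, name=None, worker_memory="8 GiB"):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name
        self.worker_memory = worker_memory  # assumed coiled.Cluster keyword

    def setup(self):
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
                worker_memory=self.worker_memory,
            )
            return Client(cluster)

    def cleanup(self, client):
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()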

Security with Prefect and Coiled

Prefect Cloud is built on a hybrid model in which only metadata about your workflow is communicated to Prefect; the actual code you’re running never leaves your machine. To match these security standards, Coiled provides end-to-end network security through both cloud networking policies and SSL/TLS encryption. For additional control, Coiled can be deployed within your own cloud account, where you can specify and manage data access controls directly. You can refer to our documentation on Security & Privacy for more information.
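
In concrete terms, registering a flow with Prefect Cloud sends only the flow’s metadata; an agent running in your own environment then executes the code. A sketch, with a hypothetical project name:

# only metadata about the flow (its structure, parameters, schedule) is sent
# to Prefect Cloud; the task code itself stays in your environment
flow.register(project_name="github-etl")  # hypothetical project name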

Large Dataset Workflow Orchestration Summary 

In this post, we discussed when and how to use Dask and Coiled to leverage parallelism in your Prefect workflows, either locally or in the cloud. We then built a Prefect data engineering workflow that delegates computation to a Coiled cluster whenever the size of the dataset exceeds a certain threshold.

Main takeaways:

  • You can use Coiled to extend your Prefect workflows to larger-than-memory datasets
  • You can write your code so Coiled clusters are only spun up when they’re really necessary
  • Coiled provides tools to meet strict security requirements

We’d love to see what you build with Prefect and Coiled! Drop us a line at support@coiled.io or in our Coiled Community Slack channel to get in touch.

Thanks for reading! Coiled Cloud provides hosted Dask clusters, docker-less managed software, and one-click deployments. If you’d like to take it for a spin, you can do so for free today by clicking below.

Try Coiled Cloud