How to Check the Progress of Dask Computations

Pavithra Eswaramoorthy September 23, 2021

Dask allows you to scale your Python computations to multiple machines. This means your workflow is executed in a parallel and distributed setting. If you’ve worked with scalable computing in any capacity before, you know that these parallel computations behave differently from serial computations. So, it can be helpful to see their progress in real time and visualize what’s happening in your cluster. Dask provides diagnostic and visualization tools out-of-the-box for exactly this purpose. We’ll learn more about them in this blog post.

Specifically, we will:

  • Learn to check the status of local Dask computations — computations that are executed on your local machine, but in parallel using all cores of the machine;
  • Learn to check the status of distributed Dask computations — computations that use Dask’s distributed scheduler and are executed on a cluster;
  • Explore the different visualization tools provided by Dask, especially tools that display the progress of tasks and help profile your computations.

Fig 1: Dask’s built-in visualizations

Introduction to Dask Computations

Dask is a powerful library for parallel and distributed computing in Python. In this post, we assume you have worked with Dask before. If you’re new to Dask, we suggest going through What is Dask? to get a quick overview. 

Dask helps parallelize your Python code in two main ways: single-machine and multi-machine parallelism.

Single-machine parallelism involves scaling up to leverage all the resources in your local machine (your laptop or desktop computer). Dask has some local schedulers that are very efficient at this, and Dask’s collections use these local schedulers by default. For local Dask computations, you can use the tools provided by dask.diagnostics to visualize the progress of your computation and profile it.

Multi-machine parallelism involves scaling out to a cluster of machines locally, on-prem, or in the cloud. This is made possible by Dask’s distributed scheduler. This scheduler has complex optimizations that make distributed computing efficient, and it includes an elaborate dashboard to help visualize computations.
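
To make the distinction concrete, here’s a minimal sketch of how the two modes are selected (the random array is purely for illustration):

import dask.array as da

x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))

# Single-machine parallelism: collections use a local scheduler by
# default, and you can also pick one explicitly per call
x.mean().compute(scheduler="threads")

# Multi-machine parallelism: creating a distributed Client makes the
# distributed scheduler the default for subsequent computations
from dask.distributed import Client

client = Client()  # with no arguments, this starts a local cluster
x.mean().compute()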

We’ll look at both of these techniques in the upcoming sections, focusing on visualizing the “progress” of computations. You can follow along with the code in this notebook.

Local Dask Computations

Let’s start with a local computation using Dask DataFrame. All the computations in this section will use Dask’s local threaded scheduler, a compact and lightweight scheduler that Dask DataFrame uses by default.

First, let’s read some data. In this blog post, we are using the NYC Yellow Taxi Trips dataset.

import dask.dataframe as dd

# Read one month of taxi trips from the public S3 bucket; the dtype
# overrides prevent type-inference mismatches across partitions
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-01.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
)

Next, we will perform a computation that calculates the mean ‘tip amount’ as a function of ‘passenger count’ on this dataset. To visualize the status of your computation, you can use the tools provided by dask.diagnostics, namely the ProgressBar, Profiler, ResourceProfiler, and CacheProfiler. Let’s look at these in detail.

Progress Bar

You can use Dask’s built-in ProgressBar (see Fig 2) to visualize the progress of any get() or compute() calls. You can do this in two ways:

  • With global registration, or
  • Using it as a context manager.

Global registration allows you to “register” the ProgressBar just once to display the progress for all computations, anywhere in your workflow. You can register the ProgressBar as shown below:

from dask.diagnostics import ProgressBar
pbar = ProgressBar()                
pbar.register() # global registration

Now, on executing any computation, a progress bar will appear immediately in your Jupyter Notebook (or any other IDE):

df.groupby("passenger_count").tip_amount.mean().compute()

Fig 2: ProgressBar() for local Dask Computation

ProgressBar can also be used as a context manager to visualize specific computations. You can do this using the `with` keyword as shown here:

with pbar:
    df.groupby("passenger_count").tip_amount.mean().compute()
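
If you registered the ProgressBar globally earlier and only want it for specific computations, you can remove the global registration first; unregister() is the counterpart of register():

pbar.unregister() # stop showing progress for every computation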

Profilers

The Profiler, ResourceProfiler, and CacheProfiler from dask.diagnostics help profile the computation at the task level, resource level, and scheduler-cache level, respectively. We won’t go into details here, but you can learn more about these profilers in Dask’s local diagnostics documentation. Similar to the ProgressBar, you can use them with global registration or as context managers.

In the following code snippet, we’re using them as context managers:

from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler

prof = Profiler()           # records the start and end times of each task
rprof = ResourceProfiler()  # tracks CPU and memory usage over time
cprof = CacheProfiler()     # tracks when results enter and leave the scheduler cache

with prof, rprof, cprof:
    df.groupby("passenger_count").tip_amount.mean().compute()

You can now call visualize() to view the plots for each of them. 
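
Each profiler has its own visualize() method, and dask.diagnostics also provides a visualize function that renders several profiler results together:

from dask.diagnostics import visualize

# Render all three profiling results as stacked interactive plots
visualize([prof, rprof, cprof])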

Fig 3: Interactive plots generated by Profiler, ResourceProfiler, and CacheProfiler.

Distributed Dask Computations

Dask’s distributed scheduler provides a lot of advanced features for both local and distributed computations. It’s the recommended scheduler for most workflows and includes a complete diagnostic dashboard for visualizing the status of your cluster.

Let’s spin up a Dask cluster:

from dask.distributed import Client
client = Client()

Dask will now use the distributed scheduler for all further computations.
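
Calling Client() with no arguments starts a local cluster with sensible defaults. If you want more control, you can pass keyword arguments; the values below are purely illustrative:

from dask.distributed import Client

# Start a local cluster with an explicit worker layout
client = Client(n_workers=4, threads_per_worker=2, memory_limit="2GiB")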

Dask Diagnostic Dashboards

After spinning up a Dask cluster, you can use client.dashboard_link to get a link to your dashboard. If you’re using the distributed scheduler for local computation, the dashboard will be served at localhost:8787.
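
For example, for a local cluster:

print(client.dashboard_link) # e.g., http://127.0.0.1:8787/status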

This dashboard shows the real-time status of your cluster, resources, and computations. You can find the ‘Progress’ visualization in the ‘Status’ tab, which displays the progress of all the tasks in your computation. You can also navigate to different visualizations using the tabs in the top bar, and click on “More” to see the full list of available plots.

Fig 4: Dask’s Diagnostic Dashboard

This dashboard opens in a new tab in your browser. But if you use JupyterLab for your work, you can also view these plots directly in your JupyterLab interface. Keep reading!

Dask JupyterLab Extension

The Dask JupyterLab extension brings all the visualizations from the Diagnostic Dashboards to your JupyterLab workspace. We love this extension because it lets us see everything conveniently on one screen (Fig. 5). You can click on the Dask logo in the left panel to see a list of all available visualizations. You can now view the ‘Progress’ visualization (that we also saw in the previous section) by selecting it from this list.

We suggest reading Dask JupyterLab Workflow to make the most out of this extension.

Fig 5: Dask JupyterLab Extension

Progress Function

Since we’re focusing on the progress of computations, the progress function in dask.distributed is also worth mentioning. It provides a quick way to visualize the progress in the Jupyter Notebook itself. You can use it as shown below:

from dask.distributed import progress

# persist() starts the computation on the cluster in the background
# and returns immediately, so progress() can track the running tasks
result = df.groupby("passenger_count").tip_amount.mean().persist()
progress(result)

Fig 6: Output using the progress function in dask.distributed 

References

The Dask dashboard and visualizations are one of our favorite parts of Dask. We thank the entire Dask community for constantly improving existing plots and adding new ones. To learn more and stay up-to-date, you can refer to the Dask Documentation pages, especially Diagnostics (local) and Diagnostics (distributed).

Thanks for reading! If you’re interested in taking Coiled Cloud for a spin (it provides hosted Dask clusters, Docker-less managed software, and one-click deployments), you can try it for free today by clicking below.

Try Coiled Cloud