Dask Under the Hood: Scheduler Refactor

Adam Breindel April 12, 2021

, , , ,


TL;DR: In recent months, the Dask team has started refactoring task graph processing and the scheduler in order to handle large, complex workloads better and to provide a better interactive user experience. A key piece of the refactor involves moving parts of the graph processing from the Client to the Scheduler. You won’t need to change your code — but understanding task graphs and the refactor can improve your understanding of Dask for many applications.

It Already Looks Good! Why Refactor? 

Indeed, if you’ve never noticed any issues with Dask’s scheduling, you’re not alone and you don’t need to worry. However, some users can see significant delays when interactively working with Dask code (e.g., in a Jupyter notebook). Moreover, these delays happen at coding time rather than at execution time, which can seem mysterious. 

As a reminder, Dask-aware objects (like Dask Array, Dask Dataframe, or Dask Delayed) lazily execute operations: most operations with these objects don’t run the associated computation right away like “regular” Python would, but instead wait until .compute() is called on the relevant object.

Since (almost) no computation happens right away, it is reasonable to expect initial interactive coding to be instantaneous and lightweight. So it can come as a surprise that sometimes just “preparing” the computation can bog down an interactive coding session. That’s the phenomenon we want to explain and fix here.

Brief Anatomy of Dask Task Graphs

A task graph is just a set of tasks linked together where directed links represent dependencies. In the Dask world, there are 2 kinds of graphs to start: high-level graphs and low-level graphs. Moreover, both flavors can be optimized, so the full picture involves a bit more detail.

Fundamentally, what are these graphs?

The high-level graph represents the operations specified by user code for Dask-aware objects. Both of the following code snippets yield simple high-level graphs:

import dask.array as da

a = da.ones((10,10), chunks=(5,5))
b = da.zeros((10,10), chunks=(10,10))
c = a + b 
import dask 

zero = dask.delayed(lambda: 0)
one = dask.delayed(lambda: 1)
c = zero + one

In both cases, there is an “add” task that represents the “+” operation and it depends on tasks to generate an array of ones and zeros (in the first example) or a single one and a single zero (admittedly a silly use of a task, but coded to show the graph similarity) in the second example. 

The graph looks like this:

Warning: if you’re familiar with the visualize helper, you may be tempted to run that to generate an image… Don’t run it just yet! Visualize renders a lower-level version of the graph, so the picture you see won’t match my description here.

What makes that a high-level graph? What is a low-level graph?

That graph is “high level” because it comes from user-supplied method calls and operators and, in many cases, it is a bit too abstract for direct Dask scheduling. Recall that Dask tasks, when actually scheduled on workers, are just regular Python functions operating over regular Python objects. 

In the first example above, that Dask Array is not a “regular” Python object: it really represents a collection of NumPy Arrays, and translation from the Dask abstraction to the concrete NumPy operations needs to happen prior to task scheduling and execution.

A low-level graph is a graph of operations that are “ready to schedule/execute” — that is, they represent regular Python functions on regular Python objects.

Sometimes converting high to low is expensive, other times it’s not?!

In the first example, the Dask Array a has shape (10, 10) with shape (5,5) chunks. That means that underneath are really 4 NumPy arrays, so a simple operation on a will typically require 4 low-level tasks (one for each block or chunk). The low-level graph, which you can see by trying .visualize() on c makes this explicit.

On the other hand, look at the second example above, the one with dask.delayed. In that example, there are no partitions or chunks, or blocks hidden underneath our Python 1 and 0. Adding 1 and 0 is just … well … adding 1 and 0. So, in this case, the low-level graph is more or less the same thing as the high-level graph. It doesn’t “expand” into a lot more low-level tasks.

Now, let’s return to our first Dask Array example and imagine that it didn’t involve 4 blocks, but instead required millions of blocks. It’s easy to try: for an 8-million chunk example, run da.ones((1000, 1000, 1000), chunks=(5, 5, 5)) in an interactive environment. With ~8 million tasks, don’t try to visualize this one!

You’ll notice that the environment hangs for a noticeable period. That’s because Dask is decomposing your initial, tiny high-level graph (“make a bunch of ones”) into a lot of low-level tasks (8 million instances of “make a (5,5,5) block of ones”).

And, since large-scale analytics and data science frequently involve huge collections of chunks (or, in the case of Dataframe, partitions), along with more complex logic and dependencies than we’re looking at here… you can see that a problem arises. The goal of the scheduler refactor is to fix this problem!

Optimization

Before moving on to talk about where the graph is transformed, let’s complete the picture by offering some examples of optimizations that are possible today or in the future for Dask graphs.

Both high-level graphs and low-level graphs can be optimized in different situations, and we’re not going to get into all of the details here. But, to help your intuition, consider a couple of examples.

  • High-level graph optimization: imagine a Dask Dataframe derived from two sequential “query” operations to filter records. Usually, those two filters can be combined into a single filter. Think of it like supplying a “query” with AND applied to both conditions. The decomposition into partitions isn’t relevant for this optimization so we can adjust the high-level graph.
  • Low-level graph optimization: once we’ve produced lots of regular Python functions in our low-level graph, sometimes each task is so small that it isn’t efficient to schedule that task on its own. In those cases, Dask employs Task Fusion to wrap a group of tiny functions into a larger function, allowing more computation relative to scheduling overhead.

Where Ideally Does Graph Transformation Belong?

This question is at the crux of the refactor project. Historically, the transformation all the way “down” to an optimized low-level graph, was performed on the client. That is, it takes place in code where the Client instance is running — your local Python process, if you’re running the script, working in a notebook, etc.

Doing the work on the client lowers the workload on the scheduler. But, as we’ve seen, it means potentially painful delays on the client. 

The refactor addresses this tension in two ways:

  1. The graph is sent in the high-level, lighter weight, form to the scheduler, which lowers client workload significantly for large graphs. The harder work now will now land on the scheduler.
  2. Instead of just hiding expensive work where it is “out of sight” on the scheduler, there is another refactor workstream that aims thoroughly to optimize as many slow areas of scheduler code as possible. This work is classic, but hardcore, code optimization which you can learn more about in our video on this topic.

The final result yields better behavior on the client and good scheduler performance as well.

Cool … But Do I Need to Change My Application?

No need to change anything — probably the best answer a data or ML engineer can hear.

At the time of writing, the changes described above are not active, and are opt-in. You can try them out by setting the following config value:

    optimization:
        fuse:
            active: false

However, the plan is to eventually make these changes the default behavior — and to enable a variety of graph optimizations within the scheduler graph processing code.

Get The Most Out of Your Data Science and ML

Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we’re always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free by clicking below.