TL;DR: Dask’s distributed scheduler implements work-stealing — taking pending tasks away from their assigned worker and giving them to a different worker. Work stealing improves performance by providing a counterbalance to the default data-locality-focused scheduling heuristic. The combination of heuristics helps maximize cluster utilization and overall throughput.
What is Work Stealing and Why is it Valuable?
Every distributed scheduler faces a common challenge: how should tasks be assigned to the various workers in the system?
Scheduling is a complex problem and the subject of ongoing research — it is difficult or impossible to specify a single scheduling algorithm that is optimal for all systems and workloads.
However, one pragmatic way to approach this problem is to create a flexible, approximate heuristic for initial assignment of tasks — and then revisit those decisions later if it turns out that improvements can easily be made.
Work stealing is one such mechanism for “revisiting” scheduling decisions. It amounts to:
- locating tasks that have been previously assigned to a specific worker,
- discovering that, given the totality of information currently available, it might be more efficient to run those tasks elsewhere,
- “stealing” that pending work from its initially assigned worker, and
- assigning it to a different worker.
In other words, if we look at initial scheduling decisions as an approximate solution, we can look at work stealing as one (of various) possible “course corrections” to improve the overall approximation.
Work stealing represents an incremental improvement over the initial scheduling heuristic: it has access to new information (data sizes, task timing, real-time resource availability) that was unavailable when the task was first scheduled.
Practically speaking, work stealing can improve two key metrics we care about in our distributed computations:
- overall system utilization (“getting the most out of what we’re paying for”)
- end-to-end (“wall clock”) processing time for large, complex operations
How Does Dask Distributed Work Stealing Operate?
At a high level, Dask’s distributed scheduler optimizes initial task placement to minimize expensive data transfer. This is sometimes described as optimizing for data locality. More explicitly: we assume it’s cheaper to send code to where the data is than to move the data somewhere else for computation.
Figure: a series of computations that rely on a single chunk of data, located on a single worker. If we run these operations at a slow pace, one at a time, all of the tasks land on the worker with the data.
This initial heuristic is described here and generally works great as a first approximation.
But consider a series of tasks that all depend on the same chunk of data. Optimization for data locality will assign all of those tasks to the same worker. If there are equal sets of tasks, dependent on similar chunks of data, assigned to all of the workers, then things are working great.
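To make the locality heuristic concrete, here is a minimal toy sketch in plain Python. It is not Dask's actual implementation: the function `pick_worker` and the dictionaries `who_has` and `nbytes` are hypothetical names chosen for illustration, and the rule (pick the worker that would need to fetch the fewest bytes) is a deliberate simplification.

```python
# Toy model of locality-first placement. All names are hypothetical,
# not Dask internals.

def pick_worker(task_deps, who_has, nbytes, workers):
    """Return the worker that would need to fetch the fewest bytes."""
    def bytes_to_fetch(worker):
        # Only dependencies the worker does not already hold must be moved.
        return sum(nbytes[d] for d in task_deps if worker not in who_has[d])
    # min() keeps the first minimum, so ties go to the earliest worker listed.
    return min(workers, key=bytes_to_fetch)

workers = ["worker-a", "worker-b"]
who_has = {"chunk-1": {"worker-a"}}   # chunk-1 lives only on worker-a
nbytes = {"chunk-1": 100_000_000}     # roughly 100 MB

# Five tasks that all depend on the same chunk.
assignments = [pick_worker(["chunk-1"], who_has, nbytes, workers) for _ in range(5)]
print(assignments)
```

In this toy model every task lands on `worker-a`, because that is the only worker that can start without a transfer. `worker-b` stays idle no matter how long the queue on `worker-a` grows.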
However, it is often the case that workloads are asymmetric: there are not equal amounts of work, dependent on equal pieces of data, for all workers. (In traditional data engineering, such workloads are said to exhibit skew.) This situation can lead to some workers bogged down with tons of assigned tasks, while other workers have little or nothing to do.
Clearly that’s not a great situation, and this is where work stealing comes to the rescue.
Dask’s distributed scheduler (discussed in the recent Coiled Blog on Map Joins) maintains statistics on all work, allowing reasonable approximations to be calculated for compute time and data movement costs. By comparing the expected compute time to the expected data movement, Dask gets a cost/benefit score for tasks and tracks them in a loosely ordered collection for easy retrieval. Then, any time tasks are being submitted to workers, the scheduler also examines that collection to check the cost/benefit of stealing tasks from one worker and assigning them to others.
Specifically, if a worker is saturated, Dask checks whether there are pending tasks that would complete earlier if assigned somewhere else. Stealing is especially appealing when some workers are idle and others are heavily burdened.
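The cost/benefit comparison can be sketched roughly as follows. This is an illustrative model under stated assumptions, not the scheduler's real code: `PendingTask`, `cost_ratio`, and `should_steal` are hypothetical names, and the timing estimates are made-up numbers standing in for the statistics the scheduler maintains.

```python
from dataclasses import dataclass

@dataclass
class PendingTask:
    name: str
    compute_seconds: float    # estimated run time
    transfer_seconds: float   # estimated time to move its inputs elsewhere

def cost_ratio(task):
    """Data movement cost relative to compute time; lower is cheaper to steal."""
    return task.transfer_seconds / task.compute_seconds

def should_steal(task, queue_ahead_seconds):
    """Steal if an idle worker would finish the task before the busy one.

    Busy worker: wait for the queue ahead, then compute.
    Idle worker: fetch the inputs, then compute.
    """
    finish_on_busy = queue_ahead_seconds + task.compute_seconds
    finish_on_idle = task.transfer_seconds + task.compute_seconds
    return finish_on_idle < finish_on_busy

cheap = PendingTask("cheap-to-move", compute_seconds=2.0, transfer_seconds=0.1)
heavy = PendingTask("heavy-data", compute_seconds=0.05, transfer_seconds=5.0)

print(should_steal(cheap, queue_ahead_seconds=10.0))  # long queue, small transfer
print(should_steal(heavy, queue_ahead_seconds=1.0))   # short queue, large transfer

# Ordering candidates by ratio puts the most attractive steals first.
candidates = sorted([heavy, cheap], key=cost_ratio)
print([t.name for t in candidates])
```

The first task is worth moving because its inputs are small compared to the time it would spend waiting. The second is not: shipping its data would take longer than simply waiting its turn.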
When all of the tasks are sent to the scheduler at around the same time, work stealing is triggered: the second worker receives the data and then computes on it in parallel with the first worker. This behavior appears at the right-hand side of the Task Stream and is easier to see in a zoomed detail.
Assigning a task to an alternate worker triggers a secondary benefit: since computing that task will require sending data dependencies to the new worker, we immediately double (or potentially multiply) the number of workers that are subsequently “ready to go” from a data locality perspective.
In this latter sense, work stealing is more than a one-off improvement: it has cascading effects that help the cluster dynamically adapt to its current workload. This sort of ad hoc, flexible adaptation is a key design theme in Dask, as it allows Dask to handle many different kinds of workloads well.
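The cascading effect can be illustrated with a deliberately idealized simulation. The assumption here, which is ours and not a description of Dask's real behavior, is that in each round every worker already holding the shared chunk can serve a copy to one idle worker that steals a task. The function name `holders_per_round` is hypothetical.

```python
def holders_per_round(n_workers):
    """Count workers holding the shared chunk after each round of stealing.

    Idealized assumption: per round, each current holder serves a copy of
    the chunk to exactly one idle worker that has stolen a task.
    """
    holders = 1            # the chunk starts on a single worker
    history = [holders]
    while holders < n_workers:
        holders = min(n_workers, holders * 2)
        history.append(holders)
    return history

print(holders_per_round(8))
print(holders_per_round(10))
```

Under this assumption the number of “ready to go” workers doubles each round until the whole cluster holds the data. A real cluster will not follow such a clean pattern, but the direction is the same: each steal leaves behind another worker with good data locality.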
How Do I Use Work Stealing with Dask?
When using Dask’s distributed scheduler (and you should be — even if you’re running on a single machine!), work-stealing is enabled for you by default. You’re already getting the benefit.
If you need to turn off work stealing, you can do so by setting the scheduler configuration value “work-stealing” (full key: distributed.scheduler.work-stealing) to False.
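As a minimal sketch, the setting can be changed from Python with `dask.config.set`. We assume here that the configuration is applied in the process where the scheduler is created, and before it starts; a scheduler that is already running will not pick up the change.

```python
import dask

# Disable work stealing. This should run before the scheduler is created,
# in the same process (or environment) that launches it.
dask.config.set({"distributed.scheduler.work-stealing": False})

# Confirm the value that a newly started scheduler would read.
print(dask.config.get("distributed.scheduler.work-stealing"))
```

Turning stealing off is mostly useful for debugging or for workloads where you want task placement to stay exactly where the initial heuristic put it.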