Under the Hood with Dask’s Cluster Autoscaling
• January 21, 2021
tl;dr: Autoscaling can save money and improve performance by “right-sizing” your distributed compute clusters as workloads change. Dask does this by collecting a lot of data about the work you’re running, and scaling to a size that ensures all your pending work gets done quickly (i.e., within a timeframe you can choose).
What is autoscaling?
Autoscaling — called dynamic allocation in some ecosystems such as Apache Spark — is the ability for a distributed compute system to scale up (run on more nodes) and down (run on fewer nodes) over time, with the cluster size adjusted automatically to match the needs of the workload.
Distributed computation always comes with a “tax”: there is additional complexity as compared with local, single-machine computation. Autoscaling provides a “tax break” — a benefit that we get in that distributed world.
Autoscaling is also a selling point for cloud computing:
- Automatic scale up allows the same application code to use enormous amounts of compute resources when necessary to solve hard problems fast or to support unpredictable user growth
- Automatic scale down ensures that costs are minimized and utilization of expensive resources is maximized
Thus autoscaling is a key pillar of the cloud value proposition: while cloud resources such as CPU cycles or storage are almost always more expensive than the equivalent “on-prem” resources, in the cloud you have unlimited scale while paying only for what you use, mitigating any cost differential.
An alternative perspective is that — whether in the cloud or on-prem — you may want each application to make conservative use of resources so that other applications within the same cluster fabric can claim those resources.
Basically, if you’ve sized your workload well and it’s consistent over time, you may never need scaling. But if your workload varies — perhaps across customer volume, regions, time periods, or analytics requests — autoscaling may save you time, money, and headaches.
Dask’s autoscaling support and APIs
Dask’s distributed scheduler — the state of the art scheduler recommended for almost all use cases today, and available under the `distributed` package — includes support for autoscaling, referred to as “adaptive” scaling.
A Dask cluster scaling up from 2 to 16 workers while crunching a dataset, viewed in the Task Stream dashboard.
Which clustering systems support autoscaling?
Dask’s distributed scheduler does the hard work (explained below) of figuring out how many nodes to bring online or to retire, as well as which specific nodes to involve and when to do so. However, at the end of the day, the cluster manager — abstracted with the Dask `Cluster` class — has to be able to acquire and manage more nodes, containers, or processes for the Dask workers.
That means most Cluster implementations — `KubeCluster`, `HelmCluster`, `YarnCluster`, `LocalCluster`, `FargateCluster`, and others — support adaptive scaling. Check your Cluster implementation to make sure it supports the adaptive APIs.
Dask adaptive scaling APIs
For manually scaling (resizing) your Dask cluster, the `Cluster.scale(…)` method adjusts cluster size to whatever number you explicitly pass in. But that makes you do the work when we might really want to lean on Dask’s profiling magic to make that decision for us.
The core API for automatic scaling is `Cluster.adapt(…) ` which allows you to optionally set minimum and maximum values for Dask workers, cores, and memory.
A few other important arguments can be provided — such as how often to check whether the cluster should be resized — and these are documented under the `Adaptive` class. Some of these settings are also accessible via the Dask config settings under the `distributed.adaptive` key.
How Dask’s autoscaler — or cluster adaptive sizing — works
Ok, now that you know how and why to get started with Dask adaptive scaling, let’s take a deeper dive to see how it works under the hood!
In addition to user-supplied constraints (like min and max workers), the key ingredient is the extensive statistics collected by the Dask scheduler’s dynamic statistical profiler. As discussed in our last “Under the Hood” blog covering Map Joins, Dask’s scheduler collects statistics on runtime and resource usage for every single task that runs on your cluster.
These data can be aggregated by task type (often the Python function name, or the “human-readable” initial part of the task key — this is also the label that appears throughout the dashboards) to provide estimates of each kind of operation.
The scheduler also accounts for the resources available (e.g., memory) on the various worker nodes.
At this point, the scheduler can look at all of the pending work (every single task currently queued for execution) and total up an expectation of required time given the current cluster size.
This “expected time” can then be divided by the configurable target duration (defaulting to 5 seconds) to get the proportional scale up (or down) needed to handle the pending work.
Before requesting nodes, a couple of adjustments can be made to that target size. If the workload appears memory constrained, and the target size is less than twice the number of current workers, then the target size will increase, and aim to double the current workers.
Finally the target will be adjusted to respect the configured minimum and maximum values (if any).
Now the cluster is asked to adjust to a new target worker count, and the speed of scaling is limited only by the cluster manager’s ability to allocate new workers.
A Dask cluster scaled to 12 nodes showing data and tasks in flight during an analytic query, visualized in the Dask JupyterLab extension.
When scaling down, Dask employs a few extra steps to mitigate oscillations that can occur when a fast system scales down (to avoid yielding a slower system that immediately needs to scale back up). Data resident on scale-down target nodes is first migrated to other nodes (so that it won’t need to be recomputed) and Dask waits for a few autoscale cycles (specified in `wait_count` and defaulting to 3) to add explicit damping.
Even more detail on Dask’s distributed scheduler
You can read (or step through) the scheduler logic here at the `Scheduler.adaptive_target` method.
Moreover, `Adaptive` can be subclassed to provide custom heuristics — for example to respond to availability of other custom resources in the cluster like GPUs.
Dask adaptive scaling best practices
Because the autoscaling logic can be quite aggressive at scaling up and down, it’s important to specify min and max constraints. You don’t want to blow through your cloud budget or annoy coworkers sharing your same Kubernetes cluster.
If you have several workloads to run in succession which may benefit from autoscaling, you may hit optimal sizing sooner if you use the same cluster (since the scheduler will retain statistics from previous work) versus killing the cluster and spinning up a new one for each workload. While the scheduler has to run all the time, the scale-down of workers should mitigate the cost of keeping the cluster active.
Last, we recommend tuning and sizing your cluster first on a known workload (data + logic) and only then considering enabling autoscaling. In any case do not use autoscaling as a way to ignore or avoid tuning your worker size, partitions, etc. Designed to solve a different set of problems, using autoscaling in lieu of any tuning will lead to difficulties (and often excess cost and/or poor utilization).
Scalable data science and machine learning with Dask
Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we’re always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free by clicking below.