Dask in Production: Multi-Scheduler Architectures

I ran across an interesting problem yesterday:

A company wanted to serve many Dask computations behind a web API endpoint. This is pretty common whenever people offer computation as a service or data as a service. Today the company uses the single-machine Dask scheduler inside of a web request, but they were curious about moving to serving larger requests on a scaled out cluster.

However, they were concerned about having many thousands of requests all funnel down onto a single Dask scheduler. This led to a conversation about ways of using Dask in production. I thought I’d enumerate what we’ve seen historically here in a blogpost, and then finish up with a quick hack I put together that would probably better suit their particular needs.

When thinking about any production workload you need to consider a few axes:

  1. Request Scale: How large is the computation needed by one request?
  2. Request Concurrency: How many requests per second do we want to serve?
  3. Response time: What response time do we need?
  4. Resilience: Everything that we discuss today is under the assumption that resilience in production is required.

We’ll look at 5 different highly reliable architectures, each of which serves a different region of this space.

1. A Few Large Requests with a Dask cluster per request

For many institutions “production workloads” means “large jobs that need to get done every hour” or “A job that has to run whenever a large dataset comes in”. These jobs typically take several minutes. It’s important that they run, or if they don’t run, that someone is alerted.

Dask is often used in combination with Kubernetes and Airflow/Prefect for batch production workloads. In this case, human-scale responsiveness is not critical.

In this situation people usually use a fresh Dask distributed scheduler, coupled with some resource management system like Kubernetes, and a workflow manager like Airflow or Prefect.

Whenever a job comes in they spin up a new Dask cluster (this takes a few seconds or a few minutes depending on your infrastructure) and submit the job to that. This often happens within some workflow management system like Airflow or Prefect, which adds logic for retries, alerting on failures, and so on.
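
As a rough sketch, the job runner might look something like the following. This assumes the classic dask-kubernetes KubeCluster API, a hypothetical worker-spec.yml pod spec, and a made-up Parquet dataset with id and value columns; the workflow manager wraps a function like this with retries and alerting.

```python
from dask.distributed import Client
from dask_kubernetes import KubeCluster  # classic dask-kubernetes API; adjust for your infrastructure
import dask.dataframe as dd


def run_nightly_job(path):
    # Spin up a fresh, ephemeral Dask cluster just for this one job
    cluster = KubeCluster.from_yaml("worker-spec.yml")  # hypothetical pod spec
    cluster.scale(20)                                   # ask for 20 workers
    try:
        with Client(cluster) as client:
            df = dd.read_parquet(path)                  # made-up dataset layout
            return df.groupby("id").value.mean().compute()
    finally:
        cluster.close()                                 # tear the cluster down afterwards
```

Because the cluster is created and destroyed inside the function, each job is isolated from every other job.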

  • Request Scale: Each request can be arbitrarily large. We’re using the full dask.distributed scheduler.
  • Request Concurrency: If we need to run many of these at once that’s OK; Kubernetes launches many independent Dask clusters.
  • Response time: Kubernetes is in the loop here, so we’re going to deal with seconds or minutes of extra latency.
  • How do we achieve Resilience?: If a worker goes down, Dask handles it. If the scheduler or client goes down (rare) then Kubernetes + Airflow/Prefect handle it.

2. Many Small Requests with the single-machine scheduler

For others, such as the company described above, they currently use a single-machine Dask deployment within a web request to gather data, do a bit of computation, and then fire back the result to the user. This typically all happens in the sub-second times expected by web users.

In this situation they use the Dask local scheduler, which doesn’t require any complicated resources to spin up (it runs in the current Python process) and so lives comfortably within existing web frameworks. However, this limits them to the resources available on the machine hosting their web server.
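
For illustration, here is a small sketch of that pattern with Flask (any web framework works the same way); the endpoint and the array computation are made up, but the important part is that .compute() runs on the local threaded scheduler inside the request handler.

```python
import dask.array as da
from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/mean/<int:n>")
def mean(n):
    # Build a small task graph and run it on Dask's local threaded scheduler,
    # entirely inside this web process -- no cluster, no extra infrastructure
    x = da.random.random(n, chunks=max(n // 4, 1))
    result = x.mean().compute(scheduler="threads")
    return jsonify({"mean": float(result)})
```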

Dask’s single-machine scheduler can be used within an individual request in a web server with minimal overhead. This is useful when handling many web requests for small-to-medium computations. There is no additional infrastructure, so Dask doesn’t get in the way of the web stack.

  • Request Scale: Limited to one machine. We’re using the single-machine scheduler.
  • Request Concurrency: Unlimited. This is handled by however our web stack scales, typically by adding replicas of the web server.
  • Response Time: Good. The Dask local scheduler doesn’t add any relevant latency to a web request (10s of microseconds).
  • Resilience: This is handled by the web stack. The web has fairly robust protocols to handle server failover.

3. Many Large Requests (cluster per request)

More rarely, we see companies that want to serve large distributed computations behind a publicly accessible web API. This requires some care. If your users don’t mind a few seconds of delay then it’s typical to spin up a new Dask cluster for every request, and fall back to the “Few Large Requests” model above. Kubernetes backs us here and creates large-but-ephemeral Dask clusters on demand.

Serving large distributed computation behind a publicly accessible web API is relatively niche, and users today don’t seem to expect much, so waiting a few seconds for Kubernetes to respond is fine in many cases. But can we do better?
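
As a rough sketch, reusing the hypothetical KubeCluster setup from architecture 1, a request handler might create and tear down its own cluster like this (the route and the toy computation are made up):

```python
from dask.distributed import Client
from dask_kubernetes import KubeCluster  # classic dask-kubernetes API; adjust for your infrastructure
from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/score")
def score():
    # Every request gets its own short-lived Dask cluster, as in architecture 1,
    # so the handler pays a few seconds of startup cost before computing
    with KubeCluster.from_yaml("worker-spec.yml") as cluster:  # hypothetical pod spec
        cluster.scale(10)
        with Client(cluster) as client:
            futures = client.map(lambda x: x ** 2, range(100))
            total = client.submit(sum, futures).result()
    return jsonify({"total": total})
```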

  • Request Scale: As in architecture 1, this is arbitrarily large.
  • Request Concurrency: As in architecture 1, this is arbitrarily large, at least assuming that you have the hardware to handle it.
  • Response time: Kubernetes is in the loop here, so we’re going to deal with seconds or minutes of extra latency.
  • How do we achieve Resilience?: As in architecture 1, this is handled by Kubernetes.

4. Many Large Requests (shared cluster)

Alternatively, we could put a single Dask scheduler in front of many web requests. This brings our latency overheads down to milliseconds (well within web user tolerances) but introduces some concern about scalability and multi-tenancy.

Many requests can connect to the same Dask scheduler. This gives us rapid responsiveness and scale, but raises concerns about a central bottleneck. There are a few cases where this can break down:

  1. If there are many requests they could overwhelm the Dask scheduler, which is comfortable handling a few thousand tasks per second. For example, if each of your computations has 100 tasks and you have 100 requests per second then this could become an issue.
  2. One noisy user can dominate the scheduler and give everyone else a bad time.

  • Request Scale: As in architecture 1, this is arbitrarily large.
  • Request Concurrency: We’re limited here to the roughly 5,000-tasks-per-second limit of a single Dask scheduler.
  • Response time: Assuming that we haven’t reached a concurrency limit, we’re adding milliseconds of latency here, which is usually fine.
  • How do we achieve Resilience?: We retain resilience with Kubernetes and retries with the web stack.

Having a single central scheduler in the middle can produce a bottleneck. Dask’s architecture was originally designed to serve computations for a single user, and less so for high-concurrency production workloads.
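
A minimal sketch of this shared-scheduler pattern, assuming a FastAPI web process and a Dask scheduler already running at a hypothetical dask-scheduler:8786 address; every request reuses the one asynchronous client held by the process.

```python
from dask.distributed import Client
from fastapi import FastAPI

SCHEDULER = "tcp://dask-scheduler:8786"  # hypothetical address of the shared scheduler

app = FastAPI()
client = None


@app.on_event("startup")
async def connect():
    # One asynchronous Dask client per web process, shared by every request
    global client
    client = await Client(SCHEDULER, asynchronous=True)


@app.get("/sum-of-squares/{n}")
async def sum_of_squares(n: int):
    # Each request submits a small graph to the single shared scheduler
    futures = client.map(lambda i: i * i, range(n))
    total = await client.submit(sum, futures)
    return {"total": total}
```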

5. Many Large Requests (multi-scheduler architecture)

Fortunately, Dask was also designed to be flexible and to evolve. We can take the existing pieces of a Dask cluster and rearrange them to suit this situation without too much pain.

If the scheduler is a bottleneck, then let’s add more schedulers. We’ll throw in a load balancer too for good measure.

We replicate the Dask scheduler, but connect all of them to the same pool of Workers. This allows us to balance load across several schedulers while still reusing compute resources efficiently.

Today each worker can only talk to a single scheduler. That turns out to be easy to change, though, and is up in this experimental pull request.

Then, we can use standard web scaling techniques like load balancers and auto-scalers to build a robust system to handle many requests, and deliver them to the same set of Dask workers. This gives us the ability to handle an unbounded number of requests, while still being efficient with our single pool of responsive workers.
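
From the client side this might look roughly like the following sketch; the scheduler addresses are hypothetical, a simple round-robin choice stands in for a real load balancer, and the shared worker pool relies on the experimental change mentioned above.

```python
import itertools

from dask.distributed import Client

# Hypothetical scheduler addresses; all of them would share the same pool of workers
SCHEDULERS = itertools.cycle([
    "tcp://scheduler-1:8786",
    "tcp://scheduler-2:8786",
    "tcp://scheduler-3:8786",
])


def handle_request(func, *args):
    # Pick the next scheduler in rotation; a real deployment would let a
    # TCP load balancer make this choice and keep persistent connections
    with Client(next(SCHEDULERS)) as client:
        return client.submit(func, *args).result()
```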

Additionally, it’s comforting that this solution fits into familiar patterns of deployment today. Dask schedulers and workers are lightweight web servers that fit into our existing models of deployment.

What this doesn’t do

However, let’s enumerate some problems that this doesn’t solve:

  1. This doesn’t accelerate single requests (see this post for more information on current efforts there).
  2. We’re not sharing intermediate results between requests (but separate caching services would not be hard to imagine here)
  3. We haven’t made this trivial for novices to deploy. Architectures like this tend to be managed by devops groups within companies. This is a straightforward thing to do, but only if you have people on staff who are familiar with setting up architectures like this, or work with a company that does.

Engage

As we’ve been working on Coiled it has been fun to hear about more companies and other institutions stretching Dask beyond its original intent. If these kinds of problems sound relevant to you then consider getting in touch.
