Dask as a Spark Replacement

Matt Rocklin October 4, 2021



Dask and Spark

What’s broken and how we’re fixing it

Many Dask users and Coiled customers are looking for a Spark/Databricks replacement.  This article discusses the problem that these folks are trying to solve, the relative strengths of Dask/Coiled for large-scale ETL processing, and also the current shortcomings.  We focus on the shortcomings of Dask in this regard and describe ongoing technical efforts to address these.

This article is mainly intended to provide context to development teams, rather than serve as a sales pitch.

Motivation: ETL and SQL-like queries dominate pain today

The bulk of distributed compute time spent in the world seems to be ETL or SQL-like queries.  In Dask terms they are mostly dask bag and dask dataframe focused, with a bit of dask delayed/futures.  These users care about ingesting large volumes of data, performing basic operations on that data, joining against other datasets, and storing it into analysis-ready formats like Parquet.
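As a rough sketch of what such an ETL workload looks like in Dask dataframe (the bucket paths, column names, and join key below are hypothetical):

import dask.dataframe as dd

# Ingest large volumes of raw data from object storage (hypothetical paths)
events = dd.read_parquet("s3://bucket/raw/events/")
users = dd.read_parquet("s3://bucket/raw/users/")

# Perform basic operations and join against another dataset
complete = events[events.status == "complete"]
joined = complete.merge(users, on="user_id", how="left")

# Store the result in an analysis-ready format
joined.to_parquet("s3://bucket/analysis-ready/events/")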

We also see many people interested in advanced machine learning, but they’re often blocked on just being able to look at and process their data at scale.  We’re excited to push forward on Dask’s traditional roots of advanced computing, but we want to make sure that basic data processing fundamentals are strong first.  This seems to be where most of the pain is focused today.

In these large-scale data processing situations, scale and predictability are far more important than flexibility and creative analyses.  Dask has some advantages here, but also some drawbacks today.

Dask advantages

We write about Dask advantages often, so we’ll be brief here.  Folks seem to prefer Dask for the following reasons:

  1. They like Python and the PyData stack
  2. They’re cost-sensitive and have seen good savings when comparing Dask against Spark
  3. They feel locked-in by Databricks, mostly in terms of user experience and flexibility
  4. They want increased flexibility from all of the other parts of Dask, particularly the general-purpose programming APIs, delayed and futures, and the ML integrations
  5. They want to reduce costs with adaptive scaling
  6. They want to reduce costs and increase speed with GPUs

Dask disadvantages

However, historically Dask developers have avoided attacking the Spark/ETL space head-on.  Instead, we’ve focused on all of the domains that Spark just couldn’t support (arbitrary task scheduling, workflow management, ML, array computing, general-purpose computing, and so on).

As a result, Dask has to close a maturity gap here if we want to support these users at scale.  The rest of this article discusses several technical features where Dask falls short, and which need to be improved in order to provide a high-quality ETL-at-scale experience with Spark-level maturity.

Parquet

Parquet has become the default tabular data storage format today.  Python support for Parquet is strong, with two high-quality solutions, Arrow and FastParquet, as well as robust cloud object storage support with fsspec.
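For reference, both engines sit behind the same Dask call, and fsspec/s3fs handles the object-store details; a minimal sketch with a placeholder bucket path:

import dask.dataframe as dd

df = dd.read_parquet(
    "s3://my-bucket/dataset/",        # placeholder path
    engine="pyarrow",                 # or engine="fastparquet"
    storage_options={"anon": False},  # forwarded to s3fs/fsspec
)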

Problem

However, when operating at the 1-10TB scale or above, these solutions start to fail, at least in the way that Dask uses them.  This is for a variety of reasons:

  1. The size of metadata grows beyond serialization limits
  2. Intermittent S3 failures become common
  3. ETL workloads quickly require dataset/table mutations (e.g. append, update), which expose us to an entire zoo of consistency and performance problems that the current Parquet interface cannot deal with (more on this later)

Solution

The solution here is composed of several small fixes across several projects (Dask, Pandas, Arrow, s3fs, fsspec, thrift, …).  This is being tracked in the meta-issue Processing Large Remote Parquet Datasets.

This is currently being resolved by engineers like James Bourbeau (Coiled), Joris Van den Bossche (Voltron Data), Rick Zamora (NVIDIA), and Martin Durant (Anaconda).

There is also some interesting discussion around data lake management systems (Delta Lake, Iceberg, Hudi) in the next section, which provide solutions to some of the problems mentioned above.

Data Lake formats

Technologies like Delta Lake (Databricks), Apache Iceberg (Netflix, Tabular), and Apache Hudi (Uber) provide management layers on top of data lakes.  These technologies solve a variety of issues that arise in larger data engineering pipelines.

Database technologies like MemSQL, Snowflake, and BigQuery allow for huge queries to be run in less than a second.  These technologies are great for applications that require low latency, like dashboards.

Problem

The Python space has traditionally not engaged deeply here, probably because our technology stack is mostly used for exploratory data analysis on analysis-ready data.  However, as Python starts to displace Spark/Databricks for some of these workloads we’re relearning some of the pain points that the Spark community has already experienced.  Some specific examples of common pain points:

  • While someone is writing to the data lake, reads don’t work
  • Data lakes may have tens or hundreds of thousands of files.  Performing an S3 file listing operation before reading the data can be really expensive.
  • If you’re in the middle of a write and the cluster dies, then you’ll have a bunch of partially written files and all subsequent reads will fail
  • Multiple people might write to the data lake at the same time
  • A few rows of data need to get deleted from the data lake for GDPR compliance, but that’s actually really hard because Parquet files are immutable
  • Tens of thousands of tiny files need to get compacted into bigger files to make data reads faster

Solution

Read support for Delta/Iceberg/Hudi isn’t hard to implement (indeed, a recent community PR by Gurunath adds Delta Lake read support to Dask dataframe from scratch).

Write access is harder, but probably also necessary as we walk further down this path.  There is some internal conversation within Coiled, Voltron Data, and other companies about which solution to focus on, with some preference for Iceberg due to its more open community and a stronger relationship with the folks at Tabular.

Robust shuffle

When datasets need to be sorted or joined with other very large datasets, a full-dataset communication is required.  Every input chunk of data needs to contribute a little bit to every output chunk of data.  This operation is commonly called a shuffle.
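Concretely, in Dask dataframe a shuffle shows up whenever rows have to be repartitioned by value; a small sketch with hypothetical datasets and column names:

import dask.dataframe as dd

transactions = dd.read_parquet("s3://bucket/transactions/")  # hypothetical paths
accounts = dd.read_parquet("s3://bucket/accounts/")

# Sorting/re-indexing by a column moves every row to a new partition
by_account = transactions.set_index("account_id")

# Joining two large datasets on an un-indexed column requires the same movement
joined = transactions.merge(accounts, on="account_id")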

Spark’s shuffle implementation is solid, following a similarly strong tradition from Hadoop/Hive.  In fact, shuffle is so core to Spark that most Spark users mistakenly call all worker-to-worker communications “shuffles”.  Spark does this well in part because it doesn’t try to support much else.

Problem

Dask supports a variety of other communication patterns, and so we never optimized this pattern much.  As a result, Dask’s shuffle algorithm has the following flaws today:

  • At high partition count, it generates very many tasks, the handling of which can become more expensive than the computation itself
  • Spilling data to disk is not finely controlled, resulting in suboptimal use of hardware when datasets are larger than memory (full-dataset shuffles require all data to be accessible at once)
  • Network communication during shuffles is not well optimized.

As a result, Dask is able to perform any shuffle, but slowly, especially when memory is scarce.  We don’t max out the performance of all available hardware, which is arguably Dask’s job.

Solution

This is currently being resolved by Coiled engineers, notably Gabe Joseph, Florian Jetter, and James Bourbeau.  We’re building a separate shuffle service to handle these workloads explicitly.  This will be a third option alongside the currently available partd and tasks options.
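For orientation, the existing mechanisms are chosen through the shuffle= keyword on operations like set_index and merge; a sketch against the Dask dataframe API as of this writing:

# Task-based shuffle: everything flows through the scheduler as ordinary tasks
df = df.set_index("account_id", shuffle="tasks")

# partd-based shuffle: intermediate data is written to local disk
df = df.set_index("account_id", shuffle="disk")

# The shuffle service described above is intended to become a third option here.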

As we get this system to run with increasingly high levels of performance and increasingly strong stability guarantees, we’re also exposing a variety of other performance issues in Dask, Pandas, and even CPython, which we’re working on as well.  We hope for an early opt-in release of this in the next month.

High-level query optimization

Most developers do not write optimal code.  

Problem

In theory, Dask could look at user code and suggest alternatives.  This can mean, for example, reading only the relevant columns from a data store, moving a row filter or a join earlier or later in the pipeline to eliminate unnecessary work, or any number of other optimizations that can easily save an order of magnitude of effort.

Example

# Before
import dask.dataframe as dd
df = dd.read_parquet("...")
df['z'] = df.x + df.y
df = df[df.balance > 0]
df.z.sum().compute()

# After
import dask.dataframe as dd
df = dd.read_parquet("...", columns=["x", "y", "balance"])  # Only read necessary columns
df = df[df.balance > 0]  # filter first
df['z'] = df.x + df.y
df.z.sum().compute()

Historically Dask has focused more on low-level optimizations that are applicable throughout all of the Dask subprojects.  Spark on the other hand has developed a specialized query optimizer, much like a traditional database.  This gives Spark the ability to avoid tremendous amounts of work in common situations, like above.

Solution

We’re currently looking at adding a high-level expression layer to Dask collections (array, bag, dataframe).  This will allow developers to add query optimizations incrementally.  This change will affect not only Dask dataframe (the main subject of this article) but will also make space for improvements in other Dask collections, notably Dask array, and in downstream collections like Xarray, RAPIDS, and Dask-GeoPandas.

This work is being performed by Jim Crist-Harif (Coiled), Aaron Meurer (Quansight, also lead SymPy maintainer), and Rick Zamora (NVIDIA).

GPU accessibility

Problem

Many companies today are excited about using RAPIDS, but find it challenging to use.  This is due to a few problems:

  1. Setting up a GPU stack in the cloud (or anywhere) is hard
  2. Setting up a GPU stack in the cloud with proper hardware configuration for real performance, especially in ETL situations, is very hard
  3. Local experimentation is difficult if users don’t have a local GPU

    Notably, no Apple hardware and very little other consumer hardware ships with NVIDIA GPUs by default today

As a result, GPUs and RAPIDS are something that most companies “are looking forward to” especially given the promised cost reductions, but haven’t actually tried in production.

These problems are especially challenging in data-intensive workloads, much more so than in machine learning situations.  In deep learning situations there is so much compute-heavy work to do that computing costs dominate other costs like communication or disk access.  However, in data-intensive workloads, these costs are more balanced, and so we must look at accelerated networking and disk hardware at the same time that we add accelerated compute hardware.  This is all available today, but configuring and balancing these systems requires special consideration.
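For context, part of the appeal is that the day-to-day code stays pandas-like even on GPUs; a minimal sketch assuming a cluster with GPU workers, RAPIDS (cuDF and dask-cudf) installed, and a hypothetical dataset path:

import dask_cudf

# Same dataframe API, but each partition is a cuDF dataframe living in GPU memory
df = dask_cudf.read_parquet("s3://bucket/transactions/")
result = df.groupby("account_id")["amount"].sum().compute()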

Solution

On the IT side, products like Coiled are designed to make it easy to achieve a professionally managed IT stack quickly.  However, doing this well requires care, and while Coiled offers GPU support, we haven’t yet fully solved the problem of optimally efficient configurations on the cloud, which is an order of magnitude harder than just providing “GPUs on the cloud”.  The balance between GPUs, network, disk, and other infrastructure is challenging to get right.

On the user side, projects like Afar, made by Erik Welch (Anaconda), make it easy to blend between the user’s local session and the execution environment on the cluster.  This is a little bit like Spark’s Livy project, but much finer-grained and more usable.  Coiled engineers have been using Afar for a while now (mostly to avoid sending large task graphs over consumer internet connections) and have generally found it a pleasure to use.

We are hopeful that this helps to make GPU clusters feel closer at hand.

SQL API

Finally, many companies report wanting to connect these computational systems up to other business infrastructure.  The Dask-SQL project is being led by Nils Braun (Bosch), and it provides a SQL interface on top of Dask dataframes.
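A minimal sketch of how Dask-SQL is typically used, registering an existing Dask dataframe as a table and querying it with SQL (the path, table, and column names are hypothetical):

import dask.dataframe as dd
from dask_sql import Context

c = Context()
df = dd.read_parquet("s3://bucket/orders/")   # hypothetical path
c.create_table("orders", df)                  # register the dataframe as a SQL table

result = c.sql("SELECT customer_id, SUM(amount) AS total FROM orders GROUP BY customer_id")
result.compute()                              # c.sql returns a lazy Dask dataframe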

I’ll admit to being torn by this topic.  I still strongly recommend that users use a proper database (Postgres, Snowflake, Dremio, Redshift, BigQuery, …) for routine SQL analysis.  Databases are just better at this work than Dask will ever be.  Normally in Dask we focus on interfacing smoothly with existing database technologies, like ongoing work from James Bourbeau and Florian Jetter (Coiled) on Dask-Snowflake and Naty Clementi (Coiled) on Dask-Mongo and Dask-BigQuery.

Florian Jetter (Coiled) gave a nice counterargument to this, though; I’m going to quote him below:

> In my mind, SQL on Dask is mostly to build a bridge between the technical and business world. Most databases are not suited for this kind of workload unless they can read natively a Parquet dataset already. Have you ever tried inserting 1TB / 1bn rows of data into a Postgres database? Of course, this is a space where things like Dremio, Presto, etc. shine but getting this up and running is not easy and/or expensive. Also, especially big companies are hesitant to introduce “yet another technology for the same thing”. Even if we will never excel in this space, simply checking the box is valuable.

Community and teamwork

This is a lot of work across many different projects.  

The PyData stack differs from Databricks/Spark in that we’re spread into many different projects.  Larger initiatives like “PyData as a Spark replacement” require close collaboration among disparate teams.  Fortunately, we’re already really good at this.

This work is mostly being carried out by employees at Coiled, Anaconda, Quansight, Voltron Data, and NVIDIA.  This team works together with, and forms part of, the broader community.  This only works because of the decade of shared experience we all have with each other, and a strong history of open communication.

Early results and timelines

Early results in each of these areas are good.  These teams have been able to show forward progress on the timescale of weeks, with a completion horizon on the timescale of months.

However, efficient computing at scale is a challenging problem.  As the larger and more relevant issues are resolved we increase scale, and with that increase of scale previously small and innocuous problems become quite significant. Having Dask operate with Spark-level maturity in this area where Spark was designed to succeed will likely take several months.  That being said, the natural advantages of Dask (Python-native, GPUs, flexibility, lower cost) mean that we’ll likely become an attractive choice for a decent chunk of users before full maturity is reached (indeed, we’re there today for many use cases), and there is really good momentum in this space today.  A lot of different institutions want to see this work out.

Hat tip to science 

As a closing note, I’ll say that, while the technical changes are being driven by traditional business needs and applications, they have fortunate side effects for the scientific userbase of Dask, and in particular for dask array users.  For example:

  1. Parquet: The changes around Parquet at scale often make us deal with cloud object stores more intelligently, which supports other scientific formats, like Zarr and TileDB
  2. Shuffle: The patterns used to develop the shuffle system could be repeated for other similarly challenging operations in dask array, like map_overlap, rechunk, and tensordot
  3. High-level query optimization: High-level expressions will allow for more automatic rechunking schemes in dask array, and better overall analysis before execution
  4. GPUs: the changes here are entirely agnostic to dataframes, and should be useful throughout the ecosystem

And so this work follows in the age-old Dask tradition of building infrastructural technology that is paid for by business, but also quite pragmatic for scientific advancement.

Need for Partners / Customers

Finally, this set of problems requires exposure to intense workloads.  It’s hard to deliver high-quality and robust solutions in a vacuum; when building a racecar you need to send it around the track a few thousand times, and then send it around a different track to see what breaks.

If your team has large Spark workloads that it’s looking to move away from, and is interested in driving a new style of race car, then please do get in touch with the Coiled team.



FAQ

  1. What kind of cost savings can I expect if I use Dask instead of Spark for data engineering / ETL workloads?
    • It depends on how the clusters are deployed. If you’re running Spark or Dask on long-running clusters, you can expect a significant decrease in cost by moving to ephemeral clusters. Coiled is one option for creating ephemeral Dask clusters, which minimizes cost relative to a long-running cluster.

      Dask often supports more flexible and performant algorithms which are able to achieve the same result, but with less work.

      Finally, Dask’s integration with highly performant hardware, like GPUs, can often provide order-of-magnitude cost reductions if done well.
  2. How does Dask deliver more flexibility to data engineers? 
    • Dask provides dataframe and array interfaces modeled on widely used data engineering libraries like Pandas and NumPy. With Dask, data engineers can write pandas- and numpy-like code and scale it across large clusters.

      Additionally, Dask goes beyond just Pandas and NumPy, and is a general-purpose parallel programming framework, capable of scaling out any kind of general-purpose Python code.
  3. What is adaptive scaling? 
    • Adaptive scaling allows you to work with auto-scaling Dask clusters that automatically adjust the number of workers based on a number of scaling heuristics.
  4. How do GPUs help scale data engineering workloads?
    • In conjunction with projects like RAPIDS and Dask, data engineers can scale dataframe and array operations across individual GPU machines or GPU clusters while writing pandas and numpy-like code. This means data engineers get to work with a familiar API and don’t have to learn a new API. 
  5. When should I use Dask DataFrames, Dask bags, or Dask delayed when converting Spark code to Dask code?
    • In general, you can use Dask DataFrames for tabular data and operations that you would do in a Spark DataFrame. Dask bags are helpful for unstructured data and as intermediate steps toward moving the data into a Dask DataFrame. Dask delayed is useful for custom parallel code/tasks.
  6. How does Dask compare to Koalas? Should I use Koalas or Dask Dataframes?
    • Dask DataFrames and Koalas both provide pandas-like interfaces for scaling dataframe computation across clusters of machines. Dask DataFrames run on top of Dask (pure Python) and the PyData ecosystem, whereas Koalas dataframes run on top of Spark, and so inherit the issues mentioned in this article.
  7. Does Dask implement machine learning libraries similar to Spark MLlib?
    • Dask provides scalable machine learning in Python by working alongside popular machine learning libraries like Scikit-Learn, XGBoost, and others. Many ML libraries in Python have adopted Dask as their native parallel framework. In many cases, Dask does not re-implement these systems. Instead, Dask makes it easy to prepare and set up data with typical Dask workflows and then hands that data to libraries like XGBoost, managing the data exchange between the two systems.
  8. Can I run Dask on my YARN (Spark) cluster?
    • Yes, you can deploy Dask on YARN clusters using the Dask-Yarn project (see the deployment question below).
  9. Can I run Dask on my Kubernetes cluster?
    • Yes, Dask deploys natively on Kubernetes, for example with the dask-kubernetes project or the Dask Helm chart.
  10. Can Dask connect to my HDFS data lake in a Kerberized Hadoop cluster?
  11. Can Dask connect to my AWS/GCP/Azure Data Lake using enterprise security and authentication?
    • Yes, Coiled provides functionality to handle authentication and authorization for data access to cloud storage using standard IAM credentials and temporary access tokens on the Dask workers.
  12. Does Dask have a connector for Snowflake similar to Spark’s Snowflake connector?
    • Yes, there is a Dask-Snowflake connector that uses the distributed fetch capability in Snowflake to perform direct parallel reads and writes from Dask.
  13. How do you compare Spark vs Dask?
  14. Why might you choose Spark?
    • You prefer Scala or the SQL language
    • You have mostly JVM infrastructure and legacy systems
    • You want an established and trusted solution for business
    • You are mostly doing business analytics with some lightweight machine learning
    • You want an all-in-one solution
  15. Why might you choose Dask?
    • You prefer Python or native code or have large legacy code bases that you do not want to entirely rewrite
    • Your use case is complex or does not cleanly fit the Spark computing model
    • You want a lighter-weight transition from local computing to cluster computing
    • You want to interoperate with other technologies and don’t mind installing multiple packages
  16. Why might you use both Spark and Dask?
    • It is easy to use both Dask and Spark on the same data and on the same cluster.
    • They can both read and write common formats, like CSV, JSON, ORC, and Parquet, making it easy to hand results off between Dask and Spark workflows.
    • They can both deploy on the same clusters. Most clusters are designed to support many different distributed systems at the same time, using resource managers like Kubernetes and YARN. If you already have a cluster on which you run Spark workloads, it’s likely easy to also run Dask workloads on your current infrastructure and vice versa.
    • In particular, for users coming from traditional Hadoop/Spark clusters (such as those sold by Cloudera/Hortonworks) you are using the Yarn resource manager. You can deploy Dask on these systems using the Dask Yarn project, as well as other projects, like JupyterHub on Hadoop.
  17. How does ML compare between Dask and Spark?
    • Spark MLlib is a cohesive project with support for common operations that are easy to implement with Spark’s Map-Shuffle-Reduce style system. People considering MLlib might also want to consider other JVM-based machine learning libraries like H2O, which may have better performance.
    • Dask relies on and interoperates with existing libraries like Scikit-Learn and XGBoost. These can be more familiar or have higher performance, but this generally results in a less cohesive whole. See the dask-ml project for integrations.
  18. How do cluster deployments compare between Dask and Spark?
    • Cluster deployments of Dask are similar to Spark in that they both have drivers/schedulers to direct tasks to workers. These deployments can run on top of virtual machines, containers, or YARN clusters. Coiled provides functionality to create and manage Dask clusters on the cloud (AWS, GCP, and Azure) in a way that handles the backend infrastructure automatically.
  19. Can Coiled use spot/preemptible instances for Dask clusters similar to Databricks?
    • Yes, since Dask clusters are resilient to worker node failures, Coiled uses spot/preemptible instances by default for Dask clusters, or you can create a Dask cluster with on-demand instances if desired.
  20. How does Dask connect to data sources like S3, ADLS, GCS?
    • There are connectors for parallel data I/O in Dask such as s3fs, adlfs, and gcsfs.
  21. How does Dask connect to data sources like Snowflake, MongoDB, or BigQuery?
    • There are dedicated connectors such as Dask-Snowflake, Dask-Mongo, and Dask-BigQuery, mentioned above.
  22. How can I set up scheduled ETL jobs with Dask?
    • Popular workflow orchestration and data pipeline tools that work with Dask include Airflow and Prefect.

Thanks for reading!