Speed up a pandas query 10x with these 6 Dask DataFrame tricks

Matthew Powers February 14, 2022



This post demonstrates how to speed up a pandas query to run 10 times faster with Dask using six performance optimizations. You’ll often want to incorporate these tricks into your workflows so your data analyses can run faster. Here are the 6 strategies covered in this post:

  • Parallelize the query with Dask
  • Avoid object columns
  • Split up a single large file into 100 MB files
  • Use Parquet instead of CSV
  • Compress files with Snappy
  • Leverage column pruning

This diagram illustrates how the query time decreases as each performance optimization is applied.

As you can see, a pandas computation can execute a lot faster when it’s parallelized with Dask.

Let’s start by running a groupby operation on a 5 GB dataset with pandas to establish a runtime baseline.

Pandas query baselines

The queries in this post will be run on a 5 GB dataset that contains 9 columns. Here are the first five rows of data, so you can get a feel for the data contents.

See this notebook for instructions on how to download this dataset on your local machine or access it via S3.

Let’s use pandas to run a groupby computation and establish a performance baseline.

import pandas as pd

df = pd.read_csv("data/G1_1e8_1e2_0_0.csv")
df.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"})

This query takes 182 seconds to run.
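The post does not say how the runtimes were measured. One simple way to get comparable wall-clock numbers is a small helper around `time.perf_counter`. This is a minimal sketch: `time_call` is a hypothetical name, and the `sum` call is only a stand-in for the real read-and-groupby query.

```python
import time


def time_call(func, *args, **kwargs):
    """Run func once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload; in this post it would be the read_csv + groupby query.
total, seconds = time_call(sum, range(1_000_000))
print(f"result={total}, took {seconds:.3f} s")
```

For a fair comparison, time the file read and the aggregation together, since Dask only does the work when `compute()` is called.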

Here’s the query result:

Let’s see how Dask can make this query run faster, even when the Dask computation is not well structured.

Dask query baseline

Let’s run the same groupby query with Dask. We’re going to intentionally type all the columns as object columns, which should give us a good worst-case benchmark for Dask performance. Columns with the object dtype are notoriously memory hungry and inefficient.

dtypes = {
    "id1": "object",
    "id2": "object",
    "id3": "object",
    "id4": "object",
    "id5": "object",
    "id6": "object",
    "v1": "object",
    "v2": "object",
    "v3": "object",
}

import dask.dataframe as dd

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", dtype=dtypes)

Let’s type v1 as an integer column and then run the same groupby query as before.

ddf["v1"] = ddf["v1"].astype("int64")

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

That query runs in 122 seconds.

Dask runs faster than pandas for this query, even when the most inefficient column type is used, because it parallelizes the computations. pandas only uses 1 CPU core to run the query. My computer has 4 cores and Dask uses all the cores to run the computation.
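To see why a groupby sum parallelizes well, here is a toy sketch of the split, aggregate, and combine pattern using only the standard library. It is not Dask's actual implementation; the function names and the toy rows are made up for illustration, and the thread pool only shows the structure (pure-Python threads will not give a real speedup).

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor

# Toy rows of (id1, v1); the real dataset has 9 columns.
rows = [("id001", 5), ("id002", 3), ("id001", 2),
        ("id003", 4), ("id002", 1), ("id003", 6)]


def partial_sums(chunk):
    """Sum v1 per id1 within a single partition."""
    sums = Counter()
    for key, value in chunk:
        sums[key] += value
    return sums


def parallel_groupby_sum(data, n_partitions=3):
    """Split rows into partitions, aggregate each one, then merge the partial results."""
    size = max(1, -(-len(data) // n_partitions))  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        partials = list(pool.map(partial_sums, chunks))
    combined = Counter()
    for part in partials:
        combined.update(part)  # Counter.update adds the per-key sums together
    return dict(combined)


result = parallel_groupby_sum(rows)
print(result)
```

Because a sum of partial sums equals the overall sum, each partition can be processed independently and only the small per-key results need to be merged at the end.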

Let’s recreate the DataFrame with more efficient data types and see how that improves the query runtime.

Avoiding object columns with Dask

We can type id1, id2, and id3 as string type columns, which are more efficient as described in the following video.

id4, id5, id6, v1, and v2 can be typed as integers. v3 can be typed as a floating point number. Here is the revised computation.

better_dtypes = {
    "id1": "string[pyarrow]",
    "id2": "string[pyarrow]",
    "id3": "string[pyarrow]",
    "id4": "int64",
    "id5": "int64",
    "id6": "int64",
    "v1": "int64",
    "v2": "int64",
    "v3": "float64",
}

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", dtype=better_dtypes)

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

This query runs in 67 seconds. Avoiding object type columns allows for a significant performance boost.
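The memory cost of object columns is easy to check on a small scale with plain pandas. The sketch below uses made-up toy data (not the benchmark dataset) and compares the same integers stored as Python strings in an object column and as an int64 column.

```python
import pandas as pd

# Toy data: the same integers stored as Python strings (object dtype) and as int64.
values = list(range(100_000))
as_object = pd.Series([str(v) for v in values], dtype="object")
as_int64 = pd.Series(values, dtype="int64")

# deep=True counts the Python string objects, not just the pointers to them.
object_bytes = as_object.memory_usage(deep=True)
int64_bytes = as_int64.memory_usage(deep=True)

print(f"object: {object_bytes:,} bytes")
print(f"int64:  {int64_bytes:,} bytes")
```

The exact numbers depend on your pandas and Python versions, but the object column should come out several times larger, because every value is a separate Python object.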

See this blog post on reducing memory usage with Dask dtypes for more information about how correct column data types reduce memory usage and allow queries to run faster.

Split data into multiple files

Let’s split up the data into multiple files instead of a single 5 GB CSV file. Here’s code that’ll split up the data into 100 MB CSV files.

ddf.repartition(partition_size="100MB").to_csv("data/csvs")

Let’s rerun the query on the smaller CSV files.

ddf = dd.read_csv("data/csvs/*.part", dtype=better_dtypes)

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

The query now runs in 61 seconds, which is a bit faster. Dask can read and write multiple files in parallel. Parallel I/O is normally a lot faster than working with a single large file.
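Before rerunning a query on split files, it can be worth checking how many files were written and how large they are. As far as I know, `partition_size` targets the in-memory size of each partition, so the files on disk may not be exactly 100 MB. Here is a small standard-library sketch; `summarize_parts` is a hypothetical helper name, and it assumes the `data/csvs` output directory used above.

```python
from pathlib import Path


def summarize_parts(directory, pattern="*.part"):
    """Return (file_count, total_bytes, largest_bytes) for files matching pattern."""
    root = Path(directory)
    if not root.is_dir():
        return 0, 0, 0
    sizes = [p.stat().st_size for p in sorted(root.glob(pattern)) if p.is_file()]
    if not sizes:
        return 0, 0, 0
    return len(sizes), sum(sizes), max(sizes)


# "data/csvs" is the output directory used above; adjust to your own path.
count, total, largest = summarize_parts("data/csvs")
print(f"{count} files, {total / 1e6:.1f} MB total, largest {largest / 1e6:.1f} MB")
```

If the largest file is far bigger than the others, the partitions are unbalanced and some workers will sit idle while one finishes.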

Let’s try to use the Parquet file format and see if that helps.

Use Parquet instead of CSV

Here’s the code that’ll convert the small CSV files into small Parquet files.

ddf.to_parquet("data/parquet", engine="pyarrow", compression=None)

Let’s rerun the same query off the Parquet files.

ddf = dd.read_parquet("data/parquet", engine="pyarrow")

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

This query executes in 39 seconds, so Parquet provides a nice performance boost. Columnar file formats that are stored as binary usually perform better than row-based, text file formats like CSV. Compressing the files to create smaller file sizes also helps.

Compress Parquet files with Snappy

Let’s recreate the Parquet files with Snappy compression and see if that helps.

ddf.to_parquet("data/snappy-parquet", engine="pyarrow", compression="snappy")

Let’s rerun the same query off the Snappy compressed Parquet files.

ddf = dd.read_parquet("data/snappy-parquet", engine="pyarrow")

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

This query runs in 38 seconds, which is a bit of a performance boost.

In this case, the files are stored on my hard drive and are relatively small. When data is stored in cloud based object stores and sent over the wire, file compression can result in an even bigger performance boost.
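To get a rough feel for why compression shrinks columnar data, here is a toy sketch. Snappy is not in the Python standard library, so this uses `zlib` at its fastest level as a stand-in; the ratio will differ from what Snappy achieves on real Parquet files. The column contents are made up, assuming a column with a small set of repeated string values.

```python
import zlib

# Toy column: a small set of string values repeated many times (an assumption).
column = "\n".join(f"id{i % 100:03d}" for i in range(100_000)).encode("utf-8")

# Level 1 favors speed over compression ratio, which is the same trade-off Snappy makes.
compressed = zlib.compress(column, level=1)
ratio = len(column) / len(compressed)

print(f"raw: {len(column):,} bytes, compressed: {len(compressed):,} bytes, ratio: {ratio:.1f}x")

# Round trip: decompression returns the original bytes.
restored = zlib.decompress(compressed)
```

Fewer bytes on disk means less I/O, at the cost of some CPU time to decompress, which is why the gain is larger when the data travels over a network.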

Let’s leverage the columnar nature of the Parquet file format to make the query even faster.

Leverage column pruning

Parquet is a columnar file format, which means you can selectively grab certain columns from the file. This is commonly referred to as column pruning. Column pruning isn’t possible for row based file formats like CSV.
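The difference between the two layouts can be sketched with plain Python data structures. This is only a conceptual illustration with made-up values, not how Parquet is implemented: in a row layout every record carries all its fields, while in a column layout a reader can pick out just the columns it needs.

```python
# Row-oriented layout (like CSV): each record holds every field together.
rows = [
    {"id1": "id001", "id2": "id010", "v1": 5, "v3": 1.5},
    {"id1": "id002", "id2": "id011", "v1": 3, "v3": 2.5},
    {"id1": "id001", "id2": "id012", "v1": 2, "v3": 3.5},
]

# Column-oriented layout (like Parquet): each column is stored on its own.
columns = {name: [row[name] for row in rows] for name in rows[0]}


def read_columns(store, wanted):
    """Return only the requested columns; the others are never touched."""
    return {name: store[name] for name in wanted}


pruned = read_columns(columns, ["id1", "v1"])

# Group-by sum on the pruned data.
totals = {}
for key, value in zip(pruned["id1"], pruned["v1"]):
    totals[key] = totals.get(key, 0) + value
print(totals)
```

With the row layout, computing the same totals means walking every record, including the fields the query never uses.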

Let’s grab the columns that are relevant for our query and run the query again.

ddf = dd.read_parquet("data/snappy-parquet", engine="pyarrow", columns=["id1", "v1"])

ddf.groupby("id1", dropna=False, observed=True).agg({"v1": "sum"}).compute()

This query runs in 19 seconds. See this blog post for more details on column pruning.

Overall performance improvement

The original pandas query took 182 seconds and the optimized Dask query took 19 seconds, which is about 10 times faster.
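The speedup at each step can be worked out from the runtimes reported in this post. The labels below are shorthand for the sections above.

```python
# Runtimes in seconds, as reported in this post.
timings = [
    ("pandas baseline", 182),
    ("Dask, object columns", 122),
    ("Dask, better dtypes", 67),
    ("100 MB CSV files", 61),
    ("Parquet, uncompressed", 39),
    ("Parquet, Snappy", 38),
    ("Parquet, Snappy, column pruning", 19),
]

baseline = timings[0][1]
speedups = [(label, baseline / seconds) for label, seconds in timings]

for label, speedup in speedups:
    print(f"{label:<32} {speedup:4.1f}x")
```

The two biggest jumps come from fixing the column types and from column pruning, each of which roughly halves the runtime of the step before it.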

Dask can provide performance boosts over pandas because it can execute common operations in parallel, where pandas is limited to a single core. For this example, I configured Dask to use the available hardware resources on my laptop to execute the query faster. Dask scales with your hardware to optimize performance.

My machine only has 4 cores and Dask already gives great results. A laptop that has 14 cores would be able to run the Dask operations even faster.

Parquet files allow this query to run a lot faster than CSV files. The query speeds get even better when Snappy compression and column pruning are used.

The query optimization patterns outlined in this blog are applicable to a wide variety of use cases. Splitting your data into optimally sized files on disk makes it easier for Dask to read the files in parallel. Strategically leveraging file compression and column pruning almost always helps. There are often easy tweaks that’ll let you run your pandas code a lot faster with Dask.

