Dask DataFrame groupby

Matthew Powers and Gabe Joseph, March 9, 2022


This post explains how to perform groupby aggregations with Dask DataFrames.

You’ll learn how to group by one or multiple columns and how to compute aggregations like sum, mean, and count.

After you learn the basic syntax, we’ll discuss the best practices when performing groupby operations.

Sample dataset

Let’s read a sample dataset from S3 into a Dask DataFrame to perform some sample groupby computations. We will use Coiled to launch a Dask cluster with 5 workers.

import coiled
import dask.dataframe as dd
from dask.distributed import Client

cluster = coiled.Cluster(name="demo-cluster", n_workers=5)
client = Client(cluster)

ddf = dd.read_parquet(
    "s3://coiled-datasets/h2o/G1_1e7_1e2_0_0/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)

This dataset contains 10 million rows. The examples below group by the id1, id2, and id3 columns and aggregate the v1 and v3 value columns.
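If you’d like to inspect the schema yourself, you can check the column dtypes (cheap, since it only reads Parquet metadata) and peek at a few rows. Treat this as a quick sanity check rather than a fixed schema listing:

ddf.dtypes
ddf.head()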

Let’s run some Dask groupby aggregations now that we’re familiar with the dataset.

Dask DataFrame groupby sum

Let’s groupby the values in the id1 column and then sum the values in the v1 column.

ddf.groupby("id1").v1.sum().compute()

You can also use an alternative syntax and get the same result.

ddf.groupby("id1").agg({"v1": "sum"}).compute()

agg takes a more complex code path in Dask, so you should generally stick with the simple syntax unless you need to perform multiple aggregations.

Dask DataFrame groupby for a single column is pretty straightforward. Let’s look at how to groupby with multiple columns.

Dask DataFrame groupby multiple columns

Here’s how to group by the id1 and id2 columns and then sum the values in v1.

ddf.groupby(["id1", "id2"]).v1.sum().compute()

You can pass a list to the Dask groupby method to group by multiple columns.

Now let’s look at how to perform multiple aggregations after grouping.

Dask groupby multiple aggregations

Here’s how to group by id3 and compute the sum of v1 and the mean of v3.

ddf.groupby("id3").agg({"v1": "sum", "v3": "mean"}).compute()

You can pass a dictionary to the agg method to perform different types of aggregations.
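You can also apply several aggregations to a single column by passing a list of function names in the dictionary. Here’s a small sketch using the same sample columns; the result has a column MultiIndex (one level for the original column, one for the aggregation), just like in pandas.

ddf.groupby("id3").agg({"v1": ["sum", "mean"], "v3": "mean"}).compute()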

Let’s turn our attention to how Dask implements groupby computations. Specifically, let’s look at how Dask changes the number of partitions in the DataFrame when a groupby operation is performed. This is important because you need to manually set the number of partitions properly when the aggregated DataFrame is large.

How Dask groupby impacts npartitions

Dask doesn’t know the contents of your DataFrame ahead of time. So it can’t know how many groups the groupby operation will produce. By default, it assumes you’ll have relatively few groups, so the number of rows is reduced so significantly that the result will fit comfortably in a single partition.

However, when your data has many groups, you’ll need to tell Dask to split the results into multiple partitions in order to not overwhelm one unlucky worker.

Dask DataFrame groupby will return a DataFrame with a single partition by default. Let’s look at a DataFrame, confirm it has multiple partitions, run a groupby operation, and then observe how the resulting DataFrame only has a single partition.

Read in some data to a DataFrame and compute the number of partitions.

ddf = dd.read_parquet(
    "s3://coiled-datasets/h2o/G1_1e7_1e2_0_0/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)

ddf.npartitions # 8

The Parquet dataset is read into a Dask DataFrame with 8 partitions. Now let’s run a groupby operation on the DataFrame and see how many partitions are in the result.

res = ddf.groupby("id1").v1.sum()

res.npartitions # 1

Dask outputs groupby results to a single-partition Dask DataFrame by default, and in most cases that’s all you need: a groupby usually reduces the row count so much that the result fits comfortably in one partition.
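Because the result fits in one partition, calling compute() on it returns an ordinary in-memory pandas object you can inspect locally. In this example the computed result is a pandas Series, since we aggregated a single column:

result = ddf.groupby("id1").v1.sum().compute()
type(result)  # pandas.core.series.Series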

You can set the split_out argument to return a DataFrame with multiple partitions if the result of the groupby operation is too large for a single partition Dask DataFrame.

res2 = ddf.groupby("id1").v1.sum(split_out=2)

res2.npartitions # 2

In this example, split_out was set to two, so the groupby operation results in a DataFrame with two partitions. The onus is on you to properly set the split_out size when the resulting DataFrame is large.
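There’s no single correct value for split_out; it depends on how many groups you expect and how large each output partition can be. One rough approach, sketched below under the assumption that counting unique keys up front is acceptable for your workflow, is to estimate the number of groups and scale split_out from that (the one-partition-per-million-groups ratio here is just an illustrative heuristic, not an official rule):

# Estimate the number of groups, then aim for roughly one output
# partition per million groups (hypothetical rule of thumb).
n_groups = ddf["id1"].nunique().compute()
split_out = max(1, n_groups // 1_000_000)
res3 = ddf.groupby("id1").v1.sum(split_out=split_out)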

Performance considerations

Dask DataFrames are divided into many partitions, each of which is a pandas DataFrame. Dask performs groupby operations by running groupby on each of the individual pandas DataFrames and then aggregating all the results. Running the groupby in parallel on multiple subsets of the data makes Dask more scalable than pandas, and often faster too.
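To see the general idea in miniature, here’s a simplified sketch of that split-apply-combine pattern in plain pandas: aggregate each partition independently, then combine the partial results and aggregate again. The real implementation builds a task graph and may use a multi-level tree reduction, but the shape of the computation is the same.

import pandas as pd

# Two toy "partitions" standing in for the pandas DataFrames inside a Dask DataFrame.
partitions = [
    pd.DataFrame({"id1": ["a", "b"], "v1": [1, 2]}),
    pd.DataFrame({"id1": ["a", "b"], "v1": [3, 4]}),
]

# Step 1: run the groupby on each partition independently (Dask does this in parallel).
partials = [p.groupby("id1").v1.sum() for p in partitions]

# Step 2: concatenate the partial results and aggregate again to get the final answer.
combined = pd.concat(partials).groupby(level=0).sum()
print(combined)  # a    4
                 # b    6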

Running a groupby on the index is no faster than grouping on any other column of the DataFrame. Performance optimizations for groupby operations on the DataFrame’s index are being worked on in an in-progress pull request.

Conclusion

Dask DataFrames make it easy to run groupby operations. The syntax is intuitive and familiar for pandas users. However, unlike pandas, setting split_out is essential if your groupby operation will produce many groups.

The parallel execution of groupby operations makes it easy to run groupby operations on large datasets. Slow pandas groupby operations can also be sped up with parallelization powered by Dask as demonstrated in the blog post on speeding up a pandas query with 6 Dask DataFrame tricks.
