Dask DataFrame groupby
• March 9, 2022
This post explains how to perform groupby aggregations with Dask DataFrames.
You’ll learn how to perform groupby operations with one and many columns. You’ll also learn how to compute aggregations like sum, mean, and count.
After you learn the basic syntax, we’ll discuss best practices for performing groupby operations.
Sample dataset
Let’s read a sample dataset from S3 into a Dask DataFrame to run some example groupby computations. We’ll use Coiled to launch a Dask cluster with 5 workers.
import coiled
import dask.dataframe as dd
from dask.distributed import Client

# Launch a 5-worker Dask cluster with Coiled and connect a client to it
cluster = coiled.Cluster(name="demo-cluster", n_workers=5)
client = Client(cluster)

# Read the Parquet dataset from S3 into a Dask DataFrame
ddf = dd.read_parquet(
    "s3://coiled-datasets/h2o/G1_1e7_1e2_0_0/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)
This dataset contains 10 million rows of data. You can take a quick look at its columns as shown below.
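Assuming you’re connected to the cluster from the snippet above, a quick way to see the column names, dtypes, and a few sample rows is to call ddf.dtypes and ddf.head() (a small sketch for orientation, not part of the original example):

# Column names and dtypes of the Dask DataFrame
ddf.dtypes

# Pull a small sample of rows into pandas for a quick look
ddf.head()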

Let’s run some Dask groupby aggregations now that we’re familiar with the dataset.
Dask DataFrame groupby sum
Let’s group by the values in the id1 column and then sum the values in the v1 column.
ddf.groupby("id1").v1.sum().compute()

You can also use an alternative syntax and get the same result.
ddf.groupby("id1").agg({"v1": "sum"}).compute()
agg takes a more complex code path in Dask, so you should generally stick with the simple syntax unless you need to perform multiple aggregations.
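One small difference between the two spellings, sketched here as a hedged aside rather than something from the original post: the column-attribute syntax returns a Series indexed by id1, whereas agg returns a one-column DataFrame.

# The simple syntax yields a Series indexed by id1
series_result = ddf.groupby("id1").v1.sum().compute()

# The agg syntax yields a DataFrame with a single v1 column
frame_result = ddf.groupby("id1").agg({"v1": "sum"}).compute()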
Dask DataFrame groupby for a single column is pretty straightforward. Let’s look at how to groupby with multiple columns.
Dask DataFrame groupby multiple columns
Here’s how to group by the id1 and id2 columns and then sum the values in v1.
ddf.groupby(["id1", "id2"]).v1.sum().compute()

You can pass a list to the Dask groupby method to group by multiple columns.
Now let’s look at how to perform multiple aggregations after grouping.
Dask groupby multiple aggregations
Here’s how to group by id3 and compute the sum of v1 and the mean of v3.
ddf.groupby("id3").agg({"v1": "sum", "v3": "mean"}).compute()

You can pass a dictionary to the agg method to perform different types of aggregations.
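If you need several aggregations on the same column, agg also accepts a list of function names per column, just like pandas. Here’s a small sketch along those lines (not from the original post):

# Compute both the sum and the mean of v1, plus the max of v3, in one pass
ddf.groupby("id3").agg({"v1": ["sum", "mean"], "v3": "max"}).compute()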
Let’s turn our attention to how Dask implements groupby computations. Specifically, let’s look at how Dask changes the number of partitions in the DataFrame when a groupby operation is performed. This is important because you need to manually set the number of partitions properly when the aggregated DataFrame is large.
How Dask groupby impacts npartitions
Dask doesn’t know the contents of your DataFrame ahead of time, so it can’t know how many groups the groupby operation will produce. By default, it assumes you’ll have relatively few groups, which means the row count shrinks enough that the result fits comfortably in a single partition.
However, when your data has many groups, you’ll need to tell Dask to split the results into multiple partitions in order to not overwhelm one unlucky worker.
Dask DataFrame groupby will return a DataFrame with a single partition by default. Let’s look at a DataFrame, confirm it has multiple partitions, run a groupby operation, and then observe how the resulting DataFrame only has a single partition.
Read in some data to a DataFrame and compute the number of partitions.
ddf = dd.read_parquet(
    "s3://coiled-datasets/h2o/G1_1e7_1e2_0_0/parquet",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)

ddf.npartitions # 8
The Parquet dataset is read into a Dask DataFrame with 8 partitions. Now let’s run a groupby operation on the DataFrame and see how many partitions are in the result.
res = ddf.groupby("id1").v1.sum()

res.npartitions # 1
Dask outputs groupby results to a single-partition Dask DataFrame by default. A single partition is all that’s needed in most cases: groupby operations usually reduce the number of rows so significantly that the result can be held in one partition.
You can set the split_out argument to return a DataFrame with multiple partitions if the result of the groupby operation is too large for a single-partition Dask DataFrame.
res2 = ddf.groupby("id1").v1.sum(split_out=2)

res2.npartitions # 2
In this example, split_out was set to two, so the groupby operation results in a DataFrame with two partitions. The onus is on you to set split_out appropriately when the resulting DataFrame is large.
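One rough way to decide whether you need split_out, sketched here as a suggestion rather than an official recipe, is to estimate the number of groups in the key column before aggregating:

# Estimate how many groups the aggregation will produce
n_groups = ddf["id1"].nunique().compute()

# If the aggregated result would be too large for one partition,
# spread it across several (8 here is an arbitrary example value)
res = ddf.groupby("id1").v1.sum(split_out=8)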
Performance considerations
Dask DataFrames are divided into many partitions, each of which is a pandas DataFrame. Dask performs groupby operations by running groupby on each of the individual pandas DataFrames and then aggregating the results. Executing groupby in parallel on multiple subsets of the data makes Dask more scalable than pandas and often faster too.
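Conceptually, the split-apply-combine pattern looks something like the pandas sketch below. This is only an illustration of the idea, not Dask’s actual implementation:

import pandas as pd

# Two small DataFrames standing in for the partitions of a Dask DataFrame
part1 = pd.DataFrame({"id1": ["a", "b", "a"], "v1": [1, 2, 3]})
part2 = pd.DataFrame({"id1": ["b", "c"], "v1": [4, 5]})

# Step 1: run the groupby sum on each partition independently
# (Dask runs this step in parallel across workers)
partial_sums = [part.groupby("id1").v1.sum() for part in (part1, part2)]

# Step 2: combine the partial results into the final answer
result = pd.concat(partial_sums).groupby(level=0).sum()
print(result)  # a -> 4, b -> 6, c -> 5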
Running a groupby on the index is not faster than running it on other columns in the DataFrame. Performance optimizations related to groupby operations on the DataFrame’s index are in a pull request that’s in progress.
Conclusion
Dask DataFrames make it easy to run groupby operations. The syntax is intuitive and familiar for pandas users. However, unlike pandas, setting split_out is essential if your groupby operation will produce many groups.
The parallel execution of groupby operations makes it easy to run groupby operations on large datasets. Slow pandas groupby operations can also be sped up with parallelization powered by Dask as demonstrated in the blog post on speeding up a pandas query with 6 Dask DataFrame tricks.