Vertically Union Dask DataFrames with concat

Matt Powers October 5, 2021

This post teaches you how to union Dask DataFrames vertically with concat, along with the important related technical details.  Vertical concatenation combines DataFrames the way the SQL UNION operator combines tables, which is common when combining datasets for reporting and machine learning.  It’s useful whenever you have two tables with identical schemas that you’d like to combine into a single DataFrame.

The tactics outlined in this post will help you combine two DataFrames with the same or similar schemas into a single DataFrame. It’s a useful design pattern to have in your toolkit.

Horizontal concatenations also use concat, but they require an entirely different set of considerations, so they’re discussed in a separate post.

Here’s how this post on vertical concatenations is organized:

  • Concatenating DataFrames with identical schemas / dtypes
  • Interleaving partitions to maintain divisions integrity
  • Concatenating DataFrames with different schemas
  • Concatenating large DataFrames

Concatenate DataFrames with identical schemas

Create two Dask DataFrames with identical schemas.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf1 = dd.from_pandas(df, npartitions=2)

df = pd.DataFrame({"nums": [88, 99], "letters": ["xx", "yy"]})
ddf2 = dd.from_pandas(df, npartitions=1)

Now concatenate both DataFrames into a single DataFrame.

ddf3 = dd.concat([ddf1, ddf2])

Print the contents of ddf3 to verify it contains all the rows from ddf1 and ddf2.

print(ddf3.compute())

  nums letters
0     1       a
1     2       b
2     3       c
3     4       d
4     5       e
5     6       f
0    88      xx
1    99      yy

ddf1 has two partitions and ddf2 has one partition.  ddf1 and ddf2 are combined into ddf3, which has three total partitions.

ddf3.npartitions

3

Dask can use information on divisions to speed up certain queries.  The creation of ddf3 above wiped out information about DataFrame divisions. Let’s see how we can interleave partitions when concatenating DataFrames to avoid losing the divisions information.

Interleaving partitions

Let’s revisit our example with a focus on DataFrame divisions to illustrate how concat wipes out the DataFrame divisions by default.

Recreate the ddf1 DataFrame and look at its divisions.

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf1 = dd.from_pandas(df, npartitions=2)

ddf1.divisions # (0, 3, 5)

Here’s how to interpret this divisions output:

  • The first partition has index values between 0 and 2
  • The second partition has index values between 3 and 5

Let’s print every partition of the DataFrame to visualize the actual data and reason about the divisions values.

def print_partitions(ddf):
    # Compute and print each partition separately to show how rows are
    # distributed across partitions
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

print_partitions(ddf1)

  nums letters
0     1       a
1     2       b
2     3       c
   nums letters
3     4       d
4     5       e
5     6       f

Let’s recreate ddf2 and view its divisions too.

df = pd.DataFrame({"nums": [88, 99], "letters": ["xx", "yy"]})
ddf2 = dd.from_pandas(df, npartitions=1)

ddf2.divisions # (0, 1)

ddf2 has a single partition with index values between zero and one.

print_partitions(ddf2)

   nums letters
0    88      xx
1    99      yy

Let’s concatenate the DataFrames and see what happens with the divisions.

ddf3 = dd.concat([ddf1, ddf2])

ddf3.divisions # (None, None, None, None)

Dask has lost all information about divisions for ddf3 and won’t be able to use divisions-related optimizations for subsequent computations.
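
You can confirm this with the known_divisions attribute, which reports whether Dask knows a DataFrame’s divisions.

ddf3.known_divisions # False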

You can set interleave_partitions to True when concatenating DataFrames to avoid losing the divisions information.

ddf3_interleave = dd.concat([ddf1, ddf2], interleave_partitions=True)

ddf3_interleave.divisions # (0, 1, 3, 5)

Take a look at how the data is distributed across partitions in ddf3_interleave.

print_partitions(ddf3_interleave)

  nums letters
0     1       a
0    88      xx
   nums letters
1     2       b
2     3       c
1    99      yy
   nums letters
3     4       d
4     5       e
5     6       f

Dask can optimize certain computations when divisions exist.  Set interleave_partitions to True if you’d like to take advantage of these optimizations after concatenating DataFrames.
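
For example, loc-based slicing on the index needs known divisions to prune partitions. Here’s a minimal sketch using the ddf3_interleave DataFrame from above:

ddf3_interleave.known_divisions # True

# Dask only reads the partitions whose divisions cover this index range
print(ddf3_interleave.loc[3:5].compute())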

Concatenating DataFrames with different schemas

You can also concatenate DataFrames with different schemas. Let’s create two DataFrames with different schemas, concatenate them, and see how Dask behaves.

Start by creating the two DataFrames.

df = pd.DataFrame(
    {
        "animal": ["cat", "dolphin", "shark", "starfish"],
        "is_mammal": [True, True, False, False],
    }
)
ddf1 = dd.from_pandas(df, npartitions=2)

df = pd.DataFrame({"animal": ["hippo", "lion"], "likes_water": [True, False]})
ddf2 = dd.from_pandas(df, npartitions=1)

Concatenate the DataFrames and print the result.

ddf3 = dd.concat([ddf1, ddf2])

print(ddf3.compute())

    animal is_mammal likes_water
0       cat      True         NaN
1   dolphin      True         NaN
2     shark     False         NaN
3  starfish     False         NaN
0     hippo       NaN        True
1      lion       NaN       False

Dask fills in the missing values with NaN to make the concatenation possible.
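
Keep in mind that the introduced NaN values can change column dtypes (columns that pick up NaN values are typically upcast). If a default value makes sense for your data, you can fill the gaps after concatenating. Here’s a minimal sketch using the columns from the example above:

ddf3.dtypes # inspect how the dtypes changed after the concat

ddf3["likes_water"] = ddf3["likes_water"].fillna(False)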

Concatenating large DataFrames

Let’s create a Dask cluster and concatenate two 31 million row DataFrames to confirm that concat can scale to multi-node workflows.

Create a 5-node Coiled cluster and read a Parquet dataset into a DataFrame.

import coiled
from dask.distributed import Client

cluster = coiled.Cluster(name="concat-cluster", n_workers=5)

client = Client(cluster)

ddf2000 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/7d/parquet/2000",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow"
)

Run ddf2000.head() to visually inspect the contents of the DataFrame.
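
ddf2000.head()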

Let’s run some analytical queries on ddf2000 to better understand the data it contains.

len(ddf2000) # 31,449,600
ddf2000.npartitions # 52

Now let’s read another Parquet dataset into a separate DataFrame and run some analytical queries.

ddf2001 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/7d/parquet/2001",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow"
)

len(ddf2001) # 31,449,600

ddf2001.npartitions # 52

Concatenate the two DataFrames and inspect the contents of the resulting DataFrame.

ddf = dd.concat([ddf2000, ddf2001])

len(ddf) # 62,899,200
 
ddf.npartitions # 104

ddf.divisions

(Timestamp('2000-01-01 00:00:00'),
 Timestamp('2000-01-08 00:00:00'),
 Timestamp('2000-01-15 00:00:00'),
 …
 Timestamp('2001-12-17 00:00:00'),
 Timestamp('2001-12-24 00:00:00'),
 Timestamp('2001-12-30 23:59:59'))

These DataFrames were concatenated without interleave_partitions=True, yet the divisions metadata was not lost like it was in the earlier example.

The DataFrames in this example don’t have any overlapping divisions, so you don’t need to set interleave_partitions=True.
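
If you’re unsure whether two DataFrames overlap, one quick check is to compare the last division of the first DataFrame with the first division of the second. Here’s a minimal sketch, assuming both DataFrames have known divisions:

# True would mean the index ranges overlap and interleave_partitions=True is needed
ddf2000.divisions[-1] >= ddf2001.divisions[0] # False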

Conclusion

Dask makes it easy to vertically concatenate DataFrames.

Dask does not interleave partitions when concatenating by default, but will if you set interleave_partitions=True.  Make sure to use this flag if your downstream queries will benefit from having divisions metadata.

Thanks for reading! If you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Try Coiled Cloud