Dask Memory Usage
September 7, 2021
This post shows you how to compute the memory usage of a Dask DataFrame and how to develop a partitioning strategy based on the distribution of your data.
Dask distributes data in DataFrame partitions, so computations can be run in parallel. Each partition in a Dask DataFrame is a pandas DataFrame. This post explains how to measure the amount of data in each Dask partition. Intelligently distributing the data across partitions is important for performance.
There aren’t hard-and-fast rules on optimal partition sizes. It depends on the computing power of the nodes in your cluster and the analysis you’re running.
A general rule of thumb is to target 100 MB of data per memory partition in a cluster. This post shows you how to measure the distribution of data in your cluster so you know when and if you need to repartition.
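As a quick back-of-the-envelope illustration of that rule of thumb, you can estimate a reasonable partition count from the total size of your data. The dataset size below is made up for the example.

import math

# Hypothetical numbers: a 50 GB dataset and the ~100 MB-per-partition
# rule of thumb mentioned above.
total_bytes = 50 * 1024**3
target_partition_bytes = 100 * 1024**2

suggested_npartitions = math.ceil(total_bytes / target_partition_bytes)
print(suggested_npartitions)  # 512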
Here’s what you’ll learn in this post:
- Calculating memory usage of a small Dask DataFrame
- Memory usage of a large Dask DataFrame
- Filtering can cause partition imbalances
- Assessing when a Dask DataFrame’s memory usage is unevenly distributed
- Fixing imbalances with repartitioning
- Other ways to compute memory usage by partition
Memory usage of small Dask DataFrames
Create a small Dask DataFrame with two partitions.
import pandas as pd
from dask import dataframe as dd

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)
Print the data in each of the partitions.
for i in range(ddf.npartitions):
    print(ddf.partitions[i].compute())

   nums letters
0     1       a
1     2       b
2     3       c
   nums letters
3     4       d
4     5       e
5     6       f
Use the pandas memory_usage method to print the bytes of memory used in each column of the first partition.
ddf.partitions[0].memory_usage(deep=True).compute()

Index      128
letters    174
nums        24
dtype: int64
Print the total memory used by each partition in the DataFrame with the Dask memory_usage_per_partition method.
ddf.memory_usage_per_partition(deep=True).compute()

0    326
1    330
dtype: int64
Both of these partitions are tiny because the entire DataFrame only contains six rows of data.
If deep is set to False then the memory usage of the object columns is not counted.
ddf.memory_usage_per_partition(deep=False).compute()

0    176
1    180
dtype: int64
Calculating the memory usage of object columns is slow, so you can set deep to False and make the computation run faster. We care about how much memory all the columns are using, so our examples use deep=True.
Memory usage of a large Dask DataFrame
Let’s calculate the memory for each partition of a 662 million-row dataset. See this notebook for all the code snippets used in this blog post.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)

ddf.memory_usage_per_partition(deep=True).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
          ...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64
The DataFrame has 1,095 partitions and each partition has 57 MB of data.
The data is evenly balanced across each partition in the DataFrame. There aren’t lots of tiny, empty, or huge partitions. You probably don’t need to repartition this DataFrame because all the memory partitions are reasonably sized and the data is evenly distributed.
Let’s look at an operation that can cause this DataFrame to have data that’s imbalanced across partitions.
Filtering can cause imbalances
Dask DataFrames often become unbalanced after a large filtering operation, as discussed in the Filtering DataFrames post.
This section demonstrates how a large filtering operation can cause a DataFrame to have partitions that are way smaller than optimal.
Let’s run a filtering operation and then examine the memory per partition:
filtered_ddf = ddf.loc[ddf["id"] > 1150]

filtered_ddf.memory_usage_per_partition(deep=True).compute()

0         0
1        94
2         0
3         0
4       187
       ...
1090      0
1091    189
1092      0
1093      0
1094      0
Length: 1095, dtype: int64
Many partitions in filtered_ddf are empty and the rest are tiny, way smaller than they should be. filtered_ddf should be repartitioned to get rid of all the tiny partitions.
Assessing imbalance in Dask DataFrame partitions
Here’s a helper function you can use to measure data imbalance across the partitions in a DataFrame.
import numpy
from dask.utils import format_bytes


def partition_report(ddf):
    series = ddf.memory_usage_per_partition(deep=True).compute()
    total = series.count()
    print(f"Total number of partitions: {total}")
    total_memory = format_bytes(series.sum())
    print(f"Total DataFrame memory: {total_memory}")
    total = total.astype(numpy.float64)
    lt_1kb = series.where(lambda x: x < 1000).count()
    lt_1kb_percentage = "{:.1%}".format(lt_1kb / total)
    lt_1mb = series.where(lambda x: x < 1000000).count()
    lt_1mb_percentage = "{:.1%}".format(lt_1mb / total)
    gt_1gb = series.where(lambda x: x > 1000000000).count()
    gt_1gb_percentage = "{:.1%}".format(gt_1gb / total)
    print(f"Num partitions < 1 KB: {lt_1kb} ({lt_1kb_percentage})")
    print(f"Num partitions < 1 MB: {lt_1mb} ({lt_1mb_percentage})")
    print(f"Num partitions > 1 GB: {gt_1gb} ({gt_1gb_percentage})")
Let’s run partition_report on filtered_ddf and see the partition sizes.
partition_report(filtered_ddf)

Total number of partitions: 1095
Total DataFrame memory: 101.71 kiB
Num partitions < 1 KB: 1095 (100.0%)
Num partitions < 1 MB: 1095 (100.0%)
Num partitions > 1 GB: 0 (0.0%)
We can see that all the partitions are less than 1 KB. There is only 102 KB of data in the entire DataFrame. This filtered dataset can easily be repartitioned to a small Dask DataFrame or converted to a pandas DataFrame.
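A result this small can be collapsed into a single partition or pulled down to pandas entirely. Here’s a sketch of both options; neither line appears in the original notebook.

# Collapse the filtered result into a single Dask partition.
small_ddf = filtered_ddf.repartition(npartitions=1)

# Or, since ~100 KB easily fits in memory, convert it to a plain
# pandas DataFrame on the client.
pdf = filtered_ddf.compute()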
Fixing imbalances with repartitioning
You can repartition your DataFrame when the data is distributed unevenly. See the blog post on repartitioning DataFrames for more information. The repartitioning blog post also quantifies the performance drag of having the data suboptimally partitioned.
Repartitioning is computationally expensive and should not always be performed. The performance drag from a small amount of imbalance can be less than the time it takes to fully repartition a dataset. The optimal strategy depends on your specific dataset, computations, and cluster size.
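If you do decide to repartition, one common approach is to ask Dask for partitions of roughly the 100 MB target discussed earlier. This is a minimal sketch; see the repartitioning post for the details and trade-offs.

# Ask Dask for partitions of roughly 100 MB each.
# partition_size is an approximate target, not an exact guarantee,
# and Dask may need to compute partition sizes to honor it.
repartitioned_ddf = filtered_ddf.repartition(partition_size="100MB")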
Other approaches: sizeof and memory_usage
Let’s look at two other approaches for calculating the memory for each partition of the 662 million-row dataset. You don’t need to know either of these, but they’re fun for language API aficionados to know about.
ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, "use_ssl": True},
)

ddf.map_partitions(lambda x: x.memory_usage(deep=True).sum()).compute()

0       57061027
1       57060857
2       57059768
3       57059342
4       57060737
          ...
1090    57059834
1091    57061111
1092    57061001
1093    57058404
1094    57061989
Length: 1095, dtype: int64
This computation takes 124 seconds on a 5-node cluster.
Dask has a sizeof function that estimates the size of each partition and runs faster.
import dask.sizeof

ddf.map_partitions(lambda x: dask.sizeof.sizeof(x)).compute()

0       56822960
1       57125360
2       56822960
3       57246320
4       57306800
          ...
1090    56974160
1091    57004400
1092    57337040
1093    56822960
1094    57004400
Length: 1095, dtype: int64
This takes 92 seconds to run on the same dataset, noticeably faster than the 124 seconds the memory_usage approach took.
The sizeof results are an approximation, but as you can see, they’re pretty close to the exact numbers.
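If you want to see how close the two measures are without spinning up a cluster, you can compare them on a small local pandas DataFrame. This snippet is just an illustration and is not part of the original notebook.

import pandas as pd
from dask.sizeof import sizeof

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)

# Exact accounting, including the object (string) column.
print(df.memory_usage(deep=True).sum())

# Dask's faster estimate of the same object.
print(sizeof(df))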
Conclusion
This blog post taught you how to measure the memory usage of your DataFrame with memory_usage_per_partition, memory_usage, and sizeof.
Dask makes it easy for you to compute how data is stored in different partitions. This is not as easy with other distributed compute engines.
Make sure to play around with the memory sizes and the number of partitions that work best for your query patterns. 100 MB partitions generally work well, but you might need to make tweaks based on your query patterns and instance types.
Thanks for reading. If you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.
