Improving the Parquet Experience with Dask

Matt Powers, Ian Rose, and Jim Crist-Harif · July 13, 2022


Parquet is a wonderful file format for storing and processing tabular data, and many organizations and toolchains have adopted it as the standard way to store their data at rest. In recent months, Dask developers have been leading a push to improve the developer experience around interacting with Parquet datasets using Dask. This effort has been a collaboration between engineers at Coiled, NVIDIA, and Anaconda. In particular, the Coiled team wants to shout out the efforts of Rick Zamora (NVIDIA) and Martin Durant (Anaconda).  The recent changes have made Dask’s processing of Parquet files faster, more scalable, and more user-friendly.  These optimizations were based on community research to find common Parquet usage patterns and pain points.


Why Parquet?

If you’re unfamiliar with the advantages of Parquet files, see this video for 5 reasons Parquet is better than CSV files.

You can also read this blog post, which demonstrates the performance improvements that Parquet column pruning and predicate pushdown filtering make possible.

Parquet is generally a much better file format for analysis of tabular data compared to CSV, JSON, and other alternatives.  Parquet is faster, more compact, and cloud-friendly.

Upshot

Recent versions of Dask provide a much smoother user experience when processing Parquet datasets (especially large ones). Data scientists and data engineers should now be able to more easily scale their analyses to the many-terabyte scale.

The following code snippet reads and writes about 15 GB of data (~1 billion rows) from the NYC Taxi and Limousine Commission (TLC) open dataset.

import dask.dataframe as dd
import pandas
import s3fs

# Get yellow trip rides from the last ~ten years
fs = s3fs.S3FileSystem(anon=True)
files = fs.glob("s3://nyc-tlc/trip data/yellow_tripdata**.parquet")
files = ["s3://" + f for f in files if int(f[-15:-11]) > 2011]

# Read a subset of the parquet dataset
ddf = dd.read_parquet(
    files,
    columns=["passenger_count", "trip_distance", "fare_amount", "tip_amount"],
    storage_options={"anon": True},
    engine="pyarrow",
)

# Add a new column to the dataset
ddf = ddf.assign(transfer_time=pandas.Timestamp.now())

# Write the transformed data to a new location
ddf.to_parquet("s3://oss-shared-scratch/scratch_yellow_tripdata/")

With the version of Dask from early April of this year (2022.4.0), this takes about 72 seconds. With the version from early June of this year (2022.6.0), it takes about 48 seconds, a more than 30% speed-up.

These benchmarks were run using 10-worker clusters of t3.large EC2 instances, with both compute and data in the us-east-1 AWS region.

As you scale to larger and larger datasets, new issues can arise that you don’t see at smaller scales. In addition to the performance improvements, we’ve also invested in scalability improvements. In the April release of Dask, the following workflow writing over 30 terabytes of synthetic data using a large cluster would simply fail:

import dask

# Generate a large synthetic timeseries dataset: 200 columns, 20 years at 10ms frequency
ddf = dask.datasets.timeseries(
    dtypes={
        **{f"name-{i}": str for i in range(50)},
        **{f"price-{i}": float for i in range(50)},
        **{f"id-{i}": int for i in range(50)},
        **{f"cat-{i}": "category" for i in range(50)},
    },
    start="2002-01-01",
    end="2022-01-01",
    freq="10ms",
    partition_freq="1H",
)
ddf.to_parquet("s3://your-s3-bucket/parquet-terabytes/")

But in the most recent releases of Dask, the above completes smoothly.

This test was run using a 500-worker cluster of t3.2xlarge EC2 instances, with both compute and data in the us-east-2 AWS region.

Let’s get into some of the specific improvements that we made to improve how Dask handles Parquet I/O at scale.

Specific Improvements

Changes to metadata handling

Parquet files contain metadata in the file footer. The metadata includes information about the file schema, various statistics for each column in each row group, and more. Some tools (Dask among them) can also write a global metadata file containing the metadata for every partition in the dataset. When this global metadata is available during Parquet reading, it can be very helpful for constructing query plans. But it has the downside of being a single synchronization step during Parquet writes, one which can fail. In particular, with large, many-partition, many-column datasets, a global metadata file can take a significant amount of memory to read and write.
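
To get a feel for what lives in that footer, you can inspect a single file’s metadata with PyArrow. A minimal sketch (the tiny DataFrame and file name here are just placeholders):

import pandas as pd
import pyarrow.parquet as pq

# Write a small file, then read back its footer metadata
pd.DataFrame({"x": range(100), "y": ["a"] * 100}).to_parquet("example.parquet")

metadata = pq.ParquetFile("example.parquet").metadata
print(metadata.num_rows, metadata.num_row_groups, metadata.num_columns)

# Per-column statistics for the first column of the first row group
stats = metadata.row_group(0).column(0).statistics
print(stats.min, stats.max, stats.null_count)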

Prior to version 2022.4.2, Dask’s to_parquet wrote a metadata file by default, which could cause large Parquet dataset I/O to crash Dask workers. This was particularly annoying because the metadata file is written out after all the Parquet files, so a large data processing job might run for many minutes or hours and then hit a metadata-related error on the very last task.

Starting with version 2022.4.2, Dask no longer writes the metadata file by default (Apache Spark made the same change several years ago). You now need to explicitly set write_metadata_file=True if you’d like to output the metadata file when writing with to_parquet.
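
If you still want the global metadata file, you can opt back in explicitly. A minimal sketch, using a small synthetic dataset and a placeholder output path:

import dask

# Small example dataset (the same helper used for the benchmark above)
ddf = dask.datasets.timeseries()

# Opt back in to writing the global _metadata file
ddf.to_parquet("timeseries-with-metadata/", write_metadata_file=True)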

Default Parquet engine

Dask supports the PyArrow and fastparquet engines for writing Parquet files.

Both PyArrow and fastparquet work well in most cases. In recent years, PyArrow’s popularity has exploded as Arrow has become the de facto standard in-memory data layout for several large projects. pandas has adopted PyArrow as the default Parquet engine, and Dask has followed suit.

The PyArrow Parquet engine opens the door for certain performance and memory optimizations, including the possibility of using pandas’ new string[pyarrow] dtype in the near future. See this video from Matt Rocklin to learn more about this new dtype and why it’s more efficient.
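
As a quick illustration of the dtype itself, here is a minimal sketch, assuming a recent pandas with pyarrow installed:

import pandas as pd

# An Arrow-backed string array; lighter on memory than object-dtype strings
s = pd.array(["yellow", "green", "fhv"], dtype="string[pyarrow]")
print(s.dtype)  # string[pyarrow]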

Loading row groups

By default, Dask used to load each row group of a Parquet file into a separate partition.  So a dataset with three files, each with four row groups, was loaded into 12 partitions. This was fine, but it required some additional metadata parsing up-front, making Parquet I/O slower and more complex than it needed to be. Often, the logical unit of partitioning is simply a single file, and doing additional metadata logic just isn’t necessary.

Dask’s new default is to load each file into a separate partition. The Dask team has found that this works well for a wide range of real-world Parquet datasets. This isn’t foolproof, however, as Parquet files can still be too small or too big for optimal processing in Dask. For those cases, Dask still provides options to split partitions among row groups or to aggregate multiple files into a single partition, as sketched below.
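
Both knobs are exposed on read_parquet. A hedged sketch with placeholder paths (the exact option names and semantics have shifted somewhat between Dask versions):

import dask.dataframe as dd

# Files with very large row groups: map each row group to its own partition
ddf = dd.read_parquet("s3://your-s3-bucket/huge-files/", split_row_groups=True)

# Many small files/row groups: bundle several row groups per partition and
# allow neighboring files to be combined into the same partition
ddf = dd.read_parquet(
    "s3://your-s3-bucket/tiny-files/",
    split_row_groups=8,
    aggregate_files=True,
)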

Don’t compute Dask DataFrame divisions by default

When reading Parquet files, Dask used to calculate DataFrame divisions by default, which worked well for some workflows but could also be slow. Sometimes Dask can’t infer useful divisions for a dataset (e.g., if it has no structured partitioning or if the partitioning can’t be well-represented by Dask DataFrame’s divisions), so reading Parquet files could result in extra computation with no benefit for the user.

In newer versions of Dask, divisions aren’t calculated by default anymore. If a Parquet dataset does have useful partitioning, you can get the previous behavior by setting calculate_divisions to True. Having known divisions allows Dask to perform some nice optimizations later in a data processing pipeline.
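
A minimal sketch of opting back in, writing a small sorted dataset locally and reading it back with division calculation enabled:

import dask
import dask.dataframe as dd

# Write a small dataset with a sorted timestamp index...
dask.datasets.timeseries().to_parquet("divisions-example/")

# ...then read it back, computing divisions from the Parquet statistics
ddf = dd.read_parquet("divisions-example/", calculate_divisions=True)
print(ddf.known_divisions)  # True when useful divisions could be computed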


Simpler, more scalable schema inference

Certain custom workflows can produce Dask DataFrames with unknown or poorly-constrained data schemas. In those cases, Dask used to eagerly compute a partition before writing it in order to better infer its schema. This was both complex and counter to Dask best-practices. In more recent versions of Dask, schema inference is based on simpler heuristics and doesn’t rely on eager computations. For cases where that inference is incorrect, we now recommend users explicitly provide a data schema to their Parquet I/O.
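
For example, here is a hedged sketch of passing an explicit PyArrow schema to to_parquet (the column names and types are hypothetical):

import dask.dataframe as dd
import pandas as pd
import pyarrow as pa

# Hypothetical example data; adjust the names and types to match your DataFrame
ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2], "name": ["a", "b"], "amount": [1.0, 2.0]}),
    npartitions=1,
)
schema = pa.schema([("id", pa.int64()), ("name", pa.string()), ("amount", pa.float64())])

# Pass the schema explicitly instead of relying on inference
ddf.to_parquet("schema-example/", schema=schema)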

Avoiding rate limiting

Some remote filesystems or object stores like S3 or GCS limit the rate at which you can perform filesystem operations such as reading, writing, or listing files. As datasets grow, it becomes more likely that you’ll hit the rate limit of your cloud storage provider. Dask relies on the s3fs library to interact with Amazon S3. As part of this effort, we increased s3fs’s robustness to rate limiting, so it now responds to such errors by retrying with exponential backoff. While it may still be possible to get rate-limited when performing I/O with S3, it is now significantly less likely.
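
If you still run into throttling, one option is to pass botocore retry settings through to s3fs via storage_options. This is a sketch under the assumption that your s3fs and botocore versions accept these config keys (the path is a placeholder):

import dask.dataframe as dd

# Ask botocore (via s3fs) to retry throttled requests with backoff
ddf = dd.read_parquet(
    "s3://your-s3-bucket/dataset/",
    storage_options={
        "config_kwargs": {"retries": {"max_attempts": 10, "mode": "adaptive"}},
    },
)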

Interoperability with Apache Spark and other engines

Parquet is a great format for transferring an analysis from one computation engine to another. Some engineering organizations will write out Parquet files with Spark and pick up the analysis with Dask, or vice versa. Storing intermediate results on disk is a common design pattern when building ETL pipelines, even when the same execution engine is used for various stages. It’s important to the Dask community that Dask can interoperate frictionlessly with Parquet written by Spark.

In the past, various changes to partitioning schemes or metadata handling have introduced regressions to this interoperability which were not caught as early as they could have been. For instance, Spark outputs a _SUCCESS file after writing a directory of Parquet files. This file could in some circumstances interfere with Dask’s inference of parquet partitioning schemes, resulting in difficult-to-debug errors. In addition to fixing this and related Spark interoperability issues, the Dask team now maintains a suite of tests to prevent further compatibility regressions in the future. This increases the likelihood that teams using Spark & Dask will have a delightful development experience.

Parquet Best Practices

One result of this recent focus on Parquet I/O is a brand-new documentation page for Parquet best practices with Dask, so users have a high-quality reference manual when they’re working with Parquet files.

Parquet I/O is one of the most searched-for Dask topics. High-quality, community-maintained documentation is the best way to answer user questions now and into the future.

Conclusion

Parquet I/O is a crucial API for smooth data engineering workflows using Dask. Over the last several months, the Dask developer team has made a coordinated push to improve the developer experience to make reading and writing Parquet files easier, faster, and more scalable.

Dask is maintained by developers from a variety of organizations. This cross-organizational collaboration between developers from different backgrounds is part of how Dask continuously evolves to best suit the needs of the Dask community.

Try Coiled for Free

Thanks for reading. If you’re interested in trying out Coiled, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Try Coiled

