Advantages of Parquet File Format for Dask Analyses

Matthew Powers, August 10, 2021

This post explains the benefits of the Parquet file format and why it’s usually better than CSVs for most Dask analyses.

File formats like CSV and JSON have the following issues:

  • Require schema inference
  • Don’t allow for column pruning
  • Don’t allow for query pushdown
  • Don’t compress as well

The rest of this blog explains the limitations of the CSV file format and how Parquet improves on all of these fronts. It also shows how Parquet query pushdown can provide massive performance gains and why query pushdown isn't possible with CSV files.

The concepts covered in this post apply to popular libraries like pandas, Dask, and Spark.

Schema

Parquet files store metadata, including the schema of the dataset, in the file footer.

Each column in a dataset has a name and a type. Look at the following example data:

first_name,age
Linda,34
Paula,65
Grace,12

The first_name column contains string values and the age column contains integer values.

When a library like pandas or Dask reads a Parquet file, it can simply fetch the schema in the file footer to figure out the column names and types.
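If you're curious, you can inspect the footer schema directly with PyArrow, the engine Dask commonly uses for Parquet. Here's a minimal sketch, with a hypothetical local file name:

import pyarrow.parquet as pq

# "people.parquet" is a hypothetical local file
parquet_file = pq.ParquetFile("people.parquet")

# Only the footer is read here; no data pages are scanned
print(parquet_file.schema_arrow)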

When Dask is reading a CSV file, here is how it figures out the schema:

  • If the user has not specified the schema manually, Dask samples parts of the file and tries to infer the schema, since checking the entire dataset would be computationally expensive. This process is error-prone because types inferred from a sample may not hold for the full dataset.
  • If the user manually specifies the schema, Dask doesn't need to infer it. This approach isn't foolproof either, because the user may specify a schema that's wrong. Manually coding the schema for each column in a dataset can also be really tedious, especially for wide tables (see the sketch below).
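Here's a minimal sketch of manually specifying a schema when reading the CSV dataset used later in this post (the column names and types here are assumptions, not pulled from the actual files):

import dask.dataframe as dd

ddf = dd.read_csv(
    "s3://coiled-datasets/timeseries/20-years/csv/*.part",
    storage_options={"anon": True, "use_ssl": True},
    # Assumed column types; with a manual dtype mapping Dask skips inference
    dtype={"id": "int64", "name": "object", "x": "float64", "y": "float64"},
)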

The Parquet schema will always properly match the data that’s in the file. You’ll never have a Parquet file with a column that claims to be a string and actually contains integers. Parquet will error out if you try to write a file that’s misconfigured.

When you write a dataset to Parquet, the schema is automatically added. You don’t need to perform any extra steps.
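For example, here's a minimal sketch that writes the small dataset above to Parquet with pandas and reads the dtypes straight back from the footer (the file name is hypothetical):

import pandas as pd

df = pd.DataFrame({"first_name": ["Linda", "Paula", "Grace"], "age": [34, 65, 12]})

# The schema is written into the file footer automatically
df.to_parquet("people.parquet")

# Reading it back recovers the column types without any inference
print(pd.read_parquet("people.parquet").dtypes)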

Column pruning

Parquet lets you read specific columns from a dataset without reading the entire file. This is called column pruning and can be a massive performance improvement.

Parquet is a columnar file format, unlike CSV, which is a row-based file format. Column pruning is only possible with columnar file formats.

Suppose you have a DataFrame with the following schema:

[Image: schema of the timeseries Parquet dataset]

Here’s how to read a CSV file (column pruning isn’t possible):

ddf = dd.read_csv(
    "s3://coiled-datasets/timeseries/20-years/csv/*.part", 
    storage_options={"anon": True, 'use_ssl': True}
)

Here’s how to read a Parquet file without leveraging column pruning:

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet",
    storage_options={"anon": True, 'use_ssl': True}
)

Here’s how to read a single column of a Parquet file with column pruning:

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet", 
    storage_options={"anon": True, 'use_ssl': True},
    columns=["name"]
)

You need to manually specify the columns argument to take advantage of column pruning with Dask.
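The same idea applies in pandas, which also accepts a columns argument in read_parquet; here's a minimal sketch with a hypothetical local file:

import pandas as pd

# Only the "name" column chunks are read from the file
names = pd.read_parquet("timeseries.parquet", columns=["name"])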

Column pruning benchmarks

This section shows the query speeds when column pruning is applied to a 662 million row dataset.

All computations are run on a 5 node Dask cluster that’s hosted by Coiled. Here’s a link to the notebook.

We'll run a query that counts the number of distinct values in the id column on CSV files, Parquet files, and Parquet files that are read with column pruning. Here's the query that'll be run:

ddf["id"].nunique().compute()

Here are the benchmarking results:

File format                          Query time (seconds)
CSV files                            131
Parquet files                        32.4
Parquet files with column pruning    29.8

Column pruning provides a modest speed boost in this example, but it can be much greater on other datasets and for other queries. Simply using the Parquet file format, even without column pruning, also provides a nice query speed-up compared to CSV files.

The benefits of column pruning increase as more columns are skipped. It’s especially beneficial when columns that take a lot of memory, like strings, can be skipped. You can expect a big performance boost if you have a dataset with 100 columns and can skip 90 columns with column pruning.

Predicate pushdown filtering

Query pushdown is when computations happen at the “database layer” instead of the “execution engine layer”. In this case, the database layer is Parquet files in a filesystem, and the execution engine is Dask.

Parquet allows for predicate pushdown filtering, a form of query pushdown, because the file footer stores row-group-level metadata for each column in the file.

The row group metadata contains min/max values for each row group in the Parquet file, which Dask can use to skip entire portions of the data file, depending on the query. Parquet predicate pushdown filtering is best illustrated with an example.

Suppose you have a dataset with first_name and age columns and would like to get a count of everyone named Astrid who's between 80 and 90 years old. Further, suppose your data file has three row groups with the following min/max values.

row_group    first_name min    first_name max    age min    age max
0            camila            luisa             3          95
1            anita             carla             13         103
2            anthony           matt              4          24

From the Parquet metadata alone, we can deduce that row group 1 is the only part of the file that's relevant for our query.

Row group 0 isn't relevant because its minimum first_name ("camila") is "higher" than "Astrid" alphabetically, so that row group can't contain anyone named Astrid.

Row group 2 isn't relevant because its maximum age (24) is lower than the minimum age threshold in the query (80).

We can skip row groups 0 and 2 in our query and only run the filtering logic on row group 1. As with column pruning, the more data you can skip, the bigger the performance benefit.

Parquet predicate pushdown filtering can be used in conjunction with column pruning, of course. You can skip both columns and row groups.
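You can inspect these row-group statistics yourself by reading the footer metadata with PyArrow; here's a minimal sketch with a hypothetical file name:

import pyarrow.parquet as pq

# "people.parquet" is a hypothetical local file
metadata = pq.ParquetFile("people.parquet").metadata

for i in range(metadata.num_row_groups):
    row_group = metadata.row_group(i)
    for j in range(row_group.num_columns):
        column = row_group.column(j)
        stats = column.statistics
        if stats is not None:
            # These min/max values are what lets Dask skip entire row groups
            print(i, column.path_in_schema, stats.min, stats.max)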

Predicate pushdown filtering benchmarks

Let’s run some benchmarks on the same 662 million row timeseries dataset with a 5 node Dask cluster. All the computations you’re about to see are in this notebook.

Predicate pushdown filtering is not possible with CSV files because there are no row groups with metadata. Here’s the syntax for applying predicate pushdowns when reading a Parquet file.

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet", 
    storage_options={"anon": True, 'use_ssl': True},
    filters=[[('id', '>', 1170)]]
)

Predicate pushdowns can be applied in conjunction with column pruning, like so.

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet", 
    storage_options={"anon": True, 'use_ssl': True},
    filters=[[('id', '>', 1170)]],
    columns=["id"]
)

Here are the benchmarking results for this particular query:

Approach                                 Run time (seconds)
CSV                                      189
Parquet                                  95
Parquet with predicate pushdown          3.7
Predicate pushdown and column pruning    2.2

There are only a small number of rows that satisfy our predicate filtering criteria, so we can skip a lot of row groups and get a massive performance boost by leveraging predicate pushdown filtering for this particular query.

Predicate pushdown filtering gotcha

Predicate pushdown filters return whole row groups that contain at least one value satisfying the predicate, so they're likely to return rows that don't meet the predicate as well.

Let’s look at an example.

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet", 
    storage_options={"anon": True, 'use_ssl': True},
    filters=[[('id', '>', 1170)]]
)
ddf.head()

Because the filters argument only skips whole row groups, the DataFrame it returns still contains rows that don't satisfy the predicate. You still need to run the "regular Dask filtering" query after running the predicate pushdown filtering. Take a look at the full code snippet.

ddf = dd.read_parquet(
    "s3://coiled-datasets/timeseries/20-years/parquet", 
    storage_options={"anon": True, 'use_ssl': True},
    filters=[[('id', '>', 1170)]]
)
len(ddf[ddf.id > 1170])

len(ddf[ddf.id > 1170]) returns 65, which is the correct result.

len(ddf) returns 38,707,200, which is far from the correct result.

Sorting data on columns that’ll be used for predicate pushdown filters and creating the right number of row groups per file are imperative for leveraging Parquet predicate pushdown filters to the fullest extent.
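Here's a rough sketch of that preparation with pandas, where the file names and row group size are illustrative (the row_group_size keyword should be forwarded to the underlying PyArrow writer):

import pandas as pd

df = pd.read_parquet("events.parquet")

# Sort on the filter column so each row group covers a narrow range of ids,
# and control how many rows land in each row group
df.sort_values("id").to_parquet("events_sorted.parquet", row_group_size=100_000)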

Immutability

Parquet files are immutable, which is unfamiliar to analysts who are used to mutable file formats like CSVs.

For example, you can open a CSV file, add a column of data, and save it, thereby mutating the original file.

If you want to add a column of data to a Parquet file, then you need to read it into a processing engine like Dask and write out an entirely new file.

This also applies to deleting rows of data. Suppose you have a Parquet file with 100,000 rows and would like to delete one row of data. You perform this “delete” by reading the entire file into Dask, filtering out the row of data you no longer want, and writing a new 99,999 row file.
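Here's a rough sketch of that rewrite workflow with Dask, using hypothetical paths and a hypothetical filter:

import dask.dataframe as dd

ddf = dd.read_parquet("s3://my-bucket/people/")

# "Delete" rows by filtering them out and writing a brand new dataset
ddf[ddf.id != 12345].to_parquet("s3://my-bucket/people-cleaned/")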

In practice, the immutable nature of Parquet files is less important than you’d think. Most files for production data analyses are stored in cloud-based object stores like AWS S3. Cloud object store files are immutable, regardless of file type. CSVs are mutable locally, but not mutable once they’re uploaded to S3.

Immutable files save you from a lot of nasty bugs. Data files should be immutable: changing data in place can cause unexpected downstream consequences, and since data isn't usually versioned, mutations can't simply be rolled back.

When to use CSV

CSV files are still widely used because they’re human-readable. Parquet files are binary blobs that can’t be opened and read by humans.

CSVs are a decent option for a small file that needs to be frequently modified by a business user, for example.

Lots of datasets you’ll find in the wild are already in the CSV file format. Most data analysts just stick with the existing file format. You can often save yourself a big headache by converting the CSV files to Parquet as the first step of your analysis.
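Here's a minimal sketch of that conversion step with Dask, using hypothetical paths:

import dask.dataframe as dd

# Convert the CSV files once, then run the rest of the analysis on Parquet
ddf = dd.read_csv("s3://my-bucket/raw-data/*.csv")
ddf.to_parquet("s3://my-bucket/raw-data-parquet/")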

Next steps

You’ve learned about the benefits of Parquet files in this blog, and you should be convinced that Parquet is better than CSV for a lot of data projects.

Pandas, Dask, and other data execution frameworks make it easy to work with Parquet files.  They all have built-in methods for reading and writing Parquet files.

Start using column pruning and predicate pushdown filtering and enjoy running your queries in less time!

Plus, if you haven’t already, you can try Parquet files on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, for free today when you sign up below.
