Reading CSV files into Dask DataFrames with read_csv

February 9, 2022

Here’s how this post is organized:

  • Reading a single small CSV file
  • Reading a large CSV file
  • Reading multiple CSV files
  • Reading files from remote data stores like S3
  • Limitations of CSV files
  • Alternative file formats that often perform better than CSV

Lots of datasets are stored with the CSV file format, so it’s important for you to understand the Dask read_csv API in detail.

Dask read_csv: single small file

Dask makes it easy to read a small file into a Dask DataFrame. Suppose you have a dogs.csv file with the following contents:

first_name,age
fido,3
lucky,4
gus,8

Here’s how to read the CSV file into a Dask DataFrame.

import dask.dataframe as dd

ddf = dd.read_csv("dogs.csv")

You can inspect the contents of the Dask DataFrame with the compute() method, which gathers the data into a single pandas DataFrame.

ddf.compute()
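Here’s a quick sketch of what that returns for this small file:

result = ddf.compute()

print(type(result))
# <class 'pandas.core.frame.DataFrame'>

print(result)
#   first_name  age
# 0       fido    3
# 1      lucky    4
# 2        gus    8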

This is quite similar to the syntax for reading CSV files into pandas DataFrames.

import pandas as pd

df = pd.read_csv("dogs.csv")

The Dask DataFrame API was intentionally designed to look and feel just like the pandas API.

For a single small file, Dask may be overkill and you can probably just use pandas. Dask starts to gain a competitive advantage when dealing with large CSV files. A common rule of thumb for pandas is to have at least 5x the size of your dataset available as RAM, and to switch to Dask once you exceed that limit. For example, on a machine with 16 GB of RAM, consider switching over to Dask when your dataset exceeds roughly 3 GB.
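Here’s a minimal sketch of that rule of thumb, assuming a hypothetical large_dataset.csv and that you know roughly how much RAM your machine has:

import os

# hypothetical file path; substitute your own dataset
file_size_gb = os.path.getsize("large_dataset.csv") / 1e9

# adjust for your machine
available_ram_gb = 16

# pandas is comfortable when the dataset is roughly 1/5 of RAM or less
if file_size_gb > available_ram_gb / 5:
    print("consider switching to Dask")
else:
    print("pandas should be fine")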

Dask read_csv: single large file

Dask DataFrames are composed of multiple partitions, each of which is a pandas DataFrame. Dask intentionally splits up the data into multiple pandas DataFrames so operations can be performed on multiple slices of the data in parallel.

Let’s read in a 5.19 GB file (5,190 MB) into a Dask DataFrame. This file is hosted in a public S3 bucket at s3://coiled-datasets/h2o/G1_1e8_1e2_0_0/csv/G1_1e8_1e2_0_0.csv if you’d like to download it yourself.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv")

We can check ddf.npartitions to see how many partitions the data is divided into.

ddf.npartitions # 82
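Each of these partitions is a regular pandas DataFrame under the hood. As a quick sanity check (a sketch using the partitions accessor), you can compute a single partition:

# grab the first partition as a single-partition Dask DataFrame
first_partition = ddf.partitions[0]

# computing it materializes a regular pandas DataFrame
print(type(first_partition.compute()))
# <class 'pandas.core.frame.DataFrame'>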

You can customize the number of partitions that the DataFrame will contain by setting the blocksize parameter when invoking read_csv.

Dask read_csv: blocksize

The number of partitions depends on the value of the blocksize argument. If you don’t supply a value to the blocksize keyword, it is set to “default” and the blocksize is computed based on the available memory and the number of cores on the machine, up to a maximum blocksize of 64 MB. In the example above, Dask automatically splits the 5,190 MB data file into ~64 MB chunks when run on a MacBook Air with 8 GB of RAM and 4 cores.

We can also manually set the blocksize parameter when reading CSV files to make the partitions larger or smaller.

Let’s read this data file with a blocksize of 16 MB.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", blocksize="16MB")

ddf.npartitions # 325

The Dask DataFrame consists of 325 partitions when the blocksize is 16 MB. The number of partitions goes up when the blocksize decreases.

Let’s read in this data file with a blocksize of 128 MB.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", blocksize="128MB")

ddf.npartitions # 41

The Dask DataFrame has 41 partitions when the blocksize is set to 128 MB. There are fewer partitions when the blocksize increases.
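If you want to check how large the partitions actually are in memory, one approach (a sketch, not the only way) is to measure each partition’s memory usage with map_partitions:

# memory usage (in MB) of each partition, computed in parallel
partition_sizes_mb = ddf.map_partitions(
    lambda df: df.memory_usage(deep=True).sum() / 1e6
).compute()

print(partition_sizes_mb.describe())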

The rule of thumb when working with Dask DataFrames is to keep your partitions under 100 MB in size.

Let’s take a look at how Dask infers the data types of each column when a CSV file is read.

Dask read_csv: inferring dtypes

CSV is a text-based file format and does not contain metadata about the data types of its columns. When reading a CSV file, Dask needs to infer the column data types if they’re not explicitly set by the user.

Let’s look at the dtypes that Dask has inferred for our DataFrame.

ddf.dtypes

id1     object
id2     object
id3     object
id4      int64
id5      int64
id6      int64
v1       int64
v2       int64
v3     float64
dtype: object

Dask infers dtypes based on a sample of the data. It doesn’t look at every row in the dataset to infer dtypes because that would be prohibitively slow for large datasets.

You can increase the number of rows that are sampled by setting the sample_rows parameter.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv", sample_rows=5000)

For this dataset, increasing the number of rows that are sampled does not change the inferred dtypes.

ddf.dtypes

id1     object
id2     object
id3     object
id4      int64
id5      int64
id6      int64
v1       int64
v2       int64
v3     float64
dtype: object

Inferring data types from a sample of rows is error-prone. Dask may infer a dtype incorrectly, which will cause downstream computations to error out. You can avoid dtype inference entirely by explicitly specifying dtypes when reading CSV files.
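Here’s a small, self-contained sketch (with hypothetical data and file names) of how sampling-based inference can go wrong:

import pandas as pd
import dask.dataframe as dd

# hypothetical file: the rows at the top all look numeric, but a string appears later
pd.DataFrame({"value": list(range(100_000)) + ["oops"]}).to_csv(
    "mixed_types.csv", index=False
)

ddf = dd.read_csv("mixed_types.csv", blocksize="100KB")

ddf.dtypes  # value inferred as int64 from the sample at the start of the file

# forcing every partition to be parsed surfaces the mismatch
# ddf["value"].sum().compute()  # raises an error about mismatched dtypes

The sample at the start of the file looks numeric, so Dask infers int64, and the problem only surfaces when the later partitions are actually parsed.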

Dask read_csv: manually specifying dtypes

Let’s manually set the id1, id2, and id3 columns to be PyArrow strings, which are more efficient than object type columns, as described in this blog post.

ddf = dd.read_csv(
    "data/G1_1e8_1e2_0_0.csv",
    dtype={
        "id1": "string[pyarrow]",
        "id2": "string[pyarrow]",
        "id3": "string[pyarrow]",
    },
)

ddf.dtypes

id1     string
id2     string
id3     string
id4      int64
id5      int64
id6      int64
v1       int64
v2       int64
v3     float64
dtype: object

Dask will infer the types for the columns that you don’t manually specify.

If you specify the dtypes for all the columns, then Dask won’t do any dtype inference and you will avoid potential errors or performance slowdowns.
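For example, here’s a sketch that spells out a dtype for every column in this dataset, using the column names and types from the inferred schema shown above:

ddf = dd.read_csv(
    "data/G1_1e8_1e2_0_0.csv",
    dtype={
        "id1": "string[pyarrow]",
        "id2": "string[pyarrow]",
        "id3": "string[pyarrow]",
        "id4": "int64",
        "id5": "int64",
        "id6": "int64",
        "v1": "int64",
        "v2": "int64",
        "v3": "float64",
    },
)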

Dask read_csv: multiple files

Dask can read data from a single file, but it really shines when reading many files in parallel.

Let’s write out the large 5.19 GB CSV file from earlier examples as multiple CSV files so we can see how to read multiple CSV files into a Dask DataFrame. Start by writing out the single CSV file as multiple CSV files.

ddf = dd.read_csv("data/G1_1e8_1e2_0_0.csv")
ddf.npartitions # 82
ddf.to_csv("data/csvs")

This will write out 82 files, one for each partition. The files are named as follows:

data/csvs/
 000.part
 001.part
 002.part
 ...

Let’s read these 82 CSV files into a Dask DataFrame.

ddf = dd.read_csv("data/csvs/*.part")

Reading multiple files into a pandas DataFrame must be done sequentially and requires more code. Here’s the pandas syntax:

import glob
import pandas as pd

all_files = glob.glob("./data/csvs/*.part")

df = pd.concat((pd.read_csv(f) for f in all_files))

Parallel I/O is a huge strength of Dask compared to pandas. pandas is designed for read / write operations on single files and does not scale well to many files, whereas Dask is designed to perform I/O in parallel and is far more performant for operations with multiple files.

The same blocksize and dtype arguments we discussed earlier for reading a single file also apply when reading multiple files.
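As a quick sketch, here’s how those keywords can be combined when reading the folder of CSVs written above:

ddf = dd.read_csv(
    "data/csvs/*.part",
    blocksize="64MB",
    dtype={
        "id1": "string[pyarrow]",
        "id2": "string[pyarrow]",
        "id3": "string[pyarrow]",
    },
)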

Dask readers also make it easy to read data that’s stored in remote object data stores, like AWS S3.

Dask read_csv: analyzing remote files with localhost compute

You can easily read a CSV file that’s stored in S3 to your local machine. Here’s how to read a public S3 file.

ddf = dd.read_csv("s3:coiled-datasets/timeseries/20-years/csv/0000.part")

ddf.head()

You normally should not analyze remote data on your localhost machine because it’s slow to download the data locally. It’s more natural to process cloud data with cloud compute: files stored in AWS S3 are best processed with compute instances provisioned in AWS, such as EC2.

Let’s look at an S3 folder with a lot of data. The coiled-datasets/timeseries/20-years/csv/ S3 folder has 1,095 files and requires 100 GB of memory when loaded into a DataFrame. Let’s try to run a computation on the entire dataset and see if it works.

ddf = dd.read_csv("s3:coiled-datasets/timeseries/20-years/csv/*.part")

ddf.describe().compute()

I let this computation run for 30 minutes before canceling the query. Running this locally is way too slow.

Let’s see how to read in this large dataset of CSVs to a Dask cluster that contains multiple compute nodes, so we can execute this query faster.

Dask read_csv: read remote files in cluster environment

Let’s spin up a 5-node cluster with Coiled and try to run the same computation with more computing power.

import coiled

cluster = coiled.Cluster(n_workers=5)

client = cluster.get_client()

ddf = dd.read_csv("s3://coiled-datasets/timeseries/20-years/csv/*.part")

ddf.describe().compute()

This computation runs in 5 minutes and 10 seconds. Running it on a cluster is certainly faster than running on localhost, where the query would have taken a very long time to finish.
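When you’re done, it’s a good idea to shut the cluster down so you’re not paying for idle workers:

# release the Dask client and shut down the Coiled cluster
client.close()
cluster.close()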

Storing the data in a different file format could make the query run even faster.

Limitations of CSV file format

CSV files are commonly used because they’re human readable, but they are usually not the best file format for data analysis.

Here are five reasons why Parquet files are often a better choice:

  • Parquet files don’t require schema inference / manual schema specification
  • Parquet files are easier to compress
  • Columnar nature of Parquet files allows for column pruning, which often yields big query performance gains
  • Row group metadata in Parquet files allows for predicate pushdown filtering
  • Parquet files are immutable

See the Dask docs for more details on the advantages of Parquet files.

In addition, CSV files let you save messy data, unlike stricter file formats. For example, CSV lets you save string data in an integer column, whereas Parquet will error out if you try.

Conclusion

Lots of data is stored in CSV files and you’ll often want to read that data into Dask DataFrames to perform analytical queries.

This blog has shown you that it’s easy to load one CSV or multiple CSV files into a Dask DataFrame. You’ve also learned how to set dtypes and customize the number of partitions in the DataFrame by setting the blocksize parameter.

CSV files aren’t usually as performant as a binary, columnar file format like Parquet. Whenever possible, consider converting the CSV files to Parquet. Most analytical queries run faster on Parquet lakes.
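Converting is straightforward once the data is loaded into a Dask DataFrame. Here’s a minimal sketch, assuming a hypothetical data/parquet output directory and that a Parquet engine like pyarrow is installed:

# read the CSVs and write them back out as Parquet
ddf = dd.read_csv("data/csvs/*.part")
ddf.to_parquet("data/parquet")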
