Accessing the NYC Taxi Data in 2022

Richard Pelgrim May 17, 2022

As of May 13, 2022, access to the NYC Taxi data has changed. Parquet has now become the new default file format, instead of CSV. Practically, this means you will need to change two things in your code:

  1. Change the path to the S3 bucket
  2. Use the dd.read_parquet() method instead of your usual dd.read_csv() or pd.read_csv()

import dask.dataframe as dd

# read in 2012 Parquet data
ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.parquet",
)

This post provides context on the change, explains the benefits of the Parquet file format, and demonstrates running computations on 11 years of NYC Taxi data using a Coiled cluster. It also shows how you can still access the old CSV files and explains why you probably don’t want to.

(Image source: www1.nyc.gov)

What You’ve Been Doing for Years

The NYC TLC dataset is one of the most well-known public datasets. It’s one of the few public datasets that is both large (>100 GB) and relatively clean. Because of this, many companies use it for demos and to run internal tests. The dataset has been a reliable feature of the big data landscape for well over a decade.

Over the weekend, this suddenly changed. If you now try to run your familiar read_csv call, you will run into an IndexError: list index out of range:

# read in 2012 CSV data
ddf = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.csv",
)
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
Input In [21], in <cell line: 2>()
      1 # read in 2012 CSV data
----> 2 ddf = dd.read_csv(
      3     "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.csv",
      4 )

File ~/mambaforge/envs/dask-dataframes/lib/python3.9/site-packages/dask/dataframe/io/csv.py:741, in make_reader.<locals>.read(urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    728 def read(
    729     urlpath,
    730     blocksize="default",
   (...)
    739     **kwargs,
    740 ):
--> 741     return read_pandas(
    742         reader,
    743         urlpath,
    744         blocksize=blocksize,
    745         lineterminator=lineterminator,
    746         compression=compression,
    747         sample=sample,
    748         sample_rows=sample_rows,
    749         enforce=enforce,
    750         assume_missing=assume_missing,
    751         storage_options=storage_options,
    752         include_path_column=include_path_column,
    753         **kwargs,
    754     )

File ~/mambaforge/envs/dask-dataframes/lib/python3.9/site-packages/dask/dataframe/io/csv.py:520, in read_pandas(reader, urlpath, blocksize, lineterminator, compression, sample, sample_rows, enforce, assume_missing, storage_options, include_path_column, **kwargs)
    515     paths = get_fs_token_paths(urlpath, mode="rb", storage_options=storage_options)[
    516         2
    517     ]
    519     # Infer compression from first path
--> 520     compression = infer_compression(paths[0])
    522 if blocksize == "default":
    523     blocksize = AUTO_BLOCKSIZE

IndexError: list index out of range

Switch to Parquet

Parquet has become the new default for the NYC TLC data. To access the data, you will need to:

  1. Change the path to the S3 bucket to use the .parquet file extension instead of .csv
  2. Use the dd.read_parquet() method instead of dd.read_csv()
# read in 2012 Parquet data
ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.parquet",
)

ddf.head()
(Output table truncated for readability.)

Dask is the best way to read the new NYC Taxi data at scale. Dask enables you to maximise the parallel read/write capabilities of the Parquet file format.

You can also use pandas with pd.read_parquet(), but that limits you to a single CPU core for processing your data, which makes your workflow slower and less scalable.
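For comparison, here is what loading a single month with pandas might look like. This is a minimal sketch: it assumes s3fs and pyarrow are installed and uses the January 2012 file purely as an illustration.

import pandas as pd

# read one month of yellow taxi data into a single in-memory DataFrame
# (one CPU core, whole file loaded at once)
df = pd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-01.parquet",
    storage_options={"anon": True},  # public bucket; anonymous access
)
print(df.shape)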

Read this post to learn more about how Dask can help speed up your data analyses.

Old Habits Die Hard

You can still access the CSV data in the csv_backup directory:

# read in 2012 CSV data
ddf = dd.read_csv(
    "s3://nyc-tlc/csv_backup/yellow_tripdata_2012-*.csv",
)

Note that, compared to the new Parquet files, these CSV files suffer from slower parallel I/O, fewer compression options, and no column pruning or predicate pushdown. If you are working at scale, and unless you have a very strong reason to use CSV, you should generally use Parquet instead. Read this blog to learn more about how to write Parquet files with Dask.
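To see what column pruning and predicate pushdown look like in practice, here is a sketch using the columns and filters arguments of dd.read_parquet. The column names come from the yellow taxi schema used later in this post; adjust them to whatever you actually need.

import dask.dataframe as dd

# column pruning: only the listed columns are read from storage
ddf = dd.read_parquet(
    "s3://nyc-tlc/trip data/yellow_tripdata_2012-*.parquet",
    columns=["passenger_count", "trip_distance"],
    # predicate pushdown: row groups that cannot match the filter are skipped
    filters=[("passenger_count", ">", 0)],
)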

It Gets Even Better

The only drawback of the NYC TLC Parquet files is that they ship as just 12 very large files per year, so reading them produces a small number of very large partitions. It’s best to repartition the dataset to a more optimal size for parallel IO and faster computations.

ddf = ddf.repartition(partition_size="100MB")
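As a quick sanity check (a sketch; the exact counts depend on how much data you loaded), you can print ddf.npartitions before and after the call:

# before repartitioning: roughly one very large partition per monthly file
print(ddf.npartitions)

ddf = ddf.repartition(partition_size="100MB")

# after repartitioning: many smaller, roughly 100 MB partitions
print(ddf.npartitions)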

The code below demonstrates performing a groupby computation on the NYC TLC data for the years 2011 to 2021. This is more than 200 GB of data uncompressed on disk. It’s unlikely that your local machine has the RAM to run this analysis. We’ll be running our computations in the cloud on a Coiled cluster with 50 workers and 16 GiB of RAM each. Read our documentation to get started with Coiled.

from coiled import Cluster
from distributed import Client
import dask.dataframe as dd

# launch Coiled cluster
cluster = Cluster(
    name="dataframes",
    n_workers=50,
    worker_memory="16GiB",
    software="coiled-examples/dask-dataframes",
    scheduler_options={"idle_timeout": "2 hour"},
)

# connect Dask to Coiled
client = Client(cluster)

Now we can load in our dataset:

# read in all data for 2011-2021
ddf = dd.concat(
    [
        dd.read_parquet(f"s3://nyc-tlc/trip data/yellow_tripdata_{i}-*.parquet")
        for i in range(2011, 2022)
    ]
)

# repartition dataset
ddf = ddf.repartition(partition_size="100MB").persist()

And now run our groupby computation:

%%time
# perform groupby aggregation
ddf.groupby('passenger_count').trip_distance.mean().compute()

CPU times: user 526 ms, sys: 55.1 ms, total: 582 ms
Wall time: 10.3 s

passenger_count
49.0      0.000000
208.0     0.241961
10.0      0.386429
19.0      0.690000
211.0     0.970000
192.0     1.010000
254.0     1.020000
223.0     1.160000
96.0      1.195000
177.0     1.340000
33.0      1.615000
249.0     1.690000
193.0     1.740000
112.0     1.800000
97.0      1.870000
129.0     2.050000
37.0      2.160000
0.0       2.522421
47.0      2.560000
15.0      2.590000
255.0     2.953333
6.0       2.975480
5.0       3.001735
70.0      3.060000
7.0       3.288784
247.0     3.310000
58.0      3.460000
225.0     4.830000
8.0       4.950078
250.0     5.010000
4.0       5.028690
9.0       5.675410
2.0       5.869093
3.0       5.931338
1.0       6.567514
61.0      8.780000
65.0     18.520000
36.0     20.160000
Name: trip_distance, dtype: float64

Parquet is Your New Best Friend

Yes, this access change is painful and means you may have to update some legacy code. But the change was made for good reason: Parquet is a much more efficient file format, especially when working with datasets of this scale that are best read in parallel. Parquet enables time-saving operations like parallel IO, column pruning, and predicate pushdown.
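On the write side, Dask makes it just as easy to save your own data as compressed Parquet. A minimal sketch follows; the output path is a placeholder, so point it at a bucket or directory you control.

# write the DataFrame out as snappy-compressed Parquet files
# "s3://my-bucket/nyc-taxi/" is a placeholder path
ddf.to_parquet(
    "s3://my-bucket/nyc-taxi/",
    compression="snappy",
    write_index=False,
)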


Read our post on all the benefits of using Parquet over CSV or JSON to learn more and to see the benchmarks in more detail.

