Parallel XGBoost For Faster Training with Dask in Python

Richard Pelgrim December 20, 2021



Train XGBoost on 100GB in 4 Minutes

This post demonstrates how you can leverage the Dask XGBoost integration for faster machine learning in parallel, with only minor changes to your existing XGBoost Python code.

You will learn how to:

  1. Train a distributed XGBoost machine learning model locally with Dask on a subset of the data,
  2. Scale your distributed training XGBoost model to the cloud with Coiled to train on all 100GB of data,
  3. Speed up your distributed training with Pro tips from the Dask core team.

Here is the Python XGBoost example we will use if you want to jump right in. The code demonstrates training an XGBoost model on 100GB of training data in less than 4 minutes.

Questions about the code? Join our Community Slack channel

Dask + XGBoost

Parallel Dask XGBoost Model Training with xgb.dask.train()

By default, XGBoost trains your model sequentially. This is fine for basic projects, but as the size of your dataset and/or XGBoost model grows, you’ll want to consider running XGBoost in distributed mode with Dask to speed up computations and reduce the burden on your local machine.

You’ll know you’ve hit the memory limits of your machine when you see the following error message:

xgboost.core.XGBoostError: out of memory

Or the infamous:

The kernel for coiled-xgboost-synthetic-100GB.ipynb appears to have died. It will restart automatically.

XGBoost comes with a native Dask integration that makes it possible to distribute training of a single model across all the workers in a Dask cluster. Running an XGBoost model with the distributed Dask backend only requires two changes to your regular XGBoost code:

  1. substitute dtrain = xgb.DMatrix(X_train, y_train)
    with dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
  2. substitute xgb.train(params, dtrain, ...)
    with xgb.dask.train(client, params, dtrain, ...)

Let’s see this in action with a hands-on data science project.

We’ll be working with a synthetic 100GB dataset stored in a public Amazon S3 bucket which we’ll load into a Dask DataFrame. Using the free Coiled Cloud subscription, you can run the entire Python XGBoost example in this notebook yourself.

Run the XGBoost Dask integration locally

First, instantiate a local version of the Dask distributed scheduler, which will orchestrate training your model in parallel. 

from dask.distributed import Client, LocalCluster

# local dask cluster
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

Since this is synthetic data, we won’t be doing any preprocessing. For an example using real-world data that does include preprocessing, check out this notebook that trains XGBoost on a Dask DataFrame containing a 20GB subset of the ARCOS dataset. When you’re done preprocessing, you can create your train and test splits using the dask-ml library.

In this section, we are working with data_local, a subset of the entire dataset containing only the first 50 partitions.
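The snippet below sketches one way data_local could be built, assuming the same public S3 dataset that is read in full later in this post; the partitions accessor simply keeps the first 50 partitions.

import dask.dataframe as dd

# read the full synthetic dataset lazily and keep only the first 50 partitions for local work
data = dd.read_parquet(
    "s3://coiled-datasets/synthetic-data/synth-reg-104GB.parquet/",
    storage_options={"anon": True, "use_ssl": True},
)
data_local = data.partitions[:50]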

from dask_ml.model_selection import train_test_split
 
# Create the train-test split
X, y = data_local.iloc[:, :-1], data_local["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)

Now you’re all set to train your XGBoost model.

Let’s use the default parameters for this example.
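The training call below references a params dictionary that isn’t defined in the snippet; here is a minimal placeholder, assuming a squared-error regression objective for this synthetic regression dataset and leaving everything else at XGBoost’s defaults.

# minimal placeholder parameters (an assumption for this synthetic regression dataset);
# anything not listed here falls back to XGBoost's defaults
params = {
    "objective": "reg:squarederror",
}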

import xgboost as xgb

# Create the XGBoost DMatrices
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# train the model
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)
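Note that xgb.dask.train() returns a dictionary rather than a bare Booster: it holds the trained booster plus the per-round evaluation history for the evals you passed in. A quick peek (sketch):

# output is a dict: {"booster": <trained Booster>, "history": {...}}
print(output["history"])  # e.g. {"train": {"rmse": [...]}} with a squared-error objective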

You can then use your trained model together with your testing split to make predictions.

# make predictions
y_pred = xgb.dask.predict(client, output, dtest)

As you can see, not much has changed from your regular XGBoost code. We’ve just used xgb.dask.DaskDMatrix instead of xgb.DMatrix and called xgb.dask.train() instead of xgb.train(). Note that the input to DaskDMatrix does not have to be a Dask DataFrame; it can also be a Dask Array.
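To sanity-check the model on the local subset, you can pull the predictions and the test labels back to the client and score them with scikit-learn (a minimal sketch; nothing here has been tuned):

from sklearn.metrics import mean_squared_error

# the local subset is small enough to collect on the client for scoring
mse = mean_squared_error(y_test.compute(), y_pred.compute())
print(f"RMSE on the test split: {mse ** 0.5:.4f}")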

Now if you were to train XGBoost on the entire Dask DataFrame data, however, Dask would throw a MemoryError because you are still running locally:

MemoryError

distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 1.49 GiB -- Worker memory limit: 1.86 GiB

Scaling your Dask XGBoost Training to the Cloud

Let’s now process the entire 100 GB dataset by scaling XGBoost to the cloud.

You’ll need to make 2 changes to the Python code we wrote above:

  1. Connect Dask to a Coiled cluster in the cloud instead of to our local CPU cores,
  2. Work with the entire 100 GB dataset instead of the local subset.

1. Spin up your Cluster

We’ll start by launching a Coiled cluster in the cloud that can run your pipeline on the entire dataset. To run the Python code in this section, you’ll need a Coiled Cloud account. Log into Coiled Cloud with your GitHub credentials to create one.

import coiled

cluster = coiled.Cluster(
    name="xgboost",
    software="coiled-examples/xgboost",
    n_workers=50,
    worker_memory='16GiB',
    shutdown_on_close=False,
)

Note that if you’re using the Coiled Free Tier, you’ll have to decrease the n_workers argument to 25, as this tier allows a maximum of 100 concurrent cores and the default setting is 4 cores per Dask worker.

Finally, instruct the Dask scheduler to run computations on your Coiled cluster.

# connect Dask to your Coiled cluster
from dask.distributed import Client
client = Client(cluster)
client

2. Use the entire dataset

For this section, be sure to use the whole Dask DataFrame data containing 2750 partitions, rather than the data_local subset containing only the first 50 partitions we used above. 

# download data from S3
data = dd.read_parquet(
    "s3://coiled-datasets/synthetic-data/synth-reg-104GB.parquet/", 
    compression="lz4",
    storage_options={"anon": True, 'use_ssl': True},
)

data

3. Train XGBoost in the Cloud

You can now re-run all the same code from above. All computations will run on the multiple machines in your Coiled cluster in the cloud rather than on your single local machine. This means you’ll have orders of magnitude more compute at your disposal.
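Concretely, “the same code” means rebuilding the train/test split and the DaskDMatrix objects, just pointing at data instead of data_local:

# same split and DMatrix construction as before, now on the full 100 GB dataset
X, y = data.iloc[:, :-1], data["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)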

%%time 
# train the model 
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=5,
    evals=[(dtrain, 'train')]
)

CPU times: user 17.5 s, sys: 3.43 s, total: 20.9 s
Wall time: 3min 24s

And as you can see in the Coiled Cloud dashboard, training XGBoost on 100GB of data in the cloud cost us $0.83. Coiled Cloud also hosts the Dask dashboard so you can follow along with running computations. Read more about the Dask dashboard in the Pro tips below.

4. Shut down your cluster

After training is done, we can save the model and shut down the cluster, releasing its resources. If we forget to do so, Coiled automatically shuts down clusters after 20 minutes of inactivity to help avoid unnecessary costs.
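Saving is a one-liner on the booster held in the dictionary returned by xgb.dask.train() (a sketch; the filename is just an example):

# persist the trained Booster to disk (example filename)
output["booster"].save_model("xgboost-synthetic-100GB.json")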

# Shut down the cluster
client.cluster.close()

Pro Tips to speed up XGBoost training

We’ve collected a set of pro tips straight from the Dask core team to help you speed up your XGBoost training and get immediate value for your data science workflows:

  • Increase the number of Dask workers in your cluster. This will speed up your computations. If you’re running into the limits of your Free Tier, you can sign up for Coiled Pro.
  • Re-cast columns to less memory-intensive dtypes. For example, convert float64 into int16 whenever possible. This will reduce the memory load of your DataFrame and speed up training (see the downcasting sketch after this list).
  • Use the Dask Dashboard to spot bottle-necks and identify opportunities for increased performance in your code. Watch the original author of Dask, Matt Rocklin, explain how to get the most out of the Dask Dashboard here.
  • Pay attention to unmanaged memory. Read Dask core contributor Guido Imperiale’s blog on how to tackle the issue of unmanaged memory in Dask workers here.
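A minimal downcasting sketch (here float64 to float32 for safety; casting to an integer dtype like int16 works the same way via astype when the values allow it):

# downcast float64 columns to float32 to roughly halve their memory footprint
float_cols = data.select_dtypes(include="float64").columns
data = data.astype({col: "float32" for col in float_cols})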

Parallel XGBoost in Python Summary

Let’s recap what we’ve discussed in this post:

  • Out of the box, XGBoost cannot be trained on large datasets that exceed your local memory.
  • You can leverage the multiple cores in your single machine by connecting XGBoost to a local Dask cluster.
  • You can burst to the cloud for even higher performance gains with Coiled.
  • You can tweak your distributed XGBoost performance by inspecting the Dask Dashboard.

We’d love to see you apply distributed training of your XGBoost model to a dataset that’s meaningful to you. If you’d like to try, swap your dataset into this Python XGBoost example notebook and see how well it does! Let us know how you get on in our Coiled Community Slack channel!

Thanks for reading! And if you’re a data scientist interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Try Coiled Cloud

Frequently Asked Questions


The original Washington Post ARCOS dataset is stored as a .tsv file. The Coiled team has pre-processed the dataset into Parquet format to reduce the file size and enable faster read/write and column pruning. Read Matthew Powers’ blog on the advantages of Parquet to learn more.

The Dask Dashboard is the best place to keep track of your computations. Read Pavithra Eswaramoorthy’s blog on how to track your Dask computations for different scheduler types. You may also be interested in our blog on setting up the Dask Dashboard in your Jupyter Lab environment.

