Michal Mucha February 18, 2021

XGBoost – frictionless training on datasets too big for the memory



TL;DR

XGBoost does best when it can train on the entire dataset at once. What to do when we don’t have enough RAM? 

Bursting XGBoost training from your laptop to a Dask cluster allows training on out-of-core data, and saves hours of engineering work.

We demo an accessible workflow to run your training on a temporary, Python-native cluster straight from your own notebook or script. Check it out, run it on your own data!

What we’ll cover:

  1. Loading and transforming data on a distributed cluster
  2. Training XGBoost on Dask
  3. How the distributed XGBoost implementation works
  4. Bursting to the cloud as and when needed
  5. Visualizing performance of computations on a cluster with Dask Dashboards

Feel free to skip ahead to the part you’re most interested in!

Questions about the code? Join our Community Slack channel.

Here is the code we will use, if you’d like to jump right in:

import coiled
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# Create a local Dask Cluster
cluster = LocalCluster() # all the power of our local machine

# or a Coiled Cloud cluster (pick one of the two; this assignment replaces the local cluster)
cluster = coiled.Cluster(
    n_workers=4,
    worker_cpu=4,
    worker_memory='24GiB', # As much as we need
    software='michal-mucha/xgboost-on-coiled' # Public docker image
)

# Connect to the cluster
client = Client(cluster)

# Load the example dataset sample - specify columns
columns = [
    "interest_rate", "loan_age", "num_borrowers", 
    "borrower_credit_score", "num_units"
]

categorical = [
    "orig_channel", "occupancy_status", "property_state",
    "first_home_buyer", "loan_purpose", "property_type",
    "zip", "relocation_mortgage_indicator", "delinquency_12"
]

# Download data from S3
mortgage_data = dd.read_parquet(
    "s3://coiled-data/mortgage-2000.parq/*", 
    compression="gzip",
    columns=columns + categorical, 
    storage_options={"anon": True}
)

# Cache the data on Cluster workers
mortgage_data = mortgage_data.persist()

# Cast categorical columns to the correct type
from dask_ml.preprocessing import Categorizer

ce = Categorizer(columns=categorical)
mortgage_data = ce.fit_transform(mortgage_data)
for col in categorical:
    mortgage_data[col] = mortgage_data[col].cat.codes


# Create the train-test split
from dask_ml.model_selection import train_test_split

X, y = mortgage_data.iloc[:, :-1], mortgage_data["delinquency_12"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, shuffle=True, random_state=2
)

# Create the XGBoost DMatrix
import xgboost as xgb

dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)    

# Set parameters
params = {
    "max_depth": 8,
    "max_leaves": 2 ** 8,
    "gamma": 0.1,
    "eta": 0.1,
    "min_child_weight": 30,
    "objective": "binary:logistic",
    "grow_policy": "lossguide"
}


# Train the model
# (in Jupyter, put %%time on the first line of the cell to time this step)
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=5,
    evals=[(dtrain, 'train')]
)

booster = output['booster']  # booster is the trained model
history = output['history']  # a dictionary containing evaluation results

# Shut down the client and the cluster
client.close()
cluster.close()

When in doubt, use XGBoost 

– Owen Zhang, Kaggle Grandmaster

XGBoost reliably wins Kaggle competitions, and it is fast and easy to use. It also does well in business applications, which has made it a very popular model with a great community and many exciting deployment stories.

All you really need to get some machine learning goodness flowing is the library, a computer to run it on, and a tabular dataset.

We’re all set if our data fits in memory. But what if it doesn’t? This is probably why you’re reading this post!

XGBoost works best with the entire dataset available during training. 

The usual solutions have very limited appeal: using swap memory, limiting our data, buying hardware, or spending days on a data streaming harness.

Friction, delay, complexity.

There must be a better way!


Swap memory is slow!

Enter Dask – Distributed XGBoost on a Cluster

Why a cluster? With a cluster, you can pool the memory of a lot of machines, which is usually cheaper (or less impossible) than getting a suitable single machine.

Companies like Walmart and institutions like Harvard Medical School use Dask to work on very large problems with quick iteration cycles. Dask scales native Python code – we can use it to easily run our XGBoost work on larger data!

Official integration

A year ago, in February 2020, the XGBoost team released an official integration with Dask. An official integration means faster introduction of bleeding-edge features, as well as the recognition and support for the community of Dask users.

This incredibly useful combo was already available through extensions such as dask-xgboost and the scikit-learn API – both very familiar to many Python data practitioners. In this post, we will use the new, official integration. If you prefer the sklearn API, we have you covered with a notebook example over here!

Predicting loan delinquency

In this post, we will use `xgboost.dask` to train a model to predict loan delinquency on the Fannie Mae Single-Family Loan Performance Data prepared by the team at NVIDIA RAPIDS.

The entire dataset consists of 17 years (2000-2016) of monthly records, with the total uncompressed CSV size reaching close to 200GB. We will be using an 8GB sample – one year of data – to build out code that can successfully run on the entire history. The dataset sample is saved in the efficient parquet format. The original CSVs are available here.

From local to distributed

We will start locally with a sample, then scale out to use the full power of a local machine, and finally expand to a distributed Dask cluster in the cloud.

We would love to see you apply distributed XGBoost to a dataset that’s useful to you.

If you’d like to try, check out our scaling-xgboost example notebook, swap in your dataset, and see how well it does!

Let’s start by launching a local Dask cluster

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=8)
client = Client(cluster)

When working with big datasets, I like to start out on a small sample in JupyterLab on my local machine. Using Dask, I can design the entire workflow using a local cluster. 

When I need more power, all it takes is to swap out my local cluster for a bigger one.

Loading the dataset from the cloud, I pin it to cluster memory – I will need it for next steps:

mortgage_data = dd.read_parquet(
    "s3://coiled-data/mortgage-2000.parq/", 
    compression="gzip", 
    columns=columns, 
    storage_options={"anon":True}
)

mortgage_data = mortgage_data.persist()

When building the workflow, I need to account for the dynamics of running code on a distributed cluster. For example, network transfer cost between the scheduler, workers, and the client is not a factor in local development. Things change when I move out of my computer.

Distributed Preprocessing with Dask-ML

The dataset in this demo comes with pre-built features, but the model I want to use has a data type restriction – only float, integer, and boolean features are accepted. 

As some of the columns contain categorical data provided as strings, I need to transform them.

Dask-ML comes in very handy, with a friendly API that will make any scikit-learn user feel at home. For my use case, the `Categorizer` object is an excellent match.

from dask_ml.preprocessing import Categorizer

ce = Categorizer(columns=categorical)

I initialize the object with a prepared list of categorical columns. Then, I am ready to transform my data.

mortgage_data = ce.fit_transform(mortgage_data)
mortgage_data.dtypes
# Out:
delinquency_12                      category # target variable
interest_rate                        float64
loan_age                             float64
orig_channel                        category
...
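
Note that `Categorizer` produces pandas `category` columns, which are not yet plain numbers. The full listing at the top of this post goes one step further and replaces each categorical column with its integer codes via `.cat.codes`. Here is a minimal pandas sketch of that conversion. The column values below are made up for illustration; the accessor is the same one the listing calls on the Dask dataframe.

```python
import pandas as pd

# Toy frame standing in for one partition of the mortgage data
# (values are invented for illustration)
df = pd.DataFrame({
    "orig_channel": ["R", "C", "B", "R"],
    "interest_rate": [7.5, 8.0, 7.25, 8.125],
})

# Step 1: string column -> category dtype (what Categorizer does)
df["orig_channel"] = df["orig_channel"].astype("category")

# Keep the code -> label mapping so predictions can be interpreted later
mapping = dict(enumerate(df["orig_channel"].cat.categories))

# Step 2: category dtype -> integer codes that XGBoost can consume
df["orig_channel"] = df["orig_channel"].cat.codes

print(mapping)
print(df.dtypes)
```

Keeping `mapping` around is optional, but it makes it possible to translate codes back into the original labels.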

At this point, we could add more features, using Dask to parallelize the pandas work across all the cores on my computer or cluster. 

Before jumping into model training, I will use Dask-ML to split my distributed dataframe into training and validation sets.

from dask_ml.model_selection import train_test_split

X, y = mortgage_data.iloc[:, 1:], mortgage_data["delinquency_12"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.1, shuffle=True, random_state=2
)

From small data to big data in one line of code

The official integration with Dask released by the XGBoost team makes distributed training on a Dask cluster as simple as typing `xgboost.dask.train` instead of the local `xgboost.train`. 

It also provides a distributed variant of its internal data structure, the DMatrix, which can be built directly from a distributed Dask dataframe. 

This operation will trigger materialization of the lazily scheduled Dask DF operations:

import xgboost as xgb
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train) 

Let’s train XGBoost to predict loan performance:

output = xgb.dask.train(
    client=client,
    params={
        'objective': 'binary:logistic',
        'max_depth': 8
    },
    dtrain=dtrain,
    num_boost_round=10,
    evals=[(dtrain, 'train')]
)

After training, we can access the XGBoost model and the history of training evaluations:

booster = output['booster'] 
history = output['history']

It’s worth noting that the booster is your regular XGBoost model, not a Dask-specific object. 

It really is as easy as that to train a model at scale. With such powerful infrastructure, we can pool the resources of many computers to solve bigger problems faster, while using the same high-level interface.

This integration is a great example of how Dask democratizes access to scalable computing resources.

How does distributed XGBoost on Dask implement distributed training?

In a grand simplification, model training consists of the sequential building of decision trees, adding decision nodes one by one. That occurs through iteration over data to find the best combination of feature and threshold.

At each step of the sequential operation, XGBoost can evaluate possible splits in parallel, gather the results and determine the best path to follow. On a single machine, data is readily available to multiple worker threads. The threads can speak to each other like rowers sitting in the same rowboat. 
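
To make the split search concrete, here is a toy sketch of an exhaustive search over features and thresholds for a single tree node. It scores candidates with the structure-score gain used in gradient boosting (without the `gamma` penalty), with gradients and hessians of the logistic loss at an initial prediction of 0.5. The function name `best_split`, the toy data, and the regularization value are assumptions for illustration; the real library uses far more optimized algorithms.

```python
import numpy as np

def best_split(X, grad, hess, reg_lambda=1.0):
    """Return (gain, feature_index, threshold) of the best single split."""
    def score(g, h):
        return g * g / (h + reg_lambda)

    n_rows, n_features = X.shape
    G, H = grad.sum(), hess.sum()
    best = (0.0, None, None)
    for j in range(n_features):
        order = np.argsort(X[:, j])
        x_sorted = X[order, j]
        g_left = np.cumsum(grad[order])  # gradient sum of rows left of the split
        h_left = np.cumsum(hess[order])
        for i in range(n_rows - 1):
            if x_sorted[i] == x_sorted[i + 1]:
                continue  # no threshold fits between equal values
            gain = 0.5 * (
                score(g_left[i], h_left[i])
                + score(G - g_left[i], H - h_left[i])
                - score(G, H)
            )
            if gain > best[0]:
                best = (gain, j, (x_sorted[i] + x_sorted[i + 1]) / 2)
    return best

# Feature 0 alternates with the label, feature 1 separates it cleanly
X = np.array([[1.0, 10.0], [2.0, 30.0], [3.0, 12.0], [4.0, 35.0]])
y = np.array([0.0, 1.0, 0.0, 1.0])
grad = 0.5 - y                   # logistic-loss gradient at p = 0.5
hess = np.full_like(y, 0.25)     # logistic-loss hessian at p = 0.5

gain, feature, threshold = best_split(X, grad, hess)
print(feature, threshold, round(gain, 4))
```

Each candidate split is scored independently of the others, which is why this part of the work parallelizes well across threads.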

In a distributed context, the cost of synchronization over the network becomes significant. Instead of threads sharing the same RAM, we have computers sending packets to each other over the wire.

The library streamlines that as much as possible, yet it remains an important consideration – a cluster of fewer, more powerful machines is more useful for this scenario than a cluster of many low-resource machines. If you’d like to jump into the technical detail, have a look at this conversation on XGBoost forums.
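
One way to picture the synchronization step: with histogram-based tree building, each worker summarizes its own rows into a small histogram of gradient statistics, and the histograms are summed across workers, so only the summaries travel over the network rather than the rows themselves. The sketch below simulates three workers with NumPy. It is a simplified illustration of the idea, not XGBoost's actual code; the helper name, the bin edges, and the synthetic data are all assumptions.

```python
import numpy as np

def local_histogram(feature_values, grad, hess, bin_edges):
    """Sum gradients and hessians per bin for one worker's rows."""
    bins = np.digitize(feature_values, bin_edges)  # bin index of each row
    n_bins = len(bin_edges) + 1
    g_hist = np.bincount(bins, weights=grad, minlength=n_bins)
    h_hist = np.bincount(bins, weights=hess, minlength=n_bins)
    return g_hist, h_hist

# Synthetic single-feature dataset
rng = np.random.default_rng(0)
x = rng.normal(size=1000)
y = (x > 0.3).astype(float)
grad = 0.5 - y
hess = np.full_like(x, 0.25)
bin_edges = np.linspace(-2, 2, 9)  # every worker must use the same edges

# Split the rows across three simulated workers
parts = np.array_split(np.arange(len(x)), 3)
local = [local_histogram(x[idx], grad[idx], hess[idx], bin_edges) for idx in parts]

# The "network" step: an elementwise sum of the small per-worker histograms
g_total = sum(g for g, _ in local)
h_total = sum(h for _, h in local)

# Same result as building the histogram on one machine holding all rows
g_single, h_single = local_histogram(x, grad, hess, bin_edges)
print(np.allclose(g_total, g_single), np.allclose(h_total, h_single))
```

The summed histogram matches the single-machine one, but every sum is a round of communication that all workers wait on. That waiting is the cost that favors fewer, larger machines.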

Scaling to the cluster, as and when needed

Clusters are cool and powerful, but let’s be honest – that’s not reason enough to be using them. Why, and when, do I need one? As scikit-learn core contributor Andreas Mueller shares on our blog, sampling the dataset is a great way to get going. Local work on smaller datasets is very productive, with fast iteration times that inspire good ideas. 

There usually comes a time when we decide to go for the entire dataset – to test our assumptions on the broader population, see how well the model does, and find the optimal set of hyperparameters.

Custom Dask clusters, immediately

If that is your workflow, Dask on Coiled blends in very smoothly. You can get just the right cluster running just the right software in 2 minutes, straight from a code cell in your Jupyter Notebook. Because it runs on native Python, there’s no need to change your code or port it to PySpark, and you can even pip-install fresh dependencies onto your worker nodes during an interactive session.

Having worked out my approach, I am now ready to carefully take advantage of more computing power for great good. 

I can build my software environment on the cloud using conda, pip or docker configurations:

import coiled
coiled.create_software_environment(
   name="xgboost-on-coiled",
   conda={
      "channels": [
        "conda-forge"
      ],
      "dependencies": [
        "coiled",
        "dask-ml",
        "dask>=2.23.0",
        "fastparquet",
        "pandas>=1.1.0",
        "python-snappy",
        "s3fs",
        "scikit-learn",
        "xgboost>=1.3.0"
      ]
    }
)

After my environment is ready, I am free to use it for all future deployments.

Launching the following line will get my cluster up, directly from my JupyterLab session:

cluster = coiled.Cluster(
   n_workers=10, software='michal-mucha/xgboost-on-coiled'
)

client = Client(cluster)

The keyword argument `software` points to a public Docker image – feel free to reuse it when running this example!

Commissioning a cluster in the cloud takes about 2 minutes. After connecting, I can reuse my existing preparation and training steps verbatim, this time on a grander scale!

After my training is done, I can save the model and close down the cluster, releasing the resources. If I forget to do that, the entire setup will be decommissioned after 20 minutes of inactivity, reducing unnecessary costs.

Visualizing performance with Dask Dashboards

When shipping data science code for mission-critical tasks, I care about performance. The Dask cluster dashboard is a great and reliable friend when it comes to pinpointing bottlenecks and understanding whether computations take full advantage of available resources.

The task stream, which displays cluster workers’ activity and idleness over time, helps me understand whether I’ve designed the workload correctly. Tasks are horizontal bars – the longer they take to run, the wider the bars.

In this example, I am working with a parquet-formatted dataset divided into 19 groups, meaning the data is saved as 19 parquet files. When I read the 19 files into Dask, I can decide how many partitions I’d like to have. By default, the number of partitions corresponds to the number of groups.
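
A quick back-of-the-envelope check shows how 19 partitions spread over clusters of different sizes. The sketch assumes plain round-robin placement, which is a simplification: Dask's scheduler makes its own placement decisions. The helper name and the cluster sizes are chosen only for illustration.

```python
from collections import Counter

def partitions_per_worker(n_partitions, n_workers):
    """Round-robin placement: partition i goes to worker i % n_workers."""
    counts = Counter(i % n_workers for i in range(n_partitions))
    return [counts.get(worker, 0) for worker in range(n_workers)]

for n_workers in (4, 12, 24):
    load = partitions_per_worker(19, n_workers)
    print(
        f"{n_workers} workers: busiest holds {max(load)} partitions, "
        f"{load.count(0)} hold none"
    )
```

Once there are more workers than partitions, some workers have no data at all. If that turns out to be the bottleneck, a Dask dataframe can be re-split with its `repartition` method.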

What the task stream is showing me here is that, after I scaled my cluster to include more nodes, some of them sit idle during XGBoost training. Why?

Did I partition the data incorrectly? Are there workers without their own chunk of data to run calculations on?

In this case, the actual reason is that XGBoost handles the internal communication between workers itself, without providing live status updates to the Dask dashboard. After scaling up to 12 workers with 4 cores each, I can see 12 horizontal bars, one per worker.

Other features of the dashboard include a flamegraph (which you may know from using tools like py-spy and Speedscope), a cluster map, the live task graph, and more! It’s a reliable tool that saves money and helps design distributed computations correctly.

Big data is not as unwieldy as it used to be!

When applying XGBoost to real-world problems, running out of RAM can be a challenge. Fortunately, with Dask and Coiled you can burst to the cloud as and when needed, in a controlled and convenient manner, without leaving your favorite coding environment.

Making big problems easy to work with, shortening iteration time, and giving data scientists their time back – that’s what Coiled is all about.

What we could have done better

  • We omitted data preparation steps to keep the post focused on XGBoost. Those are critical in a successful data science workflow, and Dask dataframes support handling these steps at scale. CSV files have no type metadata, take up much more space, and getting over those hurdles takes work.
  • More thorough performance evaluation on increasing dataset sizes, to stress-test the infrastructure and verify whether we get speed improvements and correct results.
  • Cost comparison with a powerful single-machine setup, like the new Amazon instance with 96 CPUs and 1.1TB of RAM.

Next steps

In the future, we’d like to improve on this solution by exploring the following topics:

  • Accelerating training by activating GPUs
  • Benchmarking the training on the entire mortgage dataset (200GB, 17 years)
  • Diving into prediction explanations and building more features
  • Applying the tips from “How to control your XGBoost model” (Capital One)

Would you like to try it out on your own use case? 

How much acceleration can you achieve by going distributed?

Try Coiled today, and tweet at us – we’d love to hear about your success!