Scikit-learn + Joblib: Scale your Machine Learning Models for Faster Training

March 15, 2022

You can train scikit-learn models in parallel using the scikit-learn joblib interface. This allows scikit-learn to take full advantage of the multiple cores in your machine (or, spoiler alert, on your cluster) and speed up training.

Using the Dask joblib backend, you can maximize parallelism by scaling your scikit-learn model training out to a remote cluster.

This works for all scikit-learn models that expose the n_jobs keyword, like random forests, linear regressions, and other machine learning algorithms. You can also use the joblib library for distributed hyperparameter tuning with grid search cross-validation.

Scikit-learn Models vs Data Size

When training machine learning models, you can run into 2 types of scalability issues: your model size may get too large, or your data size may start to cause issues (or even worse: both!). You can typically recognize a scalability problem related to your model size by long training times: the computation will complete (eventually), but it’s becoming a bottleneck in your data processing pipeline. This means you’re in the top-left corner of the diagram below:

Using the scikit-learn joblib integration, you can address this problem by processing your scikit-learn models in parallel, distributed over the multiple cores in your machine.

Note that this is only a good solution if your training data and test data fit in memory comfortably. If this is not the case (and you’re in the “I give up!” quadrant of large models and large datasets), see the “Dataset Too Large?” section below to learn how Dask-ML or XGBoost-on-Dask can help you out.

[Diagram: model size vs. dataset size quadrants]

Sklearn + Joblib: Run Random Forest in Parallel Locally

Let’s see this in action with a scikit-learn algorithm that is embarrassingly parallel: a random forest model. A random forest fits a number of decision tree classifiers on sub-samples of the training data, then combines the results from the individual decision trees to make a prediction. Because each decision tree is trained independently, the model can easily be trained in parallel.

First, we'll create a new conda environment for this work. (If you're following along, you can also use pip or any other package manager, or your own existing environment.)

conda create -n sklearn-example -c conda-forge python=3.9 scikit-learn joblib ipython
conda activate sklearn-example

We’ll import the necessary libraries and create a synthetic classification dataset.

from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=1000, n_features=4,
                           n_informative=2, n_redundant=0,
                           random_state=0, shuffle=False)

Then we’ll instantiate our Random Forest classifier:

clf = RandomForestClassifier(max_depth=2, random_state=0)

We can then use a joblib context manager to train this classifier in parallel, specifying the number of cores we want to use with the n_jobs argument.

import joblib
with joblib.parallel_backend(backend='loky', n_jobs=4):
    clf.fit(X, y)

To save users from having to use a context manager every time they want to train a model in parallel, scikit-learn's developers expose the n_jobs argument directly in the estimator's constructor:

clf = RandomForestClassifier(max_depth=2, random_state=0, n_jobs=-1)

The n_jobs keyword is passed through to the joblib backend, so you can call clf.fit(X, y) directly without wrapping it in a context manager. This is the recommended way to use joblib to train scikit-learn models in parallel locally.

Running this locally with n_jobs=-1 on a MacBook Pro with 8 cores and 16GB of RAM takes just over 2 minutes.

%%time
clf.fit(X, y)
CPU times: user 13min 21s, sys: 17.8 s, total: 13min 38s
Wall time: 2min 6s

Sklearn + Joblib: Run Random Forest on the Cluster

The context manager syntax can still come in handy when your model size increases beyond the capabilities of your local machine, or if you want to train your model even faster. In those situations, you could consider scaling out to remote clusters on the cloud. This can be especially useful if you’re running heavy grid search cross-validation, or other forms of hyperparameter tuning.

You can use the Dask joblib backend to delegate the distributed training of your model to a Dask cluster of virtual machines in the cloud.

We first need to install some additional packages:

conda install -c conda-forge coiled dask xgboost dask-ml

Next, we’ll launch a Dask cluster of 20 machines in the cloud.

When you create the cluster, Coiled will automatically replicate your local sklearn-example environment in your cluster (see Package Sync).

import coiled
cluster = coiled.Cluster(
    name="sklearn-cluster",
    n_workers=20,
)

We’ll then connect Dask to our remote cluster. This will ensure that all future Dask computations are routed to the remote cluster instead of to our local cores.

# point Dask to remote cluster
client = cluster.get_client()
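
If you want to confirm that the client is wired up before kicking off any training (an optional check, not part of the original walkthrough), you can print the dashboard link and the current worker count:

# optional sanity check: open the dashboard and confirm the workers are up
print(client.dashboard_link)
print(len(client.scheduler_info()["workers"]))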

Now use the joblib context manager to specify the Dask backend. This will delegate all model training to the workers in your remote cluster:

%%time
with joblib.parallel_backend("dask"):
    clf.fit(X, y)

Model training is now occurring in parallel on your remote Dask cluster.

This runs in about a minute on my machine, a 2x speed-up. You could make this run even faster by adding more workers to your cluster.

CPU times: user 1.93 s, sys: 601 ms, total: 2.53 s
Wall time: 1min 1s

You can also use your Dask cluster to run an extensive hyperparameter grid search that would be too heavy to run on your local machine:

from sklearn.model_selection import GridSearchCV

# Create a parameter grid
param_grid = {
    'bootstrap': [True],
    'max_depth': [80, 90, 100, 110],
    'max_features': [2, 3],
    'min_samples_leaf': [3, 4, 5],
    'min_samples_split': [8, 10, 12],
    'n_estimators': [100, 200, 300, 1000]
}

# Instantiate the grid search model
grid_search = GridSearchCV(
    estimator=clf, 
    param_grid=param_grid, 
    cv=5, 
    n_jobs=-1
)

Before we execute this compute-heavy grid search, let’s scale our cluster up to 100 workers to speed up training and make sure we don’t run into memory issues:

cluster.scale(100)
client.wait_for_workers(100)

Now let’s execute the grid search in parallel:

%%time
with joblib.parallel_backend("dask"):
    grid_search.fit(X, y)
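
When the search completes, the fitted GridSearchCV object behaves exactly as it would locally, so you can inspect the best hyperparameters and cross-validation score (a quick follow-up, not timed above):

# best hyperparameter combination and its mean cross-validated score
print(grid_search.best_params_)
print(grid_search.best_score_)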

If this is your first time working in a distributed computing system or with remote clusters, you may want to consider reading The Beginner’s Guide to Distributed Computing.

Dataset Too Large?

Is your dataset becoming too large, for example because you have acquired new data? In that case, you may be experiencing two scalability issues at once: both your model and your dataset are becoming too large. You typically notice an issue with your dataset size by pandas throwing a MemoryError when trying to run a computation over the entire dataset.


Dask exists precisely to solve this problem. Using Dask, you can scale your existing Python data processing workflows to larger-than-memory datasets. You can use Dask DataFrames or Dask Arrays to process data that is too large for pandas or NumPy to handle. If you’re new to Dask, check out this Introduction to Dask.
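
For example, a Dask DataFrame partitions your data across many pandas DataFrames and evaluates lazily, so aggregations run chunk by chunk instead of loading everything at once. Here's a minimal sketch; the file path and column names are hypothetical:

import dask.dataframe as dd

# lazily point at a directory of Parquet files that is larger than RAM
df = dd.read_parquet("s3://my-bucket/transactions/")

# nothing is loaded until .compute() is called
daily_totals = df.groupby("date")["amount"].sum().compute()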

Dask also scales machine learning to larger-than-memory datasets. Use Dask-ML or XGBoost's native Dask integration to train machine learning models on data that doesn't fit into memory. Dask-ML is a library that contains parallel implementations of many scikit-learn estimators.

The code snippet below demonstrates the pattern for training a Logistic Regression model in parallel on Dask arrays; the same code scales to larger-than-memory data.

from dask_ml.linear_model import LogisticRegression
from dask_ml.datasets import make_classification

X, y = make_classification(chunks=50)
lr = LogisticRegression()
lr.fit(X, y)
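
Because X is a Dask array here, the fitted model's predictions also come back as a lazy Dask array; call .compute() when you want concrete values. A small usage note, assuming the snippet above has just run:

# predictions stay lazy until explicitly computed
preds = lr.predict(X)
print(preds[:10].compute())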

Dask also integrates natively with XGBoost to train XGBoost models in parallel:

import xgboost as xgb

# booster parameters (left undefined in the original snippet; example values)
params = {"objective": "binary:logistic"}

# create a Dask-aware DMatrix from the Dask arrays above
dtrain = xgb.dask.DaskDMatrix(client, X, y)

# train the model across the cluster
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)
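
xgb.dask.train returns a dictionary holding the trained booster along with the per-round evaluation history, and you can feed the booster back into xgb.dask.predict for distributed predictions. A short follow-up sketch, assuming the snippet above ran successfully:

# unpack the trained booster and evaluation history
booster = output["booster"]
history = output["history"]

# distributed prediction on the same Dask-backed data
predictions = xgb.dask.predict(client, booster, dtrain)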

Scikit-learn + Joblib Summary

We’ve seen that:

  • You can use scikit-learn's joblib integration to distribute certain scikit-learn tasks across all the cores in your machine for faster runtimes.
  • You can connect joblib to the Dask backend to scale out to a remote cluster for even faster processing times.
  • You can use XGBoost-on-Dask and/or dask-ml for distributed machine learning training on datasets that don’t fit into local memory.

If you’d like to scale your Dask work to the cloud, Coiled provides quick and on-demand Dask clusters along with tools to manage environments, teams, and costs.
