Big Data vs. Big Model: Scaling Your ML Workflow

Tom Augspurger, Data Scientist at Anaconda and lead maintainer of Dask-ML, recently joined us to discuss how he likes to think about scalable machine learning in Python. As Tom shared with us on the live stream,

“You have your machine learning workflow that works well for small problems. Then there are different types of scaling challenges you can run into … scaling the size of your data and scaling the size of your model.”

Tom Augspurger presenting his "Scalable Machine Learning with Dask" talk at AnacondaCon 2018.

In a previous post, we outlined Tom’s framework for attacking machine learning problems:

A four-quadrant graph with model size on the y-axis and data size on the x-axis.

In the bottom-left quadrant, your datasets aren’t too large (and therefore fit comfortably in RAM) and your model isn’t too large. Here, you’re much better off using libraries like scikit-learn and XGBoost. You don’t need to do distributed machine learning with tools like Dask-ML here.

If you’re in any of the other quadrants, however, Tom thinks distributed machine learning is the way to go. In this post, we’ll summarize Tom’s strategies for solving both CPU (compute)-bound and memory-bound machine learning problems.

You can find the code for the session here. You can also check out the YouTube video here:

Strategies for solving CPU (compute)-bound ML problems

A four-quadrant graph with model size on the y-axis and data size on the x-axis. The top-left "CPU bound" quadrant is circled.

In compute-bound problems, your dataset fits in RAM just fine, but you’re waiting around for your CPU (or GPU, TPU) to finish its computations. This commonly occurs when there are many mostly independent components in your estimator:

  • Hyperparameter Optimization: many CV-splits / hyperparameter combinations
  • Ensemble estimator: combine predictions from many estimators

These are relatively straightforward to parallelize. Tom hopped into a small example:

  • Tom generated a small, 5000-observation dataset
  • He then did a little grid search over a small number of parameters (a rough sketch follows below)
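Here’s roughly what that setup looks like. The dataset size matches what Tom described; the estimator and parameter values below are assumptions for illustration, not Tom’s exact code:

```python
from sklearn.datasets import make_classification
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import GridSearchCV

# A small, in-memory dataset along the lines of the one Tom generated.
X, y = make_classification(n_samples=5000, n_features=20, random_state=0)

# A modest parameter grid; the values here are purely illustrative.
param_grid = {
    "max_depth": [3, 5, 8],
    "learning_rate": [0.01, 0.1, 0.5],
}
grid_search = GridSearchCV(GradientBoostingClassifier(), param_grid, cv=5)
```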

Next, Tom got fitting:

Fitting a single estimator took 1-2 seconds. Your thought process here might be,

Okay, it took 1.5 seconds for one estimator. My problem has [INSERT LARGE NUMBER] of estimators though. Hmm, applying some linear scaling factor, that would take [INSERT LARGE NUMBER TIMES 1.5 TIMES LINEAR SCALING FACTOR] seconds to run. That’s probably too long. Let’s run this on the cluster.

Fitting the whole grid search on the Coiled cluster that Tom set up (which had 80 cores) took 14 seconds. Note how Tom switches to a Dask parallel backend. What’s happening under the hood?

Internally, scikit-learn parallelizes for loops with joblib. Typically that parallel for loop uses threads or processes on a single machine. ⬇️

A diagram showing how scikit-learn and joblib interact.

To make this process parallelizable on a cluster, Tom and the Dask-ML developers worked with the scikit-learn / joblib devs to implement a Dask parallel backend. So internally, scikit-learn still talks to joblib, joblib talks to Dask, and Dask handles scheduling all of those tasks on the cluster. ⬇️

A diagram showing how scikit-learn, joblib, and Dask interact.
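Continuing the sketch above (reusing grid_search, X, and y), switching the backend is roughly a one-line change. The scheduler address below is a placeholder; a Coiled cluster would supply its own:

```python
import joblib
from dask.distributed import Client

# Connect to the Dask cluster (placeholder address).
client = Client("tcp://<scheduler-address>:8786")

# Everything scikit-learn would normally parallelize with local threads or
# processes now fans out across the cluster's workers.
with joblib.parallel_backend("dask"):
    grid_search.fit(X, y)
```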

Pretty much anything that uses joblib internally can use the Dask joblib backend. This includes:

  • Anything in scikit-learn with n_jobs (fitting trees in a Random Forest, voting methods, hyperparameter optimization, etc.)
  • TPOT

Without the 80-core cluster, this small example would have taken around 19 minutes. Going from 19 minutes to 14 seconds is a nice difference; without it, this live stream wouldn’t have been possible, and Tom and the hosts would have gone for a coffee, and maybe a lunch, while waiting.

The other thing? Not much changed in terms of lines of code. Tom notes, “In terms of infrastructure, it was fairly easy for me to get going since I have access to a cluster. That’s the biggest barrier. Do you have a cluster? Sign up for Coiled.”

Tom then applied the same strategy, going from “scikit-learn and joblib on a single machine” to “Dask-ML with a Coiled cluster,” to a larger, real-world dataset: the NYC taxi dataset. Tom’s process:

  • “We’ll load a tiny fraction of the NYC taxi cab dataset. We’ll have a blob of code to transform the heterogeneous features into a nice array of floats that we can use in an estimator (GradientBoostingClassifier).”
  • “This is our “small” dataframe, with ~80,000 rows. I’ll bring it back locally to my laptop with a .compute(). This next section only uses pandas, NumPy, and scikit-learn.”
  • “Let’s fix up the dtypes (since we read from CSV, which doesn’t have pandas’ dtypes), drop the missing values, and split into our features and outcome variables.” Note: the beautiful mess that is data cleaning.
  • “And now we’ll build up a big pre-processing pipeline. Note that we aren’t doing anything with Dask / Dask-ML here. It’s all NumPy, pandas, and scikit-learn.” (A rough sketch of such a pipeline follows below.)
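A pipeline along those lines might look like this. The column names and exact steps are assumptions for illustration, not the notebook’s code:

```python
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Hypothetical NYC-taxi-style columns.
numeric = ["trip_distance", "fare_amount"]
categorical = ["payment_type", "passenger_count"]

preprocess = ColumnTransformer([
    ("scale", StandardScaler(), numeric),
    ("onehot", OneHotEncoder(handle_unknown="ignore"), categorical),
])

pipeline = Pipeline([
    ("preprocess", preprocess),
    ("clf", GradientBoostingClassifier()),
])
```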

Here’s the pipeline visualized:

A diagram of a pre-processing pipeline.

Back to Tom coding:

A Jupyter Notebook screenshot displaying the fitting of a pipeline and its score.

Here’s where Dask comes in. It’s subtle, and easy to miss among the preprocessing and feature engineering code that comes with every machine learning workflow.

A screenshot of a Jupyter Notebook displaying Dask-ML's GridSearchCV.

When Tom ran this chunk of code, the Dask Dashboard’s progress bars told us that we had 3 tasks for all the pre-processing stuff (StandardScaler, ColumnTransformer, etc.), while we had 36 for gradient boosting. That’s because Dask-ML’s hyperparameter optimizers avoid repeated work. We’re only searching over parameters later on in the pipeline (the gradient boosting classifier), so all of those candidates can share the results of the earlier stages of the pipeline (for that CV split of the data).
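Passing the scikit-learn pipeline to Dask-ML’s grid search looks roughly like this, reusing the pipeline from the sketch above; the parameter values and number of CV splits are assumptions:

```python
from dask_ml.model_selection import GridSearchCV

# Only the classifier's parameters are searched, so Dask-ML fits the earlier
# pipeline stages once per CV split and shares them across candidates.
param_grid = {"clf__learning_rate": [0.01, 0.1, 0.5]}  # illustrative values

search = GridSearchCV(pipeline, param_grid, cv=3)
search.fit(X, y)
```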

The end result? One minute.

A screenshot of a Jupyter Notebook displaying the timing and fitting of Dask-ML's GridSearchCV.

After this part of the stream, Coiled founder Matt Rocklin noted,

“What I like about your example is that this cell is when you decided to use Dask. You almost skipped over the part where you decided to use Dask! pandas, scikit-learn, pandas, scikit-learn, then let’s scale that up. Now we’re off and running. The fact that you can put a scikit-learn pipeline directly into a Dask-ML hyperparameter optimization is a result of a lot of work. It’s actually very impressive how little happened.”

Tom followed up,

“It’s not that you can just put a scikit-learn pipeline into this thing, it’s that you can put Dask-native estimators inside a scikit-learn pipeline, so it goes both directions. And you can feed that into this Dask-ML grid search. It’s nice to have shared foundations that we can all build off of.”

Strategies for solving memory-bound ML problems

A four-quadrant graph with model size on the y-axis and data size on the x-axis. The bottom-right "RAM bound" quadrant is circled.

Tom notes that we can break this “large data” scaling challenge into a couple of components:

  1. Data structures: NumPy & pandas were built for in-memory problems.
  2. ML algorithms: Many algorithms in (e.g.) scikit-learn were built for (in-memory) NumPy arrays.

Tom started with component #1: data structures. He migrated his earlier pre-processing code to Dask/Dask-ML, and when translating the pandas and scikit-learn code to Dask code, not much changed at all. The two big examples (sketched below):

  • pd.concat ➡️ dd.concat
  • sklearn.preprocessing.OneHotEncoder ➡️ dask_ml.preprocessing.OneHotEncoder
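A minimal sketch of what those swaps look like, where df_a and df_b stand in for whatever dataframes the real code concatenates:

```python
import dask.dataframe as dd
from dask_ml.preprocessing import OneHotEncoder

# The surrounding code stays the same; only the imports change.
combined = dd.concat([df_a, df_b])  # was pd.concat([df_a, df_b])
encoder = OneHotEncoder()           # was sklearn.preprocessing.OneHotEncoder()
```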

Often, Tom didn’t even change the scikit-learn pieces to their Dask-ML equivalents. In lots of cases, he just left them, and they worked anyway, which is a whole other level of compatibility.

When Tom attacked component #2, that not all algorithms can handle distributed datasets, he noted that you just need to know which algorithm you’re working with. Tom suggests going to this scikit-learn strategy page. If you see an algorithm that implements partial_fit, you know that it can deal with larger-than-memory datasets by being fed batches of data at a time.
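One way to feed those batches from Dask collections is Dask-ML’s Incremental wrapper. A minimal sketch, where X_dask and y_dask stand for Dask arrays that don’t fit in memory:

```python
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier

# SGDClassifier implements partial_fit, so Incremental can train it on one
# block of the Dask array at a time instead of loading everything into RAM.
clf = Incremental(SGDClassifier(), scoring="accuracy")
clf.fit(X_dask, y_dask, classes=[0, 1])
```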

At this point in the session, Tom pulled up the Dask Profiler and clicked on a read_csv task.

A screenshot of Tom Augspurger using the Dask Profiler.

This visualization (powered by Bokeh!) shows where each Dask worker spent its CPU time. With it, we can ask, “Where can I speed up my computation?” In this case, Tom spent 90% of his time reading data into memory (it is a memory-bound machine learning problem, after all). This gave him a sense of where he should spend time optimizing (e.g., switching to Parquet) and how much benefit he could expect from doing so.
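If most of the time goes to parsing CSVs, one common optimization is converting the data to Parquet once and reading that in later runs. A sketch with placeholder paths:

```python
import dask.dataframe as dd

# One-time conversion (placeholder paths): write the CSV data out as Parquet,
# then read the Parquet files, which are much faster to load, from then on.
df = dd.read_csv("s3://bucket/nyc-taxi-*.csv")
df.to_parquet("s3://bucket/nyc-taxi/")
df = dd.read_parquet("s3://bucket/nyc-taxi/")
```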

Do you need to do Machine Learning at Scale?

It was great to see Tom do all of this scalable machine learning using a Coiled cluster! You too can get started on a Coiled cluster immediately. Coiled also handles security, conda/docker environments, and team management, so you can get back to doing data science. Get started for free by joining our Beta.
