Converting Dask DataFrame object columns to numbers with to_numeric

Matthew Powers February 22, 2022

This post explains how to convert Dask DataFrame object columns to floating point columns with to_numeric() and why it’s more flexible than astype() in certain situations. This design pattern is especially useful when you’re working with data in text-based file formats like CSV. Because of messy data, you’ll often have to read numeric columns stored in CSV files as object columns, and then convert them to floating point values so the bad data can be nulled out. Object columns need to be converted to numeric values before you can perform numerical operations like addition and subtraction. You can use the tactics outlined in this post for data munging in an extract, transform, load (ETL) pipeline.

Cleaning data is often the first step of a data project. Luckily Dask has great helper methods like to_numeric() that make it easy to clean the data and properly type the columns in your DataFrames.
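
Here’s a minimal sketch of that workflow, assuming a hypothetical prices.csv file with a price column that contains some unparseable values:

import dask.dataframe as dd

# Read the messy column in as object so the read itself doesn't fail
ddf = dd.read_csv("prices.csv", dtype={"price": "object"})

# Convert to numeric, nulling out any values that can't be parsed
ddf["price"] = dd.to_numeric(ddf["price"], errors="coerce")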

Converting object column with to_numeric

Let’s look at a simple example with a DataFrame that contains an invalid string value in a column that should only contain numbers. Let’s start by creating a DataFrame with nums and letters columns.

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"nums": [1, 2.8, 3, 4, "hi", 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)

Now let’s print the contents of the DataFrame so it’s easy to visualize.

print(ddf.compute())

  nums letters
0    1       a
1  2.8       b
2    3       c
3    4       d
4   hi       e
5    6       f

Notice that row 4 in the nums column has the value “hi”. That’s a string value that Python cannot convert into a numerical value.
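
You can verify this in plain Python, where converting the string directly raises an error:

float("hi")  # ValueError: could not convert string to float: 'hi'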

Let’s look at the data types of the columns and see that Dask is treating both nums and letters as object type columns.

ddf.dtypes

nums       object
letters    object
dtype: object

Let’s convert the nums column to be a number column with to_numeric.

ddf["nums"] = dd.to_numeric(ddf["nums"], errors="coerce")

ddf.dtypes

nums       float64
letters     object
dtype: object

print(ddf.compute())

  nums letters
0   1.0       a
1   2.8       b
2   3.0       c
3   4.0       d
4   NaN       e
5   6.0       f

Dask has conveniently nulled out the “hi” value in row 4 to be NaN. Nulling out values that cannot be easily converted to numerical values is often what you’ll want. Alternatively, you can set errors="raise" to raise an error when a value can’t be cast to a numeric dtype.
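
Once the bad values are nulled out, you’ll often want to count or drop them before running computations downstream. Here’s a quick sketch using standard Dask DataFrame methods:

# Count how many values were nulled out by the conversion
print(ddf["nums"].isna().sum().compute())  # 1

# Optionally drop the rows where the conversion failed
ddf = ddf.dropna(subset=["nums"])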

Limitations of astype

Many beginning Dask users reach for the astype method to convert object columns to numeric columns. This approach has important limitations.

Let’s create another DataFrame to see when astype can be used to convert from object columns to numeric columns and when it falls short.

df2 = pd.DataFrame(
    {"n1": ["bye", 2.8, 3], "n2": ["7.7", "8", 9.2]}
)
ddf2 = dd.from_pandas(df2, npartitions=2)

print(ddf2.compute())

    n1   n2
0  bye  7.7
1  2.8    8
2    3  9.2

You can run ddf2.dtypes to see that both n1 and n2 are object columns.

n1    object
n2    object
dtype: object

n2 is an object type column because it contains string and float values.

Let’s convert n2 to be a float64 column using astype.

ddf2["n2"] = ddf2["n2"].astype("float64")

ddf2.dtypes

n1     object
n2    float64
dtype: object

print(ddf2.compute())

   n1   n2
0  bye  7.7
1  2.8  8.0
2    3  9.2

astype can convert n2 to be a float column without issue.
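
Now that n2 is a float64 column, numerical operations run without issue:

print((ddf2["n2"] + 1).compute())

0     8.7
1     9.0
2    10.2
Name: n2, dtype: float64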

Now let’s try to convert n1 to be a float column with astype.

ddf2["n1"] = ddf2["n1"].astype("float64")

print(ddf2.compute())

This errors out with the following long error message.

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Input In [37], in <module>
----> 1 print(ddf2.compute())

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
    264 def compute(self, **kwargs):
    265     """Compute this dask collection
    266 
    267    This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    286    dask.base.compute
    287    """
--> 288     (result,) = compute(self, traverse=False, **kwargs)
    289     return result

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    568     keys.append(x.__dask_keys__())
    569     postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
    572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/threaded.py:79, in get(dsk, result, cache, num_workers, pool, **kwargs)
     76     elif isinstance(pool, multiprocessing.pool.Pool):
     77         pool = MultiprocessingPoolExecutor(pool)
---> 79 results = get_async(
     80    pool.submit,
     81    pool._max_workers,
     82    dsk,
     83    result,
     84    cache=cache,
     85    get_id=_thread_get_id,
     86    pack_exception=pack_exception,
     87    **kwargs,
     88 )
     90 # Cleanup pools associated to dead threads
     91 with pools_lock:

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:507, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    505         _execute_task(task, data)  # Re-execute locally
    506     else:
--> 507         raise_exception(exc, tb)
    508 res, worker_id = loads(res_info)
    509 state["cache"][key] = res

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:315, in reraise(exc, tb)
    313 if exc.__traceback__ is not tb:
    314     raise exc.with_traceback(tb)
--> 315 raise exc

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    218 try:
    219     task, data = loads(task_info)
--> 220     result = _execute_task(task, data)
    221     id = get_id()
    222     result = dumps((result, id))

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/optimization.py:969, in SubgraphCallable.__call__(self, *args)
    967 if not len(args) == len(self.inkeys):
    968     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>(.0)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/utils.py:37, in apply(func, args, kwargs)
     35 def apply(func, args, kwargs=None):
     36     if kwargs:
---> 37         return func(*args, **kwargs)
     38     else:
     39         return func(*args)

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/utils.py:1021, in methodcaller.__call__(self, _methodcaller__obj, *args, **kwargs)
   1020 def __call__(self, __obj, *args, **kwargs):
-> 1021     return getattr(__obj, self.method)(*args, **kwargs)

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/generic.py:5815, in NDFrame.astype(self, dtype, copy, errors)
   5808     results = [
   5809         self.iloc[:, i].astype(dtype, copy=copy)
   5810         for i in range(len(self.columns))
   5811     ]
   5813 else:
   5814     # else, only a single dtype is given
-> 5815     new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
   5816     return self._constructor(new_data).__finalize__(self, method="astype")
   5818 # GH 33113: handle empty frame or series

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/managers.py:418, in BaseBlockManager.astype(self, dtype, copy, errors)
    417 def astype(self: T, dtype, copy: bool = False, errors: str = "raise") -> T:
--> 418     return self.apply("astype", dtype=dtype, copy=copy, errors=errors)

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/managers.py:327, in BaseBlockManager.apply(self, f, align_keys, ignore_failures, **kwargs)
    325         applied = b.apply(f, **kwargs)
    326     else:
--> 327         applied = getattr(b, f)(**kwargs)
    328 except (TypeError, NotImplementedError):
    329     if not ignore_failures:

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/blocks.py:591, in Block.astype(self, dtype, copy, errors)
    573 """
    574 Coerce to the new dtype.
    575 
   (...)
    587 Block
    588 """
    589 values = self.values
--> 591 new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
    593 new_values = maybe_coerce_values(new_values)
    594 newb = self.make_block(new_values)

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1309, in astype_array_safe(values, dtype, copy, errors)
   1306 dtype = pandas_dtype(dtype)
   1308 try:
-> 1309     new_values = astype_array(values, dtype, copy=copy)
   1310 except (ValueError, TypeError):
   1311     # e.g. astype_nansafe can fail on object-dtype of strings
   1312     #  trying to convert to float
   1313     if errors == "ignore":

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1257, in astype_array(values, dtype, copy)
   1254     values = values.astype(dtype, copy=copy)
   1256 else:
-> 1257     values = astype_nansafe(values, dtype, copy=copy)
   1259 # in pandas we don't store numpy str dtypes, so convert to object
   1260 if isinstance(dtype, np.dtype) and issubclass(values.dtype.type, str):

File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1201, in astype_nansafe(arr, dtype, copy, skipna)
   1197     raise ValueError(msg)
   1199 if copy or is_object_dtype(arr.dtype) or is_object_dtype(dtype):
   1200     # Explicit copy, or required since NumPy can't view from / to object.
-> 1201     return arr.astype(dtype, copy=True)
   1203 return arr.astype(dtype, copy=copy)

ValueError: could not convert string to float: 'bye'

astype raises errors when columns contain string values that cannot be converted to numbers. It doesn’t coerce string values to NaN.

to_numeric has the same default behavior (errors="raise"), so this code will error out as well.

ddf2["n1"] = dd.to_numeric(ddf["n1"])

print(ddf2.compute())

Here’s the key part of the error message: ValueError: Unable to parse string "bye" at position 0.

You need to set errors="coerce" to successfully invoke to_numeric.

ddf2["n1"] = dd.to_numeric(ddf["n1"], errors="coerce")

print(ddf2.compute())

   n1   n2
0  NaN  7.7
1  2.8  8.0
2  3.0  9.2
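
If you have several object columns to clean, you can apply the same conversion in a loop. Here’s a simple sketch, assuming every column in the list should be numeric:

for col in ["n1", "n2"]:
    ddf2[col] = dd.to_numeric(ddf2[col], errors="coerce")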

See this blog post for more information about astype.

Conclusion

Dask makes it easy to convert object columns into number columns with to_numeric.

to_numeric is customizable with different error behaviors when values cannot be converted to numbers. You can coerce these values to NaN, raise an error, or return the input unchanged. Choose the behavior that works best for your application.
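
Here’s what the three options look like side by side:

dd.to_numeric(ddf["nums"], errors="raise")   # error on the first bad value (the default)
dd.to_numeric(ddf["nums"], errors="coerce")  # bad values become NaN
dd.to_numeric(ddf["nums"], errors="ignore")  # return the input unchanged if parsing fails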

The Parquet file format is generally better than CSV for data analysis because it doesn’t allow non-numeric data to get written into number columns in the first place. Messy data like this is usually an artifact of the CSV file format. Whenever possible, convert your CSV data to Parquet files so you don’t need to deal with these types of data inconsistencies. See here for benchmarks on why Parquet is better than CSV.
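
Once your columns are properly typed, writing the cleaned data to Parquet preserves the schema, so downstream readers get numeric columns automatically. Here’s a sketch, assuming a hypothetical output directory and that pyarrow is installed:

# Write the cleaned data to a hypothetical output directory
ddf.to_parquet("cleaned_data/", engine="pyarrow")

# Reading it back preserves the numeric dtypes
ddf = dd.read_parquet("cleaned_data/")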

