Snowflake and Dask

Matthew Rocklin, James Bourbeau, Florian Jetter — June 22, 2021



tl;dr

Snowflake is a leading cloud data platform and SQL database.

Dask is a leading framework for scalable data science and machine learning in Python.

This article talks about why and how to use both together, and dives into the challenges of bulk parallel reads and writes with data warehouses, which have to be solved for smooth interoperability.


Motivation

Data warehouses are great at SQL queries but less great at more advanced operations, like machine learning or the free-form analyses provided by general-purpose languages like Python.

That’s ok.  Python users have been extracting copies of data out of databases for decades.  

It’s common to use a database to filter and join different datasets, and then pass that result off to Python for more custom computations.  We use each tool where it excels.  This works well as long as it is easy to perform the handoff of large results.  

However, as we work on larger datasets and our results grow beyond the scale of a single machine, passing results between a database and a general purpose computation system is challenging.  

This article describes a few ways in which we can move data between Snowflake and Dask today to help explain why this is both hard and important to do well.  Then at the end, we’ll preview new functionality currently in beta, which provides the best possible solution, and show an example of use.  


Three ways to pass data from Snowflake to Dask

There are different ways to accomplish this task, each with different benefits and drawbacks.  By looking at all three, we’ll gain a better understanding of the problem, and what can go wrong.

1. Just use Pandas

First, if your data is small enough to fit in memory then you should just use Pandas and Python.  There is no reason to use a distributed computing framework like Dask if you don’t need it.

Snowflake publishes a Python connector with Pandas compatibility.  The following should work fine:

$ pip install "snowflake-connector-python[pandas]"

>>> import snowflake.connector
>>> con = snowflake.connector.connect(user=..., password=..., account=...)
>>> df = con.cursor().execute("SELECT * FROM my_table").fetch_pandas_all()
Pros

– Super simple
– Already available

Cons

– Doesn’t scale

2. Break into many subqueries

For larger datasets, we can also break one large table-scan query into many smaller queries and then run those in parallel to fill the partitions of a Dask dataframe.  

This is the approach implemented in the dask.dataframe.read_sql_table function. This function performs the following steps:

  1. Query the length and schema of the table to determine the expected size and an ideal number of partitions.
  2. Query for the minimum and maximum of a column on which to split.
  3. Submit many small queries that segment that column linearly between the minimum and maximum values.

For example, if we’re reading from a time series then we might submit several queries which each pull off a single month of data.  These each lazily return a Pandas dataframe.  We construct a Dask dataframe from these lazily generated subqueries. 
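As an illustration of the splitting step (this is a sketch of the idea, not Dask's actual implementation), here is how a column's range might be segmented into per-partition subqueries, using a hypothetical `accounts` table with an `id` index column:

```python
def split_bounds(vmin, vmax, npartitions):
    """Divide [vmin, vmax] into npartitions contiguous ranges."""
    step = (vmax - vmin) / npartitions
    edges = [vmin + i * step for i in range(npartitions + 1)]
    return list(zip(edges[:-1], edges[1:]))

# One subquery per partition, each scanning one slice of the index column
bounds = split_bounds(0, 100, 4)
subqueries = [
    f"SELECT * FROM accounts WHERE id >= {lo} AND id < {hi}"
    for lo, hi in bounds
]
```

Each subquery becomes one lazily evaluated partition of the resulting Dask dataframe.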

import dask.dataframe as dd
df = dd.read_sql_table(
    'accounts',
    'snowflake://user:pass@...?warehouse=...&role=...',
    npartitions=10,
    index_col='id',
)

This scales and works with any database that supports a SQLAlchemy connector (which Snowflake does).  However, there are a few problems:

  • It only works for very simple queries: really just full table scans that optionally select a subset of columns and filter out rows.  For example, you couldn’t run a big join query and then feed that result to Dask without first writing out to a temporary table.
  • It’s kinda slow.  SQLAlchemy/ODBC connectors are rarely well optimized (see Turbodbc for a good counter-example).
  • It’s inconsistent.  Those subqueries aren’t guaranteed to run at the same time, so if other processes are writing to your database while you’re reading, you could get incorrect or inconsistent results.  In some applications this doesn’t matter; in others it matters a great deal.
Pros

– Distributed reads/writes
– Uses existing SQLAlchemy machinery
– Works on all major databases

Cons

– Slow, backed by ODBC
– Doesn’t support complex queries
– Auth information in connection string
– Risk of inconsistent results

For more information see Dask Dataframe and SQL — Dask documentation

3. Bulk Export to Parquet

We can also perform a bulk export.  Both Snowflake and Dask (and really any distributed system these days) can read and write Parquet data on cloud object stores.  So we can perform a query with Snowflake and then write the output to Parquet, and then read in that data with Dask dataframe.

import dask.dataframe as dd
import snowflake.connector

query = """
COPY INTO 's3://my_storage_location'
FROM <table_name>
FILE_FORMAT = (TYPE = PARQUET)
CREDENTIALS = (aws_key_id='xxxx' aws_secret_key='xxxxx' aws_token='xxxxxx');
"""

con = snowflake.connector.connect(
    user='XXXX',
    password='XXXX',
    account='XXXX',
)
con.cursor().execute(query)

df = dd.read_parquet('s3://my_storage_location', ...)

This is great because now we can perform large complex queries and export those results at high speed to Dask in a fully consistent way.  All of the challenges of the previous approach are removed.  

However, this creates new challenges with data management.  We now have two copies of our data.  One in Snowflake and one in S3.  This is technically suboptimal in many ways (cost, storage, …) but the major flaw here is that of data organization.  Inevitably these copies persist over time and get passed around the organization.  This results in many copies of old data in various states of disrepair.  We’ve lost all the benefits of centralized data warehousing, including cost efficiency, correct results, data security, governance, and more.  

Bulk exports provide performance at the cost of organization

Note that users interested in this option may also want to look into Snowflake Stages.

New: Managed Bulk Transfers

Recently, Snowflake has added a capability for staged bulk transfers to external computation systems like Dask.  It combines the raw performance and complex-query support of bulk exports with the centralized management of reading SQL queries directly from the database.

Using this, we’re able to provide an excellent Snowflake <-> Dask data movement tool. 

Understanding Snowflake’s Parallel Data Exports

Now Snowflake can take the result of any query and stage that result for external access.

After running the query, Snowflake gives us back a list of pointers to chunks of data that we can read.  We can then send pointers to our workers and they can directly pull out those chunks of data.  This is very similar to the bulk export option described above, except that now rather than us being responsible for that new Parquet-on-S3 dataset, Snowflake manages it.  This gives us all of the efficiency of a bulk export, while still letting Snowflake maintain control of the resulting artifact.  Snowflake can clean up that artifact for us automatically so that we’re always going back to the database as the single source of truth as we should.
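Conceptually, this resembles the `get_result_batches()` API in the Snowflake Python connector, which returns lightweight pointers to staged result chunks rather than the data itself.  The sketch below is an illustration of the idea rather than dask-snowflake's exact implementation; the credentials and table name are placeholders:

```python
import dask
import dask.dataframe as dd
import snowflake.connector

conn = snowflake.connector.connect(...)  # fill in your own credentials

cursor = conn.cursor()
cursor.execute("SELECT * FROM my_table")  # hypothetical table

# Each ResultBatch is a pointer to a staged chunk of the result;
# no data is downloaded in the client session here.
batches = cursor.get_result_batches()

# Workers fetch their chunks directly from the staged location.
ddf = dd.from_delayed([dask.delayed(batch.to_pandas)() for batch in batches])
```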

Snowflake + Dask Integration

Given this new capability, we then wrapped Dask around it, resulting in the following experience:

import dask_snowflake
import snowflake.connector

with snowflake.connector.connect(...) as conn:
    ddf = dask_snowflake.from_snowflake(
        query="""
        SELECT * FROM TableA JOIN TableB ON ...
        """,
        conn=conn,
    )

This is a smooth and simple approach that combines the best of all the previous methods.  It is easy to use, performs well, and handles arbitrary queries.

Here is a video using Snowflake to perform a complex join, and then using Dask to train an XGBoost model on that data in parallel.

Snowflake/Dask Fine Print

There are a few things to keep in mind:

  1. We recommend using Dask on the same cloud and region as your Snowflake deployment for optimal speed and to avoid data ingress/egress charges
  2. Writing data from Dask to Snowflake is also supported in the dask-snowflake package, but it is currently limited by the concurrency of your Snowflake cluster
  3. Your driving Python session (where you’re importing dask-snowflake) needs to be properly authenticated with Snowflake.  Your remote workers do not.  Dask-snowflake doesn’t require any additional considerations for authentication beyond what you do normally.  However, because authentication information is included in the messages sent to each worker we encourage you to ensure that you use secure network connections.
  4. These features aren’t yet fully released.  We recommend waiting until they hit general availability.  If you’re the kind of person who likes breaking things, then keep reading 🙂
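For completeness, writing from Dask back to Snowflake (point 2 above) follows a similar pattern.  A sketch assuming dask-snowflake's `to_snowflake` entry point, with placeholder credentials and a hypothetical destination table (the package is in beta, so the API may change):

```python
import dask_snowflake

dask_snowflake.to_snowflake(
    ddf,                      # an existing Dask DataFrame
    name="my_output_table",   # hypothetical destination table
    connection_kwargs={
        "user": "XXXX",
        "password": "XXXX",
        "account": "XXXX",
    },
)
```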

Try it out

This is still in beta.  If you’d like early access then let us know by clicking below.

Sign Up For Beta 

The technical work described in this post was done by various engineers at Snowflake, and James Bourbeau and Florian Jetter from Coiled.