Getting started with Dask and SQL

Lots of people talk about “democratizing” data science and machine learning. What could be more democratic — in the sense of widely accessible — than SQL, PyData, and scaling data science to larger datasets and models?

Dask is rapidly becoming a go-to technology for scalable computing. But despite its strong and flexible dataframe API, Dask has historically lacked a SQL interface for querying data.

In this post, we look at dask-sql, an exciting new open-source library that offers a SQL front-end to Dask. Follow along with this notebook. You can also load it up on Coiled Cloud if you want to access some serious Dask clusters for free with a single click! To do so, log into Coiled Cloud here, navigate to our example notebooks, and launch the dask-sql notebook.

In this post, we:

  • Launch a Dask cluster and use dask-sql to run SQL queries on it,
  • Perform some basic speed tests,
  • Use SQL and cached data to turbocharge our analytics,
  • Investigate SQL built-in helper functions in dask-sql,
  • Provide an example of fast plotting from big data.

Many thanks to Nils Braun, the creator of dask-sql, for his thoughtful and constructive feedback on this post.


Launch a Dask cluster and get ready for SQL

dask-sql is free and open source and will work with any Dask cluster, so you can run this (with minimal modification) in any environment. One easy way to spin up a cluster on AWS (Azure and GCP coming soon) is with Coiled Cloud; it’s easy in part because you don’t need to mess around with Docker and/or Kubernetes. That’s what we’ll do here, so feel free to code along.

If you haven’t signed up for the Coiled beta, you can do so for free with just a GitHub or Google ID here.

Then we perform our imports and spin up our cluster!

import coiled 
from dask.distributed import Client 

cluster = coiled.Cluster(n_workers=20) 
client = Client(cluster) 
client 

Next, we’ll install dask-sql. It’s a simple install but may take a minute or two.
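At the time of writing, dask-sql is published to both PyPI and conda-forge, so either of these commands should work (check the dask-sql docs for the current recommendation):

```shell
pip install dask-sql
# or, via conda:
# conda install -c conda-forge dask-sql
```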

Analyze data in the cloud

At this point, we’re ready to start querying data! 

Before we run our first SQL query, let’s test things out with a “starter” query on the dataset of interest: this query calculates the average tip amount by passenger count for the 2019 records in the NYC taxicab dataset. 

Then we’ll try it again in SQL. 

import dask.dataframe as dd 

df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "payment_type": "UInt8",
        "VendorID": "UInt8",
        "passenger_count": "UInt8",
        "RatecodeID": "UInt8",
    },
    storage_options={"anon": True},
)


df.groupby("passenger_count").tip_amount.mean().compute() 

Great! 

Our cluster and code are running. 

Now let’s try some SQL with dask-sql!

As we’ll see later when we take a look under the hood, dask-sql uses a Java library to handle some of the query analysis. It tries to “automagically” locate the JVM shared library it needs, but it doesn’t find the right path in this cloud-deployed Jupyter container, so we’ll give it a hint:

import os 
os.environ["JAVA_HOME"] = "/opt/conda/lib/server"

dask-sql uses a well-established Java library, Apache Calcite, to parse the SQL and perform some initial work on your query. It’s a good thing because it means that dask-sql isn’t reinventing yet another query parser and optimizer, although it does create a dependency on the JVM.  Note that the speed drawback of starting and managing the JVM is only a problem when parsing the query, not when executing it. We’ll soon see that this doesn’t add significant overhead.

from dask_sql import Context 

c = Context() 

This Context instance will let us run queries … but first we need some data sources. 

There are a number of ways to define data sources with dask-sql but one of the simplest is to supply a Dask Dataframe as the source. The Dask Dataframe: 

  • is lazy, so it doesn’t retrieve the data until needed, 
  • can discover the data schema, 
  • supports out-of-core access — a fancy way of saying it does not need to actually load the data up into memory (e.g., maybe the data doesn’t fit in memory, or maybe you want that memory for other computations), 
  • knows how to retrieve the data from the underlying source (e.g., “CSV files in S3”). 

To use the data in our SQL queries, we need to assign an identifier (a name) to it in dask-sql.

The following code: 

  1. associates the table name taxi with df 
  2. creates a query to count the rows in this table 
  3. returns a handle to a lazy resultset in the form of a Dask dataframe

c.register_dask_table(df, "taxi")

result = c.sql('SELECT count(1) FROM taxi') 

result

In almost all cases, running c.sql(…) will not actually run the full query; it just produces a Dask dataframe handle representing the results. (There are a couple of edge cases, noted in the dask-sql docs, that do trigger an immediate computation today, but the long-term goal is to make as much as possible lazy.)

How do we get our actual rowcount out? The same way we evaluate any other small Dask result that we want to retrieve: via .compute()

result.compute() 

Ok, we’ve run our first dask-sql query and gotten results back!

Speed test

Now let’s revisit the “starter query” — the one that calculates the average tip amount by passenger count. 

We have two goals here: 

  • Write the same query in Dask/Python and SQL, and see that they work and produce the same results, 
  • Time the execution, to verify that SQL does not add any significant performance overhead 
    • The SQL query gets processed and converted for Dask just once no matter how much data is involved, so it should not add any cost for nontrivial datasets.

If you’re trying this out, this is also a great time to view the Dask task stream dashboard, to see the cluster in action. You can do this via the JupyterLab extension or through the Coiled Cloud GUI.

%%time 

df.groupby("passenger_count").tip_amount.mean().compute() 

%%time 

c.sql('SELECT avg(tip_amount) FROM taxi GROUP BY passenger_count').compute()

You should see the identical output (we saw around 25 seconds for both). Moreover, you should see near-identical wall-clock time. The SQL processing adds 100ms or less and is a constant, one-time cost.

SQL + Cached data = turbocharged analytics

Let’s see how we can accelerate analytics by caching this dataset in our cluster, and then running SQL queries against that cached data. 

This isn’t just fast, it’s whole-team-friendly because: 

  • we can expose this dataset to other Dask analysts, incentivizing sharing a “large RAM pool” cluster for analytics over the data, 
  • dask-sql exposes the Presto wire protocol, so folks using Presto-compatible clients or visualization tools can access this data with zero programming! 

First, we’ll ask Dask to cache the table. 

dfp = df.persist()

It may take a few seconds (or a lot more for really big datasets) to get loaded into cluster RAM. 

We can watch blocks load up in realtime in the Dask Graph dashboard. This shows tasks turn green as they compute and then red as the results are loaded in memory. In this case, each task retrieves a partition of the data from S3.

In other situations, we may not want to watch a GUI, but programmatically wait for the data to be loaded. 

We can do that using distributed.wait(…) : 

import dask.distributed 

cached_tasks = dask.distributed.wait(dfp) 
print(f'cached {len(cached_tasks[0])} results')

Next, we’ll register this cached flavor of the dataset (the dfp to which we assigned the result of df.persist above) under a new table name.

dfp is a little opaque, so we’ll name this table taxi_cached.

c.register_dask_table(dfp, "taxi_cached")

As a quick test of how much faster it is to work from memory, let’s count the rows again.

result = c.sql('SELECT count(1) FROM taxi_cached') 
result.compute() 

Let’s also try that average tip by passenger count query, this time from cache. 

%%time 

c.sql('SELECT avg(tip_amount) FROM taxi_cached GROUP BY passenger_count').compute()

Not surprisingly, since working from the cached data removes most of the I/O, parsing, and ser/de from the job, it runs a lot faster than before.

SQL built-in functions

dask-sql also exposes a number of helper functions in SQL — just like traditional relational databases expose helper functions for math, date/time processing, string manipulation, and more. 

Here’s the floor function running on a static literal value:

c.sql('SELECT floor(3.14)').compute()

Using floor to discretize (or bin) the trip distances, we can look at a coarse-grained average fare for distance buckets. 

Our next query looks at rides with distances between 0 and 50, groups (GROUP BY) on the binned (floor()) distance, and then for each of those bins returns the floored distance, the average fare, and the number of rides. 

Since we know, based on our query, that the report output will contain just 50 rows and 3 columns, we can safely compute it and get the result locally as a regular Pandas dataframe. If our results were much larger, or an intermediate transformation that we wanted to use in subsequent operations, we would either write them to persistent storage or keep them in the cluster; after all, a big dataset will not fit in our local process, where the Dask Client and dask-sql Context objects live. (Note that the output of a dask-sql query can be fed into dask-sql again, making it possible to get an analog of VIEWs in SQL.) 

%%time 

c.sql("""
SELECT floor(trip_distance) AS dist, avg(fare_amount) AS fare, count(1) AS t
FROM taxi_cached
WHERE trip_distance < 50 AND trip_distance >= 0
GROUP BY floor(trip_distance)
""").compute()
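To make the binning logic concrete, here is a minimal pandas sketch of what the query computes, on toy data (plain pandas stands in for the Dask/dask-sql machinery; the column names match the taxi dataset, but the values are made up):

```python
import numpy as np
import pandas as pd

# Toy stand-in for the taxi data
taxi = pd.DataFrame({
    "trip_distance": [0.4, 0.9, 1.2, 1.8, 2.5],
    "fare_amount":   [3.0, 4.0, 6.0, 7.0, 9.5],
})

# WHERE trip_distance < 50 AND trip_distance >= 0,
# GROUP BY floor(trip_distance), then average fare and ride count per bin
report = (
    taxi[(taxi.trip_distance >= 0) & (taxi.trip_distance < 50)]
    .assign(dist=lambda d: np.floor(d.trip_distance))
    .groupby("dist")
    .fare_amount.agg(fare="mean", t="count")
    .reset_index()
)
print(report)
```

The same expression, written against the Dask dataframe instead of pandas, is roughly what dask-sql builds for you from the SQL text.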

Because Dask already implements so many of the computational building blocks, dask-sql is able to cover most SQL constructs, including subqueries, JOINs, and aggregations.

Opportunity to contribute

If you look at the dask-sql docs, you’ll notice that there aren’t that many helper functions implemented yet. For example, most databases have several date and time processing helper functions, and today dask-sql does not implement all of them.

Most databases have several string processing helper functions, and today dask-sql just has one. 

This is a great opportunity to add valuable functionality and contribute to the library, since implementing many of these functions is just a matter of finding the existing Dask function and hooking it up. 

There are a ton of functions we might want to add, but each one is small, so it’s a great crowd-sourcing opportunity. You can see the existing implementations by clicking here.

One more example: fast plotting from big data

Since our results come back fast and as a Pandas DataFrame, we can easily make visualizations. This pattern can help us approach near-real-time interactive data exploration and visualization. 

If you don’t have matplotlib installed, you can install it with this command: 

! conda install -y matplotlib -c conda-forge 

And now we can run a query and immediately plot a visualization of the result using Pandas plotting syntax!

c.sql("""
SELECT floor(trip_distance) AS dist, avg(fare_amount) AS fare
FROM taxi_cached
WHERE trip_distance < 50 AND trip_distance >= 0
GROUP BY floor(trip_distance)
""").compute().plot(x='dist', y='fare')

A peek under the hood

How does the technology fit together?

dask-sql relies on the well-established Apache Calcite (https://calcite.apache.org/), a Java project, to:

  • parse SQL,
  • represent queries as a tree of operators,
  • normalize and optimize queries.

Calcite’s optimizer is extensible, so there are numerous “plug points” for adding more capabilities in the future.

That’s a lot! What’s left? 

The output from Calcite is a representation of a query as a tree of logical operators. These are things like projections (think of them as abstractions of SELECT) and filters (abstractions of WHERE). 

The next job of dask-sql is to provide plugins that convert from purely abstract operators to logic that expresses that operation in terms of Dask APIs. The results are still logical operators, but a bit more concrete — similar to what you would get if you wrote a Dask dataframe query yourself, so it’s ready to execute on your Dask cluster. At execution time, Dask provides physical implementations of the operations, which vary depending, e.g., on how your data is stored.
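As a rough illustration (a sketch, not dask-sql’s actual code), here is how a filter operator and a projection operator might translate into dataframe calls; plain pandas stands in for Dask here, since the relevant expressions are the same on both:

```python
import pandas as pd

# Toy data; in dask-sql this would be a (possibly distributed) Dask dataframe
taxi = pd.DataFrame({
    "trip_distance": [0.5, 60.0, 2.0],
    "fare_amount":   [3.0, 120.0, 7.0],
})

# Filter operator:     WHERE trip_distance < 50  ->  boolean mask
filtered = taxi[taxi.trip_distance < 50]

# Projection operator: SELECT fare_amount        ->  column selection
projected = filtered[["fare_amount"]]
print(projected)
```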

Creating more opportunities

Synergy is a cliché these days. But adding a SQL front-end to Dask enables a ton of new users and use cases to share state-of-the-art Python data solutions. 

For example, analysts and businesspeople who are fluent with SQL but don’t write imperative code can now leverage Dask, PyData, Coiled, and so much more … while collaborating with those who do prefer to code. 

Custom function support in dask-sql means coders can create simple wrappers around complex flows (e.g., applying a machine learning model to score records) and SQL users can create reports with those functions. 
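For example, a coder might wrap a small computation and expose it to SQL users (a hypothetical sketch: the function name and columns are illustrative, and the registration call, which needs a live dask-sql Context, is shown but not executed):

```python
import numpy as np

def tip_fraction(tip, fare):
    """Fraction of the fare given as tip; zero fares map to 0.0."""
    tip = np.asarray(tip, dtype=float)
    fare = np.asarray(fare, dtype=float)
    # Divide only where the fare is positive; everything else stays 0.0
    return np.divide(tip, fare, out=np.zeros_like(fare), where=fare > 0)

# With a dask-sql Context `c`, registration would look roughly like:
# c.register_function(tip_fraction, "tip_fraction",
#                     [("tip", np.float64), ("fare", np.float64)], np.float64)
# ...after which SQL users could write:
# SELECT tip_fraction(tip_amount, fare_amount) FROM taxi_cached

print(tip_fraction([2.0, 1.0, 0.0], [10.0, 4.0, 0.0]))
```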

Last, with database server capability via Presto (and perhaps soon JDBC, since Calcite includes JDBC support) it becomes possible to take a visualization solution like Tableau and point it at a Dask cluster for visual analytics at scale. 

Lastly, if you like this approach, “feel the need for speed,” and have GPUs available, be sure to check out BlazingSQL, which offers a similar SQL+Dask architecture on top of GPU compute for mind-blowing query speeds.

Turbocharge your data science 

For one-click hosted clusters and faster data science and analytics, try out Coiled Cloud for free today. Whether you’re interested in SQL databases or machine learning, we have plenty of ways for you to get up and running in our examples library.
