Data Analysis with FugueSQL on Coiled Dask Clusters

Han Wang Kevin Kho May 19, 2021

, ,


TLDR: fugue-sql is a SQL interface meant for data analysts and SQL lovers to take advantage of the Dask execution engine with a language that is familiar to them. Attached to this blog post is an interactive notebook that will show you how to use fugue-sql on top of a Dask cluster hosted on Coiled. This blog post details the motivation and basics behind fugue-sql, along with code snippets that illustrate how fugue-sql interacts with Python functions to create a powerful end-to-end solution for distributed data analyst workflows performed on Coiled with Dask.

Background

As data gets larger and larger, data scientists are increasingly using Dask to parallelize compute workflows on big data. However, big data is not just limited to data scientists and data engineers. Data analysts are also working with increasingly larger datasets. The problem is that data analysts who primarily want to use SQL to manipulate data haven’t had a SQL interface to use with Dask. In this blog post, we’ll introduce fugue-sql, a SQL-based language compatible with Pandas, Dask, and Spark. With fugue-sql, data analysts will be able to work in a language that they are comfortable in and utilize the power of Dask’s distributed compute engine.

There is an example notebook that accompanies this blog post, which you can find on Coiled Cloud and GitHub, that shows how easy it is to perform data analysis with fugue-sql on top of Dask clusters through Coiled.

fugue-sql First Look

More detailed installation instructions are attached in the accompanying notebook. Here, we will dive right in to the basic syntax of fugue-sql. First, we load the NYC taxi dataset, which is commonly used in other Dask tutorials. Screenshots were used to preserve syntax highlighting, but the notebook attached to this blog post can be accessed to copy the code.

Now that the data is loaded, we can use it in succeeding fugue-sql blocks using the syntax seen in the image below. fugue-sql supports all ANSI SQL keywords such as ORDER BY, GROUP BY, WHERE, SELECT, etc. Jupyter notebook cells can be turned into SQL cells by using the %%fsql cell magic call. Passing “dask” after %%fsql tells fugue to run the SQL logic on top of the Dask execution engine. Not specifying anything uses pandas by default.

Another thing to note in the code snippet above is the assignment of tempdf. This is a cleaner syntax to create intermediate tables similar to SQL temp tables and common table expressions (CTE).

Enhancements Over Standard SQL

fugue-sql has a couple of enhancements over standard SQL that allow it to support end-to-end workflows for data analysts. fugue-sql allows users to LOAD from csv/json/parquet files using Pandas and Dask under the hood. This means we can load in data, perform transformations on it, and then SAVE the results. Data analysts can now work on flat files with a language they are comfortable in. The code below runs in pandas because we don’t specify the engine after %%fsql.

Using Python Functions

fugue-sql provides interoperability with Python functions. In the example below, we define a simple seaborn barplot function that displays a plot with the given x_col and y_col. We first use the SAMPLE keyword to reduce the number of records we are plotting. fugue-sql has a couple of added operations (DROP NULL, ALTER SCHEMA, FILL, etc.) that modify DataFrames. 

The sampled DataFrame is then passed to our seaborn_barplot function with different sets of parameters.

Below is an example plot generated by the code cell above. 

Partitioning Data

One of the most important concepts of distributed computing is how the data is partitioned. Dask normally performs operations on the default partitions. In some cases, users will have to define the logical partitions for operations performed on groups of data. fugue-sql gives control over the partitioning scheme with the PREPARTITION keyword. In the example below, the value_counts function is mapped to each of the partitions created (there are 4 unique improvement_surcharge values which create 4 partitions.

TRANSFORM is the keyword that allows us to use custom Python functions. Together, PREPARTITION and TRANSFORM are similar to the groupby-apply semantic of Dask. 

Deployment

Some users want to move their SQL code from Jupyter notebooks into Python scripts for deployment. This can be done by calling fsql and passing the query. We can then specify the execution engine in the run method. pandas is a default execution engine.

More Information

We have only scratched the surface of fugue and fugue-sql. fugue is an abstraction layer that makes code portable across differing computing frameworks such as Pandas, Spark and Dask. It decouples logic from execution engines by allowing users to write framework-agnostic code. The execution engine is then chosen. All questions about fugue and distributed computing are welcome in the fugue Slack channel.

Fugue Repo

Fugue Slack

Get The Most Out of Your Data Science and ML

Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we’re always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free by clicking below.