Coiled at the Dask Distributed Summit
• June 7, 2021
Coiled is built around Dask. We’re maintainers, contributors, and cheerleaders of Dask. Our goal is to make Dask more accessible to everyone. So, it doesn’t come as a surprise that the entire Coiled team was very excited for the Dask Distributed Summit this year. Collies were involved in everything from planning and organizing the Summit to presenting and facilitating the sessions.
In this blog post, we’ll discuss some sessions led by the Coiled team and highlight key takeaways from each session. We will cover:
- New improvements coming to Dask – Active memory management and an accelerated scheduler;
- Deeper look at Dask’s Internals and the Dask JupyterLab extension;
- Integrating Dask with Snowflake and Dask Cloud Deployments.
The session recordings are available on the Dask YouTube channel!
Dask SQL Query Engines – Dask + Snowflake!
Historically, Dask was not developed as a SQL project, but it was built with the capability to interoperate well with other systems. Matthew Rocklin, the CEO of Coiled, joined Miles Adkins, Partner Sales Engineer at Snowflake, for a presentation on the ongoing efforts to get the best of both worlds: SQL data warehouse with Snowflake and distributed data science with Dask.
Watch the recording: Dask SQL Query Engines
- Snowflake is excellent at centralized data storage and optimized SQL querying, but it doesn’t support advanced Python activities like machine learning.
- Dask can work with a wide variety of data (both traditional SQL databases, and non-traditional forms of data) and handle advanced ML workloads, but Dask isn’t good at data storage.
- Hence, it makes sense to think of Snowflake + Dask as a best-of-breed solution for scaling.
- As of today, Snowflake and Dask can interoperate in the following ways:
- dask.dataframe.read_sql_table() – works on all major databases and can perform distributed read/write, but it’s slow because it’s backed by ODBC, and it doesn’t support complex queries.
- Using Dask Delayed – can also perform distributed read/write, but it’s complicated to use and requires manual effort.
- By exporting to Parquet – can perform fast and parallel read/write, but copies of data occupy unnecessary storage space.
- Matthew and Miles demonstrated native Snowflake + Dask support that can quickly read/write data in parallel, take care of data partitioning, and manage cleanups automatically:
ddf = dask_snowflake.from_snowflake(
    query="""
    SELECT * FROM TableA JOIN TableB ON ...
    """,
    conn=conn,
)
The Dask JupyterLab Extension – Access Dask Diagnostic Dashboards in JupyterLab
Dask’s diagnostics dashboards are an essential part of Dask. They allow us to visualize the state of our cluster in real-time, help diagnose any unexpected behavior, and analyze how the computation is performing. The Dask JupyterLab extension integrates Dask’s Diagnostic dashboards with the JupyterLab interface. Ian Rose, a Software Engineer at Coiled and the maintainer of the Dask JupyterLab extension, gave a very engaging talk at the Summit about this extension.
Watch the recording: Dask JupyterLab Extension
- Using Dask in a Jupyter Notebook has some pain points: constantly shuffling between browser windows, and frequently copy/pasting URLs.
- The JupyterLab IDE was developed to have first-class support for extensions (and as of JupyterLab 3, easy installation of extensions!), and flexible layouts.
- The Dask JupyterLab extension has two main components:
- The Dask dashboard launcher allows us to launch Dask’s diagnostic dashboards in the JupyterLab workspace, and
- The cluster manager allows us to start, stop, and scale Dask clusters.
- The team welcomes contributions to the project. Some improvements on the roadmap that you can help with include:
- Make it easier to connect to clusters when there is authentication and private clusters involved,
- Improve the workflow for creating custom cluster configurations, and
- Customize the extension’s frontend interface to display only the components that are relevant for individual use cases.
Active Memory Management on Dask.distributed
Guido Imperiale, a software engineer at Coiled, discussed ongoing efforts to improve how the Dask distributed scheduler manages memory across the cluster.
Watch the recording: Active Memory Management on Dask.distributed
- Dask.distributed has completely automated memory management. It copies data (keys) between workers, which can result in unwanted redundancy and cause an imbalance as more keys accumulate on a single worker.
- Specific methods that are being redesigned:
- rebalance() will get a performance boost, going from the current O(n log n) to O(1); it will also start considering unmanaged memory, and more.
- retire_workers() will be able to run robustly in the middle of a computation, shrink clusters without waiting for them to become idle, and pause and restart workers more gracefully.
- replicate() will get the ability to define a minimum and maximum number of replicas and to override them for specific keys, among other improvements.
- These changes will also be reflected in the diagnostic dashboards:
- The Status dashboard now shows the total memory stored on the cluster, and the memory-per-worker visualization includes managed, unmanaged, and spilled memory; and
- In the Workers dashboard, there are new columns for managed, unmanaged, and spilled memory!
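As a rough illustration of what retiring a worker means for the data it holds, here is a minimal sketch using the public `Client.retire_workers` method. It does not demonstrate the redesigned internals from the talk. The two-worker in-process cluster and the scattered integers are assumptions chosen to keep the example small.

```python
from dask.distributed import Client, LocalCluster

# Assumption: a tiny in-process cluster with the dashboard disabled.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)
client = Client(cluster)

# Put ten small pieces of data into worker memory.
futures = client.scatter(list(range(10)))

workers_before = sorted(client.scheduler_info()["workers"])

# Retire one worker. The scheduler copies its keys to the remaining
# worker before shutting it down, so no data should be lost.
client.retire_workers(workers=[workers_before[0]])

workers_after = sorted(client.scheduler_info()["workers"])

# The data is still retrievable from the surviving worker.
values = client.gather(futures)
print(len(workers_before), len(workers_after), values)

client.close()
cluster.close()
```

On a real cluster the same call is what allows a deployment to scale down while results are still being held in memory.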
Doing Nothing Poorly: Accelerating Dask Scheduling
In a collaborative effort from Dask contributors from NVIDIA, Capital One, Coiled, and the entire Dask community, there is ongoing work to improve the performance of the Dask scheduler. In this talk, Matthew Rocklin, James Bourbeau, and Gabe Joseph from Coiled joined Benjamin Zaitlen, Gil Forsyth, John Kirkham, Mads R. B. Kristensen, and Richard Zamora to discuss the details of this work.
- Dask works great most of the time, but in some cases, it performs poorly:
- When the tasks are incredibly short, Dask itself becomes an overhead; and
- When we have a lot of workers, constant communication with the workers can become a burden on the scheduler.
- The task graph is currently generated on the client-side. High-level graphs can be generated quickly, but low-level graphs can get very complex and very expensive to create. Hence, the idea is to move low-level graph generation to the scheduler, which can do the serialization much more efficiently.
- Profiling is a big part of the work and helps us understand the flow of tasks. Many profiling techniques are being used:
- Dask’s statistical profiler is a good starting point that provides a broad view,
- cProfile captures every single function call, and
- py-spy provides a good balance between the big-picture and granular viewpoints.
- Cython can approach the speed of C while keeping a maintainable, Python-like syntax, so some parts of Dask are being rewritten in Cython.
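To make the graph-size and profiling points above more tangible, here is a small sketch. It compares the number of high-level layers with the number of low-level tasks for a simple array computation, then profiles the single-threaded compute with cProfile. The array shape and chunk size are arbitrary assumptions, and the snippet says nothing about how the Dask team ran their own benchmarks.

```python
import cProfile
import io
import pstats

import dask.array as da

# Assumption: a 1000 x 1000 array split into 10 x 10 = 100 chunks.
x = da.ones((1000, 1000), chunks=(100, 100))
y = (x + 1).sum()

# For Dask arrays, the graph is a HighLevelGraph: a handful of layers
# that expand into many individual low-level tasks.
graph = y.__dask_graph__()
n_layers = len(graph.layers)
n_tasks = len(dict(graph))  # materializes every low-level task
print(f"{n_layers} layers expand to {n_tasks} tasks")

# Profile the computation on the synchronous scheduler so that every
# function call happens in this thread and is visible to cProfile.
profiler = cProfile.Profile()
profiler.enable()
result = y.compute(scheduler="synchronous")
profiler.disable()

# Collect the ten most expensive entries by cumulative time.
buffer = io.StringIO()
pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").print_stats(10)
report = buffer.getvalue()
print(report)
```

The gap between the layer count and the task count is the reason shipping a compact high-level graph is attractive: the client has far less to build and serialize.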
Hacking Dask: Diving Into Dask’s Internals
James Bourbeau, Lead OSS Engineer at Coiled, and Julia Signell led a workshop on advanced operations on Dask collections and the details of the Dask scheduler.
Watch the recording: Hacking Dask
- Blocked algorithms operate on each block in isolation. Custom block computations allow us to perform such block-wise computations:
- dask.array.map_blocks lets us map a custom function over arrays. It works with single and multiple arrays and provides useful special arguments.
- dask.dataframe.map_partitions is similar to map_blocks, but for DataFrames.
- dask.array.map_overlap is useful when we want to perform block-wise computation but require information from neighboring blocks, for example when applying a filter over an image.
- For more fine-grained control over block-wise computation, each Dask collection has a reduction method. It is useful when output dimensions do not match the input.
- dask.optimization contains various functions to transform graphs in different ways. It is useful when working with custom graphs.
- cull helps remove unnecessary tasks.
- inline_functions helps inline constants and cheap operations respectively.
- fuse helps merge linear chains of tasks together.
- There are many ways to deploy Dask clusters: manual setup (using the CLI, $ dask-scheduler), using cluster managers, Dask projects like Dask-Kubernetes and Dask-Cloudprovider, and companies like Coiled.
- Besides the Diagnostic Dashboard, Dask provides a number of objects and attributes that help us understand the state of the cluster.
The Dask Community in Australia
There is an active community of Dask users and contributors in Australia. Some of them came together to engage in this open panel discussion about everything Dask. Hugo Bowne-Anderson, Head of Data Science Evangelism at Coiled, participated in the session along with Genevieve Buckley, Ben Leighton, Draga Doncila Pop, and Tisham Dhar.
Watch the recording: Dask Down Under: Panel Discussion
- Dask has a wide range of use cases. The panelists have seen Dask used for geospatial imaging, biological imaging, large-scale text processing, climate analysis, workflow management, and financial data modeling, among many others.
- Dask Dataframe is a common entry path into Dask, but Dask is much bigger than that. It’s a complete distributed framework that many other projects can build on. The Napari project is an excellent example of Dask running behind the scenes and allowing researchers to work with their data comfortably.
- There is a perception that parallel computing is difficult. That is true sometimes, but Python, along with tools like Dask and xarray, is helping change that perception.
- Dask is good at reducing step-cliffs. Dask makes it seamless to scale from a laptop to a cluster – the code remains similar and familiar.
Dask for Everyone with Coiled
Hugo also presented a talk about the IT challenges of scalable compute and how to make distributed computing in the cloud easier.
Watch the recording: Dask for Everyone with Coiled
- Starting with an example of pandas+XGBoost (and +Dask) on a laptop, we see that this workflow is accessible to anyone who owns a laptop and has the time to learn.
- Looking at the same pandas+XGBoost+Dask workflow on a cluster, we see that it is not yet accessible to everyone.
- A typical setup for data science on a cluster involves learning about Dask on the cloud, setting up AWS (or any cloud platform), starting a Kubernetes cluster, installing kubectl, and then running the computation. This throws multiple errors, so follow-up tasks include pushing a Docker image and sharing AWS credentials across workers.
- Then, IT challenges begin – large AWS bills due to idle systems, security issues, and more.
- Coiled was built by Dask contributors who saw this play out multiple times. Coiled provides hosted Dask Clusters, manages software environments and costs, runs from anywhere, and even supports GPUs. Products like Coiled Cloud can help reduce the time spent on setting up clusters from weeks to minutes.
Get The Most Out of Your Data Science and ML
Thanks for reading! As always, feel free to let us know what you think about this post. Plus, we’re always looking for more feedback on Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and zero-click deployments. You can try it out for free.