Creating Dask Array from Zarr files using from_zarr

Sultan Orazbayev June 2, 2022



This blog post explains how to create Dask Arrays from Zarr files using the from_zarr function. The from_zarr function reads the data lazily into a Dask Array, which lets you work with arrays that are larger than memory while using all of the available cores.

Here’s how to read a small Zarr file from the cloud into a Dask Array:

from dask.array import from_zarr
zarr_file = "s3://coiled-datasets/synthetic-data/array-random-390KB.zarr"
x = from_zarr(zarr_file)
 
print(x)
# dask.array<from-zarr, shape=(100, 100, 5), dtype=float64, chunksize=(10, 10, 1), chunktype=numpy.ndarray>

From the snippet above we can see that the Dask array contains 100x100x5 elements (float64) spread across 500 chunks, each with a shape of (10,10,1).
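
A quick way to confirm this chunk layout is to inspect the array's attributes directly, continuing from the snippet above:

print(x.chunksize)
# (10, 10, 1)
print(x.numblocks)
# (10, 10, 5)
print(x.npartitions)
# 500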

How from_zarr works

The Zarr file contains a chunked, compressed N-dimensional array (or a group of such arrays), with additional optimizations for efficient processing and support for the cloud. The appropriate Dask collection for working with arrays is Dask Array (https://docs.dask.org/en/stable/array.html). Dask Array initially loads only the metadata to get information on the shape and content of the array stored in the Zarr file. Based on this metadata, Dask will form small ‘chunks’, subsets of the original array, that can be processed individually for greater parallelism and smaller resource usage.

This lazy representation of the full Zarr array allows both faster development of the downstream code and faster execution of the eventual code. The Dask Array API implements a subset of the NumPy ndarray interface, so much of the downstream code is interchangeable with plain NumPy code.
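
As a small illustration of this laziness, the following sketch reuses the 390 KB array from the first snippet: constructing the Dask Array and defining a reduction only builds a task graph, and the chunks are read from the Zarr file only when compute() is called.

from dask.array import from_zarr

x = from_zarr("s3://coiled-datasets/synthetic-data/array-random-390KB.zarr")

# no chunk data has been read yet; this only extends the task graph
total = x.sum()

# the chunks are fetched and reduced only at this point
print(total.compute())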

Controlling the shape of chunks with chunks

The array inside the Zarr file is stored in chunked format; however, the default chunk size might not be optimal for downstream computations. Dask can create the array with the desired chunk shape via the chunks keyword argument, which accepts several types of values (each form is illustrated in the sketch after the list):

  • an int that specifies the length of each block along every dimension, e.g. 10 creates blocks of up to 10 elements per dimension;
  • a string that specifies a human-friendly byte size per block, e.g. '10 MB';
  • a tuple containing the shape of each chunk, e.g. (10, 3) creates chunks with 10 rows and 3 columns;
  • a tuple of tuples that contains the sizes of all blocks along each dimension, see https://stackoverflow.com/a/34904445/10693596 for further details.
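
Here is a hedged sketch of these different forms, again using the 390 KB array from the first snippet (the block shapes produced by the int and string forms depend on the array's shape and dtype):

from dask.array import from_zarr

zarr_file = "s3://coiled-datasets/synthetic-data/array-random-390KB.zarr"

# int: blocks of up to 10 elements along every dimension
a = from_zarr(zarr_file, chunks=10)

# string: Dask picks block shapes that fit within roughly 100 kB each
b = from_zarr(zarr_file, chunks="100 kB")

# tuple: the explicit shape of every block
c = from_zarr(zarr_file, chunks=(50, 50, 5))

# tuple of tuples: explicit sizes of all blocks along each dimension
d = from_zarr(zarr_file, chunks=((50, 50), (25, 75), (5,)))

print(a.chunksize, b.chunksize, c.chunksize, d.chunks)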

For example, the following calculation would require execution of 376 tasks using the default chunks:

from dask.array import from_zarr
 
x = from_zarr("s3://coiled-datasets/from-zarr/group.zarr", component="group_foo/sample_array")
print(x)
# dask.array<from-zarr, shape=(10, 10), dtype=int32, chunksize=(2, 2), chunktype=numpy.ndarray>
 
product = x @ x.T
print(f"Number of tasks: {len((product).dask)}")
# Number of tasks: 376

On the other hand, since multiplying matrices A and B combines rows of A with columns of B, we can chunk A by whole rows; conveniently, in this example the transpose of the row chunks gives us the column chunks of B. By specifying the chunks to be (2, 10), we reduce the number of tasks to 61:

from dask.array import from_zarr
 
x = from_zarr("s3://coiled-datasets/from-zarr/group.zarr", component="group_foo/sample_array", chunks=(2, 10))
print(x)
# dask.array<from-zarr, shape=(10, 10), dtype=int32, chunksize=(2, 10), chunktype=numpy.ndarray>
 
product = x @ x.T
print(f"Number of tasks: {len((product).dask)}")
# Number of tasks: 61

Note that, in general, specifying custom chunks entails the fixed cost of rechunking (adjusting the data from the original chunks to the desired chunks), which can be expensive. Also, the tasks after rechunking should not be too resource-intensive, to avoid placing excessive load on the Dask workers. Hence, a careful examination of the downstream code is needed to infer the optimal chunks. If performance is not of primary importance, it is best to let Dask determine the chunk size from the Zarr file specification.
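
If the array has already been created with the default chunks, the same adjustment can be made afterwards with rechunk. A minimal sketch reusing the sample array above; note that rechunking after loading typically adds extra tasks to the graph compared with passing chunks directly to from_zarr:

from dask.array import from_zarr

# load with the default chunks, then rechunk explicitly
x = from_zarr("s3://coiled-datasets/from-zarr/group.zarr", component="group_foo/sample_array")
y = x.rechunk((2, 10))

product = y @ y.T
print(f"Number of tasks: {len(product.dask)}")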

Loading specific component of a grouped Zarr file

A Zarr file can contain multiple arrays, organized in groups. In such cases, Dask needs to know the path to the array of interest. For example, if the file group.zarr contains a group group_foo and inside this group there is an array sample_array, then creating the Dask array requires passing the component="group_foo/sample_array" keyword argument:

from dask.array import from_zarr
 
x = from_zarr("s3://coiled-datasets/from-zarr/group.zarr", component="group_foo/sample_array")
print(x)
# dask.array<from-zarr, shape=(10, 10), dtype=int32, chunksize=(2, 2), chunktype=numpy.ndarray>
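
If you are not sure which components a grouped Zarr store contains, the zarr library itself can be used to inspect the hierarchy before building the Dask array. A minimal sketch (reading from s3:// additionally assumes s3fs is installed):

import zarr

# open the store read-only and walk the hierarchy,
# printing the path and type of every group and array it contains
group = zarr.open("s3://coiled-datasets/from-zarr/group.zarr", mode="r")
group.visititems(lambda path, item: print(path, type(item).__name__))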

Using inlining with inline_array

Note: this is an advanced topic and, in general, only needs attention if the default performance needs improvement.

By design, calculations with Dask arrays are lazy until explicit evaluation is requested. The desired calculation can become complex, with multiple layers of computation involved. Dask has a default strategy for ordering these calculations. The order does not matter if the available computing power allows all parallel tasks to be processed simultaneously; however, if resources are constrained, as is typically the case, then the order in which tasks are executed matters.

Dask stores tasks in a task graph, which can be inspected via the .dask attribute of a Dask array. Let's create a sample Dask array from a Zarr file and inspect its task graph:

from dask.array import from_zarr
 
x = from_zarr("s3://coiled-datasets/from-zarr/sample.zarr")
print(x.dask)
# HighLevelGraph with 2 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x1877b8f40>
#  0. original-from-zarr-aa89d568fa5d6a465403e4a4457083a9
#  1. from-zarr-aa89d568fa5d6a465403e4a4457083a9

Note that, at a high level, the task graph contains two layers: the first layer reads the metadata from the Zarr file, and the second layer contains the instructions to load the individual chunks. This is optimal for most use cases; however, it introduces a common dependency between the tasks that operate on the individual chunks, and this dependency can alter the order in which downstream tasks are executed.
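
One way to see this shared dependency is to render the task graph; a brief aside, assuming the optional graphviz dependency is installed:

# render the task graph to a file; every chunk-loading task depends on
# the single 'original-from-zarr' node that holds the opened Zarr store
x.visualize(filename="from-zarr-graph.svg")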

To avoid such dependencies, one can add the keyword argument inline_array=True, which embeds the information on the source Zarr file inside the instructions to process each chunk. This means that the resulting high-level task graph contains just one layer:

from dask.array import from_zarr
 
x = from_zarr("s3://coiled-datasets/from-zarr/sample.zarr", inline_array=True)
print(x.dask)
# HighLevelGraph with 1 layers.
# <dask.highlevelgraph.HighLevelGraph object at 0x1877ecac0>
#  0. from-zarr-dd5aae068dabbf1fea826d7abea7b6f8

Removing this dependency can change the order in which downstream tasks are executed. A more detailed example is provided here: https://docs.dask.org/en/stable/order.html#order, and as noted in the docs (https://docs.dask.org/en/stable/generated/dask.array.from_array.html#dask.array.from_array):

The right choice for inline_array depends on several factors, including the size of x, how expensive it is to create, which scheduler you’re using, and the pattern of downstream computations. As a heuristic, inline_array=True may be the right choice when the array x is cheap to serialize and deserialize (since it’s included in the graph many times) and if you’re experiencing ordering issues (see Ordering for more).

Array computations in the cloud with Coiled

Let’s look at a practical example of working with a large-scale Zarr array in the cloud using Coiled. This particular dataset contains simulated weather data (temperature) from dask-tutorial (see https://tutorial.dask.org/03_array.html). In this example, we are interested in calculating the average observed temperature for a particular month.

To perform this computation, we will load the Zarr file, identify the available arrays and stack them into a single Dask array, which can then be used for efficient computations.

from coiled import Cluster
from distributed import Client

cluster = Cluster(n_workers=3)
client = Client(cluster)

import zarr
from dask.array import from_zarr, stack
from matplotlib.pyplot import imshow

# load the components from the zarr array
zarr_file = "s3://coiled-datasets/from-zarr"
list_components = [
    "2014-01-01",
    "2014-01-02",
    "2014-01-03",
    "2014-01-04",
    "2014-01-05",
    "2014-01-06",
    "2014-01-07",
]

# create a list of dask arrays
arrays = [from_zarr(zarr_file, component=f"temperature/{c}") for c in list_components]
print(len(arrays))

# stack the arrays into a single array
temperature = stack(arrays, axis=0)

print(temperature)
# dask.array<stack, shape=(31, 5760, 11520), dtype=float64, chunksize=(1, 500, 500), chunktype=numpy.ndarray>

# calculate the average temperature at each location, across all dates
result = temperature.mean(axis=0)

print(result)
# dask.array<mean_agg-aggregate, shape=(5760, 11520), dtype=float64, chunksize=(500, 500), chunktype=numpy.ndarray>

# visualize the result (matplotlib converts the Dask array to NumPy,
# which triggers the computation)
imshow(result, cmap="RdBu_r")
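
Since result is still lazy, individual values or small slices can also be pulled out without materializing the full 5760 x 11520 grid (a brief aside; the indices below are arbitrary):

# average temperature across all dates at one (arbitrary) grid location
print(result[2880, 5760].compute())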

Conclusion

The examples in this blog post show how to read data from Zarr files into Dask Arrays. The advantage of using Dask for working with arrays is that the available resources are used efficiently, allowing parallel computations on data that exceeds the available memory. By using Coiled for the cloud-based computation we accelerated the analysis, since the data stayed in the cloud and the Dask workers could process individual chunks of the Zarr file in parallel.

