Skip to content

Commit

Permalink
Make it possible to configure alternative storage backends. This will…
Browse files Browse the repository at this point in the history
… make it easier to work with Zarr v3. (#480)
  • Loading branch information
tomwhite authored Jun 19, 2024
1 parent fe71cb4 commit a01a576
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 15 deletions.
8 changes: 7 additions & 1 deletion cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
from cubed.primitive.rechunk import rechunk as primitive_rechunk
from cubed.spec import spec_from_config
from cubed.storage.backend import open_backend_array
from cubed.utils import (
_concatenate2,
array_memory,
Expand Down Expand Up @@ -106,7 +107,12 @@ def from_zarr(store, path=None, spec=None) -> "Array":
"""
name = gensym()
spec = spec or spec_from_config(config)
target = zarr.open(store, path=path, mode="r", storage_options=spec.storage_options)
target = open_backend_array(
store,
mode="r",
path=path,
storage_options=spec.storage_options,
)

from cubed.array_api import Array

Expand Down
36 changes: 36 additions & 0 deletions cubed/storage/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Optional

from cubed import config
from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store


def open_backend_array(
    store: T_Store,
    mode: str,
    *,
    shape: Optional[T_Shape] = None,
    dtype: Optional[T_DType] = None,
    chunks: Optional[T_RegularChunks] = None,
    path: Optional[str] = None,
    **kwargs,
):
    """Open an array using the storage backend selected in the top-level config.

    The backend is chosen via the ``storage_name`` config key (e.g. set
    globally with ``CUBED_STORAGE_NAME=tensorstore``); when unset it falls
    back to the default zarr-python backend.
    """
    backend = config.get("storage_name", None)

    if backend in (None, "zarr-python"):
        # Import lazily so unused backends never have to be importable.
        from cubed.storage.backends.zarr_python import open_zarr_array as open_func
    else:
        raise ValueError(f"Unrecognized storage name: {backend}")

    return open_func(
        store,
        mode,
        shape=shape,
        dtype=dtype,
        chunks=chunks,
        path=path,
        **kwargs,
    )
Empty file.
26 changes: 26 additions & 0 deletions cubed/storage/backends/zarr_python.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import Optional

import zarr

from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store


def open_zarr_array(
    store: T_Store,
    mode: str,
    *,
    shape: Optional[T_Shape] = None,
    dtype: Optional[T_DType] = None,
    chunks: Optional[T_RegularChunks] = None,
    path: Optional[str] = None,
    **kwargs,
):
    """Open an array with the zarr-python backend via ``zarr.open_array``."""
    array_kwargs = {
        "mode": mode,
        "shape": shape,
        "dtype": dtype,
        "chunks": chunks,
        "path": path,
    }
    # Duplicate keys between array_kwargs and kwargs raise TypeError,
    # matching the original explicit-keyword call.
    return zarr.open_array(store, **array_kwargs, **kwargs)
5 changes: 3 additions & 2 deletions cubed/storage/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import zarr
from toolz import reduce

from cubed.storage.backend import open_backend_array
from cubed.types import T_DType, T_RegularChunks, T_Shape, T_Store


Expand Down Expand Up @@ -55,7 +56,7 @@ def create(self, mode: str = "w-") -> zarr.Array:
The mode to open the Zarr array with using ``zarr.open``.
        Default is 'w-', which means create, fail if it already exists.
"""
target = zarr.open_array(
target = open_backend_array(
self.store,
mode=mode,
shape=self.shape,
Expand All @@ -72,7 +73,7 @@ def open(self) -> zarr.Array:
Note that the Zarr array must have been created or this method will raise an exception.
"""
# r+ means read/write, fail if it doesn't exist
return zarr.open_array(
return open_backend_array(
self.store,
mode="r+",
shape=self.shape,
Expand Down
7 changes: 4 additions & 3 deletions cubed/tests/primitive/test_blockwise.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
make_blockwise_key_function,
)
from cubed.runtime.executors.local import SingleThreadedExecutor
from cubed.storage.backend import open_backend_array
from cubed.tests.utils import create_zarr, execute_pipeline
from cubed.vendor.dask.blockwise import make_blockwise_graph

Expand Down Expand Up @@ -66,7 +67,7 @@ def test_blockwise(tmp_path, executor, reserved_mem):

execute_pipeline(op.pipeline, executor=executor)

res = zarr.open_array(target_store)
res = open_backend_array(target_store, mode="r")
assert_array_equal(res[:], np.outer([0, 1, 2], [10, 50, 100]))


Expand Down Expand Up @@ -129,7 +130,7 @@ def test_blockwise_with_args(tmp_path, executor):

execute_pipeline(op.pipeline, executor=executor)

res = zarr.open_array(target_store)
res = open_backend_array(target_store, mode="r")
assert_array_equal(
res[:], np.transpose(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), axes=(1, 0))
)
Expand Down Expand Up @@ -220,7 +221,7 @@ def key_function(out_key):

execute_pipeline(op.pipeline, executor=executor)

res = zarr.open_array(target_store)
res = open_backend_array(target_store, mode="r")
assert_array_equal(res[:], np.arange(20))


Expand Down
3 changes: 2 additions & 1 deletion cubed/tests/primitive/test_rechunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from cubed.primitive.rechunk import rechunk
from cubed.runtime.executors.local import SingleThreadedExecutor
from cubed.storage.backend import open_backend_array
from cubed.tests.utils import execute_pipeline


Expand Down Expand Up @@ -94,7 +95,7 @@ def test_rechunk(
for op in ops:
execute_pipeline(op.pipeline, executor=executor)

res = zarr.open_array(target_store)
res = open_backend_array(target_store, mode="r")
assert_array_equal(res[:], np.ones(shape))
assert res.chunks == target_chunks

Expand Down
11 changes: 6 additions & 5 deletions cubed/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from cubed.backend_array_api import namespace as nxp
from cubed.core.ops import merge_chunks, partial_reduce, tree_reduce
from cubed.core.optimization import fuse_all_optimize_dag, multiple_inputs_optimize_dag
from cubed.storage.backend import open_backend_array
from cubed.tests.utils import (
ALL_EXECUTORS,
MAIN_EXECUTORS,
Expand Down Expand Up @@ -107,7 +108,7 @@ def test_from_array_zarr(tmp_path, spec):
store=store,
)
a = cubed.from_array(za, spec=spec)
assert_array_equal(a, za)
assert_array_equal(a, za[:])


@pytest.mark.parametrize("path", [None, "sub", "sub/group"])
Expand Down Expand Up @@ -167,9 +168,9 @@ def test_store_fails(tmp_path, spec):
@pytest.mark.parametrize("path", [None, "sub", "sub/group"])
def test_to_zarr(tmp_path, spec, executor, path):
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
store = zarr.storage.DirectoryStore(tmp_path / "output.zarr")
store = tmp_path / "output.zarr"
cubed.to_zarr(a, store, path=path, executor=executor)
res = zarr.open_array(store, path=path)
res = open_backend_array(store, mode="r", path=path)
assert_array_equal(res[:], np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]))


Expand Down Expand Up @@ -599,8 +600,8 @@ def test_quad_means(tmp_path, t_length=50):
m1, store=tmp_path / "result1", optimize_function=fuse_all_optimize_dag
)

res0 = zarr.open_array(tmp_path / "result0")
res1 = zarr.open_array(tmp_path / "result1")
res0 = open_backend_array(tmp_path / "result0", mode="r")
res1 = open_backend_array(tmp_path / "result1", mode="r")

assert_array_equal(res0[:], res1[:])

Expand Down
6 changes: 3 additions & 3 deletions cubed/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@

import networkx as nx
import numpy as np
import zarr

from cubed.runtime.create import create_executor
from cubed.runtime.types import Callback
from cubed.storage.backend import open_backend_array

LITHOPS_LOCAL_CONFIG = {
"lithops": {
"backend": "localhost",
"storage": "localhost",
"monitoring_interval": 0.1,
"include_modules": None
"include_modules": None,
},
"localhost": {"version": 1},
}
Expand Down Expand Up @@ -89,7 +89,7 @@ def create_zarr(a, /, store, *, dtype=None, chunks=None, path=None):
dtype = a.dtype

# write to zarr
za = zarr.open(
za = open_backend_array(
store, mode="w", shape=a.shape, dtype=dtype, chunks=chunks, path=path
)
za[:] = a
Expand Down

0 comments on commit a01a576

Please sign in to comment.