Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Zarr v3 Support #511

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG PYTHON_VERSION=3.12
ARG PYTHON_VERSION=3.13
ARG LINUX_DISTRO=bookworm

FROM mcr.microsoft.com/devcontainers/python:1-${PYTHON_VERSION}-${LINUX_DISTRO}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
python-version: "3.13"

- name: Upgrade pip
run: |
Expand Down
19 changes: 9 additions & 10 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,20 @@ jobs:
fail-fast: false
matrix:
include:
- { python: "3.12", os: "ubuntu-latest", session: "pre-commit" }
- { python: "3.12", os: "ubuntu-latest", session: "safety" }
- { python: "3.13", os: "ubuntu-latest", session: "pre-commit" }
- { python: "3.13", os: "ubuntu-latest", session: "safety" }
# - { python: "3.13", os: "ubuntu-latest", session: "mypy" }
# - { python: "3.12", os: "ubuntu-latest", session: "mypy" }
# - { python: "3.11", os: "ubuntu-latest", session: "mypy" }
# - { python: "3.10", os: "ubuntu-latest", session: "mypy" }
- { python: "3.13", os: "ubuntu-latest", session: "tests" }
- { python: "3.12", os: "ubuntu-latest", session: "tests" }
- { python: "3.11", os: "ubuntu-latest", session: "tests" }
- { python: "3.10", os: "ubuntu-latest", session: "tests" }
- { python: "3.12", os: "windows-latest", session: "tests" }
- { python: "3.12", os: "macos-latest", session: "tests" }
- { python: "3.13", os: "windows-latest", session: "tests" }
- { python: "3.13", os: "macos-latest", session: "tests" }
# - { python: "3.13", os: "ubuntu-latest", session: "typeguard" }
# - { python: "3.12", os: "ubuntu-latest", session: "typeguard" }
# - { python: "3.11", os: "ubuntu-latest", session: "typeguard" }
# - { python: "3.10", os: "ubuntu-latest", session: "typeguard" }
# - { python: "3.10", os: "ubuntu-latest", session: "xdoctest" }
- { python: "3.12", os: "ubuntu-latest", session: "docs-build" }
- { python: "3.13", os: "ubuntu-latest", session: "docs-build" }

env:
NOXSESSION: ${{ matrix.session }}
Expand Down Expand Up @@ -121,7 +120,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
python-version: "3.13"

- name: Upgrade pip
run: |
Expand Down
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@


package = "mdio"
python_versions = ["3.12", "3.11", "3.10"]
nox.needs_version = ">= 2022.1.7"
python_versions = ["3.13", "3.12", "3.11"]
nox.needs_version = ">= 2024.10.9"
nox.options.sessions = (
"pre-commit",
"safety",
Expand Down
792 changes: 353 additions & 439 deletions poetry.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ authors = [{ name = "Altay Sansal", email = "[email protected]" }]
license = "Apache-2.0"
readme = "README.md"
keywords = ["mdio", "multidimio", "seismic", "wind", "data"]
requires-python = ">=3.10,<3.13"
requires-python = ">=3.11,<3.14"
dependencies = [
"click (>=8.1.7,<9.0.0)",
"click-params (>=0.5.0,<0.6.0)",
"zarr (>=2.18.2,<3.0.0)",
"zarr (>=3.0.2,<4.0.0)",
"dask (>=2024.12.0)",
"tqdm (>=4.67.0,<5.0.0)",
"psutil (>=6.1.0,<7.0.0)",
"fsspec (>=2024.10.0)",
"segy (>=0.3.1,<0.4.0)",
"segy (>=0.4.0,<1.0.0)",
"rich (>=13.9.4,<14.0.0)"
]

Expand Down
35 changes: 18 additions & 17 deletions src/mdio/api/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,32 +186,33 @@ def _validate_store(self, storage_options):
if storage_options is None:
storage_options = {}

self.store = process_url(
self.url = process_url(
url=self.url,
mode=self.mode,
storage_options=storage_options,
memory_cache_size=self._memory_cache_size,
disk_cache=self._disk_cache,
)

def _connect(self):
"""Open the zarr root."""
try:
if self.mode in {"r", "r+"}:
self.root = zarr.open_consolidated(store=self.store, mode=self.mode)
elif self.mode == "w":
self.root = zarr.open(store=self.store, mode="r+")
else:
msg = f"Invalid mode: {self.mode}"
raise ValueError(msg)
except KeyError as e:
self.store = zarr.open(
self.url, mode=self.mode, storage_options=storage_options
).store
except FileNotFoundError as e:
msg = (
f"MDIO file not found or corrupt at {self.store.path}. "
f"MDIO file not found or corrupt at {self.url}. "
"Please check the URL or ensure it is not a deprecated "
"version of MDIO file."
)
raise MDIONotFoundError(msg) from e

def _connect(self):
    """Open the Zarr root group for this accessor and bind it to `self.root`.

    Read ("r") and append ("r+") modes open via consolidated metadata,
    which serves all array/group metadata from a single key — important
    for remote (object-store) backends.

    Raises:
        ValueError: If `self.mode` is not one of "r", "r+", or "w".
    """
    if self.mode in {"r", "r+"}:
        # Consolidated metadata: one metadata fetch instead of one per array.
        self.root = zarr.open_consolidated(store=self.store, mode=self.mode)
    elif self.mode == "w":
        # NOTE(review): "w" is mapped to zarr mode "r+" — presumably the
        # store has already been created/validated earlier in the accessor
        # lifecycle; confirm against _validate_store before relying on this.
        self.root = zarr.open(store=self.store, mode="r+")
    else:
        msg = f"Invalid mode: {self.mode}"
        raise ValueError(msg)

def _deserialize_grid(self):
"""Deserialize grid from Zarr metadata."""
self.grid = Grid.from_zarr(self.root)
Expand Down Expand Up @@ -375,12 +376,12 @@ def stats(self, value: dict) -> None:
@property
def _metadata_group(self) -> zarr.Group:
"""Get metadata zarr.group handle."""
return self.root.metadata
return self.root["metadata"]

@property
def _data_group(self) -> zarr.Group:
"""Get data zarr.Group handle."""
return self.root.data
return self.root["data"]

def __getitem__(self, item: int | tuple) -> npt.ArrayLike | da.Array | tuple:
"""Data getter."""
Expand Down
11 changes: 7 additions & 4 deletions src/mdio/api/convenience.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from typing import TYPE_CHECKING

import zarr
from numcodecs import Blosc
from tqdm.auto import tqdm
from zarr import Blosc

from mdio.api.io_utils import process_url
from mdio.core.indexing import ChunkIterator
Expand Down Expand Up @@ -51,20 +51,22 @@ def copy_mdio( # noqa: PLR0913
Default is None (will assume anonymous).
overwrite: Overwrite destination or not.

Raises:
NotImplementedError: because Zarr v3 doesn't support copy.
"""
if storage_options is None:
storage_options = {}

dest_store = process_url(
url=dest_path_or_buffer,
mode="w",
storage_options=storage_options,
memory_cache_size=0,
disk_cache=False,
)

if_exists = "replace" if overwrite is True else "raise"

# TODO(Altay): Update this function when Zarr v3 supports copy.
raise NotImplementedError("Zarr version 3.0.0+ does not support copy yet.")

zarr.copy_store(
source=source.store,
dest=dest_store,
Expand Down Expand Up @@ -242,6 +244,7 @@ def rechunk_batch(
write_rechunked_values(source, suffix_list, *plan)


# TODO(Altay): This needs to be validated
def rechunk(
source: MDIOAccessor,
chunks: tuple[int, ...],
Expand Down
51 changes: 7 additions & 44 deletions src/mdio/api/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,27 @@

from __future__ import annotations

from typing import Any

import dask.array as da
import zarr
from zarr.storage import FSStore


def process_url(
url: str,
mode: str,
storage_options: dict[str, Any],
memory_cache_size: int,
disk_cache: bool,
) -> FSStore:
) -> str:
"""Process an fsspec-compliant URL and return it, optionally wrapped for caching.

It can use an in-memory Least Recently Used (LRU) cache implementation from
Zarr, and optionally, a file cache (`simplecache` protocol from FSSpec) that
is useful for remote stores.

File cache is only valid for remote stores. The LRU caching works
on both remote and local.
It can optionally use a file cache (`simplecache` protocol from fsspec),
which is only useful for remote stores.

The `storage_options` argument represents a set of parameters to be passed
to the FSSpec backend. Note that the format of `storage_options` is
to the fsspec backend. Note that the format of `storage_options` is
different if `disk_cache` is enabled or disabled, since `disk_cache`
internally uses the simplecache protocol.

Args:
url: FSSpec compliant url
mode: Toggle for overwriting existing store
storage_options: Storage options for the storage backend.
memory_cache_size: Maximum in memory LRU cache size in bytes.
disk_cache: This enables FSSpec's `simplecache` if True.
url: fsspec compliant url
disk_cache: This enables fsspec's `simplecache` if True.

Returns:
Processed URL string, prefixed with the `simplecache` protocol when `disk_cache` is enabled.
Expand All @@ -52,7 +39,6 @@ def process_url(
... url="s3://bucket/key",
... mode="r",
... storage_options={"key": "my_key", "secret": "my_secret"},
... memory_cache_size=0,
... disk_cache=False,
... )

Expand All @@ -64,7 +50,6 @@ def process_url(
... url="s3://bucket/key",
... mode="r",
... storage_options={"s3": {"key": "my_key", "secret": "my_secret"}},
... memory_cache_size=0,
... disk_cache=True,
... )

Expand All @@ -77,35 +62,13 @@ def process_url(
... "s3": {"key": "my_key", "secret": "my_secret"},
... "simplecache": {"cache_storage": "custom/local/cache/path"},
... },
... memory_cache_size=0,
... disk_cache=True,
... )
"""
if disk_cache is True:
url = "::".join(["simplecache", url])

# Strip whitespaces and slashes from end of string
url = url.rstrip("/ ")

# Flag for checking write access
check = True if mode == "w" else False

# TODO: Turning off write checking now because zarr has a bug.
# Get rid of this once bug is fixed.
check = False

store = FSStore(
url=url,
check=check,
create=check,
mode=mode,
**storage_options,
)

if memory_cache_size != 0:
store = zarr.storage.LRUStoreCache(store=store, max_size=memory_cache_size)

return store
return url


def open_zarr_array(group_handle: zarr.Group, name: str) -> zarr.Array:
Expand Down
2 changes: 1 addition & 1 deletion src/mdio/commands/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def parse_grid(grid: Grid) -> dict[str, dict[str, int | str]]:
def parse_access_patterns(reader: MDIOReader) -> dict[str, Any]:
"""Extract access patterns and their info."""
access_pattern_dict = {}
for name, array in reader._data_group.items():
for name, array in reader._data_group.arrays():
pattern = name.replace("chunked_", "")
chunks = str(array.chunks)
format_ = str(array.dtype)
Expand Down
Loading
Loading