diff --git a/ci/upstream.yml b/ci/upstream.yml index 91272fab..feee5044 100644 --- a/ci/upstream.yml +++ b/ci/upstream.yml @@ -28,6 +28,6 @@ dependencies: - fsspec - pip - pip: - - icechunk # Installs zarr v3 as dependency - # - git+https://github.com/fsspec/kerchunk@main # kerchunk is currently incompatible with zarr-python v3 (https://github.com/fsspec/kerchunk/pull/516) - - imagecodecs-numcodecs==2024.6.1 + - icechunk>=0.1.0a7 # Installs zarr v3 as dependency + # - git+https://github.com/fsspec/kerchunk@main # kerchunk is currently incompatible with zarr-python v3 (https://github.com/fsspec/kerchunk/pull/516) + - imagecodecs-numcodecs==2024.6.1 diff --git a/conftest.py b/conftest.py index 55c07823..e86b9244 100644 --- a/conftest.py +++ b/conftest.py @@ -1,3 +1,5 @@ +from typing import Any, Dict, Optional + import h5py import numpy as np import pytest @@ -35,6 +37,32 @@ def netcdf4_file(tmpdir): return filepath +@pytest.fixture +def netcdf4_files_factory(tmpdir) -> callable: + def create_netcdf4_files( + encoding: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> tuple[str, str]: + ds = xr.tutorial.open_dataset("air_temperature") + + # Split dataset into two parts + ds1 = ds.isel(time=slice(None, 1460)) + ds2 = ds.isel(time=slice(1460, None)) + + # Save datasets to disk as NetCDF in the temporary directory with the provided encoding + filepath1 = f"{tmpdir}/air1.nc" + filepath2 = f"{tmpdir}/air2.nc" + ds1.to_netcdf(filepath1, encoding=encoding) + ds2.to_netcdf(filepath2, encoding=encoding) + + # Close datasets + ds1.close() + ds2.close() + + return filepath1, filepath2 + + return create_netcdf4_files + + @pytest.fixture def netcdf4_file_with_2d_coords(tmpdir): ds = xr.tutorial.open_dataset("ROMS_example") @@ -71,26 +99,6 @@ def hdf5_groups_file(tmpdir): return filepath -@pytest.fixture -def netcdf4_files(tmpdir): - # Set up example xarray dataset - ds = xr.tutorial.open_dataset("air_temperature") - - # split inrto equal chunks so we can concatenate them back together later - ds1 = ds.isel(time=slice(None, 1460)) - ds2 = ds.isel(time=slice(1460, None)) - - # Save it to disk as netCDF (in temporary directory) - filepath1 = f"{tmpdir}/air1.nc" - filepath2 = f"{tmpdir}/air2.nc" - ds1.to_netcdf(filepath1) - ds2.to_netcdf(filepath2) - ds1.close() - ds2.close() - - return filepath1, filepath2 - - @pytest.fixture def hdf5_empty(tmpdir): filepath = f"{tmpdir}/empty.nc" diff --git a/docs/releases.rst b/docs/releases.rst index 35b7c488..e42e7662 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -11,6 +11,7 @@ New Features - Add a ``virtual_backend_kwargs`` keyword argument to file readers and to ``open_virtual_dataset``, to allow reader-specific options to be passed down. (:pull:`315`) By `Tom Nicholas `_. +- Added append functionality to `to_icechunk` (:pull:`272`) By `Aimee Barciauskas `_. Breaking changes ~~~~~~~~~~~~~~~~ diff --git a/examples/append/noaa-cdr-sst.ipynb b/examples/append/noaa-cdr-sst.ipynb new file mode 100644 index 00000000..0f80b169 --- /dev/null +++ b/examples/append/noaa-cdr-sst.ipynb @@ -0,0 +1,1279 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "120903d6-ea52-4a1e-83d2-4d434ad2cb98", + "metadata": {}, + "source": [ + "# Appending to an Icechunk Store with Virtual References\n", + "\n", + "This notebook demonstrates how to append to an icechunk store.\n", + "\n", + "Please ensure the correct dependencies are installed before starting." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "d09bbff3-4e96-4490-b837-14b78b64df35", + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install -e \".[icechunk]\"\n", + "# !pip install git+https://github.com/mpiannucci/kerchunk@v3\n", + "# !pip install fsspec s3fs" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "3055eff4-9e22-4a95-a7fd-96933f381183", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Name: icechunk\n", + "Version: 0.1.0a7\n", + "Summary: Transactional storage engine for Zarr designed for use on cloud object storage\n", + "Home-page: https://github.com/earth-mover/icechunk\n", + "Author: Earthmover PBC\n", + "Author-email: \n", + "License: Apache-2.0\n", + "Location: /Users/aimeebarciauskas/github/virtualizarr/venv/lib/python3.12/site-packages\n", + "Requires: zarr\n", + "Required-by: \n" + ] + } + ], + "source": [ + "!pip show icechunk" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "2f69f0bb-316b-452c-b1ba-4d7ef4afcf67", + "metadata": {}, + "outputs": [], + "source": [ + "import warnings\n", + "\n", + "import fsspec\n", + "import xarray as xr\n", + "from icechunk import IcechunkStore, StorageConfig, StoreConfig, VirtualRefConfig\n", + "\n", + "from virtualizarr import open_virtual_dataset\n", + "\n", + "warnings.filterwarnings(\"ignore\", category=UserWarning)" + ] + }, + { + "cell_type": "markdown", + "id": "0df547e4-456d-44c1-b190-606f0b9e056e", + "metadata": {}, + "source": [ + "# Before you start\n", + "\n", + "Identify the dataset you will be using and create a list of files to generate a virtual icechunk datastore with." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "1532c33b-804f-49fa-9fa9-0eb42ea87e26", + "metadata": {}, + "outputs": [], + "source": [ + "fs = fsspec.filesystem(\"s3\", anon=True)\n", + "\n", + "oisst_files = fs.glob(\n", + " \"s3://noaa-cdr-sea-surface-temp-optimum-interpolation-pds/data/v2.1/avhrr/202408/oisst-avhrr-v02r01.*.nc\"\n", + ")\n", + "\n", + "oisst_files = sorted([\"s3://\" + f for f in oisst_files])" + ] + }, + { + "cell_type": "markdown", + "id": "73ceb93b-b0ac-48b2-928a-84da0d2019ac", + "metadata": {}, + "source": [ + "## Create virtual datasets with VirtualiZarr's `open_virtual_dataset`" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "06bbec92-3974-4859-8bda-353afc7800b9", + "metadata": {}, + "outputs": [], + "source": [ + "so = dict(anon=True, default_fill_cache=False, default_cache_type=\"none\")\n", + "\n", + "virtual_datasets = [\n", + " open_virtual_dataset(url, indexes={}, reader_options={\"storage_options\": so})\n", + " for url in oisst_files[0:2]\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "77fb94c8-870f-4c9e-8421-ac9c17402122", + "metadata": {}, + "outputs": [], + "source": [ + "virtual_ds = xr.concat(\n", + " virtual_datasets,\n", + " dim=\"time\",\n", + " coords=\"minimal\",\n", + " compat=\"override\",\n", + " combine_attrs=\"override\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "abefd6fa-386a-4e07-a7c8-219d3730eeeb", + "metadata": {}, + "outputs": [], + "source": [ + "# Clean up the store if running this notebook multiple times.\n", + "#!rm -rf ./noaa-cdr-icechunk/" + ] + }, + { + "cell_type": "markdown", + "id": "05f41a0b-6292-419d-a9d3-d8ddf8c0c15b", + "metadata": {}, + "source": [ + "## Initialize the Icechunk Store" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": 
"79a4228a-0e17-4b07-9144-f24fe06db832", + "metadata": {}, + "outputs": [], + "source": [ + "storage_config = StorageConfig.filesystem(\"./noaa-cdr-icechunk\")\n", + "virtual_ref_store_config = StoreConfig(\n", + " virtual_ref_config=VirtualRefConfig.s3_anonymous(region=\"us-east-1\"),\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "5fd0c082-8d5e-46a8-a994-fee80baa4ecc", + "metadata": {}, + "outputs": [], + "source": [ + "store = IcechunkStore.create(\n", + " storage=storage_config, config=virtual_ref_store_config, read_only=False\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "749193c1-38b9-4400-a08f-f0a675d30f06", + "metadata": {}, + "source": [ + "## Write the virtual datasets to the icechunk store and commit" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "9387e1ff-46c1-45fd-9796-0457538209a7", + "metadata": {}, + "outputs": [], + "source": [ + "virtual_ds.virtualize.to_icechunk(store)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "53a74fb9-006b-4d2b-9157-7090af6c9e09", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'R1BP6057NW5A1ZANMBDG'" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "store.commit(\"first 2 days of 202408 data\")" + ] + }, + { + "cell_type": "markdown", + "id": "8becd176-1c7d-4c74-a3f1-1b9f55b445a2", + "metadata": {}, + "source": [ + "## Check your work!" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "b6271bd1-bc0b-4901-9901-91aabe508cf7", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 66MB\n",
+       "Dimensions:  (time: 2, zlev: 1, lat: 720, lon: 1440)\n",
+       "Coordinates:\n",
+       "  * time     (time) datetime64[ns] 16B 2024-08-01T12:00:00 2024-08-02T12:00:00\n",
+       "  * zlev     (zlev) float32 4B 0.0\n",
+       "  * lat      (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88\n",
+       "  * lon      (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9\n",
+       "Data variables:\n",
+       "    err      (time, zlev, lat, lon) float64 17MB ...\n",
+       "    anom     (time, zlev, lat, lon) float64 17MB ...\n",
+       "    ice      (time, zlev, lat, lon) float64 17MB ...\n",
+       "    sst      (time, zlev, lat, lon) float64 17MB ...\n",
+       "Attributes: (12/37)\n",
+       "    Conventions:                CF-1.6, ACDD-1.3\n",
+       "    cdm_data_type:              Grid\n",
+       "    comment:                    Data was converted from NetCDF-3 to NetCDF-4 ...\n",
+       "    creator_email:              oisst-help@noaa.gov\n",
+       "    creator_url:                https://www.ncei.noaa.gov/\n",
+       "    date_created:               2024-08-16T09:12:00Z\n",
+       "    ...                         ...\n",
+       "    source:                     ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...\n",
+       "    standard_name_vocabulary:   CF Standard Name Table (v40, 25 January 2017)\n",
+       "    summary:                    NOAAs 1/4-degree Daily Optimum Interpolation ...\n",
+       "    time_coverage_end:          2024-08-01T23:59:59Z\n",
+       "    time_coverage_start:        2024-08-01T00:00:00Z\n",
+       "    title:                      NOAA/NCEI 1/4 Degree Daily Optimum Interpolat...
" + ], + "text/plain": [ + " Size: 66MB\n", + "Dimensions: (time: 2, zlev: 1, lat: 720, lon: 1440)\n", + "Coordinates:\n", + " * time (time) datetime64[ns] 16B 2024-08-01T12:00:00 2024-08-02T12:00:00\n", + " * zlev (zlev) float32 4B 0.0\n", + " * lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88\n", + " * lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9\n", + "Data variables:\n", + " err (time, zlev, lat, lon) float64 17MB ...\n", + " anom (time, zlev, lat, lon) float64 17MB ...\n", + " ice (time, zlev, lat, lon) float64 17MB ...\n", + " sst (time, zlev, lat, lon) float64 17MB ...\n", + "Attributes: (12/37)\n", + " Conventions: CF-1.6, ACDD-1.3\n", + " cdm_data_type: Grid\n", + " comment: Data was converted from NetCDF-3 to NetCDF-4 ...\n", + " creator_email: oisst-help@noaa.gov\n", + " creator_url: https://www.ncei.noaa.gov/\n", + " date_created: 2024-08-16T09:12:00Z\n", + " ... ...\n", + " source: ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...\n", + " standard_name_vocabulary: CF Standard Name Table (v40, 25 January 2017)\n", + " summary: NOAAs 1/4-degree Daily Optimum Interpolation ...\n", + " time_coverage_end: 2024-08-01T23:59:59Z\n", + " time_coverage_start: 2024-08-01T00:00:00Z\n", + " title: NOAA/NCEI 1/4 Degree Daily Optimum Interpolat..." + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds = xr.open_zarr(store, consolidated=False, zarr_format=3)\n", + "ds" + ] + }, + { + "cell_type": "markdown", + "id": "23dd5a13-9c0e-4132-9073-474c0af65920", + "metadata": {}, + "source": [ + "# Append\n", + "\n", + "That was all nothing new! Basically a repeat of what is in the [icechunk docs](https://icechunk.io/icechunk-python/virtual/). Here we follow the same steps to create a virtual dataset, but we add an `append_dim` argument to the `to_icechunk` function." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "190c25f9-e000-4b17-83eb-cf551141dfea", + "metadata": {}, + "outputs": [], + "source": [ + "virtual_datasets_a = [\n", + " open_virtual_dataset(\n", + " url, indexes={}, reader_options={\"storage_options\": {\"anon\": True}}\n", + " )\n", + " for url in oisst_files[2:4]\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "af330082-207a-4f08-aefe-fc15aa8b2eb3", + "metadata": {}, + "outputs": [], + "source": [ + "virtual_ds_a = xr.concat(\n", + " virtual_datasets_a,\n", + " dim=\"time\",\n", + " coords=\"minimal\",\n", + " compat=\"override\",\n", + " combine_attrs=\"override\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "b6137cc1-b996-4e60-8c12-01eb19930da6", + "metadata": {}, + "outputs": [], + "source": [ + "append_store = IcechunkStore.open_existing(\n", + " storage=storage_config, config=virtual_ref_store_config, read_only=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "a465be46-bb81-4e36-b1b6-67c3b8e4b9ec", + "metadata": {}, + "outputs": [], + "source": [ + "virtual_ds_a.virtualize.to_icechunk(append_store, append_dim=\"time\")" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "e9908d2f-664b-4256-b9d4-842df2e512c3", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'0HE5RZ869HTG8RZESHCG'" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "append_store.commit(\"wrote 2 more days of data\")" + ] + }, + { + "cell_type": "markdown", + "id": "e1384e99-c284-4942-a49b-7799802728b0", + "metadata": {}, + "source": [ + "# Check that it worked!" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "477094aa-2094-46e2-ae78-516fc2a51690", + "metadata": {}, + "outputs": [], + "source": [ + "read_store = IcechunkStore.open_existing(\n", + " storage=storage_config, config=virtual_ref_store_config, read_only=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "47a53027-dbae-48aa-85d2-dcbc04e01e61", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "
<xarray.Dataset> Size: 133MB\n",
+       "Dimensions:  (time: 4, zlev: 1, lat: 720, lon: 1440)\n",
+       "Coordinates:\n",
+       "  * lat      (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88\n",
+       "  * time     (time) datetime64[ns] 32B 2024-08-01T12:00:00 ... 2024-08-04T12:...\n",
+       "  * zlev     (zlev) float32 4B 0.0\n",
+       "  * lon      (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9\n",
+       "Data variables:\n",
+       "    anom     (time, zlev, lat, lon) float64 33MB ...\n",
+       "    ice      (time, zlev, lat, lon) float64 33MB ...\n",
+       "    err      (time, zlev, lat, lon) float64 33MB ...\n",
+       "    sst      (time, zlev, lat, lon) float64 33MB ...\n",
+       "Attributes: (12/37)\n",
+       "    Conventions:                CF-1.6, ACDD-1.3\n",
+       "    cdm_data_type:              Grid\n",
+       "    comment:                    Data was converted from NetCDF-3 to NetCDF-4 ...\n",
+       "    creator_email:              oisst-help@noaa.gov\n",
+       "    creator_url:                https://www.ncei.noaa.gov/\n",
+       "    date_created:               2024-08-18T09:12:00Z\n",
+       "    ...                         ...\n",
+       "    source:                     ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...\n",
+       "    standard_name_vocabulary:   CF Standard Name Table (v40, 25 January 2017)\n",
+       "    summary:                    NOAAs 1/4-degree Daily Optimum Interpolation ...\n",
+       "    time_coverage_end:          2024-08-03T23:59:59Z\n",
+       "    time_coverage_start:        2024-08-03T00:00:00Z\n",
+       "    title:                      NOAA/NCEI 1/4 Degree Daily Optimum Interpolat...
" + ], + "text/plain": [ + " Size: 133MB\n", + "Dimensions: (time: 4, zlev: 1, lat: 720, lon: 1440)\n", + "Coordinates:\n", + " * lat (lat) float32 3kB -89.88 -89.62 -89.38 -89.12 ... 89.38 89.62 89.88\n", + " * time (time) datetime64[ns] 32B 2024-08-01T12:00:00 ... 2024-08-04T12:...\n", + " * zlev (zlev) float32 4B 0.0\n", + " * lon (lon) float32 6kB 0.125 0.375 0.625 0.875 ... 359.4 359.6 359.9\n", + "Data variables:\n", + " anom (time, zlev, lat, lon) float64 33MB ...\n", + " ice (time, zlev, lat, lon) float64 33MB ...\n", + " err (time, zlev, lat, lon) float64 33MB ...\n", + " sst (time, zlev, lat, lon) float64 33MB ...\n", + "Attributes: (12/37)\n", + " Conventions: CF-1.6, ACDD-1.3\n", + " cdm_data_type: Grid\n", + " comment: Data was converted from NetCDF-3 to NetCDF-4 ...\n", + " creator_email: oisst-help@noaa.gov\n", + " creator_url: https://www.ncei.noaa.gov/\n", + " date_created: 2024-08-18T09:12:00Z\n", + " ... ...\n", + " source: ICOADS, NCEP_GTS, GSFC_ICE, NCEP_ICE, Pathfin...\n", + " standard_name_vocabulary: CF Standard Name Table (v40, 25 January 2017)\n", + " summary: NOAAs 1/4-degree Daily Optimum Interpolation ...\n", + " time_coverage_end: 2024-08-03T23:59:59Z\n", + " time_coverage_start: 2024-08-03T00:00:00Z\n", + " title: NOAA/NCEI 1/4 Degree Daily Optimum Interpolat..." + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds = xr.open_zarr(read_store, consolidated=False, zarr_format=3)\n", + "ds" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "41808f96", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "virtualizarr", + "language": "python", + "name": "venv" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml index aaaade02..ecda9a16 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,9 @@ hdf_reader = [ "imagecodecs-numcodecs==2024.6.1", "numcodecs" ] +icechunk = [ + "icechunk>=0.1.0a7", +] test = [ "codecov", "fastparquet", diff --git a/runtime.txt b/runtime.txt new file mode 100644 index 00000000..67ebc4e9 --- /dev/null +++ b/runtime.txt @@ -0,0 +1 @@ +python-3.11 diff --git a/virtualizarr/accessor.py b/virtualizarr/accessor.py index 336838f9..969d21af 100644 --- a/virtualizarr/accessor.py +++ b/virtualizarr/accessor.py @@ -1,10 +1,5 @@ from pathlib import Path -from typing import ( - TYPE_CHECKING, - Callable, - Literal, - overload, -) +from typing import TYPE_CHECKING, Callable, Literal, Optional, overload from xarray import Dataset, register_dataset_accessor @@ -43,19 +38,24 @@ def to_zarr(self, storepath: str) -> None: """ dataset_to_zarr(self.ds, storepath) - def to_icechunk(self, store: "IcechunkStore") -> None: + def to_icechunk( + self, store: "IcechunkStore", append_dim: Optional[str] = None + ) -> None: """ Write an xarray dataset to an Icechunk store. Any variables backed by ManifestArray objects will be be written as virtual references, any other variables will be loaded into memory before their binary chunk data is written into the store. + If `append_dim` is provided, the virtual dataset will be appended to the existing IcechunkStore along the `append_dim` dimension. 
+ Parameters ---------- store: IcechunkStore + append_dim: str, optional """ from virtualizarr.writers.icechunk import dataset_to_icechunk - dataset_to_icechunk(self.ds, store) + dataset_to_icechunk(self.ds, store, append_dim=append_dim) @overload def to_kerchunk( diff --git a/virtualizarr/codecs.py b/virtualizarr/codecs.py new file mode 100644 index 00000000..ad2a3d9b --- /dev/null +++ b/virtualizarr/codecs.py @@ -0,0 +1,111 @@ +from typing import TYPE_CHECKING, Union + +from virtualizarr.zarr import Codec + +if TYPE_CHECKING: + from zarr import Array # type: ignore + from zarr.core.abc.codec import ( # type: ignore + ArrayArrayCodec, + ArrayBytesCodec, + BytesBytesCodec, + ) + + from .manifests.array import ManifestArray + + +def get_codecs( + array: Union["ManifestArray", "Array"], + normalize_to_zarr_v3: bool = False, +) -> Union[Codec, tuple["ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec", ...]]: + """ + Get the codecs for either a ManifestArray or a Zarr Array. + + Parameters: + array (Union[ManifestArray, ZarrArray]): The input array, either ManifestArray or Zarr Array. + + Returns: + List[Optional[Codec]]: A list of codecs or an empty list if no codecs are found. + + Raises: + ImportError: If `zarr` is required but not installed. + ValueError: If the array type is unsupported. + """ + if _is_manifest_array(array): + return _get_manifestarray_codecs(array, normalize_to_zarr_v3) # type: ignore[arg-type] + + if _is_zarr_array(array): + return _get_zarr_array_codecs(array, normalize_to_zarr_v3) # type: ignore[arg-type] + + raise ValueError("Unsupported array type or zarr is not installed.") + + +def _is_manifest_array(array: object) -> bool: + """Check if the array is an instance of ManifestArray.""" + try: + from .manifests.array import ManifestArray + + return isinstance(array, ManifestArray) + except ImportError: + return False + + +def _get_manifestarray_codecs( + array: "ManifestArray", + normalize_to_zarr_v3: bool = False, +) -> Union[Codec, tuple["ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec", ...]]: + """Get codecs for a ManifestArray based on its zarr_format.""" + if normalize_to_zarr_v3 or array.zarray.zarr_format == 3: + return array.zarray._v3_codec_pipeline() + elif array.zarray.zarr_format == 2: + return array.zarray.codec + else: + raise ValueError("Unsupported zarr_format for ManifestArray.") + + +def _is_zarr_array(array: object) -> bool: + """Check if the array is an instance of Zarr Array.""" + try: + from zarr import Array + + return isinstance(array, Array) + except ImportError: + return False + + +def _get_zarr_array_codecs( + array: "Array", + normalize_to_zarr_v3: bool = False, +) -> Union[Codec, tuple["ArrayArrayCodec | ArrayBytesCodec | BytesBytesCodec", ...]]: + """Get codecs for a Zarr Array based on its format.""" + import zarr + from packaging import version + + # Check that zarr-python v3 is installed + required_version = "3.0.0b" + installed_version = zarr.__version__ + if version.parse(installed_version) < version.parse(required_version): + raise NotImplementedError( + f"zarr-python v3 or higher is required, but version {installed_version} is installed." 
+ ) + from zarr.core.metadata import ( # type: ignore[import-untyped] + ArrayV2Metadata, + ArrayV3Metadata, + ) + + # For zarr format v3 + if isinstance(array.metadata, ArrayV3Metadata): + return tuple(array.metadata.codecs) + # For zarr format v2 + elif isinstance(array.metadata, ArrayV2Metadata): + if normalize_to_zarr_v3: + # we could potentially normalize to v3 using ZArray._v3_codec_pipeline, but we don't have a use case for that. + raise NotImplementedError( + "Normalization to zarr v3 is not supported for zarr v2 array." + ) + else: + return Codec( + compressor=array.metadata.compressor, + filters=list(array.metadata.filters or ()), + ) + else: + raise ValueError("Unsupported zarr_format for Zarr Array.") diff --git a/virtualizarr/manifests/array_api.py b/virtualizarr/manifests/array_api.py index f5cf220b..4950b48c 100644 --- a/virtualizarr/manifests/array_api.py +++ b/virtualizarr/manifests/array_api.py @@ -1,10 +1,16 @@ -from typing import TYPE_CHECKING, Any, Callable, Iterable, cast +from typing import TYPE_CHECKING, Any, Callable, cast import numpy as np -from virtualizarr.zarr import Codec, determine_chunk_grid_shape +from virtualizarr.zarr import determine_chunk_grid_shape from .manifest import ChunkManifest +from .utils import ( + check_combinable_zarr_arrays, + check_same_ndims, + check_same_shapes, + check_same_shapes_except_on_concat_axis, +) if TYPE_CHECKING: from .array import ManifestArray @@ -25,55 +31,6 @@ def decorator(func): return decorator -def _check_combineable_zarr_arrays(arrays: Iterable["ManifestArray"]) -> None: - """ - The downside of the ManifestArray approach compared to the VirtualZarrArray concatenation proposal is that - the result must also be a single valid zarr array, implying that the inputs must have the same dtype, codec etc. - """ - _check_same_dtypes([arr.dtype for arr in arrays]) - - # Can't combine different codecs in one manifest - # see https://github.com/zarr-developers/zarr-specs/issues/288 - _check_same_codecs([arr.zarray.codec for arr in arrays]) - - # Would require variable-length chunks ZEP - _check_same_chunk_shapes([arr.chunks for arr in arrays]) - - -def _check_same_dtypes(dtypes: list[np.dtype]) -> None: - """Check all the dtypes are the same""" - - first_dtype, *other_dtypes = dtypes - for other_dtype in other_dtypes: - if other_dtype != first_dtype: - raise ValueError( - f"Cannot concatenate arrays with inconsistent dtypes: {other_dtype} vs {first_dtype}" - ) - - -def _check_same_codecs(codecs: list[Codec]) -> None: - first_codec, *other_codecs = codecs - for codec in other_codecs: - if codec != first_codec: - raise NotImplementedError( - "The ManifestArray class cannot concatenate arrays which were stored using different codecs, " - f"But found codecs {first_codec} vs {codec} ." - "See https://github.com/zarr-developers/zarr-specs/issues/288" - ) - - -def _check_same_chunk_shapes(chunks_list: list[tuple[int, ...]]) -> None: - """Check all the chunk shapes are the same""" - - first_chunks, *other_chunks_list = chunks_list - for other_chunks in other_chunks_list: - if other_chunks != first_chunks: - raise ValueError( - f"Cannot concatenate arrays with inconsistent chunk shapes: {other_chunks} vs {first_chunks} ." - "Requires ZEP003 (Variable-length Chunks)." - ) - - @implements(np.result_type) def result_type(*arrays_and_dtypes) -> np.dtype: """Called by xarray to ensure all arguments to concat have the same dtype.""" @@ -106,9 +63,9 @@ def concatenate( raise TypeError() # ensure dtypes, shapes, codecs etc. 
are consistent - _check_combineable_zarr_arrays(arrays) + check_combinable_zarr_arrays(arrays) - _check_same_ndims([arr.ndim for arr in arrays]) + check_same_ndims([arr.ndim for arr in arrays]) # Ensure we handle axis being passed as a negative integer first_arr = arrays[0] @@ -116,7 +73,7 @@ def concatenate( axis = axis % first_arr.ndim arr_shapes = [arr.shape for arr in arrays] - _check_same_shapes_except_on_concat_axis(arr_shapes, axis) + check_same_shapes_except_on_concat_axis(arr_shapes, axis) # find what new array shape must be new_length_along_concat_axis = sum([shape[axis] for shape in arr_shapes]) @@ -151,36 +108,6 @@ def concatenate( return ManifestArray(chunkmanifest=concatenated_manifest, zarray=new_zarray) -def _check_same_ndims(ndims: list[int]) -> None: - first_ndim, *other_ndims = ndims - for other_ndim in other_ndims: - if other_ndim != first_ndim: - raise ValueError( - f"Cannot concatenate arrays with differing number of dimensions: {first_ndim} vs {other_ndim}" - ) - - -def _check_same_shapes_except_on_concat_axis(shapes: list[tuple[int, ...]], axis: int): - """Check that shapes are compatible for concatenation""" - - shapes_without_concat_axis = [ - _remove_element_at_position(shape, axis) for shape in shapes - ] - - first_shape, *other_shapes = shapes_without_concat_axis - for other_shape in other_shapes: - if other_shape != first_shape: - raise ValueError( - f"Cannot concatenate arrays with shapes {[shape for shape in shapes]}" - ) - - -def _remove_element_at_position(t: tuple[int, ...], pos: int) -> tuple[int, ...]: - new_l = list(t) - new_l.pop(pos) - return tuple(new_l) - - @implements(np.stack) def stack( arrays: tuple["ManifestArray", ...] | list["ManifestArray"], @@ -199,11 +126,11 @@ def stack( raise TypeError() # ensure dtypes, shapes, codecs etc. 
are consistent - _check_combineable_zarr_arrays(arrays) + check_combinable_zarr_arrays(arrays) - _check_same_ndims([arr.ndim for arr in arrays]) + check_same_ndims([arr.ndim for arr in arrays]) arr_shapes = [arr.shape for arr in arrays] - _check_same_shapes(arr_shapes) + check_same_shapes(arr_shapes) # Ensure we handle axis being passed as a negative integer first_arr = arrays[0] @@ -251,15 +178,6 @@ def stack( return ManifestArray(chunkmanifest=stacked_manifest, zarray=new_zarray) -def _check_same_shapes(shapes: list[tuple[int, ...]]) -> None: - first_shape, *other_shapes = shapes - for other_shape in other_shapes: - if other_shape != first_shape: - raise ValueError( - f"Cannot concatenate arrays with differing shapes: {first_shape} vs {other_shape}" - ) - - @implements(np.expand_dims) def expand_dims(x: "ManifestArray", /, *, axis: int = 0) -> "ManifestArray": """Expands the shape of an array by inserting a new axis (dimension) of size one at the position specified by axis.""" diff --git a/virtualizarr/manifests/utils.py b/virtualizarr/manifests/utils.py new file mode 100644 index 00000000..07cf2baf --- /dev/null +++ b/virtualizarr/manifests/utils.py @@ -0,0 +1,118 @@ +from typing import TYPE_CHECKING, Any, Iterable, Union + +import numpy as np + +from virtualizarr.codecs import get_codecs + +if TYPE_CHECKING: + from zarr import Array # type: ignore + + from .array import ManifestArray + + +def check_same_dtypes(dtypes: list[np.dtype]) -> None: + """Check all the dtypes are the same""" + + first_dtype, *other_dtypes = dtypes + for other_dtype in other_dtypes: + if other_dtype != first_dtype: + raise ValueError( + f"Cannot concatenate arrays with inconsistent dtypes: {other_dtype} vs {first_dtype}" + ) + + +def check_compatible_encodings(encoding1, encoding2): + for key, value in encoding1.items(): + if key in encoding2: + if encoding2[key] != value: + raise ValueError( + f"Cannot concatenate arrays with different values for encoding key {key}: {encoding2[key]} != {value}" + ) + + +def check_same_codecs(codecs: list[Any]) -> None: + first_codec, *other_codecs = codecs + for codec in other_codecs: + if codec != first_codec: + raise NotImplementedError( + "The ManifestArray class cannot concatenate arrays which were stored using different codecs, " + f"But found codecs {first_codec} vs {codec} ." + "See https://github.com/zarr-developers/zarr-specs/issues/288" + ) + + +def check_same_chunk_shapes(chunks_list: list[tuple[int, ...]]) -> None: + """Check all the chunk shapes are the same""" + + first_chunks, *other_chunks_list = chunks_list + for other_chunks in other_chunks_list: + if other_chunks != first_chunks: + raise ValueError( + f"Cannot concatenate arrays with inconsistent chunk shapes: {other_chunks} vs {first_chunks} ." + "Requires ZEP003 (Variable-length Chunks)." 
+ ) + + +def check_same_ndims(ndims: list[int]) -> None: + first_ndim, *other_ndims = ndims + for other_ndim in other_ndims: + if other_ndim != first_ndim: + raise ValueError( + f"Cannot concatenate arrays with differing number of dimensions: {first_ndim} vs {other_ndim}" + ) + + +def check_same_shapes(shapes: list[tuple[int, ...]]) -> None: + first_shape, *other_shapes = shapes + for other_shape in other_shapes: + if other_shape != first_shape: + raise ValueError( + f"Cannot concatenate arrays with differing shapes: {first_shape} vs {other_shape}" + ) + + +def _remove_element_at_position(t: tuple[int, ...], pos: int) -> tuple[int, ...]: + new_l = list(t) + new_l.pop(pos) + return tuple(new_l) + + +def check_same_shapes_except_on_concat_axis(shapes: list[tuple[int, ...]], axis: int): + """Check that shapes are compatible for concatenation""" + + shapes_without_concat_axis = [ + _remove_element_at_position(shape, axis) for shape in shapes + ] + + first_shape, *other_shapes = shapes_without_concat_axis + for other_shape in other_shapes: + if other_shape != first_shape: + raise ValueError( + f"Cannot concatenate arrays with shapes {[shape for shape in shapes]}" + ) + + +def check_combinable_zarr_arrays( + arrays: Iterable[Union["ManifestArray", "Array"]], +) -> None: + """ + The downside of the ManifestArray approach compared to the VirtualZarrArray concatenation proposal is that + the result must also be a single valid zarr array, implying that the inputs must have the same dtype, codec etc. + """ + check_same_dtypes([arr.dtype for arr in arrays]) + + # Can't combine different codecs in one manifest + # see https://github.com/zarr-developers/zarr-specs/issues/288 + check_same_codecs([get_codecs(arr) for arr in arrays]) + + # Would require variable-length chunks ZEP + check_same_chunk_shapes([arr.chunks for arr in arrays]) + + +def check_compatible_arrays( + ma: "ManifestArray", existing_array: "Array", append_axis: int +): + check_combinable_zarr_arrays([ma, existing_array]) + check_same_ndims([ma.ndim, existing_array.ndim]) + arr_shapes = [ma.shape, existing_array.shape] + check_same_shapes_except_on_concat_axis(arr_shapes, append_axis) diff --git a/virtualizarr/tests/__init__.py b/virtualizarr/tests/__init__.py index b6884782..658cf640 100644 --- a/virtualizarr/tests/__init__.py +++ b/virtualizarr/tests/__init__.py @@ -39,6 +39,8 @@ def _importorskip( has_tifffile, requires_tifffile = _importorskip("tifffile") has_imagecodecs, requires_imagecodecs = _importorskip("imagecodecs") has_hdf5plugin, requires_hdf5plugin = _importorskip("hdf5plugin") +has_zarr_python, requires_zarr_python = _importorskip("zarr") +has_zarr_python_v3, requires_zarr_python_v3 = _importorskip("zarr", "3.0.0b") def create_manifestarray( diff --git a/virtualizarr/tests/test_codecs.py b/virtualizarr/tests/test_codecs.py new file mode 100644 index 00000000..41d16b5b --- /dev/null +++ b/virtualizarr/tests/test_codecs.py @@ -0,0 +1,159 @@ +from unittest.mock import patch + +import numpy as np +import pytest +from numcodecs import Blosc, Delta + +from virtualizarr import ChunkManifest, ManifestArray +from virtualizarr.codecs import get_codecs +from virtualizarr.tests import ( + requires_zarr_python, + requires_zarr_python_v3, +) +from virtualizarr.zarr import Codec + + +class TestCodecs: + def create_manifest_array(self, compressor=None, filters=None, zarr_format=2): + return ManifestArray( + chunkmanifest=ChunkManifest( + entries={"0.0": dict(path="/test.nc", offset=6144, length=48)} + ), + zarray=dict( + shape=(2, 3), + 
dtype=np.dtype(" "StorageConfig": + from icechunk import StorageConfig + + storage = StorageConfig.filesystem(str(tmpdir)) + + # TODO instead yield store then store.close() ?? + return storage + + +def generate_chunk_manifest( + netcdf4_file: str, + shape: tuple[int, ...], + chunks: tuple[int, ...], + offset=6144, + length=48, +) -> ChunkManifest: + chunk_dict = {} + num_chunks = [shape[i] // chunks[i] for i in range(len(shape))] + offset = offset + + # Generate all possible chunk indices using Cartesian product + for chunk_indices in product(*[range(n) for n in num_chunks]): + chunk_index = ".".join(map(str, chunk_indices)) + chunk_dict[chunk_index] = { + "path": netcdf4_file, + "offset": offset, + "length": length, + } + offset += length # Increase offset for each chunk + + return ChunkManifest(chunk_dict) + + +def gen_virtual_variable( + file_uri: str, + shape: tuple[int, ...] = (3, 4), + chunk_shape: tuple[int, ...] = (3, 4), + dtype: np.dtype = np.dtype("int32"), + compressor: Optional[dict] = None, + filters: Optional[list[dict[Any, Any]]] = None, + fill_value: Optional[str] = None, + encoding: Optional[dict] = None, + offset: int = 6144, + length: int = 48, + dims: list[str] = [], + zarr_format: Literal[2, 3] = 2, + attrs: dict[str, Any] = {}, +) -> Variable: + manifest = generate_chunk_manifest( + file_uri, + shape=shape, + chunks=chunk_shape, + offset=offset, + length=length, + ) + zarray = ZArray( + shape=shape, + chunks=chunk_shape, + dtype=dtype, + compressor=compressor, + filters=filters, + fill_value=fill_value, + zarr_format=zarr_format, + ) + ma = ManifestArray(chunkmanifest=manifest, zarray=zarray) + return Variable( + data=ma, + dims=dims, + encoding=encoding, + attrs=attrs, + ) + + +def gen_virtual_dataset( + file_uri: str, + shape: tuple[int, ...] = (3, 4), + chunk_shape: tuple[int, ...] = (3, 4), + dtype: np.dtype = np.dtype("int32"), + compressor: Optional[dict] = None, + filters: Optional[list[dict[Any, Any]]] = None, + fill_value: Optional[str] = None, + encoding: Optional[dict] = None, + variable_name: str = "foo", + offset: int = 6144, + length: int = 48, + dims: Optional[list[str]] = None, + zarr_format: Literal[2, 3] = 2, + coords: Optional[Coordinates] = None, +) -> Dataset: + ds = open_dataset(file_uri) + ds_dims: list[str] = cast(list[str], list(ds.dims)) + dims = dims or ds_dims + var = gen_virtual_variable( + file_uri, + shape=shape, + chunk_shape=chunk_shape, + dtype=dtype, + compressor=compressor, + filters=filters, + fill_value=fill_value, + encoding=encoding, + offset=offset, + length=length, + dims=dims, + zarr_format=zarr_format, + attrs=ds[variable_name].attrs, + ) + return Dataset( + {variable_name: var}, + coords=coords, + attrs=ds.attrs, + ) + + +class TestAppend: + """ + Tests for appending to existing icechunk store. 
+ """ + + # Success cases + ## When appending to a single virtual ref without encoding, it succeeds + def test_append_virtual_ref_without_encoding( + self, icechunk_storage: "StorageConfig", simple_netcdf4: str + ): + import xarray.testing as xrt + from icechunk import IcechunkStore + + # generate virtual dataset + vds = gen_virtual_dataset(file_uri=simple_netcdf4) + # create the icechunk store and commit the first virtual dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds, icechunk_filestore) + icechunk_filestore.commit( + "test commit" + ) # need to commit it in order to append to it in the next lines + + # Append the same dataset to the same store + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + dataset_to_icechunk(vds, icechunk_filestore_append, append_dim="x") + icechunk_filestore_append.commit("appended data") + dataset_to_icechunk(vds, icechunk_filestore_append, append_dim="x") + icechunk_filestore_append.commit("appended data again") + array = open_zarr(icechunk_filestore_append, consolidated=False, zarr_format=3) + + expected_ds = open_dataset(simple_netcdf4) + expected_array = concat([expected_ds, expected_ds, expected_ds], dim="x") + xrt.assert_identical(array, expected_array) + + def test_append_virtual_ref_with_encoding( + self, icechunk_storage: "StorageConfig", netcdf4_files_factory: Callable + ): + import xarray.testing as xrt + from icechunk import IcechunkStore + + scale_factor = 0.01 + encoding = {"air": {"scale_factor": scale_factor}} + filepath1, filepath2 = netcdf4_files_factory(encoding=encoding) + + vds1, vds2 = ( + gen_virtual_dataset( + file_uri=filepath1, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", + encoding={"scale_factor": scale_factor}, + offset=15419, + length=15476000, + ), + gen_virtual_dataset( + file_uri=filepath2, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", + encoding={"scale_factor": scale_factor}, + offset=15419, + length=15476000, + ), + ) + + # create the icechunk store and commit the first virtual dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds1, icechunk_filestore) + icechunk_filestore.commit( + "test commit" + ) # need to commit it in order to append to it in the next lines + + # Append the same dataset to the same store + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="time") + icechunk_filestore_append.commit("appended data") + new_ds = open_zarr(icechunk_filestore_append, consolidated=False, zarr_format=3) + + expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2) + expected_ds = concat([expected_ds1, expected_ds2], dim="time").drop_vars( + ["time", "lat", "lon"], errors="ignore" + ) + # Because we encode attributes, attributes may differ, for example + # actual_range for expected_ds.air is array([185.16, 322.1 ], dtype=float32) + # but encoded it is [185.16000366210935, 322.1000061035156] + xrt.assert_equal(new_ds, expected_ds) + + ## When appending to a virtual ref with encoding, it succeeds + @pytest.mark.asyncio + async def test_append_with_multiple_root_arrays( + self, icechunk_storage: "StorageConfig", netcdf4_files_factory: Callable + ): + 
import xarray.testing as xrt + from icechunk import IcechunkStore + + filepath1, filepath2 = netcdf4_files_factory( + encoding={"air": {"dtype": "float64", "chunksizes": (1460, 25, 53)}} + ) + + lon_manifest = gen_virtual_variable( + filepath1, + shape=(53,), + chunk_shape=(53,), + dtype=np.dtype("float32"), + offset=5279, + length=212, + dims=["lon"], + ) + lat_manifest = gen_virtual_variable( + filepath1, + shape=(25,), + chunk_shape=(25,), + dtype=np.dtype("float32"), + offset=5179, + length=100, + dims=["lat"], + ) + time_attrs = { + "standard_name": "time", + "long_name": "Time", + "units": "hours since 1800-01-01", + "calendar": "standard", + } + time_manifest1, time_manifest2 = [ + gen_virtual_variable( + filepath, + shape=(1460,), + chunk_shape=(1460,), + dtype=np.dtype("float32"), + offset=15498221, + length=5840, + dims=["time"], + attrs=time_attrs, + ) + for filepath in [filepath1, filepath2] + ] + [[_, coords1], [_, coords2]] = [ + separate_coords( + vars={"time": time_manifest, "lat": lat_manifest, "lon": lon_manifest}, + indexes={}, + coord_names=[], + ) + for time_manifest in [time_manifest1, time_manifest2] + ] + vds1, vds2 = ( + gen_virtual_dataset( + file_uri=filepath1, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", + offset=18043, + length=15476000, + coords=coords1, + ), + gen_virtual_dataset( + file_uri=filepath2, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", + offset=18043, + length=15476000, + coords=coords2, + ), + ) + + # create the icechunk store and commit the first virtual dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds1, icechunk_filestore) + icechunk_filestore.commit( + "test commit" + ) # need to commit it in order to append to it in the next lines + new_ds = open_zarr(icechunk_filestore, consolidated=False, zarr_format=3) + first_time_chunk_before_append = await icechunk_filestore._store.get("time/c/0") + + # Append the same dataset to the same store + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="time") + icechunk_filestore_append.commit("appended data") + assert ( + await icechunk_filestore_append._store.get("time/c/0") + ) == first_time_chunk_before_append + new_ds = open_zarr(icechunk_filestore_append, consolidated=False, zarr_format=3) + + expected_ds1, expected_ds2 = open_dataset(filepath1), open_dataset(filepath2) + expected_ds = concat([expected_ds1, expected_ds2], dim="time") + xrt.assert_equal(new_ds, expected_ds) + + # When appending to a virtual ref with compression, it succeeds + @pytest.mark.parametrize("zarr_format", [2, 3]) + def test_append_with_compression_succeeds( + self, + icechunk_storage: "StorageConfig", + netcdf4_files_factory: Callable, + zarr_format: Literal[2, 3], + ): + import xarray.testing as xrt + from icechunk import IcechunkStore + + encoding = { + "air": { + "zlib": True, + "complevel": 4, + "chunksizes": (1460, 25, 53), + "shuffle": False, + } + } + file1, file2 = netcdf4_files_factory(encoding=encoding) + # Generate compressed dataset + vds1, vds2 = ( + gen_virtual_dataset( + file_uri=file1, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + compressor={"id": "zlib", "level": 4}, + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", 
+ offset=18043, + length=3936114, + zarr_format=zarr_format, + ), + gen_virtual_dataset( + file_uri=file2, + shape=(1460, 25, 53), + chunk_shape=(1460, 25, 53), + compressor={"id": "zlib", "level": 4}, + dims=["time", "lat", "lon"], + dtype=np.dtype("float64"), + variable_name="air", + offset=18043, + length=3938672, + zarr_format=zarr_format, + ), + ) + + # Create icechunk store and commit the compressed dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds1, icechunk_filestore) + icechunk_filestore.commit("test commit") + + # Append another dataset with compatible compression + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="time") + icechunk_filestore_append.commit("appended data") + updated_ds = open_zarr( + store=icechunk_filestore_append, consolidated=False, zarr_format=3 + ) + + expected_ds1, expected_ds2 = open_dataset(file1), open_dataset(file2) + expected_ds = concat([expected_ds1, expected_ds2], dim="time") + expected_ds = expected_ds.drop_vars(["lon", "lat", "time"], errors="ignore") + xrt.assert_equal(updated_ds, expected_ds) + + ## When chunk shapes are different it fails + def test_append_with_different_chunking_fails( + self, icechunk_storage: "StorageConfig", simple_netcdf4: str + ): + from icechunk import IcechunkStore + + # Generate a virtual dataset with specific chunking + vds = gen_virtual_dataset(file_uri=simple_netcdf4, chunk_shape=(3, 4)) + + # Create icechunk store and commit the dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds, icechunk_filestore) + icechunk_filestore.commit("test commit") + + # Try to append dataset with different chunking, expect failure + vds_different_chunking = gen_virtual_dataset( + file_uri=simple_netcdf4, chunk_shape=(1, 1) + ) + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + with pytest.raises( + ValueError, match="Cannot concatenate arrays with inconsistent chunk shapes" + ): + dataset_to_icechunk( + vds_different_chunking, icechunk_filestore_append, append_dim="x" + ) + + ## When encoding is different it fails + def test_append_with_different_encoding_fails( + self, icechunk_storage: "StorageConfig", simple_netcdf4: str + ): + from icechunk import IcechunkStore + + # Generate datasets with different encoding + vds1 = gen_virtual_dataset( + file_uri=simple_netcdf4, encoding={"scale_factor": 0.1} + ) + vds2 = gen_virtual_dataset( + file_uri=simple_netcdf4, encoding={"scale_factor": 0.01} + ) + + # Create icechunk store and commit the first dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds1, icechunk_filestore) + icechunk_filestore.commit("test commit") + + # Try to append with different encoding, expect failure + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + with pytest.raises( + ValueError, + match="Cannot concatenate arrays with different values for encoding", + ): + dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="x") + + def test_dimensions_do_not_align( + self, icechunk_storage: "StorageConfig", simple_netcdf4: str + ): + from icechunk import IcechunkStore + + # Generate datasets with different lengths on the non-append dimension (x) + vds1 = gen_virtual_dataset( + # {'x': 5, 'y': 4} + file_uri=simple_netcdf4, + 
shape=(5, 4), + ) + vds2 = gen_virtual_dataset( + # {'x': 6, 'y': 4} + file_uri=simple_netcdf4, + shape=(6, 4), + ) + + # Create icechunk store and commit the first dataset + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds1, icechunk_filestore) + icechunk_filestore.commit("test commit") + + # Attempt to append dataset with different length in non-append dimension, expect failure + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + with pytest.raises(ValueError, match="Cannot concatenate arrays with shapes"): + dataset_to_icechunk(vds2, icechunk_filestore_append, append_dim="y") + + def test_append_dim_not_in_dims_raises_error( + self, icechunk_storage: "StorageConfig", simple_netcdf4: str + ): + """ + Test that attempting to append with an append_dim not present in dims raises a ValueError. + """ + from icechunk import IcechunkStore + + vds = gen_virtual_dataset( + file_uri=simple_netcdf4, shape=(5, 4), chunk_shape=(5, 4), dims=["x", "y"] + ) + + icechunk_filestore = IcechunkStore.create(storage=icechunk_storage) + dataset_to_icechunk(vds, icechunk_filestore) + icechunk_filestore.commit("initial commit") + + # Attempt to append using a non-existent append_dim "z" + icechunk_filestore_append = IcechunkStore.open_existing( + storage=icechunk_storage, read_only=False + ) + with pytest.raises( + ValueError, + match="append_dim z does not match any existing dataset dimensions", + ): + dataset_to_icechunk(vds, icechunk_filestore_append, append_dim="z") + + # TODO test writing to a group that isn't the root group # TODO test with S3 / minio diff --git a/virtualizarr/tests/test_xarray.py b/virtualizarr/tests/test_xarray.py index dec0eb92..5fe22fea 100644 --- a/virtualizarr/tests/test_xarray.py +++ b/virtualizarr/tests/test_xarray.py @@ -1,3 +1,5 @@ +from typing import Callable + import numpy as np import pytest import xarray as xr @@ -227,8 +229,8 @@ def test_concat_dim_coords_along_existing_dim(self): @requires_kerchunk @pytest.mark.parametrize("hdf_backend", [None, HDFVirtualBackend]) class TestCombineUsingIndexes: - def test_combine_by_coords(self, netcdf4_files, hdf_backend): - filepath1, filepath2 = netcdf4_files + def test_combine_by_coords(self, netcdf4_files_factory: Callable, hdf_backend): + filepath1, filepath2 = netcdf4_files_factory() with pytest.warns(UserWarning, match="will create in-memory pandas indexes"): vds1 = open_virtual_dataset(filepath1, backend=hdf_backend) diff --git a/virtualizarr/writers/icechunk.py b/virtualizarr/writers/icechunk.py index ae86eed4..03c5be57 100644 --- a/virtualizarr/writers/icechunk.py +++ b/virtualizarr/writers/icechunk.py @@ -1,19 +1,30 @@ -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, List, Optional, Union, cast import numpy as np from xarray import Dataset from xarray.backends.zarr import encode_zarr_attr_value from xarray.core.variable import Variable +from virtualizarr.codecs import get_codecs from virtualizarr.manifests import ChunkManifest, ManifestArray +from virtualizarr.manifests.utils import ( + check_compatible_encodings, + check_same_chunk_shapes, + check_same_codecs, + check_same_dtypes, + check_same_ndims, + check_same_shapes_except_on_concat_axis, +) from virtualizarr.zarr import encode_dtype if TYPE_CHECKING: from icechunk import IcechunkStore # type: ignore[import-not-found] - from zarr import Group # type: ignore + from zarr import Array, Group # type: ignore -def dataset_to_icechunk(ds: Dataset, store: 
"IcechunkStore") -> None: +def dataset_to_icechunk( + ds: Dataset, store: "IcechunkStore", append_dim: Optional[str] = None +) -> None: """ Write an xarray dataset whose variables wrap ManifestArrays to an Icechunk store. @@ -40,7 +51,14 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: # TODO only supports writing to the root group currently # TODO pass zarr_format kwarg? - root_group = Group.from_store(store=store) + if append_dim is not None: + if append_dim not in ds.dims: + raise ValueError( + f"append_dim {append_dim} does not match any existing dataset dimensions" + ) + root_group = Group.open(store=store, zarr_format=3) + else: + root_group = Group.from_store(store=store) # TODO this is Frozen, the API for setting attributes must be something else # root_group.attrs = ds.attrs @@ -52,6 +70,7 @@ def dataset_to_icechunk(ds: Dataset, store: "IcechunkStore") -> None: ds.attrs, store=store, group=root_group, + append_dim=append_dim, ) @@ -60,6 +79,7 @@ def write_variables_to_icechunk_group( attrs, store, group, + append_dim: Optional[str] = None, ): virtual_variables = { name: var @@ -76,7 +96,9 @@ def write_variables_to_icechunk_group( # will overwrite the root group's attributes with the dataset's attributes. We take advantage # of xarrays zarr integration to ignore having to format the attributes ourselves. ds = Dataset(loadable_variables, attrs=attrs) - ds.to_zarr(store, zarr_format=3, consolidated=False, mode="a") + ds.to_zarr( + store, zarr_format=3, consolidated=False, mode="a", append_dim=append_dim + ) # Then finish by writing the virtual variables to the same group for name, var in virtual_variables.items(): @@ -85,27 +107,46 @@ def write_variables_to_icechunk_group( group=group, name=name, var=var, + append_dim=append_dim, ) -def write_variable_to_icechunk( - store: "IcechunkStore", - group: "Group", - name: str, - var: Variable, +def num_chunks( + array, + axis: int, +): + return array.shape[axis] // array.chunks[axis] + + +def resize_array( + arr: "Array", + manifest_array: "ManifestArray", + append_axis: int, ) -> None: - """Write a single (possibly virtual) variable into an icechunk store""" - if isinstance(var.data, ManifestArray): - write_virtual_variable_to_icechunk( - store=store, - group=group, - name=name, - var=var, - ) - else: - raise ValueError( - "Cannot write non-virtual variables as virtual variables to Icechunk stores" - ) + new_shape = list(arr.shape) + new_shape[append_axis] += manifest_array.shape[append_axis] + arr.resize(tuple(new_shape)) + + +def get_axis( + dims: list[str], + dim_name: Optional[str], +) -> int: + if dim_name is None: + raise ValueError("dim_name must be provided") + return dims.index(dim_name) + + +def check_compatible_arrays( + ma: "ManifestArray", existing_array: "Array", append_axis: int +): + arrays: List[Union[ManifestArray, Array]] = [ma, existing_array] + check_same_dtypes([arr.dtype for arr in arrays]) + check_same_codecs([get_codecs(arr, normalize_to_zarr_v3=True) for arr in arrays]) + check_same_chunk_shapes([arr.chunks for arr in arrays]) + check_same_ndims([ma.ndim, existing_array.ndim]) + arr_shapes = [ma.shape, existing_array.shape] + check_same_shapes_except_on_concat_axis(arr_shapes, append_axis) def write_virtual_variable_to_icechunk( @@ -113,37 +154,86 @@ def write_virtual_variable_to_icechunk( group: "Group", name: str, var: Variable, + append_dim: Optional[str] = None, ) -> None: """Write a single virtual variable into an icechunk store""" + from zarr import Array + ma = cast(ManifestArray, 
var.data) zarray = ma.zarray - # creates array if it doesn't already exist - arr = group.require_array( - name=name, - shape=zarray.shape, - chunk_shape=zarray.chunks, - dtype=encode_dtype(zarray.dtype), - codecs=zarray._v3_codec_pipeline(), - dimension_names=var.dims, - fill_value=zarray.fill_value, - # TODO fill_value? - ) + dims: list[str] = cast(list[str], list(var.dims)) + existing_num_chunks = 0 + if append_dim and append_dim in dims: + # TODO: MRP - zarr, or icechunk zarr, array assignment to a variable doesn't work to point to the same object + # for example, if you resize an array, it resizes the array but not the bound variable. + if not isinstance(group[name], Array): + raise ValueError("Expected existing array to be a zarr.core.Array") + append_axis = get_axis(dims, append_dim) + + # check if arrays can be concatenated + check_compatible_arrays(ma, group[name], append_axis) # type: ignore[arg-type] + check_compatible_encodings(var.encoding, group[name].attrs) + + # determine number of existing chunks along the append axis + existing_num_chunks = num_chunks( + array=group[name], + axis=append_axis, + ) + + # resize the array + resize_array( + group[name], # type: ignore[arg-type] + manifest_array=ma, + append_axis=append_axis, + ) + else: + append_axis = None + # create array if it doesn't already exist - # TODO it would be nice if we could assign directly to the .attrs property - for k, v in var.attrs.items(): - arr.attrs[k] = encode_zarr_attr_value(v) + arr = group.require_array( + name=name, + shape=zarray.shape, + chunk_shape=zarray.chunks, + dtype=encode_dtype(zarray.dtype), + codecs=zarray._v3_codec_pipeline(), + dimension_names=var.dims, + fill_value=zarray.fill_value, + ) - _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} - for k, v in var.encoding.items(): - if k in _encoding_keys: + # TODO it would be nice if we could assign directly to the .attrs property + for k, v in var.attrs.items(): arr.attrs[k] = encode_zarr_attr_value(v) + _encoding_keys = {"_FillValue", "missing_value", "scale_factor", "add_offset"} + for k, v in var.encoding.items(): + if k in _encoding_keys: + arr.attrs[k] = encode_zarr_attr_value(v) + write_manifest_virtual_refs( store=store, group=group, arr_name=name, manifest=ma.manifest, + append_axis=append_axis, + existing_num_chunks=existing_num_chunks, + ) + + +def generate_chunk_key( + index: tuple[int, ...], + append_axis: Optional[int] = None, + existing_num_chunks: Optional[int] = None, +) -> str: + if append_axis and append_axis >= len(index): + raise ValueError( + f"append_axis {append_axis} is greater than the number of indices {len(index)}" + ) + return "/".join( + str(ind + existing_num_chunks) + if axis is append_axis and existing_num_chunks is not None + else str(ind) + for axis, ind in enumerate(index) ) @@ -152,6 +242,8 @@ def write_manifest_virtual_refs( group: "Group", arr_name: str, manifest: ChunkManifest, + append_axis: Optional[int] = None, + existing_num_chunks: Optional[int] = None, ) -> None: """Write all the virtual references for one array manifest at once.""" @@ -170,9 +262,11 @@ def write_manifest_virtual_refs( ], op_flags=[["readonly"]] * 3, # type: ignore ) + for path, offset, length in it: + # it.multi_index will be an iterator of the chunk shape index = it.multi_index - chunk_key = "/".join(str(i) for i in index) + chunk_key = generate_chunk_key(index, append_axis, existing_num_chunks) # set each reference individually store.set_virtual_ref(
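
As an illustration of the chunk-key handling added above, here is a minimal, self-contained sketch that copies the `generate_chunk_key` helper from `virtualizarr/writers/icechunk.py` in this diff and shows how appended chunks are re-keyed past the chunks already in the store; the example indices and chunk counts below are made up for demonstration and are not part of the change set.

from typing import Optional


def generate_chunk_key(
    index: tuple[int, ...],
    append_axis: Optional[int] = None,
    existing_num_chunks: Optional[int] = None,
) -> str:
    # Guard against an append axis that does not exist for this chunk index.
    if append_axis and append_axis >= len(index):
        raise ValueError(
            f"append_axis {append_axis} is greater than the number of indices {len(index)}"
        )
    # Shift the index along the append axis by the number of chunks the
    # existing array already holds, leaving all other axes untouched.
    return "/".join(
        str(ind + existing_num_chunks)
        if axis is append_axis and existing_num_chunks is not None
        else str(ind)
        for axis, ind in enumerate(index)
    )


# Writing a fresh array: chunk (1, 0) keeps the key "1/0".
assert generate_chunk_key((1, 0)) == "1/0"

# Appending along axis 0 to an array that already holds 2 chunks on that axis:
# the new chunk (1, 0) is written under "3/0", i.e. after the existing chunks.
assert generate_chunk_key((1, 0), append_axis=0, existing_num_chunks=2) == "3/0"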