From b2940ecefe74b4cbe081fc5f9195b856b7aeb38a Mon Sep 17 00:00:00 2001
From: Stephan Hoyer
Date: Wed, 31 Aug 2022 13:58:36 -0700
Subject: [PATCH] Omit unchunked dimensions from Key objects created with
 DatasetToChunks

This allows for splitting datasets across variables even when those variables
have different dimensions. See the new integration test for a concrete
use case, resembling real model output.

Also revise the warning message in the README to be a bit friendlier.

Fixes https://github.com/google/xarray-beam/issues/43

PiperOrigin-RevId: 471347485
---
 README.md                            | 22 ++++++++++++----------
 setup.py                             |  2 +-
 xarray_beam/__init__.py              |  2 +-
 xarray_beam/_src/core.py             |  3 ++-
 xarray_beam/_src/core_test.py        | 18 ++++++++++++++++++
 xarray_beam/_src/integration_test.py | 20 ++++++++++++++++++++
 xarray_beam/_src/zarr_test.py        |  7 +++++--
 7 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/README.md b/README.md
index 9b13c2d..e7fb3b5 100644
--- a/README.md
+++ b/README.md
@@ -20,20 +20,22 @@ multi-dimensional labeled arrays, such as:
 For more about our approach and how to get started,
 **[read the documentation](https://xarray-beam.readthedocs.io/)**!
 
-**🚨 Warning: Xarray-Beam is new and unpolished 🚨**
+**Warning: Xarray-Beam is a sharp tool 🔪**
 
-Expect sharp edges 🔪 and performance cliffs 🧗, particularly related to the
-management of lazy data with Dask and reading/writing data with Zarr. We have
-used it to efficiently process ~25 TB datasets. We _expect_ it to scale to PB
-size datasets, but that's easier said than done. We welcome feedback and
-contributions from early adopters, and hope to have it ready for wider audience
-soon.
+Xarray-Beam is relatively new and focused on expert users:
+
+- We use it extensively at Google for processing large-scale weather datasets,
+  but there is not yet a vibrant external community. It is relatively stable,
+  but we are still refining parts of its API.
+- It provides low-level abstractions that facilitate writing very large-scale
+  data pipelines (e.g., 100+ TB), but by design it requires explicitly
+  thinking about how every operation is parallelized.
 
 ## Installation
 
-Xarray-Beam requires recent versions of immutabledict, xarray, dask, rechunker
-and zarr, and the *latest* release of Apache Beam (2.31.0 or later). For best
-performance when writing Zarr files, use Xarray 0.19.0 or later.
+Xarray-Beam requires recent versions of immutabledict, Xarray, Dask, Rechunker,
+Zarr, and Apache Beam. For best performance when writing Zarr files, use Xarray
+0.19.0 or later.
 
 ## Disclaimer
 
diff --git a/setup.py b/setup.py
index 5a50809..4c3acab 100644
--- a/setup.py
+++ b/setup.py
@@ -42,7 +42,7 @@
 setuptools.setup(
     name='xarray-beam',
-    version='0.3.1',
+    version='0.3.2',
     license='Apache 2.0',
     author='Google LLC',
     author_email='noreply@google.com',
diff --git a/xarray_beam/__init__.py b/xarray_beam/__init__.py
index 4d230e4..77deb50 100644
--- a/xarray_beam/__init__.py
+++ b/xarray_beam/__init__.py
@@ -43,4 +43,4 @@
 )
 from xarray_beam import Mean
 
-__version__ = '0.3.1'
+__version__ = '0.3.2'
diff --git a/xarray_beam/_src/core.py b/xarray_beam/_src/core.py
index a740b97..32c4b3a 100644
--- a/xarray_beam/_src/core.py
+++ b/xarray_beam/_src/core.py
@@ -282,7 +282,8 @@ def __init__(
       chunks = dataset.chunks
     if chunks is None:
       raise ValueError('dataset must be chunked or chunks must be set')
-    chunks = normalize_expanded_chunks(chunks, dataset.sizes)
+    sizes = {k: v for k, v in dataset.sizes.items() if k in chunks}
+    chunks = normalize_expanded_chunks(chunks, sizes)
     self.dataset = dataset
     self.chunks = chunks
     self.split_vars = split_vars
diff --git a/xarray_beam/_src/core_test.py b/xarray_beam/_src/core_test.py
index 7cdbf89..60ddb9a 100644
--- a/xarray_beam/_src/core_test.py
+++ b/xarray_beam/_src/core_test.py
@@ -261,6 +261,7 @@ def test_dataset_to_chunks_whole(self):
         test_util.EagerPipeline()
         | xbeam.DatasetToChunks(dataset, chunks={})
     )
+    expected = [(xbeam.Key(), dataset)]
     self.assertIdenticalChunks(actual, expected)
 
   def test_dataset_to_chunks_vars(self):
@@ -280,6 +281,23 @@ def test_dataset_to_chunks_vars(self):
     )
     self.assertIdenticalChunks(actual, expected)
 
+  def test_dataset_to_chunks_split_with_different_dims(self):
+    dataset = xarray.Dataset({
+        'foo': (('x', 'y'), np.array([[1, 2, 3], [4, 5, 6]])),
+        'bar': ('x', np.array([1, 2])),
+    })
+    expected = [
+        (xbeam.Key({'x': 0}, {'foo'}), dataset[['foo']].head(x=1)),
+        (xbeam.Key({'x': 0}, {'bar'}), dataset[['bar']].head(x=1)),
+        (xbeam.Key({'x': 1}, {'foo'}), dataset[['foo']].tail(x=1)),
+        (xbeam.Key({'x': 1}, {'bar'}), dataset[['bar']].tail(x=1)),
+    ]
+    actual = (
+        test_util.EagerPipeline()
+        | xbeam.DatasetToChunks(dataset, chunks={'x': 1}, split_vars=True)
+    )
+    self.assertIdenticalChunks(actual, expected)
+
 
 class ValidateEachChunkTest(test_util.TestCase):
 
diff --git a/xarray_beam/_src/integration_test.py b/xarray_beam/_src/integration_test.py
index e83b76b..a61d0e4 100644
--- a/xarray_beam/_src/integration_test.py
+++ b/xarray_beam/_src/integration_test.py
@@ -95,6 +95,26 @@ def test_rechunk_zarr_to_zarr(self, template_method, split_vars):
 
     xarray.testing.assert_identical(roundtripped, dataset)
 
+  def test_dataset_to_zarr_with_split_vars(self):
+    dataset = xarray.Dataset(
+        {
+            'volumetric': (
+                ('t', 'x', 'y', 'z'), np.arange(240).reshape(10, 2, 3, 4)
+            ),
+            'surface': (('t', 'x', 'y'), np.arange(60).reshape(10, 2, 3)),
+        }
+    )
+    temp_dir = self.create_tempdir().full_path
+    template = dataset.chunk()
+    chunks = {'t': 1}
+    (
+        test_util.EagerPipeline()
+        | xbeam.DatasetToChunks(dataset, chunks, split_vars=True)
+        | xbeam.ChunksToZarr(temp_dir, template, chunks)
+    )
+    actual = xarray.open_zarr(temp_dir, consolidated=True)
+    xarray.testing.assert_identical(actual, dataset)
+
 
 if __name__ == '__main__':
   absltest.main()
diff --git a/xarray_beam/_src/zarr_test.py b/xarray_beam/_src/zarr_test.py
index 1090c2a..da0748b 100644
--- a/xarray_beam/_src/zarr_test.py
+++ b/xarray_beam/_src/zarr_test.py
@@ -140,14 +140,13 @@ def test_2d_chunks_to_zarr(self, coords):
     result = xarray.open_zarr(temp_dir, consolidated=True)
xarray.testing.assert_identical(dataset, result) - def test_dataset_to_zarr(self): + def test_dataset_to_zarr_simple(self): dataset = xarray.Dataset( {'foo': ('x', np.arange(0, 60, 10))}, coords={'x': np.arange(6)}, attrs={'meta': 'data'}, ) chunked = dataset.chunk({'x': 3}) - temp_dir = self.create_tempdir().full_path ( test_util.EagerPipeline() @@ -156,6 +155,10 @@ def test_dataset_to_zarr(self): actual = xarray.open_zarr(temp_dir, consolidated=True) xarray.testing.assert_identical(actual, dataset) + def test_dataset_to_zarr_unchunked(self): + dataset = xarray.Dataset( + {'foo': ('x', np.arange(0, 60, 10))}, + ) temp_dir = self.create_tempdir().full_path with self.assertRaisesRegex( ValueError,
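
For illustration, here is a minimal sketch (not part of the commit) of the
behavior this patch enables, mirroring the new
`test_dataset_to_chunks_split_with_different_dims` case above. It assumes
Apache Beam and an Xarray-Beam release containing this change are installed:

```python
# Sketch: DatasetToChunks with split_vars=True over variables with
# different dimensions -- the use case this patch fixes (issue #43).
import apache_beam as beam
import numpy as np
import xarray
import xarray_beam as xbeam

# 'foo' is 2-D and 'bar' is 1-D; they share only the 'x' dimension.
dataset = xarray.Dataset({
    'foo': (('x', 'y'), np.array([[1, 2, 3], [4, 5, 6]])),
    'bar': ('x', np.array([1, 2])),
})

with beam.Pipeline() as p:
    _ = (
        p
        | xbeam.DatasetToChunks(dataset, chunks={'x': 1}, split_vars=True)
        # Each element is a (Key, xarray.Dataset) pair. Keys now carry
        # offsets only for the chunked 'x' dimension; the unchunked 'y' is
        # omitted, which is what lets 'foo' and 'bar' share one keyspace.
        | beam.MapTuple(lambda key, chunk: print(key, list(chunk.data_vars)))
    )

# Expected output (in some order), per the new unit test:
#   Key(offsets={'x': 0}, vars={'foo'}) ['foo']
#   Key(offsets={'x': 0}, vars={'bar'}) ['bar']
#   Key(offsets={'x': 1}, vars={'foo'}) ['foo']
#   Key(offsets={'x': 1}, vars={'bar'}) ['bar']
```

Before this change, expanded chunks were normalized against every dataset
dimension, so keys also carried offsets for unchunked dimensions and
variables missing one of those dimensions could not be split out this way.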