From 8c6482bb60dbc3ec68a30a7e90aa65a87bc40623 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Wed, 18 Sep 2024 14:06:30 -0500
Subject: [PATCH 1/7] Convert NetCDF to Zarr

---
 .github/workflows/tests.yml             |  2 +-
 ci/environment.yml                      |  1 +
 tests/geospatial/test_netcdf_to_zarr.py | 93 +++++++++++++++++++++++++
 3 files changed, 95 insertions(+), 1 deletion(-)
 create mode 100644 tests/geospatial/test_netcdf_to_zarr.py

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 69d483925d..5345b189b8 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -31,7 +31,7 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python_version: ["3.10"]
-        pytest_args: [tests --ignore=tests/tpch]
+        pytest_args: [tests/geospatial/test_netcdf_to_zarr.py --ignore=tests/tpch]
         extra-env: [""]
         name_prefix: [tests]
         include:
diff --git a/ci/environment.yml b/ci/environment.yml
index 68f2e7177e..eb1031506d 100644
--- a/ci/environment.yml
+++ b/ci/environment.yml
@@ -48,6 +48,7 @@ dependencies:
   - gilknocker ==0.4.1
   - openssl >1.1.0g
   - rioxarray ==0.17.0
+  - h5netcdf ==1.3.0
 
 ########################################################
 # PLEASE READ:
diff --git a/tests/geospatial/test_netcdf_to_zarr.py b/tests/geospatial/test_netcdf_to_zarr.py
new file mode 100644
index 0000000000..0ab73b9388
--- /dev/null
+++ b/tests/geospatial/test_netcdf_to_zarr.py
@@ -0,0 +1,93 @@
+import xarray as xr
+from dask.utils import format_bytes
+
+
+def test_netcdf_to_zarr(
+    scale,
+    s3,
+    s3_url,
+    client_factory,
+    cluster_kwargs={
+        "workspace": "dask-engineering",
+        "region": "us-west-2",
+        "wait_for_workers": True,
+    },
+    scale_kwargs={
+        "small": {"n_workers": 10},
+        "medium": {"n_workers": 100},
+        "large": {"n_workers": 200},
+    },
+):
+    with client_factory(
+        **scale_kwargs[scale], **cluster_kwargs
+    ) as client:  # noqa: F841
+        # Define models and variables of interest
+        models = [
+            "ACCESS-CM2",
+            "ACCESS-ESM1-5",
+            "CMCC-ESM2",
+            "CNRM-CM6-1",
+            "CNRM-ESM2-1",
+            "CanESM5",
+            "EC-Earth3",
+            "EC-Earth3-Veg-LR",
+            "FGOALS-g3",
+            "GFDL-ESM4",
+            "GISS-E2-1-G",
+            "INM-CM4-8",
+            "INM-CM5-0",
+            "KACE-1-0-G",
+            "MIROC-ES2L",
+            "MPI-ESM1-2-HR",
+            "MPI-ESM1-2-LR",
+            "MRI-ESM2-0",
+            "NorESM2-LM",
+            "NorESM2-MM",
+            "TaiESM1",
+            "UKESM1-0-LL",
+        ]
+        variables = [
+            "hurs",
+            "huss",
+            "pr",
+            "rlds",
+            "rsds",
+            "sfcWind",
+            "tas",
+            "tasmax",
+            "tasmin",
+        ]
+
+        if scale == "small":
+            # 130 files (152.83 GiB)
+            # One model and one variable
+            models = models[:1]
+            variables = variables[:1]
+        elif scale == "medium":
+            # 715 files (XX TiB)
+            # One model and all variables
+            models = models[:1]
+        else:
+            # 11635 files (XX TiB)
+            # All models and variables
+            pass
+
+        # Get netCDF data files -- see https://registry.opendata.aws/nex-gddp-cmip6
+        # for dataset details.
+        file_list = []
+        for model in models:
+            for variable in variables:
+                source_directory = f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1f1/{variable}/*.nc"
+                file_list += [f"s3://{path}" for path in s3.glob(source_directory)]
+        files = [s3.open(f) for f in file_list]
+        print(f"Processing {len(files)} NetCDF files")
+
+        ds = xr.open_mfdataset(
+            files,
+            engine="h5netcdf",
+            combine="nested",
+            concat_dim="time",
+            parallel=True,
+        )
+        print(f"Converting {format_bytes(ds.nbytes)} from NetCDF to Zarr")
+        ds.to_zarr(s3_url)

From 94828c07715775a75e51c2be8150087313a2d3ee Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Wed, 18 Sep 2024 16:26:49 -0500
Subject: [PATCH 2/7] Nit

---
 tests/geospatial/test_netcdf_to_zarr.py | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/tests/geospatial/test_netcdf_to_zarr.py b/tests/geospatial/test_netcdf_to_zarr.py
index 0ab73b9388..588eac0de7 100644
--- a/tests/geospatial/test_netcdf_to_zarr.py
+++ b/tests/geospatial/test_netcdf_to_zarr.py
@@ -59,17 +59,16 @@ def test_netcdf_to_zarr(
         ]
 
         if scale == "small":
-            # 130 files (152.83 GiB)
-            # One model and one variable
+            # 130 files (152.83 GiB). One model and one variable.
             models = models[:1]
             variables = variables[:1]
         elif scale == "medium":
-            # 715 files (XX TiB)
-            # One model and all variables
+            # 715 files. One model and all variables.
+            # Currently fails after hitting 20 minute idle timeout
+            # sending `to_zarr` graph to the scheduler.
             models = models[:1]
         else:
-            # 11635 files (XX TiB)
-            # All models and variables
+            # 11635 files. All models and variables.
             pass
 
         # Get netCDF data files -- see https://registry.opendata.aws/nex-gddp-cmip6
@@ -77,8 +76,8 @@ def test_netcdf_to_zarr(
         file_list = []
         for model in models:
             for variable in variables:
-                source_directory = f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1f1/{variable}/*.nc"
-                file_list += [f"s3://{path}" for path in s3.glob(source_directory)]
+                data_dir = f"s3://nex-gddp-cmip6/NEX-GDDP-CMIP6/{model}/historical/r1i1p1f1/{variable}/*.nc"
+                file_list += [f"s3://{path}" for path in s3.glob(data_dir)]
         files = [s3.open(f) for f in file_list]
         print(f"Processing {len(files)} NetCDF files")
 

From d151a0a082ec5aa6cc45c53cd202a4513065c701 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Thu, 19 Sep 2024 10:33:43 -0500
Subject: [PATCH 3/7] Add rechunking step

---
 .../{test_netcdf_to_zarr.py => test_cloud_optimize.py}  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
 rename tests/geospatial/{test_netcdf_to_zarr.py => test_cloud_optimize.py} (91%)

diff --git a/tests/geospatial/test_netcdf_to_zarr.py b/tests/geospatial/test_cloud_optimize.py
similarity index 91%
rename from tests/geospatial/test_netcdf_to_zarr.py
rename to tests/geospatial/test_cloud_optimize.py
index 588eac0de7..feabe618c8 100644
--- a/tests/geospatial/test_netcdf_to_zarr.py
+++ b/tests/geospatial/test_cloud_optimize.py
@@ -1,8 +1,7 @@
 import xarray as xr
-from dask.utils import format_bytes
 
 
-def test_netcdf_to_zarr(
+def test_cloud_optimize(
     scale,
     s3,
     s3_url,
@@ -81,6 +80,7 @@ def test_cloud_optimize(
         files = [s3.open(f) for f in file_list]
         print(f"Processing {len(files)} NetCDF files")
 
+        # Load input NetCDF data files
         ds = xr.open_mfdataset(
             files,
             engine="h5netcdf",
@@ -88,5 +88,9 @@ def test_cloud_optimize(
             concat_dim="time",
             parallel=True,
         )
-        print(f"Converting {format_bytes(ds.nbytes)} from NetCDF to Zarr")
+
+        # Rechunk from "pancake" to "pencil" format
+        ds = ds.chunk({"time": -1, "lon": "auto", "lat": "auto"})
+
+        # Write out to a Zarr dataset
         ds.to_zarr(s3_url)

From 874dae937869cf16561f4188b9473851a227c304 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Thu, 19 Sep 2024 12:00:34 -0500
Subject: [PATCH 4/7] Update open_mfdataset kwargs

---
 tests/geospatial/test_cloud_optimize.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/geospatial/test_cloud_optimize.py b/tests/geospatial/test_cloud_optimize.py
index feabe618c8..6c15e4751f 100644
--- a/tests/geospatial/test_cloud_optimize.py
+++ b/tests/geospatial/test_cloud_optimize.py
@@ -86,6 +86,9 @@ def test_cloud_optimize(
             engine="h5netcdf",
             combine="nested",
             concat_dim="time",
+            data_vars="minimal",
+            coords="minimal",
+            compat="override",
             parallel=True,
         )
 

From 3f72cf26872a9c835bd31d9607c10952d925d733 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Thu, 19 Sep 2024 12:01:46 -0500
Subject: [PATCH 5/7] Remove temp changes

---
 .github/workflows/tests.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 5345b189b8..69d483925d 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -31,7 +31,7 @@ jobs:
       matrix:
         os: [ubuntu-latest]
         python_version: ["3.10"]
-        pytest_args: [tests/geospatial/test_netcdf_to_zarr.py --ignore=tests/tpch]
+        pytest_args: [tests --ignore=tests/tpch]
         extra-env: [""]
         name_prefix: [tests]
         include:

From 22e67a4bb2380d90288bb959f049d6ff478d7455 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Fri, 20 Sep 2024 12:30:38 -0500
Subject: [PATCH 6/7] Apply suggestions from Hendrik

Co-authored-by: Hendrik Makait
---
 tests/geospatial/test_cloud_optimize.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/geospatial/test_cloud_optimize.py b/tests/geospatial/test_cloud_optimize.py
index 6c15e4751f..7294832aca 100644
--- a/tests/geospatial/test_cloud_optimize.py
+++ b/tests/geospatial/test_cloud_optimize.py
@@ -7,7 +7,7 @@ def test_cloud_optimize(
     s3_url,
     client_factory,
     cluster_kwargs={
-        "workspace": "dask-engineering",
+        "workspace": "dask-benchmarks",
         "region": "us-west-2",
         "wait_for_workers": True,
     },
@@ -81,6 +81,7 @@ def test_cloud_optimize(
         print(f"Processing {len(files)} NetCDF files")
 
         # Load input NetCDF data files
+        # TODO: Reduce explicit settings once https://github.com/pydata/xarray/issues/8778 is completed.
         ds = xr.open_mfdataset(
             files,
             engine="h5netcdf",

From 2b33c84ad86c8decece859be9f609567fe034136 Mon Sep 17 00:00:00 2001
From: James Bourbeau
Date: Fri, 20 Sep 2024 12:34:15 -0500
Subject: [PATCH 7/7] Update

---
 tests/geospatial/test_cloud_optimize.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/tests/geospatial/test_cloud_optimize.py b/tests/geospatial/test_cloud_optimize.py
index 7294832aca..afd273ade3 100644
--- a/tests/geospatial/test_cloud_optimize.py
+++ b/tests/geospatial/test_cloud_optimize.py
@@ -62,10 +62,11 @@ def test_cloud_optimize(
             models = models[:1]
             variables = variables[:1]
         elif scale == "medium":
-            # 715 files. One model and all variables.
+            # 390 files. Two models and two variables.
             # Currently fails after hitting 20 minute idle timeout
-            # sending `to_zarr` graph to the scheduler.
-            models = models[:1]
+            # sending a large graph to the scheduler.
+            models = models[:2]
+            variables = variables[:2]
         else:
             # 11635 files. All models and variables.
             pass
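
For reference, the end-to-end conversion this series benchmarks can be sketched as a small standalone script. This is a rough sketch, not part of the patches above: it assumes dask, h5netcdf, s3fs, and zarr are installed, anonymous read access to the public NEX-GDDP-CMIP6 bucket, and that OUTPUT_URL (and the single model/variable choice) are hypothetical placeholders.

    # Standalone sketch of the NetCDF -> Zarr conversion benchmarked above.
    # OUTPUT_URL and the model/variable selection are illustrative only.
    import s3fs
    import xarray as xr

    OUTPUT_URL = "s3://my-bucket/nex-gddp-cmip6.zarr"  # hypothetical destination

    fs = s3fs.S3FileSystem(anon=True)
    paths = fs.glob(
        "nex-gddp-cmip6/NEX-GDDP-CMIP6/ACCESS-CM2/historical/r1i1p1f1/tas/*.nc"
    )
    files = [fs.open(path) for path in paths]

    # Concatenate the per-year NetCDF files along time, using the same kwargs
    # the test settles on in PATCH 4/7 to keep coordinate handling cheap
    ds = xr.open_mfdataset(
        files,
        engine="h5netcdf",
        combine="nested",
        concat_dim="time",
        data_vars="minimal",
        coords="minimal",
        compat="override",
        parallel=True,
    )

    # Rechunk from per-file "pancake" chunks to time-contiguous "pencil"
    # chunks, then write a single Zarr store
    ds = ds.chunk({"time": -1, "lon": "auto", "lat": "auto"})
    ds.to_zarr(OUTPUT_URL)

Pencil-shaped chunks keep each pixel's full time series within one chunk; the trade-off is a larger task graph, which is what the idle-timeout note in PATCH 7/7 alludes to at the bigger scales.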