diff --git a/.gitignore b/.gitignore index d972f37..30647d8 100644 --- a/.gitignore +++ b/.gitignore @@ -152,3 +152,6 @@ zarr # Dagster tmp* + +# uv +uv.lock diff --git a/local_archives/nwp/ceda/ceda_global.py b/local_archives/nwp/ceda/ceda_global.py index e93b539..c541418 100644 --- a/local_archives/nwp/ceda/ceda_global.py +++ b/local_archives/nwp/ceda/ceda_global.py @@ -1,3 +1,14 @@ +"""Zarr archive of NWP data from the Met Office's Global model. + +The MetOffice runs it's Unified Model (UM) in two configurations: Global, and UK. +This asset contains data from the global configuration covering the whole globe. + +Sourced via FTP from CEDA (https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the nwp-consumer docker image +(https://github.com/openclimatefix/nwp-consumer) +""" + import datetime as dt import os from typing import Any @@ -12,15 +23,7 @@ @dg.asset( name="zarr_archive", - description="".join(( - "Zarr archive of NWP data from the Met Office's Global model. ", - "Sourced via FTP from CEDA ", - "(https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783). ", - "This asset is updated monthly, and surfaced as a Zarr Directory Store ", - "for each month. It is downloaded using the nwp-consumer ", - "docker image, currently from the 'major-refactor' branch ", - "(https://github.com/openclimatefix/nwp-consumer). ", - )), + description=__doc__, key_prefix=["nwp", "ceda", "global"], metadata={ "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"), diff --git a/local_archives/nwp/ecmwf/__init__.py b/local_archives/nwp/ecmwf/__init__.py index 1d33007..aab2a92 100644 --- a/local_archives/nwp/ecmwf/__init__.py +++ b/local_archives/nwp/ecmwf/__init__.py @@ -1,6 +1,12 @@ import dagster as dg -from . import ecmwf_malta, ecmwf_nw_india, ecmwf_uk, ecmwf_india +from . import ( + ecmwf_malta, + ecmwf_nw_india, + ecmwf_uk, + ecmwf_india, + ecmwf_ens_stat_india, +) uk_assets = dg.load_assets_from_modules( modules=[ecmwf_uk], @@ -23,9 +29,15 @@ group_name="ecmwf_india", ) +india_stat_assets = dg.load_assets_from_modules( + modules=[ecmwf_ens_stat_india], + group_name="ecmwf_ens_india_stat", +) + all_assets: list[dg.AssetsDefinition] = [ *uk_assets, *nw_india_assets, *malta_assets, *india_assets, + *india_stat_assets, ] diff --git a/local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py b/local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py new file mode 100644 index 0000000..b58e711 --- /dev/null +++ b/local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py @@ -0,0 +1,75 @@ +"""Zarr archive of Summary NWP data from ECMWF's EPS. + +EPS is the ECMWF Ensemble Prediction System, +which provides 50 perturbed forecasts of upcoming atmospheric conditions. +This asset contains summary statistics of this data (mean, standard deviation) for India. + +Sourced via MARS API from ECMWF (https://apps.ecmwf.int/mars-catalogue). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the nwp-consumer docker image +(https://github.com/openclimatefix/nwp-consumer). +""" + +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +from constants import LOCATIONS_BY_ENVIRONMENT + +env = os.getenv("ENVIRONMENT", "local") +ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER +ARCHIVE_FOLDER = f"{ZARR_FOLDER}/nwp/ecmwf-eps/india-stat" + +@dg.asset( + name="zarr_archive", + description=__doc__, + key_prefix=["nwp", "ecmwf-eps", "india-stat"], + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("global"), + "source": dg.MetadataValue.text("ecmwf-mars"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="docker", + automation_condition=dg.AutomationCondition.eager(), + tags={ + "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "ecmwf-mars-consumer", + }, + partitions_def=dg.MonthlyPartitionsDefinition( + start_date="2020-01-01", + end_offset=-3, + ), +) +def ecmwf_eps_india_stat( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + image: str = "ghcr.io/openclimatefix/nwp-consumer:1.0.3" + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image=image, + command=[ + "archive" + "-y", + str(it.year), + "-m", + str(it.month), + ], + env={ + "MODEL_REPOSITORY": "ceda-metoffice-global", + "NOTIFICATION_REPOSITORY": "dagster-pipes", + "ECMWF_API_KEY": os.environ["ECMWF_API_KEY"], + "ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"], + "ECMWF_API_URL": os.environ["ECMWF_API_URL"], + "ECMWF_MARS_AREA": "35/67/6/97", + }, + container_kwargs={ + "volumes": [f"{ARCHIVE_FOLDER}:/work"], + }, + context=context, + ).get_results() diff --git a/tests/compile_test.py b/tests/compile_test.py index 33a7b8b..454d46b 100644 --- a/tests/compile_test.py +++ b/tests/compile_test.py @@ -12,6 +12,6 @@ def test_nwp_asset_key_prefixes() -> None: # The first element should be the flavor: assert asset.key.path[0] in ["nwp", "sat"] # The second element should be the provider - assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs"] + assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs", "ecmwf-eps"] # The third element should be the region - assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india"] + assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india", "india-stat"]