# Patch summary (text before the first "diff --git" header is not part of any hunk).
#
# 1. Dagster automation API migration: in passiv_monthly.py, passiv_year.py,
#    _generic_definitions_factory.py and gfs.py every
#    `auto_materialize_policy=dg.AutoMaterializePolicy.eager()` argument is
#    replaced by `automation_condition=dg.AutomationCondition.eager()`.
# 2. New module local_archives/nwp/ceda/ceda_global.py: defines the asset
#    `zarr_archive` under key prefix ["nwp", "ceda", "global"], partitioned
#    monthly from 2019-01-01 with end_offset=-3. The asset body calls
#    `pipes_docker_client.run(...)` with the image
#    ghcr.io/openclimatefix/nwp-consumer:devsjc-major-refactor and the command
#    `consume -y <year> -m <month>`, taking year/month from the start of the
#    partition time window, and returns `.get_results()`.
#    It reads CEDA_FTP_USER and CEDA_FTP_PASSWORD via `os.environ[...]`, so a
#    missing variable raises KeyError when the asset runs.
#    `{ZARR_FOLDER}/nwp/ceda/global` is passed as a volume mapped to `/work`.
# 3. local_archives/nwp/ceda/__init__.py loads the new module's assets with
#    group name "ceda_global" and appends them to `all_assets`.
# 4. pyproject.toml raises the minimum versions of the dagster packages
#    (1.8.5 for dagit/dagster/dagster-cloud/dagster-webserver/dagster-graphql/
#    dagster-pipes, 0.24.5 for dagster-postgres/dagster-docker).
# 5. tests/cloud_archives/pv/test_passiv.py only gains a trailing newline.
#
# NOTE(review): indentation inside the hunks below is laid out with the
# conventional 4-space nesting; confirm context lines match the target files
# byte-for-byte before running `git apply`.
diff --git a/cloud_archives/pv/passiv/passiv_monthly.py b/cloud_archives/pv/passiv/passiv_monthly.py
index f3b9206..6645b72 100644
--- a/cloud_archives/pv/passiv/passiv_monthly.py
+++ b/cloud_archives/pv/passiv/passiv_monthly.py
@@ -83,7 +83,7 @@ def get_monthly_passiv_data(start_date: datetime, upload_to_hf: bool = True, ove
 
 @dg.asset(
     key=["pv", "passiv", "monthly_30min"],
-    auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+    automation_condition=dg.AutomationCondition.eager(),
     partitions_def=dg.TimeWindowPartitionsDefinition(
         fmt="%Y-%m",
         start="2010-01",
@@ -104,7 +104,7 @@ def pv_passiv_monthly_30min(context: dg.AssetExecutionContext):
 
 @dg.asset(
     key=["pv", "passiv", "monthly_5min"],
-    auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+    automation_condition=dg.AutomationCondition.eager(),
     partitions_def=dg.TimeWindowPartitionsDefinition(
         fmt="%Y-%m",
         start="2018-01",
diff --git a/cloud_archives/pv/passiv/passiv_year.py b/cloud_archives/pv/passiv/passiv_year.py
index dbc8c9d..3b65252 100644
--- a/cloud_archives/pv/passiv/passiv_year.py
+++ b/cloud_archives/pv/passiv/passiv_year.py
@@ -67,7 +67,7 @@ def get_yearly_passiv_data(start_date: datetime, upload_to_hf: bool = True, over
 
 @dg.asset(
     key=["pv", "passiv", "yearly_5min"],
-    auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+    automation_condition=dg.AutomationCondition.eager(),
     partitions_def=dg.TimeWindowPartitionsDefinition(
         fmt="%Y",
         start="2018",
@@ -86,7 +86,7 @@ def pv_passiv_yearly_5min(context: dg.AssetExecutionContext):
 
 @dg.asset(
     key=["pv", "passiv", "yearly_30min"],
-    auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+    automation_condition=dg.AutomationCondition.eager(),
     partitions_def=dg.TimeWindowPartitionsDefinition(
         fmt="%Y",
         start="2010",
diff --git a/local_archives/nwp/_generic_definitions_factory.py b/local_archives/nwp/_generic_definitions_factory.py
index 3567af1..b3a2a5e 100644
--- a/local_archives/nwp/_generic_definitions_factory.py
+++ b/local_archives/nwp/_generic_definitions_factory.py
@@ -60,7 +60,7 @@ def make_definitions(
     @dg.asset(
         name="raw_archive",
         key_prefix=opts.key_prefix(),
-        auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+        automation_condition=dg.AutomationCondition.eager(),
         partitions_def=opts.partitions,
         check_specs=[
             dg.AssetCheckSpec(
@@ -168,7 +168,7 @@ def _raw_archive(
         name="zarr_archive",
         key_prefix=opts.key_prefix(),
         partitions_def=opts.partitions,
-        auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+        automation_condition=dg.AutomationCondition.eager(),
         ins={"raw_paths": dg.AssetIn(key=_raw_archive.key)},
         io_manager_key="nwp_xr_zarr_io",
         compute_kind="process",
diff --git a/local_archives/nwp/ceda/__init__.py b/local_archives/nwp/ceda/__init__.py
index 9743b5c..a7926e7 100644
--- a/local_archives/nwp/ceda/__init__.py
+++ b/local_archives/nwp/ceda/__init__.py
@@ -1,10 +1,15 @@
 import dagster as dg
 
-from . import ceda_uk
+from . import ceda_uk, ceda_global
 
 uk_assets = dg.load_assets_from_modules(
     modules=[ceda_uk],
     group_name="ceda_uk",
 )
 
-all_assets: list[dg.AssetsDefinition] = [*uk_assets]
+global_assets = dg.load_assets_from_modules(
+    modules=[ceda_global],
+    group_name="ceda_global",
+)
+
+all_assets: list[dg.AssetsDefinition] = [*uk_assets, *global_assets]
diff --git a/local_archives/nwp/ceda/ceda_global.py b/local_archives/nwp/ceda/ceda_global.py
new file mode 100644
index 0000000..7174d2e
--- /dev/null
+++ b/local_archives/nwp/ceda/ceda_global.py
@@ -0,0 +1,52 @@
+import dagster as dg
+import os
+from typing import Any
+import datetime as dt
+
+from dagster_docker import PipesDockerClient
+from constants import LOCATIONS_BY_ENVIRONMENT
+
+env = os.getenv("ENVIRONMENT", "local")
+ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER
+
+@dg.asset(
+    name="zarr_archive",
+    key_prefix=["nwp", "ceda", "global"],
+    metadata={
+        "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"),
+        "area": dg.MetadataValue.text("global"),
+        "source": dg.MetadataValue.text("ceda"),
+    },
+    compute_kind="download",
+    op_tags={"dagster/max_runtime": int(60 * 100)},
+    partitions_def=dg.MonthlyPartitionsDefinition(
+        start_date="2019-01-01",
+        end_offset=-3,
+    ),
+)
+def ceda_global(
+    context: dg.AssetExecutionContext,
+    pipes_docker_client: PipesDockerClient,
+) -> Any:
+    image: str = "ghcr.io/openclimatefix/nwp-consumer:devsjc-major-refactor"
+    it: dt.datetime = context.partition_time_window.start
+    return pipes_docker_client.run(
+        image=image,
+        command=[
+            "consume",
+            "-y",
+            str(it.year),
+            "-m",
+            str(it.month),
+        ],
+        env={
+            "NWP_CONSUMER_MODEL_REPOSITORY": "ceda-metoffice-global",
+            "NWP_CONSUMER_NOTIFICATION_REPOSITORY": "dagster-pipes",
+            "CEDA_FTP_USER": os.environ["CEDA_FTP_USER"],
+            "CEDA_FTP_PASSWORD": os.environ["CEDA_FTP_PASSWORD"],
+        },
+        container_kwargs={
+            "volumes": [f"{ZARR_FOLDER}/nwp/ceda/global:/work"],
+        },
+        context=context,
+    ).get_results()
diff --git a/local_archives/nwp/gfs/gfs.py b/local_archives/nwp/gfs/gfs.py
index ed5c4b9..10f8e3b 100644
--- a/local_archives/nwp/gfs/gfs.py
+++ b/local_archives/nwp/gfs/gfs.py
@@ -13,7 +13,7 @@
     name="zarr_daily_archive",
     description="Daily archive of GFS global NWP data",
     key_prefix=["nwp", "gfs", "global"],
-    auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
+    automation_condition=dg.AutomationCondition.eager(),
     partitions_def=dg.DailyPartitionsDefinition(
         start_date="2015-01-15",
         end_offset=-2,
diff --git a/pyproject.toml b/pyproject.toml
index 7918818..8e2f5cc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -20,14 +20,14 @@ classifiers = ["Programming Language :: Python :: 3"]
 dependencies = [
     "cdsapi >= 0.6.1",
     "ecmwf-api-client >= 1.6.3",
-    "dagit >= 1.6.2",
-    "dagster >= 1.6.2",
-    "dagster-cloud >= 1.6.2",
-    "dagster-webserver >= 1.6.2",
-    "dagster-graphql >= 1.6.2",
-    "dagster-postgres >= 0.22.9",
-    "dagster-docker >= 0.22.9",
-    "dagster-pipes >= 1.7.8",
+    "dagit >= 1.8.5",
+    "dagster >= 1.8.5",
+    "dagster-cloud >= 1.8.5",
+    "dagster-webserver >= 1.8.5",
+    "dagster-graphql >= 1.8.5",
+    "dagster-postgres >= 0.24.5",
+    "dagster-docker >= 0.24.5",
+    "dagster-pipes >= 1.8.5",
     "huggingface-hub >= 0.19.4",
     "kbatch >= 0.4.2",
     "meteomatics == 2.11.1",
diff --git a/tests/cloud_archives/pv/test_passiv.py b/tests/cloud_archives/pv/test_passiv.py
index 8dd78e8..6e9139d 100644
--- a/tests/cloud_archives/pv/test_passiv.py
+++ b/tests/cloud_archives/pv/test_passiv.py
@@ -5,4 +5,4 @@
 
 def test_get_daily_passiv_data():
     start_date = datetime(2022, 1, 1, tzinfo=timezone.utc)
-    get_monthly_passiv_data(start_date, upload_to_hf=False, overwrite=True)
\ No newline at end of file
+    get_monthly_passiv_data(start_date, upload_to_hf=False, overwrite=True)