From 7f894a2d6f5b09248e6d04220239fb0cfb20197e Mon Sep 17 00:00:00 2001 From: devsjc Date: Mon, 16 Oct 2023 15:49:57 +0100 Subject: [PATCH] Enable step selection for india --- nwp/assets/ecmwf/mars.py | 13 +++++++------ nwp/jobs.py | 7 ++++--- pyproject.toml | 2 +- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index da89c84..6acfd81 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -2,7 +2,7 @@ import os import nwp_consumer.cmd.main as consumer -from dagster import Config, OpExecutionContext, op +import dagster @contextlib.contextmanager @@ -20,7 +20,7 @@ def modify_env(newvars: dict[str, str]): else: del os.environ[key] -class NWPConsumerConfig(Config): +class NWPConsumerConfig(dagster.Config): """Configuration for the NWP consumer.""" date_from: str @@ -31,8 +31,8 @@ class NWPConsumerConfig(Config): env_overrides: dict[str, str] -@op -def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig) \ +@dagster.op +def nwp_consumer_download_op(context: dagster.OpExecutionContext, config: NWPConsumerConfig) \ -> NWPConsumerConfig: """Download the data from the source.""" with modify_env(config.env_overrides): @@ -53,8 +53,8 @@ def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerCon return config -@op -def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPConsumerConfig): +@dagster.op +def nwp_consumer_convert_op(context: dagster.OpExecutionContext, downloadedConfig: NWPConsumerConfig): """Convert the downloaded data to zarr format.""" with modify_env(downloadedConfig.env_overrides): consumer.run({ @@ -71,3 +71,4 @@ def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPCo "--zdir": downloadedConfig.zarr_dir, "--create-latest": False, }) + diff --git a/nwp/jobs.py b/nwp/jobs.py index c585889..b8ab710 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -84,7 +84,7 @@ class NWPConsumerDagDefinition: """A class to define the NWPConsumerDagDefinition.""" def __init__( - self, area: str, source: str, storage_path: str | None = None + self, area: str, source: str, storage_path: str | None = None, hours: int | None = 48 ) -> "NWPConsumerDagDefinition": """Create a NWPConsumerDagDefinition.""" self.area = area @@ -92,10 +92,11 @@ def __init__( self.storage_path = \ storage_path or \ f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{self.area}' + self.hours = hours nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = { "uk": NWPConsumerDagDefinition(area="uk", source="ecmwf-mars"), - "india": NWPConsumerDagDefinition(area="nw-india", source="ecmwf-mars"), + "india": NWPConsumerDagDefinition(area="nw-india", source="ecmwf-mars", hours=84), "malta": NWPConsumerDagDefinition(area="malta", source="ecmwf-mars") } @@ -110,7 +111,7 @@ def partitioned_config_func(partition_key: str) -> dict[str, Any]: date_from=time_window.start.strftime("%Y-%m-%d"), date_to=time_window.start.strftime("%Y-%m-%d"), source=dagdef.source, - env_overrides={"ECMWF_AREA": dagdef.area}, + env_overrides={"ECMWF_AREA": dagdef.area, "ECMWF_HOURS": dagdef.hours}, zarr_dir=f"{dagdef.storage_path}/zarr", raw_dir=f"{dagdef.storage_path}/raw", ) diff --git a/pyproject.toml b/pyproject.toml index 2dbdfa9..a1e2084 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ dependencies = [ "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", "numpy >= 1.23.0", - "nwp-consumer == 0.1.21", + "nwp-consumer == 0.1.22", "ocf-blosc2 == 0.0.3", "pathlib == 1.0.1", "requests >= 2.28.0",