diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py
index 45029fd..4c13a69 100644
--- a/dags_tests/compile_test.py
+++ b/dags_tests/compile_test.py
@@ -3,4 +3,4 @@ def test_compiles():
 
     job_names = [d.name for d in list(defs.get_all_job_defs())]
 
-    assert len(job_names) == 18
+    assert len(job_names) == 19
diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py
index 76bff4a..0be1ba3 100644
--- a/nwp/assets/ecmwf/mars.py
+++ b/nwp/assets/ecmwf/mars.py
@@ -1,66 +1,78 @@
 import nwp_consumer.cmd.main as consumer
+import contextlib
+import os
 from dagster import Config, OpExecutionContext, op
 from dagster_docker import execute_docker_container
 
+@contextlib.contextmanager
+def modify_env(vars: dict[str, str]):
+    """Temporarily apply environment variable overrides.
+
+    Every key in ``vars`` is set in ``os.environ`` for the duration of the
+    ``with`` block. On exit each variable is put back to the value it had
+    beforehand, and variables that did not exist beforehand are removed.
+    The ``vars`` mapping itself is never modified.
+    """
+    # Snapshot first: each variable needs its own saved value, and the
+    # caller's dict must stay intact because the ops reuse it downstream.
+    previous = {name: os.environ.get(name) for name in vars}
+    try:
+        os.environ.update(vars)
+        yield
+    finally:
+        for name, oldval in previous.items():
+            if oldval is None:
+                # Not set before entry; assigning None to os.environ raises.
+                os.environ.pop(name, None)
+            else:
+                os.environ[name] = oldval
 
 class NWPConsumerConfig(Config):
+    """Configuration for the NWP consumer."""
+
     date_from: str
     date_to: str
     source: str
-    docker_volumes: list[str]
     raw_dir: str
     zarr_dir: str
-    env_vars: list[str]
-
-@op
-def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig):
-    execute_docker_container(
-        context=context,
-        image="ghcr.io/openclimatefix/nwp-consumer:latest",
-        command=[
-            "consume", f'--source={config.source}',
-            f'--from={config.date_from}',
-            f'--to={config.date_to}'
-        ],
-        env_vars=config.env_vars,
-        container_kwargs={
-            "volumes": config.docker_volumes
-        }
-    )
+    env_overrides: dict[str, str]
 
-    pass
 
 @op
-def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig):
+def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig) \
+        -> NWPConsumerConfig:
     """Download the data from the source."""
-    consumer.run({
-        "download": True,
-        "convert": False,
-        "consume": False,
-        "check": False,
-        "--source": config.source,
-        "--sink": "local",
-        "--from": config.date_from,
-        "--to": config.date_to,
-        "--rdir": config.raw_dir,
-        "--zdir": config.zarr_dir,
-        "--create-latest": False,
-    })
+    with modify_env(config.env_overrides):
+        consumer.run({
+            "download": True,
+            "convert": False,
+            "consume": False,
+            "check": False,
+            "--source": config.source,
+            "--sink": "local",
+            "--from": config.date_from,
+            "--to": config.date_to,
+            "--rdir": config.raw_dir,
+            "--zdir": config.zarr_dir,
+            "--create-latest": False,
+        })
+    return config
 
 
 @op
 def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPConsumerConfig):
     """Convert the downloaded data to zarr format."""
-    consumer.run({
-        "download": False,
-        "convert": True,
-        "consume": False,
-        "check": False,
-        "--source": downloadedConfig.source,
-        "--sink": "local",
-        "--from": downloadedConfig.date_from,
-        "--to": downloadedConfig.date_to,
-        "--rdir": downloadedConfig.raw_dir,
-        "--zdir": downloadedConfig.zarr_dir,
-        "--create-latest": False,
-    })
+    with modify_env(downloadedConfig.env_overrides):
+        consumer.run({
+            "download": False,
+            "convert": True,
+            "consume": False,
+            "check": False,
+            "--source": downloadedConfig.source,
+            "--sink": "local",
+            "--from": downloadedConfig.date_from,
+            "--to": downloadedConfig.date_to,
+            "--rdir": downloadedConfig.raw_dir,
+            "--zdir": downloadedConfig.zarr_dir,
+            "--create-latest": False,
+        })
diff --git a/nwp/jobs.py b/nwp/jobs.py
index e798d6f..ae1d9ad 100644
--- a/nwp/jobs.py
+++ b/nwp/jobs.py
@@ -65,39 +65,45 @@ def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict:
 
 
 @daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
-def ecmwf_daily_partitioned_config_docker(start: dt.datetime, _end: dt.datetime):
+def ecmwf_uk_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
+    """Create a config dict for the nwp-consumer for uk data from ECMWF."""
     config: NWPConsumerConfig = NWPConsumerConfig(
         date_from=start.strftime("%Y-%m-%d"),
         date_to=start.strftime("%Y-%m-%d"),
         source="ecmwf-mars",
-        env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
-        docker_volumes=['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'],
-        zarr_dir='/tmp/zarr',
-        raw_dir='/tmp/raw',
+        env_overrides={"ECMWF_AREA": "uk"},
+        zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/zarr',
+        raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/raw',
     )
     return {"ops": {
-        "nwp_consumer_docker_op": {"config": json.loads(config.json())},
+        "nwp_consumer_download_op": {"config": json.loads(config.json())},
     }}
 
+@job(config=ecmwf_uk_daily_partitioned_config)
+def ecmwf_uk_daily_local_archive() -> None:
+    """Download and convert ECMWF data for the UK."""
+    nwp_consumer_convert_op(nwp_consumer_download_op())
 
 @daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
-def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
+def ecmwf_india_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
+    """Create a config dict for the nwp-consumer for india data from ECMWF."""
     config: NWPConsumerConfig = NWPConsumerConfig(
         date_from=start.strftime("%Y-%m-%d"),
         date_to=start.strftime("%Y-%m-%d"),
         source="ecmwf-mars",
-        env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
-        docker_volumes=[],
-        zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr',
-        raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw',
+        env_overrides={"ECMWF_AREA": "nw-india"},
+        zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/zarr',
+        raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/raw',
     )
     return {"ops": {
         "nwp_consumer_download_op": {"config": json.loads(config.json())},
     }}
 
-@job(config=ecmwf_daily_partitioned_config)
-def ecmwf_daily_local_archive():
+@job(config=ecmwf_india_daily_partitioned_config)
+def ecmwf_india_daily_local_archive() -> None:
+    """Download and convert ECMWF data for India."""
     nwp_consumer_convert_op(nwp_consumer_download_op())
 
 
-schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13))
+schedules.append(build_schedule_from_partitioned_job(ecmwf_uk_daily_local_archive, hour_of_day=13))
+schedules.append(build_schedule_from_partitioned_job(ecmwf_india_daily_local_archive, hour_of_day=14))
diff --git a/pyproject.toml b/pyproject.toml
index d81fe28..ed18f82 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,7 +25,7 @@ dependencies = [
     "dagster-docker == 0.20.11",
     "huggingface-hub == 0.16.4",
     "numpy >= 1.23.0",
-    "nwp-consumer == 0.1.17",
+    "nwp-consumer == 0.1.20",
     "ocf-blosc2 == 0.0.3",
     "pathlib == 1.0.1",
     "requests >= 2.28.0",