From f9e03c29369ff86178e6c7574909a70c5aec4e7d Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 14 Sep 2023 14:12:29 +0100 Subject: [PATCH] Create partitioned, backfillable versions of ICON today runs --- dags_tests/compile_test.py | 2 +- nwp/__init__.py | 2 - nwp/assets/dwd/archive_to_hf.py | 8 +-- nwp/assets/dwd/utils.py | 11 +++++ nwp/jobs.py | 87 ++++++++++++++++----------------- 5 files changed, 59 insertions(+), 51 deletions(-) diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 45029fd..60ffdf3 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,4 +3,4 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert len(job_names) == 18 + assert len(job_names) == 3 diff --git a/nwp/__init__.py b/nwp/__init__.py index 5885e95..694981b 100644 --- a/nwp/__init__.py +++ b/nwp/__init__.py @@ -2,10 +2,8 @@ from nwp import assets, jobs - all_assets = load_assets_from_modules([assets]) defs = Definitions( - assets=all_assets, schedules=jobs.schedules, ) diff --git a/nwp/assets/dwd/archive_to_hf.py b/nwp/assets/dwd/archive_to_hf.py index 1d57301..d32b89b 100644 --- a/nwp/assets/dwd/archive_to_hf.py +++ b/nwp/assets/dwd/archive_to_hf.py @@ -4,7 +4,7 @@ import xarray as xr import zarr -from dagster import asset +from dagster import op from huggingface_hub import HfApi from ocf_blosc2 import Blosc2 @@ -36,7 +36,7 @@ def does_files_exist(config, now_datetime): return path_in_repo in existing_files -@asset +@op def download_model_files(config: IconConfig): model = config.model run = config.run @@ -74,7 +74,7 @@ def download_model_files(config: IconConfig): ) -@asset(deps=[download_model_files]) +@op def process_model_files( config: IconConfig ): @@ -199,7 +199,7 @@ def process_model_files( ds.chunk(chunking).to_zarr(store, encoding=encoding, compute=True) -@asset(deps=[process_model_files]) +@op def upload_model_files_to_hf(config: IconConfig): _, _, now = get_run(config.run, delay=config.delay) 
if does_files_exist(config, now): diff --git a/nwp/assets/dwd/utils.py b/nwp/assets/dwd/utils.py index c481a5e..6ba2e03 100644 --- a/nwp/assets/dwd/utils.py +++ b/nwp/assets/dwd/utils.py @@ -7,6 +7,17 @@ import requests +dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" + +def build_config(model, run, delay=0): + config = IconConfig(model=model, + run=run, + delay=delay, + folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", + zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") + config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, + "zarr_path": config.zarr_path} + return config_dict def get_run(run: str, delay: int = 0): """Get run name. diff --git a/nwp/jobs.py b/nwp/jobs.py index 56368f6..2c129de 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,63 +1,62 @@ import datetime as dt from dagster import ( - AssetSelection, DailyPartitionsDefinition, - ScheduleDefinition, + MultiPartitionsDefinition, + StaticPartitionsDefinition, build_schedule_from_partitioned_job, - define_asset_job, job, partitioned_config, ) +from nwp.assets.dwd.archive_to_hf import ( + download_model_files, + process_model_files, + upload_model_files_to_hf, +) from nwp.assets.dwd.common import IconConfig +from nwp.assets.dwd.utils import build_config from nwp.assets.ecmwf.mars import nwp_consumer_docker_op schedules = [] -dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" +@partitioned_config(partitions_def=MultiPartitionsDefinition({ + "date": DailyPartitionsDefinition(start_date=dt.datetime(2023, 9, 10)), + "run": StaticPartitionsDefinition(["00", "06", "12", "18"]), +})) +def icon_global_dailyrun_partitioned_config(partition_key): + return {"ops": { + "download_model_files": {"config": build_config("global", partition_key["run"])}, + "process_model_files": {"config": 
build_config("global", partition_key["run"])}, + "upload_model_files_to_hf": {"config": build_config("global", partition_key["run"])}, + }} -def build_config_on_runtime(model, run, delay=0): - config = IconConfig(model=model, - run=run, - delay=delay, - folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", - zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") - config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, - "zarr_path": config.zarr_path} - return config_dict +@job(config=icon_global_dailyrun_partitioned_config) +def icon_global_hf_archive(): + download_model_files() + process_model_files() + upload_model_files_to_hf() -schedules = [] -for r in ["00", "06", "12", "18"]: - for model in ["global", "eu"]: - for delay in [0, 1]: - asset_job = define_asset_job( - name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", - selection=AssetSelection.all(), - config={'ops': { - "download_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)}, - }} - ) - match (delay, r): - case (0, "00"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) - case (0, "06"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) - case (0, "12"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) - case (0, "18"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) - case (1, "00"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) - case (1, "06"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) - case (1, "12"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 
6 * * *")) - case (1, "18"): - schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) +schedules.append(build_schedule_from_partitioned_job(icon_global_hf_archive)) + +@partitioned_config(partitions_def=MultiPartitionsDefinition({ + "date": DailyPartitionsDefinition(start_date=dt.datetime(2023, 9, 10)), + "run": StaticPartitionsDefinition(["00", "06", "12", "18"]), +})) +def icon_eu_dailyrun_partitioned_config(partition_key): + return {"ops": { + "download_model_files": {"config": build_config("eu", partition_key["run"])}, + "process_model_files": {"config": build_config("eu", partition_key["run"])}, + "upload_model_files_to_hf": {"config": build_config("eu", partition_key["run"])}, +}} + +@job(config=icon_eu_dailyrun_partitioned_config) +def icon_eu_hf_archive(): + download_model_files() + process_model_files() + upload_model_files_to_hf() +schedules.append(build_schedule_from_partitioned_job(icon_eu_hf_archive)) @partitioned_config(partitions_def=DailyPartitionsDefinition(start_date=dt.datetime(2021, 1, 1))) def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime): @@ -66,7 +65,7 @@ def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime): "date_to": start.strftime("%Y-%m-%d"), "source": "ecmwf-mars", "env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"], - "docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'] + "docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'] }}}} @job(config=ecmwf_daily_partitioned_config)