Skip to content

Commit

Permalink
Create partitioned, backfillable versions of ICON today runs
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Sep 14, 2023
1 parent 9b5cc8f commit f9e03c2
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 51 deletions.
2 changes: 1 addition & 1 deletion dags_tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

def test_compiles():
job_names = [d.name for d in list(defs.get_all_job_defs())]
assert len(job_names) == 18
assert len(job_names) == 3
2 changes: 0 additions & 2 deletions nwp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

from nwp import assets, jobs


all_assets = load_assets_from_modules([assets])

defs = Definitions(
assets=all_assets,
schedules=jobs.schedules,
)
8 changes: 4 additions & 4 deletions nwp/assets/dwd/archive_to_hf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import xarray as xr
import zarr
from dagster import asset
from dagster import op
from huggingface_hub import HfApi
from ocf_blosc2 import Blosc2

Expand Down Expand Up @@ -36,7 +36,7 @@ def does_files_exist(config, now_datetime):
return path_in_repo in existing_files


@asset
@op
def download_model_files(config: IconConfig):
model = config.model
run = config.run
Expand Down Expand Up @@ -74,7 +74,7 @@ def download_model_files(config: IconConfig):
)


@asset(deps=[download_model_files])
@op
def process_model_files(
config: IconConfig
):
Expand Down Expand Up @@ -199,7 +199,7 @@ def process_model_files(
ds.chunk(chunking).to_zarr(store, encoding=encoding, compute=True)


@asset(deps=[process_model_files])
@op
def upload_model_files_to_hf(config: IconConfig):
_, _, now = get_run(config.run, delay=config.delay)
if does_files_exist(config, now):
Expand Down
11 changes: 11 additions & 0 deletions nwp/assets/dwd/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@

import requests

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

def build_config(model, run, delay=0):
config = IconConfig(model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run,
"zarr_path": config.zarr_path}
return config_dict

def get_run(run: str, delay: int = 0):
"""Get run name.
Expand Down
87 changes: 43 additions & 44 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,63 +1,62 @@
import datetime as dt

from dagster import (
AssetSelection,
DailyPartitionsDefinition,
ScheduleDefinition,
MultiPartitionsDefinition,
StaticPartitionsDefinition,
build_schedule_from_partitioned_job,
define_asset_job,
job,
partitioned_config,
)

from nwp.assets.dwd.archive_to_hf import (
download_model_files,
process_model_files,
upload_model_files_to_hf,
)
from nwp.assets.dwd.common import IconConfig
from nwp.assets.dwd.utils import build_config
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op

schedules = []

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"
@partitioned_config(partitions_def=MultiPartitionsDefinition({
"date": DailyPartitionsDefinition(start_date=dt.datetime(2023, 9, 10)),
"run": StaticPartitionsDefinition(["00", "06", "12", "18"]),
}))
def icon_global_dailyrun_partitioned_config(partition_key):
return {"ops": {
"download_model_files": {"config": build_config("global", partition_key["run"])},
"process_model_files": {"config": build_config("global", partition_key["run"])},
"upload_model_files_to_hf": {"config": build_config("global", partition_key["run"])},
}}

def build_config_on_runtime(model, run, delay=0):
config = IconConfig(model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run,
"zarr_path": config.zarr_path}
return config_dict
@job(config=icon_global_dailyrun_partitioned_config)
def icon_global_hf_archive():
download_model_files()
process_model_files()
upload_model_files_to_hf()

schedules = []
for r in ["00", "06", "12", "18"]:
for model in ["global", "eu"]:
for delay in [0, 1]:
asset_job = define_asset_job(
name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}",
selection=AssetSelection.all(),
config={'ops': {
"download_model_files": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)},
}}
)
match (delay, r):
case (0, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
case (0, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
case (0, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
case (1, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
case (1, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
case (1, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))
schedules.append(build_schedule_from_partitioned_job(icon_global_hf_archive))

@partitioned_config(partitions_def=MultiPartitionsDefinition({
"date": DailyPartitionsDefinition(start_date=dt.datetime(2023, 9, 10)),
"run": StaticPartitionsDefinition(["00", "06", "12", "18"]),
}))
def icon_eu_dailyrun_partitioned_config(partition_key):
return {"ops": {
"download_model_files": {"config": build_config("eu", partition_key["run"])},
"process_model_files": {"config": build_config("eu", partition_key["run"])},
"upload_model_files_to_hf": {"config": build_config("eu", partition_key["run"])},
}}

@job(config=icon_eu_dailyrun_partitioned_config)
def icon_eu_hf_archive():
download_model_files()
process_model_files()
upload_model_files_to_hf()

schedules.append(build_schedule_from_partitioned_job(icon_eu_hf_archive))

@partitioned_config(partitions_def=DailyPartitionsDefinition(start_date=dt.datetime(2021, 1, 1)))
def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
Expand All @@ -66,7 +65,7 @@ def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
"date_to": start.strftime("%Y-%m-%d"),
"source": "ecmwf-mars",
"env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
"docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp']
"docker_volumes": ['}/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp']
}}}}

@job(config=ecmwf_daily_partitioned_config)
Expand Down

0 comments on commit f9e03c2

Please sign in to comment.