Skip to content

Commit

Permalink
Add malta job, fix context manager (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Oct 6, 2023
1 parent 4415a88 commit 65f7395
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 76 deletions.
2 changes: 1 addition & 1 deletion dags_tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

def test_compiles():
job_names = [d.name for d in list(defs.get_all_job_defs())]
assert len(job_names) == 19
assert len(job_names) == 20
13 changes: 8 additions & 5 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import nwp_consumer.cmd.main as consumer
import contextlib
import os

import nwp_consumer.cmd.main as consumer
from dagster import Config, OpExecutionContext, op
from dagster_docker import execute_docker_container


@contextlib.contextmanager
def modify_env(vars: dict[str, str]):
"""Temporarily modify the environment."""
oldvars = os.environ.copy()
for var in vars:
oldval = os.environ.get(var)
newval = vars[var]
os.environ[var] = newval
vars[var] = oldval
try:
yield
finally:
for var in vars:
os.environ[var] = oldval
if var in oldvars:
os.environ[var] = oldvars[var]
else:
del os.environ[var]

class NWPConsumerConfig(Config):
"""Configuration for the NWP consumer."""
Expand Down
182 changes: 112 additions & 70 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,109 +1,151 @@
import datetime as dt
import json
from collections.abc import Callable
from typing import Any

from dagster import (
AssetSelection,
ScheduleDefinition,
build_schedule_from_partitioned_job,
daily_partitioned_config,
define_asset_job,
job,
)
import dagster

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import (
NWPConsumerConfig,
nwp_consumer_convert_op,
nwp_consumer_download_op,
nwp_consumer_download_op
)

schedules = []
jobs: list[dagster.JobDefinition] = []
schedules: list[dagster.ScheduleDefinition] = []

# --- DWD ICON jobs and schedules --------------------------------------

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict:
"""Create a config dict for the DWD ICON model."""
config = IconConfig(model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run,
"zarr_path": config.zarr_path}
config = IconConfig(
model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip"
)
config_dict = {
"delay": config.delay,
"folder": config.folder,
"model": config.model,
"run": config.run,
"zarr_path": config.zarr_path
}
return config_dict

schedules = []
for r in ["00", "06", "12", "18"]:
for model in ["global", "eu"]:
for delay in [0, 1]:
asset_job = define_asset_job(
asset_job = dagster.define_asset_job(
name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}",
selection=AssetSelection.all(),
selection=dagster.AssetSelection.all(),
config={'ops': {
"download_model_files": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {
"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {
"config": build_config_on_runtime(model, r, delay)},
}}
)
match (delay, r):
case (0, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
case (0, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
case (0, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
case (1, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
case (1, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
case (1, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))


@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_uk_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for uk data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_overrides={"ECMWF_AREA": "uk"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/raw',
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))


# --- NWP Consumer jobs and schedules --------------------------------------

class NWPConsumerDagDefinition:
"""A class to define the NWPConsumerDagDefinition."""

def __init__(
self, area: str, source: str, storage_path: str | None = None
) -> "NWPConsumerDagDefinition":
"""Create a NWPConsumerDagDefinition."""
self.area = area
self.source = source
self.storage_path = \
storage_path or \
f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{self.area}'

nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = {
"uk": NWPConsumerDagDefinition(area="uk", source="ecmwf-mars"),
"india": NWPConsumerDagDefinition(area="nw-india", source="ecmwf-mars"),
"malta": NWPConsumerDagDefinition(area="malta", source="ecmwf-mars")
}


def gen_partitioned_config_func(dagdef: NWPConsumerDagDefinition) \
-> Callable[[str], dict[str, Any]]:
"""Create a config dict from a partition key."""

def partitioned_config_func(partition_key: str) -> dict[str, Any]:
time_window = partitions_def.time_window_for_partition_key(partition_key)
consumer_config = NWPConsumerConfig(
date_from=time_window.start.strftime("%Y-%m-%d"),
date_to=time_window.start.strftime("%Y-%m-%d"),
source=dagdef.source,
env_overrides={"ECMWF_AREA": dagdef.area},
zarr_dir=f"{dagdef.storage_path}/zarr",
raw_dir=f"{dagdef.storage_path}/raw",
)
return {
"ops": {"nwp_consumer_download_op": {
"config": json.loads(consumer_config.json()),
}}
}

return partitioned_config_func

# Define the jobs and schedules from the above dict
for loc, dagdef in nwp_consumer_jobs.items():

partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2020, 1, 1))

config = dagster.PartitionedConfig(
partitions_def=partitions_def,
run_config_for_partition_key_fn=gen_partitioned_config_func(dagdef),
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}

@job(config=ecmwf_uk_daily_partitioned_config, name="ecmwf_daily_local_archive")
def ecmwf_uk_daily_local_archive() -> None:
"""Download and convert ECMWF data for the UK."""
nwp_consumer_convert_op(nwp_consumer_download_op())

@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_india_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for india data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_overrides={"ECMWF_AREA": "nw-india"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/raw',

@dagster.job(
name="ecmwf_daily_local_archive" if loc=="uk" else f"ecmwf_{loc}_daily_archive",
config=config,
tags={"area": loc},
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}
def ecmwf_daily_partitioned_archive() -> None:
"""Download and convert NWP data using the consumer according to input config."""
nwp_consumer_convert_op(nwp_consumer_download_op())

@job(config=ecmwf_india_daily_partitioned_config)
def ecmwf_india_daily_local_archive() -> None:
"""Download and convert ECMWF data for India."""
nwp_consumer_convert_op(nwp_consumer_download_op())
schedule = dagster.build_schedule_from_partitioned_job(
job=ecmwf_daily_partitioned_archive,
hour_of_day=20
)

schedules.append(build_schedule_from_partitioned_job(ecmwf_uk_daily_local_archive, hour_of_day=13))
schedules.append(build_schedule_from_partitioned_job(ecmwf_india_daily_local_archive, hour_of_day=14))
jobs.append(ecmwf_daily_partitioned_archive)
schedules.append(schedule)

0 comments on commit 65f7395

Please sign in to comment.