Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add malta job, fix context manager #18

Merged
merged 2 commits into from
Oct 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dags_tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

def test_compiles():
job_names = [d.name for d in list(defs.get_all_job_defs())]
assert len(job_names) == 19
assert len(job_names) == 20
13 changes: 8 additions & 5 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
import nwp_consumer.cmd.main as consumer
import contextlib
import os

import nwp_consumer.cmd.main as consumer
from dagster import Config, OpExecutionContext, op
from dagster_docker import execute_docker_container


@contextlib.contextmanager
def modify_env(vars: dict[str, str]):
"""Temporarily modify the environment."""
oldvars = os.environ.copy()
for var in vars:
oldval = os.environ.get(var)
newval = vars[var]
os.environ[var] = newval
vars[var] = oldval
try:
yield
finally:
for var in vars:
os.environ[var] = oldval
if var in oldvars:
os.environ[var] = oldvars[var]
else:
del os.environ[var]

class NWPConsumerConfig(Config):
"""Configuration for the NWP consumer."""
Expand Down
182 changes: 112 additions & 70 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,109 +1,151 @@
import datetime as dt
import json
from collections.abc import Callable
from typing import Any

from dagster import (
AssetSelection,
ScheduleDefinition,
build_schedule_from_partitioned_job,
daily_partitioned_config,
define_asset_job,
job,
)
import dagster

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import (
NWPConsumerConfig,
nwp_consumer_convert_op,
nwp_consumer_download_op,
nwp_consumer_download_op
)

schedules = []
jobs: list[dagster.JobDefinition] = []
schedules: list[dagster.ScheduleDefinition] = []

# --- DWD ICON jobs and schedules --------------------------------------

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict:
"""Create a config dict for the DWD ICON model."""
config = IconConfig(model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip")
config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run,
"zarr_path": config.zarr_path}
config = IconConfig(
model=model,
run=run,
delay=delay,
folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}",
zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip"
)
config_dict = {
"delay": config.delay,
"folder": config.folder,
"model": config.model,
"run": config.run,
"zarr_path": config.zarr_path
}
return config_dict

schedules = []
for r in ["00", "06", "12", "18"]:
for model in ["global", "eu"]:
for delay in [0, 1]:
asset_job = define_asset_job(
asset_job = dagster.define_asset_job(
name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}",
selection=AssetSelection.all(),
selection=dagster.AssetSelection.all(),
config={'ops': {
"download_model_files": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {
"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {
"config": build_config_on_runtime(model, r, delay)},
}}
)
match (delay, r):
case (0, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
case (0, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
case (0, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
case (1, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
case (1, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
case (1, "12"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))


@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_uk_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for uk data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_overrides={"ECMWF_AREA": "uk"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/raw',
schedules.append(
dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))


# --- NWP Consumer jobs and schedules --------------------------------------

class NWPConsumerDagDefinition:
"""A class to define the NWPConsumerDagDefinition."""

def __init__(
self, area: str, source: str, storage_path: str | None = None
) -> "NWPConsumerDagDefinition":
"""Create a NWPConsumerDagDefinition."""
self.area = area
self.source = source
self.storage_path = \
storage_path or \
f'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/{self.area}'

nwp_consumer_jobs: dict[str, NWPConsumerDagDefinition] = {
"uk": NWPConsumerDagDefinition(area="uk", source="ecmwf-mars"),
"india": NWPConsumerDagDefinition(area="nw-india", source="ecmwf-mars"),
"malta": NWPConsumerDagDefinition(area="malta", source="ecmwf-mars")
}


def gen_partitioned_config_func(dagdef: NWPConsumerDagDefinition) \
-> Callable[[str], dict[str, Any]]:
"""Create a config dict from a partition key."""

def partitioned_config_func(partition_key: str) -> dict[str, Any]:
time_window = partitions_def.time_window_for_partition_key(partition_key)
consumer_config = NWPConsumerConfig(
date_from=time_window.start.strftime("%Y-%m-%d"),
date_to=time_window.start.strftime("%Y-%m-%d"),
source=dagdef.source,
env_overrides={"ECMWF_AREA": dagdef.area},
zarr_dir=f"{dagdef.storage_path}/zarr",
raw_dir=f"{dagdef.storage_path}/raw",
)
return {
"ops": {"nwp_consumer_download_op": {
"config": json.loads(consumer_config.json()),
}}
}

return partitioned_config_func

# Define the jobs and schedules from the above dict
for loc, dagdef in nwp_consumer_jobs.items():

partitions_def = dagster.DailyPartitionsDefinition(start_date=dt.datetime(2020, 1, 1))

config = dagster.PartitionedConfig(
partitions_def=partitions_def,
run_config_for_partition_key_fn=gen_partitioned_config_func(dagdef),
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}

@job(config=ecmwf_uk_daily_partitioned_config, name="ecmwf_daily_local_archive")
def ecmwf_uk_daily_local_archive() -> None:
"""Download and convert ECMWF data for the UK."""
nwp_consumer_convert_op(nwp_consumer_download_op())

@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_india_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for india data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_overrides={"ECMWF_AREA": "nw-india"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/raw',

@dagster.job(
name="ecmwf_daily_local_archive" if loc=="uk" else f"ecmwf_{loc}_daily_archive",
config=config,
tags={"area": loc},
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}
def ecmwf_daily_partitioned_archive() -> None:
"""Download and convert NWP data using the consumer according to input config."""
nwp_consumer_convert_op(nwp_consumer_download_op())

@job(config=ecmwf_india_daily_partitioned_config)
def ecmwf_india_daily_local_archive() -> None:
"""Download and convert ECMWF data for India."""
nwp_consumer_convert_op(nwp_consumer_download_op())
schedule = dagster.build_schedule_from_partitioned_job(
job=ecmwf_daily_partitioned_archive,
hour_of_day=20
)

schedules.append(build_schedule_from_partitioned_job(ecmwf_uk_daily_local_archive, hour_of_day=13))
schedules.append(build_schedule_from_partitioned_job(ecmwf_india_daily_local_archive, hour_of_day=14))
jobs.append(ecmwf_daily_partitioned_archive)
schedules.append(schedule)