Skip to content

Commit

Permalink
Add partitioned schedule config to ECMWF job
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Sep 14, 2023
1 parent 66689ab commit 6981ff7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 47 deletions.
3 changes: 2 additions & 1 deletion nwp/assets/dwd/archive_to_hf.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import xarray as xr
import zarr
from dagster import asset # import the `dagster` library
from dagster import asset
from huggingface_hub import HfApi
from ocf_blosc2 import Blosc2

Expand Down Expand Up @@ -221,3 +221,4 @@ def upload_model_files_to_hf(config: IconConfig):
repo_type="dataset",
)
shutil.rmtree(config.folder)

31 changes: 13 additions & 18 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,28 @@
from dagster_docker import execute_docker_container
from dagster import Config, OpExecutionContext, op
import datetime as dt
from dagster_docker import execute_docker_container


class NWPConsumerConfig(Config):
date_from: str
date_to: str
source: str
docker_volumes: list[str]
env_vars: list[str]

@op
def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig):
execute_docker_container(
context=context,
image="ghcr.io/openclimatefix/nwp-consumer",
command=[
"consume", f'--source={config.source}',
f'--from={config.date_from}',
f'--to={config.date_to}'
],
env_vars=[
"ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL",
"CEDA_FTP_USER", "CEDA_FTP_PASS",
"METOFFICE_ORDER_ID", "METOFFICE_CLIENT_ID", "METOFFICE_CLIENT_SECRET",
"AWS_S3_BUCKET", "AWS_REGION", "AWS_ACCESS_KEY", "AWS_ACCESS_SECRET",
],
container_kwargs={
"volumes": docker_volumes
}
context=context,
image="ghcr.io/openclimatefix/nwp-consumer:latest",
command=[
"consume", f'--source={config.source}',
f'--from={config.date_from}',
f'--to={config.date_to}'
],
env_vars=config.env_vars,
container_kwargs={
"volumes": config.docker_volumes
}
)

pass
Expand Down
56 changes: 28 additions & 28 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig, ScheduleEvaluationContext
import datetime as dt

from dagster import (
AssetSelection,
DailyPartitionsDefinition,
ScheduleDefinition,
build_schedule_from_partitioned_job,
define_asset_job,
job,
partitioned_config,
)

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op

import datetime as dt
schedules = []

dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

Expand Down Expand Up @@ -31,7 +41,7 @@ def build_config_on_runtime(model, run, delay=0):
}}
)
match (delay, r):
case (0, "00"):
case (0, "00"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
Expand All @@ -47,30 +57,20 @@ def build_config_on_runtime(model, run, delay=0):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))

@job
def nwp_consumer_docker_job():
nwp_consumer_docker_op()

@schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *")
def ecmwf_daily_local_archive_schedule(context: ScheduleEvaluationContext):
scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
return RunRequest(
run_key=None,
run_config={
"ops": {"nwp_consumer_docker_op": NWPConsumerConfig(
date_from=scheduled_date,
date_to=scheduled_date,
source="ecmwf-mars",
docker_volumes=[
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw',
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr',
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/tmp:/tmp/nwpc'
]
)}
},
tags={"date": scheduled_date},
)

schedules.append(ecmwf_daily_local_archive_schedule)
@partitioned_config(partitions_def=DailyPartitionsDefinition(start_date=dt.datetime(2021, 1, 1)))
def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
return {"ops": {"nwp_consumer_docker_op": {"config": {
"date_from": start.strftime("%Y-%m-%d"),
"date_to": start.strftime("%Y-%m-%d"),
"source": "ecmwf-mars",
"env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
"docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp']
}}}}

@job(config=ecmwf_daily_partitioned_config)
def ecmwf_daily_local_archive():
nwp_consumer_docker_op()

schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13))

0 comments on commit 6981ff7

Please sign in to comment.