diff --git a/nwp/assets/dwd/archive_to_hf.py b/nwp/assets/dwd/archive_to_hf.py index 55f2101..1d57301 100644 --- a/nwp/assets/dwd/archive_to_hf.py +++ b/nwp/assets/dwd/archive_to_hf.py @@ -4,7 +4,7 @@ import xarray as xr import zarr -from dagster import asset # import the `dagster` library +from dagster import asset from huggingface_hub import HfApi from ocf_blosc2 import Blosc2 @@ -221,3 +221,4 @@ def upload_model_files_to_hf(config: IconConfig): repo_type="dataset", ) shutil.rmtree(config.folder) + diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index 4669d80..762524c 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -1,6 +1,5 @@ -from dagster_docker import execute_docker_container from dagster import Config, OpExecutionContext, op -import datetime as dt +from dagster_docker import execute_docker_container class NWPConsumerConfig(Config): @@ -8,26 +7,22 @@ class NWPConsumerConfig(Config): date_to: str source: str docker_volumes: list[str] + env_vars: list[str] @op def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig): execute_docker_container( - context=context, - image="ghcr.io/openclimatefix/nwp-consumer", - command=[ - "consume", f'--source={config.source}', - f'--from={config.date_from}', - f'--to={config.date_to}' - ], - env_vars=[ - "ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL", - "CEDA_FTP_USER", "CEDA_FTP_PASS", - "METOFFICE_ORDER_ID", "METOFFICE_CLIENT_ID", "METOFFICE_CLIENT_SECRET", - "AWS_S3_BUCKET", "AWS_REGION", "AWS_ACCESS_KEY", "AWS_ACCESS_SECRET", - ], - container_kwargs={ - "volumes": docker_volumes - } + context=context, + image="ghcr.io/openclimatefix/nwp-consumer:latest", + command=[ + "consume", f'--source={config.source}', + f'--from={config.date_from}', + f'--to={config.date_to}' + ], + env_vars=config.env_vars, + container_kwargs={ + "volumes": config.docker_volumes + } ) pass diff --git a/nwp/jobs.py b/nwp/jobs.py index 9d0184a..56368f6 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,9 +1,19 @@ -from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig, ScheduleEvaluationContext +import datetime as dt + +from dagster import ( + AssetSelection, + DailyPartitionsDefinition, + ScheduleDefinition, + build_schedule_from_partitioned_job, + define_asset_job, + job, + partitioned_config, +) from nwp.assets.dwd.common import IconConfig -from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig +from nwp.assets.ecmwf.mars import nwp_consumer_docker_op -import datetime as dt +schedules = [] dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" @@ -31,7 +41,7 @@ def build_config_on_runtime(model, run, delay=0): }} ) match (delay, r): - case (0, "00"): + case (0, "00"): schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) case (0, "06"): schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) @@ -47,30 +57,20 @@ def build_config_on_runtime(model, run, delay=0): schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) case (1, "18"): schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) - -@job -def nwp_consumer_docker_job(): - nwp_consumer_docker_op() -@schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *") -def ecmwf_daily_local_archive_schedule(context: ScheduleEvaluationContext): - scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d") - return RunRequest( - run_key=None, - run_config={ - "ops": {"nwp_consumer_docker_op": NWPConsumerConfig( - date_from=scheduled_date, - date_to=scheduled_date, - source="ecmwf-mars", - docker_volumes=[ - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/tmp:/tmp/nwpc' - ] - )} - }, - tags={"date": scheduled_date}, - ) -schedules.append(ecmwf_daily_local_archive_schedule) +@partitioned_config(partitions_def=DailyPartitionsDefinition(start_date=dt.datetime(2021, 1, 1))) +def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime): + return {"ops": {"nwp_consumer_docker_op": {"config": { + "date_from": start.strftime("%Y-%m-%d"), + "date_to": start.strftime("%Y-%m-%d"), + "source": "ecmwf-mars", + "env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"], + "docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'] + }}}} + +@job(config=ecmwf_daily_partitioned_config) +def ecmwf_daily_local_archive(): + nwp_consumer_docker_op() +schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13))