diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index 762524c..e641132 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -1,4 +1,5 @@ -from dagster import Config, OpExecutionContext, op +import subprocess +from dagster import Config, OpExecutionContext, op, graph from dagster_docker import execute_docker_container @@ -7,6 +8,8 @@ class NWPConsumerConfig(Config): date_to: str source: str docker_volumes: list[str] + raw_dir: str + zarr_dir: str env_vars: list[str] @op @@ -27,3 +30,32 @@ def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfi pass +@op +def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig): + process = subprocess.run( + ["nwp-consumer", "download", + f'--source={config.source}', f'--from={config.date_from}', f'--to={config.date_to}', + f'--rdir={config.raw_dir}', f'--zdir={config.zarr_dir}'], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + text=True + ) + code = process.returncode + print(process.stdout) + print(process.stderr) + return config + +@op +def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPConsumerConfig): + process = subprocess.run( + ["nwp-consumer", "convert", + f'--source={downloadedConfig.source}', f'--from={downloadedConfig.date_from}', + f'--to={downloadedConfig.date_to}', + f'--rdir={downloadedConfig.raw_dir}', f'--zdir={downloadedConfig.zarr_dir}'], + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + text=True + ) + print(process.stdout) + print(process.stderr) + diff --git a/nwp/jobs.py b/nwp/jobs.py index 890d852..1c47058 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,24 +1,28 @@ import datetime as dt +import json from dagster import ( - daily_partitioned_config, AssetSelection, - DailyPartitionsDefinition, ScheduleDefinition, build_schedule_from_partitioned_job, + daily_partitioned_config, define_asset_job, job, - partitioned_config, ) from nwp.assets.dwd.common import IconConfig -from nwp.assets.ecmwf.mars import nwp_consumer_docker_op +from nwp.assets.ecmwf.mars import ( + NWPConsumerConfig, + nwp_consumer_convert_op, + nwp_consumer_download_op, +) schedules = [] dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" -def build_config_on_runtime(model, run, delay=0): +def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict: + """Create a config dict for the DWD ICON model.""" config = IconConfig(model=model, run=run, delay=delay, @@ -61,17 +65,37 @@ def build_config_on_runtime(model, run, delay=0): @daily_partitioned_config(start_date=dt.datetime(2021, 1, 1)) -def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime): - return {"ops": {"nwp_consumer_docker_op": {"config": { - "date_from": start.strftime("%Y-%m-%d"), - "date_to": start.strftime("%Y-%m-%d"), - "source": "ecmwf-mars", - "env_vars": ["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"], - "docker_volumes": ['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'] - }}}} +def ecmwf_daily_partitioned_config_docker(start: dt.datetime, _end: dt.datetime): + config: NWPConsumerConfig = NWPConsumerConfig( + date_from=start.strftime("%Y-%m-%d"), + date_to=start.strftime("%Y-%m-%d"), + source="ecmwf-mars", + env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"], + docker_volumes=['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'], + zarr_dir='/tmp/zarr', + raw_dir='/tmp/raw', + ) + return {"ops": { + "nwp_consumer_docker_op": {"config": json.loads(config.json())}, + }} + +@daily_partitioned_config(start_date=dt.datetime(2021, 1, 1)) +def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime): + config: NWPConsumerConfig = NWPConsumerConfig( + date_from=start.strftime("%Y-%m-%d"), + date_to=start.strftime("%Y-%m-%d"), + source="ecmwf-mars", + env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"], + zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr', + raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw', + ) + return {"ops": { + "nwp_consumer_download_op": {"config": json.loads(config.json())}, + }} @job(config=ecmwf_daily_partitioned_config) def ecmwf_daily_local_archive(): - nwp_consumer_docker_op() + nwp_consumer_convert_op(nwp_consumer_download_op()) schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13)) + diff --git a/pyproject.toml b/pyproject.toml index da82544..4e8a4f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,12 +25,13 @@ dependencies = [ "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", "numpy >= 1.23.0", + "nwp-consumer >= 0.1.13", "ocf-blosc2 == 0.0.3", "pathlib == 1.0.1", "requests >= 2.28.0", "xarray >= 2022.3.0", "zarr >= 2.13.3", - "satip >= 2.11.10", + "satip", ] [project.optional-dependencies]