From 9850ac12848890115b643adbde573d29b376b1da Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 11:31:47 +0100 Subject: [PATCH 01/11] Add ecmwf daily scheduled job --- nwp/jobs.py | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index 6ba61c2..bd2afcf 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,4 +1,4 @@ -from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig +from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig, ScheduleEvaluationContext from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig @@ -6,15 +6,15 @@ import datetime as dt -base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" +base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" def build_config_on_runtime(model, run, delay=0): config = IconConfig(model=model, run=run, delay=delay, - folder=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", - zarr_path=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") + folder=f"{base_path}/ICON_Global/{run}", + zarr_path=f"{base_path}/ICON_Global/{run}/{run}.zarr.zip") config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, "zarr_path": config.zarr_path} return config_dict @@ -54,13 +54,24 @@ def build_config_on_runtime(model, run, delay=0): asset_jobs.append(asset_job) -@job(config=RunConfig( - ops={"nwp_consumer_docker_op": NWPConsumerConfig( - date_from="2021-01-01", - date_to="2021-01-01", - source="ecmwf-mars" - )} - )) +@job def get_ecmwf_data(): nwp_consumer_docker_op() +@schedule(job=get_ecmwf_data, cron_schedule="0 13 * * *") +def ecmwf_daily_schedule(context: ScheduleEvaluationContext): + scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d") + return RunRequest( + run_key=None, + run_config={ + "ops": {"nwp_consumer_docker_op": NWPConsumerConfig( + date_from=scheduled_date, + date_to=scheduled_date, + source="ecmwf-mars" + )} + }, + tags={"date": scheduled_date}, + ) + +schedule_jobs.append(ecmwf_daily_schedule) + From f832b8aa50d4dfb384d18781ebd2a93012d95ecc Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 11:46:36 +0100 Subject: [PATCH 02/11] Add ECMWF daily scheduled job --- nwp/__init__.py | 3 +-- nwp/assets/ecmwf/mars.py | 14 ++++++++------ nwp/jobs.py | 34 ++++++++++++++++++---------------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/nwp/__init__.py b/nwp/__init__.py index 1517e8e..5885e95 100644 --- a/nwp/__init__.py +++ b/nwp/__init__.py @@ -7,6 +7,5 @@ defs = Definitions( assets=all_assets, - jobs=[jobs.get_ecmwf_data], - schedules=jobs.schedule_jobs, + schedules=jobs.schedules, ) diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index dab74b4..4669d80 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -7,6 +7,7 @@ class NWPConsumerConfig(Config): date_from: str date_to: str source: str + docker_volumes: list[str] @op def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig): @@ -18,13 +19,14 @@ def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfi f'--from={config.date_from}', f'--to={config.date_to}' ], - env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"], + env_vars=[ + "ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL", + "CEDA_FTP_USER", "CEDA_FTP_PASS", + "METOFFICE_ORDER_ID", "METOFFICE_CLIENT_ID", "METOFFICE_CLIENT_SECRET", + "AWS_S3_BUCKET", "AWS_REGION", "AWS_ACCESS_KEY", "AWS_ACCESS_SECRET", + ], container_kwargs={ - "volumes": [ - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', - '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', - '/tmp/nwpc:/tmp/nwpc' - ] + "volumes": docker_volumes } ) diff --git a/nwp/jobs.py b/nwp/jobs.py index bd2afcf..04cdafe 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -20,8 +20,7 @@ def build_config_on_runtime(model, run, delay=0): return config_dict -asset_jobs = [] -schedule_jobs = [] +schedules = [] for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: for delay in [0, 1]: @@ -36,29 +35,27 @@ def build_config_on_runtime(model, run, delay=0): ) match (delay, r): case (0, "00"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) case (0, "06"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) case (0, "12"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) case (0, "18"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) case (1, "00"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) case (1, "06"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) case (1, "12"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) case (1, "18"): - schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) + schedules.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) - asset_jobs.append(asset_job) - @job -def get_ecmwf_data(): +def nwp_consumer_docker_job(): nwp_consumer_docker_op() -@schedule(job=get_ecmwf_data, cron_schedule="0 13 * * *") +@schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *") def ecmwf_daily_schedule(context: ScheduleEvaluationContext): scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d") return RunRequest( @@ -67,11 +64,16 @@ def ecmwf_daily_schedule(context: ScheduleEvaluationContext): "ops": {"nwp_consumer_docker_op": NWPConsumerConfig( date_from=scheduled_date, date_to=scheduled_date, - source="ecmwf-mars" + source="ecmwf-mars", + docker_volumes=[ + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/tmp:/tmp/nwpc' + ] )} }, tags={"date": scheduled_date}, ) -schedule_jobs.append(ecmwf_daily_schedule) +schedules.append(ecmwf_daily_schedule) From 36f1db2284a740e6b7efc702795304c316c90524 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 11:47:48 +0100 Subject: [PATCH 03/11] Update name of schedule --- nwp/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index 04cdafe..0fce8c0 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -56,7 +56,7 @@ def nwp_consumer_docker_job(): nwp_consumer_docker_op() @schedule(job=nwp_consumer_docker_job, cron_schedule="0 13 * * *") -def ecmwf_daily_schedule(context: ScheduleEvaluationContext): +def ecmwf_daily_local_archive_schedule(context: ScheduleEvaluationContext): scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d") return RunRequest( run_key=None, @@ -75,5 +75,5 @@ def ecmwf_daily_schedule(context: ScheduleEvaluationContext): tags={"date": scheduled_date}, ) -schedules.append(ecmwf_daily_schedule) +schedules.append(ecmwf_daily_local_archive_schedule) From 3d106a1fa3fd1f08caa429c2319948b9e1fe7aec Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 12:00:44 +0100 Subject: [PATCH 04/11] Undo accidental changes --- nwp/jobs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index 0fce8c0..8022b31 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -6,16 +6,16 @@ import datetime as dt -base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" +dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" def build_config_on_runtime(model, run, delay=0): config = IconConfig(model=model, run=run, delay=delay, folder=f"{base_path}/ICON_Global/{run}", - zarr_path=f"{base_path}/ICON_Global/{run}/{run}.zarr.zip") - config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, + zarr_path=f"{base_path}/ICON_Global/{run}/{run}.zarr.zip"), + config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, "zarr_path": config.zarr_path} return config_dict From ca9b4027c7aa604dfae24b7c9131eca482df6242 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 12:49:51 +0100 Subject: [PATCH 05/11] Return to earlier job definition --- nwp/jobs.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index 8022b31..c325bbc 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -5,21 +5,18 @@ import datetime as dt - - dwd_base_path = "/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" def build_config_on_runtime(model, run, delay=0): config = IconConfig(model=model, run=run, delay=delay, - folder=f"{base_path}/ICON_Global/{run}", - zarr_path=f"{base_path}/ICON_Global/{run}/{run}.zarr.zip"), - config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, + folder=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", + zarr_path=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") + config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, "zarr_path": config.zarr_path} return config_dict - schedules = [] for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: From 096f4dfd5e2bafce0de8a26783b3c8917d480183 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 12:51:06 +0100 Subject: [PATCH 06/11] dwd_base_path --- nwp/jobs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nwp/jobs.py b/nwp/jobs.py index c325bbc..9d0184a 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -11,8 +11,8 @@ def build_config_on_runtime(model, run, delay=0): config = IconConfig(model=model, run=run, delay=delay, - folder=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", - zarr_path=f"{base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") + folder=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}", + zarr_path=f"{dwd_base_path}/{'ICON_Global' if model == 'global' else 'ICON_EU'}/{run}/{run}.zarr.zip") config_dict = {"delay": config.delay, "folder": config.folder, "model": config.model, "run": config.run, "zarr_path": config.zarr_path} return config_dict From 624ba19356a1bb3d285965bb09a8a441ac306a66 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 15:09:47 +0100 Subject: [PATCH 07/11] Reduce pinned versions to stop satip conflicts --- pyproject.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 1432dc0..36fdeea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,12 +24,12 @@ dependencies = [ "dagster-cloud == 1.4.11", "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", - "numpy == 1.24.2", + "numpy >= 1.23.0", "ocf-blosc2 == 0.0.3", "pathlib == 1.0.1", - "requests == 2.31.0", - "xarray == 2023.2.0", - "zarr == 2.14.2", + "requests >= 2.28.0", + "xarray >= 2022.3.0", + "zarr >= 2.13.3", "satip == 2.11.10", ] From 379baec7b6c681b5608e1fa4d5e79e3e89cc1c32 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 15:16:27 +0100 Subject: [PATCH 08/11] Add libgeos to apt install --- .github/workflows/ci.yml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5cccead..652d8dd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: # Should mirror the build-venv stage in the Containerfile - name: Build venv run: | - apt -qq update && apt -qq install -y build-essential + apt -qq update && apt -qq install -y build-essential libgeos-dev python -m venv ./venv ./venv/bin/pip install --upgrade -q pip wheel setuptools if: steps.restore-cache.outputs.cache-hit != 'true' diff --git a/pyproject.toml b/pyproject.toml index 36fdeea..da82544 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ dependencies = [ "requests >= 2.28.0", "xarray >= 2022.3.0", "zarr >= 2.13.3", - "satip == 2.11.10", + "satip >= 2.11.10", ] [project.optional-dependencies] From a4f93f218b26de33c2a61cc0c5f0773d6d3aff52 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 15:19:56 +0100 Subject: [PATCH 09/11] Sudo in apt --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 652d8dd..c91264a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: # Should mirror the build-venv stage in the Containerfile - name: Build venv run: | - apt -qq update && apt -qq install -y build-essential libgeos-dev + sudo apt -qq update && apt -qq install -y build-essential libgeos-dev python -m venv ./venv ./venv/bin/pip install --upgrade -q pip wheel setuptools if: steps.restore-cache.outputs.cache-hit != 'true' From c982b19b67d8743af54ab40ec05c52c88c67b054 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 15:21:37 +0100 Subject: [PATCH 10/11] Sudo in apt x2 --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c91264a..60f14b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,7 +38,7 @@ jobs: # Should mirror the build-venv stage in the Containerfile - name: Build venv run: | - sudo apt -qq update && apt -qq install -y build-essential libgeos-dev + sudo apt -qq update && sudo apt -qq install -y build-essential libgeos-dev python -m venv ./venv ./venv/bin/pip install --upgrade -q pip wheel setuptools if: steps.restore-cache.outputs.cache-hit != 'true' From 31511d0b1caf4baa6198d7760bff620256341e3e Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 8 Sep 2023 15:29:56 +0100 Subject: [PATCH 11/11] Fix test --- dags_tests/compile_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index c979b12..e915e32 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,5 +3,5 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert "get_ecmwf_data" in job_names + assert "nwp_consumer_docker_job" in job_names assert len(job_names) == 18