From 9562d76323f0801961bafcb50ca0c57c4a8eb865 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 20 Oct 2023 14:57:08 +0100 Subject: [PATCH 1/4] Add CAMS job and op --- dags_tests/cams_test.py | 9 ++ nwp/assets/cams/__init__.py | 1 + nwp/assets/cams/cams.py | 168 ++++++++++++++++++++++++++++++++++++ nwp/jobs.py | 22 +++++ pyproject.toml | 1 + 5 files changed, 201 insertions(+) create mode 100644 dags_tests/cams_test.py create mode 100644 nwp/assets/cams/__init__.py create mode 100644 nwp/assets/cams/cams.py diff --git a/dags_tests/cams_test.py b/dags_tests/cams_test.py new file mode 100644 index 0000000..0bebb2f --- /dev/null +++ b/dags_tests/cams_test.py @@ -0,0 +1,9 @@ +from nwp.assets.cams import fetch_cams_forecast_for_day +import unittest + +import datetime as dt + +class TestCams(unittest.TestCase): + def test_cams(self): + fetch_cams_forecast_for_day(dt.datetime.utcnow()) + diff --git a/nwp/assets/cams/__init__.py b/nwp/assets/cams/__init__.py new file mode 100644 index 0000000..c7f5d79 --- /dev/null +++ b/nwp/assets/cams/__init__.py @@ -0,0 +1 @@ +from .cams import fetch_cams_forecast_for_day, CAMSConfig diff --git a/nwp/assets/cams/cams.py b/nwp/assets/cams/cams.py new file mode 100644 index 0000000..355cf81 --- /dev/null +++ b/nwp/assets/cams/cams.py @@ -0,0 +1,168 @@ +import cdsapi +import dagster +import os + +import datetime as dt + +c = cdsapi.Client() + +VARIABLES: list[str] = [ + 'aerosol_extinction_coefficient_1064nm', 'aerosol_extinction_coefficient_355nm', 'aerosol_extinction_coefficient_532nm', + 'ammonium_aerosol_mass_mixing_ratio', 'ammonium_aerosol_optical_depth_550nm', 'anthropogenic_secondary_organic_aerosol_mass_mixing_ratio', + 'asymmetry_factor_1020nm', 'asymmetry_factor_1064nm', 'asymmetry_factor_1240nm', + 'asymmetry_factor_1640nm', 'asymmetry_factor_2130nm', 'asymmetry_factor_340nm', + 'asymmetry_factor_355nm', 'asymmetry_factor_380nm', 'asymmetry_factor_400nm', + 'asymmetry_factor_440nm', 'asymmetry_factor_469nm', 'asymmetry_factor_500nm', + 'asymmetry_factor_532nm', 'asymmetry_factor_550nm', 'asymmetry_factor_645nm', + 'asymmetry_factor_670nm', 'asymmetry_factor_800nm', 'asymmetry_factor_858nm', + 'asymmetry_factor_865nm', 'attenuated_backscatter_due_to_aerosol_1064nm_from_ground', 'attenuated_backscatter_due_to_aerosol_1064nm_from_top_of_atmosphere', + 'attenuated_backscatter_due_to_aerosol_355nm_from_ground', 'attenuated_backscatter_due_to_aerosol_355nm_from_top_of_atmosphere', 'attenuated_backscatter_due_to_aerosol_532nm_from_ground', + 'attenuated_backscatter_due_to_aerosol_532nm_from_top_of_atmosphere', 'biogenic_secondary_organic_aerosol_mass_mixing_ratio', 'black_carbon_aerosol_optical_depth_550nm', + 'carbon_monoxide', 'chlorine_monoxide', 'chlorine_nitrate', + 'dust_aerosol_0.03-0.55um_mixing_ratio', 'dust_aerosol_0.03-0.55um_optical_depth_550nm', 'dust_aerosol_0.55-0.9um_mixing_ratio', + 'dust_aerosol_0.55-9um_optical_depth_550nm', 'dust_aerosol_0.9-20um_mixing_ratio', 'dust_aerosol_9-20um_optical_depth_550nm', + 'dust_aerosol_optical_depth_550nm', 'ethane', 'formaldehyde', + 'hydrogen_chloride', 'hydrogen_cyanide', 'hydrogen_peroxide', + 'hydrophilic_black_carbon_aerosol_mixing_ratio', 'hydrophilic_black_carbon_aerosol_optical_depth_550nm', 'hydrophilic_organic_matter_aerosol_mixing_ratio', + 'hydrophilic_organic_matter_aerosol_optical_depth_550nm', 'hydrophobic_black_carbon_aerosol_mixing_ratio', 'hydrophobic_black_carbon_aerosol_optical_depth_550nm', + 'hydrophobic_organic_matter_aerosol_mixing_ratio', 'hydrophobic_organic_matter_aerosol_optical_depth_550nm', 'hydroxyl_radical', + 'isoprene', 'methane', 'nitrate_aerosol_optical_depth_550nm', + 'nitrate_coarse_mode_aerosol_mass_mixing_ratio', 'nitrate_coarse_mode_aerosol_optical_depth_550nm', 'nitrate_fine_mode_aerosol_mass_mixing_ratio', + 'nitrate_fine_mode_aerosol_optical_depth_550nm', 'nitric_acid', 'nitrogen_dioxide', + 'nitrogen_monoxide', 'organic_matter_aerosol_optical_depth_550nm', 'ozone', + 'particulate_matter_10um', 'particulate_matter_1um', 'particulate_matter_2.5um', + 'peroxyacetyl_nitrate', 'propane', 'sea_salt_aerosol_0.03-0.5um_mixing_ratio', + 'sea_salt_aerosol_0.03-0.5um_optical_depth_550nm', 'sea_salt_aerosol_0.5-5um_mixing_ratio', 'sea_salt_aerosol_0.5-5um_optical_depth_550nm', + 'sea_salt_aerosol_5-20um_mixing_ratio', 'sea_salt_aerosol_5-20um_optical_depth_550nm', 'sea_salt_aerosol_optical_depth_550nm', + 'secondary_organic_aerosol_optical_depth_550nm', 'single_scattering_albedo_1020nm', 'single_scattering_albedo_1064nm', + 'single_scattering_albedo_1240nm', 'single_scattering_albedo_1640nm', 'single_scattering_albedo_2130nm', + 'single_scattering_albedo_340nm', 'single_scattering_albedo_355nm', 'single_scattering_albedo_380nm', + 'single_scattering_albedo_400nm', 'single_scattering_albedo_440nm', 'single_scattering_albedo_469nm', + 'single_scattering_albedo_500nm', 'single_scattering_albedo_532nm', 'single_scattering_albedo_550nm', + 'single_scattering_albedo_645nm', 'single_scattering_albedo_670nm', 'single_scattering_albedo_800nm', + 'single_scattering_albedo_858nm', 'single_scattering_albedo_865nm', 'sulphate_aerosol_mixing_ratio', + 'sulphate_aerosol_optical_depth_550nm', 'sulphur_dioxide', 'total_absorption_aerosol_optical_depth_1020nm', + 'total_absorption_aerosol_optical_depth_1064nm', 'total_absorption_aerosol_optical_depth_1240nm', 'total_absorption_aerosol_optical_depth_1640nm', + 'total_absorption_aerosol_optical_depth_2130nm', 'total_absorption_aerosol_optical_depth_340nm', 'total_absorption_aerosol_optical_depth_355nm', + 'total_absorption_aerosol_optical_depth_380nm', 'total_absorption_aerosol_optical_depth_400nm', 'total_absorption_aerosol_optical_depth_440nm', + 'total_absorption_aerosol_optical_depth_469nm', 'total_absorption_aerosol_optical_depth_500nm', 'total_absorption_aerosol_optical_depth_532nm', + 'total_absorption_aerosol_optical_depth_550nm', 'total_absorption_aerosol_optical_depth_645nm', 'total_absorption_aerosol_optical_depth_670nm', + 'total_absorption_aerosol_optical_depth_800nm', 'total_absorption_aerosol_optical_depth_858nm', 'total_absorption_aerosol_optical_depth_865nm', + 'total_aerosol_optical_depth_1020nm', 'total_aerosol_optical_depth_1064nm', 'total_aerosol_optical_depth_1240nm', + 'total_aerosol_optical_depth_1640nm', 'total_aerosol_optical_depth_2130nm', 'total_aerosol_optical_depth_340nm', + 'total_aerosol_optical_depth_355nm', 'total_aerosol_optical_depth_380nm', 'total_aerosol_optical_depth_400nm', + 'total_aerosol_optical_depth_440nm', 'total_aerosol_optical_depth_469nm', 'total_aerosol_optical_depth_500nm', + 'total_aerosol_optical_depth_532nm', 'total_aerosol_optical_depth_550nm', 'total_aerosol_optical_depth_645nm', + 'total_aerosol_optical_depth_670nm', 'total_aerosol_optical_depth_800nm', 'total_aerosol_optical_depth_858nm', + 'total_aerosol_optical_depth_865nm', 'total_column_carbon_monoxide', 'total_column_chlorine_monoxide', + 'total_column_chlorine_nitrate', 'total_column_ethane', 'total_column_formaldehyde', + 'total_column_hydrogen_chloride', 'total_column_hydrogen_cyanide', 'total_column_hydrogen_peroxide', + 'total_column_hydroxyl_radical', 'total_column_isoprene', 'total_column_methane', + 'total_column_nitric_acid', 'total_column_nitrogen_dioxide', 'total_column_nitrogen_monoxide', + 'total_column_ozone', 'total_column_peroxyacetyl_nitrate', 'total_column_propane', + 'total_column_sulphur_dioxide', 'total_fine_mode_aerosol_optical_depth_1020nm', 'total_fine_mode_aerosol_optical_depth_1064nm', + 'total_fine_mode_aerosol_optical_depth_1240nm', 'total_fine_mode_aerosol_optical_depth_1640nm', 'total_fine_mode_aerosol_optical_depth_2130nm', + 'total_fine_mode_aerosol_optical_depth_340nm', 'total_fine_mode_aerosol_optical_depth_355nm', 'total_fine_mode_aerosol_optical_depth_380nm', + 'total_fine_mode_aerosol_optical_depth_400nm', 'total_fine_mode_aerosol_optical_depth_440nm', 'total_fine_mode_aerosol_optical_depth_469nm', + 'total_fine_mode_aerosol_optical_depth_500nm', 'total_fine_mode_aerosol_optical_depth_532nm', 'total_fine_mode_aerosol_optical_depth_550nm', + 'total_fine_mode_aerosol_optical_depth_645nm', 'total_fine_mode_aerosol_optical_depth_670nm', 'total_fine_mode_aerosol_optical_depth_800nm', + 'total_fine_mode_aerosol_optical_depth_858nm', 'total_fine_mode_aerosol_optical_depth_865nm', + ] + +INIT_TIMES: list[str] = [ + '00:00', '12:00', +] + +class CAMSConfig(dagster.Config): + date: str + raw_dir: str + +@dagster.op +def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): + """Fetch CAMS forecast for a given day.""" + + date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d") + + if date < dt.datetime(2015, 1, 1): + raise ValueError('CAMS data is only available from 2015-01-01 onwards.') + + for it in INIT_TIMES: + for var in VARIABLES: + fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib' + + c.retrieve( + 'cams-global-atmospheric-composition-forecasts', + { + 'date': date.strftime("%Y-%m-%d/%Y-%m-%d"), + 'type': 'forecast', + 'format': 'grib', + 'variable': var, + 'pressure_level': [ + '1', '2', '3', + '5', '7', '10', + '20', '30', '50', + '70', '100', '150', + '200', '250', '300', + '400', '500', '600', + '700', '800', '850', + '900', '925', '950', + '1000', + ], + 'leadtime_hour': [ + '0', '1', '10', + '100', '101', '102', + '103', '104', '105', + '106', '107', '108', + '109', '11', '110', + '111', '112', '113', + '114', '115', '116', + '117', '118', '119', + '12', '120', '13', + '14', '15', '16', + '17', '18', '19', + '2', '20', '21', + '22', '23', '24', + '25', '26', '27', + '28', '29', '3', + '30', '31', '32', + '33', '34', '35', + '36', '37', '38', + '39', '4', '40', + '41', '42', '43', + '44', '45', '46', + '47', '48', '49', + '5', '50', '51', + '52', '53', '54', + '55', '56', '57', + '58', '59', '6', + '60', '61', '62', + '63', '64', '65', + '66', '67', '68', + '69', '7', '70', + '71', '72', '73', + '74', '75', '76', + '77', '78', '79', + '8', '80', '81', + '82', '83', '84', + '85', '86', '87', + '88', '89', '9', + '90', '91', '92', + '93', '94', '95', + '96', '97', '98', + '99', + ], + "time": it, + }, + f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib') + + context.log_event( + AssetObservation( + asset_key="cams_data", + metadata={ + "path": fname, + "date": date.strftime("%Y-%m-%d"), + "variable": var, + "init_time": it, + } + ) + ) diff --git a/nwp/jobs.py b/nwp/jobs.py index 3e52189..88eb0b0 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,4 +1,8 @@ import datetime as dt +from nwp.assets.cams import ( + fetch_cams_forecast_for_day, + CAMSConfig +) import json from collections.abc import Callable from typing import Any @@ -77,6 +81,24 @@ def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict: schedules.append( dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) +# --- CAMS jobs and schedules ---------------------------------------------- + +@dagster.daily_partitioned_config(start_date=dt.datetime(2015, 1, 1)) +def CAMSDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: + config = CAMSConfig( + date=start.strftime("%Y-%m-%d"), + raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS/raw", + ) + return {"ops": {"fetch_cams_forecast_for_day": {"config": json.loads(config.json())}}} + + +@dagster.job(config=CAMSDailyPartitionConfig) +def cams_daily_archive() -> None: + """Download CAMS data for a given day.""" + fetch_cams_forecast_for_day() + +jobs.append(cams_daily_archive) +schedules.append(dagster.build_schedule_from_partitioned_job(cams_daily_archive, hour_of_day=16)) # --- NWP Consumer jobs and schedules -------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 009426b..03d6ecb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ authors = [ ] classifiers = ["Programming Language :: Python :: 3"] dependencies = [ + "cdsapi == 0.6.1", "ecmwf-api-client == 1.6.3", "dagit == 1.4.11", "dagster == 1.4.11", From 18392278f139f1a33ee8c275e791ac1eef511915 Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 20 Oct 2023 14:59:03 +0100 Subject: [PATCH 2/4] Formatting --- dags_tests/cams_test.py | 5 +++-- nwp/assets/cams/cams.py | 6 ++---- nwp/jobs.py | 7 ++----- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/dags_tests/cams_test.py b/dags_tests/cams_test.py index 0bebb2f..3a7fb58 100644 --- a/dags_tests/cams_test.py +++ b/dags_tests/cams_test.py @@ -1,7 +1,8 @@ -from nwp.assets.cams import fetch_cams_forecast_for_day +import datetime as dt import unittest -import datetime as dt +from nwp.assets.cams import fetch_cams_forecast_for_day + class TestCams(unittest.TestCase): def test_cams(self): diff --git a/nwp/assets/cams/cams.py b/nwp/assets/cams/cams.py index 355cf81..99e5b14 100644 --- a/nwp/assets/cams/cams.py +++ b/nwp/assets/cams/cams.py @@ -1,8 +1,7 @@ +import datetime as dt + import cdsapi import dagster -import os - -import datetime as dt c = cdsapi.Client() @@ -80,7 +79,6 @@ class CAMSConfig(dagster.Config): @dagster.op def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): """Fetch CAMS forecast for a given day.""" - date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d") if date < dt.datetime(2015, 1, 1): diff --git a/nwp/jobs.py b/nwp/jobs.py index 88eb0b0..7edf7dd 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,19 +1,16 @@ import datetime as dt -from nwp.assets.cams import ( - fetch_cams_forecast_for_day, - CAMSConfig -) import json from collections.abc import Callable from typing import Any import dagster +from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import ( NWPConsumerConfig, nwp_consumer_convert_op, - nwp_consumer_download_op + nwp_consumer_download_op, ) jobs: list[dagster.JobDefinition] = [] From d275da2629b48ee948ec824902d04333382a976e Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 20 Oct 2023 15:01:50 +0100 Subject: [PATCH 3/4] Remove test --- dags_tests/cams_test.py | 10 ---------- dags_tests/compile_test.py | 2 +- 2 files changed, 1 insertion(+), 11 deletions(-) delete mode 100644 dags_tests/cams_test.py diff --git a/dags_tests/cams_test.py b/dags_tests/cams_test.py deleted file mode 100644 index 3a7fb58..0000000 --- a/dags_tests/cams_test.py +++ /dev/null @@ -1,10 +0,0 @@ -import datetime as dt -import unittest - -from nwp.assets.cams import fetch_cams_forecast_for_day - - -class TestCams(unittest.TestCase): - def test_cams(self): - fetch_cams_forecast_for_day(dt.datetime.utcnow()) - diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 5aa6f04..8b7d347 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,4 +3,4 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert len(job_names) == 20 + assert len(job_names) == 21 From 0a8bcd82db476465ed74a06b8a802a9174551f0c Mon Sep 17 00:00:00 2001 From: devsjc Date: Fri, 20 Oct 2023 15:10:56 +0100 Subject: [PATCH 4/4] Make client instantiation in function --- nwp/assets/cams/cams.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nwp/assets/cams/cams.py b/nwp/assets/cams/cams.py index 99e5b14..fb2c0dd 100644 --- a/nwp/assets/cams/cams.py +++ b/nwp/assets/cams/cams.py @@ -3,7 +3,6 @@ import cdsapi import dagster -c = cdsapi.Client() VARIABLES: list[str] = [ 'aerosol_extinction_coefficient_1064nm', 'aerosol_extinction_coefficient_355nm', 'aerosol_extinction_coefficient_532nm', @@ -79,6 +78,9 @@ class CAMSConfig(dagster.Config): @dagster.op def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): """Fetch CAMS forecast for a given day.""" + + c = cdsapi.Client() + date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d") if date < dt.datetime(2015, 1, 1):