From 8a4e814cddd36bc7fda1e0cc8532897db92ae3b9 Mon Sep 17 00:00:00 2001 From: Jacob Bieker Date: Thu, 26 Oct 2023 11:20:58 +0100 Subject: [PATCH] Add CAMS EU Downloading (#27) * CAMS EU Downloading * Update cams_eu.py --- nwp/assets/cams/__init__.py | 4 +- nwp/assets/cams/cams.py | 5 +- nwp/assets/cams/cams_eu.py | 109 ++++++++++++++++++++++++++++++++++++ nwp/assets/cams/utils.py | 6 ++ nwp/jobs.py | 20 ++++++- 5 files changed, 139 insertions(+), 5 deletions(-) create mode 100644 nwp/assets/cams/cams_eu.py create mode 100644 nwp/assets/cams/utils.py diff --git a/nwp/assets/cams/__init__.py b/nwp/assets/cams/__init__.py index c7f5d79..cb9764a 100644 --- a/nwp/assets/cams/__init__.py +++ b/nwp/assets/cams/__init__.py @@ -1 +1,3 @@ -from .cams import fetch_cams_forecast_for_day, CAMSConfig +from .cams import fetch_cams_forecast_for_day +from .cams_eu import fetch_cams_eu_forecast_for_day +from .utils import CAMSConfig diff --git a/nwp/assets/cams/cams.py b/nwp/assets/cams/cams.py index 7bacc3b..d68b885 100644 --- a/nwp/assets/cams/cams.py +++ b/nwp/assets/cams/cams.py @@ -4,6 +4,8 @@ import dagster from dagster import AssetObservation +from nwp.assets.cams import CAMSConfig + SINGLE_NEW_VARIABLES: list[str] = [ 'ammonium_aerosol_optical_depth_550nm', 'black_carbon_aerosol_optical_depth_550nm', 'dust_aerosol_optical_depth_550nm', 'nitrate_aerosol_optical_depth_550nm', 'organic_matter_aerosol_optical_depth_550nm', 'particulate_matter_10um', @@ -142,9 +144,6 @@ '00:00', '12:00', ] -class CAMSConfig(dagster.Config): - date: str - raw_dir: str @dagster.op def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): diff --git a/nwp/assets/cams/cams_eu.py b/nwp/assets/cams/cams_eu.py new file mode 100644 index 0000000..00c6450 --- /dev/null +++ b/nwp/assets/cams/cams_eu.py @@ -0,0 +1,109 @@ +import datetime as dt +import os +import cdsapi +import dagster +from dagster import AssetObservation +from nwp.assets.cams import CAMSConfig + + +INIT_TIMES: list[str] = [ + '00:00', +] + +VARIABLES = [ + 'alder_pollen', 'ammonia', 'birch_pollen', + 'carbon_monoxide', 'dust', 'grass_pollen', + 'nitrogen_dioxide', 'nitrogen_monoxide', 'non_methane_vocs', + 'olive_pollen', 'ozone', 'particulate_matter_10um', + 'particulate_matter_2.5um', 'peroxyacyl_nitrates', 'pm10_wildfires', + 'ragweed_pollen', 'secondary_inorganic_aerosol', 'sulphur_dioxide', +] + + +@dagster.op +def fetch_cams_eu_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): + """Fetch CAMS forecast for a given day.""" + + c = cdsapi.Client() + + date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d") + + if date < dt.datetime.utcnow() - dt.timedelta(days=1095): + raise ValueError('CAMS data is only available from 3 years ago onwards.') + + # Multi-level variables first + for it in INIT_TIMES: + for var in VARIALES: + fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib' + + c.retrieve( + 'cams-europe-air-quality-forecasts', + { + 'date': date.strftime("%Y-%m-%d/%Y-%m-%d"), + 'type': 'forecast', + 'format': 'netcdf', + 'model': 'ensemble', + 'variable': var, + 'level': [ + '0', '1000', '2000', + '250', '3000', '50', + '500', '5000', + ], + 'leadtime_hour': [ + '0', '1', '10', + '11', '12', '13', + '14', '15', '16', + '17', '18', '19', + '2', '20', '21', + '22', '23', '24', + '25', '26', '27', + '28', '29', '3', + '30', '31', '32', + '33', '34', '35', + '36', '37', '38', + '39', '4', '40', + '41', '42', '43', + '44', '45', '46', + '47', '48', '49', + '5', '50', '51', + '52', '53', '54', + '55', '56', '57', + '58', '59', '6', + '60', '61', '62', + '63', '64', '65', + '66', '67', '68', + '69', '7', '70', + '71', '72', '73', + '74', '75', '76', + '77', '78', '79', + '8', '80', '81', + '82', '83', '84', + '85', '86', '87', + '88', '89', '9', + '90', '91', '92', + '93', '94', '95', + '96', + ], + "time": it, + }, + f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.nc') + + context.log_event( + AssetObservation( + asset_key="cams_eu_data", + metadata={ + "path": fname, + "date": date.strftime("%Y-%m-%d"), + "variable": var, + "init_time": it, + } + ) + ) + + + + # Validate that all files were downloaded + for var in VARIABLES: + fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.nc' + if not os.path.isfile(fname): + raise FileNotFoundError(f"File {fname} was not downloaded.") diff --git a/nwp/assets/cams/utils.py b/nwp/assets/cams/utils.py new file mode 100644 index 0000000..653f2da --- /dev/null +++ b/nwp/assets/cams/utils.py @@ -0,0 +1,6 @@ +import dagster + + +class CAMSConfig(dagster.Config): + date: str + raw_dir: str diff --git a/nwp/jobs.py b/nwp/jobs.py index 7edf7dd..bcaed2a 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -5,7 +5,7 @@ import dagster -from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day +from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day, fetch_cams_eu_forecast_for_day from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import ( NWPConsumerConfig, @@ -97,6 +97,24 @@ def cams_daily_archive() -> None: jobs.append(cams_daily_archive) schedules.append(dagster.build_schedule_from_partitioned_job(cams_daily_archive, hour_of_day=16)) + +@dagster.daily_partitioned_config(start_date=dt.datetime(2020, 10, 27)) +def CAMSEUDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: + config = CAMSConfig( + date=start.strftime("%Y-%m-%d"), + raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS_EU/raw", + ) + return {"ops": {"fetch_cams_eu_forecast_for_day": {"config": json.loads(config.json())}}} + + +@dagster.job(config=CAMSDailyPartitionConfig) +def cams_eu_daily_archive() -> None: + """Download CAMS data for a given day.""" + fetch_cams_eu_forecast_for_day() + +jobs.append(cams_eu_daily_archive) +schedules.append(dagster.build_schedule_from_partitioned_job(cams_eu_daily_archive, hour_of_day=16)) + # --- NWP Consumer jobs and schedules -------------------------------------- class NWPConsumerDagDefinition: