Skip to content

Commit

Permalink
CAMS EU Downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobbieker committed Oct 26, 2023
1 parent 9b25ab7 commit 041a274
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 5 deletions.
4 changes: 3 additions & 1 deletion nwp/assets/cams/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .cams import fetch_cams_forecast_for_day, CAMSConfig
from .cams import fetch_cams_forecast_for_day
from .cams_eu import fetch_cams_eu_forecast_for_day
from .utils import CAMSConfig
5 changes: 2 additions & 3 deletions nwp/assets/cams/cams.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import dagster
from dagster import AssetObservation

from nwp.assets.cams import CAMSConfig

SINGLE_NEW_VARIABLES: list[str] = [
'ammonium_aerosol_optical_depth_550nm', 'black_carbon_aerosol_optical_depth_550nm', 'dust_aerosol_optical_depth_550nm',
'nitrate_aerosol_optical_depth_550nm', 'organic_matter_aerosol_optical_depth_550nm', 'particulate_matter_10um',
Expand Down Expand Up @@ -142,9 +144,6 @@
'00:00', '12:00',
]

class CAMSConfig(dagster.Config):
date: str
raw_dir: str

@dagster.op
def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig):
Expand Down
109 changes: 109 additions & 0 deletions nwp/assets/cams/cams_eu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import datetime as dt
import os
import cdsapi
import dagster
from dagster import AssetObservation
from nwp.assets.cams import CAMSConfig


INIT_TIMES: list[str] = [
'00:00',
]

VARIALES = [
'alder_pollen', 'ammonia', 'birch_pollen',
'carbon_monoxide', 'dust', 'grass_pollen',
'nitrogen_dioxide', 'nitrogen_monoxide', 'non_methane_vocs',
'olive_pollen', 'ozone', 'particulate_matter_10um',
'particulate_matter_2.5um', 'peroxyacyl_nitrates', 'pm10_wildfires',
'ragweed_pollen', 'secondary_inorganic_aerosol', 'sulphur_dioxide',
]


@dagster.op
def fetch_cams_eu_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig):
"""Fetch CAMS forecast for a given day."""

c = cdsapi.Client()

date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d")

if date < dt.datetime.utcnow() - dt.timedelta(days=1095):
raise ValueError('CAMS data is only available from 3 years ago onwards.')

# Multi-level variables first
for it in INIT_TIMES:
for var in VARIALES:
fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib'

c.retrieve(
'cams-europe-air-quality-forecasts',
{
'date': date.strftime("%Y-%m-%d/%Y-%m-%d"),
'type': 'forecast',
'format': 'netcdf',
'model': 'ensemble',
'variable': var,
'level': [
'0', '1000', '2000',
'250', '3000', '50',
'500', '5000',
],
'leadtime_hour': [
'0', '1', '10',
'11', '12', '13',
'14', '15', '16',
'17', '18', '19',
'2', '20', '21',
'22', '23', '24',
'25', '26', '27',
'28', '29', '3',
'30', '31', '32',
'33', '34', '35',
'36', '37', '38',
'39', '4', '40',
'41', '42', '43',
'44', '45', '46',
'47', '48', '49',
'5', '50', '51',
'52', '53', '54',
'55', '56', '57',
'58', '59', '6',
'60', '61', '62',
'63', '64', '65',
'66', '67', '68',
'69', '7', '70',
'71', '72', '73',
'74', '75', '76',
'77', '78', '79',
'8', '80', '81',
'82', '83', '84',
'85', '86', '87',
'88', '89', '9',
'90', '91', '92',
'93', '94', '95',
'96',
],
"time": it,
},
f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.nc')

context.log_event(
AssetObservation(
asset_key="cams_eu_data",
metadata={
"path": fname,
"date": date.strftime("%Y-%m-%d"),
"variable": var,
"init_time": it,
}
)
)



# Validate that all files were downloaded
for var in VARIALES:
fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.nc'
if not os.path.isfile(fname):
raise FileNotFoundError(f"File {fname} was not downloaded.")
6 changes: 6 additions & 0 deletions nwp/assets/cams/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import dagster


class CAMSConfig(dagster.Config):
date: str
raw_dir: str
20 changes: 19 additions & 1 deletion nwp/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import dagster

from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day
from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day, fetch_cams_eu_forecast_for_day
from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import (
NWPConsumerConfig,
Expand Down Expand Up @@ -97,6 +97,24 @@ def cams_daily_archive() -> None:
jobs.append(cams_daily_archive)
schedules.append(dagster.build_schedule_from_partitioned_job(cams_daily_archive, hour_of_day=16))


@dagster.daily_partitioned_config(start_date=dt.datetime(2020, 10, 27))
def CAMSEUDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]:
config = CAMSConfig(
date=start.strftime("%Y-%m-%d"),
raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS_EU/raw",
)
return {"ops": {"fetch_cams_eu_forecast_for_day": {"config": json.loads(config.json())}}}


@dagster.job(config=CAMSDailyPartitionConfig)
def cams_eu_daily_archive() -> None:
"""Download CAMS data for a given day."""
fetch_cams_eu_forecast_for_day()

jobs.append(cams_eu_daily_archive)
schedules.append(dagster.build_schedule_from_partitioned_job(cams_eu_daily_archive, hour_of_day=16))

# --- NWP Consumer jobs and schedules --------------------------------------

class NWPConsumerDagDefinition:
Expand Down

0 comments on commit 041a274

Please sign in to comment.