diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 5aa6f04..8b7d347 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -3,4 +3,4 @@ def test_compiles(): job_names = [d.name for d in list(defs.get_all_job_defs())] - assert len(job_names) == 20 + assert len(job_names) == 21 diff --git a/nwp/assets/cams/__init__.py b/nwp/assets/cams/__init__.py new file mode 100644 index 0000000..c7f5d79 --- /dev/null +++ b/nwp/assets/cams/__init__.py @@ -0,0 +1 @@ +from .cams import fetch_cams_forecast_for_day, CAMSConfig diff --git a/nwp/assets/cams/cams.py b/nwp/assets/cams/cams.py new file mode 100644 index 0000000..fb2c0dd --- /dev/null +++ b/nwp/assets/cams/cams.py @@ -0,0 +1,168 @@ +import datetime as dt + +import cdsapi +import dagster + + +VARIABLES: list[str] = [ + 'aerosol_extinction_coefficient_1064nm', 'aerosol_extinction_coefficient_355nm', 'aerosol_extinction_coefficient_532nm', + 'ammonium_aerosol_mass_mixing_ratio', 'ammonium_aerosol_optical_depth_550nm', 'anthropogenic_secondary_organic_aerosol_mass_mixing_ratio', + 'asymmetry_factor_1020nm', 'asymmetry_factor_1064nm', 'asymmetry_factor_1240nm', + 'asymmetry_factor_1640nm', 'asymmetry_factor_2130nm', 'asymmetry_factor_340nm', + 'asymmetry_factor_355nm', 'asymmetry_factor_380nm', 'asymmetry_factor_400nm', + 'asymmetry_factor_440nm', 'asymmetry_factor_469nm', 'asymmetry_factor_500nm', + 'asymmetry_factor_532nm', 'asymmetry_factor_550nm', 'asymmetry_factor_645nm', + 'asymmetry_factor_670nm', 'asymmetry_factor_800nm', 'asymmetry_factor_858nm', + 'asymmetry_factor_865nm', 'attenuated_backscatter_due_to_aerosol_1064nm_from_ground', 'attenuated_backscatter_due_to_aerosol_1064nm_from_top_of_atmosphere', + 'attenuated_backscatter_due_to_aerosol_355nm_from_ground', 'attenuated_backscatter_due_to_aerosol_355nm_from_top_of_atmosphere', 'attenuated_backscatter_due_to_aerosol_532nm_from_ground', + 'attenuated_backscatter_due_to_aerosol_532nm_from_top_of_atmosphere', 'biogenic_secondary_organic_aerosol_mass_mixing_ratio', 'black_carbon_aerosol_optical_depth_550nm', + 'carbon_monoxide', 'chlorine_monoxide', 'chlorine_nitrate', + 'dust_aerosol_0.03-0.55um_mixing_ratio', 'dust_aerosol_0.03-0.55um_optical_depth_550nm', 'dust_aerosol_0.55-0.9um_mixing_ratio', + 'dust_aerosol_0.55-9um_optical_depth_550nm', 'dust_aerosol_0.9-20um_mixing_ratio', 'dust_aerosol_9-20um_optical_depth_550nm', + 'dust_aerosol_optical_depth_550nm', 'ethane', 'formaldehyde', + 'hydrogen_chloride', 'hydrogen_cyanide', 'hydrogen_peroxide', + 'hydrophilic_black_carbon_aerosol_mixing_ratio', 'hydrophilic_black_carbon_aerosol_optical_depth_550nm', 'hydrophilic_organic_matter_aerosol_mixing_ratio', + 'hydrophilic_organic_matter_aerosol_optical_depth_550nm', 'hydrophobic_black_carbon_aerosol_mixing_ratio', 'hydrophobic_black_carbon_aerosol_optical_depth_550nm', + 'hydrophobic_organic_matter_aerosol_mixing_ratio', 'hydrophobic_organic_matter_aerosol_optical_depth_550nm', 'hydroxyl_radical', + 'isoprene', 'methane', 'nitrate_aerosol_optical_depth_550nm', + 'nitrate_coarse_mode_aerosol_mass_mixing_ratio', 'nitrate_coarse_mode_aerosol_optical_depth_550nm', 'nitrate_fine_mode_aerosol_mass_mixing_ratio', + 'nitrate_fine_mode_aerosol_optical_depth_550nm', 'nitric_acid', 'nitrogen_dioxide', + 'nitrogen_monoxide', 'organic_matter_aerosol_optical_depth_550nm', 'ozone', + 'particulate_matter_10um', 'particulate_matter_1um', 'particulate_matter_2.5um', + 'peroxyacetyl_nitrate', 'propane', 'sea_salt_aerosol_0.03-0.5um_mixing_ratio', + 'sea_salt_aerosol_0.03-0.5um_optical_depth_550nm', 'sea_salt_aerosol_0.5-5um_mixing_ratio', 'sea_salt_aerosol_0.5-5um_optical_depth_550nm', + 'sea_salt_aerosol_5-20um_mixing_ratio', 'sea_salt_aerosol_5-20um_optical_depth_550nm', 'sea_salt_aerosol_optical_depth_550nm', + 'secondary_organic_aerosol_optical_depth_550nm', 'single_scattering_albedo_1020nm', 'single_scattering_albedo_1064nm', + 'single_scattering_albedo_1240nm', 'single_scattering_albedo_1640nm', 'single_scattering_albedo_2130nm', + 'single_scattering_albedo_340nm', 'single_scattering_albedo_355nm', 'single_scattering_albedo_380nm', + 'single_scattering_albedo_400nm', 'single_scattering_albedo_440nm', 'single_scattering_albedo_469nm', + 'single_scattering_albedo_500nm', 'single_scattering_albedo_532nm', 'single_scattering_albedo_550nm', + 'single_scattering_albedo_645nm', 'single_scattering_albedo_670nm', 'single_scattering_albedo_800nm', + 'single_scattering_albedo_858nm', 'single_scattering_albedo_865nm', 'sulphate_aerosol_mixing_ratio', + 'sulphate_aerosol_optical_depth_550nm', 'sulphur_dioxide', 'total_absorption_aerosol_optical_depth_1020nm', + 'total_absorption_aerosol_optical_depth_1064nm', 'total_absorption_aerosol_optical_depth_1240nm', 'total_absorption_aerosol_optical_depth_1640nm', + 'total_absorption_aerosol_optical_depth_2130nm', 'total_absorption_aerosol_optical_depth_340nm', 'total_absorption_aerosol_optical_depth_355nm', + 'total_absorption_aerosol_optical_depth_380nm', 'total_absorption_aerosol_optical_depth_400nm', 'total_absorption_aerosol_optical_depth_440nm', + 'total_absorption_aerosol_optical_depth_469nm', 'total_absorption_aerosol_optical_depth_500nm', 'total_absorption_aerosol_optical_depth_532nm', + 'total_absorption_aerosol_optical_depth_550nm', 'total_absorption_aerosol_optical_depth_645nm', 'total_absorption_aerosol_optical_depth_670nm', + 'total_absorption_aerosol_optical_depth_800nm', 'total_absorption_aerosol_optical_depth_858nm', 'total_absorption_aerosol_optical_depth_865nm', + 'total_aerosol_optical_depth_1020nm', 'total_aerosol_optical_depth_1064nm', 'total_aerosol_optical_depth_1240nm', + 'total_aerosol_optical_depth_1640nm', 'total_aerosol_optical_depth_2130nm', 'total_aerosol_optical_depth_340nm', + 'total_aerosol_optical_depth_355nm', 'total_aerosol_optical_depth_380nm', 'total_aerosol_optical_depth_400nm', + 'total_aerosol_optical_depth_440nm', 'total_aerosol_optical_depth_469nm', 'total_aerosol_optical_depth_500nm', + 'total_aerosol_optical_depth_532nm', 'total_aerosol_optical_depth_550nm', 'total_aerosol_optical_depth_645nm', + 'total_aerosol_optical_depth_670nm', 'total_aerosol_optical_depth_800nm', 'total_aerosol_optical_depth_858nm', + 'total_aerosol_optical_depth_865nm', 'total_column_carbon_monoxide', 'total_column_chlorine_monoxide', + 'total_column_chlorine_nitrate', 'total_column_ethane', 'total_column_formaldehyde', + 'total_column_hydrogen_chloride', 'total_column_hydrogen_cyanide', 'total_column_hydrogen_peroxide', + 'total_column_hydroxyl_radical', 'total_column_isoprene', 'total_column_methane', + 'total_column_nitric_acid', 'total_column_nitrogen_dioxide', 'total_column_nitrogen_monoxide', + 'total_column_ozone', 'total_column_peroxyacetyl_nitrate', 'total_column_propane', + 'total_column_sulphur_dioxide', 'total_fine_mode_aerosol_optical_depth_1020nm', 'total_fine_mode_aerosol_optical_depth_1064nm', + 'total_fine_mode_aerosol_optical_depth_1240nm', 'total_fine_mode_aerosol_optical_depth_1640nm', 'total_fine_mode_aerosol_optical_depth_2130nm', + 'total_fine_mode_aerosol_optical_depth_340nm', 'total_fine_mode_aerosol_optical_depth_355nm', 'total_fine_mode_aerosol_optical_depth_380nm', + 'total_fine_mode_aerosol_optical_depth_400nm', 'total_fine_mode_aerosol_optical_depth_440nm', 'total_fine_mode_aerosol_optical_depth_469nm', + 'total_fine_mode_aerosol_optical_depth_500nm', 'total_fine_mode_aerosol_optical_depth_532nm', 'total_fine_mode_aerosol_optical_depth_550nm', + 'total_fine_mode_aerosol_optical_depth_645nm', 'total_fine_mode_aerosol_optical_depth_670nm', 'total_fine_mode_aerosol_optical_depth_800nm', + 'total_fine_mode_aerosol_optical_depth_858nm', 'total_fine_mode_aerosol_optical_depth_865nm', + ] + +INIT_TIMES: list[str] = [ + '00:00', '12:00', +] + +class CAMSConfig(dagster.Config): + date: str + raw_dir: str + +@dagster.op +def fetch_cams_forecast_for_day(context: dagster.OpExecutionContext, config: CAMSConfig): + """Fetch CAMS forecast for a given day.""" + + c = cdsapi.Client() + + date: dt.datetime = dt.datetime.strptime(config.date, "%Y-%m-%d") + + if date < dt.datetime(2015, 1, 1): + raise ValueError('CAMS data is only available from 2015-01-01 onwards.') + + for it in INIT_TIMES: + for var in VARIABLES: + fname: str = f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib' + + c.retrieve( + 'cams-global-atmospheric-composition-forecasts', + { + 'date': date.strftime("%Y-%m-%d/%Y-%m-%d"), + 'type': 'forecast', + 'format': 'grib', + 'variable': var, + 'pressure_level': [ + '1', '2', '3', + '5', '7', '10', + '20', '30', '50', + '70', '100', '150', + '200', '250', '300', + '400', '500', '600', + '700', '800', '850', + '900', '925', '950', + '1000', + ], + 'leadtime_hour': [ + '0', '1', '10', + '100', '101', '102', + '103', '104', '105', + '106', '107', '108', + '109', '11', '110', + '111', '112', '113', + '114', '115', '116', + '117', '118', '119', + '12', '120', '13', + '14', '15', '16', + '17', '18', '19', + '2', '20', '21', + '22', '23', '24', + '25', '26', '27', + '28', '29', '3', + '30', '31', '32', + '33', '34', '35', + '36', '37', '38', + '39', '4', '40', + '41', '42', '43', + '44', '45', '46', + '47', '48', '49', + '5', '50', '51', + '52', '53', '54', + '55', '56', '57', + '58', '59', '6', + '60', '61', '62', + '63', '64', '65', + '66', '67', '68', + '69', '7', '70', + '71', '72', '73', + '74', '75', '76', + '77', '78', '79', + '8', '80', '81', + '82', '83', '84', + '85', '86', '87', + '88', '89', '9', + '90', '91', '92', + '93', '94', '95', + '96', '97', '98', + '99', + ], + "time": it, + }, + f'{config.raw_dir}/{date.strftime("%Y%m%d")}{it[:2]}_{var}.grib') + + context.log_event( + AssetObservation( + asset_key="cams_data", + metadata={ + "path": fname, + "date": date.strftime("%Y-%m-%d"), + "variable": var, + "init_time": it, + } + ) + ) diff --git a/nwp/jobs.py b/nwp/jobs.py index 3e52189..7edf7dd 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -5,11 +5,12 @@ import dagster +from nwp.assets.cams import CAMSConfig, fetch_cams_forecast_for_day from nwp.assets.dwd.common import IconConfig from nwp.assets.ecmwf.mars import ( NWPConsumerConfig, nwp_consumer_convert_op, - nwp_consumer_download_op + nwp_consumer_download_op, ) jobs: list[dagster.JobDefinition] = [] @@ -77,6 +78,24 @@ def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict: schedules.append( dagster.ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) +# --- CAMS jobs and schedules ---------------------------------------------- + +@dagster.daily_partitioned_config(start_date=dt.datetime(2015, 1, 1)) +def CAMSDailyPartitionConfig(start: dt.datetime, _end: dt.datetime) -> dict[str, Any]: + config = CAMSConfig( + date=start.strftime("%Y-%m-%d"), + raw_dir="/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/CAMS/raw", + ) + return {"ops": {"fetch_cams_forecast_for_day": {"config": json.loads(config.json())}}} + + +@dagster.job(config=CAMSDailyPartitionConfig) +def cams_daily_archive() -> None: + """Download CAMS data for a given day.""" + fetch_cams_forecast_for_day() + +jobs.append(cams_daily_archive) +schedules.append(dagster.build_schedule_from_partitioned_job(cams_daily_archive, hour_of_day=16)) # --- NWP Consumer jobs and schedules -------------------------------------- diff --git a/pyproject.toml b/pyproject.toml index 009426b..03d6ecb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ authors = [ ] classifiers = ["Programming Language :: Python :: 3"] dependencies = [ + "cdsapi == 0.6.1", "ecmwf-api-client == 1.6.3", "dagit == 1.4.11", "dagster == 1.4.11",