Skip to content

Commit

Permalink
Merge branch 'main' into new-pv-url
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Dec 6, 2024
2 parents e8cc229 + 1616cdd commit 31ec2a8
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,6 @@ zarr

# Dagster
tmp*

# uv
uv.lock
21 changes: 12 additions & 9 deletions local_archives/nwp/ceda/ceda_global.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""Zarr archive of NWP data from the Met Office's Global model.
The MetOffice runs it's Unified Model (UM) in two configurations: Global, and UK.
This asset contains data from the global configuration covering the whole globe.
Sourced via FTP from CEDA (https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783).
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
It is downloaded using the nwp-consumer docker image
(https://github.com/openclimatefix/nwp-consumer)
"""

import datetime as dt
import os
from typing import Any
Expand All @@ -12,15 +23,7 @@

@dg.asset(
name="zarr_archive",
description="".join((
"Zarr archive of NWP data from the Met Office's Global model. ",
"Sourced via FTP from CEDA ",
"(https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783). ",
"This asset is updated monthly, and surfaced as a Zarr Directory Store ",
"for each month. It is downloaded using the nwp-consumer ",
"docker image, currently from the 'major-refactor' branch ",
"(https://github.com/openclimatefix/nwp-consumer). ",
)),
description=__doc__,
key_prefix=["nwp", "ceda", "global"],
metadata={
"archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"),
Expand Down
14 changes: 13 additions & 1 deletion local_archives/nwp/ecmwf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import dagster as dg

from . import ecmwf_malta, ecmwf_nw_india, ecmwf_uk, ecmwf_india
from . import (
ecmwf_malta,
ecmwf_nw_india,
ecmwf_uk,
ecmwf_india,
ecmwf_ens_stat_india,
)

uk_assets = dg.load_assets_from_modules(
modules=[ecmwf_uk],
Expand All @@ -23,9 +29,15 @@
group_name="ecmwf_india",
)

india_stat_assets = dg.load_assets_from_modules(
modules=[ecmwf_ens_stat_india],
group_name="ecmwf_ens_india_stat",
)

all_assets: list[dg.AssetsDefinition] = [
*uk_assets,
*nw_india_assets,
*malta_assets,
*india_assets,
*india_stat_assets,
]
75 changes: 75 additions & 0 deletions local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Zarr archive of Summary NWP data from ECMWF's EPS.
EPS is the ECMWF Ensemble Prediction System,
which provides 50 perturbed forecasts of upcoming atmospheric conditions.
This asset contains summary statistics of this data (mean, standard deviation) for India.
Sourced via MARS API from ECMWF (https://apps.ecmwf.int/mars-catalogue).
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
It is downloaded using the nwp-consumer docker image
(https://github.com/openclimatefix/nwp-consumer).
"""

import datetime as dt
import os
from typing import Any

import dagster as dg
from dagster_docker import PipesDockerClient

from constants import LOCATIONS_BY_ENVIRONMENT

env = os.getenv("ENVIRONMENT", "local")
ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER
ARCHIVE_FOLDER = f"{ZARR_FOLDER}/nwp/ecmwf-eps/india-stat"

@dg.asset(
name="zarr_archive",
description=__doc__,
key_prefix=["nwp", "ecmwf-eps", "india-stat"],
metadata={
"archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER),
"area": dg.MetadataValue.text("global"),
"source": dg.MetadataValue.text("ecmwf-mars"),
"expected_runtime": dg.MetadataValue.text("6 hours"),
},
compute_kind="docker",
automation_condition=dg.AutomationCondition.eager(),
tags={
"dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
"dagster/priority": "1",
"dagster/concurrency_key": "ecmwf-mars-consumer",
},
partitions_def=dg.MonthlyPartitionsDefinition(
start_date="2020-01-01",
end_offset=-3,
),
)
def ecmwf_eps_india_stat(
context: dg.AssetExecutionContext,
pipes_docker_client: PipesDockerClient,
) -> Any:
image: str = "ghcr.io/openclimatefix/nwp-consumer:1.0.3"
it: dt.datetime = context.partition_time_window.start
return pipes_docker_client.run(
image=image,
command=[
"archive"
"-y",
str(it.year),
"-m",
str(it.month),
],
env={
"MODEL_REPOSITORY": "ceda-metoffice-global",
"NOTIFICATION_REPOSITORY": "dagster-pipes",
"ECMWF_API_KEY": os.environ["ECMWF_API_KEY"],
"ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"],
"ECMWF_API_URL": os.environ["ECMWF_API_URL"],
"ECMWF_MARS_AREA": "35/67/6/97",
},
container_kwargs={
"volumes": [f"{ARCHIVE_FOLDER}:/work"],
},
context=context,
).get_results()
4 changes: 2 additions & 2 deletions tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ def test_nwp_asset_key_prefixes() -> None:
# The first element should be the flavor:
assert asset.key.path[0] in ["nwp", "sat"]
# The second element should be the provider
assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs"]
assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs", "ecmwf-eps"]
# The third element should be the region
assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india"]
assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india", "india-stat"]

0 comments on commit 31ec2a8

Please sign in to comment.