Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ecmwf): Add india-stat assets #140

Merged
merged 5 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,6 @@ zarr

# Dagster
tmp*

# uv
uv.lock
21 changes: 12 additions & 9 deletions local_archives/nwp/ceda/ceda_global.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
"""Zarr archive of NWP data from the Met Office's Global model.

The MetOffice runs it's Unified Model (UM) in two configurations: Global, and UK.
This asset contains data from the global configuration covering the whole globe.

Sourced via FTP from CEDA (https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783).
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
It is downloaded using the nwp-consumer docker image
(https://github.com/openclimatefix/nwp-consumer)
"""

import datetime as dt
import os
from typing import Any
Expand All @@ -12,15 +23,7 @@

@dg.asset(
name="zarr_archive",
description="".join((
"Zarr archive of NWP data from the Met Office's Global model. ",
"Sourced via FTP from CEDA ",
"(https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783). ",
"This asset is updated monthly, and surfaced as a Zarr Directory Store ",
"for each month. It is downloaded using the nwp-consumer ",
"docker image, currently from the 'major-refactor' branch ",
"(https://github.com/openclimatefix/nwp-consumer). ",
)),
description=__doc__,
key_prefix=["nwp", "ceda", "global"],
metadata={
"archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"),
Expand Down
14 changes: 13 additions & 1 deletion local_archives/nwp/ecmwf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import dagster as dg

from . import ecmwf_malta, ecmwf_nw_india, ecmwf_uk, ecmwf_india
from . import (
ecmwf_malta,
ecmwf_nw_india,
ecmwf_uk,
ecmwf_india,
ecmwf_ens_stat_india,
)

uk_assets = dg.load_assets_from_modules(
modules=[ecmwf_uk],
Expand All @@ -23,9 +29,15 @@
group_name="ecmwf_india",
)

india_stat_assets = dg.load_assets_from_modules(
modules=[ecmwf_ens_stat_india],
group_name="ecmwf_ens_india_stat",
)

all_assets: list[dg.AssetsDefinition] = [
*uk_assets,
*nw_india_assets,
*malta_assets,
*india_assets,
*india_stat_assets,
]
75 changes: 75 additions & 0 deletions local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Zarr archive of Summary NWP data from ECMWF's EPS.

EPS is the ECMWF Ensemble Prediction System,
which provides 50 perturbed forecasts of upcoming atmospheric conditions.
This asset contains summary statistics of this data (mean, standard deviation) for India.

Sourced via MARS API from ECMWF (https://apps.ecmwf.int/mars-catalogue).
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
It is downloaded using the nwp-consumer docker image
(https://github.com/openclimatefix/nwp-consumer).
"""

import datetime as dt
devsjc marked this conversation as resolved.
Show resolved Hide resolved
import os
from typing import Any

import dagster as dg
from dagster_docker import PipesDockerClient

from constants import LOCATIONS_BY_ENVIRONMENT

env = os.getenv("ENVIRONMENT", "local")
ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER
ARCHIVE_FOLDER = f"{ZARR_FOLDER}/nwp/ecmwf-eps/india-stat"

@dg.asset(
name="zarr_archive",
description=__doc__,
key_prefix=["nwp", "ecmwf-eps", "india-stat"],
metadata={
"archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER),
"area": dg.MetadataValue.text("global"),
"source": dg.MetadataValue.text("ecmwf-mars"),
"expected_runtime": dg.MetadataValue.text("6 hours"),
},
compute_kind="docker",
automation_condition=dg.AutomationCondition.eager(),
tags={
"dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours
"dagster/priority": "1",
"dagster/concurrency_key": "ecmwf-mars-consumer",
},
partitions_def=dg.MonthlyPartitionsDefinition(
start_date="2020-01-01",
end_offset=-3,
),
)
def ecmwf_eps_india_stat(
context: dg.AssetExecutionContext,
pipes_docker_client: PipesDockerClient,
) -> Any:
image: str = "ghcr.io/openclimatefix/nwp-consumer:1.0.3"
it: dt.datetime = context.partition_time_window.start
return pipes_docker_client.run(
image=image,
command=[
"archive"
"-y",
str(it.year),
"-m",
str(it.month),
],
env={
"MODEL_REPOSITORY": "ceda-metoffice-global",
"NOTIFICATION_REPOSITORY": "dagster-pipes",
"ECMWF_API_KEY": os.environ["ECMWF_API_KEY"],
"ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"],
"ECMWF_API_URL": os.environ["ECMWF_API_URL"],
"ECMWF_MARS_AREA": "35/67/6/97",
},
container_kwargs={
"volumes": [f"{ARCHIVE_FOLDER}:/work"],
},
context=context,
).get_results()
4 changes: 2 additions & 2 deletions tests/compile_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ def test_nwp_asset_key_prefixes() -> None:
# The first element should be the flavor:
assert asset.key.path[0] in ["nwp", "sat"]
# The second element should be the provider
assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs"]
assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs", "ecmwf-eps"]
# The third element should be the region
assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india"]
assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india", "india-stat"]
Loading