Skip to content

Commit

Permalink
feat(local): Add MetOffice global asset (#125)
Browse files Browse the repository at this point in the history
* feat(local): Add MetOffice global asset

* style(api): Update AutoMaterializePolicy -> AutomationCondition

* chore(env): Bump dagster versions

* fix(api): Use newer dagster API naming

* fix(ceda_global): Add file

* fix(ceda_global): Use constants for folders
  • Loading branch information
devsjc authored Sep 11, 2024
1 parent ecde896 commit 07d41fe
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 18 deletions.
4 changes: 2 additions & 2 deletions cloud_archives/pv/passiv/passiv_monthly.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def get_monthly_passiv_data(start_date: datetime, upload_to_hf: bool = True, ove

@dg.asset(
key=["pv", "passiv", "monthly_30min"],
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=dg.TimeWindowPartitionsDefinition(
fmt="%Y-%m",
start="2010-01",
Expand All @@ -104,7 +104,7 @@ def pv_passiv_monthly_30min(context: dg.AssetExecutionContext):

@dg.asset(
key=["pv", "passiv", "monthly_5min"],
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=dg.TimeWindowPartitionsDefinition(
fmt="%Y-%m",
start="2018-01",
Expand Down
4 changes: 2 additions & 2 deletions cloud_archives/pv/passiv/passiv_year.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_yearly_passiv_data(start_date: datetime, upload_to_hf: bool = True, over

@dg.asset(
key=["pv", "passiv", "yearly_5min"],
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=dg.TimeWindowPartitionsDefinition(
fmt="%Y",
start="2018",
Expand All @@ -86,7 +86,7 @@ def pv_passiv_yearly_5min(context: dg.AssetExecutionContext):

@dg.asset(
key=["pv", "passiv", "yearly_30min"],
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=dg.TimeWindowPartitionsDefinition(
fmt="%Y",
start="2010",
Expand Down
4 changes: 2 additions & 2 deletions local_archives/nwp/_generic_definitions_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def make_definitions(
@dg.asset(
name="raw_archive",
key_prefix=opts.key_prefix(),
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=opts.partitions,
check_specs=[
dg.AssetCheckSpec(
Expand Down Expand Up @@ -168,7 +168,7 @@ def _raw_archive(
name="zarr_archive",
key_prefix=opts.key_prefix(),
partitions_def=opts.partitions,
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
ins={"raw_paths": dg.AssetIn(key=_raw_archive.key)},
io_manager_key="nwp_xr_zarr_io",
compute_kind="process",
Expand Down
9 changes: 7 additions & 2 deletions local_archives/nwp/ceda/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import dagster as dg

from . import ceda_uk
from . import ceda_uk, ceda_global

uk_assets = dg.load_assets_from_modules(
modules=[ceda_uk],
group_name="ceda_uk",
)

all_assets: list[dg.AssetsDefinition] = [*uk_assets]
global_assets = dg.load_assets_from_modules(
modules=[ceda_global],
group_name="ceda_global",
)

all_assets: list[dg.AssetsDefinition] = [*uk_assets, *global_assets]
52 changes: 52 additions & 0 deletions local_archives/nwp/ceda/ceda_global.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import dagster as dg
import os
from typing import Any
import datetime as dt

from dagster_docker import PipesDockerClient
from constants import LOCATIONS_BY_ENVIRONMENT

# Name of the deployment environment, read from $ENVIRONMENT at import time;
# falls back to "local" when the variable is unset.
env = os.getenv("ENVIRONMENT", "local")
# Root folder for NWP Zarr archives in the selected environment, taken from the
# project's LOCATIONS_BY_ENVIRONMENT table.
# NOTE(review): an ENVIRONMENT value missing from that table will presumably
# fail here at import time - confirm against `constants`.
ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER

# Asset: monthly Zarr archive of CEDA MetOffice global NWP data.
#
# Each monthly partition is produced by running the nwp-consumer container
# through Dagster Pipes. The partition's window start supplies the year and
# month passed on the container command line, and the archive folder on the
# host is mounted into the container at /work.
#
# NOTE: this description is deliberately written as comments rather than a
# docstring, since Dagster uses an asset function's docstring as the asset's
# description when no explicit `description=` is given.
@dg.asset(
    name="zarr_archive",
    key_prefix=["nwp", "ceda", "global"],
    partitions_def=dg.MonthlyPartitionsDefinition(
        start_date="2019-01-01",
        end_offset=-3,
    ),
    compute_kind="download",
    # 60 * 100 == 6000; presumably seconds (i.e. 100 minutes) - confirm
    # against the Dagster `dagster/max_runtime` tag documentation.
    op_tags={"dagster/max_runtime": 60 * 100},
    metadata={
        "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"),
        "area": dg.MetadataValue.text("global"),
        "source": dg.MetadataValue.text("ceda"),
    },
)
def ceda_global(
    context: dg.AssetExecutionContext,
    pipes_docker_client: PipesDockerClient,
) -> Any:
    # Start of the partition being materialised; selects the year and month.
    window_start: dt.datetime = context.partition_time_window.start

    consume_command: list[str] = [
        "consume",
        "-y",
        str(window_start.year),
        "-m",
        str(window_start.month),
    ]

    # The CEDA credentials are read with os.environ[...] so that a missing
    # variable raises KeyError here instead of starting a container without them.
    container_env: dict[str, str] = {
        "NWP_CONSUMER_MODEL_REPOSITORY": "ceda-metoffice-global",
        "NWP_CONSUMER_NOTIFICATION_REPOSITORY": "dagster-pipes",
        "CEDA_FTP_USER": os.environ["CEDA_FTP_USER"],
        "CEDA_FTP_PASSWORD": os.environ["CEDA_FTP_PASSWORD"],
    }

    # Host archive folder mounted at /work inside the container.
    work_volume: str = f"{ZARR_FOLDER}/nwp/ceda/global:/work"

    invocation = pipes_docker_client.run(
        image="ghcr.io/openclimatefix/nwp-consumer:devsjc-major-refactor",
        command=consume_command,
        env=container_env,
        container_kwargs={"volumes": [work_volume]},
        context=context,
    )
    # Hand back whatever results the container reported through Pipes.
    return invocation.get_results()
2 changes: 1 addition & 1 deletion local_archives/nwp/gfs/gfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
name="zarr_daily_archive",
description="Daily archive of GFS global NWP data",
key_prefix=["nwp", "gfs", "global"],
auto_materialize_policy=dg.AutoMaterializePolicy.eager(),
automation_condition=dg.AutomationCondition.eager(),
partitions_def=dg.DailyPartitionsDefinition(
start_date="2015-01-15",
end_offset=-2,
Expand Down
16 changes: 8 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ classifiers = ["Programming Language :: Python :: 3"]
dependencies = [
"cdsapi >= 0.6.1",
"ecmwf-api-client >= 1.6.3",
"dagit >= 1.6.2",
"dagster >= 1.6.2",
"dagster-cloud >= 1.6.2",
"dagster-webserver >= 1.6.2",
"dagster-graphql >= 1.6.2",
"dagster-postgres >= 0.22.9",
"dagster-docker >= 0.22.9",
"dagster-pipes >= 1.7.8",
"dagit >= 1.8.5",
"dagster >= 1.8.5",
"dagster-cloud >= 1.8.5",
"dagster-webserver >= 1.8.5",
"dagster-graphql >= 1.8.5",
"dagster-postgres >= 0.24.5",
"dagster-docker >= 0.24.5",
"dagster-pipes >= 1.8.5",
"huggingface-hub >= 0.19.4",
"kbatch >= 0.4.2",
"meteomatics == 2.11.1",
Expand Down
2 changes: 1 addition & 1 deletion tests/cloud_archives/pv/test_passiv.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@

def test_get_daily_passiv_data():
start_date = datetime(2022, 1, 1, tzinfo=timezone.utc)
get_monthly_passiv_data(start_date, upload_to_hf=False, overwrite=True)
get_monthly_passiv_data(start_date, upload_to_hf=False, overwrite=True)

0 comments on commit 07d41fe

Please sign in to comment.