Skip to content

Commit

Permalink
Add india ECMWF Job
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Oct 3, 2023
1 parent e09ca4f commit b296d36
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 60 deletions.
95 changes: 49 additions & 46 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,69 @@
import nwp_consumer.cmd.main as consumer
import contextlib
import os
from dagster import Config, OpExecutionContext, op
from dagster_docker import execute_docker_container

@contextlib.contextmanager
def modify_env(vars: dict[str, str]):
    """Temporarily apply environment variable overrides.

    On entry, every key in ``vars`` is set in ``os.environ`` to its mapped
    value. On exit (normal or via exception) each of those variables is put
    back to the value it had before entry; variables that did not exist
    before entry are removed again.

    Args:
        vars: Mapping of environment variable name to the temporary value.
            The mapping itself is never modified.

    Yields:
        None. The overrides are active for the duration of the ``with`` body.
    """
    # Snapshot the prior value of each variable individually (None == unset)
    # before touching the environment, so each one can be restored correctly.
    previous = {name: os.environ.get(name) for name in vars}
    try:
        # Applied inside the try so that a failure part-way through (e.g. a
        # non-string value) still rolls back whatever was already set.
        os.environ.update(vars)
        yield
    finally:
        for name, old_value in previous.items():
            if old_value is None:
                # The variable was not set before entry: remove it rather
                # than assigning None, which os.environ rejects.
                os.environ.pop(name, None)
            else:
                os.environ[name] = old_value

class NWPConsumerConfig(Config):
"""Configuration for the NWP consumer."""

date_from: str
date_to: str
source: str
docker_volumes: list[str]
raw_dir: str
zarr_dir: str
env_vars: list[str]

@op
def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig):
execute_docker_container(
context=context,
image="ghcr.io/openclimatefix/nwp-consumer:latest",
command=[
"consume", f'--source={config.source}',
f'--from={config.date_from}',
f'--to={config.date_to}'
],
env_vars=config.env_vars,
container_kwargs={
"volumes": config.docker_volumes
}
)
env_overrides: dict[str, str]

pass

@op
def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig):
def nwp_consumer_download_op(context: OpExecutionContext, config: NWPConsumerConfig) \
-> NWPConsumerConfig:
"""Download the data from the source."""
consumer.run({
"download": True,
"convert": False,
"consume": False,
"check": False,
"--source": config.source,
"--sink": "local",
"--from": config.date_from,
"--to": config.date_to,
"--rdir": config.raw_dir,
"--zdir": config.zarr_dir,
"--create-latest": False,
})
with modify_env(config.env_overrides):
consumer.run({
"download": True,
"convert": False,
"consume": False,
"check": False,
"--source": config.source,
"--sink": "local",
"--from": config.date_from,
"--to": config.date_to,
"--rdir": config.raw_dir,
"--zdir": config.zarr_dir,
"--create-latest": False,
})

return config

@op
def nwp_consumer_convert_op(context: OpExecutionContext, downloadedConfig: NWPConsumerConfig):
"""Convert the downloaded data to zarr format."""
consumer.run({
"download": False,
"convert": True,
"consume": False,
"check": False,
"--source": downloadedConfig.source,
"--sink": "local",
"--from": downloadedConfig.date_from,
"--to": downloadedConfig.date_to,
"--rdir": downloadedConfig.raw_dir,
"--zdir": downloadedConfig.zarr_dir,
"--create-latest": False,
})
with modify_env(downloadedConfig.env_overrides):
consumer.run({
"download": False,
"convert": True,
"consume": False,
"check": False,
"--source": downloadedConfig.source,
"--sink": "local",
"--from": downloadedConfig.date_from,
"--to": downloadedConfig.date_to,
"--rdir": downloadedConfig.raw_dir,
"--zdir": downloadedConfig.zarr_dir,
"--create-latest": False,
})
34 changes: 20 additions & 14 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,39 +65,45 @@ def build_config_on_runtime(model: str, run: str, delay: int = 0) -> dict:


@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_daily_partitioned_config_docker(start: dt.datetime, _end: dt.datetime):
def ecmwf_uk_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for uk data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
docker_volumes=['/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF:/tmp'],
zarr_dir='/tmp/zarr',
raw_dir='/tmp/raw',
env_overrides={"ECMWF_AREA": "uk"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/uk/raw',
)
return {"ops": {
"nwp_consumer_docker_op": {"config": json.loads(config.json())},
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}

@job(config=ecmwf_uk_daily_partitioned_config)
def ecmwf_uk_daily_local_archive() -> None:
"""Download and convert ECMWF data for the UK."""
nwp_consumer_convert_op(nwp_consumer_download_op())

@daily_partitioned_config(start_date=dt.datetime(2020, 1, 1))
def ecmwf_daily_partitioned_config(start: dt.datetime, _end: dt.datetime):
def ecmwf_india_daily_partitioned_config(start: dt.datetime, _end: dt.datetime) -> dict:
"""Create a config dict for the nwp-consumer for india data from ECMWF."""
config: NWPConsumerConfig = NWPConsumerConfig(
date_from=start.strftime("%Y-%m-%d"),
date_to=start.strftime("%Y-%m-%d"),
source="ecmwf-mars",
env_vars=["ECMWF_API_URL", "ECMWF_API_KEY", "ECMWF_API_EMAIL"],
docker_volumes=[],
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw',
env_overrides={"ECMWF_AREA": "nw-india"},
zarr_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/zarr',
raw_dir='/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/india/raw',
)
return {"ops": {
"nwp_consumer_download_op": {"config": json.loads(config.json())},
}}

@job(config=ecmwf_daily_partitioned_config)
def ecmwf_daily_local_archive():
@job(config=ecmwf_india_daily_partitioned_config)
def ecmwf_india_daily_local_archive() -> None:
"""Download and convert ECMWF data for India."""
nwp_consumer_convert_op(nwp_consumer_download_op())

schedules.append(build_schedule_from_partitioned_job(ecmwf_daily_local_archive, hour_of_day=13))
schedules.append(build_schedule_from_partitioned_job(ecmwf_uk_daily_local_archive, hour_of_day=13))
schedules.append(build_schedule_from_partitioned_job(ecmwf_india_daily_local_archive, hour_of_day=14))

0 comments on commit b296d36

Please sign in to comment.