Skip to content

Commit

Permalink
Add test ecmwf job
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc committed Sep 7, 2023
1 parent 8c22682 commit 6c15f15
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 57 deletions.
11 changes: 4 additions & 7 deletions dags_tests/compile_test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from dagster import Definitions, load_assets_from_modules
from nwp import assets, jobs
from nwp import defs

def test_compiles():
all_assets = load_assets_from_modules([assets])
defs = Definitions(
assets=all_assets,
jobs=jobs.asset_jobs,
schedules=jobs.schedule_jobs
)
job_names = [d.name for d in list(defs.get_all_job_defs())]
assert "get_ecmwf_data" in job_names
assert len(job_names) == 18
2 changes: 1 addition & 1 deletion nwp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@

defs = Definitions(
assets=all_assets,
jobs=jobs.asset_jobs,
jobs=[jobs.get_ecmwf_data],
schedules=jobs.schedule_jobs,
)
Binary file modified nwp/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file modified nwp/__pycache__/jobs.cpython-310.pyc
Binary file not shown.
1 change: 0 additions & 1 deletion nwp/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf
from nwp.assets.ecmwf.mars import download_mars_file
47 changes: 27 additions & 20 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
from dagster_docker import execute_docker_container
from dagster import Config, OpExecutionContext, op
import datetime as dt

from dagster import Output, asset
from ecmwfapi import ECMWFService

server = ECMWFService("mars")
class NWPConsumerConfig(Config):
date_from: str
date_to: str
source: str


@asset
def download_mars_file():
server.execute(
req={
"class": "od",
"date": "20230815/to/20230816",
"expver": "1",
"levtype": "sfc",
"param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228",
"step": "0/t0/48/by/1",
"stream": "oper",
"time": "00:00:00,12:00:00",
"type": "fc",
},
target="20230815.grib"
@op
def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig):
execute_docker_container(
context=context,
image="ghcr.io/openclimatefix/nwp-consumer",
command=[
"consume", f'--source={config.source}',
f'--from={config.date_from}',
f'--to={config.date_to}'
],
env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"],
container_kwargs={
"volumes": [
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw',
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr',
'/tmp/nwpc:/tmp/nwpc'
]
}
)

return Output(None, metadata={"filepath": "20230815.grib"})
pass

69 changes: 42 additions & 27 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig

from nwp.assets.dwd.common import IconConfig
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig

import datetime as dt


base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

Expand All @@ -21,31 +25,42 @@ def build_config_on_runtime(model, run, delay=0):
for r in ["00", "06", "12", "18"]:
for model in ["global", "eu"]:
for delay in [0, 1]:
asset_job = define_asset_job(f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", AssetSelection.all(),
config={
'ops': {"download_model_files": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {
"config": build_config_on_runtime(model, r, delay)}, }})
if delay == 0:
if r == "00":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")
elif r == "06":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")
elif r == "12":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")
elif r == "18":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")
elif delay == 1:
if r == "00":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")
elif r == "06":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")
elif r == "12":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")
elif r == "18":
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")

asset_job = define_asset_job(
name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}",
selection=AssetSelection.all(),
config={'ops': {
"download_model_files": {"config": build_config_on_runtime(model, r, delay)},
"process_model_files": {"config": build_config_on_runtime(model, r, delay)},
"upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)},
}}
)
match (delay, r):
case (0, "00"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *"))
case (0, "06"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *"))
case (0, "12"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *"))
case (0, "18"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *"))
case (1, "00"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *"))
case (1, "06"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *"))
case (1, "12"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *"))
case (1, "18"):
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *"))

asset_jobs.append(asset_job)
schedule_jobs.append(schedule)

@job(config=RunConfig(
ops={"nwp_consumer_docker_op": NWPConsumerConfig(
date_from="2021-01-01",
date_to="2021-01-01",
source="ecmwf-mars"
)}
))
def get_ecmwf_data():
nwp_consumer_docker_op()

4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies = [
"dagit == 1.4.11",
"dagster == 1.4.11",
"dagster-cloud == 1.4.11",
"dagster-docker == 0.20.11",
"huggingface-hub == 0.16.4",
"numpy == 1.24.2",
"ocf-blosc2 == 0.0.3",
Expand All @@ -36,7 +37,8 @@ dev = [
"ruff == 0.0.259",
"unittest-xml-reporting == 3.2.0",
"dagster-webserver == 1.4.11",
"pytest == 7.4.1"
"pytest == 7.4.1",
"python-lsp-server == 1.7.4"
]

[tool.setuptools.packages.find]
Expand Down

0 comments on commit 6c15f15

Please sign in to comment.