-
-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add test ecmwf job #7
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
__pycache__/ | ||
*.py[cod] | ||
*$py.class | ||
**/__pycache__/ | ||
|
||
# C extensions | ||
*.so | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,7 @@ | ||
from dagster import Definitions, load_assets_from_modules | ||
from nwp import assets, jobs | ||
from nwp import defs | ||
|
||
def test_compiles(): | ||
all_assets = load_assets_from_modules([assets]) | ||
defs = Definitions( | ||
assets=all_assets, | ||
jobs=jobs.asset_jobs, | ||
schedules=jobs.schedule_jobs | ||
) | ||
job_names = [d.name for d in list(defs.get_all_job_defs())] | ||
assert "get_ecmwf_data" in job_names | ||
assert len(job_names) == 18 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1 @@ | ||
from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf | ||
from nwp.assets.ecmwf.mars import download_mars_file |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,32 @@ | ||
from dagster_docker import execute_docker_container | ||
from dagster import Config, OpExecutionContext, op | ||
import datetime as dt | ||
|
||
from dagster import Output, asset | ||
from ecmwfapi import ECMWFService | ||
|
||
server = ECMWFService("mars") | ||
class NWPConsumerConfig(Config): | ||
date_from: str | ||
date_to: str | ||
source: str | ||
|
||
|
||
@asset | ||
def download_mars_file(): | ||
server.execute( | ||
req={ | ||
"class": "od", | ||
"date": "20230815/to/20230816", | ||
"expver": "1", | ||
"levtype": "sfc", | ||
"param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228", | ||
"step": "0/t0/48/by/1", | ||
"stream": "oper", | ||
"time": "00:00:00,12:00:00", | ||
"type": "fc", | ||
}, | ||
target="20230815.grib" | ||
@op | ||
def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig): | ||
execute_docker_container( | ||
context=context, | ||
image="ghcr.io/openclimatefix/nwp-consumer", | ||
command=[ | ||
"consume", f'--source={config.source}', | ||
f'--from={config.date_from}', | ||
f'--to={config.date_to}' | ||
], | ||
env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"], | ||
container_kwargs={ | ||
"volumes": [ | ||
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', | ||
'/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', | ||
'/tmp/nwpc:/tmp/nwpc' | ||
] | ||
} | ||
) | ||
|
||
return Output(None, metadata={"filepath": "20230815.grib"}) | ||
pass | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,10 @@ | ||
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule | ||
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig | ||
|
||
from nwp.assets.dwd.common import IconConfig | ||
from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig | ||
|
||
import datetime as dt | ||
|
||
|
||
base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" | ||
|
||
|
@@ -21,31 +25,42 @@ def build_config_on_runtime(model, run, delay=0): | |
for r in ["00", "06", "12", "18"]: | ||
for model in ["global", "eu"]: | ||
for delay in [0, 1]: | ||
asset_job = define_asset_job(f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", AssetSelection.all(), | ||
config={ | ||
'ops': {"download_model_files": {"config": build_config_on_runtime(model, r, delay)}, | ||
"process_model_files": {"config": build_config_on_runtime(model, r, delay)}, | ||
"upload_model_files_to_hf": { | ||
"config": build_config_on_runtime(model, r, delay)}, }}) | ||
if delay == 0: | ||
if r == "00": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *") | ||
elif r == "06": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *") | ||
elif r == "12": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *") | ||
elif r == "18": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *") | ||
elif delay == 1: | ||
if r == "00": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *") | ||
elif r == "06": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *") | ||
elif r == "12": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *") | ||
elif r == "18": | ||
schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *") | ||
|
||
asset_job = define_asset_job( | ||
name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", | ||
selection=AssetSelection.all(), | ||
config={'ops': { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this needs to be changed, but from the docs, I'm not really sure how? In the UI on leonardo, I was seeing all the jobs, including the ECMWF ones, and satellite one, show up in the assets to be materialized. Not sure how to select the assets based off the prefix, or if the prefix would just be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potentially you're seeing nwp and sat in the UI on leonardo because I specified in the |
||
"download_model_files": {"config": build_config_on_runtime(model, r, delay)}, | ||
"process_model_files": {"config": build_config_on_runtime(model, r, delay)}, | ||
"upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)}, | ||
}} | ||
) | ||
match (delay, r): | ||
case (0, "00"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) | ||
case (0, "06"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) | ||
case (0, "12"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) | ||
case (0, "18"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) | ||
case (1, "00"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) | ||
case (1, "06"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) | ||
case (1, "12"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) | ||
case (1, "18"): | ||
schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) | ||
|
||
asset_jobs.append(asset_job) | ||
schedule_jobs.append(schedule) | ||
|
||
@job(config=RunConfig( | ||
ops={"nwp_consumer_docker_op": NWPConsumerConfig( | ||
date_from="2021-01-01", | ||
date_to="2021-01-01", | ||
source="ecmwf-mars" | ||
)} | ||
)) | ||
def get_ecmwf_data(): | ||
nwp_consumer_docker_op() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know this looks weird, but locally all the other jobs still showed up in the UI? Might be worth you pulling and checking yourself. I'm not sure if it's because they're already initialised as ScheduledJobDefinitions, and as such, don't need to go in the
Definitions
object? But I could be wrong!There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay, yeah, that would be interesting. I am fine merging it for now, as I'm off tomorrow. But I'll check next week on it, it seems like it should have to be defined here, but yeah, not really sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it's definitely odd. The unit test checks that there are 18 jobs, not just one, and it is passing, so it does seem that they really are all defined still! But I'm certainly confused by it as well!