diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b9afde2..5cccead 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: # * Except this installs the dev dependencies and binaries as well - name: Install all dependencies run: | - ./venv/bin/pip install -q .[dev] + ./venv/bin/pip install -q --no-binary=dags .[dev] if: steps.restore-cache.outputs.cache-hit != 'true' # Cache the virtualenv for future runs diff --git a/.gitignore b/.gitignore index b50e7e0..2e9fdcc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +**/__pycache__/ # C extensions *.so diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 58085da..c979b12 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -1,10 +1,7 @@ from dagster import Definitions, load_assets_from_modules -from nwp import assets, jobs +from nwp import defs def test_compiles(): - all_assets = load_assets_from_modules([assets]) - defs = Definitions( - assets=all_assets, - jobs=jobs.asset_jobs, - schedules=jobs.schedule_jobs - ) + job_names = [d.name for d in list(defs.get_all_job_defs())] + assert "get_ecmwf_data" in job_names + assert len(job_names) == 18 diff --git a/nwp/__init__.py b/nwp/__init__.py index 3e64b21..1517e8e 100644 --- a/nwp/__init__.py +++ b/nwp/__init__.py @@ -7,6 +7,6 @@ defs = Definitions( assets=all_assets, - jobs=jobs.asset_jobs, + jobs=[jobs.get_ecmwf_data], schedules=jobs.schedule_jobs, ) diff --git a/nwp/__pycache__/__init__.cpython-310.pyc b/nwp/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index 8dc65d8..0000000 Binary files a/nwp/__pycache__/__init__.cpython-310.pyc and /dev/null differ diff --git a/nwp/__pycache__/assets.cpython-310.pyc b/nwp/__pycache__/assets.cpython-310.pyc deleted file mode 100644 index 6c56df4..0000000 Binary files a/nwp/__pycache__/assets.cpython-310.pyc and /dev/null differ diff --git a/nwp/__pycache__/jobs.cpython-310.pyc b/nwp/__pycache__/jobs.cpython-310.pyc deleted file mode 100644 index 721cd79..0000000 Binary files a/nwp/__pycache__/jobs.cpython-310.pyc and /dev/null differ diff --git a/nwp/assets/__init__.py b/nwp/assets/__init__.py index b413960..3588b17 100644 --- a/nwp/assets/__init__.py +++ b/nwp/assets/__init__.py @@ -1,2 +1 @@ from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf -from nwp.assets.ecmwf.mars import download_mars_file diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index 65a70bf..dab74b4 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -1,25 +1,32 @@ +from dagster_docker import execute_docker_container +from dagster import Config, OpExecutionContext, op +import datetime as dt -from dagster import Output, asset -from ecmwfapi import ECMWFService -server = ECMWFService("mars") +class NWPConsumerConfig(Config): + date_from: str + date_to: str + source: str - -@asset -def download_mars_file(): - server.execute( - req={ - "class": "od", - "date": "20230815/to/20230816", - "expver": "1", - "levtype": "sfc", - "param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228", - "step": "0/t0/48/by/1", - "stream": "oper", - "time": "00:00:00,12:00:00", - "type": "fc", - }, - target="20230815.grib" +@op +def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig): + execute_docker_container( + context=context, + image="ghcr.io/openclimatefix/nwp-consumer", + command=[ + "consume", f'--source={config.source}', + f'--from={config.date_from}', + f'--to={config.date_to}' + ], + env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"], + container_kwargs={ + "volumes": [ + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', + '/tmp/nwpc:/tmp/nwpc' + ] + } ) - return Output(None, metadata={"filepath": "20230815.grib"}) + pass + diff --git a/nwp/jobs.py b/nwp/jobs.py index e9b16be..f75653c 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,6 +1,10 @@ -from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule +from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig from nwp.assets.dwd.common import IconConfig +from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig + +import datetime as dt + base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" @@ -21,31 +25,42 @@ def build_config_on_runtime(model, run, delay=0): for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: for delay in [0, 1]: - asset_job = define_asset_job(f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", AssetSelection.all(), - config={ - 'ops': {"download_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "upload_model_files_to_hf": { - "config": build_config_on_runtime(model, r, delay)}, }}) - if delay == 0: - if r == "00": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *") - elif r == "06": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *") - elif r == "12": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *") - elif r == "18": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *") - elif delay == 1: - if r == "00": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *") - elif r == "06": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *") - elif r == "12": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *") - elif r == "18": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *") - + asset_job = define_asset_job( + name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", + selection=AssetSelection.all(), + config={'ops': { + "download_model_files": {"config": build_config_on_runtime(model, r, delay)}, + "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, + "upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)}, + }} + ) + match (delay, r): + case (0, "00"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) + case (0, "06"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) + case (0, "12"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) + case (0, "18"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) + case (1, "00"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) + case (1, "06"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) + case (1, "12"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) + case (1, "18"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) + asset_jobs.append(asset_job) - schedule_jobs.append(schedule) + +@job(config=RunConfig( + ops={"nwp_consumer_docker_op": NWPConsumerConfig( + date_from="2021-01-01", + date_to="2021-01-01", + source="ecmwf-mars" + )} + )) +def get_ecmwf_data(): + nwp_consumer_docker_op() diff --git a/pyproject.toml b/pyproject.toml index 72150fd..c649e73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "dagit == 1.4.11", "dagster == 1.4.11", "dagster-cloud == 1.4.11", + "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", "numpy == 1.24.2", "ocf-blosc2 == 0.0.3", @@ -36,7 +37,8 @@ dev = [ "ruff == 0.0.259", "unittest-xml-reporting == 3.2.0", "dagster-webserver == 1.4.11", - "pytest == 7.4.1" + "pytest == 7.4.1", + "python-lsp-server == 1.7.4" ] [tool.setuptools.packages.find]