From 6c15f156cfef0b3057e430a4f00e9ae35cab9a1b Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 7 Sep 2023 16:01:54 +0100 Subject: [PATCH 1/4] Add test ecmwf job --- dags_tests/compile_test.py | 11 ++-- nwp/__init__.py | 2 +- nwp/__pycache__/__init__.cpython-310.pyc | Bin 391 -> 397 bytes nwp/__pycache__/jobs.cpython-310.pyc | Bin 1609 -> 2277 bytes nwp/assets/__init__.py | 1 - nwp/assets/ecmwf/mars.py | 47 ++++++++------- nwp/jobs.py | 69 ++++++++++++++--------- pyproject.toml | 4 +- 8 files changed, 77 insertions(+), 57 deletions(-) diff --git a/dags_tests/compile_test.py b/dags_tests/compile_test.py index 58085da..c979b12 100644 --- a/dags_tests/compile_test.py +++ b/dags_tests/compile_test.py @@ -1,10 +1,7 @@ from dagster import Definitions, load_assets_from_modules -from nwp import assets, jobs +from nwp import defs def test_compiles(): - all_assets = load_assets_from_modules([assets]) - defs = Definitions( - assets=all_assets, - jobs=jobs.asset_jobs, - schedules=jobs.schedule_jobs - ) + job_names = [d.name for d in list(defs.get_all_job_defs())] + assert "get_ecmwf_data" in job_names + assert len(job_names) == 18 diff --git a/nwp/__init__.py b/nwp/__init__.py index 3e64b21..1517e8e 100644 --- a/nwp/__init__.py +++ b/nwp/__init__.py @@ -7,6 +7,6 @@ defs = Definitions( assets=all_assets, - jobs=jobs.asset_jobs, + jobs=[jobs.get_ecmwf_data], schedules=jobs.schedule_jobs, ) diff --git a/nwp/__pycache__/__init__.cpython-310.pyc b/nwp/__pycache__/__init__.cpython-310.pyc index 8dc65d80acd7ddfad6f16d96e6d971ea107577f9..e50b1b3b5f309966e86674572a5867aeba24e135 100644 GIT binary patch delta 90 zcmZo??q%l9=jG*M0D|RDf2RDJ$Scd}G*R1>DV=d*!Zf`ozVy_R_|)Xw^0fGr#FE4) s-s0qp)RfYk)cCCYq~cpFDXD42D;bKof%=QMCo3=}vavC;Fmn9`01g@(&;S4c delta 83 zcmeBWZfEAr=jG*M0D_1Y-%`Fzur} zk1Lbq1P459!X1c+2jl;M2k)LSsgsFbxZzPES>LPaZN^0HntJu>RsE`~-mBM>)M^z1 z&o{sR$-k-+@*8%BpE`8jg_{1%2;qd&kVMp_5p!8&xJG2UW@NcmWV<%SaTbn5CAXw? zBP>T1w-Qy|s_vWNs9WQfTjy_bn~&V0?${P_$DvN}(mJ_Azi7IXysWi@?i8HF= z!K-5GF5#nM5@by@IJwH?hrAwBK6aPfw*`GL{Vn0+d_s(gDKRM;H;oSc-`?asoH4a) zxHF<5Cfm#;O(Jj46W)NeZ-378J8tu7=zUFzn0PQNn3&-+e0G<8ODTzeqC_(fbLL^r zJmho0d^f}#D46ea=26Z(%9%&}ATU1;F)tO&Zq7W;na4Tvn7<6nFGI|sf=P4cY0f+a zW*%lg)#v$Rh&fy^pADEhRMrO+Y6A{6JQcKKLu|8P&lK!J!7dc+0-g@o#Ub`c!5*UE zc^ExgkY@|>ES?d@&K3SYT5u2pF%D6g!Dw7NeeJ|GM_9t_AvWp@VLDorPHF+_(EO;*z>!RI>JK8qRQ5T+% zQ@tz6rYfcREM;J~{BbW{OX7BCt3_4iT#)Sdn3{>VyB-+QUL>T)li(^eNmos5T)F^= zHV@E^{-;h9XHJ?W(%%x^ro;Wrcaos}LP?X*mtOZvFHW`tKg~Mvmh(dO5Wp>VJ6#dN zjvaW3vwmqkFz~ebV?hhaPS&8NAA!io4%wkQETfQKbO+>p5)PFn}0G%}Wro3UfQdV{0&tyMOXj; delta 874 zcmZ{h&ubGw6vyYy&TjU1el)46O--x^T@g*ID2Rxl9z+qVCl{r(nX#$1iPrWvxmq=*gYjfiQx&1D5cnV`bbh(Lf zBlLvEE#YnuZVLz15gsS^XmpCZ4dC7ed1DCZ`9;A;GT2%Bj(g~!`61rE>W5Vn5 zNyJ-fz^kNOQss&o@HU}SiWby>_X*vnDBeIaU@M{J9V)DTMx-|Zh_G?Td}9#`mVEf{GClgCU*3jW!bTvM`1v5xsV6rXGUjW3>l4b@tS@M{P?SWxeJkx zwrC8cV;`?pPE-!d70`ipx4Ag*YEiQ>->%gKf4Cwd*|OuHZKk^8iwb|pNkzs-WOi1O z2}!J(iaOJ=sufl0#Gj0UeKmjx&=X|iUTP}-m?}E*1|m;NE;h_*Be^0wK5CX8WhDJ8 zSxg7!nboHJz_YSV`#MUIxRg;_V-5o|S%)A)eQ+rqvR`y)3{ArXA1kBR1^PrfN4bnH L*cACTo*DiIQ$Dnm diff --git a/nwp/assets/__init__.py b/nwp/assets/__init__.py index b413960..3588b17 100644 --- a/nwp/assets/__init__.py +++ b/nwp/assets/__init__.py @@ -1,2 +1 @@ from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf -from nwp.assets.ecmwf.mars import download_mars_file diff --git a/nwp/assets/ecmwf/mars.py b/nwp/assets/ecmwf/mars.py index 65a70bf..dab74b4 100644 --- a/nwp/assets/ecmwf/mars.py +++ b/nwp/assets/ecmwf/mars.py @@ -1,25 +1,32 @@ +from dagster_docker import execute_docker_container +from dagster import Config, OpExecutionContext, op +import datetime as dt -from dagster import Output, asset -from ecmwfapi import ECMWFService -server = ECMWFService("mars") +class NWPConsumerConfig(Config): + date_from: str + date_to: str + source: str - -@asset -def download_mars_file(): - server.execute( - req={ - "class": "od", - "date": "20230815/to/20230816", - "expver": "1", - "levtype": "sfc", - "param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228", - "step": "0/t0/48/by/1", - "stream": "oper", - "time": "00:00:00,12:00:00", - "type": "fc", - }, - target="20230815.grib" +@op +def nwp_consumer_docker_op(context: OpExecutionContext, config: NWPConsumerConfig): + execute_docker_container( + context=context, + image="ghcr.io/openclimatefix/nwp-consumer", + command=[ + "consume", f'--source={config.source}', + f'--from={config.date_from}', + f'--to={config.date_to}' + ], + env_vars=["ECMWF_API_KEY", "ECMWF_API_URL", "ECMWF_API_EMAIL"], + container_kwargs={ + "volumes": [ + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/raw:/tmp/raw', + '/mnt/storage_b/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/ECMWF/zarr:/tmp/zarr', + '/tmp/nwpc:/tmp/nwpc' + ] + } ) - return Output(None, metadata={"filepath": "20230815.grib"}) + pass + diff --git a/nwp/jobs.py b/nwp/jobs.py index e9b16be..f75653c 100644 --- a/nwp/jobs.py +++ b/nwp/jobs.py @@ -1,6 +1,10 @@ -from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule +from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule, job, RunConfig from nwp.assets.dwd.common import IconConfig +from nwp.assets.ecmwf.mars import nwp_consumer_docker_op, NWPConsumerConfig + +import datetime as dt + base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD" @@ -21,31 +25,42 @@ def build_config_on_runtime(model, run, delay=0): for r in ["00", "06", "12", "18"]: for model in ["global", "eu"]: for delay in [0, 1]: - asset_job = define_asset_job(f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", AssetSelection.all(), - config={ - 'ops': {"download_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, - "upload_model_files_to_hf": { - "config": build_config_on_runtime(model, r, delay)}, }}) - if delay == 0: - if r == "00": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *") - elif r == "06": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *") - elif r == "12": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *") - elif r == "18": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *") - elif delay == 1: - if r == "00": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *") - elif r == "06": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *") - elif r == "12": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *") - elif r == "18": - schedule = ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *") - + asset_job = define_asset_job( + name=f"download_{model}_run_{r}_{'today' if delay == 0 else 'yesterday'}", + selection=AssetSelection.all(), + config={'ops': { + "download_model_files": {"config": build_config_on_runtime(model, r, delay)}, + "process_model_files": {"config": build_config_on_runtime(model, r, delay)}, + "upload_model_files_to_hf": {"config": build_config_on_runtime(model, r, delay)}, + }} + ) + match (delay, r): + case (0, "00"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 4 * * *")) + case (0, "06"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 10 * * *")) + case (0, "12"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 16 * * *")) + case (0, "18"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="30 22 * * *")) + case (1, "00"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="1 0 * * *")) + case (1, "06"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 2 * * *")) + case (1, "12"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 6 * * *")) + case (1, "18"): + schedule_jobs.append(ScheduleDefinition(job=asset_job, cron_schedule="0 8 * * *")) + asset_jobs.append(asset_job) - schedule_jobs.append(schedule) + +@job(config=RunConfig( + ops={"nwp_consumer_docker_op": NWPConsumerConfig( + date_from="2021-01-01", + date_to="2021-01-01", + source="ecmwf-mars" + )} + )) +def get_ecmwf_data(): + nwp_consumer_docker_op() diff --git a/pyproject.toml b/pyproject.toml index 72150fd..c649e73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ dependencies = [ "dagit == 1.4.11", "dagster == 1.4.11", "dagster-cloud == 1.4.11", + "dagster-docker == 0.20.11", "huggingface-hub == 0.16.4", "numpy == 1.24.2", "ocf-blosc2 == 0.0.3", @@ -36,7 +37,8 @@ dev = [ "ruff == 0.0.259", "unittest-xml-reporting == 3.2.0", "dagster-webserver == 1.4.11", - "pytest == 7.4.1" + "pytest == 7.4.1", + "python-lsp-server == 1.7.4" ] [tool.setuptools.packages.find] From d2a917363399abf6183e998a915b82689f90d0d1 Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 7 Sep 2023 16:10:19 +0100 Subject: [PATCH 2/4] Remove pycache --- nwp/__pycache__/__init__.cpython-310.pyc | Bin 397 -> 0 bytes nwp/__pycache__/assets.cpython-310.pyc | Bin 1469 -> 0 bytes nwp/__pycache__/jobs.cpython-310.pyc | Bin 2277 -> 0 bytes 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 nwp/__pycache__/__init__.cpython-310.pyc delete mode 100644 nwp/__pycache__/assets.cpython-310.pyc delete mode 100644 nwp/__pycache__/jobs.cpython-310.pyc diff --git a/nwp/__pycache__/__init__.cpython-310.pyc b/nwp/__pycache__/__init__.cpython-310.pyc deleted file mode 100644 index e50b1b3b5f309966e86674572a5867aeba24e135..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 397 zcmYjM%Sr<=6wPaTxKyFI$tH}GJ7Y^SDY;XMYuA2<_)D`D7k+{T z!NgYRg`ArcPR_a4O(q!$_4fI#x1oPTa#%?qm*L=508KPC#8OEulZ-~r47WlGo5+L` zU?3X$LS+gRQb?AAh6WsJ{>tS@i#f@Yui!&+){kdUFWN=7a4q)7#+6!?h|r;`y|c4^b1%YwAIIuPSUtg?>^9H(fi4^Dz2*!>U8zo56^Zxozq~4(*RR4Wf^CT(Uc0B{rm!A2y8O| diff --git a/nwp/__pycache__/assets.cpython-310.pyc b/nwp/__pycache__/assets.cpython-310.pyc deleted file mode 100644 index 6c56df451afc3d4fc812be758c850c042b249cd5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1469 zcma)6ON$&g5Z0?7)4Q`V_S)ES5}NgC2s51=axo#;c=I}B2{AaNNf>%Znuk5zYD?-J z@6dBv=a7TR4@d$tIRx^TbPWXj7ku!h)U&f$!v>;8C8?@ZRpR@qWxE|A7$>*?=HGh= z{pAi92ZO^snEDY2jyRs8biLw~G-sMpSjm*-Uh3t3>gPckU?c)gIeU&ya2kq0gd@zo z7nu9P8xkJy@HqoZi{po=(|QT9(LsmW>-V)5=F?|lP??D=9>~IoXU4X_5+>uB$sT3X zN?0#*UITY&GYD)#m=9p;Z$KEd#2l~ilC01I8OjN#C*2w``{?q5e#735}Hd0N5j)zL-u^Ef_i)`>jr~@(6 zZ`eh_OF1b_FN$Iq_rLx!-j@S?Hy#^P>bprYnwW8Q*c-?^`8PxYes>Bn>q4q-R`Bkj z(58Eqv)4qLf3%4(ZZdblUH66)B@Qz!CbEfm^g#jS#FtuYh1a!Q_+?@_L)yzwvdjfG?*&tZ07E5o=;Zu49KEk7dzt>^KH!E@j!;`J7)0dKX!&4(9LwKVrE1 z$=qvTpTB(x8bI&fKp#O}#qigY&^S^QV z>z%9K1F=oF@jIuChM3`K!$}&E2>Wao`>(Hf>JGSdqV;qA5?VIX>Ne<_SKHvM5tRE& sHBVFSx7t+OT)j|HQ(IG3_;D_IH5H#YiZ?A0qL74mmk=7^XlEDy13Sx+$^ZZW diff --git a/nwp/__pycache__/jobs.cpython-310.pyc b/nwp/__pycache__/jobs.cpython-310.pyc deleted file mode 100644 index 60ecc8521bee618453af8e30fa23953461204c1b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2277 zcmaKt&5smC6u_&hKXyKLX1``XKv@Pur} zk1Lbq1P459!X1c+2jl;M2k)LSsgsFbxZzPES>LPaZN^0HntJu>RsE`~-mBM>)M^z1 z&o{sR$-k-+@*8%BpE`8jg_{1%2;qd&kVMp_5p!8&xJG2UW@NcmWV<%SaTbn5CAXw? zBP>T1w-Qy|s_vWNs9WQfTjy_bn~&V0?${P_$DvN}(mJ_Azi7IXysWi@?i8HF= z!K-5GF5#nM5@by@IJwH?hrAwBK6aPfw*`GL{Vn0+d_s(gDKRM;H;oSc-`?asoH4a) zxHF<5Cfm#;O(Jj46W)NeZ-378J8tu7=zUFzn0PQNn3&-+e0G<8ODTzeqC_(fbLL^r zJmho0d^f}#D46ea=26Z(%9%&}ATU1;F)tO&Zq7W;na4Tvn7<6nFGI|sf=P4cY0f+a zW*%lg)#v$Rh&fy^pADEhRMrO+Y6A{6JQcKKLu|8P&lK!J!7dc+0-g@o#Ub`c!5*UE zc^ExgkY@|>ES?d@&K3SYT5u2pF%D6g!Dw7NeeJ|GM_9t_AvWp@VLDorPHF+_(EO;*z>!RI>JK8qRQ5T+% zQ@tz6rYfcREM;J~{BbW{OX7BCt3_4iT#)Sdn3{>VyB-+QUL>T)li(^eNmos5T)F^= zHV@E^{-;h9XHJ?W(%%x^ro;Wrcaos}LP?X*mtOZvFHW`tKg~Mvmh(dO5Wp>VJ6#dN zjvaW3vwmqkFz~ebV?hhaPS&8NAA!io4%wkQETfQKbO+>p5)PFn}0G%}Wro3UfQdV{0&tyMOXj; From 49cb8a45a442e7228a3048f5300132be1c7e8ca0 Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 7 Sep 2023 16:13:07 +0100 Subject: [PATCH 3/4] Update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index b50e7e0..2e9fdcc 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ *.py[cod] *$py.class +**/__pycache__/ # C extensions *.so From 47f00b1eb8f3333903667dcf99c4dc7d7ac53887 Mon Sep 17 00:00:00 2001 From: devsjc Date: Thu, 7 Sep 2023 16:56:15 +0100 Subject: [PATCH 4/4] Update workflow --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b9afde2..5cccead 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -47,7 +47,7 @@ jobs: # * Except this installs the dev dependencies and binaries as well - name: Install all dependencies run: | - ./venv/bin/pip install -q .[dev] + ./venv/bin/pip install -q --no-binary=dags .[dev] if: steps.restore-cache.outputs.cache-hit != 'true' # Cache the virtualenv for future runs