Skip to content

Commit

Permalink
Add simple ecmwf asset (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
devsjc authored Sep 6, 2023
1 parent f621ad0 commit 8c22682
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 37 deletions.
90 changes: 90 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
name: Python CI

on:
  push:
    branches: []
    paths-ignore:
      - 'README.md'
  pull_request:
    branches: []
    paths-ignore:
      - 'README.md'

# Specify concurrency such that only one run per workflow and ref is active at a time
# * A newer run cancels the in-progress one for the same workflow/ref
# * Different workflow files are not affected
concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

jobs:
  # Define a dependencies job that runs on all branches and PRs
  # * Installs dependencies and caches them
  build-venv:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      # Restore cached virtualenv, if available
      # * The pyproject.toml hash is part of the cache key, invalidating
      #   the cache if the file changes
      - name: Restore cached virtualenv
        id: restore-cache
        uses: actions/cache/restore@v3
        with:
          path: ./venv
          key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }}

      # Should mirror the build-venv stage in the Containerfile
      # * Unlike a container build, hosted runners execute steps as a
      #   non-root user, so package installation needs sudo
      # * apt-get is used instead of apt, whose CLI is not meant for scripts
      - name: Build venv
        run: |
          sudo apt-get -qq update && sudo apt-get -qq install -y build-essential
          python -m venv ./venv
          ./venv/bin/pip install --upgrade -q pip wheel setuptools
        if: steps.restore-cache.outputs.cache-hit != 'true'

      # Should mirror the build-reqs stage in the Containerfile
      # * Except this installs the dev dependencies and binaries as well
      # * The extras spec is quoted so the shell never treats [dev] as a glob
      - name: Install all dependencies
        run: |
          ./venv/bin/pip install -q ".[dev]"
        if: steps.restore-cache.outputs.cache-hit != 'true'

      # Cache the virtualenv for future runs
      # * Reuses the key computed by the restore step
      - name: Cache virtualenv
        uses: actions/cache/save@v3
        with:
          path: ./venv
          key: ${{ steps.restore-cache.outputs.cache-primary-key }}
        if: steps.restore-cache.outputs.cache-hit != 'true'

  # Define a unittest job that runs on all branches and PRs
  test-unit:
    runs-on: ubuntu-latest
    needs: build-venv

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      # Restore cached virtualenv
      # * Fail here, with a clear message, if build-venv did not leave a cache
      #   behind, rather than later with a missing ./venv/bin/python
      - name: Restore cached virtualenv
        uses: actions/cache/restore@v3
        with:
          path: ./venv
          key: ${{ runner.os }}-venv-${{ hashFiles('**/pyproject.toml') }}
          fail-on-cache-miss: true

      # Run unittests
      # * Produce JUnit XML report
      - name: Run unit tests
        run: ./venv/bin/python -m pytest --junitxml=ut-report.xml dags_tests

      # Create test summary to be visualised on the job summary screen on GitHub
      # * Runs even if previous steps fail
      # * The glob matches ut-report.xml produced above
      - name: Create test summary
        uses: test-summary/action@v2
        with:
          paths: "*t-report.xml"
          show: "fail, skip"
        if: always()

10 changes: 10 additions & 0 deletions dags_tests/compile_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dagster import Definitions, load_assets_from_modules
from nwp import assets, jobs

def test_compiles():
    """Smoke test: constructing the nwp Definitions object must not raise."""
    # Only construction is checked; the resulting object is intentionally discarded.
    Definitions(
        assets=load_assets_from_modules([assets]),
        jobs=jobs.asset_jobs,
        schedules=jobs.schedule_jobs,
    )
Binary file modified nwp/__pycache__/__init__.cpython-310.pyc
Binary file not shown.
Binary file modified nwp/__pycache__/jobs.cpython-310.pyc
Binary file not shown.
1 change: 1 addition & 0 deletions nwp/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from nwp.assets.dwd.archive_to_hf import download_model_files, process_model_files, upload_model_files_to_hf
from nwp.assets.ecmwf.mars import download_mars_file
12 changes: 4 additions & 8 deletions nwp/assets/dwd/archive_to_hf.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
from dagster import asset # import the `dagster` library
from nwp.assets.dwd.common import IconConfig

import os
import shutil
from glob import glob

import xarray as xr
import zarr
from dagster import asset # import the `dagster` library
from huggingface_hub import HfApi
from ocf_blosc2 import Blosc2
import shutil

from nwp.assets.dwd.common import IconConfig
from nwp.assets.dwd.consts import (
EU_PRESSURE_LEVELS,
EU_VAR2D_LIST,
Expand All @@ -34,10 +33,7 @@ def does_files_exist(config, now_datetime):
f"{now_datetime.year}{str(now_datetime.month).zfill(2)}{str(now_datetime.day).zfill(2)}"
f"_{str(now_datetime.hour).zfill(2)}.zarr.zip"
# Check if the current run exists or not
if path_in_repo in existing_files:
return True
else:
return False
return path_in_repo in existing_files


@asset
Expand Down
4 changes: 2 additions & 2 deletions nwp/assets/dwd/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import timedelta

from dagster import Config
from datetime import datetime, timedelta
import os


class IconConfig(Config):
Expand Down
30 changes: 13 additions & 17 deletions nwp/assets/dwd/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Utilities for downloading the DWD ICON models"""
"""Utilities for downloading the DWD ICON models."""
import bz2
import os
from datetime import datetime, timedelta
Expand All @@ -9,13 +9,14 @@


def get_run(run: str, delay: int = 0):
"""
Get run name
"""Get run name.
Args:
----
run: Run number
Returns:
-------
Run date and run number
"""
now = datetime.now() - timedelta(days=delay)
Expand Down Expand Up @@ -50,12 +51,10 @@ def find_file_name(
if (vars_2d is None) and (vars_3d is None):
raise ValueError("You need to specify at least one 2D or one 3D variable")

if vars_2d is not None:
if type(vars_2d) is not list:
vars_2d = [vars_2d]
if vars_3d is not None:
if type(vars_3d) is not list:
vars_3d = [vars_3d]
if vars_2d is not None and type(vars_2d) is not list:
vars_2d = [vars_2d]
if vars_3d is not None and type(vars_3d) is not list:
vars_3d = [vars_3d]

urls = []
for f_time in f_times:
Expand Down Expand Up @@ -84,13 +83,9 @@ def find_file_name(

def download_extract_files(urls: list, folder: str):
"""Given a list of urls download and bunzip2 them.
Return a list of the path of the extracted files
Return a list of the path of the extracted files.
"""

if type(urls) is list:
urls_list = urls
else:
urls_list = [urls]
urls_list = urls if type(urls) is list else [urls]

# We only parallelize if we have a number of files
# larger than the cpu count
Expand All @@ -108,13 +103,14 @@ def download_extract_files(urls: list, folder: str):


def download_extract_url(url_and_folder):
"""
Download and extract url if file isn't already downloaded
"""Download and extract url if file isn't already downloaded.
Args:
----
url_and_folder: Tuple of URL and folder
Returns:
-------
"""
url, folder = url_and_folder
Expand Down
4 changes: 0 additions & 4 deletions nwp/assets/ecmwf/consts.py

This file was deleted.

25 changes: 25 additions & 0 deletions nwp/assets/ecmwf/mars.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

from dagster import Output, asset
from ecmwfapi import ECMWFService

server = ECMWFService("mars")


@asset
def download_mars_file():
    """Download a fixed ECMWF MARS request to a local GRIB file.

    Submits a hard-coded request through the module-level ``server`` and writes
    the result to ``20230815.grib`` in the current working directory.

    Returns:
        An ``Output`` with no value whose ``filepath`` metadata entry holds the
        name of the file the request was written to.
    """
    # Single source of truth for the file name used by both the request and the metadata.
    target = "20230815.grib"

    server.execute(
        req={
            "class": "od",
            "date": "20230815/to/20230816",
            "expver": "1",
            "levtype": "sfc",
            "param": "28.228/49.128/123.128/165.128/166.128/239.228/246.228/247.228",
            # MARS range syntax is "<start>/to/<end>/by/<increment>"; was mistyped as "t0".
            "step": "0/to/48/by/1",
            "stream": "oper",
            # MARS separates list values with "/", not ",".
            "time": "00:00:00/12:00:00",
            "type": "fc",
        },
        target=target,
    )

    return Output(None, metadata={"filepath": target})
4 changes: 2 additions & 2 deletions nwp/jobs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dagster import job, schedule, ScheduleEvaluationContext, RunRequest, RunConfig, materialize, ScheduleDefinition, op
from dagster import AssetSelection, ScheduleDefinition, define_asset_job, schedule

from nwp.assets.dwd.common import IconConfig
from dagster import AssetSelection, define_asset_job

base_path = "/mnt/storage_b/data/ocf/data/ocf/solar_pv_nowcasting/nowcasting_dataset_pipeline/NWP/DWD"

Expand Down
26 changes: 26 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ authors = [
]
classifiers = ["Programming Language :: Python :: 3"]
dependencies = [
"ecmwf-api-client == 1.6.3",
"dagit == 1.4.11",
"dagster == 1.4.11",
"dagster-cloud == 1.4.11",
"huggingface-hub == 0.16.4",
"numpy == 1.24.2",
"ocf-blosc2 == 0.0.3",
"pathlib == 1.0.1",
Expand All @@ -40,4 +42,28 @@ dev = [
[tool.setuptools.packages.find]
exclude = ["*_tests"]

# Ruff configuration
# * See https://beta.ruff.rs/docs/
[tool.ruff]
select = [
"F", # pyflakes
"E", # pycodestyle
"W", # pycodestyle warnings (whitespace and newlines)
"I", # isort
"UP", # modernize
"ANN", # flake8 type annotations
"S", # flake8 bandit
"B", # flake8 bugbear
"C4", # flake8 comprehensions
"T20", # flake8 print
"SIM", # flake8 simplify
"ARG", # flake8 unused arguments
"D", # pydocstyle
]
line-length = 100
ignore = ["D203", "D213", "ANN101"]
exclude = ["__init__.py"]

[tool.ruff.per-file-ignores]
"test*" = ["D", "ANN"]

4 changes: 0 additions & 4 deletions sat/assets.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
import json

import pandas as pd
import requests

from dagster import AssetExecutionContext, MetadataValue, asset


0 comments on commit 8c22682

Please sign in to comment.