Skip to content

Commit

Permalink
Add process to average over shape (#251)
Browse files Browse the repository at this point in the history
* Added average_shape process to perform average over shape

* Added tests in rook/tests for wps_average_shape

* Fix alignment logic so it does not consider polygon averages aligned

* Set valid polygon for average_shape test

* Update requirements and environments to require latest version of clisops and daops supporting the spatial averager

* Get tests working for shape average operation.

* Adjust title

* updated daops requirements following 0.11.0 release.

* pep8

* removed artefact from merge

---------

Co-authored-by: charlesgauthier-udm <[email protected]>
Co-authored-by: charlesgauthier-udm <[email protected]>
  • Loading branch information
3 people authored Apr 15, 2024
1 parent b5c5332 commit ee091a7
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 15 deletions.
10 changes: 8 additions & 2 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changes
*******


0.13.1 (unreleased)
===================

* Added process for average over polygon.

0.13.0 (2024-02-06)
==================

Expand All @@ -9,7 +15,7 @@ Changes
0.12.2 (2023-12-08)
==================

* Fixed the `time_components` paramater to avoid issues with 360day calendar (#245)
* Fixed the `time_components` parameter to avoid issues with 360day calendar (#245)

0.12.1 (2023-12-04)
==================
Expand Down Expand Up @@ -43,7 +49,7 @@ Changes
0.10.0 (2023-07-12)
==================

* Updated concat operator to optinally apply subsetting and averaging to improve performance.
* Updated concat operator to optionally apply subsetting and averaging to improve performance.
* Apply cmip6 decadal fixes directly using Python code. Skip lookup of fixes in ElasticSearch.
* Updated to clisops 0.10.0.

Expand Down
4 changes: 2 additions & 2 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ dependencies:
- dask >=2021.12
- netcdf4 >=1.4
- bottleneck >=1.3.1,<1.4
- daops >=0.10.0,<0.11
- clisops >=0.12.1,<0.13
- daops >=0.11.0,<0.12
- clisops >=0.12.2,<0.13
- roocs-utils >=0.6.6,<0.7
- roocs-grids >=0.1.2
# workflow
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ jinja2
click
psutil
# daops
daops>=0.10.0,<0.11
daops>=0.11.0,<0.12
# daops @ git+https://github.com/roocs/daops.git@regrid-main-ce#egg=daops
clisops>=0.12.2,<0.13
# clisops @ git+https://github.com/roocs/clisops.git@master#egg=clisops
Expand Down
3 changes: 2 additions & 1 deletion rook/director/alignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ def __init__(self, input_files, inputs):
self._deduce_alignment(inputs)

def _deduce_alignment(self, inputs):
# At present, we reject alignment if any "time_components", "area" or "level" subset is requested
# At present, we reject alignment if any "time_components", "area", "shape" or "level" subset is requested
if (
inputs.get("time_components", None)
or inputs.get("area", None)
or inputs.get("level", None)
or inputs.get("shape", None)
):
return

Expand Down
8 changes: 8 additions & 0 deletions rook/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from rook.utils.average_utils import (
run_average_by_time,
run_average_by_dim,
run_average_by_shape,
)
from rook.utils.weighted_average_utils import run_weighted_average
from rook.utils.subset_utils import run_subset
Expand Down Expand Up @@ -83,6 +84,13 @@ def _get_runner(self):
return run_average_by_dim


class AverageByShape(Operator):
    """Operator for the average-over-shape operation.

    Dispatches execution to :func:`run_average_by_shape`.
    """

    # Operation identifier; presumably used by the Operator base class to
    # name/route this operation — confirm against Operator's implementation.
    prefix = "average_shape"

    def _get_runner(self):
        # Return the callable that performs the actual shape averaging.
        return run_average_by_shape


class WeightedAverage(Operator):
prefix = "weighted_average"

Expand Down
2 changes: 2 additions & 0 deletions rook/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .wps_dashboard import DashboardProcess
from .wps_average_time import AverageByTime
from .wps_average_dim import AverageByDimension
from .wps_average_shape import AverageByShape
from .wps_average_weighted import WeightedAverage
from .wps_orchestrate import Orchestrate
from .wps_subset import Subset
Expand All @@ -14,6 +15,7 @@
Subset(),
AverageByTime(),
AverageByDimension(),
AverageByShape(),
WeightedAverage(),
Concat(),
Regrid(),
Expand Down
136 changes: 136 additions & 0 deletions rook/processes/wps_average_shape.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import logging
import os

from pywps import FORMATS, ComplexOutput, Format, LiteralInput, Process, ComplexInput
from pywps.app.Common import Metadata
from pywps.app.exceptions import ProcessError
from pywps.inout.outputs import MetaFile, MetaLink4

from ..director import wrap_director
from ..utils.input_utils import parse_wps_input
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..utils.average_utils import run_average_by_shape

LOGGER = logging.getLogger()


class AverageByShape(Process):
    """WPS process averaging climate model data over a polygonal shape.

    Takes a dataset collection identifier and a vector geometry file
    ("shape"), delegates processing (or redirection to original files)
    to the director with :func:`run_average_by_shape`, and returns a
    Metalink v4 document referencing the NetCDF results together with
    W3C PROV provenance outputs.
    """

    def __init__(self):
        inputs = [
            LiteralInput(
                "collection",
                "Collection",
                abstract="A dataset identifier or list of comma separated identifiers. "
                "Example: c3s-cmip5.output1.ICHEC.EC-EARTH.historical.day.atmos.day.r1i1p1.tas.latest",
                data_type="string",
                min_occurs=1,
                max_occurs=1,
            ),
            ComplexInput(
                "shape",
                "Vector Shape",
                abstract="An ESRI Shapefile, GML, GeoPackage, JSON or GeoJSON file."
                " The ESRI Shapefile must be zipped and contain the .shp, .shx, and .dbf.",
                supported_formats=[
                    FORMATS.GML,
                    FORMATS.GEOJSON,
                    FORMATS.SHP,
                    FORMATS.JSON,
                    FORMATS.ZIP,
                ],
                min_occurs=1,
                max_occurs=1,
            ),
            LiteralInput(
                "pre_checked",
                "Pre-Checked",
                data_type="boolean",
                abstract="Use checked data only.",
                default="0",
                min_occurs=1,
                max_occurs=1,
            ),
            LiteralInput(
                "apply_fixes",
                "Apply Fixes",
                data_type="boolean",
                abstract="Apply fixes to datasets.",
                default="1",
                min_occurs=1,
                max_occurs=1,
            ),
        ]
        outputs = [
            ComplexOutput(
                "output",
                "METALINK v4 output",
                abstract="Metalink v4 document with references to NetCDF files.",
                as_reference=True,
                supported_formats=[FORMATS.META4],
            ),
            ComplexOutput(
                "prov",
                "Provenance",
                abstract="Provenance document using W3C standard.",
                as_reference=True,
                supported_formats=[FORMATS.JSON],
            ),
            ComplexOutput(
                "prov_plot",
                "Provenance Diagram",
                abstract="Provenance document as diagram.",
                as_reference=True,
                supported_formats=[
                    Format("image/png", extension=".png", encoding="base64")
                ],
            ),
        ]

        # Python 3 zero-argument form; replaces super(AverageByShape, self).
        super().__init__(
            self._handler,
            identifier="average_shape",
            title="Average over polygonal shape",
            abstract="Run averaging over a specified shape on climate model data.",
            metadata=[
                Metadata("DAOPS", "https://github.com/roocs/daops"),
            ],
            version="1.0",
            inputs=inputs,
            outputs=outputs,
            store_supported=True,
            status_supported=True,
        )

    def _handler(self, request, response):
        """Handle a WPS Execute request for average_shape.

        Parses the WPS inputs into a plain dict, lets the director run
        the averaging (or redirect to original files), builds a Metalink
        v4 document from the output URIs and populates the response.
        """
        # show me the environment used by the process in debug mode
        LOGGER.debug(f"Environment used in average_shape: {os.environ}")

        # "collection" is mandatory and may contain several identifiers.
        collection = parse_wps_input(
            request.inputs, "collection", as_sequence=True, must_exist=True
        )

        inputs = {
            "collection": collection,
            "output_dir": self.workdir,
            "apply_fixes": parse_wps_input(request.inputs, "apply_fixes", default=True),
            "pre_checked": parse_wps_input(
                request.inputs, "pre_checked", default=False
            ),
            "shape": parse_wps_input(request.inputs, "shape", default=None),
        }

        # Let the director manage the processing or redirection to original files
        director = wrap_director(collection, inputs, run_average_by_shape)

        ml4 = build_metalink(
            "average-shape-result",
            "Averaging by shape result as NetCDF files.",
            self.workdir,
            director.output_uris,
        )

        populate_response(
            response, "average_shape", self.workdir, inputs, collection, ml4
        )
        return response
7 changes: 7 additions & 0 deletions rook/utils/average_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@ def run_average_by_dim(args):

result = average_over_dims(**args)
return result.file_uris


def run_average_by_shape(args):
    """Average the datasets described by ``args`` over a polygon shape.

    ``args`` is unpacked as keyword arguments for the daops
    ``average_shape`` operation; the resulting file URIs are returned.
    """
    # Imported lazily so daops is only required when this runner is invoked.
    from daops.ops.average import average_shape

    return average_shape(**args).file_uris
54 changes: 54 additions & 0 deletions tests/test_wps_average_shape.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest

from pywps import Service
from pywps.tests import assert_process_exception, assert_response_success, client_for
from pywps.app.exceptions import ProcessError
from rook.processes.wps_average_shape import AverageByShape
from shapely import Polygon
import geopandas as gpd
import xarray as xr

from .common import PYWPS_CFG, get_output, extract_paths_from_metalink


# Closed polygon in lon/lat coordinates (first vertex repeated at the end),
# roughly covering western Europe and the adjacent North Atlantic; used as
# the averaging region in the tests below.
POLY = Polygon([[5.8671874999999996, 57.326521225217064],
                [-15.468749999999998, 48.45835188280866],
                [-16.171875, 24.84656534821976],
                [-3.8671874999999996, 13.581920900545844],
                [21.796875, 25.799891182088334],
                [22.8515625, 52.482780222078226],
                [5.8671874999999996, 57.326521225217064]])


def test_wps_average_shape_cmip6(tmp_path):
    """End-to-end WPS Execute of average_shape on a CMIP6 collection."""
    # Persist the test polygon so it can be passed as the "shape" input.
    poly_file = tmp_path / "tmppoly.json"
    gpd.GeoDataFrame([{'geometry': POLY}]).to_file(poly_file)

    # Exercise the case where the inventory is used.
    wps = client_for(Service(processes=[AverageByShape()], cfgfiles=[PYWPS_CFG]))
    query_inputs = (
        "collection=c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.Amon.rlds.gr1.v20190619"
        f";shape={poly_file}"
    )
    resp = wps.get(
        f"?service=WPS&request=Execute&version=1.0.0&identifier=average_shape&datainputs={query_inputs}"
    )
    assert_response_success(resp)
    assert "output" in get_output(resp.xml)
    assert_geom_created(path=get_output(resp.xml)["output"])


def assert_geom_created(path):
    """Assert *path* is a metalink whose first NetCDF carries a 'geom' coord."""
    assert "meta4" in path
    nc_paths = extract_paths_from_metalink(path)
    assert nc_paths
    dataset = xr.open_dataset(nc_paths[0])
    assert "geom" in dataset.coords


def test_wps_average_no_shape():
    """Omitting the mandatory "shape" input must yield MissingParameterValue."""
    wps = client_for(Service(processes=[AverageByShape()], cfgfiles=[PYWPS_CFG]))
    query = (
        "?service=WPS&request=Execute&version=1.0.0&identifier=average_shape"
        "&datainputs=collection=c3s-cmip6.ScenarioMIP.INM.INM-CM5-0.ssp245.r1i1p1f1.Amon.rlds.gr1.v20190619"
    )
    resp = wps.get(query)
    assert_process_exception(resp, code="MissingParameterValue")
1 change: 1 addition & 0 deletions tests/test_wps_caps.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ def test_wps_caps():
)
assert sorted(names.split()) == [
"average",
"average_shape",
"average_time",
"concat",
"dashboard",
Expand Down
10 changes: 1 addition & 9 deletions tests/test_wps_cmip6_decadal.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,13 @@
import pytest

import xarray as xr
from bs4 import BeautifulSoup

from pywps import Service
from pywps.tests import assert_process_exception, assert_response_success, client_for

from rook.processes.wps_subset import Subset

from .common import PYWPS_CFG, get_output


def extract_paths_from_metalink(path):
path = path.replace("file://", "")
doc = BeautifulSoup(open(path, "r").read(), "xml")
paths = [el.text.replace("file://", "") for el in doc.find_all("metaurl")]
return paths
from .common import PYWPS_CFG, get_output, extract_paths_from_metalink


def assert_decadal_fix_applied(path):
Expand Down

0 comments on commit ee091a7

Please sign in to comment.