From a5da5ebdf877ce5fbfcdad2c68bf641d26f0b9c5 Mon Sep 17 00:00:00 2001
From: Pingu Carsti
Date: Mon, 30 Oct 2023 18:32:41 +0100
Subject: [PATCH] w-avg with weights

---
 rook/operator.py                       |  2 +-
 rook/processes/wps_average_weighted.py |  3 +-
 rook/utils/average_utils.py            | 12 ++--
 rook/utils/weighted_average_utils.py   | 92 ++++++++++++++++++++++----
 4 files changed, 88 insertions(+), 21 deletions(-)

diff --git a/rook/operator.py b/rook/operator.py
index cb5cc45..2a7a590 100644
--- a/rook/operator.py
+++ b/rook/operator.py
@@ -8,8 +8,8 @@
 from rook.utils.average_utils import (
     run_average_by_time,
     run_average_by_dim,
-    run_weighted_average,
 )
+from rook.utils.weighted_average_utils import run_weighted_average
 from rook.utils.subset_utils import run_subset
 from rook.utils.concat_utils import run_concat
 from rook.utils.regrid_utils import run_regrid
diff --git a/rook/processes/wps_average_weighted.py b/rook/processes/wps_average_weighted.py
index 0d2213d..d1939c7 100644
--- a/rook/processes/wps_average_weighted.py
+++ b/rook/processes/wps_average_weighted.py
@@ -10,7 +10,7 @@
 from ..utils.input_utils import parse_wps_input
 from ..utils.metalink_utils import build_metalink
 from ..utils.response_utils import populate_response
-from ..utils.average_utils import run_weighted_average
+from ..utils.weighted_average_utils import run_weighted_average
 
 LOGGER = logging.getLogger()
 
@@ -80,7 +80,6 @@ def _handler(self, request, response):
             "output_dir": self.workdir,
             "apply_fixes": False,
             "pre_checked": False,
-            "dims": ["latitude", "longitude"],
         }
 
         # print(inputs)
diff --git a/rook/utils/average_utils.py b/rook/utils/average_utils.py
index 2f4c42d..60dee26 100644
--- a/rook/utils/average_utils.py
+++ b/rook/utils/average_utils.py
@@ -13,11 +13,11 @@ def run_average_by_dim(args):
     return result.file_uris
 
 
-def run_weighted_average(args):
-    from daops.ops.average import average_over_dims
+# def run_weighted_average(args):
+#     from daops.ops.average import average_over_dims
 
-    args["apply_fixes"] = False
-    args["dims"] = ["latitude", "longitude"]
+#     args["apply_fixes"] = False
+#     args["dims"] = ["latitude", "longitude"]
 
-    result = average_over_dims(**args)
-    return result.file_uris
+#     result = average_over_dims(**args)
+#     return result.file_uris
diff --git a/rook/utils/weighted_average_utils.py b/rook/utils/weighted_average_utils.py
index c3a3dc2..b1b53da 100644
--- a/rook/utils/weighted_average_utils.py
+++ b/rook/utils/weighted_average_utils.py
@@ -1,9 +1,14 @@
+import numpy as np
+import collections
+
 from roocs_utils.parameter import collection_parameter
-from roocs_utils.parameter import dimension_parameter
+
+from roocs_utils.project_utils import derive_ds_id
 
 from daops.ops.base import Operation
+from daops.utils import normalise
 
-from clisops.ops import average_over_dims
+from clisops.ops.average import average_over_dims
 
 class WeightedAverage(Operation):
     def _resolve_params(self, collection, **params):
@@ -11,23 +16,86 @@
         """
         Resolve the input parameters to `self.params` and parameterise
         collection parameter and set to `self.collection`.
         """
-        dims = dimension_parameter.DimensionParameter(params.get("dims"))
         collection = collection_parameter.CollectionParameter(collection)
 
         self.collection = collection
         self.params = {
-            "dims": dims,
             "ignore_undetected_dims": params.get("ignore_undetected_dims"),
         }
 
-    # def get_operation_callable(self):
-    #     return clisops_average_over_dims
-
     def _calculate(self):
-        avg_ds = average_over_dims(
-            self.ds,
-            self.params.get("dims", None),
-            self.params.get("ignore_undetected_dims", None),
+        config = {
+            "output_type": self._output_type,
+            "output_dir": self._output_dir,
+            "split_method": self._split_method,
+            "file_namer": self._file_namer,
+        }
+
+        self.params.update(config)
+
+        new_collection = collections.OrderedDict()
+
+        for dset in self.collection:
+            ds_id = derive_ds_id(dset)
+            new_collection[ds_id] = dset.file_paths
+
+        # Normalise (i.e. "fix") data inputs based on "character"
+        norm_collection = normalise.normalise(
+            new_collection, False  # self._apply_fixes
         )
-        return avg_ds
\ No newline at end of file
+
+        rs = normalise.ResultSet(vars())
+
+        # apply weights
+        datasets = []
+        for ds_id in norm_collection.keys():
+            ds = norm_collection[ds_id]
+            # fix time: cast time coordinates to int64
+            ds['time'] = ds['time'].astype('int64')
+            ds['time_bnds'] = ds['time_bnds'].astype('int64')
+            # calculate latitude (cos(lat)) weights
+            weights = np.cos(np.deg2rad(ds.lat))
+            weights.name = "weights"
+            weights = weights.fillna(0)
+            # apply weights
+            ds_weighted = ds.weighted(weights)
+            # add to list
+            datasets.append(ds_weighted)
+
+        # average over the spatial dimensions
+        outputs = average_over_dims(
+            datasets,
+            dims=["latitude", "longitude"],
+            output_type="nc",
+        )
+        # result
+        rs.add("output", outputs)
+
+        return rs
+
+
+def run_weighted_average(args):
+    result = weighted_average(**args)
+    return result.file_uris
+
+
+def weighted_average(
+    collection,
+    ignore_undetected_dims=False,
+    output_dir=None,
+    output_type="netcdf",
+    split_method="time:auto",
+    file_namer="standard",
+    apply_fixes=False,
+    apply_average=False,
+):
+    result_set = WeightedAverage(
+        collection=collection,
+        ignore_undetected_dims=ignore_undetected_dims,
+        output_dir=output_dir,
+        output_type=output_type,
+        split_method=split_method,
+        file_namer=file_namer,
+        apply_fixes=apply_fixes,
+        apply_average=apply_average,
+    )._calculate()
+    return result_set
\ No newline at end of file