Skip to content

Commit

Permalink
w-avg with weights
Browse files Browse the repository at this point in the history
  • Loading branch information
cehbrecht committed Oct 30, 2023
1 parent 7ea0802 commit a5da5eb
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 21 deletions.
2 changes: 1 addition & 1 deletion rook/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from rook.utils.average_utils import (
run_average_by_time,
run_average_by_dim,
run_weighted_average,
)
from rook.utils.weighted_average_utils import run_weighted_average
from rook.utils.subset_utils import run_subset
from rook.utils.concat_utils import run_concat
from rook.utils.regrid_utils import run_regrid
Expand Down
3 changes: 1 addition & 2 deletions rook/processes/wps_average_weighted.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from ..utils.input_utils import parse_wps_input
from ..utils.metalink_utils import build_metalink
from ..utils.response_utils import populate_response
from ..utils.average_utils import run_weighted_average
from ..utils.weighted_average_utils import run_weighted_average

LOGGER = logging.getLogger()

Expand Down Expand Up @@ -80,7 +80,6 @@ def _handler(self, request, response):
"output_dir": self.workdir,
"apply_fixes": False,
"pre_checked": False,
"dims": ["latitude", "longitude"],
}
# print(inputs)

Expand Down
12 changes: 6 additions & 6 deletions rook/utils/average_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ def run_average_by_dim(args):
return result.file_uris


def run_weighted_average(args):
from daops.ops.average import average_over_dims
# def run_weighted_average(args):
# from daops.ops.average import average_over_dims

args["apply_fixes"] = False
args["dims"] = ["latitude", "longitude"]
# args["apply_fixes"] = False
# args["dims"] = ["latitude", "longitude"]

result = average_over_dims(**args)
return result.file_uris
# result = average_over_dims(**args)
# return result.file_uris
92 changes: 80 additions & 12 deletions rook/utils/weighted_average_utils.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,101 @@
import numpy as np
import collections

from roocs_utils.parameter import collection_parameter
from roocs_utils.parameter import dimension_parameter

from roocs_utils.project_utils import derive_ds_id

from daops.ops.base import Operation
from daops.utils import normalise

from clisops.ops import average_over_dims
from clisops.ops.average import average_over_dims

class WeightedAverage(Operation):
    """Area-weighted average over latitude/longitude using cos(latitude) weights.

    A daops ``Operation`` subclass: resolves its inputs in ``_resolve_params``
    and produces a ``normalise.ResultSet`` in ``_calculate``.
    """

    def _resolve_params(self, collection, **params):
        """
        Resolve the input parameters to `self.params` and parameterise
        collection parameter and set to `self.collection`.
        """
        dims = dimension_parameter.DimensionParameter(params.get("dims"))
        collection = collection_parameter.CollectionParameter(collection)

        self.collection = collection
        self.params = {
            "dims": dims,
            "ignore_undetected_dims": params.get("ignore_undetected_dims"),
        }

    def _calculate(self):
        """Open, weight and average the datasets; return a ResultSet with the outputs."""
        # Output-handling settings captured by the base Operation.
        config = {
            "output_type": self._output_type,
            "output_dir": self._output_dir,
            "split_method": self._split_method,
            "file_namer": self._file_namer,
        }
        # NOTE(review): these params are merged here but the average_over_dims
        # call below uses hardcoded dims/output_type — confirm this is intended.
        self.params.update(config)

        # Map each dataset to its ds_id so normalise can look up fixes.
        new_collection = collections.OrderedDict()
        for dset in self.collection:
            ds_id = derive_ds_id(dset)
            new_collection[ds_id] = dset.file_paths

        # Normalise (i.e. "fix") data inputs based on "character"
        norm_collection = normalise.normalise(
            new_collection, False  # self._apply_fixes
        )

        rs = normalise.ResultSet(vars())

        # Build cos(lat)-weighted views of every normalised dataset.
        datasets = []
        for ds_id in norm_collection.keys():
            ds = norm_collection[ds_id]
            # fix time: cast to int64 so the averaging step can handle it
            ds['time'] = ds['time'].astype('int64')
            ds['time_bnds'] = ds['time_bnds'].astype('int64')
            # calculate weights proportional to grid-cell area
            weights = np.cos(np.deg2rad(ds.lat))
            weights.name = "weights"
            # BUG FIX: fillna returns a new object (it is not in-place);
            # the original discarded the result, letting NaN weights through.
            weights = weights.fillna(0)
            # apply weights
            ds_weighted = ds.weighted(weights)
            # add to list
            datasets.append(ds_weighted)

        # average over the horizontal dimensions and write netCDF outputs
        outputs = average_over_dims(
            datasets,
            dims=["latitude", "longitude"],
            output_type="nc",
        )
        # result
        rs.add("output", outputs)

        return rs


def run_weighted_average(args):
    """Unpack *args* as keyword arguments for :func:`weighted_average`.

    Returns the list of output file URIs from the resulting ResultSet.
    """
    result_set = weighted_average(**args)
    return result_set.file_uris


def weighted_average(
    collection,
    ignore_undetected_dims=False,
    output_dir=None,
    output_type="netcdf",
    split_method="time:auto",
    file_namer="standard",
    apply_fixes=False,
    apply_average=False,
):
    """Run the :class:`WeightedAverage` operation over *collection*.

    All keyword arguments are forwarded to the operation unchanged; the
    operation's ``_calculate`` result (a ResultSet) is returned.
    """
    operation = WeightedAverage(
        collection=collection,
        ignore_undetected_dims=ignore_undetected_dims,
        output_dir=output_dir,
        output_type=output_type,
        split_method=split_method,
        file_namer=file_namer,
        apply_fixes=apply_fixes,
        apply_average=apply_average,
    )
    return operation._calculate()

0 comments on commit a5da5eb

Please sign in to comment.