Skip to content

Commit

Permalink
Update RibaMetaMod-tests with cases where dtsw != dtgw (#314)
Browse files Browse the repository at this point in the history
* update tests to support smaller dtsw and larger dtgw + fix corresponding issues for ponding.

also: necessary and inevitable update pixi toml and lock
  • Loading branch information
HendrikKok authored Aug 13, 2024
1 parent a2d3015 commit 5a5633b
Show file tree
Hide file tree
Showing 9 changed files with 6,815 additions and 7,101 deletions.
138 changes: 83 additions & 55 deletions imod_coupler/drivers/ribametamod/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,45 @@ class ExchangeBalance:

def __init__(self, shape: int, labels: list[str]) -> None:
self.shape = shape
self.flux_labels = labels
self.volume_labels = labels
self._init_arrays()

def compute_realised(self, realised: NDArray[np.float64]) -> None:
def compute_realised(self, realised_volume: NDArray[np.float64]) -> None:
"""
This function computes the realised (negative) volumes
"""
shortage = np.absolute(self.demand - realised)
shortage = self.demand - realised_volume
demand_negative = self.demand_negative
self._check_valid_shortage(
shortage
) # use a numpy isclose to set the max non-zero value
# deal with zero division
realised_fraction = np.where(
demand_negative < 0.0, 1.0 - (-shortage / demand_negative), 1.0
demand_negative < 0.0, 1.0 - (shortage / demand_negative), 1.0
)
for flux_label in self.flux_labels:
self.realised_negative[flux_label] = (
self.demands_negative[flux_label] * realised_fraction
for volume_label in self.volume_labels:
self.realised_negative[volume_label] = (
self.demands_negative[volume_label] * realised_fraction
)

def reset(self) -> None:
"""
function sets all arrays to zero
"""
for flux_label in self.flux_labels:
self.demands[flux_label][:] = 0.0
self.demands_negative[flux_label][:] = 0.0
self.realised_negative[flux_label][:] = 0.0
for volume_label in self.volume_labels:
self.demands[volume_label][:] = 0.0
self.demands_negative[volume_label][:] = 0.0
self.realised_negative[volume_label][:] = 0.0

def _check_valid_shortage(self, shortage: NDArray[np.float64]) -> None:
eps: float = 1.0e-04
if np.any(np.logical_and(self.demand > 0.0, shortage > eps)):
if np.any(np.logical_and(self.demand > 0.0, np.absolute(shortage) > eps)):
raise ValueError(
"Invalid realised values: found shortage for positive demand"
"Invalid realised volumes: found shortage for positive demand"
)
if np.any(shortage > np.absolute(self.demand_negative) + eps):
if np.any(shortage < self.demand_negative - eps):
raise ValueError(
"Invalid realised values: found shortage larger than negative demand contributions"
"Invalid realised volumes: found shortage larger than negative demand contributions"
)

def _zeros_array(self) -> NDArray[np.float64]:
Expand All @@ -72,10 +72,10 @@ def _init_arrays(self) -> None:
self.demands_mf6 = {}
self.demands_negative = {}
self.realised_negative = {}
for flux_label in self.flux_labels:
self.demands[flux_label] = self._zeros_array()
self.demands_negative[flux_label] = self._zeros_array()
self.realised_negative[flux_label] = self._zeros_array()
for volume_label in self.volume_labels:
self.demands[volume_label] = self._zeros_array()
self.demands_negative[volume_label] = self._zeros_array()
self.realised_negative[volume_label] = self._zeros_array()

@property
def demand_negative(self) -> Any:
Expand Down Expand Up @@ -123,6 +123,7 @@ def __init__(
self.ribasim_infiltration = ribasim_infiltration
self.ribasim_drainage = ribasim_drainage
self.exchange_logger = exchange_logger
self.exchanged_ponding_per_dtsw = np.zeros_like(self.demand)

def update_api_packages(self) -> None:
"""
Expand Down Expand Up @@ -151,12 +152,17 @@ def reset(self) -> None:
self.ribasim_drainage[:] = 0.0
super().reset()
self.update_api_packages()
# reset cummulative array for subtimestepping
self.exchanged_ponding_per_dtsw[:] = 0.0

def add_flux_estimate_mod(self, delt: float, mf6_head: NDArray[np.float64]) -> None:
def add_flux_estimate_mod(
self, mf6_head: NDArray[np.float64], delt_gw: float
) -> None:
# Compute MODFLOW 6 river and drain flux extimates
for key, river in self.mf6_river_packages.items():
# Swap sign since a negative RIV flux means a positive contribution to Ribasim
river_flux = -river.get_flux_estimate(mf6_head) / days_to_seconds(1.0)
# Flux estimation is always in m3/d; add to demands as volume per delt_gw
river_flux = -river.get_flux_estimate(mf6_head) * delt_gw
river_flux_negative = np.where(river_flux < 0, river_flux, 0)
self.demands_mf6[key] = river_flux[:]
self.demands[key] = self.mapping.map_mod2rib[key].dot(self.demands_mf6[key])
Expand All @@ -165,45 +171,34 @@ def add_flux_estimate_mod(self, delt: float, mf6_head: NDArray[np.float64]) -> N
)
for key, drainage in self.mf6_drainage_packages.items():
# Swap sign since a negative RIV flux means a positive contribution to Ribasim
drain_flux = -drainage.get_flux_estimate(mf6_head) / days_to_seconds(1.0)
drain_flux = -drainage.get_flux_estimate(mf6_head) * delt_gw
self.demands[key] = self.mapping.map_mod2rib[key].dot(drain_flux)
# convert modflow flux (m3/day) to ribasim flux (m3/s)

def log_demands(self, current_time: float) -> None:
for key, array in self.demands.items():
self.exchange_logger.log_exchange(
("exchange_demand_" + key), array, current_time
)
for key in self.mf6_active_river_api_packages.keys():
self.mf6_active_river_api_packages[key].rhs[:]
self.exchange_logger.log_exchange(
("exchange_correction_" + key), array, current_time
)

def add_ponding_msw(
self, delt: float, allocated_volume: NDArray[np.float64]
) -> None:
def add_ponding_volume_msw(self, allocated_volume: NDArray[np.float64]) -> None:
if self.mapping.msw2rib is not None:
# flux from metaswap ponding to Ribasim
# sw_ponding volumes are accumulated over the delt_gw timestep,
# resulting in a total volume per delt_gw
if "sw_ponding" in self.mapping.msw2rib:
allocated_flux_sec = allocated_volume / days_to_seconds(delt)
self.demands["sw_ponding"] += self.mapping.msw2rib["sw_ponding"].dot(
allocated_flux_sec
allocated_volume
)[:]

def to_ribasim(self) -> None:
demand = self.demand
# negative demand in exchange class means infiltration from Ribasim
def flux_to_ribasim(self, delt_gw: float, delt_sw: float) -> None:
demand_per_subtimestep = self.get_demand_flux_sec(delt_gw, delt_sw)

# exchange to Ribasim; negative demand in exchange class means infiltration from Ribasim
coupled_index = self.mapping.coupled_mod2rib
self.ribasim_infiltration[coupled_index] = np.where(demand < 0, -demand, 0)[
coupled_index
]
self.ribasim_drainage[coupled_index] = np.where(demand > 0, demand, 0)[
coupled_index
]

def to_modflow(self, realised: NDArray[np.float64]) -> None:
super().compute_realised(realised)
self.ribasim_infiltration[coupled_index] = np.where(
demand_per_subtimestep < 0, -demand_per_subtimestep, 0
)[coupled_index]
self.ribasim_drainage[coupled_index] = np.where(
demand_per_subtimestep > 0, demand_per_subtimestep, 0
)[coupled_index]

def flux_to_modflow(
self, realised_volume: NDArray[np.float64], delt_gw: float
) -> None:
super().compute_realised(realised_volume)
for key in self.mf6_active_river_api_packages.keys():
realised_fraction = np.where(
np.isclose(self.demands_negative[key], 0.0),
Expand All @@ -212,10 +207,43 @@ def to_modflow(self, realised: NDArray[np.float64]) -> None:
)
# correction only applies to MF6 cells which negatively contribute to the Ribasim volumes
# correction as extraction from MF6 model.
# demands in exchange class are volumes per delt_gw, RHS needs a flux in m3/day
self.mf6_active_river_api_packages[key].rhs[:] = -(
np.minimum(self.demands_mf6[key], 0.0) * days_to_seconds(1.0)
np.minimum(self.demands_mf6[key] / delt_gw, 0.0)
) * (1 - self.mapping.map_rib2mod_flux[key].dot(realised_fraction))

def get_demand_flux_sec(self, delt_gw: float, delt_sw: float) -> Any:
# returns the MODFLOW6 demands and MetaSWAP demands as a flux in m3/s
mf6_labels = list(self.demands.keys())
mf6_labels.remove("sw_ponding")
demands = {key: self.demands[key] for key in mf6_labels}
mf6_demand_flux = np.stack(list(demands.values())).sum(axis=0) / delt_gw
msw_demand_flux = (
self.demands["sw_ponding"] - self.exchanged_ponding_per_dtsw
) / delt_sw

# update the ponding demand volume exchanged to Ribasim.
self.exchanged_ponding_per_dtsw[:] = self.demands["sw_ponding"][:]

return (mf6_demand_flux + msw_demand_flux) / day_to_seconds

def log_demands(self, current_time: float) -> None:
for key, array in self.demands.items():
if "sw_ponding" not in key:
self.exchange_logger.log_exchange(
("exchange_demand_" + key), array, current_time
)
for key in self.mf6_active_river_api_packages.keys():
if "sw_ponding" not in key:
self.mf6_active_river_api_packages[key].rhs[:]
self.exchange_logger.log_exchange(
("exchange_correction_" + key), array, current_time
)
self.exchange_logger.log_exchange(
("exchange_demand_sw_ponding"),
self.demands["sw_ponding"],
current_time,
)


def days_to_seconds(day: float) -> float:
return day * 86400
day_to_seconds = 86400
61 changes: 31 additions & 30 deletions imod_coupler/drivers/ribametamod/ribametamod.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,9 @@ def couple_metaswap(self) -> dict[str, Any]:

# Get all MetaSWAP pointers, relevant for coupling with Ribasim
if self.has_ribasim:
self.msw_ponding = self.msw.get_surfacewater_ponding_allocation_ptr()
self.msw_ponding_volume = (
self.msw.get_surfacewater_ponding_allocation_ptr()
)
self.delt_sw = self.msw.get_sw_time_step()
# add to return dict
arrays["ribmsw_nbound"] = np.size(
Expand Down Expand Up @@ -343,31 +345,32 @@ def couple(self) -> None:
)

def update_ribasim_metaswap(self) -> None:
self.subtimesteps_sw = range(1, int(self.delt_gw / self.delt_sw) + 1)
nsubtimesteps = self.delt_gw / self.delt_sw
self.msw.prepare_time_step_noSW(self.delt_gw)
for timestep_sw in self.subtimesteps_sw:

for timestep_sw in range(1, int(nsubtimesteps) + 1):
self.msw.prepare_surface_water_time_step(timestep_sw)
self.exchange.add_ponding_msw(self.delt_sw, self.msw_ponding)
self.exchange.add_ponding_volume_msw(self.msw_ponding_volume)
if self.enable_sprinkling_surface_water:
self.exchange_sprinkling_demand_msw2rib(self.delt_sw)
self.exchange_sprinkling_demand_msw2rib()
self.ribasim_user_realized[:] = (
0.0 # reset cummulative for the next timestep
)
# exchange summed volumes to Ribasim
self.exchange.to_ribasim()
self.exchange.flux_to_ribasim(self.delt_gw, self.delt_sw)
# update Ribasim per delt_sw
self.current_time += self.delt_sw
self.ribasim.update_until(days_to_seconds(self.current_time))
self.ribasim.update_until(day_to_seconds * self.current_time)
# get realised values on wateruser nodes
if self.enable_sprinkling_surface_water:
self.exchange_sprinkling_flux_realised_msw2rib(self.delt_sw)
self.msw.finish_surface_water_time_step(timestep_sw)
self.exchange_sprinkling_flux_realised_msw2rib()
self.msw.finish_surface_water_time_step(timestep_sw)

def update_ribasim(self) -> None:
# exchange summed volumes to Ribasim
self.exchange.to_ribasim()
self.exchange.flux_to_ribasim(self.delt_gw, self.delt_gw)
# update Ribasim per delt_gw
self.ribasim.update_until(days_to_seconds(self.get_current_time()))
self.ribasim.update_until(day_to_seconds * self.get_current_time())

def update(self) -> None:
if self.has_metaswap:
Expand All @@ -378,15 +381,16 @@ def update(self) -> None:

if self.has_ribasim:
self.exchange_rib2mod()
self.exchange_mod2rib()

if self.has_ribasim:
if self.has_metaswap:
self.update_ribasim_metaswap()
else:
self.update_ribasim()
self.ribasim_drainage_sum -= self.ribasim_infiltration_sum
self.exchange.to_modflow(
self.ribasim_drainage_sum / days_to_seconds(self.delt_gw)
self.exchange.flux_to_modflow(
(self.ribasim_drainage_sum - self.ribasim_infiltration_sum),
self.delt_gw,
)
self.exchange.log_demands(self.get_current_time())

Expand Down Expand Up @@ -445,15 +449,20 @@ def exchange_rib2mod(self) -> None:
self.exchange.reset()
# exchange stage and compute flux estimates over MODFLOW 6 timestep
self.exchange_stage_rib2mod()
self.exchange.add_flux_estimate_mod(self.delt_gw, self.mf6_head)

def exchange_mod2rib(self) -> None:
self.exchange.add_flux_estimate_mod(self.mf6_head, self.delt_gw)
# reset Ribasim pointers
self.ribasim_infiltration_sum[:] = 0.0
self.ribasim_drainage_sum[:] = 0.0

def exchange_sprinkling_demand_msw2rib(self, delt: float) -> None:
def exchange_sprinkling_demand_msw2rib(self) -> None:
# flux demand from metaswap sprinkling to Ribasim (demand)
self.msw_sprinkling_demand_sec = (
self.msw.get_surfacewater_sprinking_demand_ptr() / days_to_seconds(delt)
self.msw.get_surfacewater_sprinking_demand_ptr()
/ (self.delt_sw * day_to_seconds)
)

self.mapped_sprinkling_demand = self.mapping.msw2rib["sw_sprinkling"].dot(
-self.msw_sprinkling_demand_sec
) # flip sign since ribasim expect a positive value for demand
Expand All @@ -467,29 +476,22 @@ def exchange_sprinkling_demand_msw2rib(self, delt: float) -> None:
self.current_time,
)

def exchange_sprinkling_flux_realised_msw2rib(self, delt: float) -> None:
def exchange_sprinkling_flux_realised_msw2rib(self) -> None:
msw_sprinkling_realized = self.msw.get_surfacewater_sprinking_realised_ptr()

nonzero_user_indices = np.flatnonzero(self.mapped_sprinkling_demand)

# maximize realised array by demand values
self.ribasim_user_realized[:] = np.minimum(
self.ribasim_user_realized[:],
self.mapped_sprinkling_demand[:] * days_to_seconds(self.delt_sw),
)

self.realised_fractions_swspr[:] = 0.0
self.realised_fractions_swspr[:] = 1.0 # all realized for non-coupled svats
self.realised_fractions_swspr[nonzero_user_indices] = (
self.ribasim_user_realized[nonzero_user_indices]
/ days_to_seconds(self.delt_sw)
/ (self.delt_sw * day_to_seconds)
) / self.mapped_sprinkling_demand[nonzero_user_indices]

msw_sprfrac_realised = (
self.realised_fractions_swspr * self.mapping.msw2rib["sw_sprinkling"]
)
msw_sprinkling_realized[:] = (
(self.msw_sprinkling_demand_sec * days_to_seconds(self.delt_sw))
* msw_sprfrac_realised
self.msw.get_surfacewater_sprinking_demand_ptr() * msw_sprfrac_realised
)[:]
self.ribasim_user_realized[:] = 0.0 # reset cummulative for the next timestep
self.exchange_logger.log_exchange(
Expand Down Expand Up @@ -579,5 +581,4 @@ def get_api_packages(
return dict(zip(return_labels, return_values))


def days_to_seconds(day: float) -> float:
return day * 86400
day_to_seconds = 86400.0
2 changes: 1 addition & 1 deletion imod_coupler/kernelwrappers/mf6_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ def get_flux_estimate(
) -> NDArray[np.float64]:
"""
Returns the river fluxes consistent with current head, river stage and conductance.
a simple linear model is used: flux = conductance * (stage - max(head, bottom))
a simple linear model is used: flux[m3/d] = conductance[m2/d] * (stage[m] - max(head[m], bottom[m]))
Bottom is the level of the river bottom.
This function does not use the HCOF and RHS for calculating the flux, bacause it is used
Expand Down
Loading

0 comments on commit 5a5633b

Please sign in to comment.