Skip to content

Commit

Permalink
Merge pull request #87 from IMMM-SFA/feature/subdomains
Browse files Browse the repository at this point in the history
allow running subdomains; mask inactive cells
  • Loading branch information
thurber authored Jul 19, 2022
2 parents 9a18d1a + 99c678c commit dd30856
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 38 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,30 @@ mosart_wm.get_output_var_names()
supply = mosart_wm.get_value_ptr('supply_water_amount')
```


## subdomains

To simulate only a subset of basins (defined here as a collection of grid cells that share the same outlet cell),
use the configuration option `grid -> subdomain` (see example below) and provide a list of latitude/longitude
coordinate pairs representing each basin of interest (any single coordinate pair within the basin). For example, to
simulate only the Columbia River basin and the Lake Washington regions, one could enter the coordinates for Portland and
Seattle:

> `config.yaml`
> ```yaml
> grid:
> subdomain:
> - 47.6062,-122.3321
> - 45.5152,-122.6784
> unmask_output: true
> ```
By default, the output files will still store empty NaN-like values for grid cells outside the subdomain, but
for even faster simulations and smaller output files set the `grid -> unmask_output` option to `false`. Disabling
this option causes the output files to only store values for grid cells within the subdomain. These smaller files
will likely take extra processing to effectively interoperate with other models.
## visualization
`Model` instances can plot the current value of certain input and output variables (those available from `Model.get_output_var_name` and `Model.get_input_var_names`):
Expand Down
2 changes: 2 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ simulation:
end_date: 1981-05-30

grid:
subdomain: ~
unmask_output: true
path: ./input/domains/mosart.nc
land:
path: ./input/domains/land.nc
Expand Down
2 changes: 1 addition & 1 deletion mosartwmpy/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.7"
__version__ = "0.3.0"
12 changes: 12 additions & 0 deletions mosartwmpy/config_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ grid:
longitude: lon
# latitude field name
latitude: lat
# limit simulation to particular basins
# ~ for all, or
# a list of coordinate pairs to simulate only basins that include them, i.e:
# subdomain:
# - 47.6062,-122.3321
# - 45.5152,-122.6784
subdomain: ~
# Specify whether to unmask the output
# - true if output should cover the whole domain regardless of active cells (inactive cells will be filled with NaN-like values)
# - false if output should only consist of active cells
# False will result in faster runs and much smaller output sizes, but the output may be more difficult to interoperate with other models
unmask_output: true
# variable field names
variables:
# fraction of unit draining to outlet field
Expand Down
42 changes: 41 additions & 1 deletion mosartwmpy/grid/grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
import pickle
import tempfile
from typing import Union
import xarray as xr

from benedict.dicts import benedict as Benedict
Expand Down Expand Up @@ -215,7 +216,24 @@ def __init__(self, config: Benedict = None, parameters: Parameters = None, empty
# ocean
self.upstream_cell_count[i] += 1
self.outlet_id[i] = i


# if a subdomain is desired, update mosart_mask to disable out-of-subdomain cells
subdomain = config.get('grid.subdomain', None)
if subdomain is not None:
if not isinstance(subdomain, list):
subdomain = [subdomain]
outlet_ids = set()
for point in subdomain:
point = [float(x) for x in point.split(',')]
distance = Grid.haversine(self.latitude, self.longitude, point[0], point[1])
index = np.argmin(distance)
outlet_ids.add(self.outlet_id[index])
self.mosart_mask = np.where(
np.in1d(self.outlet_id, list(outlet_ids)),
self.mosart_mask,
0
)

# recalculate area to fill in missing values
# assumes grid spacing is in degrees and uniform
deg2rad = np.pi / 180.0
Expand Down Expand Up @@ -477,3 +495,25 @@ def from_files(path: Path) -> 'Grid':
grid.grid_index_to_reservoirs_map[grid_cell_id] = group.reservoir_id.values

return grid

@staticmethod
def haversine(
lat1: Union[float, np.ndarray],
lon1: Union[float, np.ndarray],
lat2: float,
lon2: float
) -> Union[float, np.ndarray]:
"""Calculates the haversine distance between points
Args:
lat1 (float|ArrayLike[float]): latitude of the origin point or array of points
lon1 (float|ArrayLike[float]): longitude of the origin point or array of points
lat2 (float): latitude of the point in question
lon2 (float): longitude of the point in question
Returns:
float or array of floats of the haversine distance between points
"""
p = 0.017453292519943295
hav = 0.5 - np.cos((lat2 - lat1) * p) / 2 + np.cos(lat1 * p) * np.cos(lat2 * p) * (1 - np.cos((lon2 - lon1) * p)) / 2
return 12742 * np.arcsin(np.sqrt(hav))
7 changes: 4 additions & 3 deletions mosartwmpy/input/demand.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@


# @timing
def load_demand(state: State, config: Benedict, current_time: datetime) -> None:
def load_demand(state: State, config: Benedict, current_time: datetime, mask: np.ndarray) -> None:
"""Loads water demand from file into the state for each grid cell.
Args:
state (State): the current model state; will be mutated
config (Benedict): the model configuration
current_time (datetime): the current time of the simulation
mask (ndarray): mask of active grid cells
"""

# demand path can have placeholders for year and month and day, so check for those and replace if needed
Expand All @@ -29,9 +30,9 @@ def load_demand(state: State, config: Benedict, current_time: datetime) -> None:

# if the demand file has a time axis, use it; otherwise assume data is just 2d
if config.get('water_management.demand.time', None) in demand:
state.grid_cell_demand_rate = np.array(demand[config.get('water_management.demand.demand')].sel({config.get('water_management.demand.time'): current_time}, method='pad')).flatten()
state.grid_cell_demand_rate = np.array(demand[config.get('water_management.demand.demand')].sel({config.get('water_management.demand.time'): current_time}, method='pad')).flatten()[mask]
else:
state.grid_cell_demand_rate = np.array(demand[config.get('water_management.demand.demand')]).flatten()
state.grid_cell_demand_rate = np.array(demand[config.get('water_management.demand.demand')]).flatten()[mask]

# fill missing values with 0
state.grid_cell_demand_rate = np.where(
Expand Down
9 changes: 5 additions & 4 deletions mosartwmpy/input/runoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@


# @timing
def load_runoff(state: State, grid: Grid, config: Benedict, current_time: datetime) -> None:
def load_runoff(state: State, grid: Grid, config: Benedict, current_time: datetime, mask: np.ndarray) -> None:
"""Loads runoff from file into the state for each grid cell.
Args:
state (State): the current model state; will be mutated
grid (Grid): the model grid
config (Benedict): the model configuration
current_time (datetime): the current time of the simulation
mask (ndarray): mask of active grid cells
"""

# note that the forcing is provided in mm/s
Expand All @@ -32,16 +33,16 @@ def load_runoff(state: State, grid: Grid, config: Benedict, current_time: dateti
if config.get('runoff.variables.surface_runoff', None) is not None:
state.hillslope_surface_runoff = 0.001 * grid.land_fraction * grid.area * np.array(
runoff[config.get('runoff.variables.surface_runoff')].sel(sel, method='pad')
).flatten()
).flatten()[mask]

if config.get('runoff.variables.subsurface_runoff', None) is not None:
state.hillslope_subsurface_runoff = 0.001 * grid.land_fraction * grid.area * np.array(
runoff[config.get('runoff.variables.subsurface_runoff')].sel(sel, method='pad')
).flatten()
).flatten()[mask]

if config.get('runoff.variables.wetland_runoff', None) is not None:
state.hillslope_wetland_runoff = 0.001 * grid.land_fraction * grid.area * np.array(
runoff[config.get('runoff.variables.wetland_runoff')].sel(sel, method='pad')
).flatten()
).flatten()[mask]

runoff.close()
46 changes: 37 additions & 9 deletions mosartwmpy/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self):
self.output_n = 0
self.cores = 1
self.client = None
self.mask = None

def __getitem__(self, item):
return getattr(self, item)
Expand Down Expand Up @@ -140,7 +141,21 @@ def initialize(self, config_file_path: str = './config.yaml', grid: Grid = None,
except Exception as e:
logging.exception('Failed to initialize model; see below for stacktrace.')
raise e


# trim all grid and state arrays to match mosart mask
self.mask = np.where(
self.grid.mosart_mask > 0,
True,
False
)
n = self.mask.size
for key in [key for key in dir(self.grid) if isinstance(getattr(self.grid, key), np.ndarray)]:
if getattr(self.grid, key).size == n:
setattr(self.grid, key, getattr(self.grid, key)[self.mask])
for key in [key for key in dir(self.state) if isinstance(getattr(self.state, key), np.ndarray)]:
if getattr(self.state, key).size == n:
setattr(self.state, key, getattr(self.state, key)[self.mask])

# setup output file averaging
try:
initialize_output(self)
Expand All @@ -160,7 +175,7 @@ def update(self) -> None:
if self.config.get('runoff.read_from_file', False):
# read runoff from file
logging.debug(f'Reading runoff input from file.')
load_runoff(self.state, self.grid, self.config, self.current_time)
load_runoff(self.state, self.grid, self.config, self.current_time, self.mask)
else:
# convert provided runoff from mm/s to m3/s
self.state.hillslope_surface_runoff = 0.001 * self.grid.land_fraction * self.grid.area * self.state.hillslope_surface_runoff
Expand All @@ -175,7 +190,7 @@ def update(self) -> None:
if self.config.get('water_management.demand.read_from_file', False):
logging.debug(f'Reading demand rate input from file.')
# load the demand from file
load_demand(self.state, self.config, self.current_time)
load_demand(self.state, self.config, self.current_time, self.mask)
# only compute new release if it's the very start of simulation or new month
# unless ISTARF mode is enabled, in which case update the release if it's the start of a new day
if self.current_time == datetime.combine(self.config.get('simulation.start_date'), time.min) or \
Expand Down Expand Up @@ -251,7 +266,7 @@ def plot_variable(
log_scale: bool = False,
):
"""Display a colormap of a spatial variable at the current timestep."""
data = self.get_value_ptr(variable).reshape(self.get_grid_shape())
data = self.unmask(self.get_value_ptr(variable)).reshape(self.get_grid_shape())
if log_scale:
data = np.where(data > 0, data, np.nan)
xr.DataArray(
Expand All @@ -268,6 +283,17 @@ def plot_variable(
)
plt.show()

def unmask(self, vector: np.ndarray) -> np.ndarray:
unmasked = np.empty_like(self.mask, dtype=vector.dtype)
if vector.dtype == float:
unmasked[:] = np.nan
elif vector.dtype == int:
unmasked[:] = -9999
elif vector.dtype == bool:
unmasked[:] = False
unmasked[self.mask] = vector
return unmasked

def get_component_name(self) -> str:
return f'mosartwmpy ({self.git_hash})'

Expand Down Expand Up @@ -323,34 +349,36 @@ def get_value(self, name: str, dest: np.ndarray) -> int:
var = next((var for var in IO.inputs + IO.outputs if var.standard_name == name), None)
if var is None:
return 1
dest[:] = self[var.variable_class][var.variable]
dest[:] = self.unmask(self[var.variable_class][var.variable])
return 0

def get_value_ptr(self, name: str) -> np.ndarray:
var = next((var for var in IO.inputs + IO.outputs if var.standard_name == name), None)
if var is None:
raise IOError(f'Variable {name} not found in model input/output definition.')
return self[var.variable_class][var.variable]
return self.unmask(self[var.variable_class][var.variable])

def get_value_at_indices(self, name: str, dest: np.ndarray, inds: np.ndarray) -> int:
var = next((var for var in IO.inputs + IO.outputs if var.standard_name == name), None)
if var is None:
return 1
dest[:] = self[var.variable_class][var.variable][inds]
dest[:] = self.unmask(self[var.variable_class][var.variable])[inds]
return 0

def set_value(self, name: str, src: np.ndarray) -> int:
var = next((var for var in IO.inputs + IO.outputs if var.standard_name == name), None)
if var is None:
return 1
self[var.variable_class][var.variable][:] = src
self[var.variable_class][var.variable][:] = src[self.mask]
return 0

def set_value_at_indices(self, name: str, inds: np.ndarray, src: np.ndarray) -> int:
var = next((var for var in IO.inputs + IO.outputs if var.standard_name == name), None)
if var is None:
return 1
self[var.variable_class][var.variable][inds] = src
unmasked = self.unmask(self[var.variable_class][var.variable])
unmasked[inds] = src
self[var.variable_class][var.variable][:] = unmasked[self.mask]
return 0

def get_grid_type(self, grid: int = 0) -> str:
Expand Down
45 changes: 36 additions & 9 deletions mosartwmpy/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ def initialize_output(self):
raise Exception('The `simulation.output_resolution` must be greater than or equal to and evenly divisible by the `simulation.timestep`.')
for output in self.config.get('simulation.output'):
if getattr(self.state, output.get('variable'), None) is not None and len(getattr(self.state, output.get('variable'))) > 0:
data = np.zeros_like(self.state.zeros)
if self.config.get('grid.unmask_output', True):
data = self.unmask(data)
if self.output_buffer is None:
self.output_buffer = pd.DataFrame(self.state.zeros, columns=[output.get('name')])
self.output_buffer = pd.DataFrame(data, columns=[output.get('name')])
else:
self.output_buffer = self.output_buffer.join(pd.DataFrame(self.state.zeros, columns=[output.get('name')]))
self.output_buffer = self.output_buffer.join(pd.DataFrame(data, columns=[output.get('name')]))


# @timing
Expand All @@ -32,7 +35,10 @@ def update_output(self):
self.output_n += 1
for output in self.config.get('simulation.output'):
if getattr(self.state, output.get('variable'), None) is not None and len(getattr(self.state, output.get('variable'))) > 0:
self.output_buffer.loc[:, output.get('name')] += getattr(self.state, output.get('variable'))
data = getattr(self.state, output.get('variable'))
if self.config.get('grid.unmask_output', True):
data = self.unmask(data)
self.output_buffer.loc[:, output.get('name')] += data

# if a new period has begun: average output buffer, write to file, and zero output buffer
if self.current_time.replace(tzinfo=timezone.utc).timestamp() % self.config.get('simulation.output_resolution') == 0:
Expand All @@ -41,7 +47,7 @@ def update_output(self):
self.output_n = 0
for output in self.config.get('simulation.output'):
if getattr(self.state, output.get('variable'), None) is not None and len(getattr(self.state, output.get('variable'))) > 0:
self.output_buffer.loc[:, output.get('name')] = 0.0 * self.state.zeros
self.output_buffer.loc[:, output.get('name')] = 0.0

# check if restart file if need
check_restart(self)
Expand Down Expand Up @@ -75,8 +81,22 @@ def write_output(self):
filename += '.nc'

# create the data frame
frame = pd.DataFrame(self.grid.latitude, columns=['latitude']).join(pd.DataFrame(self.grid.longitude, columns=['longitude'])).join(
pd.DataFrame(np.full(self.get_grid_size(), pd.to_datetime(true_date)), columns=['time'])
latitude = self.grid.latitude
longitude = self.grid.longitude
if self.config.get('grid.unmask_output', True):
longitude, latitude = np.meshgrid(
self.grid.unique_longitudes,
self.grid.unique_latitudes
)
longitude = longitude.flatten()
latitude = latitude.flatten()
frame = pd.DataFrame(
latitude,
columns=['latitude']
).join(
pd.DataFrame(longitude, columns=['longitude'])
).join(
pd.DataFrame(np.full(latitude.size, pd.to_datetime(true_date)), columns=['time'])
).join(
self.output_buffer
).rename(columns={
Expand Down Expand Up @@ -115,10 +135,17 @@ def write_output(self):
nc.close()
else:
if len(self.config.get('simulation.grid_output', [])) > 0:
grid_frame = pd.DataFrame(self.grid.latitude, columns=['latitude']).join(pd.DataFrame(self.grid.longitude, columns=['longitude']))
grid_frame = pd.DataFrame(
latitude, columns=['latitude']
).join(
pd.DataFrame(longitude, columns=['longitude'])
)
for grid_output in self.config.get('simulation.grid_output'):
if getattr(self.grid, grid_output.get('variable'), None) is not None:
grid_frame = grid_frame.join(pd.DataFrame(getattr(self.grid, grid_output.get('variable')), columns=[grid_output.get('variable')]))
data = getattr(self.grid, grid_output.get('variable'))
if self.config.get('grid.unmask_output', True):
data = self.unmask(data)
grid_frame = grid_frame.join(pd.DataFrame(data, columns=[grid_output.get('variable')]))
grid_frame = grid_frame.rename(columns={
'latitude': 'lat',
'longitude': 'lon'
Expand Down Expand Up @@ -164,7 +191,7 @@ def check_restart(self):
def write_restart(self):
"""Writes the state to a netcdf file, with the current simulation time in the file name."""

x = self.state.to_dataframe().to_xarray()
x = self.state.to_dataframe(self.mask).to_xarray()
filename = Path(f'{self.config.get("simulation.output_path")}/{self.name}/restart_files/{self.name}_restart_{self.current_time.year}_{self.current_time.strftime("%m")}_{self.current_time.strftime("%d")}.nc')
x = x.rio.write_crs(4326)
logging.debug(f'Writing restart file: {filename}.')
Expand Down
Loading

0 comments on commit dd30856

Please sign in to comment.