Skip to content

Commit

Permalink
Merge pull request #18 from IMMM-SFA/feature/restart-files
Browse files Browse the repository at this point in the history
support restart files, fixes #15; remove hacked alignment code in pre…
  • Loading branch information
thurber authored Feb 10, 2021
2 parents c51964d + 9bc522e commit 3d73125
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 65 deletions.
31 changes: 2 additions & 29 deletions config_defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,6 @@
### READ ONLY
### To override configuration defaults, edit config.yaml

###
### Batch
### note: so far only supported on PNNL's constance cluster
### TODO this isn't in use yet
###
batch:
# whether to run in batch mode using slurm (otherwise runs in default mode)
enabled: false
# which job queue to use, i.e. slurm or short
queue: slurm
# project to charge the time to
project: IM3
# number of nodes; note: only 1 supported so far
nodes: 4

###
### Multiprocessing
### note: if running on a local machine, it is best not to use all your cores if you plan on doing other things in the meantime
### note: when running in batch mode, all cores are used no matter what is specified here
### TODO this isn't working quite right for WM yet, but works somewhat for just routing
###
multiprocessing:
# whether or not to use multiple cores
enabled: false
# number of cores to use; set to ~ to use all available cores
cores: 4

###
### Simulation
###
Expand All @@ -57,8 +30,8 @@ simulation:
# resume simulation from restart file
# TODO not yet supported
restart_file: ~
# how often to write a restart file; one of 'daily', 'monthly', 'yearly', 'end' -- 'end' meaning only at the end of the simulation
restart_file_frequency: end
# how often to write a restart file; one of 'daily', 'monthly', 'yearly' -- a restart file is also always written at the end of the simulation
restart_file_frequency: yearly
# output write frequency in simulation seconds; must be a multiple of simulation timestep; values will be averaged across the intervening timesteps
# TODO only daily (86400) is currently supported
output_resolution: 86400 # output values once each day, averaged across the timesteps
Expand Down
1 change: 0 additions & 1 deletion mosartwmpy/input/runoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ def load_runoff(state, grid, config, current_time):

runoff = open_dataset(config.get('runoff.path'))

# TODO hacking in the three hour offset seen in fortran mosart
sel = {
config.get('runoff.time'): current_time
}
Expand Down
38 changes: 24 additions & 14 deletions mosartwmpy/mosartwmpy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import matplotlib.pyplot as plt
import numexpr as ne
import numpy as np
import pandas as pd
import psutil
import regex as re
import subprocess

from benedict import benedict
Expand All @@ -12,6 +14,7 @@
from pathlib import Path
from pathvalidate import sanitize_filename
from timeit import default_timer as timer
from xarray import open_dataset

from mosartwmpy.config.config import get_config, Parameters
from mosartwmpy.grid.grid import Grid
Expand Down Expand Up @@ -86,11 +89,7 @@ def initialize(self, config_file_path: str = None):
except:
pass
# detect available physical cores
max_cores = psutil.cpu_count(logical=False)
requested = self.config.get('multiprocessing.cores', None)
if requested is None or requested > max_cores:
requested = max_cores
self.cores = requested
self.cores = psutil.cpu_count(logical=False)
logging.info(f'Cores: {self.cores}.')
ne.set_num_threads(self.cores)
except Exception as e:
Expand All @@ -108,11 +107,20 @@ def initialize(self, config_file_path: str = None):
try:
# restart file
if self.config.get('simulation.restart_file') is not None and self.config.get('simulation.restart_file') != '':
logging.info('Loading restart file.')
# TODO set current timestep based on restart
# TODO initialize state from restart file
logging.error('Restart file not yet implemented. Aborting.')
raise NotImplementedError
path = self.config.get('simulation.restart_file')
logging.info(f'Loading restart file from: `{path}`.')
# set simulation start time based on file name
date = re.search(r'\d{4}_\d{2}_\d{2}', path)
if date:
date = date[len(date) - 1].split('_')
self.current_time = datetime(int(date[0]), int(date[1]), int(date[2]))
else:
logging.warn('Unable to parse date from restart file name, falling back to configured start date.')
self.current_time = datetime.combine(self.config.get('simulation.start_date'), time.min)
x = open_dataset(path)
self.state = State.from_dataframe(x.to_dataframe())
x.close()
# TODO ensure output file writes still workr
else:
# simulation start time
self.current_time = datetime.combine(self.config.get('simulation.start_date'), time.min)
Expand All @@ -139,14 +147,14 @@ def update(self):
try:
# read runoff
if self.config.get('runoff.enabled', False):
logging.debug(f'Reading runoff input.')
load_runoff(self.state, self.grid, self.config, self.current_time)
# advance timestep
self.current_time += timedelta(seconds=self.config.get('simulation.timestep'))
# read demand
if self.config.get('water_management.enabled', False):
# only read new demand and compute new release if it's the very start of simulation or new time period
# TODO this is currently adjusted to try to match fortran mosart
if self.current_time == datetime.combine(self.config.get('simulation.start_date'), time(3)) or self.current_time == datetime(self.current_time.year, self.current_time.month, 1):
# TODO this currently assumes monthly demand input
if self.current_time == datetime.combine(self.config.get('simulation.start_date'), time.min) or self.current_time == datetime(self.current_time.year, self.current_time.month, 1):
logging.debug(f'Reading demand input.')
# load the demand from file
load_demand(self.state, self.config, self.current_time)
# release water from reservoirs
Expand All @@ -163,6 +171,8 @@ def update(self):
self.state.reservoir_streamflow[:] = self.grid.reservoir_streamflow_schedule.sel({streamflow_time_name: month}).values
# perform simulation for one timestep
update(self.state, self.grid, self.parameters, self.config)
# advance timestep
self.current_time += timedelta(seconds=self.config.get('simulation.timestep'))
except Exception as e:
logging.exception('Failed to complete timestep; see below for stacktrace.')
raise e
Expand Down
46 changes: 26 additions & 20 deletions mosartwmpy/output/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,43 @@ def update_output(self):
self.output_buffer.loc[:, output.get('name')] += getattr(self.state, output.get('variable'))

# if a new period has begun: average output buffer, write to file, and zero output buffer
if self.current_time.replace(tzinfo=timezone.utc).timestamp() % self.config.get('simulation.output_resolution') == 75600: # 0: TODO
if self.current_time.replace(tzinfo=timezone.utc).timestamp() % self.config.get('simulation.output_resolution') == 0:
logging.info('Writing to output file.')
self.output_buffer = self.output_buffer / self.output_n
write_output(self)
self.output_n = 0
for output in self.config.get('simulation.output'):
if getattr(self.state, output.get('variable'), None) is not None:
self.output_buffer.loc[:, output.get('name')] = 0.0 * self.state.zeros
# check if restart file if need
check_restart(self)

# check if restart file if need
check_restart(self)

def write_output(self):
# handle writing output to file
# TODO only daily resolution is currently supported - need to support arbitrary resolutions

# logging.info(f'WRM_DEMAND0 sum: {self.output_buffer.WRM_DEMAND0.sum()}')
# logging.info(f'WRM_DEMAND sum: {self.output_buffer.WRM_DEMAND.sum()}')
# logging.info(f'WRM_SUPPLY sum: {self.output_buffer.WRM_SUPPLY.sum()}')
# logging.info(f'QSUR sum: {self.output_buffer.QSUR_LIQ.sum()}')

# check the write frequency to see if writing to new file or appending to existing file
# also construct the file name
period = self.config.get('simulation.output_file_frequency')
is_new_period = False
true_date = self.current_time #- timedelta(days=1) TODO
# use yesterday's date as the file name, to match with what is actually being averaged
true_date = self.current_time if not (self.current_time.hour == 0 and self.current_time.minute == 0 and self.current_time.second == 0) else (self.current_time - timedelta(days=1))
filename = f'./output/{self.name}/{self.name}_{true_date.year}'
if period == 'daily':
filename += f'_{true_date.strftime("%m")}_{true_date.strftime("%d")}'
if self.current_time.hour == 0 and self.current_time.second == 0:
is_new_period = True
if period == 'monthly':
elif period == 'monthly':
filename += f'_{true_date.strftime("%m")}'
if self.current_time.day == 1 and self.current_time.hour == 21: #2 and self.current_time.hour == 0 and self.current_time.second == 0: TODO
if self.current_time.day == 2 and self.current_time.hour == 0 and self.current_time.second == 0:
is_new_period = True
if period == 'yearly':
elif period == 'yearly':
if self.current_time.month == 1 and self.current_time.day == 2 and self.current_time.hour == 0 and self.current_time.second == 0:
is_new_period = True
else:
logging.warn(f'Configuration value for `simulation.output_file_frequency: {period}` is not recognized.')
return
filename += '.nc'

# create the data frame
Expand Down Expand Up @@ -102,6 +102,8 @@ def write_output(self):
# if new period, write to new file and include grid variables, otherwise update file
if not is_new_period:
nc = open_dataset(filename).load()
# slice the existing data to account for restarts
nc = nc.sel(time=slice(None, pd.to_datetime(self.current_time) - pd.Timedelta('1ms')))
frame = concat([nc, frame], dim='time', data_vars='minimal')
nc.close()
else:
Expand Down Expand Up @@ -131,20 +133,24 @@ def check_restart(self):
# check if new restart file is desired
frequency = self.config.get('simulation.restart_file_frequency')
is_needed = False
true_date = self.current_time - timedelta(days=1)
if frequency == 'daily':
if self.current_time.hour == 0 and self.current_time.second == 0:
is_needed = True
if frequency == 'monthly':
if self.current_time.day == 2 and self.current_time.hour == 0 and self.current_time.second == 0:
elif frequency == 'monthly':
if self.current_time.day == 1 and self.current_time.hour == 0 and self.current_time.second == 0:
is_needed = True
if frequency == 'yearly':
if self.current_time.month == 1 and self.current_time.day == 2 and self.current_time.hour == 0 and self.current_time.second == 0:
elif frequency == 'yearly':
if self.current_time.month == 1 and self.current_time.day == 1 and self.current_time.hour == 0 and self.current_time.second == 0:
is_needed = True
if self.current_time.timestamp() >= self.get_end_time():
# always write a restart file at the end of simulation
is_needed = True
if is_needed:
write_restart(self)

def write_restart(self):
# TODO
# need to save state and possibly some wm stuff? ideally just state
pass
# save all state values to file
logging.info('Writing restart file.')
x = self.state.to_dataframe().to_xarray()
filename = f'./output/{self.name}/{self.name}_restart_{self.current_time.year}_{self.current_time.strftime("%m")}_{self.current_time.strftime("%d")}.nc'
x.to_netcdf(filename)
30 changes: 29 additions & 1 deletion mosartwmpy/state/state.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import numpy as np
import pandas as pd
from datetime import datetime, time
from xarray import open_dataset

Expand Down Expand Up @@ -247,4 +248,31 @@ def __init__(self, grid=None, config=None, parameters=None, grid_size=None, empt
'reservoir_potential_evaporation'
]:
setattr(self, var, np.zeros(grid_size))
initialize_reservoir_state(self, grid, config, parameters)
initialize_reservoir_state(self, grid, config, parameters)

def to_dataframe(self):
"""Builds a dataframe from all the state values.
Returns:
pd.DataFrame: a dataframe with all the state values as columns
"""
keys = [key for key in dir(self) if isinstance(getattr(self, key), np.ndarray)]
df = pd.DataFrame()
for key in keys:
df[key] = getattr(self, key)
return df

@staticmethod
def from_dataframe(df):
"""Creates a State instance from a dataframe.
Args:
df (pd.DataFrame): the dataframe from which to build the state
Returns:
State: a State instance populated with the columns from the dataframe
"""
state = State(empty=True)
for key in df.columns:
setattr(state, key, df[key].values)
return state

0 comments on commit 3d73125

Please sign in to comment.