diff --git a/bmi_config_files/LowerColorado.yml b/bmi_config_files/LowerColorado.yml new file mode 100644 index 000000000..ee1ff325c --- /dev/null +++ b/bmi_config_files/LowerColorado.yml @@ -0,0 +1,4 @@ +flag: + '-f' +file: + 'test/LowerColorado_TX/test_AnA_bmi.yaml' \ No newline at end of file diff --git a/bmi_config_files/standard_AnA.yml b/bmi_config_files/standard_AnA.yml new file mode 100644 index 000000000..384eb9aea --- /dev/null +++ b/bmi_config_files/standard_AnA.yml @@ -0,0 +1,4 @@ +flag: + '-f' +file: + 'test/configs/standard_AnA.yaml' \ No newline at end of file diff --git a/run-troute-with-bmi.ipynb b/run-troute-with-bmi.ipynb new file mode 100644 index 000000000..d01e5749c --- /dev/null +++ b/run-troute-with-bmi.ipynb @@ -0,0 +1,388 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "7b663e55-9edb-40a7-ae1d-380880c6d077", + "metadata": {}, + "source": [ + "# Basic Model Interface (BMI) for streamflow routing using t-route" + ] + }, + { + "cell_type": "markdown", + "id": "9d8b1e3b-38e3-421a-b7d0-beafca9c853e", + "metadata": {}, + "source": [ + "This t-route network was developed for use in the Next Generation National Water Model (Nextgen). Nextgen runs models with Basic Model Interface (BMI)." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "0708364e-4ecf-4fb1-bad4-78ec60c9a1c3", + "metadata": {}, + "outputs": [], + "source": [ + "sys.path.append(\"src/\")\n", + "import bmi_troute # This is the BMI t-route that we will be running from the file: bmi_troute.py \n", + "import troute.main_utilities as mu # This is used to read q_lateral data from files, won't be needed when t-route bmi is run with model engine\n", + "import troute.nhd_preprocess as nhd_prep # This is used to read q_lateral data from files, won't be needed when t-route bmi is run with model engine" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "a53b4945-7966-4df4-a788-fe997fb711f7", + "metadata": {}, + "outputs": [], + "source": [ + "model = bmi_troute.bmi_troute()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "a2ededa4-6bd9-474a-b014-f104867428f7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "creating supernetwork connections set\n", + "supernetwork connections set complete\n", + "setting waterbody and channel initial states ...\n", + "waterbody and channel initial states complete\n" + ] + } + ], + "source": [ + "model.initialize(bmi_cfg_file='test/BMI/test_AnA_bmi.yaml')" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "eb34934c-afe4-47a5-9f81-8029ef0453e9", + "metadata": {}, + "outputs": [], + "source": [ + "(run_sets, da_sets, parity_sets) = mu.build_run_sets(model._network,\n", + " model._forcing_parameters,\n", + " model._compute_parameters, \n", + " model._data_assimilation_parameters,\n", + " model._output_parameters,\n", + " model._parity_parameters)\n", + "\n", + "q_lateral, coastal_boundary_depth_df = nhd_prep.nhd_forcing(run_sets[0], \n", + " model._forcing_parameters, \n", + " model._hybrid_parameters,\n", + " model._network.segment_index,\n", + " model._compute_parameters.get('cpu_pool', None),\n", + " model._network._t0, \n", + " model._network._coastal_boundary_depth_df,\n", + " )\n", + "\n", + "data_assimilation = mu.build_data_assimilation(model._network,\n", + " model._data_assimilation_parameters,\n", + " model._waterbody_parameters,\n", + " [], #da_sets[0],\n", + " model._forcing_parameters,\n", + " model._compute_parameters)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "d3661d08-1340-4765-a676-53ca361d72ba", + "metadata": {}, + "outputs": [], + "source": [ + "model.set_value('land_surface_water_source__volume_flow_rate', q_lateral.to_numpy())\n", + "#model.set_value('coastal_boundary__depth', coastal_boundary_depth_df.to_numpy())\n", + "#model.set_value('usgs_gage_observation__volume_flow_rate', data_assimilation.reservoir_usgs_df.to_numpy())\n", + "#model.set_value('usace_gage_observation__volume_flow_rate', src)\n", + "#model.set_value('rfc_gage_observation__volume_flow_rate', src)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "954fe3a0-e75f-4629-af6d-5b8ba9e9428c", + "metadata": {}, + "outputs": [], + "source": [ + "model.update()" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "e71a7f52-8bd2-44b5-950e-bc9e6de515ae", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1611674 0.000091\n", + "1611676 0.000085\n", + "3763126 0.034257\n", + "3762962 0.002928\n", + "3762964 0.001314\n", + " ... \n", + "942080042 0.303696\n", + "942080043 0.763048\n", + "942080044 3.949975\n", + "942080045 1.003813\n", + "942090009 0.022306\n", + "Name: (11, q), Length: 10806, dtype: float32" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_value('channel_exit_water_x-section__volume_flow_rate')" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "616ba1d9-56eb-4903-b6e4-e854f127f02f", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "3600.0" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_current_time()" + ] + }, + { + "cell_type": "markdown", + "id": "b6ba0ea6-3545-4fb4-a820-8dd519d260b0", + "metadata": {}, + "source": [ + "Now utilize the update_until() function to run t-route for only a portion of the total time" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "9cb21c69-1638-45ad-82d7-258cb69f5d21", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "creating supernetwork connections set\n", + "supernetwork connections set complete\n", + "setting waterbody and channel initial states ...\n", + "waterbody and channel initial states complete\n" + ] + } + ], + "source": [ + "model.initialize(bmi_cfg_file='test/BMI/test_AnA_bmi.yaml')" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "adae192f-6058-4647-b074-e5d891732fd5", + "metadata": {}, + "outputs": [], + "source": [ + "model.set_value('land_surface_water_source__volume_flow_rate', q_lateral.to_numpy())\n", + "#model.set_value('coastal_boundary__depth', coastal_boundary_depth_df.to_numpy())\n", + "#model.set_value('usgs_gage_observation__volume_flow_rate', src)\n", + "#model.set_value('usace_gage_observation__volume_flow_rate', src)\n", + "#model.set_value('rfc_gage_observation__volume_flow_rate', src)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "98cab86c-a288-4ddb-95c4-75323dad1b7f", + "metadata": {}, + "outputs": [], + "source": [ + "model.update_until(1800) # input value is seconds. 1800s = 0.5 hours" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "690ef259-be43-4c07-808b-f4cd2c5ede77", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1800.0" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_current_time()" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "55af3e52-f9ec-46c4-8f75-5c53784f48a2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1611674 0.000084\n", + "1611676 0.000083\n", + "3763126 0.034257\n", + "3762962 0.002928\n", + "3762964 0.001314\n", + " ... \n", + "942080042 0.303720\n", + "942080043 0.763046\n", + "942080044 3.948903\n", + "942080045 1.003609\n", + "942090009 0.022577\n", + "Name: (5, q), Length: 10806, dtype: float32" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_value('channel_exit_water_x-section__volume_flow_rate')" + ] + }, + { + "cell_type": "markdown", + "id": "268d9452-5083-4996-b910-bbcf031ff0ae", + "metadata": {}, + "source": [ + "Now call update() to run t-route for the remainder of the full model run" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "52902573-41ae-40c9-b3dc-e790ead208c3", + "metadata": {}, + "outputs": [], + "source": [ + "model.update()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "92c0e23c-0c0b-401c-9f31-ea41f8b11aa8", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "3600.0" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_current_time()" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "4c618714-0415-4aa2-a5d6-c49c582f008d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1611674 0.000091\n", + "1611676 0.000085\n", + "3763126 0.034257\n", + "3762962 0.002928\n", + "3762964 0.001314\n", + " ... \n", + "942080042 0.303696\n", + "942080043 0.763048\n", + "942080044 3.949975\n", + "942080045 1.003813\n", + "942090009 0.022306\n", + "Name: (5, q), Length: 10806, dtype: float32" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "model.get_value('channel_exit_water_x-section__volume_flow_rate')" + ] + }, + { + "cell_type": "markdown", + "id": "95b79ad7-9b56-4f93-b150-1d6e53d65d94", + "metadata": {}, + "source": [ + "Finalize the model by deleting all internal objects" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9479d297-9664-4942-8274-ecef63b45b79", + "metadata": {}, + "outputs": [], + "source": [ + "model.finalize()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:miniconda3]", + "language": "python", + "name": "conda-env-miniconda3-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/src/bmi_troute.py b/src/bmi_troute.py new file mode 100644 index 000000000..fe6a43a27 --- /dev/null +++ b/src/bmi_troute.py @@ -0,0 +1,631 @@ +"""Basic Model Interface implementation for t-route.""" + +import numpy as np +import pandas as pd +from bmipy import Bmi +from pathlib import Path +import yaml +from datetime import datetime, timedelta + +import troute.main_utilities as tr +import troute.nhd_network_utilities_v02 as nnu + + + +class bmi_troute(Bmi): + + def __init__(self): + """Create a Bmi troute model that is ready for initialization.""" + super(bmi_troute, self).__init__() + #self._model = None + self._values = {} + #self._var_units = {} + self._var_loc = "node" + self._var_grid_id = 0 + #self._grids = {} + #self._grid_type = {} + + self._start_time = 0.0 + self._end_time = np.finfo("d").max + self._time_units = "s" + + #---------------------------------------------- + # Required, static attributes of the model + #---------------------------------------------- + _att_map = { + 'model_name': 'T-Route for Next Generation NWM', + 'version': '', + 'author_name': '', + 'grid_type': 'scalar', + 'time_step_size': 1, + #'time_step_type': 'donno', #unused + #'step_method': 'none', #unused + #'time_units': '1 hour' #NJF Have to drop the 1 for NGEN to recognize the unit + 'time_units': 'seconds' } + + #--------------------------------------------- + # Input variable names (CSDMS standard names) + #--------------------------------------------- + _input_var_names = ['land_surface_water_source__volume_flow_rate', + 'coastal_boundary__depth', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'usgs_gage_observation__volume_flow_rate', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'usace_gage_observation__volume_flow_rate', #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + 'rfc_gage_observation__volume_flow_rate' #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + ] + + #--------------------------------------------- + # Output variable names (CSDMS standard names) + #--------------------------------------------- + _output_var_names = ['channel_exit_water_x-section__volume_flow_rate', + 'channel_water_flow__speed', + 'channel_water__mean_depth', + 'lake_water~incoming__volume_flow_rate', + 'lake_water~outgoing__volume_flow_rate', + 'lake_surface__elevation' #FIXME: this variable isn't a standard CSDMS name...couldn't find one more appropriate + ] + + #------------------------------------------------------ + # Create a Python dictionary that maps CSDMS Standard + # Names to the model's internal variable names. + #------------------------------------------------------ + _var_name_units_map = { + 'channel_exit_water_x-section__volume_flow_rate':['streamflow_cms','m3 s-1'], + 'channel_water_flow__speed':['streamflow_ms','m s-1'], + 'channel_water__mean_depth':['streamflow_m','m'], + 'lake_water~incoming__volume_flow_rate':['waterbody_cms','m3 s-1'], + 'lake_water~outgoing__volume_flow_rate':['waterbody_cms','m3 s-1'], + 'lake_surface__elevation':['waterbody_m','m'], + #-------------- Dynamic inputs -------------------------------- + 'land_surface_water_source__volume_flow_rate':['streamflow_cms','m3 s-1'], + 'coastal_boundary__depth':['depth_m', 'm'], + 'usgs_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1'], + 'usace_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1'], + 'rfc_gage_observation__volume_flow_rate':['streamflow_cms','m3 s-1'] + } + + #------------------------------------------------------ + # A list of static attributes. Not all these need to be used. + #------------------------------------------------------ + _static_attributes_list = [] + + + #------------------------------------------------------------ + #------------------------------------------------------------ + # BMI: Model Control Functions + #------------------------------------------------------------ + #------------------------------------------------------------ + + #------------------------------------------------------------------- + def initialize(self, bmi_cfg_file=None): + + args = tr._handle_args_v03(['-f', bmi_cfg_file]) + + # -------------- Read in the BMI configuration -------------------------# + bmi_cfg_file = Path(bmi_cfg_file) + # ----- Create some lookup tabels from the long variable names --------# + self._var_name_map_long_first = {long_name:self._var_name_units_map[long_name][0] for \ + long_name in self._var_name_units_map.keys()} + self._var_name_map_short_first = {self._var_name_units_map[long_name][0]:long_name for \ + long_name in self._var_name_units_map.keys()} + self._var_units_map = {long_name:self._var_name_units_map[long_name][1] for \ + long_name in self._var_name_units_map.keys()} + + # This will direct all the next moves. + if bmi_cfg_file is not None: + + with bmi_cfg_file.open('r') as fp: + cfg = yaml.safe_load(fp) + self._cfg_bmi = self._parse_config(cfg) + else: + print("Error: No configuration provided, nothing to do...") + + # ------------- Initialize t-route model ------------------------------# + (self._network, + self._log_parameters, + self._preprocessing_parameters, + self._supernetwork_parameters, + self._waterbody_parameters, + self._compute_parameters, + self._forcing_parameters, + self._restart_parameters, + self._hybrid_parameters, + self._output_parameters, + self._parity_parameters, + self._data_assimilation_parameters, + self._run_parameters, + ) = tr.initialize_network(args) + + # Set number of time steps (1 hour) + self._nts = 12 + + # -------------- Initalize all the variables --------------------------# + # -------------- so that they'll be picked up with the get functions --# + for var_name in list(self._var_name_units_map.keys()): + # ---------- All the variables are single values ------------------# + # ---------- so just set to zero for now. ------------------# + self._values[var_name] = np.zeros(self._network.dataframe.shape[0]) + setattr( self, var_name, 0 ) + + ''' + # -------------- Update dimensions of DA variables --------------------# + #################################### + # Maximum lookback hours from reservoir configurations + usgs_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==2].shape[0] + usace_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==3].shape[0] + rfc_shape = self._network._waterbody_types_df[self._network._waterbody_types_df['reservoir_type']==4].shape[0] + + max_lookback_hrs = max(self._data_assimilation_parameters.get('timeslice_lookback_hours'), + self._waterbody_parameters.get('rfc').get('reservoir_rfc_forecasts_lookback_hours')) + + self._values['usgs_gage_observation__volume_flow_rate'] = np.zeros((usgs_shape,max_lookback_hrs*4)) + setattr( self, 'usgs_gage_observation__volume_flow_rate', 0 ) + self._values['usace_gage_observation__volume_flow_rate'] = np.zeros((usace_shape,max_lookback_hrs*4)) + setattr( self, 'usace_gage_observation__volume_flow_rate', 0 ) + self._values['rfc_gage_observation__volume_flow_rate'] = np.zeros((rfc_shape,max_lookback_hrs*4)) + setattr( self, 'rfc_gage_observation__volume_flow_rate', 0 ) + ''' + + self._start_time = 0.0 + self._end_time = self._forcing_parameters.get('dt') * self._forcing_parameters.get('nts') + self._time = 0.0 + self._time_step = self._forcing_parameters.get('dt') + self._time_units = 's' + + def update(self): + """Advance model by one time step.""" + + + # Set input data into t-route objects + self._network._qlateral = pd.DataFrame(self._values['land_surface_water_source__volume_flow_rate'], + index=self._network.dataframe.index.to_numpy()) + self._network._coastal_boundary_depth_df = pd.DataFrame(self._values['coastal_boundary__depth']) + + ''' + ( + self._run_sets, + self._da_sets, + self._parity_sets + ) = tr.build_run_sets(self._network, + self._forcing_parameters, + self._compute_parameters, + self._data_assimilation_parameters, + self._output_parameters, + self._parity_parameters) + + # Create forcing data within network object for first loop iteration + self._network.assemble_forcings( + self._run_sets[0], + self._forcing_parameters, + self._hybrid_parameters, + self._compute_parameters.get('cpu_pool', None)) + ''' + # Create data assimilation object from da_sets for first loop iteration + ###NOTE: this is just a place holder, setting DA variables will be done with set_values... + self._data_assimilation = tr.build_data_assimilation( + self._network, + self._data_assimilation_parameters, + self._waterbody_parameters, + [], #self._da_sets[0], + self._forcing_parameters, + self._compute_parameters) + + ''' + self._run_sets[0]['t0'] = self._network.t0 + self._run_sets[0]['dt'] = self._forcing_parameters.get('dt') + ''' + + # Run routing + ''' + self._run_results = tr.run_routing( + self._network, + self._data_assimilation, + [self._run_sets[0]], + self._da_sets, + self._compute_parameters, + self._forcing_parameters, + self._waterbody_parameters, + self._output_parameters, + self._hybrid_parameters, + self._data_assimilation_parameters, + self._run_parameters, + self._parity_sets) + ''' + + ( + self._run_results, + self._subnetwork_list + ) = tr.nwm_route(self._network.connections, + self._network.reverse_network, + self._network.waterbody_connections, + self._network._reaches_by_tw, + self._compute_parameters.get('parallel_compute_method','serial'), + self._compute_parameters.get('compute_kernel'), + self._compute_parameters.get('subnetwork_target_size'), + self._compute_parameters.get('cpu_pool'), + self._network.t0, + self._time_step, + self._nts, + self._forcing_parameters.get('qts_subdivisions', 12), + self._network.independent_networks, + self._network.dataframe, + self._network.q0, + self._network._qlateral, + self._data_assimilation.usgs_df, + self._data_assimilation.lastobs_df, + self._data_assimilation.reservoir_usgs_df, + self._data_assimilation.reservoir_usgs_param_df, + self._data_assimilation.reservoir_usace_df, + self._data_assimilation.reservoir_usace_param_df, + self._data_assimilation.assimilation_parameters, + self._compute_parameters.get('assume_short_ts', False), + self._compute_parameters.get('return_courant', False), + self._network._waterbody_df, + self._waterbody_parameters, + self._network._waterbody_types_df, + self._network.waterbody_type_specified, + self._network.diffusive_network_data, + self._network.topobathy_df, + self._network.refactored_diffusive_domain, + self._network.refactored_reaches, + [], #subnetwork_list, + self._network.coastal_boundary_depth_df, + self._network.unrefactored_topobathy_df,) + + # update initial conditions with results output + self._network.new_nhd_q0(self._run_results) + self._network.update_waterbody_water_elevation() + + # update t0 + self._network.new_t0(self._time_step,self._nts) + + # get reservoir DA initial parameters for next loop iteration + self._data_assimilation.update(self._run_results, + self._data_assimilation_parameters, + self._run_parameters, + self._network, + [], #self._da_sets[run_set_iterator + 1] + ) + + (self._values['channel_exit_water_x-section__volume_flow_rate'], + self._values['channel_water_flow__speed'], + self._values['channel_water__mean_depth'], + self._values['lake_water~incoming__volume_flow_rate'], + self._values['lake_water~outgoing__volume_flow_rate'], + self._values['lake_surface__elevation'], + ) = tr.create_output_dataframes( + self._run_results, + self._nts, + self._network._waterbody_df, + self._network.link_lake_crosswalk) + + self._time += self._time_step * self._nts + + def update_frac(self, time_frac): + """Update model by a fraction of a time step. + Parameters + ---------- + time_frac : float + Fraction fo a time step. + """ + time_step = self.get_time_step() + self._model.time_step = time_frac * time_step + self.update() + self._model.time_step = time_step + + def update_until(self, then): + """Update model until a particular time. + Parameters + ---------- + then : float + Time to run model until in seconds. + """ + n_steps = (then - self.get_current_time()) / self.get_time_step() + + full_nts = self._nts + self._nts = n_steps + + self.update() + + self._nts = full_nts - n_steps + + ''' + for _ in range(int(n_steps)): + self.update() + self.update_frac(n_steps - int(n_steps)) + ''' + + def finalize(self): + """Finalize model.""" + + self._values = None + self._var_loc = None + self._var_grid_id = None + self._var_name_map_long_first = None + self._var_name_map_short_first = None + self._var_units_map = None + self._cfg_bmi = None + self._network = None + self._log_parameters = None + self._preprocessing_parameters = None + self._supernetwork_parameters = None + self._waterbody_parameters = None + self._compute_parameters = None + self._forcing_parameters = None + self._restart_parameters = None + self._hybrid_parameters = None + self._output_parameters = None + self._parity_parameters = None + self._data_assimilation_parameters = None + self._run_parameters = None + self._nts = None + self._values = None + self._start_time = None + self._end_time = None + self._time = None + self._time_step = None + self._time_units = None + self._data_assimilation = None + self._run_results = None + self._subnetwork_list = None + + def get_var_type(self, var_name): + """Data type of variable. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + str + Data type. + """ + return str(self.get_value_ptr(var_name).dtype) + + def get_var_units(self, var_name): + """Get units of variable. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + str + Variable units. + """ + return self._var_units[var_name] + + def get_var_nbytes(self, var_name): + """Get units of variable. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + int + Size of data array in bytes. + """ + return self.get_value_ptr(var_name).nbytes + + def get_var_itemsize(self, name): + return np.dtype(self.get_var_type(name)).itemsize + + def get_var_location(self, name): + return self._var_loc[name] + + def get_var_grid(self, var_name): + """Grid id for a variable. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + int + Grid id. + """ + for grid_id, var_name_list in self._grids.items(): + if var_name in var_name_list: + return grid_id + + def get_grid_rank(self, grid_id): + """Rank of grid. + Parameters + ---------- + grid_id : int + Identifier of a grid. + Returns + ------- + int + Rank of grid. + """ + return len(self._model.shape) + + def get_grid_size(self, grid_id): + """Size of grid. + Parameters + ---------- + grid_id : int + Identifier of a grid. + Returns + ------- + int + Size of grid. + """ + return int(np.prod(self._model.shape)) + + def get_value_ptr(self, var_name): + """Reference to values. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + array_like + Value array. + """ + return self._values[var_name] + + def get_value(self, var_name): + """Copy of values. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + Returns + ------- + output_df : pd.DataFrame + Copy of values. + """ + output_df = self.get_value_ptr(var_name) + return output_df + + def get_value_at_indices(self, var_name, dest, indices): + """Get values at particular indices. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + dest : ndarray + A numpy array into which to place the values. + indices : array_like + Array of indices. + Returns + ------- + array_like + Values at indices. + """ + dest[:] = self.get_value_ptr(var_name).take(indices) + return dest + + def set_value(self, var_name, src): + """ + Set model values + + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + src : array_like + Array of new values. + """ + val = self.get_value_ptr(var_name) + val[:] = src.reshape(val.shape) + + #self._values[var_name] = src + + def set_value_at_indices(self, name, inds, src): + """Set model values at particular indices. + Parameters + ---------- + var_name : str + Name of variable as CSDMS Standard Name. + src : array_like + Array of new values. + indices : array_like + Array of indices. + """ + val = self.get_value_ptr(name) + val.flat[inds] = src + + def get_component_name(self): + """Name of the component.""" + return self._name + + def get_input_item_count(self): + """Get names of input variables.""" + return len(self._input_var_names) + + def get_output_item_count(self): + """Get names of output variables.""" + return len(self._output_var_names) + + def get_input_var_names(self): + """Get names of input variables.""" + return self._input_var_names + + def get_output_var_names(self): + """Get names of output variables.""" + return self._output_var_names + + def get_grid_shape(self, grid_id, shape): + """Number of rows and columns of uniform rectilinear grid.""" + var_name = self._grids[grid_id][0] + shape[:] = self.get_value_ptr(var_name).shape + return shape + + def get_grid_spacing(self, grid_id, spacing): + """Spacing of rows and columns of uniform rectilinear grid.""" + spacing[:] = self._model.spacing + return spacing + + def get_grid_origin(self, grid_id, origin): + """Origin of uniform rectilinear grid.""" + origin[:] = self._model.origin + return origin + + def get_grid_type(self, grid_id): + """Type of grid.""" + return self._grid_type[grid_id] + + def get_start_time(self): + """Start time of model.""" + return self._start_time + + def get_end_time(self): + """End time of model.""" + return self._end_time + + def get_current_time(self): + return self._time + + def get_time_step(self): + return self._time_step + + def get_time_units(self): + return self._time_units + + def get_grid_edge_count(self, grid): + raise NotImplementedError("get_grid_edge_count") + + def get_grid_edge_nodes(self, grid, edge_nodes): + raise NotImplementedError("get_grid_edge_nodes") + + def get_grid_face_count(self, grid): + raise NotImplementedError("get_grid_face_count") + + def get_grid_face_nodes(self, grid, face_nodes): + raise NotImplementedError("get_grid_face_nodes") + + def get_grid_node_count(self, grid): + """Number of grid nodes. + Parameters + ---------- + grid : int + Identifier of a grid. + Returns + ------- + int + Size of grid. + """ + return self.get_grid_size(grid) + + def get_grid_nodes_per_face(self, grid, nodes_per_face): + raise NotImplementedError("get_grid_nodes_per_face") + + def get_grid_face_edges(self, grid, face_edges): + raise NotImplementedError("get_grid_face_edges") + + def get_grid_x(self, grid, x): + raise NotImplementedError("get_grid_x") + + def get_grid_y(self, grid, y): + raise NotImplementedError("get_grid_y") + + def get_grid_z(self, grid, z): + raise NotImplementedError("get_grid_z") + + def _parse_config(self, cfg): + cfg_list = [cfg.get('flag'),cfg.get('file')] + return cfg_list \ No newline at end of file diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py index 68d5c8b1e..c084fc179 100644 --- a/src/troute-network/troute/DataAssimilation.py +++ b/src/troute-network/troute/DataAssimilation.py @@ -6,6 +6,7 @@ import time import xarray as xr from collections import defaultdict +from datetime import datetime #FIXME parameterize into construciton showtiming = True diff --git a/src/troute-network/troute/main_utilities.py b/src/troute-network/troute/main_utilities.py new file mode 100644 index 000000000..71f3f17f1 --- /dev/null +++ b/src/troute-network/troute/main_utilities.py @@ -0,0 +1,587 @@ +import argparse +import time +import logging +import pandas as pd +import numpy as np +from datetime import timedelta +#from .input import _input_handler_v03 +import troute.nhd_network_utilities_v02 as nnu +import troute.nhd_io as nhd_io + +#from .output import nwm_output_generator +from troute.NHDNetwork import NHDNetwork +from troute.HYFeaturesNetwork import HYFeaturesNetwork +from troute.DataAssimilation import AllDA +from troute.routing.compute import compute_nhd_routing_v02, compute_diffusive_routing + +LOG = logging.getLogger('') + +def initialize_network(args): + + # unpack user inputs + ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) = _input_handler_v03(args) + + run_parameters = { + 'dt': forcing_parameters.get('dt'), + 'nts': forcing_parameters.get('nts'), + 'cpu_pool': compute_parameters.get('cpu_pool') + } + +# showtiming = log_parameters.get("showtiming", None) +# if showtiming: +# task_times = {} +# task_times['forcing_time'] = 0 +# task_times['route_time'] = 0 +# task_times['output_time'] = 0 +# main_start_time = time.time() + + cpu_pool = compute_parameters.get("cpu_pool", None) + + # Build routing network data objects. Network data objects specify river + # network connectivity, channel geometry, and waterbody parameters. Also + # perform initial warmstate preprocess. +# if showtiming: +# network_start_time = time.time() + + if "ngen_nexus_file" in supernetwork_parameters: + network = HYFeaturesNetwork(supernetwork_parameters, + waterbody_parameters=waterbody_parameters, + restart_parameters=restart_parameters, + forcing_parameters=forcing_parameters, + verbose=verbose, showtiming=showtiming) + else: + network = NHDNetwork(supernetwork_parameters=supernetwork_parameters, + waterbody_parameters=waterbody_parameters, + restart_parameters=restart_parameters, + forcing_parameters=forcing_parameters, + compute_parameters=compute_parameters, + data_assimilation_parameters=data_assimilation_parameters, + preprocessing_parameters=preprocessing_parameters, + verbose=True, + showtiming=False #showtiming, + ) + +# if showtiming: +# network_end_time = time.time() +# task_times['network_time'] = network_end_time - network_start_time + + all_parameters = [log_parameters, #TODO: Delete this... + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + run_parameters + ] + + return network, log_parameters, preprocessing_parameters, supernetwork_parameters, waterbody_parameters, compute_parameters, forcing_parameters, restart_parameters, hybrid_parameters, output_parameters, parity_parameters, data_assimilation_parameters, run_parameters + +def build_run_sets(network, forcing_parameters, compute_parameters, data_assimilation_parameters, + output_parameters, parity_parameters): + + # Create run_sets: sets of forcing files for each loop + run_sets = nnu.build_forcing_sets(forcing_parameters, network.t0) + + # Create da_sets: sets of TimeSlice files for each loop + if "data_assimilation_parameters" in compute_parameters: + da_sets = nnu.build_da_sets(data_assimilation_parameters, run_sets, network.t0) + + # Create parity_sets: sets of CHRTOUT files against which to compare t-route flows + if "wrf_hydro_parity_check" in output_parameters: + parity_sets = nnu.build_parity_sets(parity_parameters, run_sets) + else: + parity_sets = [] + + return run_sets, da_sets, parity_sets + +def build_forcings(network, run, forcing_parameters, hybrid_parameters, compute_parameters): + + cpu_pool = compute_parameters.get('cpu_pool', None) + # Create forcing data within network object for first loop iteration + network.assemble_forcings(run, forcing_parameters, hybrid_parameters, cpu_pool) + + return network + +def build_data_assimilation(network, data_assimilation_parameters, waterbody_parameters, da_run, forcing_parameters, compute_parameters): + + #FIXME: hack to get run_parameters. This is done in input_handler_v2. Probably need + # to find a better way to do this here though... + if not 'run_parameters' in locals(): + run_parameters = {'dt': forcing_parameters.get('dt'), + 'nts': forcing_parameters.get('nts'), + 'cpu_pool': compute_parameters.get('cpu_pool', None)} + + # Create data assimilation object from da_sets for first loop iteration + data_assimilation = AllDA(data_assimilation_parameters, + run_parameters, + waterbody_parameters, + network, + da_run) + +# if showtiming: +# forcing_end_time = time.time() +# task_times['forcing_time'] += forcing_end_time - network_end_time + + return data_assimilation + +def run_routing(network, data_assimilation, run_sets, da_sets, compute_parameters, forcing_parameters, waterbody_parameters, + output_parameters, hybrid_parameters, data_assimilation_parameters, run_parameters, parity_sets): + ''' + + ''' + parallel_compute_method = compute_parameters.get("parallel_compute_method", None) + subnetwork_target_size = compute_parameters.get("subnetwork_target_size", 1) + qts_subdivisions = forcing_parameters.get("qts_subdivisions", 1) + compute_kernel = compute_parameters.get("compute_kernel", "V02-caching") + assume_short_ts = compute_parameters.get("assume_short_ts", False) + return_courant = compute_parameters.get("return_courant", False) + cpu_pool = compute_parameters.get("cpu_pool", 1) + + # Pass empty subnetwork list to nwm_route. These objects will be calculated/populated + # on first iteration of for loop only. For additional loops this will be passed + # to function from inital loop. + subnetwork_list = [None, None, None] + + for run_set_iterator, run in enumerate(run_sets): + + t0 = run.get("t0") + dt = run.get("dt") + nts = run.get("nts") + + if parity_sets: + parity_sets[run_set_iterator]["dt"] = dt + parity_sets[run_set_iterator]["nts"] = nts + +# if showtiming: +# route_start_time = time.time() + + run_results = nwm_route( + network.connections, + network.reverse_network, + network.waterbody_connections, + network._reaches_by_tw, ## check: def name is different from return self._ .. + parallel_compute_method, + compute_kernel, + subnetwork_target_size, + cpu_pool, + network.t0, ## check if t0 is being updated + dt, + nts, + qts_subdivisions, + network.independent_networks, + network.dataframe, + network.q0, + network._qlateral, + data_assimilation.usgs_df, + data_assimilation.lastobs_df, + data_assimilation.reservoir_usgs_df, + data_assimilation.reservoir_usgs_param_df, + data_assimilation.reservoir_usace_df, + data_assimilation.reservoir_usace_param_df, + data_assimilation.assimilation_parameters, + assume_short_ts, + return_courant, + network._waterbody_df, ## check: network._waterbody_df ?? def name is different from return self._ .. + waterbody_parameters, + network._waterbody_types_df, ## check: network._waterbody_types_df ?? def name is different from return self._ .. + network.waterbody_type_specified, + network.diffusive_network_data, + network.topobathy_df, + network.refactored_diffusive_domain, + network.refactored_reaches, + subnetwork_list, + network.coastal_boundary_depth_df, + network.unrefactored_topobathy_df, + ) + + # returns list, first item is run result, second item is subnetwork items + subnetwork_list = run_results[1] + run_results = run_results[0] + +# if showtiming: +# route_end_time = time.time() +# task_times['route_time'] += route_end_time - route_start_time + + # create initial conditions for next loop itteration + network.new_nhd_q0(run_results) + network.update_waterbody_water_elevation() + + if run_set_iterator < len(run_sets) - 1: + # update t0 + network.new_t0(dt,nts) + + # update forcing data + network.assemble_forcings(run_sets[run_set_iterator + 1], + forcing_parameters, + hybrid_parameters, + cpu_pool) + + # get reservoir DA initial parameters for next loop iteration + data_assimilation.update(run_results, + data_assimilation_parameters, + run_parameters, + network, + da_sets[run_set_iterator + 1]) + +# if showtiming: +# forcing_end_time = time.time() +# task_times['forcing_time'] += forcing_end_time - route_end_time + + # TODO move the conditional call to write_lite_restart to nwm_output_generator. +# if showtiming: +# output_start_time = time.time() + + if "lite_restart" in output_parameters: + nhd_io.write_lite_restart( + network.q0, + network._waterbody_df, + t0 + timedelta(seconds = dt * nts), + output_parameters['lite_restart'] + ) + ''' + nwm_output_generator( + run, + run_results, + supernetwork_parameters, + output_parameters, + parity_parameters, + restart_parameters, + parity_sets[run_set_iterator] if parity_parameters else {}, + qts_subdivisions, + compute_parameters.get("return_courant", False), + cpu_pool, + network._waterbody_df, + network._waterbody_types_df, + data_assimilation_parameters, + data_assimilation.lastobs_df, + network.link_gage_df, + network.link_lake_crosswalk, + ) + ''' +# if showtiming: +# output_end_time = time.time() +# task_times['output_time'] += output_end_time - output_start_time + return run_results + + +def _handle_args_v03(argv): + ''' + Handle command line input argument - filepath of configuration file + ''' + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + parser.add_argument( + "-f", + "--custom-input-file", + dest="custom_input_file", + help="Path of a .yaml or .json file containing model configuration parameters. See doc/v3_doc.yaml", + ) + + return parser.parse_args(argv) + +def _input_handler_v03(args): + + ''' + Read user inputs from configuration file and set logging level + + Arguments + --------- + Args (argparse.Namespace): Command line input arguments + + Returns + ------- + log_parameters (dict): Input parameters re logging + preprocessing_parameters (dict): Input parameters re preprocessing + supernetwork_parameters (dict): Input parameters re network extent + waterbody_parameters (dict): Input parameters re waterbodies + compute_parameters (dict): Input parameters re computation settings + forcing_parameters (dict): Input parameters re model forcings + restart_parameters (dict): Input parameters re model restart + hybrid_parameters (dict): Input parameters re MC/diffusive wave model + output_parameters (dict): Input parameters re output writing + parity_parameters (dict): Input parameters re parity assessment + data_assimilation_parameters (dict): Input parameters re data assimilation + ''' + # get name of user configuration file (e.g. test.yaml) + custom_input_file = args.custom_input_file + + # read data from user configuration file + ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) = nhd_io.read_config_file(custom_input_file) + + ''' + # configure python logger + log_level_set(log_parameters) + LOG = logging.getLogger('') + + # if log level is at or below DEBUG, then check user inputs + if LOG.level <= 10: # DEBUG + check_inputs( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters + ) + ''' + return ( + log_parameters, + preprocessing_parameters, + supernetwork_parameters, + waterbody_parameters, + compute_parameters, + forcing_parameters, + restart_parameters, + hybrid_parameters, + output_parameters, + parity_parameters, + data_assimilation_parameters, + ) + +def nwm_route( + downstream_connections, + upstream_connections, + waterbodies_in_connections, + reaches_bytw, + parallel_compute_method, + compute_kernel, + subnetwork_target_size, + cpu_pool, + t0, + dt, + nts, + qts_subdivisions, + independent_networks, + param_df, + q0, + qlats, + usgs_df, + lastobs_df, + reservoir_usgs_df, + reservoir_usgs_param_df, + reservoir_usace_df, + reservoir_usace_param_df, + da_parameter_dict, + assume_short_ts, + return_courant, + waterbodies_df, + waterbody_parameters, + waterbody_types_df, + waterbody_type_specified, + diffusive_network_data, + topobathy, + refactored_diffusive_domain, + refactored_reaches, + subnetwork_list, + coastal_boundary_depth_df, + nonrefactored_topobathy, +): + + ################### Main Execution Loop across ordered networks + + + start_time = time.time() + + if return_courant: + LOG.info( + f"executing routing computation, with Courant evaluation metrics returned" + ) + else: + LOG.info(f"executing routing computation ...") + + start_time_mc = time.time() + results = compute_nhd_routing_v02( + downstream_connections, + upstream_connections, + waterbodies_in_connections, + reaches_bytw, + compute_kernel, + parallel_compute_method, + subnetwork_target_size, # The default here might be the whole network or some percentage... + cpu_pool, + t0, + dt, + nts, + qts_subdivisions, + independent_networks, + param_df, + q0, + qlats, + usgs_df, + lastobs_df, + reservoir_usgs_df, + reservoir_usgs_param_df, + reservoir_usace_df, + reservoir_usace_param_df, + da_parameter_dict, + assume_short_ts, + return_courant, + waterbodies_df, + waterbody_parameters, + waterbody_types_df, + waterbody_type_specified, + subnetwork_list, + ) + LOG.debug("MC computation complete in %s seconds." % (time.time() - start_time_mc)) + # returns list, first item is run result, second item is subnetwork items + subnetwork_list = results[1] + results = results[0] + + # run diffusive side of a hybrid simulation + if diffusive_network_data: + start_time_diff = time.time() + ''' + # retrieve MC-computed streamflow value at upstream boundary of diffusive mainstem + qvd_columns = pd.MultiIndex.from_product( + [range(nts), ["q", "v", "d"]] + ).to_flat_index() + flowveldepth = pd.concat( + [pd.DataFrame(r[1], index=r[0], columns=qvd_columns) for r in results], + copy=False, + ) + ''' + #upstream_boundary_flow={} + #for tw,v in diffusive_network_data.items(): + # upstream_boundary_link = diffusive_network_data[tw]['upstream_boundary_link'] + # flow_ = flowveldepth.loc[upstream_boundary_link][0::3] + # the very first value at time (0,q) is flow value at the first time step after initial time. + # upstream_boundary_flow[tw] = flow_ + + + # call diffusive wave simulation and append results to MC results + results.extend( + compute_diffusive_routing( + results, + diffusive_network_data, + cpu_pool, + t0, + dt, + nts, + q0, + qlats, + qts_subdivisions, + usgs_df, + lastobs_df, + da_parameter_dict, + waterbodies_df, + topobathy, + refactored_diffusive_domain, + refactored_reaches, + coastal_boundary_depth_df, + nonrefactored_topobathy, + ) + ) + LOG.debug("Diffusive computation complete in %s seconds." % (time.time() - start_time_diff)) + + LOG.debug("ordered reach computation complete in %s seconds." % (time.time() - start_time)) + + return results, subnetwork_list + +def create_output_dataframes(results, nts, waterbodies_df, link_lake_crosswalk): + + qvd_columns = pd.MultiIndex.from_product( + [range(int(nts)), ["q", "v", "d"]] + ).to_flat_index() + + flowveldepth = pd.concat( + [pd.DataFrame(r[1], index=r[0], columns=qvd_columns) for r in results], copy=False, + ) + + # create waterbody dataframe for output to netcdf file + i_columns = pd.MultiIndex.from_product( + [range(int(nts)), ["i"]] + ).to_flat_index() + + wbdy = pd.concat( + [pd.DataFrame(r[6], index=r[0], columns=i_columns) for r in results], + copy=False, + ) + + wbdy_id_list = waterbodies_df.index.values.tolist() + + i_lakeout_df = wbdy.loc[wbdy_id_list].iloc[:,-1] + q_lakeout_df = flowveldepth.loc[wbdy_id_list].iloc[:,-3] + d_lakeout_df = flowveldepth.loc[wbdy_id_list].iloc[:,-1] + # lakeout = pd.concat([i_df, q_df, d_df], axis=1) + + # replace waterbody lake_ids with outlet link ids + flowveldepth = _reindex_lake_to_link_id(flowveldepth, link_lake_crosswalk) + + q_channel_df = flowveldepth.iloc[:,-3] + v_channel_df = flowveldepth.iloc[:,-2] + d_channel_df = flowveldepth.iloc[:,-1] + + segment_ids = flowveldepth.index.values.tolist() + + return q_channel_df, v_channel_df, d_channel_df, i_lakeout_df, q_lakeout_df, d_lakeout_df#, wbdy_id_list, + +def _reindex_lake_to_link_id(target_df, crosswalk): + ''' + Utility function for replacing lake ID index values + with link ID values in a dataframe. This is used to + reinedex results dataframes + + Arguments: + ---------- + - target_df (DataFrame): Data frame to be reinexed + - crosswalk (dict): Relates lake ids to outlet link ids + + Returns: + -------- + - target_df (DataFrame): Re-indexed with link ids replacing + lake ids + ''' + + # evaluate intersection of lake ids and target_df index values + # i.e. what are the index positions of lake ids that need replacing? + lakeids = np.fromiter(crosswalk.keys(), dtype = int) + idxs = target_df.index.to_numpy() + lake_index_intersect = np.intersect1d( + idxs, + lakeids, + return_indices = True + ) + + # replace lake ids with link IDs in the target_df index array + linkids = np.fromiter(crosswalk.values(), dtype = int) + idxs[lake_index_intersect[1]] = linkids[lake_index_intersect[2]] + + # (re) set the target_df index + target_df.set_index(idxs, inplace = True) + + return target_df \ No newline at end of file diff --git a/src/troute-nwm/src/nwm_routing/__main__.py b/src/troute-nwm/src/nwm_routing/__main__.py index 596261af6..c8f942826 100644 --- a/src/troute-nwm/src/nwm_routing/__main__.py +++ b/src/troute-nwm/src/nwm_routing/__main__.py @@ -11,6 +11,7 @@ from troute.NHDNetwork import NHDNetwork from troute.HYFeaturesNetwork import HYFeaturesNetwork from troute.DataAssimilation import AllDA +import troute.main_utilities as mu import numpy as np import pandas as pd @@ -36,11 +37,11 @@ High level orchestration of ngen t-route simulations for NWM application ''' def main_v04(argv): - - args = _handle_args_v03(argv) - # unpack user inputs + args = _handle_args_v03(argv) + ( + network, log_parameters, preprocessing_parameters, supernetwork_parameters, @@ -52,67 +53,22 @@ def main_v04(argv): output_parameters, parity_parameters, data_assimilation_parameters, - ) = _input_handler_v03(args) - - run_parameters = { - 'dt': forcing_parameters.get('dt'), - 'nts': forcing_parameters.get('nts'), - 'cpu_pool': compute_parameters.get('cpu_pool') - } - - showtiming = log_parameters.get("showtiming", None) - if showtiming: - task_times = {} - task_times['forcing_time'] = 0 - task_times['route_time'] = 0 - task_times['output_time'] = 0 - main_start_time = time.time() - - cpu_pool = compute_parameters.get("cpu_pool", None) - - # Build routing network data objects. Network data objects specify river - # network connectivity, channel geometry, and waterbody parameters. Also - # perform initial warmstate preprocess. - if showtiming: - network_start_time = time.time() - - if "ngen_nexus_file" in supernetwork_parameters: - network = HYFeaturesNetwork(supernetwork_parameters, - waterbody_parameters=waterbody_parameters, - restart_parameters=restart_parameters, - forcing_parameters=forcing_parameters, - verbose=verbose, showtiming=showtiming) - else: - network = NHDNetwork(supernetwork_parameters, - waterbody_parameters, - restart_parameters, - forcing_parameters, - compute_parameters, - data_assimilation_parameters, - preprocessing_parameters, - verbose=True, - showtiming=showtiming, - ) - - if showtiming: - network_end_time = time.time() - task_times['network_creation_time'] = network_end_time - network_start_time + run_parameters + ) = mu.initialize_network(args) - # Create run_sets: sets of forcing files for each loop - run_sets = nnu.build_forcing_sets(forcing_parameters, network.t0) - - # Create da_sets: sets of TimeSlice files for each loop - if "data_assimilation_parameters" in compute_parameters: - da_sets = nnu.build_da_sets(data_assimilation_parameters, run_sets, network.t0) - - # Create parity_sets: sets of CHRTOUT files against which to compare t-route flows - if "wrf_hydro_parity_check" in output_parameters: - parity_sets = nnu.build_parity_sets(parity_parameters, run_sets) - else: - parity_sets = [] + ( + run_sets, + da_sets, + parity_sets + ) = mu.build_run_sets(network, + forcing_parameters, + compute_parameters, + data_assimilation_parameters, + output_parameters, + parity_parameters) # Create forcing data within network object for first loop iteration - network.assemble_forcings(run_sets[0], forcing_parameters, hybrid_parameters, cpu_pool) + network.assemble_forcings(run_sets[0], forcing_parameters, hybrid_parameters, compute_parameters.get('cpu_pool', None)) # Create data assimilation object from da_sets for first loop iteration data_assimilation = AllDA(data_assimilation_parameters, @@ -121,143 +77,20 @@ def main_v04(argv): network, da_sets[0]) - if showtiming: - forcing_end_time = time.time() - task_times['forcing_time'] += forcing_end_time - network_end_time - - - parallel_compute_method = compute_parameters.get("parallel_compute_method", None) - subnetwork_target_size = compute_parameters.get("subnetwork_target_size", 1) - qts_subdivisions = forcing_parameters.get("qts_subdivisions", 1) - compute_kernel = compute_parameters.get("compute_kernel", "V02-caching") - assume_short_ts = compute_parameters.get("assume_short_ts", False) - return_courant = compute_parameters.get("return_courant", False) - - # Pass empty subnetwork list to nwm_route. These objects will be calculated/populated - # on first iteration of for loop only. For additional loops this will be passed - # to function from inital loop. - subnetwork_list = [None, None, None] - - for run_set_iterator, run in enumerate(run_sets): - - t0 = run.get("t0") - dt = run.get("dt") - nts = run.get("nts") - - if parity_sets: - parity_sets[run_set_iterator]["dt"] = dt - parity_sets[run_set_iterator]["nts"] = nts - - if showtiming: - route_start_time = time.time() - - run_results = nwm_route( - network.connections, - network.reverse_network, - network.waterbody_connections, - network._reaches_by_tw, ## check: def name is different from return self._ .. - parallel_compute_method, - compute_kernel, - subnetwork_target_size, - cpu_pool, - network.t0, ## check if t0 is being updated - dt, - nts, - qts_subdivisions, - network.independent_networks, - network.dataframe, - network.q0, - network._qlateral, - data_assimilation.usgs_df, - data_assimilation.lastobs_df, - data_assimilation.reservoir_usgs_df, - data_assimilation.reservoir_usgs_param_df, - data_assimilation.reservoir_usace_df, - data_assimilation.reservoir_usace_param_df, - data_assimilation.assimilation_parameters, - assume_short_ts, - return_courant, - network._waterbody_df, ## check: network._waterbody_df ?? def name is different from return self._ .. - waterbody_parameters, - network._waterbody_types_df, ## check: network._waterbody_types_df ?? def name is different from return self._ .. - network.waterbody_type_specified, - network.diffusive_network_data, - network.topobathy_df, - network.refactored_diffusive_domain, - network.refactored_reaches, - subnetwork_list, - network.coastal_boundary_depth_df, - network.unrefactored_topobathy_df, - ) - - # returns list, first item is run result, second item is subnetwork items - subnetwork_list = run_results[1] - run_results = run_results[0] - - if showtiming: - route_end_time = time.time() - task_times['route_time'] += route_end_time - route_start_time - - # create initial conditions for next loop itteration - network.new_nhd_q0(run_results) - network.update_waterbody_water_elevation() - - # TODO move the conditional call to write_lite_restart to nwm_output_generator. - if "lite_restart" in output_parameters: - nhd_io.write_lite_restart( - network.q0, - network._waterbody_df, - t0 + timedelta(seconds = dt * nts), - output_parameters['lite_restart'] - ) - - if run_set_iterator < len(run_sets) - 1: - # update t0 - network.new_t0(dt,nts) - - # update forcing data - network.assemble_forcings(run_sets[run_set_iterator + 1], - forcing_parameters, - hybrid_parameters, - cpu_pool) - - # get reservoir DA initial parameters for next loop iteration - data_assimilation.update(run_results, - data_assimilation_parameters, - run_parameters, - network, - da_sets[run_set_iterator + 1]) - - if showtiming: - forcing_end_time = time.time() - task_times['forcing_time'] += forcing_end_time - route_end_time - - if showtiming: - output_start_time = time.time() - - nwm_output_generator( - run, - run_results, - supernetwork_parameters, - output_parameters, - parity_parameters, - restart_parameters, - parity_sets[run_set_iterator] if parity_parameters else {}, - qts_subdivisions, - compute_parameters.get("return_courant", False), - cpu_pool, - network._waterbody_df, ## check: network._waterbody_df ?? def name is different from return self._ .. - network._waterbody_types_df, ## check: network._waterbody_types_df ?? def name is different from return self._ .. - data_assimilation_parameters, - data_assimilation.lastobs_df, - network.link_gage_df, - network.link_lake_crosswalk, - ) - - if showtiming: - output_end_time = time.time() - task_times['output_time'] += output_end_time - output_start_time - + run_results = mu.run_routing(network, + data_assimilation, + run_sets, + da_sets, + compute_parameters, + forcing_parameters, + waterbody_parameters, + output_parameters, + hybrid_parameters, + data_assimilation_parameters, + run_parameters, + parity_sets) + + ''' if showtiming: task_times['total_time'] = time.time() - main_start_time @@ -304,7 +137,7 @@ def main_v04(argv): round(task_times['output_time'],2) ) ) - +''' ''' NGEN functions (_v02) diff --git a/test/BMI/10_day_AnA_bmi.yaml b/test/BMI/10_day_AnA_bmi.yaml new file mode 100644 index 000000000..e549af7d7 --- /dev/null +++ b/test/BMI/10_day_AnA_bmi.yaml @@ -0,0 +1,84 @@ +# +#-------------------------------------------------------------------------------- +log_parameters: + #---------- + showtiming: True + log_level : DEBUG +#-------------------------------------------------------------------------------- +network_topology_parameters: + #---------- + supernetwork_parameters: + #---------- + geo_file_path: /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/RouteLink_NWMv2.1.nc + synthetic_wb_segments: + - 4800002 + - 4800004 + - 4800006 + - 4800007 + waterbody_parameters: + #---------- + break_network_at_waterbodies: True + level_pool: + #---------- + level_pool_waterbody_parameter_file_path: /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/LAKEPARM_NWMv2.1.nc + rfc: + #---------- + reservoir_parameter_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/reservoir_index_AnA_NWMv2.1.nc + reservoir_rfc_forecasts : True + reservoir_rfc_forecasts_time_series_path: /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/DA_Inputs/BayFlood/rfc_timeseries + reservoir_rfc_forecasts_lookback_hours : 28 +#-------------------------------------------------------------------------------- +compute_parameters: + #---------- + parallel_compute_method: by-network + compute_kernel : V02-structured + assume_short_ts : True + subnetwork_target_size : 10000 + cpu_pool : 36 + restart_parameters: + #---------- + wrf_hydro_channel_restart_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Restarts/BayFlood/v21/HYDRO_RST.2021-10-20_00:00_DOMAIN1 + wrf_hydro_channel_ID_crosswalk_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/RouteLink_NWMv2.1.nc + wrf_hydro_waterbody_restart_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Restarts/BayFlood/v21/HYDRO_RST.2021-10-20_00:00_DOMAIN1 + wrf_hydro_waterbody_ID_crosswalk_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/LAKEPARM_NWMv2.1.nc + wrf_hydro_waterbody_crosswalk_filter_file: /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/RouteLink_NWMv2.1.nc + forcing_parameters: + #---------- + qts_subdivisions : 12 + dt : 300 + qlat_input_folder : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/ChannelForcings/BayFlood/ + qlat_file_pattern_filter: "*.CHRTOUT_DOMAIN1" + nts : 2880 #4032 #2880 + max_loop_size : 1 + data_assimilation_parameters: + #---------- + usgs_timeslices_folder : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/DA_Inputs/BayFlood/usgs_timeslices + usace_timeslices_folder : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/DA_Inputs/BayFlood/usace_timeslices + timeslice_lookback_hours : 24 + qc_threshold : 1 + streamflow_da: + #---------- + streamflow_nudging : True + gage_segID_crosswalk_file: /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/RouteLink_NWMv2.1.nc + crosswalk_gage_field : 'gages' + crosswalk_segID_field : 'link' + wrf_hydro_lastobs_file : #/glade/work/adamw/CONUS/RESTART/nudgingLastObs.2021-08-23_12:00:00.nc ##################### + lastobs_output_folder : #/glade/work/adamw/CONUS/lastobs_troute ###################################### + reservoir_da: + #---------- + reservoir_persistence_usgs : True + reservoir_persistence_usace : True + gage_lakeID_crosswalk_file : /glade/p/cisl/nwc/nwmv30_coastal_inland/CONUS/Domain/v21/reservoir_index_AnA_NWMv2.1.nc + +#-------------------------------------------------------------------------------- +output_parameters: + #---------- + lite_restart: + #---------- + lite_restart_output_directory: /glade/scratch/shorvath/PR_577_tests/10_day_AnA/restart_troute +# chrtout_output: +# #---------- +# wrf_hydro_channel_output_source_folder: /glade/scratch/shorvath/PR_577_tests/10_day_AnA/CHRTOUT_troute +# lakeout_output: /glade/scratch/shorvath/PR_577_tests/10_day_AnA/LAKEOUT + + \ No newline at end of file diff --git a/test/BMI/test_AnA_bmi.yaml b/test/BMI/test_AnA_bmi.yaml new file mode 100644 index 000000000..9a81f81d2 --- /dev/null +++ b/test/BMI/test_AnA_bmi.yaml @@ -0,0 +1,96 @@ +# $ python -m nwm_routing -f -V3 test_AnA.yaml +#-------------------------------------------------------------------------------- +log_parameters: + #---------- + showtiming: True + log_level : DEBUG +#-------------------------------------------------------------------------------- +network_topology_parameters: + #---------- + supernetwork_parameters: + #---------- + geo_file_path: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/RouteLink.nc + mask_file_path: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/coastal_subset.txt + waterbody_parameters: + #---------- + break_network_at_waterbodies: True + level_pool: + #---------- + level_pool_waterbody_parameter_file_path: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/LAKEPARM.nc + rfc: + #---------- + reservoir_parameter_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/reservoir_index_AnA.nc + reservoir_rfc_forecasts : False + reservoir_rfc_forecasts_time_series_path: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/rfc_TimeSeries/ + reservoir_rfc_forecasts_lookback_hours : 48 +#-------------------------------------------------------------------------------- +compute_parameters: + #---------- + parallel_compute_method: serial #by-subnetwork-jit-clustered #serial + compute_kernel : V02-structured + assume_short_ts : True + subnetwork_target_size : 10000 + cpu_pool : 36 + restart_parameters: + #---------- + wrf_hydro_channel_restart_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/restart/HYDRO_RST.2021-08-23_12:00_DOMAIN1 + #lite_channel_restart_file : restart/RESTART.2020082600_DOMAIN1 + wrf_hydro_channel_ID_crosswalk_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/RouteLink.nc + wrf_hydro_waterbody_restart_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/restart/HYDRO_RST.2021-08-23_12:00_DOMAIN1 + #lite_waterbody_restart_file : restart/waterbody_restart_202006011200 + wrf_hydro_waterbody_ID_crosswalk_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/LAKEPARM.nc + wrf_hydro_waterbody_crosswalk_filter_file: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/RouteLink.nc +# hybrid_parameters: +# run_hybrid_routing: False #True +# diffusive_domain : domain/coastal_domain_subset.yaml +# use_natl_xsections: False #True +# topobathy_domain : domain/final_diffusive_natural_xs.nc +# run_refactored_network: False #True +# refactored_domain: domain/refactored_coastal_domain_subset.yaml +# refactored_topobathy_domain: domain/refac_final_diffusive_natural_xs.nc +# coastal_boundary_domain: domain/coastal_boundary_domain.yaml + forcing_parameters: + #---------- + qts_subdivisions : 12 + dt : 300 # [sec] + qlat_input_folder : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/channel_forcing + qlat_file_pattern_filter : "*.CHRTOUT_DOMAIN1" +# coastal_boundary_input_file : boundary_forcing + nts : 288 # 288 for 1day; 2592 for 9 days + max_loop_size : 1 # [hr] + data_assimilation_parameters: + #---------- + usgs_timeslices_folder : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/usgs_TimeSlice/ + usace_timeslices_folder : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/usace_TimeSlice/ + timeslice_lookback_hours : 48 + qc_threshold : 1 + streamflow_da: + #---------- + streamflow_nudging : False + diffusive_streamflow_nudging : False + gage_segID_crosswalk_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/RouteLink.nc + crosswalk_gage_field : 'gages' + crosswalk_segID_field : 'link' + wrf_hydro_lastobs_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/lastobs/nudgingLastObs.2021-08-23_12:00:00.nc + lastobs_output_folder : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/lastobs/ + reservoir_da: + #---------- + reservoir_persistence_usgs : False + reservoir_persistence_usace : False + gage_lakeID_crosswalk_file : /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/domain/reservoir_index_AnA.nc +#-------------------------------------------------------------------------------- +output_parameters: + #---------- +# test_output: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/output/lcr_flowveldepth.pkl + lite_restart: + #---------- + lite_restart_output_directory: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/restart/ +# chrtout_output: + #---------- +# wrf_hydro_channel_output_source_folder: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/channel_forcing/ +# chanobs_output: + #---------- +# chanobs_output_directory: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/output/ +# chanobs_filepath : lcr_chanobs.nc +# lakeout_output: /glade/u/home/shorvath/projects/t-route/test/LowerColorado_TX/lakeout/ + \ No newline at end of file diff --git a/test/BMI/test_AnA_internal_2_bmi.yaml b/test/BMI/test_AnA_internal_2_bmi.yaml new file mode 100644 index 000000000..a11dcdb8e --- /dev/null +++ b/test/BMI/test_AnA_internal_2_bmi.yaml @@ -0,0 +1,96 @@ +# $ python -m nwm_routing -f -V3 test_AnA.yaml +#-------------------------------------------------------------------------------- +log_parameters: + #---------- + showtiming: True + log_level : DEBUG +#-------------------------------------------------------------------------------- +network_topology_parameters: + #---------- + supernetwork_parameters: + #---------- + geo_file_path: /glade/scratch/adamw/CONUS_1yr/domain/RouteLink_NWMv2.1.nc + mask_file_path: /glade/scratch/donghakim/HurricaneLaura/domain/coastal_10diffusive.txt + waterbody_parameters: + #---------- + break_network_at_waterbodies: True + level_pool: + #---------- + level_pool_waterbody_parameter_file_path: /glade/scratch/adamw/CONUS_1yr/domain/LAKEPARM_NWMv2.1.nc + rfc: + #---------- + reservoir_parameter_file : /glade/scratch/donghakim/HurricaneLaura/domain/reservoir_index_AnA.nc + reservoir_rfc_forecasts : True + reservoir_rfc_forecasts_time_series_path: rfc_TimeSeries/ + reservoir_rfc_forecasts_lookback_hours : 48 +#-------------------------------------------------------------------------------- +compute_parameters: + #---------- + parallel_compute_method: by-subnetwork-jit-clustered #serial + compute_kernel : V02-structured + assume_short_ts : True + subnetwork_target_size : 10000 + cpu_pool : 36 + restart_parameters: + #---------- + wrf_hydro_channel_restart_file: /glade/scratch/donghakim/HurricaneLaura/restart/HYDRO_RST.2020-08-26_00:00_DOMAIN1 + #lite_channel_restart_file: /glade/scratch/donghakim/HurricaneLaura/restart/RESTART.2020082600_DOMAIN1 + wrf_hydro_channel_ID_crosswalk_file : /glade/scratch/adamw/CONUS_1yr/domain/RouteLink_NWMv2.1.nc + wrf_hydro_waterbody_restart_file: /glade/scratch/donghakim/HurricaneLaura/restart/HYDRO_RST.2020-08-26_00:00_DOMAIN1 + #lite_waterbody_restart_file : /glade/scratch/casali/Coastal_Inland/NWM_Coastal_Run_Data/DA_Simulation/Restart_Files/Troute_Restarts/waterbody_restart_202006011200 + wrf_hydro_waterbody_ID_crosswalk_file : /glade/scratch/adamw/CONUS_1yr/domain/LAKEPARM_NWMv2.1.nc + wrf_hydro_waterbody_crosswalk_filter_file: /glade/scratch/adamw/CONUS_1yr/domain/RouteLink_NWMv2.1.nc + hybrid_parameters: + run_hybrid_routing: True + diffusive_domain : /glade/scratch/donghakim/HurricaneLaura/domain/coastal_domain_subset.yaml + use_natl_xsections: True + topobathy_domain : /glade/scratch/donghakim/HurricaneLaura/domain/final_diffusive_natural_xs.nc + run_refactored_network: True + refactored_domain: /glade/scratch/donghakim/HurricaneLaura/domain/refactored_coastal_domain_subset.yaml + refactored_topobathy_domain: /glade/scratch/donghakim/HurricaneLaura/domain/refac_final_diffusive_natural_xs.nc + coastal_boundary_domain: /glade/scratch/donghakim/HurricaneLaura/domain/coastal_boundary_domain.yaml + forcing_parameters: + #---------- + qts_subdivisions : 12 + dt : 300 + qlat_input_folder : /glade/scratch/donghakim/HurricaneLaura/channel_forcing + qlat_file_pattern_filter : "*.CHRTOUT_DOMAIN1" + coastal_boundary_input_file : /glade/scratch/donghakim/HurricaneLaura/domain/schout_1.nc + nts : 288 #288 for 1day; 2592 for 9 days + max_loop_size : 1 #24 # [hr] + data_assimilation_parameters: + #---------- + usgs_timeslices_folder : usgs_TimeSlice/ + usace_timeslices_folder : usace_TimeSlice/ + timeslice_lookback_hours : 48 + qc_threshold : 1 + streamflow_da: + #---------- + streamflow_nudging : False + diffusive_streamflow_nudging : False + gage_segID_crosswalk_file : /glade/scratch/adamw/CONUS_1yr/domain/RouteLink_NWMv2.1.nc + crosswalk_gage_field : 'gages' + crosswalk_segID_field : 'link' + wrf_hydro_lastobs_file : /glade/scratch/casali/Coastal_Inland/NWM_Coastal_Run_Data/DA_Simulation/Restart_Files/Troute_lastobs_outputs/nudgingLastObs.2020-06-01_12:00:00.nc + lastobs_output_folder : lastobs/ + reservoir_da: + #---------- + reservoir_persistence_usgs : True + reservoir_persistence_usace : True + gage_lakeID_crosswalk_file : /glade/scratch/donghakim/HurricaneLaura/domain/reservoir_index_AnA.nc +#-------------------------------------------------------------------------------- +output_parameters: + #---------- +# test_output: /glade/scratch/donghakim/HurricaneLaura/output/HurricaneLaura_flowveldepth.pkl + lite_restart: + #---------- + lite_restart_output_directory: restart/ +# chrtout_output: + #---------- +# wrf_hydro_channel_output_source_folder: /glade/scratch/donghakim/HurricaneLaura/channel_forcing +# chanobs_output: + #---------- +# chanobs_output_directory: output/ +# chanobs_filepath : Chanobs.nc +# lakeout_output: lakeout/ + \ No newline at end of file