diff --git a/src/troute-network/troute/NHDNetwork.py b/src/troute-network/troute/NHDNetwork.py index 189393200..3568bb2c7 100644 --- a/src/troute-network/troute/NHDNetwork.py +++ b/src/troute-network/troute/NHDNetwork.py @@ -66,12 +66,22 @@ def __init__( self._flowpath_dict = {} self._gl_climatology_df = pd.DataFrame() self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link') - + self.crosswalk_nex_flowpath_poi() super().__init__() # Create empty dataframe for coastal_boundary_depth_df. This way we can check if # it exists, and only read in SCHISM data during 'assemble_forcings' if it doesn't self._coastal_boundary_depth_df = pd.DataFrame() + + def crosswalk_nex_flowpath_poi(self): + self._nexus_dict = dict() + for key, values in self._connections.items(): + for value in values: + new_key = f"nex-{value}" + new_value = f"wb-{key}" + if new_key not in self.nexus_dict: + self._nexus_dict[new_key] = [] + self._nexus_dict[new_key].append(new_value) def extract_waterbody_connections(rows, target_col, waterbody_null=-9999): """Extract waterbody mapping from dataframe. diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 893825015..ed2e2f9dc 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -6,7 +6,7 @@ import troute.nhd_io as nhd_io from build_tests import parity_check import logging - +from troute.nhd_io import updated_flowveldepth LOG = logging.getLogger('') @@ -47,7 +47,7 @@ def _reindex_lake_to_link_id(target_df, crosswalk): return target_df -def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids): +def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids, nexus_dict): ''' Utility function for convert flowveldepth dataframe to a timeseries and to match parquet input format @@ -64,6 +64,14 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref -------- - timeseries_df (DataFrame): Converted timeseries data frame ''' + nex_id = {} + if prefix_ids == 'nex': + for key, val in nexus_dict.items(): + nex_key = int(key.split('-')[-1]) + nex_id[nex_key] = [int(v.split('-')[-1]) for v in val] + + df = updated_flowveldepth(df, nex_id, seg_id = list(), mask_list = None) + df = df.reset_index().drop('Type', axis=1).set_index('featureID') variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"} variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"} @@ -502,7 +510,7 @@ def nwm_output_generator( configuration = output_parameters["parquet_output"].get("configuration") prefix_ids = output_parameters["parquet_output"].get("prefix_ids") timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt, - configuration, prefix_ids) + configuration, prefix_ids, nexus_dict) parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments] timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet( diff --git a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml index c53cd8412..a8ad098b5 100644 --- a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml +++ b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml @@ -124,10 +124,6 @@ output_parameters: # stream_output_time: 1 #[hr] # stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl' # stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60 - parquet_output: - #--------- - parquet_output_folder: output/ - configuration: short_range - prefix_ids: nex + \ No newline at end of file