Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updating nexus on parquet file #817

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/troute-network/troute/NHDNetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ def __init__(
self._flowpath_dict = {}
self._gl_climatology_df = pd.DataFrame()
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

self._nexus_dict = dict()
super().__init__()

# Create empty dataframe for coastal_boundary_depth_df. This way we can check if
# it exists, and only read in SCHISM data during 'assemble_forcings' if it doesn't
self._coastal_boundary_depth_df = pd.DataFrame()


def extract_waterbody_connections(rows, target_col, waterbody_null=-9999):
"""Extract waterbody mapping from dataframe.
Expand Down
19 changes: 15 additions & 4 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import troute.nhd_io as nhd_io
from build_tests import parity_check
import logging

from troute.nhd_io import updated_flowveldepth

LOG = logging.getLogger('')

Expand Down Expand Up @@ -47,7 +47,7 @@ def _reindex_lake_to_link_id(target_df, crosswalk):
return target_df


def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids):
def _parquet_output_format_converter(df, start_datetime, dt, configuration, prefix_ids, nexus_dict):
'''
Utility function for convert flowveldepth dataframe
to a timeseries and to match parquet input format
Expand All @@ -64,13 +64,24 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref
--------
- timeseries_df (DataFrame): Converted timeseries data frame
'''
nex_id = {}
if prefix_ids == 'nex' and nexus_dict:
for key, val in nexus_dict.items():
nex_key = int(key.split('-')[-1])
nex_id[nex_key] = [int(v.split('-')[-1]) for v in val]

df = updated_flowveldepth(df, nex_id, seg_id = list(), mask_list = None)
df = df.reset_index().drop('Type', axis=1).set_index('featureID')
variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}

# Prepare the location_id with prefix
df.index.name = 'location_id'
df.reset_index(inplace=True)
location_ids = prefix_ids + '-' + df['location_id'].astype(str)
if nexus_dict:
location_ids = prefix_ids + '-' + df['location_id'].astype(str)
else:
location_ids = df['location_id'].astype(str)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a breaking change. For example if your configuration looked like:

output_parameters:
  parquet_output:
    parquet_output_folder: output/
    prefix_ids: wb

The location_id column would no longer be prefixed with wb- (e.g. wb-111 would now be 111).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate more? I think it still has the prefix if we change the prefix_ids to wb
image


# Flatten the dataframe using NumPy
num_locations = df.shape[0]
Expand Down Expand Up @@ -502,7 +513,7 @@ def nwm_output_generator(
configuration = output_parameters["parquet_output"].get("configuration")
prefix_ids = output_parameters["parquet_output"].get("prefix_ids")
timeseries_df = _parquet_output_format_converter(flowveldepth, restart_parameters.get("start_datetime"), dt,
configuration, prefix_ids)
configuration, prefix_ids, nexus_dict)

parquet_output_segments_str = [prefix_ids + '-' + str(segment) for segment in parquet_output_segments]
timeseries_df.loc[timeseries_df['location_id'].isin(parquet_output_segments_str)].to_parquet(
Expand Down
12 changes: 6 additions & 6 deletions test/LowerColorado_TX/test_AnA_V4_NHD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ compute_parameters:
reservoir_rfc_forecasts_lookback_hours : 48

#--------------------------------------------------------------------------------
output_parameters:
# output_parameters:
# #----------
# test_output: output/lcr_flowveldepth.pkl
# lite_restart:
Expand All @@ -124,10 +124,10 @@ output_parameters:
# stream_output_time: 1 #[hr]
# stream_output_type: '.nc' #please select only between netcdf '.nc' or '.csv' or '.pkl'
# stream_output_internal_frequency: 30 #[min] it should be order of 5 minutes. For instance if you want to output every hour put 60
parquet_output:
#---------
parquet_output_folder: output/
configuration: short_range
prefix_ids: nex
# parquet_output:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on why this was commented out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to simplify the yaml file. So if users need it, they can uncomment it

# #---------
# parquet_output_folder: output/
# configuration: short_range
# prefix_ids: nex


Loading