From 89b8574a3ccf7e7ae8c669394a269c2569b9eca3 Mon Sep 17 00:00:00 2001 From: Martijn Visser Date: Wed, 5 Jun 2024 11:08:18 +0200 Subject: [PATCH] Don't require unique node IDs (#1513) Fixes #1318 Fixes #1256 @evetion I didn't touch the Delwaq `node_lookup`. If only Basins are in the graph, then you can still use an ID as a unique index. The #1256 fix in 72dcf24661fc82ee84d38666a43d113305a43feb is not tested, since Ribasim-Python stops us from creating such an invalid model. I manually confirmed it works by duplicating rows in QGIS. The `main` methods were needed to fix `pixi run ribasim-core`. --------- Co-authored-by: Maarten Pronk <8655030+evetion@users.noreply.github.com> Co-authored-by: Maarten Pronk --- core/src/main.jl | 3 ++ core/src/model.jl | 10 ++--- core/src/validation.jl | 15 +++++++ core/test/run_models_test.jl | 4 +- coupling/delwaq/generate.py | 9 +++- python/ribasim/ribasim/model.py | 42 ++++++++----------- python/ribasim/ribasim/utils.py | 25 +++-------- python/ribasim/tests/test_model.py | 16 +++---- .../ribasim_testmodels/trivial.py | 10 ++--- 9 files changed, 66 insertions(+), 68 deletions(-) diff --git a/core/src/main.jl b/core/src/main.jl index f8b1ee004..8fedf50bf 100644 --- a/core/src/main.jl +++ b/core/src/main.jl @@ -14,6 +14,9 @@ function run(config::Config)::Model return model end +main(ARGS::Vector{String})::Cint = main(only(ARGS)) +main()::Cint = main(ARGS) + """ main(toml_path::AbstractString)::Cint main(ARGS::Vector{String})::Cint diff --git a/core/src/model.jl b/core/src/model.jl index 175c7ee47..11f2059ed 100644 --- a/core/src/model.jl +++ b/core/src/model.jl @@ -51,6 +51,9 @@ function Model(config::Config)::Model # so we can directly close it again. db = SQLite.DB(db_path) + if !valid_nodes(db) + error("Invalid nodes found.") + end if !valid_edge_types(db) error("Invalid edge types found.") end @@ -101,13 +104,6 @@ function Model(config::Config)::Model state = load_structvector(db, config, BasinStateV1) n = length(get_ids(db, "Basin")) - sql = "SELECT node_id FROM Node ORDER BY node_id" - node_id = only(execute(columntable, db, sql)) - if !allunique(node_id) - error( - "Node IDs need to be globally unique until https://github.com/Deltares/Ribasim/issues/1262 is fixed.", - ) - end finally # always close the database, also in case of an error close(db) diff --git a/core/src/validation.jl b/core/src/validation.jl index 40ecdee1b..1b5ac809c 100644 --- a/core/src/validation.jl +++ b/core/src/validation.jl @@ -162,6 +162,21 @@ function valid_config(config::Config)::Bool return !errors end +function valid_nodes(db::DB)::Bool + errors = false + + sql = "SELECT node_type, node_id FROM Node GROUP BY node_type, node_id HAVING COUNT(*) > 1" + node_type, node_id = execute(columntable, db, sql) + + for (node_type, node_id) in zip(node_type, node_id) + errors = true + id = NodeID(node_type, node_id) + @error "Multiple occurrences of node $id found in Node table." + end + + return !errors +end + """ Test for each node given its node type whether the nodes that # are downstream ('down-edge') of this node are of an allowed type diff --git a/core/test/run_models_test.jl b/core/test/run_models_test.jl index 45fa53f73..b1a570a4a 100644 --- a/core/test/run_models_test.jl +++ b/core/test/run_models_test.jl @@ -128,8 +128,8 @@ @testset "Results values" begin @test flow.time[1] == DateTime(2020) @test coalesce.(flow.edge_id[1:2], -1) == [0, 1] - @test flow.from_node_id[1:2] == [6, 0] - @test flow.to_node_id[1:2] == [0, 922] + @test flow.from_node_id[1:2] == [6, 6] + @test flow.to_node_id[1:2] == [6, 2147483647] @test basin.storage[1] ≈ 1.0 @test basin.level[1] ≈ 0.044711584 diff --git a/coupling/delwaq/generate.py b/coupling/delwaq/generate.py index df4ce90f6..583c26458 100644 --- a/coupling/delwaq/generate.py +++ b/coupling/delwaq/generate.py @@ -44,7 +44,7 @@ def generate(toml_path: Path) -> tuple[nx.DiGraph, set[str]]: for row in model.node_table().df.itertuples(): if row.node_type not in ribasim.geometry.edge.SPATIALCONTROLNODETYPES: G.add_node( - row.node_id, + row.node_type + str(row.node_id), type=row.node_type, id=row.node_id, x=row.geometry.x, @@ -53,7 +53,12 @@ def generate(toml_path: Path) -> tuple[nx.DiGraph, set[str]]: ) for row in model.edge.df.itertuples(): if row.edge_type == "flow": - G.add_edge(row.from_node_id, row.to_node_id, id=[row.Index], duplicate=None) + G.add_edge( + row.from_node_type + str(row.from_node_id), + row.to_node_type + str(row.to_node_id), + id=[row.Index], + duplicate=None, + ) # Simplify network, only keeping Basins and Boundaries. # We find an unwanted node, remove it, diff --git a/python/ribasim/ribasim/model.py b/python/ribasim/ribasim/model.py index 6c01525d7..b214ddd3f 100644 --- a/python/ribasim/ribasim/model.py +++ b/python/ribasim/ribasim/model.py @@ -52,7 +52,6 @@ MissingOptionalModule, _edge_lookup, _node_lookup, - _node_lookup_numpy, _time_in_ns, ) @@ -174,16 +173,7 @@ def _save(self, directory: DirectoryPath, input_dir: DirectoryPath): db_path.unlink(missing_ok=True) context_file_loading.get()["database"] = db_path self.edge._save(directory, input_dir) - node = self.node_table() - assert node.df is not None - # Temporarily require unique node_id for #1262 - # and copy them to the fid for #1306. - if not node.df["node_id"].is_unique: - raise ValueError("node_id must be unique") - node.df.set_index("node_id", drop=False, inplace=True) - node.df.index.name = "fid" - node.df.sort_index(inplace=True) node._save(directory, input_dir) for sub in self._nodes(): @@ -209,7 +199,7 @@ def _apply_crs_function(self, function_name: str, crs: str) -> None: self.crs = crs def node_table(self) -> NodeTable: - """Compute the full NodeTable from all node types.""" + """Compute the full sorted NodeTable from all node types.""" df_chunks = [node.node.df.set_crs(self.crs) for node in self._nodes()] # type:ignore df = pd.concat(df_chunks, ignore_index=True) node_table = NodeTable(df=df) @@ -402,12 +392,6 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False): node_df = self.node_table().df assert node_df is not None - # This will need to be adopted for locally unique node IDs, - # otherwise the `node_lookup` with `argsort` is not correct. - if not node_df.node_id.is_unique: - raise ValueError("node_id must be unique") - node_df.sort_values("node_id", inplace=True) - assert self.edge.df is not None edge_df = self.edge.df.copy() # We assume only the flow network is of interest. @@ -417,7 +401,13 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False): edge_id = edge_df.index.to_numpy() from_node_id = edge_df.from_node_id.to_numpy() to_node_id = edge_df.to_node_id.to_numpy() - node_lookup = _node_lookup_numpy(node_id) + node_lookup = _node_lookup(node_df) + from_node_index = pd.MultiIndex.from_frame( + edge_df[["from_node_type", "from_node_id"]] + ) + to_node_index = pd.MultiIndex.from_frame( + edge_df[["to_node_type", "to_node_id"]] + ) grid = xugrid.Ugrid1d( node_x=node_df.geometry.x, @@ -425,8 +415,8 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False): fill_value=-1, edge_node_connectivity=np.column_stack( ( - node_lookup[from_node_id], - node_lookup[to_node_id], + node_lookup.loc[from_node_index], + node_lookup.loc[to_node_index], ) ), name="ribasim", @@ -444,7 +434,7 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False): uds = uds.assign_coords(to_node_id=(edge_dim, to_node_id)) if add_flow: - uds = self._add_flow(uds) + uds = self._add_flow(uds, node_lookup) elif add_allocation: uds = self._add_allocation(uds) @@ -456,7 +446,7 @@ def _checked_toml_path(self) -> Path: raise FileNotFoundError("Model must be written to disk to add results.") return toml_path - def _add_flow(self, uds): + def _add_flow(self, uds, node_lookup): toml_path = self._checked_toml_path() results_path = toml_path.parent / self.results_dir @@ -477,17 +467,19 @@ def _add_flow(self, uds): # add the xugrid dimension indices to the dataframes edge_dim = uds.grid.edge_dimension node_dim = uds.grid.node_dimension - node_lookup = _node_lookup(uds) edge_lookup = _edge_lookup(uds) flow_df[edge_dim] = edge_lookup[flow_df["edge_id"]].to_numpy() - basin_df[node_dim] = node_lookup[basin_df["node_id"]].to_numpy() + # Use a MultiIndex to ensure the lookup results is the same length as basin_df + basin_df["node_type"] = "Basin" + multi_index = pd.MultiIndex.from_frame(basin_df[["node_type", "node_id"]]) + basin_df[node_dim] = node_lookup.loc[multi_index].to_numpy() # add flow results to the UgridDataset flow_da = flow_df.set_index(["time", edge_dim])["flow_rate"].to_xarray() uds[flow_da.name] = flow_da # add basin results to the UgridDataset - basin_df.drop(columns=["node_id"], inplace=True) + basin_df.drop(columns=["node_type", "node_id"], inplace=True) basin_ds = basin_df.set_index(["time", node_dim]).to_xarray() for var_name, da in basin_ds.data_vars.items(): diff --git a/python/ribasim/ribasim/utils.py b/python/ribasim/ribasim/utils.py index 57fa9a684..b8fd26a02 100644 --- a/python/ribasim/ribasim/utils.py +++ b/python/ribasim/ribasim/utils.py @@ -1,6 +1,5 @@ import re -import numpy as np import pandas as pd from pandera.dtypes import Int32 from pandera.typing import Series @@ -22,28 +21,14 @@ def __getattr__(self, name): raise ImportError(f"{self.name} is required for this functionality") -def _node_lookup_numpy(node_id) -> Series[Int32]: - """Create a lookup table from from node_id to the node dimension index. +def _node_lookup(df) -> Series[Int32]: + """Create a lookup table from from (node_type, node_id) to the node dimension index. Used when adding data onto the nodes of an xugrid dataset. """ - return pd.Series( - index=node_id, - data=node_id.argsort().astype(np.int32), - name="node_index", - ) - - -def _node_lookup(uds) -> Series[Int32]: - """Create a lookup table from from node_id to the node dimension index. - - Used when adding data onto the nodes of an xugrid dataset. - """ - return pd.Series( - index=uds["node_id"], - data=uds[uds.grid.node_dimension], - name="node_index", - ) + return df.reset_index(names="node_index").set_index(["node_type", "node_id"])[ + "node_index" + ] def _edge_lookup(uds) -> Series[Int32]: diff --git a/python/ribasim/tests/test_model.py b/python/ribasim/tests/test_model.py index 433b275c0..c6c91ee06 100644 --- a/python/ribasim/tests/test_model.py +++ b/python/ribasim/tests/test_model.py @@ -12,6 +12,7 @@ from ribasim.geometry.edge import NodeData from ribasim.input_base import esc_id from ribasim.model import Model +from ribasim_testmodels import basic_model, trivial_model from shapely import Point @@ -221,8 +222,9 @@ def test_indexing(basic): model.basin.time[1] -def test_xugrid(basic, tmp_path): - uds = basic.to_xugrid(add_flow=False) +@pytest.mark.parametrize("model", [basic_model(), trivial_model()]) +def test_xugrid(model, tmp_path): + uds = model.to_xugrid(add_flow=False) assert isinstance(uds, xugrid.UgridDataset) assert uds.grid.edge_dimension == "ribasim_nEdges" assert uds.grid.node_dimension == "ribasim_nNodes" @@ -233,15 +235,15 @@ def test_xugrid(basic, tmp_path): assert uds.attrs["Conventions"] == "CF-1.9 UGRID-1.0" with pytest.raises(FileNotFoundError, match="Model must be written to disk"): - basic.to_xugrid(add_flow=True) + model.to_xugrid(add_flow=True) - basic.write(tmp_path / "ribasim.toml") + model.write(tmp_path / "ribasim.toml") with pytest.raises(FileNotFoundError, match="Cannot find results"): - basic.to_xugrid(add_flow=True) + model.to_xugrid(add_flow=True) with pytest.raises(FileNotFoundError, match="or allocation is not used"): - basic.to_xugrid(add_flow=False, add_allocation=True) + model.to_xugrid(add_flow=False, add_allocation=True) with pytest.raises(ValueError, match="Cannot add both allocation and flow results"): - basic.to_xugrid(add_flow=True, add_allocation=True) + model.to_xugrid(add_flow=True, add_allocation=True) def test_to_crs(bucket: Model): diff --git a/python/ribasim_testmodels/ribasim_testmodels/trivial.py b/python/ribasim_testmodels/ribasim_testmodels/trivial.py index ad3669eb3..2864edbd3 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/trivial.py +++ b/python/ribasim_testmodels/ribasim_testmodels/trivial.py @@ -40,20 +40,20 @@ def trivial_model() -> Model: ], ) - # TODO largest signed 64 bit integer, to check encoding - terminal_id = 922 # 3372036854775807 + # TODO largest signed 32 bit integer, to check encoding + terminal_id = 2147483647 model.terminal.add(Node(terminal_id, Point(500, 200))) model.tabulated_rating_curve.add( - Node(0, Point(450, 200)), + Node(6, Point(450, 200)), [tabulated_rating_curve.Static(level=[0.0, 1.0], flow_rate=[0.0, 10 / 86400])], ) model.edge.add( model.basin[6], - model.tabulated_rating_curve[0], + model.tabulated_rating_curve[6], ) model.edge.add( - model.tabulated_rating_curve[0], + model.tabulated_rating_curve[6], model.terminal[terminal_id], )