Skip to content

Commit

Permalink
Don't require unique node IDs (#1513)
Browse files Browse the repository at this point in the history
Fixes #1318
Fixes #1256

@evetion I didn't touch the Delwaq `node_lookup`. If only Basins are in
the graph, then you can still use an ID as a unique index.

The #1256 fix in 72dcf24 is not tested,
since Ribasim-Python stops us from creating such an invalid model. I
manually confirmed it works by duplicating rows in QGIS.

The `main` methods were needed to fix `pixi run ribasim-core`.

---------

Co-authored-by: Maarten Pronk <[email protected]>
Co-authored-by: Maarten Pronk <[email protected]>
  • Loading branch information
3 people authored Jun 5, 2024
1 parent 7524a19 commit 89b8574
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 68 deletions.
3 changes: 3 additions & 0 deletions core/src/main.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ function run(config::Config)::Model
return model
end

main(ARGS::Vector{String})::Cint = main(only(ARGS))
main()::Cint = main(ARGS)

"""
main(toml_path::AbstractString)::Cint
main(ARGS::Vector{String})::Cint
Expand Down
10 changes: 3 additions & 7 deletions core/src/model.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ function Model(config::Config)::Model
# so we can directly close it again.
db = SQLite.DB(db_path)

if !valid_nodes(db)
error("Invalid nodes found.")
end
if !valid_edge_types(db)
error("Invalid edge types found.")
end
Expand Down Expand Up @@ -101,13 +104,6 @@ function Model(config::Config)::Model
state = load_structvector(db, config, BasinStateV1)
n = length(get_ids(db, "Basin"))

sql = "SELECT node_id FROM Node ORDER BY node_id"
node_id = only(execute(columntable, db, sql))
if !allunique(node_id)
error(
"Node IDs need to be globally unique until https://github.com/Deltares/Ribasim/issues/1262 is fixed.",
)
end
finally
# always close the database, also in case of an error
close(db)
Expand Down
15 changes: 15 additions & 0 deletions core/src/validation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,21 @@ function valid_config(config::Config)::Bool
return !errors
end

"""
    valid_nodes(db::DB)::Bool

Check the Node table for duplicate (node_type, node_id) combinations.

Logs an error for every duplicated combination found, and returns `false`
if any duplicates exist, `true` otherwise.
"""
function valid_nodes(db::DB)::Bool
    # Select only the (node_type, node_id) pairs that occur more than once.
    query = "SELECT node_type, node_id FROM Node GROUP BY node_type, node_id HAVING COUNT(*) > 1"
    duplicate_types, duplicate_ids = execute(columntable, db, query)

    has_duplicates = false
    for (type, id_int) in zip(duplicate_types, duplicate_ids)
        has_duplicates = true
        id = NodeID(type, id_int)
        @error "Multiple occurrences of node $id found in Node table."
    end

    return !has_duplicates
end

"""
Test for each node given its node type whether the nodes that
are downstream ('down-edge') of this node are of an allowed type
Expand Down
4 changes: 2 additions & 2 deletions core/test/run_models_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@
@testset "Results values" begin
@test flow.time[1] == DateTime(2020)
@test coalesce.(flow.edge_id[1:2], -1) == [0, 1]
@test flow.from_node_id[1:2] == [6, 0]
@test flow.to_node_id[1:2] == [0, 922]
@test flow.from_node_id[1:2] == [6, 6]
@test flow.to_node_id[1:2] == [6, 2147483647]

@test basin.storage[1] 1.0
@test basin.level[1] 0.044711584
Expand Down
9 changes: 7 additions & 2 deletions coupling/delwaq/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def generate(toml_path: Path) -> tuple[nx.DiGraph, set[str]]:
for row in model.node_table().df.itertuples():
if row.node_type not in ribasim.geometry.edge.SPATIALCONTROLNODETYPES:
G.add_node(
row.node_id,
row.node_type + str(row.node_id),
type=row.node_type,
id=row.node_id,
x=row.geometry.x,
Expand All @@ -53,7 +53,12 @@ def generate(toml_path: Path) -> tuple[nx.DiGraph, set[str]]:
)
for row in model.edge.df.itertuples():
if row.edge_type == "flow":
G.add_edge(row.from_node_id, row.to_node_id, id=[row.Index], duplicate=None)
G.add_edge(
row.from_node_type + str(row.from_node_id),
row.to_node_type + str(row.to_node_id),
id=[row.Index],
duplicate=None,
)

# Simplify network, only keeping Basins and Boundaries.
# We find an unwanted node, remove it,
Expand Down
42 changes: 17 additions & 25 deletions python/ribasim/ribasim/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
MissingOptionalModule,
_edge_lookup,
_node_lookup,
_node_lookup_numpy,
_time_in_ns,
)

Expand Down Expand Up @@ -174,16 +173,7 @@ def _save(self, directory: DirectoryPath, input_dir: DirectoryPath):
db_path.unlink(missing_ok=True)
context_file_loading.get()["database"] = db_path
self.edge._save(directory, input_dir)

node = self.node_table()
assert node.df is not None
# Temporarily require unique node_id for #1262
# and copy them to the fid for #1306.
if not node.df["node_id"].is_unique:
raise ValueError("node_id must be unique")
node.df.set_index("node_id", drop=False, inplace=True)
node.df.index.name = "fid"
node.df.sort_index(inplace=True)
node._save(directory, input_dir)

for sub in self._nodes():
Expand All @@ -209,7 +199,7 @@ def _apply_crs_function(self, function_name: str, crs: str) -> None:
self.crs = crs

def node_table(self) -> NodeTable:
"""Compute the full NodeTable from all node types."""
"""Compute the full sorted NodeTable from all node types."""
df_chunks = [node.node.df.set_crs(self.crs) for node in self._nodes()] # type:ignore
df = pd.concat(df_chunks, ignore_index=True)
node_table = NodeTable(df=df)
Expand Down Expand Up @@ -402,12 +392,6 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False):
node_df = self.node_table().df
assert node_df is not None

# This will need to be adopted for locally unique node IDs,
# otherwise the `node_lookup` with `argsort` is not correct.
if not node_df.node_id.is_unique:
raise ValueError("node_id must be unique")
node_df.sort_values("node_id", inplace=True)

assert self.edge.df is not None
edge_df = self.edge.df.copy()
# We assume only the flow network is of interest.
Expand All @@ -417,16 +401,22 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False):
edge_id = edge_df.index.to_numpy()
from_node_id = edge_df.from_node_id.to_numpy()
to_node_id = edge_df.to_node_id.to_numpy()
node_lookup = _node_lookup_numpy(node_id)
node_lookup = _node_lookup(node_df)
from_node_index = pd.MultiIndex.from_frame(
edge_df[["from_node_type", "from_node_id"]]
)
to_node_index = pd.MultiIndex.from_frame(
edge_df[["to_node_type", "to_node_id"]]
)

grid = xugrid.Ugrid1d(
node_x=node_df.geometry.x,
node_y=node_df.geometry.y,
fill_value=-1,
edge_node_connectivity=np.column_stack(
(
node_lookup[from_node_id],
node_lookup[to_node_id],
node_lookup.loc[from_node_index],
node_lookup.loc[to_node_index],
)
),
name="ribasim",
Expand All @@ -444,7 +434,7 @@ def to_xugrid(self, add_flow: bool = False, add_allocation: bool = False):
uds = uds.assign_coords(to_node_id=(edge_dim, to_node_id))

if add_flow:
uds = self._add_flow(uds)
uds = self._add_flow(uds, node_lookup)
elif add_allocation:
uds = self._add_allocation(uds)

Expand All @@ -456,7 +446,7 @@ def _checked_toml_path(self) -> Path:
raise FileNotFoundError("Model must be written to disk to add results.")
return toml_path

def _add_flow(self, uds):
def _add_flow(self, uds, node_lookup):
toml_path = self._checked_toml_path()

results_path = toml_path.parent / self.results_dir
Expand All @@ -477,17 +467,19 @@ def _add_flow(self, uds):
# add the xugrid dimension indices to the dataframes
edge_dim = uds.grid.edge_dimension
node_dim = uds.grid.node_dimension
node_lookup = _node_lookup(uds)
edge_lookup = _edge_lookup(uds)
flow_df[edge_dim] = edge_lookup[flow_df["edge_id"]].to_numpy()
basin_df[node_dim] = node_lookup[basin_df["node_id"]].to_numpy()
# Use a MultiIndex to ensure the lookup result is the same length as basin_df
basin_df["node_type"] = "Basin"
multi_index = pd.MultiIndex.from_frame(basin_df[["node_type", "node_id"]])
basin_df[node_dim] = node_lookup.loc[multi_index].to_numpy()

# add flow results to the UgridDataset
flow_da = flow_df.set_index(["time", edge_dim])["flow_rate"].to_xarray()
uds[flow_da.name] = flow_da

# add basin results to the UgridDataset
basin_df.drop(columns=["node_id"], inplace=True)
basin_df.drop(columns=["node_type", "node_id"], inplace=True)
basin_ds = basin_df.set_index(["time", node_dim]).to_xarray()

for var_name, da in basin_ds.data_vars.items():
Expand Down
25 changes: 5 additions & 20 deletions python/ribasim/ribasim/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import re

import numpy as np
import pandas as pd
from pandera.dtypes import Int32
from pandera.typing import Series
Expand All @@ -22,28 +21,14 @@ def __getattr__(self, name):
raise ImportError(f"{self.name} is required for this functionality")


def _node_lookup_numpy(node_id) -> Series[Int32]:
"""Create a lookup table from node_id to the node dimension index.
def _node_lookup(df) -> Series[Int32]:
"""Create a lookup table from (node_type, node_id) to the node dimension index.
Used when adding data onto the nodes of an xugrid dataset.
"""
return pd.Series(
index=node_id,
data=node_id.argsort().astype(np.int32),
name="node_index",
)


def _node_lookup(uds) -> Series[Int32]:
"""Create a lookup table from node_id to the node dimension index.
Used when adding data onto the nodes of an xugrid dataset.
"""
return pd.Series(
index=uds["node_id"],
data=uds[uds.grid.node_dimension],
name="node_index",
)
return df.reset_index(names="node_index").set_index(["node_type", "node_id"])[
"node_index"
]


def _edge_lookup(uds) -> Series[Int32]:
Expand Down
16 changes: 9 additions & 7 deletions python/ribasim/tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ribasim.geometry.edge import NodeData
from ribasim.input_base import esc_id
from ribasim.model import Model
from ribasim_testmodels import basic_model, trivial_model
from shapely import Point


Expand Down Expand Up @@ -221,8 +222,9 @@ def test_indexing(basic):
model.basin.time[1]


def test_xugrid(basic, tmp_path):
uds = basic.to_xugrid(add_flow=False)
@pytest.mark.parametrize("model", [basic_model(), trivial_model()])
def test_xugrid(model, tmp_path):
uds = model.to_xugrid(add_flow=False)
assert isinstance(uds, xugrid.UgridDataset)
assert uds.grid.edge_dimension == "ribasim_nEdges"
assert uds.grid.node_dimension == "ribasim_nNodes"
Expand All @@ -233,15 +235,15 @@ def test_xugrid(basic, tmp_path):
assert uds.attrs["Conventions"] == "CF-1.9 UGRID-1.0"

with pytest.raises(FileNotFoundError, match="Model must be written to disk"):
basic.to_xugrid(add_flow=True)
model.to_xugrid(add_flow=True)

basic.write(tmp_path / "ribasim.toml")
model.write(tmp_path / "ribasim.toml")
with pytest.raises(FileNotFoundError, match="Cannot find results"):
basic.to_xugrid(add_flow=True)
model.to_xugrid(add_flow=True)
with pytest.raises(FileNotFoundError, match="or allocation is not used"):
basic.to_xugrid(add_flow=False, add_allocation=True)
model.to_xugrid(add_flow=False, add_allocation=True)
with pytest.raises(ValueError, match="Cannot add both allocation and flow results"):
basic.to_xugrid(add_flow=True, add_allocation=True)
model.to_xugrid(add_flow=True, add_allocation=True)


def test_to_crs(bucket: Model):
Expand Down
10 changes: 5 additions & 5 deletions python/ribasim_testmodels/ribasim_testmodels/trivial.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ def trivial_model() -> Model:
],
)

# TODO largest signed 64 bit integer, to check encoding
terminal_id = 922 # 3372036854775807
# TODO largest signed 32 bit integer, to check encoding
terminal_id = 2147483647
model.terminal.add(Node(terminal_id, Point(500, 200)))
model.tabulated_rating_curve.add(
Node(0, Point(450, 200)),
Node(6, Point(450, 200)),
[tabulated_rating_curve.Static(level=[0.0, 1.0], flow_rate=[0.0, 10 / 86400])],
)

model.edge.add(
model.basin[6],
model.tabulated_rating_curve[0],
model.tabulated_rating_curve[6],
)
model.edge.add(
model.tabulated_rating_curve[0],
model.tabulated_rating_curve[6],
model.terminal[terminal_id],
)

Expand Down

0 comments on commit 89b8574

Please sign in to comment.