Skip to content

Commit

Permalink
add model inputs and outputs to flow output (#644)
Browse files Browse the repository at this point in the history
Fixes #199.

The flow sparse matrix is currently only used for flows on edges between
nodes. To be able to make a water balance, this adds the boundary
condition flows onto the diagonal, with + for adding water to the model,
and - for removing water.

For a graph, an entry onto the diagonal is a self-loop, and edge from a
vertex onto itself. We don't actually have an edge_id for this, so in
the [flow
output](https://deltares.github.io/Ribasim/core/usage.html#edge) this is
`missing`, and `from_node_id` and `to_node_id` are the same.

Still have to
- [x] clean up the PR
- [x] add tests
- [x] ask @SouthEndMusic if he accepts me removing explicit `flow = 0`
statement on inactive nodes (`flow` and `du` are set to 0 at the start
of `water_balance!`, so it's not needed)
- [x] check if QGIS likes this
- [x] update addnode.qmd
- [x] make an issue on the horrible way we currently check flows in the
julia tests, (done using the new `Ribasim.flow_table(model)` function
from this PR)
- [x] make an issue for
#644 (comment)

But welcome to hear early feedback.

We could add these self-loops to the actual `flow_graph` as well, but I
feel like it would be annoying having to always filter these out of
`inneighbors` and `outneighbors`, so this adds the self-loops only to
the flow sparse matrix. They are only used for output.

The QGIS time series visualization just ignores this currently, which is
fine:

![image](https://github.com/Deltares/Ribasim/assets/4471859/a30f1ea9-6415-43cc-83e9-c832537232ce)
  • Loading branch information
visr authored Oct 11, 2023
1 parent 3639313 commit bab694a
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 89 deletions.
4 changes: 3 additions & 1 deletion core/Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ CodecZstd = "0.7,0.8"
ComponentArrays = "0.13.14, 0.14, 0.15"
Configurations = "0.17"
DBInterface = "2.4"
DataFrames = "1.4"
DataInterpolations = "3.7, 4"
DataStructures = "0.18"
Dictionaries = "0.3.25"
Expand All @@ -66,6 +67,7 @@ julia = "1.9"
[extras]
Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595"
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
Expand All @@ -76,4 +78,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
TestReports = "dcd651b4-b50a-5b6b-8f22-87e9f253a252"

[targets]
test = ["Aqua", "CSV", "Documenter", "IOCapture", "Logging", "SafeTestsets", "TerminalLoggers", "Test", "TestReports", "TOML"]
test = ["Aqua", "CSV", "DataFrames", "Documenter", "IOCapture", "Logging", "SafeTestsets", "TerminalLoggers", "Test", "TestReports", "TOML"]
43 changes: 26 additions & 17 deletions core/src/bmi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,25 @@ end
Write all output to the configured output files.
"""
function BMI.finalize(model::Model)::Model
compress = get_compressor(model.config.output)
write_basin_output(model, compress)
write_flow_output(model, compress)
write_discrete_control_output(model, compress)
(; config) = model
(; output) = model.config
compress = get_compressor(output)

# basin
table = basin_table(model)
path = output_path(config, output.basin)
write_arrow(path, table, compress)

# flow
table = flow_table(model)
path = output_path(config, output.flow)
write_arrow(path, table, compress)

# discrete control
table = discrete_control_table(model)
path = output_path(config, output.control)
write_arrow(path, table, compress)

@debug "Wrote output."
return model
end
Expand Down Expand Up @@ -198,20 +213,20 @@ function create_callbacks(
saveat,
)::Tuple{CallbackSet, SavedValues{Float64, Vector{Float64}}}
(; starttime, basin, tabulated_rating_curve, discrete_control) = parameters
callbacks = SciMLBase.DECallback[]

tstops = get_tstops(basin.time.time, starttime)
basin_cb = PresetTimeCallback(tstops, update_basin)
push!(callbacks, basin_cb)

tstops = get_tstops(tabulated_rating_curve.time.time, starttime)
tabulated_rating_curve_cb = PresetTimeCallback(tstops, update_tabulated_rating_curve!)
push!(callbacks, tabulated_rating_curve_cb)

# add a single time step's contribution to the water balance step's totals
# trackwb_cb = FunctionCallingCallback(track_waterbalance!)
# flows: save the flows over time, as a Vector of the nonzeros(flow)

# save the flows over time, as a Vector of the nonzeros(flow)
saved_flow = SavedValues(Float64, Vector{Float64})

save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false)
push!(callbacks, save_flow_cb)

n_conditions = length(discrete_control.node_id)
if n_conditions > 0
Expand All @@ -221,15 +236,9 @@ function create_callbacks(
discrete_control_affect_downcrossing!,
n_conditions,
)
callback = CallbackSet(
save_flow_cb,
basin_cb,
tabulated_rating_curve_cb,
discrete_control_cb,
)
else
callback = CallbackSet(save_flow_cb, basin_cb, tabulated_rating_curve_cb)
push!(callbacks, discrete_control_cb)
end
callback = CallbackSet(callbacks...)

return callback, saved_flow
end
Expand Down
13 changes: 13 additions & 0 deletions core/src/create.jl
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ function static_and_time_node_ids(
return static_node_ids, time_node_ids, node_ids, node_names, !errors
end

const nonconservative_nodetypes =
Set{String}(["Basin", "LevelBoundary", "FlowBoundary", "Terminal", "User"])

function Connectivity(db::DB, config::Config, chunk_size::Int)::Connectivity
if !valid_edge_types(db)
error("Invalid edge types found.")
Expand All @@ -210,6 +213,16 @@ function Connectivity(db::DB, config::Config, chunk_size::Int)::Connectivity
edge_ids_flow_inv = Dictionary(values(edge_ids_flow), keys(edge_ids_flow))

flow = adjacency_matrix(graph_flow, Float64)
# Add a self-loop, i.e. an entry on the diagonal, for all non-conservative node types.
# This is used to store the gain (positive) or loss (negative) for the water balance.
# Note that this only affects the sparsity structure.
# We want to do it here to avoid changing that during the simulation and keeping it predictable,
# e.g. if we wouldn't do this, inactive nodes can appear if control turns them on during runtime.
for (i, nodetype) in enumerate(get_nodetypes(db))
if nodetype in nonconservative_nodetypes
flow[i, i] = 1.0
end
end
flow .= 0.0

if config.solver.autodiff
Expand Down
58 changes: 30 additions & 28 deletions core/src/io.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
function get_nodetypes(db::DB)::Vector{String}
return only(execute(columntable, db, "SELECT type FROM Node ORDER BY fid"))
end

function get_ids(db::DB)::Vector{Int}
return only(execute(columntable, db, "SELECT fid FROM Node ORDER BY fid"))
end
Expand Down Expand Up @@ -166,59 +170,57 @@ function get_storages_and_levels(
return (; time = tsteps, node_id, storage, level)
end

function write_basin_output(model::Model, compress)
(; config, integrator) = model
(; p) = integrator

"Create the basin result table from the saved data"
function basin_table(model::Model)::NamedTuple
data = get_storages_and_levels(model)
nbasin = length(data.node_id)
ntsteps = length(data.time)

time = convert.(Arrow.DATETIME, repeat(data.time; inner = nbasin))
time = repeat(data.time; inner = nbasin)
node_id = repeat(data.node_id; outer = ntsteps)

basin = (; time, node_id, storage = vec(data.storage), level = vec(data.level))
path = output_path(config, config.output.basin)
mkpath(dirname(path))
Arrow.write(path, basin; compress)
return (; time, node_id, storage = vec(data.storage), level = vec(data.level))
end

function write_flow_output(model::Model, compress)
"Create a flow result table from the saved data"
function flow_table(model::Model)::NamedTuple
(; config, saved_flow, integrator) = model
(; t, saveval) = saved_flow
(; connectivity) = integrator.p

I, J, _ = findnz(get_tmp(connectivity.flow, integrator.u))
unique_edge_ids = [connectivity.edge_ids_flow[ij] for ij in zip(I, J)]
# self-loops have no edge ID
unique_edge_ids = [get(connectivity.edge_ids_flow, ij, missing) for ij in zip(I, J)]
nflow = length(I)
ntsteps = length(t)

time =
convert.(
Arrow.DATETIME,
repeat(datetime_since.(t, config.starttime); inner = nflow),
)

time = repeat(datetime_since.(t, config.starttime); inner = nflow)
edge_id = repeat(unique_edge_ids; outer = ntsteps)
from_node_id = repeat(I; outer = ntsteps)
to_node_id = repeat(J; outer = ntsteps)
flow = FlatVector(saveval)

table = (; time, edge_id, from_node_id, to_node_id, flow)
path = output_path(config, config.output.flow)
mkpath(dirname(path))
Arrow.write(path, table; compress)
return (; time, edge_id, from_node_id, to_node_id, flow)
end

function write_discrete_control_output(model::Model, compress)
config = model.config
record = model.integrator.p.discrete_control.record
"Create a discrete control result table from the saved data"
function discrete_control_table(model::Model)::NamedTuple
(; config) = model
(; record) = model.integrator.p.discrete_control

time = convert.(Arrow.DATETIME, datetime_since.(record.time, config.starttime))

table = (; time, record.control_node_id, record.truth_state, record.control_state)
time = datetime_since.(record.time, config.starttime)
return (; time, record.control_node_id, record.truth_state, record.control_state)
end

path = output_path(config, config.output.control)
"Write a result table to disk as an Arrow file"
function write_arrow(
path::AbstractString,
table::NamedTuple,
compress::TranscodingStreams.Codec,
)
# ensure DateTime is encoded in a compatible manner
# https://github.com/apache/arrow-julia/issues/303
table = merge(table, (; time = convert.(Arrow.DATETIME, table.time)))
mkpath(dirname(path))
Arrow.write(path, table; compress)
end
8 changes: 7 additions & 1 deletion core/src/lib.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ function Model(config::Config)::Model
return BMI.initialize(Model, config::Config)
end

timesteps(model::Model) = model.integrator.sol.t
"Get all saved times in seconds since start"
timesteps(model::Model)::Vector{Float64} = model.integrator.sol.t

"Get all saved times as a Vector{DateTime}"
function datetimes(model::Model)::Vector{DateTime}
return datetime_since.(timesteps(model), model.config.starttime)
end

function Base.show(io::IO, model::Model)
(; config, integrator) = model
Expand Down
Loading

0 comments on commit bab694a

Please sign in to comment.