diff --git a/core/Project.toml b/core/Project.toml index bafffe8e2..d02b7b56f 100644 --- a/core/Project.toml +++ b/core/Project.toml @@ -42,6 +42,7 @@ CodecZstd = "0.7,0.8" ComponentArrays = "0.13.14, 0.14, 0.15" Configurations = "0.17" DBInterface = "2.4" +DataFrames = "1.4" DataInterpolations = "3.7, 4" DataStructures = "0.18" Dictionaries = "0.3.25" @@ -66,6 +67,7 @@ julia = "1.9" [extras] Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4" IOCapture = "b5f81e59-6552-4d32-b1f0-c071b021bf89" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" @@ -76,4 +78,4 @@ Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" TestReports = "dcd651b4-b50a-5b6b-8f22-87e9f253a252" [targets] -test = ["Aqua", "CSV", "Documenter", "IOCapture", "Logging", "SafeTestsets", "TerminalLoggers", "Test", "TestReports", "TOML"] +test = ["Aqua", "CSV", "DataFrames", "Documenter", "IOCapture", "Logging", "SafeTestsets", "TerminalLoggers", "Test", "TestReports", "TOML"] diff --git a/core/src/bmi.jl b/core/src/bmi.jl index c35e820ed..13ce7a70e 100644 --- a/core/src/bmi.jl +++ b/core/src/bmi.jl @@ -159,10 +159,25 @@ end Write all output to the configured output files. """ function BMI.finalize(model::Model)::Model - compress = get_compressor(model.config.output) - write_basin_output(model, compress) - write_flow_output(model, compress) - write_discrete_control_output(model, compress) + (; config) = model + (; output) = model.config + compress = get_compressor(output) + + # basin + table = basin_table(model) + path = output_path(config, output.basin) + write_arrow(path, table, compress) + + # flow + table = flow_table(model) + path = output_path(config, output.flow) + write_arrow(path, table, compress) + + # discrete control + table = discrete_control_table(model) + path = output_path(config, output.control) + write_arrow(path, table, compress) + @debug "Wrote output." return model end @@ -198,20 +213,20 @@ function create_callbacks( saveat, )::Tuple{CallbackSet, SavedValues{Float64, Vector{Float64}}} (; starttime, basin, tabulated_rating_curve, discrete_control) = parameters + callbacks = SciMLBase.DECallback[] tstops = get_tstops(basin.time.time, starttime) basin_cb = PresetTimeCallback(tstops, update_basin) + push!(callbacks, basin_cb) tstops = get_tstops(tabulated_rating_curve.time.time, starttime) tabulated_rating_curve_cb = PresetTimeCallback(tstops, update_tabulated_rating_curve!) + push!(callbacks, tabulated_rating_curve_cb) - # add a single time step's contribution to the water balance step's totals - # trackwb_cb = FunctionCallingCallback(track_waterbalance!) - # flows: save the flows over time, as a Vector of the nonzeros(flow) - + # save the flows over time, as a Vector of the nonzeros(flow) saved_flow = SavedValues(Float64, Vector{Float64}) - save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false) + push!(callbacks, save_flow_cb) n_conditions = length(discrete_control.node_id) if n_conditions > 0 @@ -221,15 +236,9 @@ function create_callbacks( discrete_control_affect_downcrossing!, n_conditions, ) - callback = CallbackSet( - save_flow_cb, - basin_cb, - tabulated_rating_curve_cb, - discrete_control_cb, - ) - else - callback = CallbackSet(save_flow_cb, basin_cb, tabulated_rating_curve_cb) + push!(callbacks, discrete_control_cb) end + callback = CallbackSet(callbacks...) return callback, saved_flow end diff --git a/core/src/create.jl b/core/src/create.jl index 3afd22730..e1e8b921a 100644 --- a/core/src/create.jl +++ b/core/src/create.jl @@ -198,6 +198,9 @@ function static_and_time_node_ids( return static_node_ids, time_node_ids, node_ids, node_names, !errors end +const nonconservative_nodetypes = + Set{String}(["Basin", "LevelBoundary", "FlowBoundary", "Terminal", "User"]) + function Connectivity(db::DB, config::Config, chunk_size::Int)::Connectivity if !valid_edge_types(db) error("Invalid edge types found.") @@ -210,6 +213,16 @@ function Connectivity(db::DB, config::Config, chunk_size::Int)::Connectivity edge_ids_flow_inv = Dictionary(values(edge_ids_flow), keys(edge_ids_flow)) flow = adjacency_matrix(graph_flow, Float64) + # Add a self-loop, i.e. an entry on the diagonal, for all non-conservative node types. + # This is used to store the gain (positive) or loss (negative) for the water balance. + # Note that this only affects the sparsity structure. + # We want to do it here to avoid changing that during the simulation and keeping it predictable, + # e.g. if we wouldn't do this, inactive nodes can appear if control turns them on during runtime. + for (i, nodetype) in enumerate(get_nodetypes(db)) + if nodetype in nonconservative_nodetypes + flow[i, i] = 1.0 + end + end flow .= 0.0 if config.solver.autodiff diff --git a/core/src/io.jl b/core/src/io.jl index 5628e3a85..ea4be2277 100644 --- a/core/src/io.jl +++ b/core/src/io.jl @@ -1,3 +1,7 @@ +function get_nodetypes(db::DB)::Vector{String} + return only(execute(columntable, db, "SELECT type FROM Node ORDER BY fid")) +end + function get_ids(db::DB)::Vector{Int} return only(execute(columntable, db, "SELECT fid FROM Node ORDER BY fid")) end @@ -166,59 +170,57 @@ function get_storages_and_levels( return (; time = tsteps, node_id, storage, level) end -function write_basin_output(model::Model, compress) - (; config, integrator) = model - (; p) = integrator - +"Create the basin result table from the saved data" +function basin_table(model::Model)::NamedTuple data = get_storages_and_levels(model) nbasin = length(data.node_id) ntsteps = length(data.time) - time = convert.(Arrow.DATETIME, repeat(data.time; inner = nbasin)) + time = repeat(data.time; inner = nbasin) node_id = repeat(data.node_id; outer = ntsteps) - basin = (; time, node_id, storage = vec(data.storage), level = vec(data.level)) - path = output_path(config, config.output.basin) - mkpath(dirname(path)) - Arrow.write(path, basin; compress) + return (; time, node_id, storage = vec(data.storage), level = vec(data.level)) end -function write_flow_output(model::Model, compress) +"Create a flow result table from the saved data" +function flow_table(model::Model)::NamedTuple (; config, saved_flow, integrator) = model (; t, saveval) = saved_flow (; connectivity) = integrator.p I, J, _ = findnz(get_tmp(connectivity.flow, integrator.u)) - unique_edge_ids = [connectivity.edge_ids_flow[ij] for ij in zip(I, J)] + # self-loops have no edge ID + unique_edge_ids = [get(connectivity.edge_ids_flow, ij, missing) for ij in zip(I, J)] nflow = length(I) ntsteps = length(t) - time = - convert.( - Arrow.DATETIME, - repeat(datetime_since.(t, config.starttime); inner = nflow), - ) - + time = repeat(datetime_since.(t, config.starttime); inner = nflow) edge_id = repeat(unique_edge_ids; outer = ntsteps) from_node_id = repeat(I; outer = ntsteps) to_node_id = repeat(J; outer = ntsteps) flow = FlatVector(saveval) - table = (; time, edge_id, from_node_id, to_node_id, flow) - path = output_path(config, config.output.flow) - mkpath(dirname(path)) - Arrow.write(path, table; compress) + return (; time, edge_id, from_node_id, to_node_id, flow) end -function write_discrete_control_output(model::Model, compress) - config = model.config - record = model.integrator.p.discrete_control.record +"Create a discrete control result table from the saved data" +function discrete_control_table(model::Model)::NamedTuple + (; config) = model + (; record) = model.integrator.p.discrete_control - time = convert.(Arrow.DATETIME, datetime_since.(record.time, config.starttime)) - - table = (; time, record.control_node_id, record.truth_state, record.control_state) + time = datetime_since.(record.time, config.starttime) + return (; time, record.control_node_id, record.truth_state, record.control_state) +end - path = output_path(config, config.output.control) +"Write a result table to disk as an Arrow file" +function write_arrow( + path::AbstractString, + table::NamedTuple, + compress::TranscodingStreams.Codec, +) + # ensure DateTime is encoded in a compatible manner + # https://github.com/apache/arrow-julia/issues/303 + table = merge(table, (; time = convert.(Arrow.DATETIME, table.time))) mkpath(dirname(path)) Arrow.write(path, table; compress) end diff --git a/core/src/lib.jl b/core/src/lib.jl index b18804bfb..ab33f6f44 100644 --- a/core/src/lib.jl +++ b/core/src/lib.jl @@ -29,7 +29,13 @@ function Model(config::Config)::Model return BMI.initialize(Model, config::Config) end -timesteps(model::Model) = model.integrator.sol.t +"Get all saved times in seconds since start" +timesteps(model::Model)::Vector{Float64} = model.integrator.sol.t + +"Get all saved times as a Vector{DateTime}" +function datetimes(model::Model)::Vector{DateTime} + return datetime_since.(timesteps(model), model.config.starttime) +end function Base.show(io::IO, model::Model) (; config, integrator) = model diff --git a/core/src/solve.jl b/core/src/solve.jl index 594ee6ac4..dd9bceb58 100644 --- a/core/src/solve.jl +++ b/core/src/solve.jl @@ -525,12 +525,17 @@ end Smoothly let the evaporation flux go to 0 when at small water depths Currently at less than 0.1 m. """ -function formulate!(du::AbstractVector, basin::Basin, storage::AbstractVector)::Nothing - (; current_level, current_area) = basin +function formulate_basins!( + du::AbstractVector, + basin::Basin, + flow::AbstractMatrix, + storage::AbstractVector, +)::Nothing + (; node_id, current_level, current_area) = basin current_level = get_tmp(current_level, storage) current_area = get_tmp(current_area, storage) - for i in eachindex(storage) + for (i, id) in enumerate(node_id) # add all precipitation that falls within the profile level = current_level[i] area = current_area[i] @@ -545,7 +550,9 @@ function formulate!(du::AbstractVector, basin::Basin, storage::AbstractVector):: drainage = basin.drainage[i] infiltration = factor * basin.infiltration[i] - du.storage[i] += precipitation - evaporation + drainage - infiltration + influx = precipitation - evaporation + drainage - infiltration + du.storage[i] += influx + flow[id, id] = influx end return nothing end @@ -748,8 +755,6 @@ function formulate_flow!( dst_id = only(outneighbors(graph_flow, id)) if !active[i] - flow[src_id, id] = 0.0 - flow[id, dst_id] = 0.0 continue end @@ -776,6 +781,7 @@ function formulate_flow!( # Return flow is immediate flow[id, dst_id] = q * return_factor[i] + flow[id, id] = -q * (1 - return_factor[i]) end return nothing end @@ -805,9 +811,6 @@ function formulate_flow!( ) / resistance[i] flow[basin_a_id, id] = q flow[id, basin_b_id] = q - else - flow[basin_a_id, id] = 0.0 - flow[id, basin_b_id] = 0.0 end end return nothing @@ -902,8 +905,6 @@ function formulate_flow!( basin_b_id = only(outneighbors(graph_flow, id)) if !active[i] - flow[basin_a_id, id] = 0.0 - flow[id, basin_b_id] = 0.0 continue end @@ -964,6 +965,50 @@ function formulate_flow!( return nothing end +function formulate_flow!( + terminal::Terminal, + p::Parameters, + storage::AbstractVector, + t::Float64, +)::Nothing + (; connectivity) = p + (; graph_flow, flow) = connectivity + (; node_id) = terminal + flow = get_tmp(flow, storage) + + for id in node_id + for upstream_id in inneighbors(graph_flow, id) + q = flow[upstream_id, id] + flow[id, id] -= q + end + end + return nothing +end + +function formulate_flow!( + level_boundary::LevelBoundary, + p::Parameters, + storage::AbstractVector, + t::Float64, +)::Nothing + (; connectivity) = p + (; graph_flow, flow) = connectivity + (; node_id) = level_boundary + flow = get_tmp(flow, storage) + + for id in node_id + for in_id in inneighbors(graph_flow, id) + q = flow[in_id, id] + flow[id, id] -= q + end + for out_id in outneighbors(graph_flow, id) + q = flow[id, out_id] + flow[id, id] += q + end + end + return nothing +end + function formulate_flow!( flow_boundary::FlowBoundary, p::Parameters, @@ -979,7 +1024,6 @@ function formulate_flow!( # Requirement: edge points away from the flow boundary for dst_id in outneighbors(graph_flow, id) if !active[i] - flow[id, dst_id] = 0.0 continue end @@ -987,6 +1031,7 @@ function formulate_flow!( # Adding water is always possible flow[id, dst_id] = rate + flow[id, id] = rate end end end @@ -1008,8 +1053,6 @@ function formulate_flow!( dst_id = only(outneighbors(graph_flow, id)) if !isactive || pid_controlled - flow[src_id, id] = 0.0 - flow[id, dst_id] = 0.0 continue end @@ -1044,8 +1087,6 @@ function formulate_flow!( dst_id = only(outneighbors(graph_flow, id)) if !active[i] || is_pid_controlled[i] - flow[src_id, id] = 0.0 - flow[id, dst_id] = 0.0 continue end @@ -1078,7 +1119,7 @@ function formulate_flow!( return nothing end -function formulate!( +function formulate_du!( du::ComponentVector, connectivity::Connectivity, flow::AbstractMatrix, @@ -1105,10 +1146,12 @@ function formulate_flows!(p::Parameters, storage::AbstractVector, t::Float64)::N manning_resistance, tabulated_rating_curve, flow_boundary, + level_boundary, pump, outlet, user, fractional_flow, + terminal, ) = p formulate_flow!(linear_resistance, p, storage, t) @@ -1119,8 +1162,10 @@ function formulate_flows!(p::Parameters, storage::AbstractVector, t::Float64)::N formulate_flow!(outlet, p, storage, t) formulate_flow!(user, p, storage, t) - # FractionalFlow must be done last as it relies on formulated input flows + # do these last since they rely on formulated input flows formulate_flow!(fractional_flow, p, storage, t) + formulate_flow!(level_boundary, p, storage, t) + formulate_flow!(terminal, p, storage, t) end """ @@ -1146,13 +1191,13 @@ function water_balance!( set_current_basin_properties!(basin, storage) # Basin forcings - formulate!(du, basin, storage) + formulate_basins!(du, basin, flow, storage) # First formulate intermediate flows formulate_flows!(p, storage, t) # Now formulate du - formulate!(du, connectivity, flow, basin) + formulate_du!(du, connectivity, flow, basin) # PID control (changes the du of PID controlled basins) continuous_control!(u, du, pid_control, p, integral, t) diff --git a/core/test/run_models.jl b/core/test/run_models.jl index 6a59c0cc6..116031f4c 100644 --- a/core/test/run_models.jl +++ b/core/test/run_models.jl @@ -1,3 +1,4 @@ +using Dates using Logging: Debug, with_logger using Test using Ribasim @@ -5,6 +6,7 @@ import BasicModelInterface as BMI using SciMLBase: successful_retcode import Tables using PreallocationTools: get_tmp +using DataFrames: DataFrame @testset "trivial model" begin toml_path = normpath(@__DIR__, "../../generated_testmodels/trivial/trivial.toml") @@ -38,13 +40,21 @@ end @test all(isconcretetype, fieldtypes(typeof(p))) @test successful_retcode(model) - @test allunique(model.integrator.sol.t) + @test allunique(Ribasim.timesteps(model)) @test model.integrator.sol.u[end] ≈ Float32[519.8817, 519.8798, 339.3959, 1418.4331] skip = Sys.isapple() atol = 1.5 @test length(logger.logs) == 7 @test logger.logs[1].level == Debug @test logger.logs[1].message == "Read database into memory." + + table = Ribasim.flow_table(model) + @test Tables.schema(table) == Tables.Schema( + (:time, :edge_id, :from_node_id, :to_node_id, :flow), + (DateTime, Union{Int, Missing}, Int, Int, Float64), + ) + # flows are recorded at the end of each period, and are undefined at the start + @test unique(table.time) == Ribasim.datetimes(model)[2:end] end @testset "basic transient model" begin @@ -158,13 +168,15 @@ end level = level[1] timesteps = model.saved_flow.t - outlet_flow = [saveval[1] for saveval in model.saved_flow.saveval] + flow = DataFrame(Ribasim.flow_table(model)) + outlet_flow = + filter([:from_node_id, :to_node_id] => (from, to) -> from === 2 && to === 3, flow) t_min_crest_level = level.t[2] * (outlet.min_crest_level[1] - level.u[1]) / (level.u[2] - level.u[1]) # No outlet flow when upstream level is below minimum crest level - @test all(@. outlet_flow[timesteps <= t_min_crest_level] == 0) + @test all(@. outlet_flow.flow[timesteps <= t_min_crest_level] == 0) timesteps = Ribasim.timesteps(model) t_maximum_level = level.t[2] @@ -258,7 +270,7 @@ end # slope is. See e.g.: # https://www.hec.usace.army.mil/confluence/rasdocs/ras1dtechref/latest/theoretical-basis-for-one-dimensional-and-two-dimensional-hydrodynamic-calculations/1d-steady-flow-water-surface-profiles/friction-loss-evaluation @test all(isapprox.(h_expected, h_actual; atol = 0.02)) - # Test for conservation of mass - @test all(isapprox.(model.saved_flow.saveval[end], 5.0, atol = 0.001)) skip = - Sys.isapple() + # Test for conservation of mass, flow at the beginning == flow at the end + @test model.saved_flow.saveval[end][2] ≈ 5.0 atol = 0.001 skip = Sys.isapple() + @test model.saved_flow.saveval[end][end - 1] ≈ 5.0 atol = 0.001 skip = Sys.isapple() end diff --git a/core/test/time.jl b/core/test/time.jl index 8a30204a2..6724ea808 100644 --- a/core/test/time.jl +++ b/core/test/time.jl @@ -1,5 +1,7 @@ using Ribasim using SQLite +using Dates +using DataFrames: DataFrame @testset "Time dependent flow boundary" begin toml_path = normpath( @@ -9,14 +11,24 @@ using SQLite @test ispath(toml_path) model = Ribasim.run(toml_path) - flow = [flows[1] for flows in model.saved_flow.saveval] - i_start = searchsortedlast(flow, 1) - i_end = searchsortedfirst(flow, 2) - - t = model.saved_flow.t[i_start:i_end] + flow = DataFrame(Ribasim.flow_table(model)) + # only from March to September the FlowBoundary varies + sin_timestamps = 3 .<= month.(unique(flow.time)) .< 10 + + flow_added_1 = filter( + [:from_node_id, :to_node_id] => (from, to) -> from === 1 && to === 1, + flow, + ).flow[sin_timestamps] + flow_1_to_2 = filter( + [:from_node_id, :to_node_id] => (from, to) -> from === 1 && to === 2, + flow, + ).flow[sin_timestamps] + @test flow_added_1 == flow_1_to_2 + + t = model.saved_flow.t[sin_timestamps] flow_expected = @. 1 + sin(0.5 * π * (t - t[1]) / (t[end] - t[1]))^2 - - @test isapprox(flow[i_start:i_end], flow_expected, rtol = 0.001) + # some difference is expected since the modeled flow is for the period up to t + @test isapprox(flow_added_1, flow_expected, rtol = 0.005) end @testset "User demand interpolation" begin diff --git a/docs/contribute/addnode.qmd b/docs/contribute/addnode.qmd index d55e98f52..491743c91 100644 --- a/docs/contribute/addnode.qmd +++ b/docs/contribute/addnode.qmd @@ -83,6 +83,10 @@ function formulate_flow!(new_node_type::NewNodeType, p::Parameters)::Nothing end ``` +If the new node type is non-conservative, meaning it either adds or removes water from the model, these boundary flows also need to be recorded. +This is done by storing it on the diagonal of the `flow[from, to]` matrix, e.g. `flow[id, id] = q`, where `q` is positive for water added to the model. +Non-conservative node types need to be added to the `nonconservative_nodetypes` set such that this diagonal is set to a nonzero on creating the flow sparse matrix in the `Connectivity` constructor. + ## The Jacobian See [Equations](../core/equations.qmd#the-jacobian) for a mathematical description of the Jacobian. diff --git a/docs/core/usage.qmd b/docs/core/usage.qmd index 94135cd5a..9865146c5 100644 --- a/docs/core/usage.qmd +++ b/docs/core/usage.qmd @@ -286,15 +286,17 @@ The table is sorted by time, and per time it is sorted by `node_id`. The flow table contains outputs of the flow on every edge in the model, for each solver timestep. -column | type | unit -------------- | -------- | ---- -time | DateTime | - -edge_id | Int | - -from_node_id | Int | - -to_node_id | Int | - -flow | Float64 | $m^3 s^{-1}$ - -The table is sorted by time, and per time the same edge_id order is used, though not sorted. +column | type | unit +------------- | ------------------- | ---- +time | DateTime | - +edge_id | Union{Int, Missing} | - +from_node_id | Int | - +to_node_id | Int | - +flow | Float64 | $m^3 s^{-1}$ + +The table is sorted by time, and per time the same `edge_id` order is used, though not sorted. +Flows that are added to the model at a node, have a missing `edge_id`, and identical `from_node_id` and `to_node_id`. +Flows out of the model always have a negative sign, and additions a positive sign. # FractionalFlow