From 7c64d94bebc8a4a34ea3c1f43039bb21964b90a9 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Tue, 14 May 2024 16:22:35 +0200 Subject: [PATCH 01/11] Towards using IntegratingCallback --- core/src/Ribasim.jl | 4 +- core/src/callback.jl | 163 ++++++++++++------------------------------ core/src/graph.jl | 12 +--- core/src/model.jl | 3 +- core/src/parameter.jl | 46 +++--------- core/src/read.jl | 10 +-- core/src/util.jl | 14 ++-- core/src/write.jl | 13 +++- 8 files changed, 83 insertions(+), 182 deletions(-) diff --git a/core/src/Ribasim.jl b/core/src/Ribasim.jl index a761c4b34..65d39587f 100644 --- a/core/src/Ribasim.jl +++ b/core/src/Ribasim.jl @@ -34,7 +34,9 @@ using DiffEqCallbacks: PeriodicCallback, PresetTimeCallback, SavedValues, - SavingCallback + SavingCallback, + IntegratingCallback, + IntegrandValues using EnumX: EnumX, @enumx using ForwardDiff: pickchunksize using Graphs: diff --git a/core/src/callback.jl b/core/src/callback.jl index 908e21b03..6d2f34be9 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -1,4 +1,3 @@ - """ Create the different callbacks that are used to store results and feed the simulation with new data. The different callbacks @@ -13,12 +12,17 @@ function create_callbacks( (; starttime, basin, tabulated_rating_curve, discrete_control) = parameters callbacks = SciMLBase.DECallback[] + # Check for negative storages negative_storage_cb = FunctionCallingCallback(check_negative_storage) push!(callbacks, negative_storage_cb) - integrating_flows_cb = FunctionCallingCallback(integrate_flows!; func_start = false) + integrand_output_prototype = get_flow_integrand_output_prototype(parameters) + saved_flow = IntegrandValues(Float64, typeof(integrand_output_prototype)) + integrating_flows_cb = + IntegratingCallback(flow_integrand, saved_flow, integrand_output_prototype) push!(callbacks, integrating_flows_cb) + # TODO: What is this? tstops = get_tstops(basin.time.time, starttime) basin_cb = PresetTimeCallback(tstops, update_basin; save_positions = (false, false)) push!(callbacks, basin_cb) @@ -31,19 +35,6 @@ function create_callbacks( ) push!(callbacks, tabulated_rating_curve_cb) - # If saveat is a vector which contains 0.0 this callback will still be called - # at t = 0.0 despite save_start = false - saveat = saveat isa Vector ? filter(x -> x != 0.0, saveat) : saveat - saved_vertical_flux = SavedValues(Float64, typeof(basin.vertical_flux_integrated)) - save_vertical_flux_cb = - SavingCallback(save_vertical_flux, saved_vertical_flux; saveat, save_start = false) - push!(callbacks, save_vertical_flux_cb) - - # save the flows over time - saved_flow = SavedValues(Float64, SavedFlow) - save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false) - push!(callbacks, save_flow_cb) - # interpolate the levels saved_subgrid_level = SavedValues(Float64, Vector{Float64}) if config.results.subgrid @@ -56,7 +47,7 @@ function create_callbacks( push!(callbacks, export_cb) end - saved = SavedResults(saved_flow, saved_vertical_flux, saved_subgrid_level) + saved = SavedResults(saved_flow, saved_subgrid_level) n_conditions = sum(length(vec) for vec in discrete_control.greater_than; init = 0) if n_conditions > 0 @@ -68,128 +59,68 @@ function create_callbacks( return callback, saved end -function check_negative_storage(u, t, integrator)::Nothing - (; basin) = integrator.p - (; node_id) = basin - errors = false - for (i, id) in enumerate(node_id) - if u.storage[i] < 0 - @error "Negative storage detected in $id" - errors = true - end - end - - if errors - t_datetime = datetime_since(integrator.t, integrator.p.starttime) - error("Negative storages found at $t_datetime.") - end - return nothing -end - -""" -Integrate flows over the last timestep -""" -function integrate_flows!(u, t, integrator)::Nothing - (; p, dt) = integrator - (; graph, user_demand, basin, allocation) = p - (; flow, flow_dict, flow_prev, flow_integrated) = graph[] - (; vertical_flux, vertical_flux_prev, vertical_flux_integrated, vertical_flux_bmi) = - basin - flow = get_tmp(flow, 0) +function flow_integrand(out, u, t, integrator)::Nothing + (; basin, graph) = integrator.p + (; vertical_flux) = basin vertical_flux = get_tmp(vertical_flux, 0) - if !isempty(flow_prev) && isnan(flow_prev[1]) - # If flow_prev is not populated yet - copyto!(flow_prev, flow) - end - - @. flow_integrated += 0.5 * (flow + flow_prev) * dt - @. vertical_flux_integrated += 0.5 * (vertical_flux + vertical_flux_prev) * dt - @. vertical_flux_bmi += 0.5 * (vertical_flux + vertical_flux_prev) * dt + flow = get_tmp(graph[].flow, 0) + out_vertical_flux = @view out[(:precipitation, :evaporation, :drainage, :infiltration)] - # UserDemand realized flows for BMI - for (i, id) in enumerate(user_demand.node_id) - src_id = inflow_id(graph, id) - flow_idx = flow_dict[src_id, id] - user_demand.realized_bmi[i] += 0.5 * (flow[flow_idx] + flow_prev[flow_idx]) * dt - end + out.flow .= flow + out_vertical_flux .= vertical_flux - # Allocation source flows - for (edge, value) in allocation.mean_flows - if edge[1] == edge[2] - # Vertical fluxes - _, basin_idx = id_index(basin.node_id, edge[1]) - value[] += - 0.5 * - (get_influx(basin, basin_idx) + get_influx(basin, basin_idx; prev = true)) * - dt - else - # Horizontal flows - value[] += - 0.5 * (get_flow(graph, edge..., 0) + get_flow_prev(graph, edge..., 0)) * dt - end - end - - copyto!(flow_prev, flow) - copyto!(vertical_flux_prev, vertical_flux) - return nothing -end - -"Compute the average flows over the last saveat interval and write -them to SavedValues" -function save_flow(u, t, integrator) - (; graph) = integrator.p - (; flow_integrated, flow_dict) = graph[] - (; node_id) = integrator.p.basin - - Δt = get_Δt(integrator) - flow_mean = copy(flow_integrated) - flow_mean ./= Δt - fill!(flow_integrated, 0.0) - - # Divide the flows over edges to Basin inflow and outflow, regardless of edge direction. - inflow_mean = zeros(length(node_id)) - outflow_mean = zeros(length(node_id)) - - for (i, basin_id) in enumerate(node_id) - for inflow_id in inflow_ids(graph, basin_id) - q = flow_mean[flow_dict[inflow_id, basin_id]] + for (i, basin_id) in enumerate(basin.node_id) + for inflow_edge in basin.inflow_edges[i] + q = get_flow(graph, inflow_edge, 0) if q > 0 - inflow_mean[i] += q + out.basin_inflow[i] += q else - outflow_mean[i] -= q + out.basin_outflow[i] -= q end end - for outflow_id in outflow_ids(graph, basin_id) - q = flow_mean[flow_dict[basin_id, outflow_id]] + for outflow_edge in basin.outflow_edges[i] + q = get_flow(graph, outflow_edge, 0) if q > 0 - outflow_mean[i] += q + out.basin_outflow[i] += q else - inflow_mean[i] -= q + out.basin_inflow[i] -= q end end end + return nothing +end - return SavedFlow(; flow = flow_mean, inflow = inflow_mean, outflow = outflow_mean) +function get_flow_integrand_output_prototype(p::Parameters) + (; graph, basin) = p + flow = get_tmp(graph[].flow, 0) + vertical_flux = get_tmp(basin.vertical_flux, 0) + n_basin = length(basin.node_id) + basin_inflow = zeros(n_basin) + basin_outflow = zeros(n_basin) + return ComponentVector{Float64}(; basin_inflow, basin_outflow, flow, vertical_flux...) end -"Compute the average vertical fluxes over the last saveat interval and write -them to SavedValues" -function save_vertical_flux(u, t, integrator) +function check_negative_storage(u, t, integrator)::Nothing (; basin) = integrator.p - (; vertical_flux_integrated) = basin - - Δt = get_Δt(integrator) - vertical_flux_mean = copy(vertical_flux_integrated) - vertical_flux_mean ./= Δt - fill!(vertical_flux_integrated, 0.0) + (; node_id) = basin + errors = false + for (i, id) in enumerate(node_id) + if u.storage[i] < 0 + @error "Negative storage detected in $id" + errors = true + end + end - return vertical_flux_mean + if errors + t_datetime = datetime_since(integrator.t, integrator.p.starttime) + error("Negative storages found at $t_datetime.") + end + return nothing end function apply_discrete_control!(u, t, integrator)::Nothing (; p) = integrator (; discrete_control) = p - condition_idx = 0 discrete_control_condition!(u, t, integrator) diff --git a/core/src/graph.jl b/core/src/graph.jl index 8351dd92d..cd3009ca6 100644 --- a/core/src/graph.jl +++ b/core/src/graph.jl @@ -100,20 +100,10 @@ function create_graph(db::DB, config::Config, chunk_sizes::Vector{Int})::MetaGra end flow = zeros(flow_counter) - flow_prev = fill(NaN, flow_counter) - flow_integrated = zeros(flow_counter) if config.solver.autodiff flow = DiffCache(flow, chunk_sizes) end - graph_data = (; - node_ids, - edges_source, - flow_dict, - flow, - flow_prev, - flow_integrated, - config.solver.saveat, - ) + graph_data = (; node_ids, edges_source, flow_dict, flow, config.solver.saveat) graph = @set graph.graph_data = graph_data return graph diff --git a/core/src/model.jl b/core/src/model.jl index 4ab790d21..d51865011 100644 --- a/core/src/model.jl +++ b/core/src/model.jl @@ -1,6 +1,5 @@ struct SavedResults{V1 <: ComponentVector{Float64}} - flow::SavedValues{Float64, SavedFlow} - vertical_flux::SavedValues{Float64, V1} + flow::IntegrandValues{Float64, V1} subgrid_level::SavedValues{Float64, Vector{Float64}} end diff --git a/core/src/parameter.jl b/core/src/parameter.jl index ccb017ca2..6b1f6f289 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -136,19 +136,6 @@ end abstract type AbstractParameterNode end -""" -In-memory storage of saved mean flows for writing to results. - -- `flow`: The mean flows on all edges -- `inflow`: The sum of the mean flows coming into each basin -- `outflow`: The sum of the mean flows going out of each basin -""" -@kwdef struct SavedFlow - flow::Vector{Float64} - inflow::Vector{Float64} - outflow::Vector{Float64} -end - """ Requirements: @@ -166,16 +153,13 @@ else T = Vector{Float64} end """ -struct Basin{T, C, V1, V2, V3} <: AbstractParameterNode +struct Basin{T, C, V1, V2} <: AbstractParameterNode node_id::Indices{NodeID} - inflow_ids::Vector{Vector{NodeID}} - outflow_ids::Vector{Vector{NodeID}} + inflow_edges::Vector{Vector{EdgeMetadata}} + outflow_edges::Vector{Vector{EdgeMetadata}} # Vertical fluxes vertical_flux_from_input::V1 vertical_flux::V2 - vertical_flux_prev::V3 - vertical_flux_integrated::V3 - vertical_flux_bmi::V3 # Cache this to avoid recomputation current_level::T current_area::T @@ -190,13 +174,10 @@ struct Basin{T, C, V1, V2, V3} <: AbstractParameterNode function Basin( node_id, - inflow_ids, - outflow_ids, + inflow_edges, + outflow_edges, vertical_flux_from_input::V1, vertical_flux::V2, - vertical_flux_prev::V3, - vertical_flux_integrated::V3, - vertical_flux_bmi::V3, current_level::T, current_area::T, area, @@ -204,18 +185,15 @@ struct Basin{T, C, V1, V2, V3} <: AbstractParameterNode storage, demand, time::StructVector{BasinTimeV1, C, Int}, - ) where {T, C, V1, V2, V3} + ) where {T, C, V1, V2} is_valid = valid_profiles(node_id, level, area) is_valid || error("Invalid Basin / profile table.") - return new{T, C, V1, V2, V3}( + return new{T, C, V1, V2}( node_id, - inflow_ids, - outflow_ids, + inflow_edges, + outflow_edges, vertical_flux_from_input, vertical_flux, - vertical_flux_prev, - vertical_flux_integrated, - vertical_flux_bmi, current_level, current_area, area, @@ -637,7 +615,7 @@ struct Subgrid end # TODO Automatically add all nodetypes here -struct Parameters{T, C1, C2, V1, V2, V3} +struct Parameters{T, C1, C2, V1, V2} starttime::DateTime graph::MetaGraph{ Int64, @@ -650,15 +628,13 @@ struct Parameters{T, C1, C2, V1, V2, V3} edges_source::Dict{Int32, Set{EdgeMetadata}}, flow_dict::Dict{Tuple{NodeID, NodeID}, Int32}, flow::T, - flow_prev::Vector{Float64}, - flow_integrated::Vector{Float64}, saveat::Float64, }, MetaGraphsNext.var"#11#13", Float64, } allocation::Allocation - basin::Basin{T, C1, V1, V2, V3} + basin::Basin{T, C1, V1, V2} linear_resistance::LinearResistance manning_resistance::ManningResistance tabulated_rating_curve::TabulatedRatingCurve{C2} diff --git a/core/src/read.jl b/core/src/read.jl index 2c7b2df43..203d177dd 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -554,9 +554,6 @@ function Basin(db::DB, config::Config, graph::MetaGraph, chunk_sizes::Vector{Int drainage = copy(drainage), infiltration = copy(infiltration), ) - vertical_flux_prev = zero(vertical_flux) - vertical_flux_integrated = zero(vertical_flux) - vertical_flux_bmi = zero(vertical_flux) if config.solver.autodiff current_level = DiffCache(current_level, chunk_sizes) @@ -570,13 +567,10 @@ function Basin(db::DB, config::Config, graph::MetaGraph, chunk_sizes::Vector{Int return Basin( Indices(node_id), - [collect(inflow_ids(graph, id)) for id in node_id], - [collect(outflow_ids(graph, id)) for id in node_id], + inflow_edges.(Ref(graph), node_id), + outflow_edges.(Ref(graph), node_id), vertical_flux_from_input, vertical_flux, - vertical_flux_prev, - vertical_flux_integrated, - vertical_flux_bmi, current_level, current_area, area, diff --git a/core/src/util.jl b/core/src/util.jl index 4cc6d0f84..b341bba8d 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -521,8 +521,9 @@ function Base.getindex(fv::FlatVector, i::Int) return v[r + 1] end -"Construct a FlatVector from one of the fields of SavedFlow." -FlatVector(saveval::Vector{SavedFlow}, sym::Symbol) = FlatVector(getfield.(saveval, sym)) +"Construct a FlatVector from one of the components of a ComponentVector." +FlatVector(saveval::Vector{ComponentVector{A, B, C}} where {A, B, C}, sym::Symbol) = + FlatVector(collect.(view.(saveval, sym))) """ Function that goes smoothly from 0 to 1 in the interval [0,threshold], @@ -698,11 +699,10 @@ function get_influx(basin::Basin, node_id::NodeID)::Float64 return get_influx(basin, basin_idx) end -function get_influx(basin::Basin, basin_idx::Int; prev::Bool = false)::Float64 - (; vertical_flux, vertical_flux_prev) = basin +function get_influx(basin::Basin, basin_idx::Int)::Float64 + (; vertical_flux) = basin vertical_flux = get_tmp(vertical_flux, 0) - flux_vector = prev ? vertical_flux_prev : vertical_flux - (; precipitation, evaporation, drainage, infiltration) = flux_vector + (; precipitation, evaporation, drainage, infiltration) = vertical_flux return precipitation[basin_idx] - evaporation[basin_idx] + drainage[basin_idx] - infiltration[basin_idx] end @@ -730,6 +730,8 @@ has_fractional_flow_outneighbors(graph::MetaGraph, node_id::NodeID)::Bool = any( inflow_edge(graph, node_id)::EdgeMetadata = graph[inflow_id(graph, node_id), node_id] outflow_edge(graph, node_id)::EdgeMetadata = graph[node_id, outflow_id(graph, node_id)] +inflow_edges(graph, node_id)::Vector{EdgeMetadata} = + [graph[inflow_id, node_id] for inflow_id in inflow_ids(graph, node_id)] outflow_edges(graph, node_id)::Vector{EdgeMetadata} = [graph[node_id, outflow_id] for outflow_id in outflow_ids(graph, node_id)] diff --git a/core/src/write.jl b/core/src/write.jl index 1f2ed07d4..89bf5119d 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -76,6 +76,9 @@ function get_storages_and_levels( return (; time = tsteps, node_id, storage, level) end +function average_over_saveats(integral_data, integrator) +end + "Create the basin result table from the saved data" function basin_table( model::Model, @@ -105,8 +108,8 @@ function basin_table( ntsteps = length(data.time) - 1 nrows = nbasin * ntsteps - inflow_rate = FlatVector(saved.flow.saveval, :inflow) - outflow_rate = FlatVector(saved.flow.saveval, :outflow) + inflow_rate = FlatVector(saved.flow.integrand, :basin_inflow) + outflow_rate = FlatVector(saved.flow.integrand, :basin_outflow) precipitation = zeros(nrows) evaporation = zeros(nrows) drainage = zeros(nrows) @@ -114,8 +117,12 @@ function basin_table( balance_error = zeros(nrows) relative_error = zeros(nrows) + @show length(model.integrator.sol.t) + @show ntsteps + @show length(saved.flow.integrand) + idx_row = 0 - for cvec in saved.vertical_flux.saveval + for cvec in saved.flow.integrand for (precipitation_, evaporation_, drainage_, infiltration_) in zip(cvec.precipitation, cvec.evaporation, cvec.drainage, cvec.infiltration) idx_row += 1 From f7570f38c945e1ca67dc3129bb0dce8f697e80de Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Tue, 14 May 2024 17:01:08 +0200 Subject: [PATCH 02/11] Fix writing basin table --- core/src/write.jl | 53 +++++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/core/src/write.jl b/core/src/write.jl index 89bf5119d..88ca75e36 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -76,7 +76,25 @@ function get_storages_and_levels( return (; time = tsteps, node_id, storage, level) end -function average_over_saveats(integral_data, integrator) +function average_over_saveats( + integrand_values::IntegrandValues, + symb::Symbol, + saveats::Vector{Float64}, +) + (; integrand, ts) = integrand_values + averages = Vector{Float64}[] + saveat_index = 2 + integral_saveat = zero(first(integrand)[symb]) + for (integral_step, t) in zip(integrand, ts) + if t >= saveats[saveat_index] + Δt_saveat = saveats[saveat_index] - saveats[saveat_index - 1] + push!(averages, copy(integral_saveat) / Δt_saveat) + integral_saveat .= 0.0 + saveat_index += 1 + end + integral_saveat += integral_step[symb] + end + return averages end "Create the basin result table from the saved data" @@ -97,7 +115,7 @@ function basin_table( balance_error::Vector{Float64}, relative_error::Vector{Float64}, } - (; saved) = model + (; saved, integrator) = model # The last timestep is not included; there is no period over which to compute flows. data = get_storages_and_levels(model) storage = vec(data.storage[:, begin:(end - 1)]) @@ -107,32 +125,17 @@ function basin_table( nbasin = length(data.node_id) ntsteps = length(data.time) - 1 nrows = nbasin * ntsteps - - inflow_rate = FlatVector(saved.flow.integrand, :basin_inflow) - outflow_rate = FlatVector(saved.flow.integrand, :basin_outflow) - precipitation = zeros(nrows) - evaporation = zeros(nrows) - drainage = zeros(nrows) - infiltration = zeros(nrows) + saveats = integrator.sol.t + + inflow_rate = FlatVector(average_over_saveats(saved.flow, :basin_inflow, saveats)) + outflow_rate = FlatVector(average_over_saveats(saved.flow, :basin_inflow, saveats)) + precipitation = FlatVector(average_over_saveats(saved.flow, :precipitation, saveats)) + evaporation = FlatVector(average_over_saveats(saved.flow, :evaporation, saveats)) + drainage = FlatVector(average_over_saveats(saved.flow, :drainage, saveats)) + infiltration = FlatVector(average_over_saveats(saved.flow, :infiltration, saveats)) balance_error = zeros(nrows) relative_error = zeros(nrows) - @show length(model.integrator.sol.t) - @show ntsteps - @show length(saved.flow.integrand) - - idx_row = 0 - for cvec in saved.flow.integrand - for (precipitation_, evaporation_, drainage_, infiltration_) in - zip(cvec.precipitation, cvec.evaporation, cvec.drainage, cvec.infiltration) - idx_row += 1 - precipitation[idx_row] = precipitation_ - evaporation[idx_row] = evaporation_ - drainage[idx_row] = drainage_ - infiltration[idx_row] = infiltration_ - end - end - time = repeat(data.time[begin:(end - 1)]; inner = nbasin) Δtime_seconds = seconds.(diff(data.time)) Δtime = repeat(Δtime_seconds; inner = nbasin) From b7b6299ac7b0f42793cc5c85e308bef2a8c13269 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Wed, 15 May 2024 12:50:15 +0200 Subject: [PATCH 03/11] Fix writing output --- core/src/util.jl | 4 ---- core/src/write.jl | 32 +++++++++++++++++--------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/core/src/util.jl b/core/src/util.jl index 0ba4a10a0..8b35af23e 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -521,10 +521,6 @@ function Base.getindex(fv::FlatVector, i::Int) return v[r + 1] end -"Construct a FlatVector from one of the components of a ComponentVector." -FlatVector(saveval::Vector{ComponentVector{A, B, C}} where {A, B, C}, sym::Symbol) = - FlatVector(collect.(view.(saveval, sym))) - """ Function that goes smoothly from 0 to 1 in the interval [0,threshold], and is constant outside this interval. diff --git a/core/src/write.jl b/core/src/write.jl index 88ca75e36..6ecaf41f6 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -86,15 +86,15 @@ function average_over_saveats( saveat_index = 2 integral_saveat = zero(first(integrand)[symb]) for (integral_step, t) in zip(integrand, ts) - if t >= saveats[saveat_index] + integral_saveat += integral_step[symb] + if t == saveats[saveat_index] Δt_saveat = saveats[saveat_index] - saveats[saveat_index - 1] push!(averages, copy(integral_saveat) / Δt_saveat) integral_saveat .= 0.0 saveat_index += 1 end - integral_saveat += integral_step[symb] end - return averages + return FlatVector(averages) end "Create the basin result table from the saved data" @@ -127,12 +127,12 @@ function basin_table( nrows = nbasin * ntsteps saveats = integrator.sol.t - inflow_rate = FlatVector(average_over_saveats(saved.flow, :basin_inflow, saveats)) - outflow_rate = FlatVector(average_over_saveats(saved.flow, :basin_inflow, saveats)) - precipitation = FlatVector(average_over_saveats(saved.flow, :precipitation, saveats)) - evaporation = FlatVector(average_over_saveats(saved.flow, :evaporation, saveats)) - drainage = FlatVector(average_over_saveats(saved.flow, :drainage, saveats)) - infiltration = FlatVector(average_over_saveats(saved.flow, :infiltration, saveats)) + inflow_rate = average_over_saveats(saved.flow, :basin_inflow, saveats) + outflow_rate = average_over_saveats(saved.flow, :basin_inflow, saveats) + precipitation = average_over_saveats(saved.flow, :precipitation, saveats) + evaporation = average_over_saveats(saved.flow, :evaporation, saveats) + drainage = average_over_saveats(saved.flow, :drainage, saveats) + infiltration = average_over_saveats(saved.flow, :infiltration, saveats) balance_error = zeros(nrows) relative_error = zeros(nrows) @@ -186,9 +186,9 @@ function flow_table( flow_rate::FlatVector{Float64}, } (; config, saved, integrator) = model - (; t, saveval) = saved.flow (; graph) = integrator.p (; flow_dict) = graph[] + saveats = model.integrator.sol.t from_node_type = String[] from_node_id = Int32[] @@ -210,20 +210,22 @@ function flow_table( end nflow = length(unique_edge_ids_flow) - ntsteps = length(t) + ntsteps = length(saveats) - 1 # the timestamp should represent the start of the period, not the end - t_starts = circshift(t, 1) - if !isempty(t) - t_starts[1] = 0.0 + t_starts = if length(saveats) > 1 + saveats[1:(end - 1)] + else + Float64[] end + time = repeat(datetime_since.(t_starts, config.starttime); inner = nflow) edge_id = repeat(unique_edge_ids_flow; outer = ntsteps) from_node_type = repeat(from_node_type; outer = ntsteps) from_node_id = repeat(from_node_id; outer = ntsteps) to_node_type = repeat(to_node_type; outer = ntsteps) to_node_id = repeat(to_node_id; outer = ntsteps) - flow_rate = FlatVector(saveval, :flow) + flow_rate = average_over_saveats(saved.flow, :flow, saveats) return (; time, From 363b9dc6d66ce87be2427da598bb43567880f6b2 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Wed, 15 May 2024 14:59:06 +0200 Subject: [PATCH 04/11] Add integration for BMI --- core/src/Ribasim.jl | 4 +++- core/src/bmi.jl | 4 ++-- core/src/callback.jl | 36 ++++++++++++++++++++++++++++++------ core/src/model.jl | 3 ++- core/src/util.jl | 4 ++++ 5 files changed, 41 insertions(+), 10 deletions(-) diff --git a/core/src/Ribasim.jl b/core/src/Ribasim.jl index 65d39587f..ac53f04cd 100644 --- a/core/src/Ribasim.jl +++ b/core/src/Ribasim.jl @@ -36,7 +36,9 @@ using DiffEqCallbacks: SavedValues, SavingCallback, IntegratingCallback, - IntegrandValues + IntegratingSumCallback, + IntegrandValues, + IntegrandValuesSum using EnumX: EnumX, @enumx using ForwardDiff: pickchunksize using Graphs: diff --git a/core/src/bmi.jl b/core/src/bmi.jl index d598722f8..f206238e3 100644 --- a/core/src/bmi.jl +++ b/core/src/bmi.jl @@ -40,9 +40,9 @@ function BMI.get_value_ptr(model::Model, name::AbstractString)::AbstractVector{F elseif name == "basin.drainage" model.integrator.p.basin.vertical_flux_from_input.drainage elseif name == "basin.infiltration_integrated" - model.integrator.p.basin.vertical_flux_bmi.infiltration + model.saved.stored_for_bmi.integrand.infiltration elseif name == "basin.drainage_integrated" - model.integrator.p.basin.vertical_flux_bmi.drainage + model.saved.stored_for_bmi.integrand.drainage elseif name == "basin.subgrid_level" model.integrator.p.subgrid.level elseif name == "user_demand.demand" diff --git a/core/src/callback.jl b/core/src/callback.jl index 6d2f34be9..7bfac0e7f 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -9,18 +9,33 @@ function create_callbacks( config::Config, saveat, )::Tuple{CallbackSet, SavedResults} - (; starttime, basin, tabulated_rating_curve, discrete_control) = parameters + (; starttime, basin, user_demand, tabulated_rating_curve, discrete_control) = parameters callbacks = SciMLBase.DECallback[] # Check for negative storages negative_storage_cb = FunctionCallingCallback(check_negative_storage) push!(callbacks, negative_storage_cb) + # Integrate flows and vertical basin fluxes for mean outputs integrand_output_prototype = get_flow_integrand_output_prototype(parameters) saved_flow = IntegrandValues(Float64, typeof(integrand_output_prototype)) - integrating_flows_cb = + integrating_flow_cb = IntegratingCallback(flow_integrand, saved_flow, integrand_output_prototype) - push!(callbacks, integrating_flows_cb) + push!(callbacks, integrating_flow_cb) + + # Integrate vertical basin fluxes for BMI + stored_for_bmi = IntegrandValuesSum( + ComponentVector{Float64}(; + get_tmp(basin.vertical_flux, 0)..., + user_realized = zeros(length(user_demand.node_id)), + ), + ) + integrating_for_bmi_cb = IntegratingSumCallback( + bmi_integrand, + stored_for_bmi, + get_tmp(basin.vertical_flux, 0), + ) + push!(callbacks, integrating_for_bmi_cb) # TODO: What is this? tstops = get_tstops(basin.time.time, starttime) @@ -47,7 +62,7 @@ function create_callbacks( push!(callbacks, export_cb) end - saved = SavedResults(saved_flow, saved_subgrid_level) + saved = SavedResults(saved_flow, saved_subgrid_level, stored_for_bmi) n_conditions = sum(length(vec) for vec in discrete_control.greater_than; init = 0) if n_conditions > 0 @@ -64,10 +79,9 @@ function flow_integrand(out, u, t, integrator)::Nothing (; vertical_flux) = basin vertical_flux = get_tmp(vertical_flux, 0) flow = get_tmp(graph[].flow, 0) - out_vertical_flux = @view out[(:precipitation, :evaporation, :drainage, :infiltration)] + vertical_flux_view(out) .= vertical_flux out.flow .= flow - out_vertical_flux .= vertical_flux for (i, basin_id) in enumerate(basin.node_id) for inflow_edge in basin.inflow_edges[i] @@ -90,6 +104,16 @@ function flow_integrand(out, u, t, integrator)::Nothing return nothing end +function bmi_integrand(out, u, t, integrator)::Nothing + (; basin, user_demand) = integrator.p + vertical_flux_view(out) .= get_tmp(basin.vertical_flux, 0) + + for i in eachindex(user_demand.node_id) + out.user_realized[i] = get_flow(graph, user_demand.inflow_edge[i], 0) + end + return +end + function get_flow_integrand_output_prototype(p::Parameters) (; graph, basin) = p flow = get_tmp(graph[].flow, 0) diff --git a/core/src/model.jl b/core/src/model.jl index d51865011..a7e917f85 100644 --- a/core/src/model.jl +++ b/core/src/model.jl @@ -1,6 +1,7 @@ -struct SavedResults{V1 <: ComponentVector{Float64}} +struct SavedResults{V1 <: ComponentVector{Float64}, V2 <: ComponentVector{Float64}} flow::IntegrandValues{Float64, V1} subgrid_level::SavedValues{Float64, Vector{Float64}} + stored_for_bmi::IntegrandValuesSum{V2} end """ diff --git a/core/src/util.jl b/core/src/util.jl index 8b35af23e..c2db31039 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -753,3 +753,7 @@ function get_basin_idx(edge_metadata::EdgeMetadata, id::NodeID)::Int32 0 end end + +function vertical_flux_view(v::ComponentVector) + return @view v[(:precipitation, :evaporation, :drainage, :infiltration)] +end From 13bacbbd5e8e7c78b73d22ada13740f3447fa950 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Wed, 15 May 2024 19:01:24 +0200 Subject: [PATCH 05/11] Add integration for allocation --- core/src/allocation_optim.jl | 8 +-- core/src/callback.jl | 94 ++++++++++++++++++------------------ core/src/parameter.jl | 3 +- core/src/read.jl | 13 +++-- core/src/util.jl | 26 ++++++++++ core/src/write.jl | 3 +- 6 files changed, 91 insertions(+), 56 deletions(-) diff --git a/core/src/allocation_optim.jl b/core/src/allocation_optim.jl index 5c306c6ac..cf26f7294 100644 --- a/core/src/allocation_optim.jl +++ b/core/src/allocation_optim.jl @@ -255,7 +255,7 @@ function set_initial_capacities_source!( )::Nothing (; problem) = allocation_model (; graph, allocation) = p - (; mean_flows) = allocation + (; integrated_flow, integrated_flow_mapping) = allocation (; subnetwork_id) = allocation_model source_constraints = problem[:source] main_network_source_edges = get_main_network_connections(p, subnetwork_id) @@ -266,7 +266,7 @@ function set_initial_capacities_source!( # If it is a source edge for this allocation problem if edge ∉ main_network_source_edges # Reset the source to the averaged flow over the last allocation period - source_capacity = mean_flows[edge][] + source_capacity = integrated_flow.integrand[integrated_flow_mapping[edge]] JuMP.set_normalized_rhs( source_constraints[edge], # It is assumed that the allocation procedure does not have to be differentiated. @@ -361,11 +361,11 @@ function get_basin_data( (; graph, basin, level_demand, allocation) = p (; vertical_flux) = basin (; Δt_allocation) = allocation_model - (; mean_flows) = allocation + (; integrated_flow, integrated_flow_mapping) = allocation @assert node_id.type == NodeType.Basin vertical_flux = get_tmp(vertical_flux, 0) _, basin_idx = id_index(basin.node_id, node_id) - influx = mean_flows[(node_id, node_id)][] + influx = integrated_flow.integrand[integrated_flow_mapping[node_id, node_id]] _, basin_idx = id_index(basin.node_id, node_id) storage_basin = u.storage[basin_idx] control_inneighbors = inneighbor_labels_type(graph, node_id, EdgeType.control) diff --git a/core/src/callback.jl b/core/src/callback.jl index 7bfac0e7f..3b25de786 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -9,7 +9,14 @@ function create_callbacks( config::Config, saveat, )::Tuple{CallbackSet, SavedResults} - (; starttime, basin, user_demand, tabulated_rating_curve, discrete_control) = parameters + (; + starttime, + basin, + user_demand, + tabulated_rating_curve, + discrete_control, + allocation, + ) = parameters callbacks = SciMLBase.DECallback[] # Check for negative storages @@ -20,23 +27,27 @@ function create_callbacks( integrand_output_prototype = get_flow_integrand_output_prototype(parameters) saved_flow = IntegrandValues(Float64, typeof(integrand_output_prototype)) integrating_flow_cb = - IntegratingCallback(flow_integrand, saved_flow, integrand_output_prototype) + IntegratingCallback(flow_integrand!, saved_flow, integrand_output_prototype) push!(callbacks, integrating_flow_cb) # Integrate vertical basin fluxes for BMI - stored_for_bmi = IntegrandValuesSum( - ComponentVector{Float64}(; - get_tmp(basin.vertical_flux, 0)..., - user_realized = zeros(length(user_demand.node_id)), - ), - ) - integrating_for_bmi_cb = IntegratingSumCallback( - bmi_integrand, - stored_for_bmi, - get_tmp(basin.vertical_flux, 0), + integrand_output_prototype = ComponentVector{Float64}(; + get_tmp(basin.vertical_flux, 0)..., + user_realized = zeros(length(user_demand.node_id)), ) + stored_for_bmi = IntegrandValuesSum(integrand_output_prototype) + integrating_for_bmi_cb = + IntegratingSumCallback(bmi_integrand!, stored_for_bmi, integrand_output_prototype) push!(callbacks, integrating_for_bmi_cb) + # Integrate flows for allocation input + integrating_for_allocation_cb = IntegratingSumCallback( + allocation_integrand!, + allocation.integrated_flow, + zeros(length(allocation.integrated_flow_mapping)), + ) + push!(callbacks, integrating_for_allocation_cb) + # TODO: What is this? tstops = get_tstops(basin.time.time, starttime) basin_cb = PresetTimeCallback(tstops, update_basin; save_positions = (false, false)) @@ -74,38 +85,20 @@ function create_callbacks( return callback, saved end -function flow_integrand(out, u, t, integrator)::Nothing +function flow_integrand!(out, u, t, integrator)::Nothing (; basin, graph) = integrator.p (; vertical_flux) = basin vertical_flux = get_tmp(vertical_flux, 0) flow = get_tmp(graph[].flow, 0) - vertical_flux_view(out) .= vertical_flux out.flow .= flow - - for (i, basin_id) in enumerate(basin.node_id) - for inflow_edge in basin.inflow_edges[i] - q = get_flow(graph, inflow_edge, 0) - if q > 0 - out.basin_inflow[i] += q - else - out.basin_outflow[i] -= q - end - end - for outflow_edge in basin.outflow_edges[i] - q = get_flow(graph, outflow_edge, 0) - if q > 0 - out.basin_outflow[i] += q - else - out.basin_inflow[i] -= q - end - end - end + vertical_flux_view(out) .= vertical_flux + set_inoutflows!(out, graph, basin) return nothing end -function bmi_integrand(out, u, t, integrator)::Nothing - (; basin, user_demand) = integrator.p +function bmi_integrand!(out, u, t, integrator)::Nothing + (; basin, user_demand, graph) = integrator.p vertical_flux_view(out) .= get_tmp(basin.vertical_flux, 0) for i in eachindex(user_demand.node_id) @@ -114,6 +107,21 @@ function bmi_integrand(out, u, t, integrator)::Nothing return end +function allocation_integrand!(out, u, t, integrator)::Nothing + (; allocation, graph, basin) = integrator.p + for (edge, i) in allocation.integrated_flow_mapping + if edge[1] == edge[2] + # Vertical fluxes + _, basin_idx = id_index(basin.node_id, edge[1]) + out[i] = get_influx(basin, basin_idx) + else + # Horizontal flows + out[i] = get_flow(graph, edge..., 0) + end + end + return nothing +end + function get_flow_integrand_output_prototype(p::Parameters) (; graph, basin) = p flow = get_tmp(graph[].flow, 0) @@ -385,7 +393,7 @@ function update_basin(integrator)::Nothing (; p, u) = integrator (; basin) = p (; storage) = u - (; node_id, time, vertical_flux_from_input, vertical_flux, vertical_flux_prev) = basin + (; node_id, time, vertical_flux_from_input, vertical_flux) = basin t = datetime_since(integrator.t, integrator.p.starttime) vertical_flux = get_tmp(vertical_flux, integrator.u) @@ -406,9 +414,6 @@ function update_basin(integrator)::Nothing end update_vertical_flux!(basin, storage) - - # Forget about vertical fluxes to handle discontinuous forcing from basin_update - copyto!(vertical_flux_prev, vertical_flux) return nothing end @@ -416,7 +421,7 @@ end function update_allocation!(integrator)::Nothing (; p, t, u) = integrator (; allocation) = p - (; allocation_models, mean_flows) = allocation + (; allocation_models, integrated_flow) = allocation # Don't run the allocation algorithm if allocation is not active # (Specifically for running Ribasim via the BMI) @@ -428,9 +433,7 @@ function update_allocation!(integrator)::Nothing # Divide by the allocation Δt to obtain the mean flows # from the integrated flows - for value in values(mean_flows) - value[] /= Δt_allocation - end + integrated_flow.integrand ./= Δt_allocation # If a main network is present, collect demands of subnetworks if has_main_network(allocation) @@ -448,9 +451,8 @@ function update_allocation!(integrator)::Nothing end # Reset the mean source flows - for value in values(mean_flows) - value[] = 0.0 - end + integrated_flow.integrand .= 0.0 + return nothing end "Load updates from 'TabulatedRatingCurve / time' into the parameters" diff --git a/core/src/parameter.jl b/core/src/parameter.jl index 413485603..b3849fdbf 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -75,7 +75,8 @@ struct Allocation priorities::Vector{Int32} subnetwork_demands::Dict{Tuple{NodeID, NodeID}, Vector{Float64}} subnetwork_allocateds::Dict{Tuple{NodeID, NodeID}, Vector{Float64}} - mean_flows::Dict{Tuple{NodeID, NodeID}, Base.RefValue{Float64}} + integrated_flow_mapping::Dict{Tuple{NodeID, NodeID}, Int32} + integrated_flow::IntegrandValuesSum{Vector{Float64}} record_demand::@NamedTuple{ time::Vector{Float64}, subnetwork_id::Vector{Int32}, diff --git a/core/src/read.jl b/core/src/read.jl index 203d177dd..74c9ec8b7 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -1018,22 +1018,26 @@ function Allocation(db::DB, config::Config, graph::MetaGraph)::Allocation optimization_type = String[], ) - mean_flows = Dict{Tuple{NodeID, NodeID}, Base.RefValue{Float64}}() + integrated_flow_mapping = Dict{Tuple{NodeID, NodeID}, Int32}() # Find edges which serve as sources in allocation + flow_counter = 0 for edge_metadata in values(graph.edge_data) (; subnetwork_id_source, edge) = edge_metadata if subnetwork_id_source != 0 - mean_flows[edge] = Ref(0.0) + flow_counter += 1 + integrated_flow_mapping[edge] = flow_counter end end # Find basins with a level demand for node_id in values(graph.vertex_labels) if has_external_demand(graph, node_id, :level_demand)[1] - mean_flows[(node_id, node_id)] = Ref(0.0) + flow_counter += 1 + integrated_flow_mapping[(node_id, node_id)] = flow_counter end end + integrated_flow = IntegrandValuesSum(zeros(flow_counter)) return Allocation( Int32[], @@ -1042,7 +1046,8 @@ function Allocation(db::DB, config::Config, graph::MetaGraph)::Allocation get_all_priorities(db, config), Dict{Tuple{NodeID, NodeID}, Vector{Float64}}(), Dict{Tuple{NodeID, NodeID}, Vector{Float64}}(), - mean_flows, + integrated_flow_mapping, + integrated_flow, record_demand, record_flow, ) diff --git a/core/src/util.jl b/core/src/util.jl index c2db31039..54b0ef561 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -757,3 +757,29 @@ end function vertical_flux_view(v::ComponentVector) return @view v[(:precipitation, :evaporation, :drainage, :infiltration)] end + +function set_inoutflows!( + v::ComponentVector{Float64}, + graph::MetaGraph, + basin::Basin, +)::Nothing + for (i, basin_id) in enumerate(basin.node_id) + for inflow_edge in basin.inflow_edges[i] + q = get_flow(graph, inflow_edge, 0) + if q > 0 + v.basin_inflow[i] += q + else + v.basin_outflow[i] -= q + end + end + for outflow_edge in basin.outflow_edges[i] + q = get_flow(graph, outflow_edge, 0) + if q > 0 + v.basin_outflow[i] += q + else + v.basin_inflow[i] -= q + end + end + end + return nothing +end diff --git a/core/src/write.jl b/core/src/write.jl index 6ecaf41f6..8ac45ee1a 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -83,11 +83,12 @@ function average_over_saveats( ) (; integrand, ts) = integrand_values averages = Vector{Float64}[] + n_saveats = length(saveats) saveat_index = 2 integral_saveat = zero(first(integrand)[symb]) for (integral_step, t) in zip(integrand, ts) integral_saveat += integral_step[symb] - if t == saveats[saveat_index] + if saveat_index <= n_saveats && t == saveats[saveat_index] Δt_saveat = saveats[saveat_index] - saveats[saveat_index - 1] push!(averages, copy(integral_saveat) / Δt_saveat) integral_saveat .= 0.0 From 773cdcf712de7490b5915fc67579d03737e9cc85 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Wed, 15 May 2024 19:45:15 +0200 Subject: [PATCH 06/11] Fix allocation tests --- core/src/bmi.jl | 2 +- core/src/parameter.jl | 4 ---- core/src/read.jl | 2 -- core/test/allocation_test.jl | 38 ++++++++++++++++++++++++++---------- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/core/src/bmi.jl b/core/src/bmi.jl index f206238e3..4c5e7b270 100644 --- a/core/src/bmi.jl +++ b/core/src/bmi.jl @@ -48,7 +48,7 @@ function BMI.get_value_ptr(model::Model, name::AbstractString)::AbstractVector{F elseif name == "user_demand.demand" vec(model.integrator.p.user_demand.demand) elseif name == "user_demand.realized" - model.integrator.p.user_demand.realized_bmi + model.saved.stored_for_bmi.integrand.user_realized else error("Unknown variable $name") end diff --git a/core/src/parameter.jl b/core/src/parameter.jl index b3849fdbf..c9e412c01 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -525,7 +525,6 @@ inflow_edge: incoming flow edge outflow_edge: outgoing flow edge metadata The ID of the source node is always the ID of the UserDemand node active: whether this node is active and thus demands water -realized_bmi: Cumulative inflow volume, for read or reset by BMI only demand: water flux demand of UserDemand per priority over time Each UserDemand has a demand for all priorities, which is 0.0 if it is not provided explicitly. @@ -542,7 +541,6 @@ struct UserDemand <: AbstractParameterNode inflow_edge::Vector{EdgeMetadata} outflow_edge::Vector{EdgeMetadata} active::BitVector - realized_bmi::Vector{Float64} demand::Matrix{Float64} demand_reduced::Matrix{Float64} demand_itp::Vector{Vector{ScalarInterpolation}} @@ -556,7 +554,6 @@ struct UserDemand <: AbstractParameterNode inflow_id, outflow_id, active, - realized_bmi, demand, demand_reduced, demand_itp, @@ -572,7 +569,6 @@ struct UserDemand <: AbstractParameterNode inflow_id, outflow_id, active, - realized_bmi, demand, demand_reduced, demand_itp, diff --git a/core/src/read.jl b/core/src/read.jl index 74c9ec8b7..d6f0584a8 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -848,7 +848,6 @@ function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand n_user = length(node_ids) n_priority = length(priorities) active = BitVector(ones(Bool, n_user)) - realized_bmi = zeros(n_user) demand = zeros(n_user, n_priority) demand_reduced = zeros(n_user, n_priority) trivial_timespan = [0.0, prevfloat(Inf)] @@ -895,7 +894,6 @@ function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand inflow_edge.(Ref(graph), node_ids), outflow_edge.(Ref(graph), node_ids), active, - realized_bmi, demand, demand_reduced, demand_itp, diff --git a/core/test/allocation_test.jl b/core/test/allocation_test.jl index 1171743b7..27fcadc97 100644 --- a/core/test/allocation_test.jl +++ b/core/test/allocation_test.jl @@ -14,7 +14,10 @@ (; graph, allocation) = p close(db) - allocation.mean_flows[(NodeID(:FlowBoundary, 1), NodeID(:Basin, 2))][] = 4.5 + allocation.integrated_flow.integrand[allocation.integrated_flow_mapping[( + NodeID(:FlowBoundary, 1), + NodeID(:Basin, 2), + )]] = 4.5 allocation_model = p.allocation.allocation_models[1] u = ComponentVector(; storage = zeros(length(p.basin.node_id))) Ribasim.allocate!(p, allocation_model, 0.0, u, OptimizationType.allocate) @@ -215,7 +218,8 @@ end subnetwork_demands, subnetwork_allocateds, record_flow, - mean_flows, + integrated_flow, + integrated_flow_mapping, ) = allocation t = 0.0 @@ -247,7 +251,10 @@ end # Running full allocation algorithm (; Δt_allocation) = allocation_models[1] - mean_flows[(NodeID(:FlowBoundary, 1), NodeID(:Basin, 2))][] = 4.5 * Δt_allocation + integrated_flow.integrand[integrated_flow_mapping[( + NodeID(:FlowBoundary, 1), + NodeID(:Basin, 2), + )]] = 4.5 * Δt_allocation u = ComponentVector(; storage = zeros(length(p.basin.node_id))) Ribasim.update_allocation!((; p, t, u)) @@ -289,13 +296,24 @@ end close(db) (; allocation, user_demand, graph, basin) = p - (; allocation_models, subnetwork_demands, subnetwork_allocateds, mean_flows) = - allocation + (; + allocation_models, + subnetwork_demands, + subnetwork_allocateds, + integrated_flow, + integrated_flow_mapping, + ) = allocation t = 0.0 - # Set flows of sources in - mean_flows[(NodeID(:FlowBoundary, 58), NodeID(:Basin, 16))][] = 1.0 - mean_flows[(NodeID(:FlowBoundary, 59), NodeID(:Basin, 44))][] = 1e-3 + # Set flows of sources + integrated_flow.integrand[integrated_flow_mapping[( + NodeID(:FlowBoundary, 58), + NodeID(:Basin, 16), + )]] = 1.0 + integrated_flow.integrand[integrated_flow_mapping[( + NodeID(:FlowBoundary, 59), + NodeID(:Basin, 44), + )]] = 1e-3 # Collecting demands u = ComponentVector(; storage = zeros(length(basin.node_id))) @@ -439,8 +457,8 @@ end t = 0.0 (; u) = model.integrator optimization_type = OptimizationType.internal_sources - for (edge, value) in allocation.mean_flows - value[] = Ribasim.get_flow(graph, edge..., 0) + for (edge, i) in allocation.integrated_flow_mapping + allocation.integrated_flow.integrand[i] = Ribasim.get_flow(graph, edge..., 0) end Ribasim.set_initial_values!(allocation_model, p, u, t) From 9a3fd8f41c06f5ee5f01b6b601943dde804b99fe Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Thu, 16 May 2024 11:20:52 +0200 Subject: [PATCH 07/11] Introduce TrapezoidIntegrationCallback --- core/src/allocation_optim.jl | 4 +- core/src/bmi.jl | 4 +- core/src/callback.jl | 115 ++++++++++++++++++++++++++--------- core/src/graph.jl | 9 ++- core/src/model.jl | 2 +- core/src/parameter.jl | 5 +- core/src/write.jl | 22 ------- core/test/allocation_test.jl | 10 +-- 8 files changed, 108 insertions(+), 63 deletions(-) diff --git a/core/src/allocation_optim.jl b/core/src/allocation_optim.jl index cf26f7294..e046f5121 100644 --- a/core/src/allocation_optim.jl +++ b/core/src/allocation_optim.jl @@ -266,7 +266,7 @@ function set_initial_capacities_source!( # If it is a source edge for this allocation problem if edge ∉ main_network_source_edges # Reset the source to the averaged flow over the last allocation period - source_capacity = integrated_flow.integrand[integrated_flow_mapping[edge]] + source_capacity = integrated_flow[integrated_flow_mapping[edge]] JuMP.set_normalized_rhs( source_constraints[edge], # It is assumed that the allocation procedure does not have to be differentiated. @@ -365,7 +365,7 @@ function get_basin_data( @assert node_id.type == NodeType.Basin vertical_flux = get_tmp(vertical_flux, 0) _, basin_idx = id_index(basin.node_id, node_id) - influx = integrated_flow.integrand[integrated_flow_mapping[node_id, node_id]] + influx = integrated_flow[integrated_flow_mapping[node_id, node_id]] _, basin_idx = id_index(basin.node_id, node_id) storage_basin = u.storage[basin_idx] control_inneighbors = inneighbor_labels_type(graph, node_id, EdgeType.control) diff --git a/core/src/bmi.jl b/core/src/bmi.jl index 4c5e7b270..06a5e7cd4 100644 --- a/core/src/bmi.jl +++ b/core/src/bmi.jl @@ -40,9 +40,9 @@ function BMI.get_value_ptr(model::Model, name::AbstractString)::AbstractVector{F elseif name == "basin.drainage" model.integrator.p.basin.vertical_flux_from_input.drainage elseif name == "basin.infiltration_integrated" - model.saved.stored_for_bmi.integrand.infiltration + model.saved.integrated_bmi_data.infiltration elseif name == "basin.drainage_integrated" - model.saved.stored_for_bmi.integrand.drainage + model.saved.integrated_bmi_data.drainage elseif name == "basin.subgrid_level" model.integrator.p.subgrid.level elseif name == "user_demand.demand" diff --git a/core/src/callback.jl b/core/src/callback.jl index 3b25de786..aba885d5c 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -1,3 +1,39 @@ +struct TrapezoidIntegrationAffect{IntegrandFunc, T} + integrand_func!::IntegrandFunc + integrand_value::T + integrand_value_prev::T + cache::T + integral::T +end + +function TrapezoidIntegrationCallback(integrand_func!, integral_value)::DiscreteCallback + integrand_value = zero(integral_value) + integrand_value_prev = zero(integral_value) + cache = zero(integral_value) + affect! = TrapezoidIntegrationAffect( + integrand_func!, + integrand_value, + integrand_value_prev, + cache, + integral_value, + ) + return DiscreteCallback((u, t, integrator) -> t != 0, affect!) +end + +function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing + (; integrand_func, integrand_value, integrand_value_prev, cache, integral) = affect! + (; dt) = integrand + + copyto!(integrand_value_prev, integrand_value) + integrand_func(integrand_value, integrator.p) + + cache += integrand_value_prev + cache += integrand_value + cache *= 0.5 * dt + integral += cache + return nothing +end + """ Create the different callbacks that are used to store results and feed the simulation with new data. The different callbacks @@ -11,6 +47,7 @@ function create_callbacks( )::Tuple{CallbackSet, SavedResults} (; starttime, + graph, basin, user_demand, tabulated_rating_curve, @@ -24,28 +61,22 @@ function create_callbacks( push!(callbacks, negative_storage_cb) # Integrate flows and vertical basin fluxes for mean outputs - integrand_output_prototype = get_flow_integrand_output_prototype(parameters) - saved_flow = IntegrandValues(Float64, typeof(integrand_output_prototype)) integrating_flow_cb = - IntegratingCallback(flow_integrand!, saved_flow, integrand_output_prototype) + TrapezoidIntegrationCallback(flow_integrand!, graph[].integrated_flow) push!(callbacks, integrating_flow_cb) # Integrate vertical basin fluxes for BMI - integrand_output_prototype = ComponentVector{Float64}(; + integrated_bmi_data = ComponentVector{Float64}(; get_tmp(basin.vertical_flux, 0)..., user_realized = zeros(length(user_demand.node_id)), ) - stored_for_bmi = IntegrandValuesSum(integrand_output_prototype) integrating_for_bmi_cb = - IntegratingSumCallback(bmi_integrand!, stored_for_bmi, integrand_output_prototype) + TrapezoidIntegrationCallback(bmi_integrand!, integrated_bmi_data) push!(callbacks, integrating_for_bmi_cb) # Integrate flows for allocation input - integrating_for_allocation_cb = IntegratingSumCallback( - allocation_integrand!, - allocation.integrated_flow, - zeros(length(allocation.integrated_flow_mapping)), - ) + integrating_for_allocation_cb = + TrapezoidIntegrationCallback(allocation_integrand!, allocation.integrated_flow) push!(callbacks, integrating_for_allocation_cb) # TODO: What is this? @@ -73,7 +104,7 @@ function create_callbacks( push!(callbacks, export_cb) end - saved = SavedResults(saved_flow, saved_subgrid_level, stored_for_bmi) + saved = SavedResults(saved_flow, saved_subgrid_level, integrated_bmi_data) n_conditions = sum(length(vec) for vec in discrete_control.greater_than; init = 0) if n_conditions > 0 @@ -85,8 +116,8 @@ function create_callbacks( return callback, saved end -function flow_integrand!(out, u, t, integrator)::Nothing - (; basin, graph) = integrator.p +function flow_integrand!(out, p)::Nothing + (; basin, graph) = p (; vertical_flux) = basin vertical_flux = get_tmp(vertical_flux, 0) flow = get_tmp(graph[].flow, 0) @@ -97,8 +128,8 @@ function flow_integrand!(out, u, t, integrator)::Nothing return nothing end -function bmi_integrand!(out, u, t, integrator)::Nothing - (; basin, user_demand, graph) = integrator.p +function bmi_integrand!(out, p)::Nothing + (; basin, user_demand, graph) = p vertical_flux_view(out) .= get_tmp(basin.vertical_flux, 0) for i in eachindex(user_demand.node_id) @@ -107,8 +138,8 @@ function bmi_integrand!(out, u, t, integrator)::Nothing return end -function allocation_integrand!(out, u, t, integrator)::Nothing - (; allocation, graph, basin) = integrator.p +function allocation_integrand!(out, p)::Nothing + (; allocation, graph, basin) = p for (edge, i) in allocation.integrated_flow_mapping if edge[1] == edge[2] # Vertical fluxes @@ -122,14 +153,42 @@ function allocation_integrand!(out, u, t, integrator)::Nothing return nothing end -function get_flow_integrand_output_prototype(p::Parameters) - (; graph, basin) = p - flow = get_tmp(graph[].flow, 0) - vertical_flux = get_tmp(basin.vertical_flux, 0) - n_basin = length(basin.node_id) - basin_inflow = zeros(n_basin) - basin_outflow = zeros(n_basin) - return ComponentVector{Float64}(; basin_inflow, basin_outflow, flow, vertical_flux...) +"Compute the average flows over the last saveat interval and write +them to SavedValues" +function save_flow(u, t, integrator) + (; graph) = integrator.p + (; flow_integrated, flow_dict) = graph[] + (; node_id) = integrator.p.basin + + Δt = get_Δt(integrator) + flow_mean = copy(flow_integrated) + flow_mean ./= Δt + fill!(flow_integrated, 0.0) + + # Divide the flows over edges to Basin inflow and outflow, regardless of edge direction. + inflow_mean = zeros(length(node_id)) + outflow_mean = zeros(length(node_id)) + + for (i, basin_id) in enumerate(node_id) + for inflow_id in inflow_ids(graph, basin_id) + q = flow_mean[flow_dict[inflow_id, basin_id]] + if q > 0 + inflow_mean[i] += q + else + outflow_mean[i] -= q + end + end + for outflow_id in outflow_ids(graph, basin_id) + q = flow_mean[flow_dict[basin_id, outflow_id]] + if q > 0 + outflow_mean[i] += q + else + inflow_mean[i] -= q + end + end + end + + return SavedFlow(; flow = flow_mean, inflow = inflow_mean, outflow = outflow_mean) end function check_negative_storage(u, t, integrator)::Nothing @@ -433,7 +492,7 @@ function update_allocation!(integrator)::Nothing # Divide by the allocation Δt to obtain the mean flows # from the integrated flows - integrated_flow.integrand ./= Δt_allocation + integrated_flow ./= Δt_allocation # If a main network is present, collect demands of subnetworks if has_main_network(allocation) @@ -451,7 +510,7 @@ function update_allocation!(integrator)::Nothing end # Reset the mean source flows - integrated_flow.integrand .= 0.0 + integrated_flow .= 0.0 return nothing end diff --git a/core/src/graph.jl b/core/src/graph.jl index cd3009ca6..ea39611a1 100644 --- a/core/src/graph.jl +++ b/core/src/graph.jl @@ -103,7 +103,14 @@ function create_graph(db::DB, config::Config, chunk_sizes::Vector{Int})::MetaGra if config.solver.autodiff flow = DiffCache(flow, chunk_sizes) end - graph_data = (; node_ids, edges_source, flow_dict, flow, config.solver.saveat) + + flow = zero(flow_counter) + n_basins = length(get_ids(db, "Basin")) + vertical_flux = zero(n_basins) + integrated_flow = ComponentVector{Float64}(; flow, vertical_flux...) + + graph_data = + (; node_ids, edges_source, flow_dict, flow, integrated_flow, config.solver.saveat) graph = @set graph.graph_data = graph_data return graph diff --git a/core/src/model.jl b/core/src/model.jl index a7e917f85..94f5aefd3 100644 --- a/core/src/model.jl +++ b/core/src/model.jl @@ -1,7 +1,7 @@ struct SavedResults{V1 <: ComponentVector{Float64}, V2 <: ComponentVector{Float64}} flow::IntegrandValues{Float64, V1} subgrid_level::SavedValues{Float64, Vector{Float64}} - stored_for_bmi::IntegrandValuesSum{V2} + integrated_bmi_data::V2 end """ diff --git a/core/src/parameter.jl b/core/src/parameter.jl index c9e412c01..9503ce76c 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -76,7 +76,7 @@ struct Allocation subnetwork_demands::Dict{Tuple{NodeID, NodeID}, Vector{Float64}} subnetwork_allocateds::Dict{Tuple{NodeID, NodeID}, Vector{Float64}} integrated_flow_mapping::Dict{Tuple{NodeID, NodeID}, Int32} - integrated_flow::IntegrandValuesSum{Vector{Float64}} + integrated_flow::Vector{Float64} record_demand::@NamedTuple{ time::Vector{Float64}, subnetwork_id::Vector{Int32}, @@ -611,7 +611,7 @@ struct Subgrid end # TODO Automatically add all nodetypes here -struct Parameters{T, C1, C2, V1, V2} +struct Parameters{T, C1, C2, V1, V2, V3} starttime::DateTime graph::MetaGraph{ Int64, @@ -624,6 +624,7 @@ struct Parameters{T, C1, C2, V1, V2} edges_source::Dict{Int32, Set{EdgeMetadata}}, flow_dict::Dict{Tuple{NodeID, NodeID}, Int32}, flow::T, + integrated_flow::V3, saveat::Float64, }, MetaGraphsNext.var"#11#13", diff --git a/core/src/write.jl b/core/src/write.jl index 8ac45ee1a..b1af36ee1 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -76,28 +76,6 @@ function get_storages_and_levels( return (; time = tsteps, node_id, storage, level) end -function average_over_saveats( - integrand_values::IntegrandValues, - symb::Symbol, - saveats::Vector{Float64}, -) - (; integrand, ts) = integrand_values - averages = Vector{Float64}[] - n_saveats = length(saveats) - saveat_index = 2 - integral_saveat = zero(first(integrand)[symb]) - for (integral_step, t) in zip(integrand, ts) - integral_saveat += integral_step[symb] - if saveat_index <= n_saveats && t == saveats[saveat_index] - Δt_saveat = saveats[saveat_index] - saveats[saveat_index - 1] - push!(averages, copy(integral_saveat) / Δt_saveat) - integral_saveat .= 0.0 - saveat_index += 1 - end - end - return FlatVector(averages) -end - "Create the basin result table from the saved data" function basin_table( model::Model, diff --git a/core/test/allocation_test.jl b/core/test/allocation_test.jl index 27fcadc97..cf1949ecd 100644 --- a/core/test/allocation_test.jl +++ b/core/test/allocation_test.jl @@ -14,7 +14,7 @@ (; graph, allocation) = p close(db) - allocation.integrated_flow.integrand[allocation.integrated_flow_mapping[( + allocation.integrated_flow[allocation.integrated_flow_mapping[( NodeID(:FlowBoundary, 1), NodeID(:Basin, 2), )]] = 4.5 @@ -251,7 +251,7 @@ end # Running full allocation algorithm (; Δt_allocation) = allocation_models[1] - integrated_flow.integrand[integrated_flow_mapping[( + integrated_flow[integrated_flow_mapping[( NodeID(:FlowBoundary, 1), NodeID(:Basin, 2), )]] = 4.5 * Δt_allocation @@ -306,11 +306,11 @@ end t = 0.0 # Set flows of sources - integrated_flow.integrand[integrated_flow_mapping[( + integrated_flow[integrated_flow_mapping[( NodeID(:FlowBoundary, 58), NodeID(:Basin, 16), )]] = 1.0 - integrated_flow.integrand[integrated_flow_mapping[( + integrated_flow[integrated_flow_mapping[( NodeID(:FlowBoundary, 59), NodeID(:Basin, 44), )]] = 1e-3 @@ -458,7 +458,7 @@ end (; u) = model.integrator optimization_type = OptimizationType.internal_sources for (edge, i) in allocation.integrated_flow_mapping - allocation.integrated_flow.integrand[i] = Ribasim.get_flow(graph, edge..., 0) + allocation.integrated_flow[i] = Ribasim.get_flow(graph, edge..., 0) end Ribasim.set_initial_values!(allocation_model, p, u, t) From a2a22aa4b93aed36b7743eb325cee3623518e898 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Thu, 16 May 2024 15:19:55 +0200 Subject: [PATCH 08/11] Some fixes --- core/src/Ribasim.jl | 2 +- core/src/callback.jl | 49 ++++++++++++------------------------------- core/src/graph.jl | 14 ++++++++++--- core/src/model.jl | 6 +++--- core/src/parameter.jl | 13 ++++++++++++ core/src/read.jl | 2 +- core/src/util.jl | 27 ++++++++++++++++-------- core/src/write.jl | 38 +++++++++++++++++++++++++-------- 8 files changed, 89 insertions(+), 62 deletions(-) diff --git a/core/src/Ribasim.jl b/core/src/Ribasim.jl index ac53f04cd..c6ec1b35c 100644 --- a/core/src/Ribasim.jl +++ b/core/src/Ribasim.jl @@ -67,7 +67,7 @@ using SciMLBase: ODEFunction, ODEProblem, ODESolution, - VectorContinuousCallback, + DiscreteCallback, get_proposed_dt using SparseArrays: SparseMatrixCSC, spzeros using SQLite: SQLite, DB, Query, esc_id diff --git a/core/src/callback.jl b/core/src/callback.jl index aba885d5c..16f8ab422 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -21,11 +21,11 @@ function TrapezoidIntegrationCallback(integrand_func!, integral_value)::Discrete end function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing - (; integrand_func, integrand_value, integrand_value_prev, cache, integral) = affect! - (; dt) = integrand + (; integrand_func!, integrand_value, integrand_value_prev, cache, integral) = affect! + (; dt) = integrator copyto!(integrand_value_prev, integrand_value) - integrand_func(integrand_value, integrator.p) + integrand_func!(integrand_value, integrator.p) cache += integrand_value_prev cache += integrand_value @@ -65,6 +65,11 @@ function create_callbacks( TrapezoidIntegrationCallback(flow_integrand!, graph[].integrated_flow) push!(callbacks, integrating_flow_cb) + # Save mean flows + saved_flow = SavedValues(Float64, SavedFlow) + save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false) + push!(callbacks, save_flow_cb) + # Integrate vertical basin fluxes for BMI integrated_bmi_data = ComponentVector{Float64}(; get_tmp(basin.vertical_flux, 0)..., @@ -124,7 +129,6 @@ function flow_integrand!(out, p)::Nothing out.flow .= flow vertical_flux_view(out) .= vertical_flux - set_inoutflows!(out, graph, basin) return nothing end @@ -153,41 +157,14 @@ function allocation_integrand!(out, p)::Nothing return nothing end -"Compute the average flows over the last saveat interval and write -them to SavedValues" function save_flow(u, t, integrator) - (; graph) = integrator.p - (; flow_integrated, flow_dict) = graph[] - (; node_id) = integrator.p.basin - + (; basin, graph) = integrator.p + (; integrated_flow) = graph[] Δt = get_Δt(integrator) - flow_mean = copy(flow_integrated) + flow_mean = copy(integrated_flow) flow_mean ./= Δt - fill!(flow_integrated, 0.0) - - # Divide the flows over edges to Basin inflow and outflow, regardless of edge direction. - inflow_mean = zeros(length(node_id)) - outflow_mean = zeros(length(node_id)) - - for (i, basin_id) in enumerate(node_id) - for inflow_id in inflow_ids(graph, basin_id) - q = flow_mean[flow_dict[inflow_id, basin_id]] - if q > 0 - inflow_mean[i] += q - else - outflow_mean[i] -= q - end - end - for outflow_id in outflow_ids(graph, basin_id) - q = flow_mean[flow_dict[basin_id, outflow_id]] - if q > 0 - outflow_mean[i] += q - else - inflow_mean[i] -= q - end - end - end - + fill!(integrated_flow, 0.0) + inflow_mean, outflow_mean = compute_mean_inoutflows(flow_mean.flow, graph, basin) return SavedFlow(; flow = flow_mean, inflow = inflow_mean, outflow = outflow_mean) end diff --git a/core/src/graph.jl b/core/src/graph.jl index ea39611a1..e73533a4b 100644 --- a/core/src/graph.jl +++ b/core/src/graph.jl @@ -104,11 +104,19 @@ function create_graph(db::DB, config::Config, chunk_sizes::Vector{Int})::MetaGra flow = DiffCache(flow, chunk_sizes) end - flow = zero(flow_counter) + flow = zeros(flow_counter) n_basins = length(get_ids(db, "Basin")) - vertical_flux = zero(n_basins) - integrated_flow = ComponentVector{Float64}(; flow, vertical_flux...) + integrated_flow = ComponentVector{Float64}(; + flow, + precipitation = zeros(n_basins), + evaporation = zeros(n_basins), + drainage = zeros(n_basins), + infiltration = zeros(n_basins), + ) + if config.solver.autodiff + flow = DiffCache(flow, chunk_sizes) + end graph_data = (; node_ids, edges_source, flow_dict, flow, integrated_flow, config.solver.saveat) graph = @set graph.graph_data = graph_data diff --git a/core/src/model.jl b/core/src/model.jl index 94f5aefd3..5aef1ab81 100644 --- a/core/src/model.jl +++ b/core/src/model.jl @@ -1,7 +1,7 @@ -struct SavedResults{V1 <: ComponentVector{Float64}, V2 <: ComponentVector{Float64}} - flow::IntegrandValues{Float64, V1} +struct SavedResults{V1 <: ComponentVector{Float64}} + flow::SavedValues{Float64, SavedFlow} subgrid_level::SavedValues{Float64, Vector{Float64}} - integrated_bmi_data::V2 + integrated_bmi_data::V1 end """ diff --git a/core/src/parameter.jl b/core/src/parameter.jl index 9503ce76c..4fa98fd1a 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -136,6 +136,19 @@ end abstract type AbstractParameterNode end +""" +In-memory storage of saved mean flows for writing to results. + +- `flow`: The mean flows on all edges +- `inflow`: The sum of the mean flows coming into each basin +- `outflow`: The sum of the mean flows going out of each basin +""" +@kwdef struct SavedFlow{V1 <: ComponentVector{Float64}} + flow::V1 + inflow::Vector{Float64} + outflow::Vector{Float64} +end + """ Requirements: diff --git a/core/src/read.jl b/core/src/read.jl index d6f0584a8..1020d2d56 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -1035,7 +1035,7 @@ function Allocation(db::DB, config::Config, graph::MetaGraph)::Allocation integrated_flow_mapping[(node_id, node_id)] = flow_counter end end - integrated_flow = IntegrandValuesSum(zeros(flow_counter)) + integrated_flow = zeros(flow_counter) return Allocation( Int32[], diff --git a/core/src/util.jl b/core/src/util.jl index 54b0ef561..f48d3c7ae 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -521,6 +521,9 @@ function Base.getindex(fv::FlatVector, i::Int) return v[r + 1] end +"Construct a FlatVector from one of the fields of SavedFlow." +FlatVector(saveval::Vector{SavedFlow}, sym::Symbol) = FlatVector(getfield.(saveval, sym)) + """ Function that goes smoothly from 0 to 1 in the interval [0,threshold], and is constant outside this interval. @@ -758,28 +761,34 @@ function vertical_flux_view(v::ComponentVector) return @view v[(:precipitation, :evaporation, :drainage, :infiltration)] end -function set_inoutflows!( - v::ComponentVector{Float64}, +function compute_mean_inoutflows( + flow_mean::AbstractVector, graph::MetaGraph, basin::Basin, -)::Nothing - for (i, basin_id) in enumerate(basin.node_id) +)::Tuple{Vector{Float64}, Vector{Float64}} + (; node_id) = basin + + # Divide the flows over edges to Basin inflow and outflow, regardless of edge direction. + inflow_mean = zeros(length(node_id)) + outflow_mean = zeros(length(node_id)) + + for (i, basin_id) in enumerate(node_id) for inflow_edge in basin.inflow_edges[i] q = get_flow(graph, inflow_edge, 0) if q > 0 - v.basin_inflow[i] += q + inflow_mean[i] += q else - v.basin_outflow[i] -= q + outflow_mean[i] -= q end end for outflow_edge in basin.outflow_edges[i] q = get_flow(graph, outflow_edge, 0) if q > 0 - v.basin_outflow[i] += q + inflow_mean[i] += q else - v.basin_inflow[i] -= q + outflow_mean[i] -= q end end end - return nothing + return inflow_mean, outflow_mean end diff --git a/core/src/write.jl b/core/src/write.jl index b1af36ee1..2002c7d6e 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -94,7 +94,7 @@ function basin_table( balance_error::Vector{Float64}, relative_error::Vector{Float64}, } - (; saved, integrator) = model + (; saved) = model # The last timestep is not included; there is no period over which to compute flows. data = get_storages_and_levels(model) storage = vec(data.storage[:, begin:(end - 1)]) @@ -104,17 +104,37 @@ function basin_table( nbasin = length(data.node_id) ntsteps = length(data.time) - 1 nrows = nbasin * ntsteps - saveats = integrator.sol.t - - inflow_rate = average_over_saveats(saved.flow, :basin_inflow, saveats) - outflow_rate = average_over_saveats(saved.flow, :basin_inflow, saveats) - precipitation = average_over_saveats(saved.flow, :precipitation, saveats) - evaporation = average_over_saveats(saved.flow, :evaporation, saveats) - drainage = average_over_saveats(saved.flow, :drainage, saveats) - infiltration = average_over_saveats(saved.flow, :infiltration, saveats) + + inflow_rate = FlatVector(saved.flow.saveval, :inflow) + outflow_rate = FlatVector(saved.flow.saveval, :outflow) + precipitation = zeros(nrows) + evaporation = zeros(nrows) + drainage = zeros(nrows) + infiltration = zeros(nrows) balance_error = zeros(nrows) relative_error = zeros(nrows) + @show length(model.integrator.sol.t) + + idx_row = 0 + for cvec in saved.flow.saveval + for (precipitation_, evaporation_, drainage_, infiltration_) in zip( + cvec.flow.precipitation, + cvec.flow.evaporation, + cvec.flow.drainage, + cvec.flow.infiltration, + ) + idx_row += 1 + precipitation[idx_row] = precipitation_ + evaporation[idx_row] = evaporation_ + drainage[idx_row] = drainage_ + infiltration[idx_row] = infiltration_ + end + end + + @show ntsteps + @show length(saved.flow.saveval) + time = repeat(data.time[begin:(end - 1)]; inner = nbasin) Δtime_seconds = seconds.(diff(data.time)) Δtime = repeat(Δtime_seconds; inner = nbasin) From b327a006641c07be72e34f5331b8e6a64c983750 Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Thu, 16 May 2024 17:15:12 +0200 Subject: [PATCH 09/11] Bugfixes --- core/src/bmi.jl | 2 +- core/src/callback.jl | 17 +++++++++++------ core/src/util.jl | 11 +++++------ core/src/write.jl | 11 +++++------ core/test/bmi_test.jl | 2 ++ core/test/utils_test.jl | 9 +++------ 6 files changed, 27 insertions(+), 25 deletions(-) diff --git a/core/src/bmi.jl b/core/src/bmi.jl index 06a5e7cd4..f05722297 100644 --- a/core/src/bmi.jl +++ b/core/src/bmi.jl @@ -48,7 +48,7 @@ function BMI.get_value_ptr(model::Model, name::AbstractString)::AbstractVector{F elseif name == "user_demand.demand" vec(model.integrator.p.user_demand.demand) elseif name == "user_demand.realized" - model.saved.stored_for_bmi.integrand.user_realized + model.saved.integrated_bmi_data.user_realized else error("Unknown variable $name") end diff --git a/core/src/callback.jl b/core/src/callback.jl index 16f8ab422..6068dc030 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -17,7 +17,11 @@ function TrapezoidIntegrationCallback(integrand_func!, integral_value)::Discrete cache, integral_value, ) - return DiscreteCallback((u, t, integrator) -> t != 0, affect!) + return DiscreteCallback( + (u, t, integrator) -> t != 0, + affect!; + save_positions = (false, false), + ) end function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing @@ -27,10 +31,11 @@ function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing copyto!(integrand_value_prev, integrand_value) integrand_func!(integrand_value, integrator.p) - cache += integrand_value_prev - cache += integrand_value - cache *= 0.5 * dt - integral += cache + cache .= integrand_value_prev + cache .+= integrand_value + cache .*= 0.5 * dt + + integral .+= cache return nothing end @@ -164,7 +169,7 @@ function save_flow(u, t, integrator) flow_mean = copy(integrated_flow) flow_mean ./= Δt fill!(integrated_flow, 0.0) - inflow_mean, outflow_mean = compute_mean_inoutflows(flow_mean.flow, graph, basin) + inflow_mean, outflow_mean = compute_mean_inoutflows(flow_mean.flow, basin) return SavedFlow(; flow = flow_mean, inflow = inflow_mean, outflow = outflow_mean) end diff --git a/core/src/util.jl b/core/src/util.jl index f48d3c7ae..44cd77fb9 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -763,26 +763,25 @@ end function compute_mean_inoutflows( flow_mean::AbstractVector, - graph::MetaGraph, basin::Basin, )::Tuple{Vector{Float64}, Vector{Float64}} - (; node_id) = basin + (; node_id, inflow_edges, outflow_edges) = basin # Divide the flows over edges to Basin inflow and outflow, regardless of edge direction. inflow_mean = zeros(length(node_id)) outflow_mean = zeros(length(node_id)) for (i, basin_id) in enumerate(node_id) - for inflow_edge in basin.inflow_edges[i] - q = get_flow(graph, inflow_edge, 0) + for inflow_edge in inflow_edges[i] + q = flow_mean[inflow_edge.flow_idx] if q > 0 inflow_mean[i] += q else outflow_mean[i] -= q end end - for outflow_edge in basin.outflow_edges[i] - q = get_flow(graph, outflow_edge, 0) + for outflow_edge in outflow_edges[i] + q = flow_mean[outflow_edge.flow_idx] if q > 0 inflow_mean[i] += q else diff --git a/core/src/write.jl b/core/src/write.jl index 2002c7d6e..c821c23ce 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -114,8 +114,6 @@ function basin_table( balance_error = zeros(nrows) relative_error = zeros(nrows) - @show length(model.integrator.sol.t) - idx_row = 0 for cvec in saved.flow.saveval for (precipitation_, evaporation_, drainage_, infiltration_) in zip( @@ -132,9 +130,6 @@ function basin_table( end end - @show ntsteps - @show length(saved.flow.saveval) - time = repeat(data.time[begin:(end - 1)]; inner = nbasin) Δtime_seconds = seconds.(diff(data.time)) Δtime = repeat(Δtime_seconds; inner = nbasin) @@ -146,6 +141,9 @@ function basin_table( storage_increase = max(storage_flow, 0.0) storage_decrease = max(-storage_flow, 0.0) + inflow_rate[i] + precipitation[i] + total_in = inflow_rate[i] + precipitation[i] + drainage[i] - storage_increase total_out = outflow_rate[i] + evaporation[i] + infiltration[i] - storage_decrease balance_error[i] = total_in - total_out @@ -224,7 +222,8 @@ function flow_table( from_node_id = repeat(from_node_id; outer = ntsteps) to_node_type = repeat(to_node_type; outer = ntsteps) to_node_id = repeat(to_node_id; outer = ntsteps) - flow_rate = average_over_saveats(saved.flow, :flow, saveats) + flow_rate = + FlatVector([collect(saved_flows.flow.flow) for saved_flows in saved.flow.saveval]) return (; time, diff --git a/core/test/bmi_test.jl b/core/test/bmi_test.jl index 3e4847c27..055562580 100644 --- a/core/test/bmi_test.jl +++ b/core/test/bmi_test.jl @@ -86,6 +86,8 @@ end model = BMI.initialize(Ribasim.Model, toml_path) demand = BMI.get_value_ptr(model, "user_demand.demand") realized = BMI.get_value_ptr(model, "user_demand.realized") + @test realized === + model.integrator.opts.callback.discrete_callbacks[4].affect!.integral.user_realized day = 86400.0 BMI.update_until(model, 0.4day) demand = 0.001 # for both users at the start diff --git a/core/test/utils_test.jl b/core/test/utils_test.jl index 50bfe9076..5eeb8b81f 100644 --- a/core/test/utils_test.jl +++ b/core/test/utils_test.jl @@ -64,7 +64,7 @@ end using Dictionaries: Indices using StructArrays: StructVector using Logging - using Ribasim: NodeID + using Ribasim: EdgeMetadata, NodeID level = [ 0.0, @@ -94,11 +94,8 @@ end demand = zeros(1) basin = Ribasim.Basin( Indices(NodeID.(:Basin, [1])), - [NodeID[]], - [NodeID[]], - zeros(1), - zeros(1), - zeros(1), + [EdgeMetadata[]], + [EdgeMetadata[]], zeros(1), zeros(1), zeros(1), From f1a03477847580310a5725391e03b13236f4f9ca Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Tue, 21 May 2024 11:38:55 +0200 Subject: [PATCH 10/11] Update integrands with new basin forcing data --- core/src/callback.jl | 33 ++++++++++++++++++--------------- core/src/util.jl | 11 +++++++++++ 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/core/src/callback.jl b/core/src/callback.jl index 6068dc030..e56698563 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -1,22 +1,15 @@ struct TrapezoidIntegrationAffect{IntegrandFunc, T} integrand_func!::IntegrandFunc integrand_value::T - integrand_value_prev::T cache::T integral::T end function TrapezoidIntegrationCallback(integrand_func!, integral_value)::DiscreteCallback integrand_value = zero(integral_value) - integrand_value_prev = zero(integral_value) cache = zero(integral_value) - affect! = TrapezoidIntegrationAffect( - integrand_func!, - integrand_value, - integrand_value_prev, - cache, - integral_value, - ) + affect! = + TrapezoidIntegrationAffect(integrand_func!, integrand_value, cache, integral_value) return DiscreteCallback( (u, t, integrator) -> t != 0, affect!; @@ -25,13 +18,17 @@ function TrapezoidIntegrationCallback(integrand_func!, integral_value)::Discrete end function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing - (; integrand_func!, integrand_value, integrand_value_prev, cache, integral) = affect! - (; dt) = integrator + (; integrand_func!, integrand_value, cache, integral) = affect! + (; dt, p) = integrator - copyto!(integrand_value_prev, integrand_value) - integrand_func!(integrand_value, integrator.p) + # What is a good way to check that this is the first time this + # function is called? E.g. that only one timestep has been done? + if iszero(integral) + integrand_func!(integrand_value, p) + end - cache .= integrand_value_prev + cache .= integrand_value + integrand_func!(integrand_value, p) cache .+= integrand_value cache .*= 0.5 * dt @@ -72,7 +69,12 @@ function create_callbacks( # Save mean flows saved_flow = SavedValues(Float64, SavedFlow) - save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false) + save_flow_cb = SavingCallback( + save_flow, + saved_flow; + saveat = (saveat isa Vector) ? filter(x -> x != 0, saveat) : saveat, + save_start = false, + ) push!(callbacks, save_flow_cb) # Integrate vertical basin fluxes for BMI @@ -455,6 +457,7 @@ function update_basin(integrator)::Nothing end update_vertical_flux!(basin, storage) + update_vertical_flux_integrands!(integrator, vertical_flux) return nothing end diff --git a/core/src/util.jl b/core/src/util.jl index 44cd77fb9..c56681f0f 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -791,3 +791,14 @@ function compute_mean_inoutflows( end return inflow_mean, outflow_mean end + +function update_vertical_flux_integrands!(integrator, vertical_flux)::Nothing + for callback in integrator.opts.callback.discrete_callbacks + (; affect!) = callback + if affect! isa TrapezoidIntegrationAffect && + hasproperty(affect!.integrand_value, :precipitation) + vertical_flux_view(affect!.integrand_value) .= vertical_flux + end + end + return nothing +end From a511e7e7a8ae3abce38a94adf0a16d75351c29dc Mon Sep 17 00:00:00 2001 From: Bart de Koning Date: Tue, 21 May 2024 15:18:40 +0200 Subject: [PATCH 11/11] fix mean inoutflow computation --- core/src/util.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/util.jl b/core/src/util.jl index 5167d60dd..d4ddc189b 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -778,9 +778,9 @@ function compute_mean_inoutflows( for outflow_edge in outflow_edges[i] q = flow_mean[outflow_edge.flow_idx] if q > 0 - inflow_mean[i] += q + outflow_mean[i] += q else - outflow_mean[i] -= q + inflow_mean[i] -= q end end end