Skip to content

Commit

Permalink
Introduce TrapezoidIntegrationCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
SouthEndMusic committed May 16, 2024
1 parent 773cdcf commit 9a3fd8f
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 63 deletions.
4 changes: 2 additions & 2 deletions core/src/allocation_optim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ function set_initial_capacities_source!(
# If it is a source edge for this allocation problem
if edge main_network_source_edges
# Reset the source to the averaged flow over the last allocation period
source_capacity = integrated_flow.integrand[integrated_flow_mapping[edge]]
source_capacity = integrated_flow[integrated_flow_mapping[edge]]
JuMP.set_normalized_rhs(
source_constraints[edge],
# It is assumed that the allocation procedure does not have to be differentiated.
Expand Down Expand Up @@ -365,7 +365,7 @@ function get_basin_data(
@assert node_id.type == NodeType.Basin
vertical_flux = get_tmp(vertical_flux, 0)
_, basin_idx = id_index(basin.node_id, node_id)
influx = integrated_flow.integrand[integrated_flow_mapping[node_id, node_id]]
influx = integrated_flow[integrated_flow_mapping[node_id, node_id]]
_, basin_idx = id_index(basin.node_id, node_id)
storage_basin = u.storage[basin_idx]
control_inneighbors = inneighbor_labels_type(graph, node_id, EdgeType.control)
Expand Down
4 changes: 2 additions & 2 deletions core/src/bmi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ function BMI.get_value_ptr(model::Model, name::AbstractString)::AbstractVector{F
elseif name == "basin.drainage"
model.integrator.p.basin.vertical_flux_from_input.drainage
elseif name == "basin.infiltration_integrated"
model.saved.stored_for_bmi.integrand.infiltration
model.saved.integrated_bmi_data.infiltration
elseif name == "basin.drainage_integrated"
model.saved.stored_for_bmi.integrand.drainage
model.saved.integrated_bmi_data.drainage
elseif name == "basin.subgrid_level"
model.integrator.p.subgrid.level
elseif name == "user_demand.demand"
Expand Down
115 changes: 87 additions & 28 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
@@ -1,3 +1,39 @@
struct TrapezoidIntegrationAffect{IntegrandFunc, T}
integrand_func!::IntegrandFunc
integrand_value::T
integrand_value_prev::T
cache::T
integral::T
end

function TrapezoidIntegrationCallback(integrand_func!, integral_value)::DiscreteCallback
integrand_value = zero(integral_value)
integrand_value_prev = zero(integral_value)
cache = zero(integral_value)
affect! = TrapezoidIntegrationAffect(
integrand_func!,
integrand_value,
integrand_value_prev,
cache,
integral_value,
)
return DiscreteCallback((u, t, integrator) -> t != 0, affect!)
end

function (affect!::TrapezoidIntegrationAffect)(integrator)::Nothing
(; integrand_func, integrand_value, integrand_value_prev, cache, integral) = affect!
(; dt) = integrand

copyto!(integrand_value_prev, integrand_value)
integrand_func(integrand_value, integrator.p)

cache += integrand_value_prev
cache += integrand_value
cache *= 0.5 * dt
integral += cache
return nothing
end

"""
Create the different callbacks that are used to store results
and feed the simulation with new data. The different callbacks
Expand All @@ -11,6 +47,7 @@ function create_callbacks(
)::Tuple{CallbackSet, SavedResults}
(;
starttime,
graph,
basin,
user_demand,
tabulated_rating_curve,
Expand All @@ -24,28 +61,22 @@ function create_callbacks(
push!(callbacks, negative_storage_cb)

# Integrate flows and vertical basin fluxes for mean outputs
integrand_output_prototype = get_flow_integrand_output_prototype(parameters)
saved_flow = IntegrandValues(Float64, typeof(integrand_output_prototype))
integrating_flow_cb =
IntegratingCallback(flow_integrand!, saved_flow, integrand_output_prototype)
TrapezoidIntegrationCallback(flow_integrand!, graph[].integrated_flow)
push!(callbacks, integrating_flow_cb)

# Integrate vertical basin fluxes for BMI
integrand_output_prototype = ComponentVector{Float64}(;
integrated_bmi_data = ComponentVector{Float64}(;
get_tmp(basin.vertical_flux, 0)...,
user_realized = zeros(length(user_demand.node_id)),
)
stored_for_bmi = IntegrandValuesSum(integrand_output_prototype)
integrating_for_bmi_cb =
IntegratingSumCallback(bmi_integrand!, stored_for_bmi, integrand_output_prototype)
TrapezoidIntegrationCallback(bmi_integrand!, integrated_bmi_data)
push!(callbacks, integrating_for_bmi_cb)

# Integrate flows for allocation input
integrating_for_allocation_cb = IntegratingSumCallback(
allocation_integrand!,
allocation.integrated_flow,
zeros(length(allocation.integrated_flow_mapping)),
)
integrating_for_allocation_cb =
TrapezoidIntegrationCallback(allocation_integrand!, allocation.integrated_flow)
push!(callbacks, integrating_for_allocation_cb)

# TODO: What is this?
Expand Down Expand Up @@ -73,7 +104,7 @@ function create_callbacks(
push!(callbacks, export_cb)
end

saved = SavedResults(saved_flow, saved_subgrid_level, stored_for_bmi)
saved = SavedResults(saved_flow, saved_subgrid_level, integrated_bmi_data)

n_conditions = sum(length(vec) for vec in discrete_control.greater_than; init = 0)
if n_conditions > 0
Expand All @@ -85,8 +116,8 @@ function create_callbacks(
return callback, saved
end

function flow_integrand!(out, u, t, integrator)::Nothing
(; basin, graph) = integrator.p
function flow_integrand!(out, p)::Nothing
(; basin, graph) = p
(; vertical_flux) = basin
vertical_flux = get_tmp(vertical_flux, 0)
flow = get_tmp(graph[].flow, 0)
Expand All @@ -97,8 +128,8 @@ function flow_integrand!(out, u, t, integrator)::Nothing
return nothing
end

function bmi_integrand!(out, u, t, integrator)::Nothing
(; basin, user_demand, graph) = integrator.p
function bmi_integrand!(out, p)::Nothing
(; basin, user_demand, graph) = p
vertical_flux_view(out) .= get_tmp(basin.vertical_flux, 0)

for i in eachindex(user_demand.node_id)
Expand All @@ -107,8 +138,8 @@ function bmi_integrand!(out, u, t, integrator)::Nothing
return
end

function allocation_integrand!(out, u, t, integrator)::Nothing
(; allocation, graph, basin) = integrator.p
function allocation_integrand!(out, p)::Nothing
(; allocation, graph, basin) = p
for (edge, i) in allocation.integrated_flow_mapping
if edge[1] == edge[2]
# Vertical fluxes
Expand All @@ -122,14 +153,42 @@ function allocation_integrand!(out, u, t, integrator)::Nothing
return nothing
end

function get_flow_integrand_output_prototype(p::Parameters)
(; graph, basin) = p
flow = get_tmp(graph[].flow, 0)
vertical_flux = get_tmp(basin.vertical_flux, 0)
n_basin = length(basin.node_id)
basin_inflow = zeros(n_basin)
basin_outflow = zeros(n_basin)
return ComponentVector{Float64}(; basin_inflow, basin_outflow, flow, vertical_flux...)
"Compute the average flows over the last saveat interval and write
them to SavedValues"
function save_flow(u, t, integrator)
(; graph) = integrator.p
(; flow_integrated, flow_dict) = graph[]
(; node_id) = integrator.p.basin

Δt = get_Δt(integrator)
flow_mean = copy(flow_integrated)
flow_mean ./= Δt
fill!(flow_integrated, 0.0)

# Divide the flows over edges to Basin inflow and outflow, regardless of edge direction.
inflow_mean = zeros(length(node_id))
outflow_mean = zeros(length(node_id))

for (i, basin_id) in enumerate(node_id)
for inflow_id in inflow_ids(graph, basin_id)
q = flow_mean[flow_dict[inflow_id, basin_id]]
if q > 0
inflow_mean[i] += q
else
outflow_mean[i] -= q
end
end
for outflow_id in outflow_ids(graph, basin_id)
q = flow_mean[flow_dict[basin_id, outflow_id]]
if q > 0
outflow_mean[i] += q
else
inflow_mean[i] -= q
end
end
end

return SavedFlow(; flow = flow_mean, inflow = inflow_mean, outflow = outflow_mean)
end

function check_negative_storage(u, t, integrator)::Nothing
Expand Down Expand Up @@ -433,7 +492,7 @@ function update_allocation!(integrator)::Nothing

# Divide by the allocation Δt to obtain the mean flows
# from the integrated flows
integrated_flow.integrand ./= Δt_allocation
integrated_flow ./= Δt_allocation

# If a main network is present, collect demands of subnetworks
if has_main_network(allocation)
Expand All @@ -451,7 +510,7 @@ function update_allocation!(integrator)::Nothing
end

# Reset the mean source flows
integrated_flow.integrand .= 0.0
integrated_flow .= 0.0
return nothing
end

Expand Down
9 changes: 8 additions & 1 deletion core/src/graph.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,14 @@ function create_graph(db::DB, config::Config, chunk_sizes::Vector{Int})::MetaGra
if config.solver.autodiff
flow = DiffCache(flow, chunk_sizes)
end
graph_data = (; node_ids, edges_source, flow_dict, flow, config.solver.saveat)

flow = zero(flow_counter)
n_basins = length(get_ids(db, "Basin"))
vertical_flux = zero(n_basins)
integrated_flow = ComponentVector{Float64}(; flow, vertical_flux...)

graph_data =
(; node_ids, edges_source, flow_dict, flow, integrated_flow, config.solver.saveat)
graph = @set graph.graph_data = graph_data

return graph
Expand Down
2 changes: 1 addition & 1 deletion core/src/model.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
struct SavedResults{V1 <: ComponentVector{Float64}, V2 <: ComponentVector{Float64}}
flow::IntegrandValues{Float64, V1}
subgrid_level::SavedValues{Float64, Vector{Float64}}
stored_for_bmi::IntegrandValuesSum{V2}
integrated_bmi_data::V2
end

"""
Expand Down
5 changes: 3 additions & 2 deletions core/src/parameter.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ struct Allocation
subnetwork_demands::Dict{Tuple{NodeID, NodeID}, Vector{Float64}}
subnetwork_allocateds::Dict{Tuple{NodeID, NodeID}, Vector{Float64}}
integrated_flow_mapping::Dict{Tuple{NodeID, NodeID}, Int32}
integrated_flow::IntegrandValuesSum{Vector{Float64}}
integrated_flow::Vector{Float64}
record_demand::@NamedTuple{
time::Vector{Float64},
subnetwork_id::Vector{Int32},
Expand Down Expand Up @@ -611,7 +611,7 @@ struct Subgrid
end

# TODO Automatically add all nodetypes here
struct Parameters{T, C1, C2, V1, V2}
struct Parameters{T, C1, C2, V1, V2, V3}
starttime::DateTime
graph::MetaGraph{
Int64,
Expand All @@ -624,6 +624,7 @@ struct Parameters{T, C1, C2, V1, V2}
edges_source::Dict{Int32, Set{EdgeMetadata}},
flow_dict::Dict{Tuple{NodeID, NodeID}, Int32},
flow::T,
integrated_flow::V3,
saveat::Float64,
},
MetaGraphsNext.var"#11#13",
Expand Down
22 changes: 0 additions & 22 deletions core/src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,28 +76,6 @@ function get_storages_and_levels(
return (; time = tsteps, node_id, storage, level)
end

function average_over_saveats(
integrand_values::IntegrandValues,
symb::Symbol,
saveats::Vector{Float64},
)
(; integrand, ts) = integrand_values
averages = Vector{Float64}[]
n_saveats = length(saveats)
saveat_index = 2
integral_saveat = zero(first(integrand)[symb])
for (integral_step, t) in zip(integrand, ts)
integral_saveat += integral_step[symb]
if saveat_index <= n_saveats && t == saveats[saveat_index]
Δt_saveat = saveats[saveat_index] - saveats[saveat_index - 1]
push!(averages, copy(integral_saveat) / Δt_saveat)
integral_saveat .= 0.0
saveat_index += 1
end
end
return FlatVector(averages)
end

"Create the basin result table from the saved data"
function basin_table(
model::Model,
Expand Down
10 changes: 5 additions & 5 deletions core/test/allocation_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
(; graph, allocation) = p
close(db)

allocation.integrated_flow.integrand[allocation.integrated_flow_mapping[(
allocation.integrated_flow[allocation.integrated_flow_mapping[(
NodeID(:FlowBoundary, 1),
NodeID(:Basin, 2),
)]] = 4.5
Expand Down Expand Up @@ -251,7 +251,7 @@ end

# Running full allocation algorithm
(; Δt_allocation) = allocation_models[1]
integrated_flow.integrand[integrated_flow_mapping[(
integrated_flow[integrated_flow_mapping[(
NodeID(:FlowBoundary, 1),
NodeID(:Basin, 2),
)]] = 4.5 * Δt_allocation
Expand Down Expand Up @@ -306,11 +306,11 @@ end
t = 0.0

# Set flows of sources
integrated_flow.integrand[integrated_flow_mapping[(
integrated_flow[integrated_flow_mapping[(
NodeID(:FlowBoundary, 58),
NodeID(:Basin, 16),
)]] = 1.0
integrated_flow.integrand[integrated_flow_mapping[(
integrated_flow[integrated_flow_mapping[(
NodeID(:FlowBoundary, 59),
NodeID(:Basin, 44),
)]] = 1e-3
Expand Down Expand Up @@ -458,7 +458,7 @@ end
(; u) = model.integrator
optimization_type = OptimizationType.internal_sources
for (edge, i) in allocation.integrated_flow_mapping
allocation.integrated_flow.integrand[i] = Ribasim.get_flow(graph, edge..., 0)
allocation.integrated_flow[i] = Ribasim.get_flow(graph, edge..., 0)
end
Ribasim.set_initial_values!(allocation_model, p, u, t)

Expand Down

0 comments on commit 9a3fd8f

Please sign in to comment.