diff --git a/core/src/Ribasim.jl b/core/src/Ribasim.jl index c95296756..b3656f4c1 100644 --- a/core/src/Ribasim.jl +++ b/core/src/Ribasim.jl @@ -146,6 +146,9 @@ using Tables: Tables, AbstractRow, columntable # Wrapper around a vector of structs to easily retrieve the same field from all elements. using StructArrays: StructVector +# OrderedSet is used to store the order of the substances in the network. +using DataStructures: OrderedSet + export libribasim include("schema.jl") diff --git a/core/src/callback.jl b/core/src/callback.jl index 712cb868a..3ec332faf 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -10,7 +10,14 @@ function create_callbacks( u0::ComponentVector, saveat, )::Tuple{CallbackSet, SavedResults} - (; starttime, basin, tabulated_rating_curve) = parameters + (; + starttime, + basin, + flow_boundary, + level_boundary, + user_demand, + tabulated_rating_curve, + ) = parameters callbacks = SciMLBase.DECallback[] # Check for negative storage @@ -32,6 +39,18 @@ function create_callbacks( basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false)) push!(callbacks, basin_cb) + # Update boundary concentrations + for (boundary, func) in ( + (basin, update_basin_conc!), + (flow_boundary, update_flowb_conc!), + (level_boundary, update_levelb_conc!), + (user_demand, update_userd_conc!), + ) + tstops = get_tstops(boundary.concentration_time.time, starttime) + conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false)) + push!(callbacks, conc_cb) + end + # Update TabulatedRatingCurve Q(h) relationships tstops = get_tstops(tabulated_rating_curve.time.time, starttime) tabulated_rating_curve_cb = PresetTimeCallback( @@ -88,18 +107,36 @@ Update with the latest timestep: - Cumulative flows/forcings which are input for the allocation algorithm - Cumulative flows/forcings which are realized demands in the allocation context +During these cumulative flow updates, we can also update the mass balance of the system, +as each flow carries mass, based on the concentrations of the flow source. +Specifically, we first use all the inflows to update the mass of the Basins, recalculate +the Basin concentration(s) and then remove the mass that is being lost to the outflows. """ function update_cumulative_flows!(u, t, integrator)::Nothing - (; p, tprev, dt) = integrator - (; basin, flow_boundary, allocation) = p + (; p, uprev, tprev, dt) = integrator + (; + basin, + state_inflow_edge, + state_outflow_edge, + user_demand, + level_boundary, + flow_boundary, + allocation, + ) = p (; vertical_flux) = basin # Update tprev p.tprev[] = t + # Reset cumulative flows, used to calculate the concentration + # of the basins after processing inflows only + basin.cumulative_in .= 0.0 + # Update cumulative forcings which are integrated exactly @. basin.cumulative_drainage += vertical_flux.drainage * dt @. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt + basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt + basin.cumulative_in .= vertical_flux.drainage * dt # Precipitation depends on fixed area for node_id in basin.node_id @@ -108,15 +145,26 @@ function update_cumulative_flows!(u, t, integrator)::Nothing basin.cumulative_precipitation[node_id.idx] += added_precipitation basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation + basin.mass[node_id.idx, :] .+= + basin.concentration[2, node_id.idx, :] .* added_precipitation + basin.cumulative_in[node_id.idx] += added_precipitation end # Exact boundary flow over time step - for (id, flow_rate, active) in - zip(flow_boundary.node_id, flow_boundary.flow_rate, flow_boundary.active) + for (id, flow_rate, active, edge) in zip( + flow_boundary.node_id, + flow_boundary.flow_rate, + flow_boundary.active, + flow_boundary.outflow_edges, + ) if active + outflow_id = edge[1].edge[2] volume = integral(flow_rate, tprev, t) flow_boundary.cumulative_flow[id.idx] += volume flow_boundary.cumulative_flow_saveat[id.idx] += volume + basin.mass[outflow_id.idx, :] .+= + flow_boundary.concentration[id.idx, :] .* volume + basin.cumulative_in[outflow_id.idx] += volume end end @@ -141,13 +189,139 @@ function update_cumulative_flows!(u, t, integrator)::Nothing end end end + + # Process mass updates for UserDemand separately + # as the inflow and outflow are decoupled in the states + for (inflow_edge, outflow_edge) in + zip(user_demand.inflow_edge, user_demand.outflow_edge) + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + userdemand_idx = outflow_edge.edge[1].idx + if from_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, inflow_edge.edge) + if flow < 0 + basin.mass[from_node.idx, :] .-= + basin.concentration_state[to_node.idx, :] .* flow + basin.mass[from_node.idx, :] .-= + user_demand.concentration[userdemand_idx, :] .* flow + end + end + if to_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, outflow_edge.edge) + if flow > 0 + basin.mass[to_node.idx, :] .+= + basin.concentration_state[from_node.idx, :] .* flow + basin.mass[to_node.idx, :] .+= + user_demand.concentration[userdemand_idx, :] .* flow + end + end + end + + # Process all mass inflows to basins + for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge) + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + if from_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, inflow_edge.edge) + if flow < 0 + basin.cumulative_in[from_node.idx] -= flow + if to_node.type == NodeType.Basin + basin.mass[from_node.idx, :] .-= + basin.concentration_state[to_node.idx, :] .* flow + elseif to_node.type == NodeType.LevelBoundary + basin.mass[from_node.idx, :] .-= + level_boundary.concentration[to_node.idx, :] .* flow + elseif to_node.type == NodeType.UserDemand + basin.mass[from_node.idx, :] .-= + user_demand.concentration[to_node.idx, :] .* flow + else + @warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow" + end + end + end + + if to_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, outflow_edge.edge) + if flow > 0 + basin.cumulative_in[to_node.idx] += flow + if from_node.type == NodeType.Basin + basin.mass[to_node.idx, :] .+= + basin.concentration_state[from_node.idx, :] .* flow + elseif from_node.type == NodeType.LevelBoundary + basin.mass[to_node.idx, :] .+= + level_boundary.concentration[from_node.idx, :] .* flow + elseif from_node.type == NodeType.UserDemand + basin.mass[to_node.idx, :] .+= + user_demand.concentration[from_node.idx, :] .* flow + elseif from_node.type == NodeType.Terminal && from_node.value == 0 + # UserDemand outflow is discoupled from its inflow, + # and the unset flow edge defaults to Terminal #0 + nothing + else + @warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow" + end + end + end + end + + # Update the Basin concentrations based on the added mass and flows + basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in) + + # Process all mass outflows from Basins + for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge) + from_node = inflow_edge.edge[1] + to_node = outflow_edge.edge[2] + if from_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, inflow_edge.edge) + if flow > 0 + basin.mass[from_node.idx, :] .-= + basin.concentration_state[from_node.idx, :] .* flow + end + end + if to_node.type == NodeType.Basin + flow = flow_update_on_edge(integrator, outflow_edge.edge) + if flow < 0 + basin.mass[to_node.idx, :] .+= + basin.concentration_state[to_node.idx, :] .* flow + end + end + end + + # Evaporate mass to keep the mass balance, if enabled in model config + if basin.evaporate_mass + basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation) + end + basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration) + + # Take care of infinitely small masses, possibly becoming negative due to truncation. + for I in eachindex(basin.mass) + if (-eps(Float64)) < basin.mass[I] < (eps(Float64)) + basin.mass[I] = 0.0 + end + end + + # Check for negative masses + if any(<(0), basin.mass) + R = CartesianIndices(basin.mass) + locations = findall(<(0), basin.mass) + for I in locations + basin_idx, substance_idx = Tuple(R[I]) + @error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])" + end + error("Negative mass(es) detected") + end + + # Update the Basin concentrations again based on the removed mass + basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)] + basin.storage_prev .= basin.current_storage[parent(u)] + return nothing end """ Given an edge (from_id, to_id), compute the cumulative flow over that -edge over the latest timestep. If from_id and to_id are both the same basin, -the function returns the sum of the basin forcings. +edge over the latest timestep. If from_id and to_id are both the same Basin, +the function returns the sum of the Basin forcings. """ function flow_update_on_edge( integrator::DEIntegrator, @@ -192,7 +366,7 @@ end """ Save all cumulative forcings and flows over edges over the latest timestep, Both computed by the solver and integrated exactly. Also computes the total horizontal -inflow and outflow per basin. +inflow and outflow per Basin. """ function save_flow(u, t, integrator) (; p) = integrator @@ -247,6 +421,7 @@ function save_flow(u, t, integrator) @. basin.cumulative_precipitation_saveat = 0.0 @. basin.cumulative_drainage_saveat = 0.0 + concentration = copy(basin.concentration_state) saved_flow = SavedFlow(; flow = flow_mean, inflow = inflow_mean, @@ -254,6 +429,7 @@ function save_flow(u, t, integrator) flow_boundary = flow_boundary_mean, precipitation, drainage, + concentration, t, ) check_water_balance_error!(saved_flow, integrator, Δt) @@ -500,6 +676,10 @@ function get_value(subvariable::NamedTuple, p::Parameters, du::AbstractVector, t elseif startswith(variable, "concentration_external.") value = basin.concentration_external[listen_node_id.idx][variable](t) + elseif startswith(variable, "concentration.") + substance = Symbol(last(split(variable, "."))) + var_idx = findfirst(==(substance), basin.substances) + value = basin.concentration_state[listen_node_id.idx, var_idx] else error("Unsupported condition variable $variable.") end @@ -590,6 +770,51 @@ function update_basin!(integrator)::Nothing return nothing end +"Load updates from 'Basin / concentration' into the parameters" +function update_basin_conc!(integrator)::Nothing + (; p) = integrator + (; basin) = p + (; node_id, concentration, concentration_time, substances) = basin + t = datetime_since(integrator.t, integrator.p.starttime) + + rows = searchsorted(concentration_time.time, t) + timeblock = view(concentration_time, rows) + + for row in timeblock + i = searchsortedfirst(node_id, NodeID(NodeType.Basin, row.node_id, 0)) + j = findfirst(==(Symbol(row.substance)), substances) + ismissing(row.drainage) || (concentration[1, i, j] = row.drainage) + ismissing(row.precipitation) || (concentration[2, i, j] = row.precipitation) + end + return nothing +end + +"Load updates from 'concentration' tables into the parameters" +function update_conc!(integrator, parameter, nodetype)::Nothing + (; p) = integrator + node = getproperty(p, parameter) + (; basin) = p + (; node_id, concentration, concentration_time) = node + (; substances) = basin + t = datetime_since(integrator.t, integrator.p.starttime) + + rows = searchsorted(concentration_time.time, t) + timeblock = view(concentration_time, rows) + + for row in timeblock + i = searchsortedfirst(node_id, NodeID(nodetype, row.node_id, 0)) + j = findfirst(==(Symbol(row.substance)), substances) + ismissing(row.concentration) || (concentration[i, j] = row.concentration) + end + return nothing +end +update_flowb_conc!(integrator)::Nothing = + update_conc!(integrator, :flow_boundary, NodeType.FlowBoundary) +update_levelb_conc!(integrator)::Nothing = + update_conc!(integrator, :level_boundary, NodeType.LevelBoundary) +update_userd_conc!(integrator)::Nothing = + update_conc!(integrator, :user_demand, NodeType.UserDemand) + "Solve the allocation problem for all demands and assign allocated abstractions." function update_allocation!(integrator)::Nothing (; p, t, u) = integrator diff --git a/core/src/config.jl b/core/src/config.jl index ba6d3e197..bf9e38c81 100644 --- a/core/src/config.jl +++ b/core/src/config.jl @@ -114,6 +114,7 @@ const nodetypes = collect(keys(nodekinds)) maxiters::Int = 1e9 sparse::Bool = true autodiff::Bool = false + evaporate_mass::Bool = true end # Separate struct, as basin clashes with nodetype diff --git a/core/src/main.jl b/core/src/main.jl index 0a2ae437c..0e613b8f1 100644 --- a/core/src/main.jl +++ b/core/src/main.jl @@ -39,7 +39,7 @@ function main(toml_path::AbstractString)::Cint if config.ribasim_version != cli.ribasim_version @warn "The Ribasim version in the TOML config file does not match the used Ribasim CLI version." config.ribasim_version cli.ribasim_version end - @info "Starting a Ribasim simulation." cli.ribasim_version starttime endtime + @info "Starting a Ribasim simulation." toml_path cli.ribasim_version starttime endtime try model = run(config) diff --git a/core/src/parameter.jl b/core/src/parameter.jl index ac5ded66e..8dcf03705 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -10,6 +10,9 @@ const SolverStats = @NamedTuple{ @enumx EdgeType flow control none @eval @enumx NodeType $(config.nodetypes...) @enumx ContinuousControlType None Continuous PID +@enumx Substance Continuity = 1 Initial = 2 LevelBoundary = 3 FlowBoundary = 4 UserDemand = + 5 Drainage = 6 Precipitation = 7 +Base.to_index(id::Substance.T) = Int(id) # used to index into concentration matrices # Support creating a NodeType enum instance from a symbol or string function NodeType.T(s::Symbol)::NodeType.T @@ -103,7 +106,7 @@ const ScalarInterpolation = LinearInterpolation{ Float64, } -set_zero!(v) = fill!(v, zero(eltype(v))) +set_zero!(v) = v .= zero(eltype(v)) const Cache = LazyBufferCache{Returns{Int}, typeof(set_zero!)} """ @@ -263,11 +266,12 @@ abstract type AbstractDemandNode <: AbstractParameterNode end In-memory storage of saved mean flows for writing to results. - `flow`: The mean flows on all edges and state-dependent forcings -- `inflow`: The sum of the mean flows coming into each basin -- `outflow`: The sum of the mean flows going out of each basin +- `inflow`: The sum of the mean flows coming into each Basin +- `outflow`: The sum of the mean flows going out of each Basin - `flow_boundary`: The exact integrated mean flows of flow boundaries - `precipitation`: The exact integrated mean precipitation - `drainage`: The exact integrated mean drainage +- `concentration`: Concentrations for each Basin and substance - `balance_error`: The (absolute) water balance error - `relative_error`: The relative water balance error - `t`: Endtime of the interval over which is averaged @@ -279,6 +283,7 @@ In-memory storage of saved mean flows for writing to results. flow_boundary::Vector{Float64} precipitation::Vector{Float64} drainage::Vector{Float64} + concentration::Matrix{Float64} storage_rate::Vector{Float64} = zero(precipitation) balance_error::Vector{Float64} = zero(precipitation) relative_error::Vector{Float64} = zero(precipitation) @@ -310,7 +315,7 @@ else T = Vector{Float64} end """ -@kwdef struct Basin{C, V} <: AbstractParameterNode +@kwdef struct Basin{C, D, V} <: AbstractParameterNode node_id::Vector{NodeID} inflow_ids::Vector{Vector{NodeID}} = [NodeID[]] outflow_ids::Vector{Vector{NodeID}} = [NodeID[]] @@ -345,7 +350,25 @@ end demand::Vector{Float64} # Data source for parameter updates time::StructVector{BasinTimeV1, C, Int} + # Data source for concentration updates + concentration_time::StructVector{BasinConcentrationV1, D, Int} + # Concentrations + # Config setting to enable/disable evaporation of mass + evaporate_mass::Bool = true + # Cumulative inflow for each Basin at a given time + cumulative_in::Vector{Float64} = zeros(length(node_id)) + # Storage for each Basin at the previous time step + storage_prev::Vector{Float64} = zeros(length(node_id)) + # matrix with concentrations for each Basin and substance + concentration_state::Matrix{Float64} # Basin, substance + # matrix with boundary concentrations for each boundary, Basin and substance + concentration::Array{Float64, 3} + # matrix with mass for each Basin and substance + mass::Matrix{Float64} + # substances in use by the model (ordered like their axis in the concentration matrices) + substances::OrderedSet{Symbol} + # Data source for external concentrations (used in control) concentration_external::Vector{Dict{String, ScalarInterpolation}} = Dict{String, ScalarInterpolation}[] end @@ -459,12 +482,16 @@ end """ node_id: node ID of the LevelBoundary node active: whether this node is active -level: the fixed level of this 'infinitely big basin' +level: the fixed level of this 'infinitely big Basin' +concentration: matrix with boundary concentrations for each Basin and substance +concentration_time: Data source for concentration updates """ -@kwdef struct LevelBoundary <: AbstractParameterNode +@kwdef struct LevelBoundary{C} <: AbstractParameterNode node_id::Vector{NodeID} active::Vector{Bool} level::Vector{ScalarInterpolation} + concentration::Matrix{Float64} + concentration_time::StructVector{LevelBoundaryConcentrationV1, C, Int} end """ @@ -474,14 +501,18 @@ active: whether this node is active and thus contributes flow cumulative_flow: The exactly integrated cumulative boundary flow since the start of the simulation cumulative_flow_saveat: The exactly integrated cumulative boundary flow since the last saveat flow_rate: flow rate (exact) +concentration: matrix with boundary concentrations for each Basin and substance +concentration_time: Data source for concentration updates """ -@kwdef struct FlowBoundary <: AbstractParameterNode +@kwdef struct FlowBoundary{C} <: AbstractParameterNode node_id::Vector{NodeID} outflow_edges::Vector{Vector{EdgeMetadata}} active::Vector{Bool} cumulative_flow::Vector{Float64} = zeros(length(node_id)) cumulative_flow_saveat::Vector{Float64} = zeros(length(node_id)) flow_rate::Vector{ScalarInterpolation} + concentration::Matrix{Float64} + concentration_time::StructVector{FlowBoundaryConcentrationV1, C, Int} end """ @@ -745,9 +776,11 @@ demand_itp: Timeseries interpolation objects for demands demand_from_timeseries: If false the demand comes from the BMI or is fixed allocated: water flux currently allocated to UserDemand per priority (node_idx, priority_idx) return_factor: the factor in [0,1] of how much of the abstracted water is given back to the system -min_level: The level of the source basin below which the UserDemand does not abstract +min_level: The level of the source Basin below which the UserDemand does not abstract +concentration: matrix with boundary concentrations for each Basin and substance +concentration_time: Data source for concentration updates """ -@kwdef struct UserDemand <: AbstractDemandNode +@kwdef struct UserDemand{C} <: AbstractDemandNode node_id::Vector{NodeID} inflow_edge::Vector{EdgeMetadata} = [] outflow_edge::Vector{EdgeMetadata} = [] @@ -759,6 +792,8 @@ min_level: The level of the source basin below which the UserDemand does not abs allocated::Matrix{Float64} return_factor::Vector{ScalarInterpolation} min_level::Vector{Float64} + concentration::Matrix{Float64} + concentration_time::StructVector{UserDemandConcentrationV1, C, Int} end """ @@ -821,29 +856,29 @@ const ModelGraph = MetaGraph{ Float64, } -@kwdef struct Parameters{C1, C2, C3, C4, C5, V} +@kwdef struct Parameters{C1, C2, C3, C4, C5, C6, C7, C8, C9, V} starttime::DateTime graph::ModelGraph allocation::Allocation - basin::Basin{C1, V} + basin::Basin{C1, C2, V} linear_resistance::LinearResistance manning_resistance::ManningResistance - tabulated_rating_curve::TabulatedRatingCurve{C2} - level_boundary::LevelBoundary - flow_boundary::FlowBoundary + tabulated_rating_curve::TabulatedRatingCurve{C3} + level_boundary::LevelBoundary{C4} + flow_boundary::FlowBoundary{C5} pump::Pump outlet::Outlet terminal::Terminal discrete_control::DiscreteControl continuous_control::ContinuousControl pid_control::PidControl - user_demand::UserDemand + user_demand::UserDemand{C6} level_demand::LevelDemand flow_demand::FlowDemand subgrid::Subgrid - # Per state the in- and outflow edges associated with that state (if theu exist) - state_inflow_edge::C3 = ComponentVector() - state_outflow_edge::C4 = ComponentVector() + # Per state the in- and outflow edges associated with that state (if they exist) + state_inflow_edge::C7 = ComponentVector() + state_outflow_edge::C8 = ComponentVector() all_nodes_active::Base.RefValue{Bool} = Ref(false) tprev::Base.RefValue{Float64} = Ref(0.0) # Sparse matrix for combining flows into storages @@ -852,7 +887,7 @@ const ModelGraph = MetaGraph{ water_balance_abstol::Float64 water_balance_reltol::Float64 # State at previous saveat - u_prev_saveat::C5 = ComponentVector() + u_prev_saveat::C9 = ComponentVector() end # To opt-out of type checking for ForwardDiff diff --git a/core/src/read.jl b/core/src/read.jl index bb259c3c3..2a9ae9bf5 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -416,6 +416,7 @@ end function LevelBoundary(db::DB, config::Config)::LevelBoundary static = load_structvector(db, config, LevelBoundaryStaticV1) time = load_structvector(db, config, LevelBoundaryTimeV1) + concentration_time = load_structvector(db, config, LevelBoundaryConcentrationV1) _, _, node_ids, valid = static_and_time_node_ids(db, static, time, "LevelBoundary") @@ -427,6 +428,12 @@ function LevelBoundary(db::DB, config::Config)::LevelBoundary parsed_parameters, valid = parse_static_and_time(db, config, LevelBoundary; static, time, time_interpolatables) + substances = get_substances(db, config) + concentration = zeros(length(node_ids), length(substances)) + concentration[:, Substance.Continuity] .= 1.0 + concentration[:, Substance.LevelBoundary] .= 1.0 + set_concentrations!(concentration, concentration_time, substances, Int32.(node_ids)) + if !valid error("Errors occurred when parsing LevelBoundary data.") end @@ -435,12 +442,15 @@ function LevelBoundary(db::DB, config::Config)::LevelBoundary node_id = node_ids, parsed_parameters.active, parsed_parameters.level, + concentration, + concentration_time, ) end function FlowBoundary(db::DB, config::Config, graph::MetaGraph)::FlowBoundary static = load_structvector(db, config, FlowBoundaryStaticV1) time = load_structvector(db, config, FlowBoundaryTimeV1) + concentration_time = load_structvector(db, config, FlowBoundaryConcentrationV1) _, _, node_ids, valid = static_and_time_node_ids(db, static, time, "FlowBoundary") @@ -461,6 +471,12 @@ function FlowBoundary(db::DB, config::Config, graph::MetaGraph)::FlowBoundary end end + substances = get_substances(db, config) + concentration = zeros(length(node_ids), length(substances)) + concentration[:, Substance.Continuity] .= 1.0 + concentration[:, Substance.FlowBoundary] .= 1.0 + set_concentrations!(concentration, concentration_time, substances, Int32.(node_ids)) + if !valid error("Errors occurred when parsing FlowBoundary data.") end @@ -470,6 +486,8 @@ function FlowBoundary(db::DB, config::Config, graph::MetaGraph)::FlowBoundary outflow_edges = outflow_edges.(Ref(graph), node_ids), parsed_parameters.active, parsed_parameters.flow_rate, + concentration, + concentration_time, ) end @@ -559,6 +577,7 @@ function Basin(db::DB, config::Config, graph::MetaGraph)::Basin current_level = cache(n) current_area = cache(n) + evaporate_mass = config.solver.evaporate_mass precipitation = zeros(n) potential_evaporation = zeros(n) evaporation = zeros(n) @@ -572,6 +591,36 @@ function Basin(db::DB, config::Config, graph::MetaGraph)::Basin static = load_structvector(db, config, BasinStaticV1) time = load_structvector(db, config, BasinTimeV1) state = load_structvector(db, config, BasinStateV1) + concentration_state_data = load_structvector(db, config, BasinConcentrationStateV1) + concentration_time = load_structvector(db, config, BasinConcentrationV1) + + # TODO Move into a function + substances = get_substances(db, config) + concentration_state = zeros(n, length(substances)) + concentration_state[:, Substance.Continuity] .= 1.0 + concentration_state[:, Substance.Initial] .= 1.0 + set_concentrations!(concentration_state, concentration_state_data, substances, node_id) + mass = copy(concentration_state) + + concentration = zeros(2, n, length(substances)) + concentration[1, :, Substance.Continuity] .= 1.0 + concentration[1, :, Substance.Drainage] .= 1.0 + concentration[2, :, Substance.Continuity] .= 1.0 + concentration[2, :, Substance.Precipitation] .= 1.0 + set_concentrations!( + view(concentration, 1, :, :), + concentration_time, + substances, + node_id; + concentration_column = :drainage, + ) + set_concentrations!( + view(concentration, 1, :, :), + concentration_time, + substances, + node_id; + concentration_column = :precipitation, + ) set_static_value!(table, node_id, static) set_current_value!(table, node_id, time, config.starttime) @@ -643,12 +692,21 @@ function Basin(db::DB, config::Config, graph::MetaGraph)::Basin level_to_area, demand, time, + concentration_time, + evaporate_mass, + concentration_state, + concentration, + mass, concentration_external, + substances, ) storage0 = get_storages_from_levels(basin, state.level) @assert length(storage0) == n "Basin / state length differs from number of Basins" basin.storage0 .= storage0 + basin.storage_prev .= storage0 + basin.mass .*= storage0 # was initialized by concentration_state, resulting in mass + return basin end @@ -1014,6 +1072,7 @@ end function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand static = load_structvector(db, config, UserDemandStaticV1) time = load_structvector(db, config, UserDemandTimeV1) + concentration_time = load_structvector(db, config, UserDemandConcentrationV1) ids = get_ids(db, "UserDemand") _, _, node_ids, valid = static_and_time_node_ids(db, static, time, "UserDemand") @@ -1070,6 +1129,12 @@ function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand config, ) + substances = get_substances(db, config) + concentration = zeros(length(node_ids), length(substances)) + # Continuity concentration is zero, as the return flow (from a Basin) already includes it + concentration[:, Substance.UserDemand] .= 1.0 + set_concentrations!(concentration, concentration_time, substances, ids) + if errors || !valid_demand(node_ids, demand_itp, priorities) error("Errors occurred when parsing UserDemand data.") end @@ -1086,6 +1151,8 @@ function UserDemand(db::DB, config::Config, graph::MetaGraph)::UserDemand allocated, return_factor, min_level, + concentration, + concentration_time, ) end @@ -1434,3 +1501,43 @@ function create_storage_tables( end return area, level end + +"Determine all substances present in the input over multiple tables" +function get_substances(db::DB, config::Config)::OrderedSet{Symbol} + # Hardcoded tracers + substances = OrderedSet{Symbol}(Symbol.(instances(Substance.T))) + for table in [ + BasinConcentrationStateV1, + BasinConcentrationV1, + FlowBoundaryConcentrationV1, + LevelBoundaryConcentrationV1, + UserDemandConcentrationV1, + ] + data = load_structvector(db, config, table) + for row in data + push!(substances, Symbol(row.substance)) + end + end + return substances +end + +"Set values in wide concentration matrix from a long input table." +function set_concentrations!( + concentration, + concentration_data, + substances, + node_ids; + concentration_column = :concentration, +) + for substance in unique(concentration_data.substance) + data_sub = filter(row -> row.substance == substance, concentration_data) + sub_idx = findfirst(==(Symbol(substance)), substances) + for group in IterTools.groupby(row -> row.node_id, data_sub) + first_row = first(group) + value = getproperty(first_row, concentration_column) + ismissing(value) && continue + node_idx = findfirst(==(first_row.node_id), node_ids) + concentration[node_idx, sub_idx] = value + end + end +end diff --git a/core/src/schema.jl b/core/src/schema.jl index aa13abfa9..e03d8e852 100644 --- a/core/src/schema.jl +++ b/core/src/schema.jl @@ -29,6 +29,7 @@ @schema "ribasim.outlet.static" OutletStatic @schema "ribasim.userdemand.static" UserDemandStatic @schema "ribasim.userdemand.time" UserDemandTime +@schema "ribasim.userdemand.concentration" UserDemandConcentration @schema "ribasim.leveldemand.static" LevelDemandStatic @schema "ribasim.leveldemand.time" LevelDemandTime @schema "ribasim.flowdemand.static" FlowDemandStatic @@ -298,6 +299,13 @@ end priority::Union{Missing, Int32} end +@version UserDemandConcentrationV1 begin + node_id::Int32 + time::DateTime + substance::String + concentration::Float64 +end + @version LevelDemandStaticV1 begin node_id::Int32 min_level::Union{Missing, Float64} diff --git a/core/src/write.jl b/core/src/write.jl index 1c0e1eeef..87afce433 100644 --- a/core/src/write.jl +++ b/core/src/write.jl @@ -24,6 +24,11 @@ function write_results(model::Model)::Model path = results_path(config, RESULTS_FILENAME.flow) write_arrow(path, table, compress; remove_empty_table) + # concentrations + table = concentration_table(model) + path = results_path(config, RESULTS_FILENAME.concentration) + write_arrow(path, table, compress; remove_empty_table) + # discrete control table = discrete_control_table(model) path = results_path(config, RESULTS_FILENAME.control) @@ -57,6 +62,7 @@ const RESULTS_FILENAME = ( basin_state = "basin_state.arrow", basin = "basin.arrow", flow = "flow.arrow", + concentration = "concentration.arrow", control = "control.arrow", allocation = "allocation.arrow", allocation_flow = "allocation_flow.arrow", @@ -247,6 +253,45 @@ function flow_table( return (; time, edge_id, from_node_id, to_node_id, flow_rate) end +"Create a concentration result table from the saved data" +function concentration_table( + model::Model, +)::@NamedTuple{ + time::Vector{DateTime}, + node_id::Vector{Int32}, + substance::Vector{String}, + concentration::Vector{Float64}, +} + (; saved, integrator) = model + (; p) = integrator + (; basin) = p + + # The last timestep is not included; there is no period over which to compute flows. + data = get_storages_and_levels(model) + + ntsteps = length(data.time) - 1 + nbasin = length(data.node_id) + nsubstance = length(basin.substances) + nrows = ntsteps * nbasin * nsubstance + + substances = String.(basin.substances) + concentration = zeros(nrows) + + idx_row = 0 + for cvec in saved.flow.saveval + for concentration_ in vec(cvec.concentration) + idx_row += 1 + concentration[idx_row] = concentration_ + end + end + + time = repeat(data.time[begin:(end - 1)]; inner = nbasin * nsubstance) + substance = repeat(substances; inner = nbasin, outer = ntsteps) + node_id = repeat(Int32.(data.node_id); outer = ntsteps * nsubstance) + + return (; time, node_id, substance, concentration) +end + "Create a discrete control result table from the saved data" function discrete_control_table( model::Model, diff --git a/core/test/allocation_test.jl b/core/test/allocation_test.jl index f7549f1d1..47050c682 100644 --- a/core/test/allocation_test.jl +++ b/core/test/allocation_test.jl @@ -582,8 +582,8 @@ end (; allocation_models) = p.allocation (; basin, level_demand, graph) = p - fill!(level_demand.max_level[1].u, Inf) - fill!(level_demand.max_level[2].u, Inf) + level_demand.max_level[1].u .= Inf + level_demand.max_level[2].u .= Inf # Given a max_level of Inf, the basin capacity is 0.0 because it is not possible for the basin level to be > Inf @test Ribasim.get_basin_capacity(allocation_models[1], u, p, t, basin.node_id[1]) == 0.0 diff --git a/core/test/docs.toml b/core/test/docs.toml index 4fb2c766f..3b91105c6 100644 --- a/core/test/docs.toml +++ b/core/test/docs.toml @@ -38,6 +38,7 @@ water_balance_reltol = 1e-2 # optional, default 1e-2 maxiters = 1e9 # optional, default 1e9 sparse = true # optional, default true autodiff = false # optional, default false +evaporate_mass = true # optional, default true to simulate a correct mass balance [logging] # defines the logging level of Ribasim diff --git a/core/test/utils_test.jl b/core/test/utils_test.jl index 8f5fa79f1..f8a3fd12e 100644 --- a/core/test/utils_test.jl +++ b/core/test/utils_test.jl @@ -13,6 +13,7 @@ end using StructArrays: StructVector using Ribasim: NodeID, cache using DataInterpolations: LinearInterpolation, integral, invert_integral + using DataStructures: OrderedSet # create two basins with different bottoms/levels area = [[0.01, 1.0], [0.01, 1.0]] @@ -24,6 +25,12 @@ end current_area = cache(2) current_level[Float64[]] .= [2.0, 3.0] current_area[Float64[]] .= [2.0, 3.0] + + substances = OrderedSet([:test]) + concentration_state = zeros(2, 1) + concentration = zeros(2, 2, 1) + mass = zeros(2, 1) + basin = Ribasim.Basin(; node_id = NodeID.(:Basin, [5, 7], [1, 2]), current_level, @@ -31,7 +38,12 @@ end storage_to_level, level_to_area, demand, + concentration_state, + concentration, + mass, + substances, time = StructVector{Ribasim.BasinTimeV1}(undef, 0), + concentration_time = StructVector{Ribasim.BasinConcentrationV1}(undef, 0), ) @test Ribasim.basin_levels(basin, 2)[1] === 4.0 @@ -45,6 +57,7 @@ end using Logging using Ribasim: NodeID using DataInterpolations: LinearInterpolation, invert_integral + using DataStructures: OrderedSet level = [ 0.0, @@ -73,12 +86,23 @@ end level_to_area = LinearInterpolation(area, level; extrapolate = true) storage_to_level = invert_integral(level_to_area) demand = zeros(1) + + substances = OrderedSet([:test]) + concentration_state = zeros(1, 1) + concentration = zeros(2, 1, 1) + mass = zeros(1, 1) + basin = Ribasim.Basin(; node_id = NodeID.(:Basin, [1], 1), storage_to_level = [storage_to_level], level_to_area = [level_to_area], demand, time = StructVector{Ribasim.BasinTimeV1}(undef, 0), + concentration_time = StructVector{Ribasim.BasinConcentrationV1}(undef, 0), + concentration_state, + concentration, + mass, + substances, ) logger = TestLogger() diff --git a/docs/concept/core.qmd b/docs/concept/core.qmd index 3103d5944..ebfc9d201 100644 --- a/docs/concept/core.qmd +++ b/docs/concept/core.qmd @@ -119,3 +119,22 @@ end allocation_subNetwork->>user_demand: allocated user_demand->>basin: abstracted ``` + +# Substance (tracer) concentration calculations + +Ribasim can calculate concentrations of conservative tracers (i.e. substances that are non-reactive). +It does so by calculating the mass transports by flows for each timestep, in the `update_cumulative_flows!` callback. +Specifically, for each Basin at each timestep we calculate: + +- all mass inflows (flow * source_concentration) given the edge inflows +- update the concentrations in the Basin based on the added storage (previous storage + inflows) +- all mass outflows (flow * basin_concentration_state) give the edge outflows +- update the concentrations in the Basin based on the current storage + +We thus keep track of both mass and concentration of substances for each Basin. +Note that we have not added the substance mass to the states, and we assume that concentrations of flows are piecewise constant over a timestep. + +By default we calculate concentrations for the following source tracers. +- Continuity (mass balance, fraction of all water sources, sum of all other source tracers) +- Initial (fraction of initial storages) +- LevelBoundary, FlowBoundary, UserDemand, Drainage, Precipitation (fraction of different boundaries) diff --git a/docs/reference/node/basin.qmd b/docs/reference/node/basin.qmd index 1105684a0..ee87fb99a 100644 --- a/docs/reference/node/basin.qmd +++ b/docs/reference/node/basin.qmd @@ -392,7 +392,7 @@ Note that the interpolation to subgrid water level is not constrained by any wat Generally, to create physically meaningful subgrid water levels, the subgrid table must be parametrized properly such that the spatially integrated water volume of the subgrid elements agrees with the total storage volume of the basin. ## Concentration {#sec-basin-conc} -This table defines the concentration(s) of (a) substance(s) for the inflow boundaries of a Basin node. +This table defines the concentration of substances for the inflow boundaries of a Basin node. column | type | unit | restriction ------------- | -------- | ------------------------ | ----------- @@ -403,7 +403,7 @@ drainage | Float64 | $\text{g}/\text{m}^3$ | (optional) precipitation | Float64 | $\text{g}/\text{m}^3$ | (optional) ## ConcentrationState {#sec-basin-conc-state} -This table defines the concentration(s) of (a) substance(s) in the Basin at the start of the simulation. +This table defines the concentration of substances in the Basin at the start of the simulation. column | type | unit | restriction -------------- | -------- | ------------------------ | ----------- diff --git a/docs/reference/node/flow-boundary.qmd b/docs/reference/node/flow-boundary.qmd index 3600ead68..e2b465d89 100644 --- a/docs/reference/node/flow-boundary.qmd +++ b/docs/reference/node/flow-boundary.qmd @@ -38,7 +38,7 @@ time | DateTime | - | sorted per node_id flow_rate | Float64 | $\text{m}^3/\text{s}$ | non-negative ## Concentration {#sec-flow-boundary-conc} -This table defines the concentration(s) of (a) substance(s) for the flow from the FlowBoundary. +This table defines the concentration of substances for the flow from the FlowBoundary. column | type | unit | restriction -------------- | -------- | --------------------- | ----------- diff --git a/docs/reference/node/level-boundary.qmd b/docs/reference/node/level-boundary.qmd index 3cb322f56..ca77ef0a3 100644 --- a/docs/reference/node/level-boundary.qmd +++ b/docs/reference/node/level-boundary.qmd @@ -34,7 +34,7 @@ time | DateTime | - | sorted per node_id level | Float64 | $\text{m}$ | - ## Concentration {#sec-level-boundary-conc} -This table defines the concentration(s) of (a) substance(s) for the flow from the LevelBoundary. +This table defines the concentration of substances for the flow from the LevelBoundary. column | type | unit | restriction -------------- | -------- | --------------------- | ----------- diff --git a/docs/reference/node/user-demand.qmd b/docs/reference/node/user-demand.qmd index f53f5fbfc..9239cd853 100644 --- a/docs/reference/node/user-demand.qmd +++ b/docs/reference/node/user-demand.qmd @@ -51,6 +51,16 @@ demand | Float64 | $\text{m}^3/\text{s}$ | non-negative return_factor | Float64 | - | between [0 - 1] min_level | Float64 | $\text{m}$ | - +## Concentration +This table defines the concentration of substances for the flow from the UserDemand. + +column | type | unit | restriction +-------------- | -------- | --------------------- | ----------- +node_id | Int32 | - | sorted +time | DateTime | - | sorted per node_id +substance | String | - | can correspond to known Delwaq substances +concentration | Float64 | $\text{g}/\text{m}^3$ | + # Equations UserDemand receive an allocated flow rate $F^p$ per priority $p=1,2,\ldots, p_{\max}$, which is either determined by [allocation optimization](/concept/allocation.qmd) or simply equal to the demand at time $t$; $F^p = d^p(t)$. diff --git a/docs/reference/usage.qmd b/docs/reference/usage.qmd index b905da172..3a4d782ee 100644 --- a/docs/reference/usage.qmd +++ b/docs/reference/usage.qmd @@ -54,6 +54,9 @@ The total maximum number of iterations `maxiters = 1e9`, can normally stay as-is The absolute and relative tolerance for adaptive timestepping can be set with `abstol` and `reltol`. For more information on these and other solver options, see the [DifferentialEquations.jl docs](https://docs.sciml.ai/DiffEqDocs/latest/basics/common_solver_opts/#solver_options). +Finally there's the `evaporate_mass = true` setting, which determines whether mass is lost due to evaporation in water quality calculations, by default set to true. +While physically incorrect, it is useful for a first correctness check on a model in terms of mass balance (Continuity tracer should always have a concentration of 1). To simulate increasing concentrations (e.g. salinity) due to evaporation, change the setting to `false`. + ## Allocation settings Currently there are the following allocation settings: - `use_allocation`: A boolean which says whether allocation should be used or not; diff --git a/python/ribasim/ribasim/config.py b/python/ribasim/ribasim/config.py index 58649f4ff..e426cf82f 100644 --- a/python/ribasim/ribasim/config.py +++ b/python/ribasim/ribasim/config.py @@ -121,6 +121,7 @@ class Solver(ChildModel): maxiters: int = 1000000000 sparse: bool = True autodiff: bool = False + evaporate_mass: bool = True class Verbosity(str, Enum): diff --git a/python/ribasim/ribasim/delwaq/generate.py b/python/ribasim/ribasim/delwaq/generate.py index f52961611..3d964aef3 100644 --- a/python/ribasim/ribasim/delwaq/generate.py +++ b/python/ribasim/ribasim/delwaq/generate.py @@ -1,8 +1,9 @@ """Setup a Delwaq model from a Ribasim model and results.""" +import argparse import csv +import logging import shutil -import sys from datetime import timedelta from pathlib import Path @@ -31,8 +32,9 @@ write_volumes, ) +logger = logging.getLogger(__name__) delwaq_dir = Path(__file__).parent -output_folder = delwaq_dir / "model" +output_path = delwaq_dir / "model" env = jinja2.Environment( autoescape=True, loader=jinja2.FileSystemLoader(delwaq_dir / "template") @@ -88,7 +90,7 @@ def _make_boundary(data, boundary_type): return boundary, substances -def _setup_graph(nodes, edge, use_evaporation=True): +def _setup_graph(nodes, edge, evaporate_mass=True): G = nx.DiGraph() assert nodes.df is not None @@ -131,7 +133,7 @@ def _setup_graph(nodes, edge, use_evaporation=True): for outneighbor_id in out.keys(): if outneighbor_id in remove_nodes: - print("Not making edge to removed node.") + logger.debug("Not making edge to removed node.") continue edge = (inneighbor_id, outneighbor_id) edge_id = G.get_edge_data(node_id, outneighbor_id)["id"][0] @@ -156,13 +158,27 @@ def _setup_graph(nodes, edge, use_evaporation=True): G.nodes[loop[0]]["type"] != "UserDemand" and G.nodes[loop[1]]["type"] != "UserDemand" ): - print("Found cycle that is not a UserDemand.") + logger.debug("Found cycle that is not a UserDemand.") else: edge_ids = G.edges[loop]["id"] G.edges[reversed(loop)]["id"].extend(edge_ids) merge_edges.extend(edge_ids) G.remove_edge(*loop) + # Remove boundary to boundary edges + remove_double_edges = [] + for x in G.edges(data=True): + a, b, d = x + if G.nodes[a]["type"] == "Terminal" and G.nodes[b]["type"] == "UserDemand": + logger.debug("Removing edge between Terminal and UserDemand") + remove_double_edges.append(a) + elif G.nodes[a]["type"] == "UserDemand" and G.nodes[b]["type"] == "Terminal": + remove_double_edges.append(b) + logger.debug("Removing edge between UserDemand and Terminal") + + for node_id in remove_double_edges: + G.remove_node(node_id) + # Relabel the nodes as consecutive integers for Delwaq # Note that the node["id"] is the original node_id basin_id = 0 @@ -183,7 +199,7 @@ def _setup_graph(nodes, edge, use_evaporation=True): boundary_id -= 1 node_mapping[node_id] = boundary_id else: - raise Exception(f"Found unexpected node {node_id} in delwaq graph.") + raise ValueError(f"Found unexpected node {node_id} in delwaq graph.") nx.relabel_nodes(G, node_mapping, copy=False) @@ -220,7 +236,7 @@ def _setup_graph(nodes, edge, use_evaporation=True): boundary=(node["id"], "precipitation"), ) - if use_evaporation: + if evaporate_mass: boundary_id -= 1 G.add_node( boundary_id, @@ -276,14 +292,15 @@ def _setup_boundaries(model): def generate( toml_path: Path, - output_folder=output_folder, - use_evaporation=USE_EVAP, - results_folder="results", + output_path: Path = output_path, ) -> tuple[nx.DiGraph, set[str]]: """Generate a Delwaq model from a Ribasim model and results.""" # Read in model and results model = ribasim.Model.read(toml_path) + results_folder = toml_path.parent / model.results_dir + evaporate_mass = model.solver.evaporate_mass + basins = pd.read_feather( toml_path.parent / results_folder / "basin.arrow", dtype_backend="pyarrow" ) @@ -291,11 +308,11 @@ def generate( toml_path.parent / results_folder / "flow.arrow", dtype_backend="pyarrow" ) - output_folder.mkdir(exist_ok=True) + output_path.mkdir(exist_ok=True) # Setup flow network G, merge_edges, node_mapping, edge_mapping, basin_mapping = _setup_graph( - model.node_table(), model.edge, use_evaporation=use_evaporation + model.node_table(), model.edge, evaporate_mass=evaporate_mass ) # Plot @@ -316,15 +333,15 @@ def generate( # Write topology to delwaq pointer file pointer = pd.DataFrame(G.edges(), columns=["from_node_id", "to_node_id"]) - pointer.to_csv(output_folder / "network.csv", index=False) # not needed - write_pointer(output_folder / "ribasim.poi", pointer) + pointer.to_csv(output_path / "network.csv", index=False) # not needed + write_pointer(output_path / "ribasim.poi", pointer) total_segments = len(basin_mapping) total_exchanges = len(pointer) # Write attributes template template = env.get_template("delwaq.atr.j2") - with open(output_folder / "ribasim.atr", mode="w") as f: + with open(output_path / "ribasim.atr", mode="w") as f: f.write( template.render( nsegments=total_segments, @@ -333,7 +350,7 @@ def generate( # Generate mesh and write to NetCDF uds = ugrid(G) - uds.ugrid.to_netcdf(output_folder / "ribasim.nc") + uds.ugrid.to_netcdf(output_path / "ribasim.nc") # Generate area and flows # File format is int32, float32 based @@ -370,14 +387,14 @@ def generate( # Save flows to Delwaq format nflows.sort_values(by=["time", "edge_id"], inplace=True) - nflows.to_csv(output_folder / "flows.csv", index=False) # not needed + nflows.to_csv(output_path / "flows.csv", index=False) # not needed nflows.drop( columns=["edge_id"], inplace=True, ) - write_flows(output_folder / "ribasim.flo", nflows, timestep) + write_flows(output_path / "ribasim.flo", nflows, timestep) write_flows( - output_folder / "ribasim.are", nflows, timestep + output_path / "ribasim.are", nflows, timestep ) # same as flow, so area becomes 1 # Write volumes to Delwaq format @@ -387,25 +404,25 @@ def generate( volumes["node_id"].map(basin_mapping).astype(pd.Int32Dtype()) ) volumes = volumes.sort_values(by=["time", "node_id"]) - volumes.to_csv(output_folder / "volumes.csv", index=False) # not needed + volumes.to_csv(output_path / "volumes.csv", index=False) # not needed volumes.drop(columns=["node_id"], inplace=True) - write_volumes(output_folder / "ribasim.vol", volumes, timestep) + write_volumes(output_path / "ribasim.vol", volumes, timestep) write_volumes( - output_folder / "ribasim.vel", volumes, timestep + output_path / "ribasim.vel", volumes, timestep ) # same as volume, so vel becomes 1 # Length file lengths = nflows.copy() lengths.flow_rate = 1 lengths.iloc[np.repeat(np.arange(len(lengths)), 2)] - write_flows(output_folder / "ribasim.len", lengths, timestep) + write_flows(output_path / "ribasim.len", lengths, timestep) # Find all boundary substances and concentrations boundaries, substances = _setup_boundaries(model) # Write boundary data with substances and concentrations template = env.get_template("B5_bounddata.inc.j2") - with open(output_folder / "B5_bounddata.inc", mode="w") as f: + with open(output_path / "B5_bounddata.inc", mode="w") as f: f.write( template.render( states=[], # no states yet @@ -463,8 +480,27 @@ def generate( bnd["fid"] = list(map(_boundary_name, bnd["node_id"], bnd["node_type"])) bnd["comment"] = "" bnd = bnd[["fid", "comment", "node_type"]] + + def prefix(d): + """Replace duplicate boundary names with unique names. + + As the string length is fixed, we replace the first + character with a unique number. + """ + n = len(d) + if n == 1: + return d + elif n == 2: + logger.debug(f"Renaming duplicate boundaries {d.iloc[0]}") + return [str(i) for i in range(n)] + d.str.lstrip("U") + else: + raise ValueError("Found boundary with more than 2 duplicates.") + + bnd["fid"] = bnd.groupby("fid").fid.transform(prefix) + assert bnd["fid"].is_unique + bnd.to_csv( - output_folder / "ribasim_bndlist.inc", + output_path / "ribasim_bndlist.inc", index=False, header=False, sep=" ", @@ -474,11 +510,11 @@ def generate( # Setup DIMR configuration for running Delwaq via DIMR dimrc = delwaq_dir / "reference/dimr_config.xml" - shutil.copy(dimrc, output_folder / "dimr_config.xml") + shutil.copy(dimrc, output_path / "dimr_config.xml") # Write main Delwaq input file template = env.get_template("delwaq.inp.j2") - with open(output_folder / "delwaq.inp", mode="w") as f: + with open(output_path / "delwaq.inp", mode="w") as f: f.write( template.render( startime=model.starttime, @@ -525,5 +561,21 @@ def add_tracer(model, node_id, tracer_name): if __name__ == "__main__": # Generate a Delwaq model from the default Ribasim model - toml_path = Path(sys.argv[1]) - graph, substances = generate(toml_path, toml_path.parent / "delwaq") + + parser = argparse.ArgumentParser( + description="Generate Delwaq input from Ribasim results." + ) + parser.add_argument( + "toml_path", type=Path, help="The path to the Ribasim TOML file." + ) + parser.add_argument( + "--output_path", + type=Path, + help="The relative path to store the Delwaq model.", + default="delwaq", + ) + args = parser.parse_args() + + graph, substances = generate( + args.toml_path, args.toml_path.parent / args.output_path + ) diff --git a/python/ribasim/ribasim/delwaq/template/delwaq.inp.j2 b/python/ribasim/ribasim/delwaq/template/delwaq.inp.j2 index f9f9793d7..9ff923fa8 100644 --- a/python/ribasim/ribasim/delwaq/template/delwaq.inp.j2 +++ b/python/ribasim/ribasim/delwaq/template/delwaq.inp.j2 @@ -77,7 +77,7 @@ INCLUDE 'ribasim.atr' ; From UI: attributes file 1.0 1.0 1.0 ; Scale factors for 3 directions ; Default dispersion: -1 0.0 1E-07 ; constant dispersion +0.0 0.0 0.0 ; constant dispersion ; Area file -2 ; areas will be interpolated from a binary file diff --git a/python/ribasim/ribasim/nodes/user_demand.py b/python/ribasim/ribasim/nodes/user_demand.py index 362848e5f..eb346e2dd 100644 --- a/python/ribasim/ribasim/nodes/user_demand.py +++ b/python/ribasim/ribasim/nodes/user_demand.py @@ -1,5 +1,6 @@ from ribasim.input_base import TableModel from ribasim.schemas import ( + UserDemandConcentrationSchema, UserDemandStaticSchema, UserDemandTimeSchema, ) @@ -13,3 +14,7 @@ class Static(TableModel[UserDemandStaticSchema]): class Time(TableModel[UserDemandTimeSchema]): pass + + +class Concentration(TableModel[UserDemandConcentrationSchema]): + pass diff --git a/python/ribasim/ribasim/schemas.py b/python/ribasim/ribasim/schemas.py index ffefdb124..2bcd22222 100644 --- a/python/ribasim/ribasim/schemas.py +++ b/python/ribasim/ribasim/schemas.py @@ -566,6 +566,22 @@ class TabulatedRatingCurveTimeSchema(_BaseSchema): ) +class UserDemandConcentrationSchema(_BaseSchema): + fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) + node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( + nullable=False, default=0 + ) + time: Series[Annotated[pd.ArrowDtype, pyarrow.timestamp("ms")]] = pa.Field( + nullable=False + ) + substance: Series[Annotated[pd.ArrowDtype, pyarrow.string()]] = pa.Field( + nullable=False + ) + concentration: Series[Annotated[pd.ArrowDtype, pyarrow.float64()]] = pa.Field( + nullable=False + ) + + class UserDemandStaticSchema(_BaseSchema): fid: Index[Int32] = pa.Field(default=1, check_name=True, coerce=True) node_id: Series[Annotated[pd.ArrowDtype, pyarrow.int32()]] = pa.Field( diff --git a/python/ribasim_testmodels/ribasim_testmodels/__init__.py b/python/ribasim_testmodels/ribasim_testmodels/__init__.py index 05086c006..bb065778b 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/__init__.py +++ b/python/ribasim_testmodels/ribasim_testmodels/__init__.py @@ -32,6 +32,7 @@ compound_variable_condition_model, concentration_condition_model, connector_node_flow_condition_model, + continuous_concentration_condition_model, flow_condition_model, level_boundary_condition_model, level_range_model, @@ -70,6 +71,7 @@ "bucket_model", "compound_variable_condition_model", "concentration_condition_model", + "continuous_concentration_condition_model", "connector_node_flow_condition_model", "discrete_control_of_pid_control_model", "fair_distribution_model", diff --git a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py index f042b68a9..745efc563 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py +++ b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py @@ -581,3 +581,111 @@ def concentration_condition_model() -> Model: model.edge.add(model.discrete_control[4], model.pump[2]) return model + + +def continuous_concentration_condition_model() -> Model: + """ + Set up a minimal model with discrete control based on a + continuous (calculated) concentration condition. + In this case, we setup a salt concentration and mimic + the Dutch coast. + + dc + / | + lb --> lr -> basin <-- fb + | + out + | + term + """ + + model = Model( + starttime="2020-01-01", + endtime="2020-02-01", + crs="EPSG:28992", + solver=Solver(saveat=86400 / 8), + ) + + basi = model.basin.add( + Node(1, Point(0, 0)), + [ + basin.Profile(area=[10.0, 100.0], level=[0.0, 1.0]), + basin.State(level=[10.0]), + basin.ConcentrationState( + substance=["Cl"], + concentration=[35.0], # slightly salty start + ), + basin.Concentration( + time=pd.date_range( + start="2020-01-01", end="2021-01-01", periods=10, unit="ms" + ), + substance="Bar", + precipitation=0.1, + ), + ], + ) + + linearr = model.linear_resistance.add( + Node(2, Point(-1, 0)), + [ + linear_resistance.Static( + control_state=["On", "Off"], + resistance=[0.001, 10], + max_flow_rate=[0.2, 0.0001], + ) + ], + ) + + levelb = model.level_boundary.add( + Node(3, Point(-2, 0)), + [ + level_boundary.Static(level=[35.0]), + level_boundary.Concentration( + time=pd.date_range( + start="2020-01-01", end="2021-01-01", periods=10, unit="ms" + ), + substance="Cl", + concentration=35.0, + ), + ], + ) + + flowb = model.flow_boundary.add( + Node(4, Point(1, 0)), + [ + flow_boundary.Static(flow_rate=[0.1]), + flow_boundary.Concentration( + time=pd.date_range( + start="2020-01-01", end="2021-01-01", periods=11, unit="ms" + ), + substance="Foo", + concentration=1.0, + ), + ], + ) + + discretec = model.discrete_control.add( + Node(5, Point(0, 0.5)), + [ + discrete_control.Variable( + listen_node_id=[1], + variable=["concentration.Cl"], + compound_variable_id=1, + ), + # More than 20% of seawater (35 g/L) + discrete_control.Condition(greater_than=[7], compound_variable_id=1), + discrete_control.Logic(truth_state=["T", "F"], control_state=["Off", "On"]), + ], + ) + + outl = model.outlet.add(Node(6, Point(0, -0.5)), [outlet.Static(flow_rate=[0.11])]) + term = model.terminal.add(Node(7, Point(0, -1))) + + model.edge.add(levelb, linearr) + model.edge.add(linearr, basi) + model.edge.add(flowb, basi) + model.edge.add(discretec, linearr) + model.edge.add(basi, outl) + model.edge.add(outl, term) + + return model diff --git a/utils/generate-testmodels.py b/utils/generate-testmodels.py index 89b2765fe..58bb08114 100644 --- a/utils/generate-testmodels.py +++ b/utils/generate-testmodels.py @@ -20,7 +20,8 @@ def generate_model(args, datadir): if __name__ == "__main__": datadir = Path("generated_testmodels") - if datadir.is_dir(): + # Don't remove all models if we only (re)generate a subset + if datadir.is_dir() and len(sys.argv) == 0: shutil.rmtree(datadir, ignore_errors=True) datadir.mkdir(exist_ok=True)