Read some interpolation tables into the Model struct #674

Merged
merged 43 commits on Nov 24, 2023
Changes from 28 commits

Commits (43)
5096122
Read some interpolation tables into the Model struct
Huite Oct 17, 2023
5373322
Merge branch 'main' into interpolate-level
Huite Nov 15, 2023
7d6e552
format, process comments
Huite Nov 15, 2023
4e3a42b
Update LevelExporter to new Python API
Huite Nov 15, 2023
2c5a384
Make LevelExporter part of Basin
Huite Nov 17, 2023
b3829d1
Merge branch 'main' into interpolate-level
Huite Nov 17, 2023
0cab52a
Move LevelExporter into Parameters
Huite Nov 17, 2023
256c1e0
Get level interpolation running (currently at preset daily times)
Huite Nov 17, 2023
6e98867
Write exported levels to output arrow
Huite Nov 17, 2023
17f2b6e
Allow outputting of multiple systems for exported_levels
Huite Nov 17, 2023
f465a15
lower case basin exporter in model root
Huite Nov 20, 2023
4e348a0
Re-enable logging
Huite Nov 20, 2023
3e30179
Set debug to info for more readable terminal output while running tests
Huite Nov 20, 2023
9456cd1
Add validation for LevelExporter
Huite Nov 20, 2023
db4b6d9
Rename exporter to subgrid; Add validation; Make subgrids optional vi…
Huite Nov 20, 2023
263d5a6
Run codegen
Huite Nov 20, 2023
96a8d25
Update docs: usage.qmd
Huite Nov 20, 2023
973e0e6
Merge branch 'main' into interpolate-level
Huite Nov 20, 2023
b3b376a
Update docs/core/usage.qmd
Huite Nov 21, 2023
3382652
Enable extrapolation on subgrid level interpolation
Huite Nov 21, 2023
75b91fa
Merge branch 'main' into interpolate-level
Huite Nov 21, 2023
332aa57
subgrid levels not optional in expectation of PR 815
Huite Nov 22, 2023
d5d21b4
testset -> testitem for new test
Huite Nov 22, 2023
0dd9670
fixes
visr Nov 23, 2023
cdb0ea8
sentence per line
visr Nov 23, 2023
50e5246
Clarify name
visr Nov 23, 2023
f9a5980
rename to Basin / subgrid_level
visr Nov 23, 2023
be49933
no need for isempty
visr Nov 23, 2023
547cfb0
Expose subgrid_levels via BMI
Huite Nov 24, 2023
36bdd2e
Simplify & make naming consistent
Huite Nov 24, 2023
9313b76
Merge branch 'main' into interpolate-level
Huite Nov 24, 2023
b8d99e4
Add subgrid_levels to RESULTS_FILENAME
Huite Nov 24, 2023
ac58be5
Make subgrid levels computation option via Results config
Huite Nov 24, 2023
f836404
Merge branch 'main' into interpolate-level
visr Nov 24, 2023
359814c
update nodes.py
visr Nov 24, 2023
232d3c0
avoid splatting
visr Nov 24, 2023
6236211
Write out return type
visr Nov 24, 2023
55021b4
log errors directly
visr Nov 24, 2023
1268594
remove redundant sorting key
visr Nov 24, 2023
79c3c4e
pixi run codegen
visr Nov 24, 2023
38c3a5f
update last validation function to log errors directly
visr Nov 24, 2023
7cdcbb8
fix runstats.jl
visr Nov 24, 2023
5ab0091
Fix QGIS plugin in Python 3.9
visr Nov 24, 2023
42 changes: 38 additions & 4 deletions core/src/bmi.jl
@@ -71,6 +71,7 @@ function BMI.initialize(T::Type{Model}, config::Config)::Model
# use state
state = load_structvector(db, config, BasinStateV1)
n = length(get_ids(db, "Basin"))

finally
# always close the database, also in case of an error
close(db)
@@ -105,7 +106,7 @@ function BMI.initialize(T::Type{Model}, config::Config)::Model
end
@debug "Setup ODEProblem."

callback, saved_flow = create_callbacks(parameters, config; config.solver.saveat)
callback, saved = create_callbacks(parameters, config; config.solver.saveat)
@debug "Created callbacks."

# Initialize the integrator, providing all solver options as described in
@@ -139,7 +140,7 @@ function BMI.initialize(T::Type{Model}, config::Config)::Model

set_initial_discrete_controlled_parameters!(integrator, storage)

return Model(integrator, config, saved_flow)
return Model(integrator, config, saved)
end

"""
@@ -172,6 +173,11 @@ function BMI.finalize(model::Model)::Model
path = results_path(config, results.allocation)
write_arrow(path, table, compress)

# exported levels
table = subgrid_level_table(model)
path = results_path(config, results.subgrid_level)
write_arrow(path, table, compress)

@debug "Wrote results."
return model
end
@@ -206,7 +212,7 @@ function create_callbacks(
parameters::Parameters,
config::Config;
saveat,
)::Tuple{CallbackSet, SavedValues{Float64, Vector{Float64}}}
)::Tuple{CallbackSet, SavedResults}
(; starttime, basin, tabulated_rating_curve, discrete_control) = parameters
callbacks = SciMLBase.DECallback[]

@@ -232,6 +238,14 @@
save_flow_cb = SavingCallback(save_flow, saved_flow; saveat, save_start = false)
push!(callbacks, save_flow_cb)

# interpolate the levels
saved_subgrid_level = SavedValues(Float64, Vector{Float64})
export_cb =
SavingCallback(save_subgrid_level, saved_subgrid_level; saveat, save_start = false)
push!(callbacks, export_cb)

saved = SavedResults(saved_flow, saved_subgrid_level)

n_conditions = length(discrete_control.node_id)
if n_conditions > 0
discrete_control_cb = VectorContinuousCallback(
@@ -244,7 +258,7 @@
end
callback = CallbackSet(callbacks...)

return callback, saved_flow
return callback, saved
end

"""
@@ -480,6 +494,26 @@ function save_flow(u, t, integrator)
copy(nonzeros(get_tmp(integrator.p.connectivity.flow, u)))
end

function update_subgrid_level!(integrator)::Nothing
parameters = integrator.p
basin_level = get_tmp(parameters.basin.current_level, 0)

for exporter in values(parameters.subgrid_exporters)
for (i, (index, interp)) in
enumerate(zip(exporter.basin_index, exporter.interpolations))
exporter.subgrid_level[i] = interp(basin_level[index])
end
end
end

"Interpolate the levels and save them to SavedValues"
function save_subgrid_level(u, t, integrator)
update_subgrid_level!(integrator)
return vcat(
[exporter.subgrid_level for exporter in values(integrator.p.subgrid_exporters)]...,
)
end

"Load updates from 'Basin / time' into the parameters"
function update_basin(integrator)::Nothing
(; basin) = integrator.p
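
The hunk above wires the subgrid-level export into the existing SavingCallback machinery: a second SavedValues container is filled at the saveat times and bundled with the flow results into SavedResults. Below is a minimal, self-contained sketch of that pattern, not part of this PR's diff; the toy ODE, the subgrid_from_basin mapping and the save times are made up for illustration, assuming OrdinaryDiffEq.jl and DiffEqCallbacks.jl.

using OrdinaryDiffEq, DiffEqCallbacks

# Toy "basin level" decaying over time; stands in for the model state.
f(u, p, t) = -0.1 .* u
prob = ODEProblem(f, [1.0], (0.0, 10.0))

# Hypothetical mapping from one basin level to two subgrid levels.
subgrid_from_basin(level) = [0.5 * level, 2.0 * level]

# Same pattern as save_subgrid_level above: derive a Vector{Float64} per save time.
saved_subgrid_level = SavedValues(Float64, Vector{Float64})
save_subgrid(u, t, integrator) = subgrid_from_basin(u[1])
cb = SavingCallback(save_subgrid, saved_subgrid_level; saveat = [2.5, 5.0, 10.0], save_start = false)

sol = solve(prob, Tsit5(); callback = cb)
saved_subgrid_level.t        # the three save times
saved_subgrid_level.saveval  # one Vector{Float64} of subgrid levels per save time
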
1 change: 1 addition & 0 deletions core/src/config.jl
@@ -113,6 +113,7 @@ end
flow::String = "results/flow.arrow"
control::String = "results/control.arrow"
allocation::String = "results/allocation.arrow"
subgrid_level::String = "results/subgrid_level.arrow"
outstate::Union{String, Nothing} = nothing
compression::Compression = "zstd"
compression_level::Int = 6
61 changes: 61 additions & 0 deletions core/src/create.jl
@@ -811,6 +811,64 @@ function User(db::DB, config::Config)::User
)
end

function SubgridExporter(tables, name, node_to_basin::Dict{Int, Int})::SubgridExporter
basin_ids = Int[]
interpolations = ScalarInterpolation[]

errors = String[]
for group in IterTools.groupby(row -> row.subgrid_id, tables)
subgrid_id = first(getproperty.(group, :subgrid_id))
node_id = first(getproperty.(group, :node_id))
basin_level = getproperty.(group, :basin_level)
subgrid_level = getproperty.(group, :subgrid_level)

group_errors = valid_subgrid_exporter(
subgrid_id,
node_id,
node_to_basin,
basin_level,
subgrid_level,
)

if isempty(group_errors)
# Ensure it doesn't extrapolate before the first value.
new_interp = LinearInterpolation(
[subgrid_level[1], subgrid_level...],
[prevfloat(basin_level[1]), basin_level...];
extrapolate = true,
)
push!(basin_ids, node_to_basin[node_id])
push!(interpolations, new_interp)
else
append!(errors, group_errors)
end
end

if isempty(errors)
return SubgridExporter(basin_ids, interpolations, fill(NaN, length(basin_ids)))
else
foreach(x -> @error(x), errors)
error(
"Errors occurred while parsing Basin / subgrid_level data for group with name: $(name).",
)
end
end

function create_subgrid_exporters(
db::DB,
config::Config,
basin::Basin,
)::Dict{String, SubgridExporter}
subgrid_exporters = Dict{String, SubgridExporter}()
node_to_basin = Dict(node_id => index for (index, node_id) in enumerate(basin.node_id))
tables = load_structvector(db, config, BasinSubgridLevelV1)
for group in IterTools.groupby(row -> row.name, tables)
name = first(getproperty.(group, :name))
subgrid_exporters[name] = SubgridExporter(group, name, node_to_basin)
end
return subgrid_exporters
end

function Parameters(db::DB, config::Config)::Parameters
n_states = length(get_ids(db, "Basin")) + length(get_ids(db, "PidControl"))
chunk_size = pickchunksize(n_states)
@@ -832,6 +890,8 @@ function Parameters(db::DB, config::Config)::Parameters

basin = Basin(db, config, chunk_size)

subgrid_exporters = create_subgrid_exporters(db, config, basin)

# Set is_pid_controlled to true for those pumps and outlets that are PID controlled
for id in pid_control.node_id
id_controlled = only(outneighbors(connectivity.graph_control, id))
@@ -861,6 +921,7 @@ function Parameters(db::DB, config::Config)::Parameters
pid_control,
user,
Dict{Int, Symbol}(),
subgrid_exporters,
)
for (fieldname, fieldtype) in zip(fieldnames(Parameters), fieldtypes(Parameters))
if fieldtype <: AbstractParameterNode
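
The SubgridExporter constructor above prepends a duplicate of the first subgrid level at prevfloat(basin_level[1]), so the relation is clamped below the table while still extrapolating linearly above it. A small stand-alone sketch of that trick with made-up values (not from the PR), assuming DataInterpolations.jl provides the LinearInterpolation used here:

using DataInterpolations

basin_level = [0.0, 1.0, 2.0]
subgrid_level = [10.0, 11.0, 13.0]

# Duplicate the first value at prevfloat(...) so the first segment has zero slope.
itp = LinearInterpolation(
    [subgrid_level[1], subgrid_level...],
    [prevfloat(basin_level[1]), basin_level...];
    extrapolate = true,
)

itp(-5.0)  # the zero-slope first segment keeps this at 10.0 (no extrapolation below the table)
itp(1.5)   # 12.0: ordinary interpolation inside the table
itp(3.0)   # 15.0: linear extrapolation above the table is still allowed
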
26 changes: 24 additions & 2 deletions core/src/io.jl
@@ -182,8 +182,8 @@ end

"Create a flow result table from the saved data"
function flow_table(model::Model)::NamedTuple
(; config, saved_flow, integrator) = model
(; t, saveval) = saved_flow
(; config, saved, integrator) = model
(; t, saveval) = saved.flow
(; connectivity) = integrator.p

I, J, _ = findnz(get_tmp(connectivity.flow, integrator.u))
@@ -227,6 +227,28 @@ function allocation_table(model::Model)::NamedTuple
)
end

function subgrid_level_table(model::Model)::NamedTuple
(; config, saved, integrator) = model
(; t, saveval) = saved.subgrid_level

# The level exporter may contain multiple named systems, but the
# saved levels are flat.
time = DateTime[]
name = String[]
subgrid_id = Int[]
for (unique_name, exporter) in integrator.p.subgrid_exporters
nelem = length(exporter.basin_index)
unique_elem_id = collect(1:nelem)
ntsteps = length(t)
append!(time, repeat(datetime_since.(t, config.starttime); inner = nelem))
append!(subgrid_id, repeat(unique_elem_id; outer = ntsteps))
append!(name, fill(unique_name, length(time)))
end

subgrid_level = FlatVector(saveval)
return (; time, name, subgrid_id, subgrid_level)
end

"Write a result table to disk as an Arrow file"
function write_arrow(
path::AbstractString,
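
subgrid_level_table writes the saved levels in long format: every save time contributes one row per subgrid element, which is what the repeat calls with inner and outer produce. A tiny base-Julia sketch of that layout, with made-up numbers and plain Vectors instead of Ribasim's FlatVector:

t = [0.0, 86400.0]    # two save times in seconds
nelem = 3             # three subgrid elements in one exporter
ntsteps = length(t)

time_col = repeat(t; inner = nelem)                # [0.0, 0.0, 0.0, 86400.0, 86400.0, 86400.0]
subgrid_id_col = repeat(1:nelem; outer = ntsteps)  # [1, 2, 3, 1, 2, 3]
# zipping these two columns enumerates the rows of the long-format table
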
11 changes: 8 additions & 3 deletions core/src/lib.jl
@@ -1,3 +1,8 @@
struct SavedResults
flow::SavedValues{Float64, Vector{Float64}}
subgrid_level::SavedValues{Float64, Vector{Float64}}
end

"""
Model(config_path::AbstractString)
Model(config::Config)
@@ -11,13 +16,13 @@ A Model can be created from the path to a TOML configuration file, or a Config o
struct Model{T}
integrator::T
config::Config
saved_flow::SavedValues{Float64, Vector{Float64}}
saved::SavedResults
function Model(
integrator::T,
config,
saved_flow,
saved,
) where {T <: SciMLBase.AbstractODEIntegrator}
new{T}(integrator, config, saved_flow)
new{T}(integrator, config, saved)
end
end

8 changes: 8 additions & 0 deletions core/src/solve.jl
@@ -467,6 +467,13 @@ struct User <: AbstractParameterNode
}
end

"SubgridExporter linearly interpolates basin levels."
struct SubgridExporter
basin_index::Vector{Int}
interpolations::Vector{ScalarInterpolation}
subgrid_level::Vector{Float64}
end

# TODO Automatically add all nodetypes here
struct Parameters{T, TSparse, C1, C2}
starttime::DateTime
@@ -485,6 +492,7 @@ struct Parameters{T, TSparse, C1, C2}
pid_control::PidControl{T}
user::User
lookup::Dict{Int, Symbol}
subgrid_exporters::Dict{String, SubgridExporter}
end

"""
56 changes: 53 additions & 3 deletions core/src/validation.jl
@@ -1,5 +1,7 @@
# These schemas define the name of database tables and the configuration file structure
# The identifier is parsed as ribasim.nodetype.kind, no capitals or underscores are allowed.
# If the kind consists of multiple words, these can also be split with extra dots,
# such that from the "subgrid.level" schema we get "subgrid_level".
@schema "ribasim.node" Node
@schema "ribasim.edge" Edge
@schema "ribasim.discretecontrol.condition" DiscreteControlCondition
@@ -8,6 +10,7 @@
@schema "ribasim.basin.time" BasinTime
@schema "ribasim.basin.profile" BasinProfile
@schema "ribasim.basin.state" BasinState
@schema "ribasim.basin.subgrid.level" BasinSubgridLevel
@schema "ribasim.terminal.static" TerminalStatic
@schema "ribasim.fractionalflow.static" FractionalFlowStatic
@schema "ribasim.flowboundary.static" FlowBoundaryStatic
@@ -30,7 +33,7 @@ tablename(sv::Type{SchemaVersion{T, N}}) where {T, N} = tablename(sv())
tablename(sv::SchemaVersion{T, N}) where {T, N} =
join(filter(!isnothing, nodetype(sv)), delimiter)
isnode(sv::Type{SchemaVersion{T, N}}) where {T, N} = isnode(sv())
isnode(::SchemaVersion{T, N}) where {T, N} = length(split(string(T), ".")) == 3
isnode(::SchemaVersion{T, N}) where {T, N} = length(split(string(T), '.'; limit = 3)) == 3
nodetype(sv::Type{SchemaVersion{T, N}}) where {T, N} = nodetype(sv())

"""
@@ -43,9 +46,9 @@ function nodetype(
# so we parse the related record Ribasim.BasinTimeV1
# to derive BasinTime from it.
record = Legolas.record_type(sv)
node = last(split(string(Symbol(record)), "."))
node = last(split(string(Symbol(record)), '.'; limit = 3))

elements = split(string(T), ".")
elements = split(string(T), '.'; limit = 3)
if isnode(sv)
n = elements[2]
k = Symbol(elements[3])
@@ -202,6 +205,14 @@ end
level::Float64
end

@version BasinSubgridLevelV1 begin
name::String
subgrid_id::Int
node_id::Int
basin_level::Float64
subgrid_level::Float64
end

@version FractionalFlowStaticV1 begin
node_id::Int
fraction::Float64
@@ -362,6 +373,7 @@ sort_by_id_level(row) = (row.node_id, row.level)
sort_by_id_state_level(row) = (row.node_id, row.control_state, row.level)
sort_by_priority(row) = (row.node_id, row.priority)
sort_by_priority_time(row) = (row.node_id, row.priority, row.time)
sort_by_exporter(row) = (row.name, row.subgrid_id, row.node_id, row.basin_level)

# get the right sort by function given the Schema, with sort_by_id as the default
sort_by_function(table::StructVector{<:Legolas.AbstractRecord}) = sort_by_id
@@ -371,6 +383,7 @@ sort_by_function(table::StructVector{TabulatedRatingCurveStaticV1}) = sort_by_id
sort_by_function(table::StructVector{BasinProfileV1}) = sort_by_id_level
sort_by_function(table::StructVector{UserStaticV1}) = sort_by_priority
sort_by_function(table::StructVector{UserTimeV1}) = sort_by_priority_time
sort_by_function(table::StructVector{BasinSubgridLevelV1}) = sort_by_exporter

const TimeSchemas = Union{
BasinTimeV1,
@@ -610,3 +623,40 @@ function valid_fractional_flow(
end
return !errors
end

"""
Validate the entries for a single subgrid element.
"""
function valid_subgrid_exporter(
subgrid_id::Int,
node_id::Int,
node_to_basin::Dict{Int, Int},
basin_level::Vector{Float64},
subgrid_level::Vector{Float64},
)
# The Schema ensures that the entries are sorted properly, so we do not need to validate the order here.
errors = String[]

if !(node_id in keys(node_to_basin))
push!(
errors,
"The node_id of the Basin / subgrid_level does not refer to a basin: node_id $(node_id) for subgrid_id $(subgrid_id).",
)
end

if !allunique(basin_level)
push!(
errors,
"Basin / subgrid_level subgrid_id $(subgrid_id) has repeated basin levels, this cannot be interpolated.",
)
end

if !allunique(subgrid_level)
push!(
errors,
"Basin / subgrid_level subgrid_id $(subgrid_id) has repeated element levels, this cannot be interpolated.",
)
end

return errors
end
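
As a quick illustration of the checks above (hypothetical values; assumes the definitions from this branch are loaded, e.g. via using Ribasim): a node_id that is not a basin and a repeated basin_level each add one message to the returned error list.

node_to_basin = Dict(1 => 1, 2 => 2)  # node_id => basin index
errors = Ribasim.valid_subgrid_exporter(
    7,                   # subgrid_id
    5,                   # node_id, not a key of node_to_basin
    node_to_basin,
    [0.0, 0.0, 1.0],     # basin_level with a repeated value
    [10.0, 11.0, 12.0],  # subgrid_level, all unique
)
length(errors)  # 2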