From c61348eef7d2296fa5d78a06528a52c439ca5d4f Mon Sep 17 00:00:00 2001 From: Maarten Pronk Date: Mon, 12 Feb 2024 10:32:29 +0100 Subject: [PATCH] Implement external timeseries and adds concentration field to Basin. --- core/src/callback.jl | 15 ++++++- core/src/parameter.jl | 12 +++++ core/src/read.jl | 44 ++++++++++++++++++- core/src/schema.jl | 8 ++++ core/src/util.jl | 22 ++++++++-- core/test/utils_test.jl | 4 ++ python/ribasim/ribasim/__init__.py | 2 + python/ribasim/ribasim/config.py | 8 ++++ python/ribasim/ribasim/model.py | 2 + python/ribasim/ribasim/schemas.py | 7 +++ .../ribasim_testmodels/discrete_control.py | 12 ++++- 11 files changed, 129 insertions(+), 7 deletions(-) diff --git a/core/src/callback.jl b/core/src/callback.jl index 62bb8e380..9960aaec7 100644 --- a/core/src/callback.jl +++ b/core/src/callback.jl @@ -120,7 +120,7 @@ function get_value( u::AbstractVector{Float64}, t::Float64, ) - (; basin, flow_boundary, level_boundary) = p + (; basin, flow_boundary, level_boundary, external) = p if variable == "level" hasindex_basin, basin_idx = id_index(basin.node_id, node_id) @@ -146,6 +146,19 @@ function get_value( end value = flow_boundary.flow_rate[flow_boundary_idx](t + Δt) + + elseif variable == "concentration" + hasindex_basin, basin_idx = id_index(basin.node_id, node_id) + + if hasindex_basin + value = basin.concentration[basin_idx](t + Δt) + else + error("Concentration condition node '$node_id' is not a basin.") + end + + elseif variable == "external" + value = external.external(t + Δt) + else error("Unsupported condition variable $variable.") end diff --git a/core/src/parameter.jl b/core/src/parameter.jl index da492b245..5a5bc9baf 100644 --- a/core/src/parameter.jl +++ b/core/src/parameter.jl @@ -130,6 +130,7 @@ struct Basin{T, C} <: AbstractParameterNode area::Vector{Vector{Float64}} level::Vector{Vector{Float64}} storage::Vector{Vector{Float64}} + concentration::Vector{ScalarInterpolation} # data source for parameter updates time::StructVector{BasinTimeV1, C, Int} @@ -144,6 +145,7 @@ struct Basin{T, C} <: AbstractParameterNode area, level, storage, + concentration, time::StructVector{BasinTimeV1, C, Int}, ) where {T, C} is_valid = valid_profiles(node_id, level, area) @@ -159,6 +161,7 @@ struct Basin{T, C} <: AbstractParameterNode area, level, storage, + concentration, time, ) end @@ -504,6 +507,14 @@ struct Subgrid level::Vector{Float64} end +"""An external interpolated timeseries. + +Can store anything, and doesn't belong to a node. +""" +struct External + external::ScalarInterpolation +end + # TODO Automatically add all nodetypes here struct Parameters{T, C1, C2} starttime::DateTime @@ -541,4 +552,5 @@ struct Parameters{T, C1, C2} user::User lookup::Dict{Int, Symbol} subgrid::Subgrid + external::External end diff --git a/core/src/read.jl b/core/src/read.jl index 39a24ef12..198c51203 100644 --- a/core/src/read.jl +++ b/core/src/read.jl @@ -158,6 +158,7 @@ function parse_static_and_time( else # If the parameter is not interpolatable, get the instance in the first row val = getfield(time_subset[time_first_idx], parameter_name) + val = ismissing(val) ? defaults[parameter_name] : val end end getfield(out, parameter_name)[node_idx] = val @@ -510,6 +511,30 @@ function Basin(db::DB, config::Config, chunk_sizes::Vector{Int})::Basin static = load_structvector(db, config, BasinStaticV1) time = load_structvector(db, config, BasinTimeV1) + # TODO Parsing everything here is overkill + # as we only want a vector of interpolatables for concentration + time_interpolatables = [:concentration] + parsed_parameters, valid = parse_static_and_time( + db, + config, + "Basin"; + static, + time, + time_interpolatables, + defaults = (; + precipitation = 0.0, + potential_evaporation = 0.0, + drainage = 0.0, + infiltration = 0.0, + urban_runoff = 0.0, + concentration = 0.0, + ), + ) + if !valid + error("Errors occurred when parsing Basin data.") + end + concentration = parsed_parameters.concentration + set_static_value!(table, node_id, static) set_current_value!(table, node_id, time, config.starttime) check_no_nans(table, "Basin") @@ -525,6 +550,7 @@ function Basin(db::DB, config::Config, chunk_sizes::Vector{Int})::Basin area, level, storage, + concentration, time, ) end @@ -797,6 +823,21 @@ function Subgrid(db::DB, config::Config, basin::Basin)::Subgrid return Subgrid(basin_ids, interpolations, fill(NaN, length(basin_ids))) end +function External(db::DB, config::Config)::External + time = load_structvector(db, config, ExternalTimeV1) + + if isempty(time) + push!(time, ExternalTimeV1(; time = config.starttime, external = NaN)) + end + t_end = seconds_since(config.endtime, config.starttime) + val, is_valid = + get_scalar_interpolation(config.starttime, t_end, time.time, time.external, NaN) + if !is_valid + @error "The external time series has duplicate timestamps." + end + return External(val) +end + """ Get the chunk sizes for DiffCache; differentiation w.r.t. u and t (the latter only if a Rosenbrock algorithm is used). @@ -848,9 +889,9 @@ function Parameters(db::DB, config::Config)::Parameters discrete_control = DiscreteControl(db, config) pid_control = PidControl(db, config, chunk_sizes) user = User(db, config) - basin = Basin(db, config, chunk_sizes) subgrid_level = Subgrid(db, config, basin) + external = External(db, config) # Set is_pid_controlled to true for those pumps and outlets that are PID controlled for id in pid_control.node_id @@ -883,6 +924,7 @@ function Parameters(db::DB, config::Config)::Parameters user, Dict{Int, Symbol}(), subgrid_level, + external, ) if !valid_n_neighbors(p) diff --git a/core/src/schema.jl b/core/src/schema.jl index da9c61421..891ad7124 100644 --- a/core/src/schema.jl +++ b/core/src/schema.jl @@ -25,6 +25,7 @@ @schema "ribasim.outlet.static" OutletStatic @schema "ribasim.user.static" UserStatic @schema "ribasim.user.time" UserTime +@schema "ribasim.external.time" ExternalTime const delimiter = " / " tablename(sv::Type{SchemaVersion{T, N}}) where {T, N} = tablename(sv()) @@ -100,6 +101,7 @@ end infiltration::Union{Missing, Float64} precipitation::Union{Missing, Float64} urban_runoff::Union{Missing, Float64} + concentration::Union{Missing, Float64} end @version BasinTimeV1 begin @@ -110,6 +112,7 @@ end infiltration::Union{Missing, Float64} precipitation::Union{Missing, Float64} urban_runoff::Union{Missing, Float64} + concentration::Union{Missing, Float64} end @version BasinProfileV1 begin @@ -250,3 +253,8 @@ end min_level::Float64 priority::Int end + +@version ExternalTimeV1 begin + time::DateTime + external::Float64 +end diff --git a/core/src/util.jl b/core/src/util.jl index ba3c37fe4..5e175243d 100644 --- a/core/src/util.jl +++ b/core/src/util.jl @@ -214,9 +214,25 @@ function get_scalar_interpolation( default_value::Float64 = 0.0, )::Tuple{LinearInterpolation, Bool} rows = searchsorted(time.node_id, node_id) - parameter = getfield.(time, param)[rows] - parameter = coalesce(parameter, default_value) - times = seconds_since.(time.time[rows], starttime) + values = getfield.(time, param)[rows] + return get_scalar_interpolation( + starttime, + t_end, + time.time[rows], + values, + default_value, + ) +end + +function get_scalar_interpolation( + starttime::DateTime, + t_end::Float64, + time::AbstractVector, + values::AbstractVector, + default_value::Float64 = 0.0, +)::Tuple{LinearInterpolation, Bool} + parameter = coalesce(values, default_value) + times = seconds_since.(time, starttime) # Add extra timestep at start for constant extrapolation if times[1] > 0 pushfirst!(times, 0.0) diff --git a/core/test/utils_test.jl b/core/test/utils_test.jl index d4263a878..f4a4c1ff4 100644 --- a/core/test/utils_test.jl +++ b/core/test/utils_test.jl @@ -29,6 +29,7 @@ end @testitem "bottom" begin using Dictionaries: Indices using StructArrays: StructVector + using DataInterpolations # create two basins with different bottoms/levels area = [[0.01, 1.0], [0.01, 1.0]] @@ -46,6 +47,7 @@ end area, level, storage, + [LinearInterpolation([0.0, 0.0], [0.0, 0.0])], StructVector{Ribasim.BasinTimeV1}(undef, 0), ) @@ -83,6 +85,7 @@ end using Dictionaries: Indices using StructArrays: StructVector using Logging + using DataInterpolations level = [ 0.0, @@ -120,6 +123,7 @@ end [area], [level], [storage], + [LinearInterpolation([0.0, 0.0], [0.0, 0.0])], StructVector{Ribasim.BasinTimeV1}(undef, 0), ) diff --git a/python/ribasim/ribasim/__init__.py b/python/ribasim/ribasim/__init__.py index c788eee55..6bae67db3 100644 --- a/python/ribasim/ribasim/__init__.py +++ b/python/ribasim/ribasim/__init__.py @@ -7,6 +7,7 @@ Basin, Compression, DiscreteControl, + External, FlowBoundary, FractionalFlow, LevelBoundary, @@ -34,6 +35,7 @@ "Compression", "Edge", "EdgeSchema", + "External", "FlowBoundary", "FractionalFlow", "Results", diff --git a/python/ribasim/ribasim/config.py b/python/ribasim/ribasim/config.py index 1bffe0562..f90719ae6 100644 --- a/python/ribasim/ribasim/config.py +++ b/python/ribasim/ribasim/config.py @@ -13,6 +13,7 @@ BasinTimeSchema, DiscreteControlConditionSchema, DiscreteControlLogicSchema, + ExternalTimeSchema, FlowBoundaryStaticSchema, FlowBoundaryTimeSchema, FractionalFlowStaticSchema, @@ -208,3 +209,10 @@ class FractionalFlow(NodeModel): default_factory=TableModel[FractionalFlowStaticSchema], json_schema_extra={"sort_keys": ["node_id", "control_state"]}, ) + + +class External(NodeModel): + time: TableModel[ExternalTimeSchema] = Field( + default_factory=TableModel[ExternalTimeSchema], + json_schema_extra={"sort_keys": ["time", "external"]}, + ) diff --git a/python/ribasim/ribasim/model.py b/python/ribasim/ribasim/model.py index 4f1188409..829256018 100644 --- a/python/ribasim/ribasim/model.py +++ b/python/ribasim/ribasim/model.py @@ -20,6 +20,7 @@ Allocation, Basin, DiscreteControl, + External, FlowBoundary, FractionalFlow, LevelBoundary, @@ -182,6 +183,7 @@ class Model(FileModel): discrete_control: DiscreteControl = Field(default_factory=DiscreteControl) pid_control: PidControl = Field(default_factory=PidControl) user: User = Field(default_factory=User) + external: External = Field(default_factory=External) @model_validator(mode="after") def set_node_parent(self) -> "Model": diff --git a/python/ribasim/ribasim/schemas.py b/python/ribasim/ribasim/schemas.py index c907d6d54..e4bc01c3e 100644 --- a/python/ribasim/ribasim/schemas.py +++ b/python/ribasim/ribasim/schemas.py @@ -29,6 +29,7 @@ class BasinStaticSchema(_BaseSchema): infiltration: Series[float] = pa.Field(nullable=True) precipitation: Series[float] = pa.Field(nullable=True) urban_runoff: Series[float] = pa.Field(nullable=True) + concentration: Series[float] = pa.Field(nullable=True) class BasinSubgridSchema(_BaseSchema): @@ -46,6 +47,7 @@ class BasinTimeSchema(_BaseSchema): infiltration: Series[float] = pa.Field(nullable=True) precipitation: Series[float] = pa.Field(nullable=True) urban_runoff: Series[float] = pa.Field(nullable=True) + concentration: Series[float] = pa.Field(nullable=True) class DiscreteControlConditionSchema(_BaseSchema): @@ -71,6 +73,11 @@ class EdgeSchema(_BaseSchema): allocation_network_id: Series[int] = pa.Field(nullable=True) +class ExternalTimeSchema(_BaseSchema): + time: Series[Timestamp] = pa.Field(nullable=False) + external: Series[float] = pa.Field(nullable=False) + + class FlowBoundaryStaticSchema(_BaseSchema): node_id: Series[int] = pa.Field(nullable=False) active: Series[pa.BOOL] = pa.Field(nullable=True) diff --git a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py index a1bb7f88c..2c76627a2 100644 --- a/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py +++ b/python/ribasim_testmodels/ribasim_testmodels/discrete_control.py @@ -1,3 +1,5 @@ +from datetime import datetime + import geopandas as gpd import numpy as np import pandas as pd @@ -75,19 +77,24 @@ def pump_discrete_control_model() -> ribasim.Model: "infiltration": [0.0] * 2, "precipitation": [0.0] * 2, "urban_runoff": [0.0] * 2, + "concentration": [0.0] * 2, } ) state = pd.DataFrame(data={"node_id": [1, 3], "level": [1.0, 1e-5]}) + external = ribasim.External( + time=pd.DataFrame(data={"time": [datetime(2020, 1, 1)], "external": [0.0]}) + ) + basin = ribasim.Basin(profile=profile, static=static, state=state) # Setup the discrete control: condition = pd.DataFrame( data={ "node_id": [5, 5, 6], - "listen_feature_id": [1, 3, 3], - "variable": ["level", "level", "level"], + "listen_feature_id": [1, 0, 3], + "variable": ["level", "external", "concentration"], "greater_than": [0.8, 0.4, 0.45], } ) @@ -141,6 +148,7 @@ def pump_discrete_control_model() -> ribasim.Model: linear_resistance=linear_resistance, pump=pump, discrete_control=discrete_control, + external=external, starttime="2020-01-01 00:00:00", endtime="2021-01-01 00:00:00", )