diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..700707ced --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5a94f1f7a..ca916af89 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -44,6 +44,7 @@ jobs: env: JULIA_NUM_THREADS: ${{ matrix.threads }} - uses: julia-actions/julia-processcoverage@v1 - - uses: codecov/codecov-action@v3 + - uses: codecov/codecov-action@v4 with: - file: lcov.info + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/CIWflowServer.yml b/.github/workflows/CIWflowServer.yml new file mode 100644 index 000000000..89e442d80 --- /dev/null +++ b/.github/workflows/CIWflowServer.yml @@ -0,0 +1,56 @@ +name: Wflow Server CI +on: + pull_request: + push: + branches: + - master + tags: '*' +jobs: + test: + name: WflowServer Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + version: + - '1.6' + - '1' + os: + - ubuntu-latest + - windows-latest + arch: + - x64 + steps: + - uses: actions/checkout@v2 + - uses: julia-actions/setup-julia@v1 + with: + version: ${{ matrix.version }} + arch: ${{ matrix.arch }} + - uses: actions/cache@v1 + env: + cache-name: cache-artifacts + with: + path: ~/.julia/artifacts + key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + restore-keys: | + ${{ runner.os }}-test-${{ env.cache-name }}- + ${{ runner.os }}-test- + ${{ runner.os }}- + - name: Install Julia dependencies + shell: julia --project=server {0} + run: | + using Pkg; + # dev install Wflow + pkg"dev ." + - name: Run the Wflow Server tests + continue-on-error: true + run: > + julia --color=yes --project=server -e 'using Pkg; Pkg.test("WflowServer", coverage=true)' + shell: bash + - uses: julia-actions/julia-processcoverage@v1 + with: + directories: server/src + - uses: codecov/codecov-action@v4 + with: + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/docs/src/user_guide/additional_options.md b/docs/src/user_guide/additional_options.md index 7862a5f7c..1f2bdcf57 100644 --- a/docs/src/user_guide/additional_options.md +++ b/docs/src/user_guide/additional_options.md @@ -289,3 +289,9 @@ expectations, which then can get parsed with these Delft-FEWS log parsing settin * [Debug] * ``` + +## [Run Wflow as a ZMQ Server] +It is possible to run Wflow as a ZMQ Server, for example for the coupling to the +[OpenDA](https://openda.org/) software for data-assimilation. The code for the Wflow ZMQ +Server is not part of the Wflow.jl package, and is located +[here](https://github.com/Deltares/Wflow.jl/tree/zmq_server/server). diff --git a/server/Project.toml b/server/Project.toml new file mode 100644 index 000000000..38bcca654 --- /dev/null +++ b/server/Project.toml @@ -0,0 +1,25 @@ +name = "WflowServer" +uuid = "6f3aa2dc-f527-41f4-85fc-b3cbabe74cf6" +authors = ["Deltares and contributors"] +version = "0.1.0" + +[deps] +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" +StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" +Wflow = "d48b7d99-76e7-47ae-b1d5-ff0c1cf9a818" +ZMQ = "c2297ded-f4af-51ae-bb23-16f91089e4e1" + +[compat] +JSON3 = "1.14" +StructTypes = "1.10" +ZMQ = "1.2" +julia = "1.6" + +[extras] +Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6" +Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["Downloads", "Statistics", "Test"] diff --git a/server/README.md b/server/README.md new file mode 100644 index 000000000..8be13597a --- /dev/null +++ b/server/README.md @@ -0,0 +1,51 @@ +# Wflow ZMQ Server +Call [Wflow](https://github.com/Deltares/Wflow.jl) functions exposed through the Basic Model +Interface (BMI) implementation of Wflow and three additonal Wflow functions related to model +states and start time of the model in Unix time, using [ZeroMQ](https://zeromq.org/) with +TCP data transport. The Wflow ZMQ Server allows users to interact with a Wflow model from +many other programming languages with ZeroMQ bindings. An example is the use of +[OpenDA](https://openda.org/) Java software for data-assimilation. The coupling of OpenDA +and Wflow through a black box model approach is too slow because it requires a restart of +Wflow (multiple times), while this is not required with a client-server approach. + +The Wflow ZMQ Server can be started through an interactive Julia version, using different +functions. First, start the `WflowServer` environment on startup, in the current directory: +``` +julia --project=. +``` +then start the Wflow ZMQ Server, listening on port 5556 with the `start` function as +follows: +```julia-repl +julia> using WflowServer +julia> WflowServer.start(5556) +``` +or start the Wflow ZMQ Server, listening on port 5556 with the `main` function: +```julia-repl +julia> using WflowServer +julia> WflowServer.main(["port=5556"]) +``` +or start the Wflow ZMQ Server with the `main` function using the default port number 5555: +```julia-repl +julia> using WflowServer +julia> WflowServer.main() +``` +Finally, it is also possible to start the Wflow ZMQ server directly from the command line +with the Julia script `run_server.jl` in the current directory, as follows: +``` +julia --project=. run_server.jl --port 5556 +``` + +## JSON +JSON is used for data serialization. The Wflow ZMQ Server maps a JSON message to a Julia +structure directly. Below examples of two messages that provide a function name `fn` and +other arguments required by the exposed Wflow functions (mostly through BMI): + +``` +# initialize a Wflow model through the configuration file `sbm_config.toml` +"""{"fn": "initialize", "config_file": "sbm_config.toml"}""" +# update model until time 86400.0 +"""{"fn": "update_until", "time": 86400.0}""" +``` + +The [tests](/server/test/) provide further examples of the interface to the Wflow (BMI) and +server functions. diff --git a/server/run_server.jl b/server/run_server.jl new file mode 100644 index 000000000..421c1f4c5 --- /dev/null +++ b/server/run_server.jl @@ -0,0 +1,3 @@ +using WflowServer + +WflowServer.main() diff --git a/server/src/WflowServer.jl b/server/src/WflowServer.jl new file mode 100644 index 000000000..c96c7731e --- /dev/null +++ b/server/src/WflowServer.jl @@ -0,0 +1,9 @@ +module WflowServer +import ZMQ +import JSON3 +import StructTypes +import Wflow + +include("bmi_service.jl") +include("server.jl") +end diff --git a/server/src/bmi_service.jl b/server/src/bmi_service.jl new file mode 100644 index 000000000..3a344179a --- /dev/null +++ b/server/src/bmi_service.jl @@ -0,0 +1,357 @@ +# Structs that contain function name `fn` and possible other function arguments. These +# structs are used in `wflow_bmi` functions (below these structs), that run Wflow.BMI (and +# Wflow) functions. + +struct Initialize + config_file::String + fn::String +end + +struct GetComponentName + fn::String +end + +struct GetInputItemCount + fn::String +end + +struct GetOutputItemCount + fn::String +end + +struct GetStartTime + fn::String +end + +struct GetStartUnixTime + fn::String +end + +struct GetEndTime + fn::String +end + +struct GetTimeStep + fn::String +end + +struct GetTimeUnits + fn::String +end + +struct GetCurrentTime + fn::String +end + +struct Update + fn::String +end + +struct UpdateUntil + fn::String + time::Float64 +end + +struct GetInputVarNames + fn::String +end + +struct GetOutputVarNames + fn::String +end + +struct GetVarItemSize + fn::String + name::String +end + +struct GetVarType + fn::String + name::String +end + +struct GetVarUnits + fn::String + name::String +end + +struct GetVarNbytes + fn::String + name::String +end + +struct GetVarLocation + fn::String + name::String +end + +struct GetValue + fn::String + name::String + dest::Vector{Wflow.Float} +end + +struct GetValuePtr + fn::String + name::String +end + +struct GetValueAtIndices + fn::String + inds::Vector{Int} + dest::Vector{Wflow.Float} + name::String +end + +struct SetValue + fn::String + name::String + src::Vector{Wflow.Float} +end + +struct SetValueAtIndices + fn::String + inds::Vector{Int} + name::String + src::Vector{Wflow.Float} +end + +struct GetGridType + fn::String + grid::Int +end + +struct GetGridRank + fn::String + grid::Int +end + +struct GetGridSize + fn::String + grid::Int +end + +struct GetGridX + fn::String + grid::Int + x::Vector{Wflow.Float} +end + +struct GetGridY + fn::String + grid::Int + y::Vector{Wflow.Float} +end + +struct GetGridNodeCount + fn::String + grid::Int +end + +struct GetGridEdgeCount + fn::String + grid::Int +end + +struct GetGridEdgeNodes + fn::String + grid::Int + edge_nodes::Vector{Int} +end + +struct GetVarGrid + fn::String + name::String +end + +struct Finalize + fn::String +end + +struct LoadState + fn::String +end + +struct SaveState + fn::String +end + +function wflow_bmi(m::Initialize, model::Union{Wflow.Model,Nothing}) + model = getfield(Wflow.BMI, Symbol(m.fn))(Wflow.Model, m.config_file) + return model +end + +function wflow_bmi(m::GetComponentName, model::Wflow.Model) + component_name = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("component_name" => component_name) +end + +function wflow_bmi(m::GetInputItemCount, model::Wflow.Model) + input_item_count = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("input_item_count" => input_item_count) +end + +function wflow_bmi(m::GetOutputItemCount, model::Wflow.Model) + output_item_count = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("output_item_count" => output_item_count) +end + +function wflow_bmi(m::GetStartTime, model::Wflow.Model) + start_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("start_time" => start_time) +end + +function wflow_bmi(m::GetStartUnixTime, model::Wflow.Model) + start_unix_time = getfield(Wflow, Symbol(m.fn))(model) + return Dict("start_unix_time" => start_unix_time) +end + +function wflow_bmi(m::GetEndTime, model::Wflow.Model) + end_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("end_time" => end_time) +end + +function wflow_bmi(m::GetTimeStep, model::Wflow.Model) + time_step = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("time_step" => time_step) +end + +function wflow_bmi(m::GetTimeUnits, model::Wflow.Model) + time_units = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("time_units" => time_units) +end + +function wflow_bmi(m::GetCurrentTime, model::Wflow.Model) + current_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("current_time" => current_time) +end + +function wflow_bmi(m::UpdateUntil, model::Wflow.Model) + model = getfield(Wflow.BMI, Symbol(m.fn))(model, m.time) + return model +end + +function wflow_bmi(m::Update, model::Wflow.Model) + model = getfield(Wflow.BMI, Symbol(m.fn))(model) + return model +end + +function wflow_bmi(m::GetInputVarNames, model::Wflow.Model) + input_var_names = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("input_var_names" => input_var_names) +end + +function wflow_bmi(m::GetOutputVarNames, model::Wflow.Model) + output_var_names = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("output_var_names" => output_var_names) +end + +function wflow_bmi(m::GetVarItemSize, model::Wflow.Model) + var_itemsize = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_itemsize" => var_itemsize) +end + +function wflow_bmi(m::GetVarType, model::Wflow.Model) + var_type = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_type" => var_type) +end + +function wflow_bmi(m::GetVarUnits, model::Wflow.Model) + var_units = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_units" => var_units) +end + +function wflow_bmi(m::GetVarNbytes, model::Wflow.Model) + var_nbytes = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_nbytes" => var_nbytes) +end + +function wflow_bmi(m::GetVarLocation, model::Wflow.Model) + var_location = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_location" => var_location) +end + +function wflow_bmi(m::GetValue, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.dest) + return Dict("value" => m.dest) +end + +function wflow_bmi(m::GetValuePtr, model::Wflow.Model) + value_ptr = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("value_ptr" => value_ptr) +end + +function wflow_bmi(m::GetValueAtIndices, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.dest, m.inds) + return Dict("value_at_indices" => m.dest) +end + +function wflow_bmi(m::SetValue, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.src) + return nothing +end + +function wflow_bmi(m::SetValueAtIndices, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.inds, m.src) + return nothing +end + +function wflow_bmi(m::GetGridType, model::Wflow.Model) + grid_type = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_type" => grid_type) +end + +function wflow_bmi(m::GetGridRank, model::Wflow.Model) + grid_rank = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_rank" => grid_rank) +end + +function wflow_bmi(m::GetGridSize, model::Wflow.Model) + grid_size = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_size" => grid_size) +end + +function wflow_bmi(m::GetGridX, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.x) + return Dict("grid_x" => m.x) +end + +function wflow_bmi(m::GetGridY, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.y) + return Dict("grid_y" => m.y) +end + +function wflow_bmi(m::GetGridNodeCount, model::Wflow.Model) + grid_node_count = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_node_count" => grid_node_count) +end + +function wflow_bmi(m::GetGridEdgeCount, model::Wflow.Model) + grid_edge_count = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_edge_count" => grid_edge_count) +end + +function wflow_bmi(m::GetGridEdgeNodes, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.edge_nodes) + return Dict("grid_edge_nodes" => m.edge_nodes) +end + +function wflow_bmi(m::GetVarGrid, model::Wflow.Model) + var_grid = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_grid" => var_grid) +end + +function wflow_bmi(m::Finalize, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("status" => "OK") +end + +function wflow_bmi(m::LoadState, model::Wflow.Model) + model = getfield(Wflow, Symbol(m.fn))(model) + return model +end + +function wflow_bmi(m::SaveState, model::Wflow.Model) + getfield(Wflow, Symbol(m.fn))(model) + return Dict("status" => "OK") +end diff --git a/server/src/server.jl b/server/src/server.jl new file mode 100644 index 000000000..4e183af90 --- /dev/null +++ b/server/src/server.jl @@ -0,0 +1,185 @@ +# map JSON function name to Struct (bmi_service.jl) +const map_structs = Dict( + "initialize" => Initialize, + "get_component_name" => GetComponentName, + "get_input_item_count" => GetInputItemCount, + "get_output_item_count" => GetOutputItemCount, + "get_start_time" => GetStartTime, + "get_start_unix_time" => GetStartUnixTime, + "get_end_time" => GetEndTime, + "get_time_step" => GetTimeStep, + "get_time_units" => GetTimeUnits, + "get_current_time" => GetCurrentTime, + "update_until" => UpdateUntil, + "update" => Update, + "get_input_var_names" => GetInputVarNames, + "get_output_var_names" => GetOutputVarNames, + "get_var_itemsize" => GetVarItemSize, + "get_var_type" => GetVarType, + "get_var_units" => GetVarUnits, + "get_var_location" => GetVarLocation, + "get_var_nbytes" => GetVarNbytes, + "get_value" => GetValue, + "get_value_ptr" => GetValuePtr, + "get_value_at_indices" => GetValueAtIndices, + "set_value" => SetValue, + "set_value_at_indices" => SetValueAtIndices, + "get_grid_type" => GetGridType, + "get_var_grid" => GetVarGrid, + "get_grid_rank" => GetGridRank, + "get_grid_size" => GetGridSize, + "get_grid_x" => GetGridX, + "get_grid_y" => GetGridY, + "get_grid_node_count" => GetGridNodeCount, + "get_grid_edge_count" => GetGridEdgeCount, + "get_grid_edge_nodes" => GetGridEdgeNodes, + "finalize" => Finalize, + "load_state" => LoadState, + "save_state" => SaveState, +) + +mutable struct ModelHandler + model::Union{Wflow.Model,Nothing} +end + +"Shutdown ZMQ server" +function shutdown(s::ZMQ.Socket, ctx::ZMQ.Context) + @info "Shutting down Wflow ZMQ server on request..." + ZMQ.close(s) + ZMQ.close(ctx) +end + +"Error response ZMQ server" +function response(err::AbstractString, s::ZMQ.Socket) + @info "Send error response" + resp = Dict{String,String}("status" => "ERROR", "error" => err) + ZMQ.send(s, JSON3.write(resp)) +end + +"Status response ZMQ server" +function response(s::ZMQ.Socket) + @info "Send status response" + resp = Dict{String,String}("status" => "OK") + ZMQ.send(s, JSON3.write(resp)) +end + +"Validate JSON request against mapped Struct" +function valid_request(json) + for f in fieldnames(map_structs[json.fn]) + if f ∉ keys(json) + return f + break + end + end +end + +""" + wflow_bmi(s::Socket, handler::ModelHandler, f) + +Run a Wflow function through `wflow.bmi(f, handler.model)` and update Wflow Model `handler` +if required, depending on return type of `wflow.bmi(f, handler.model)`. +""" +function wflow_bmi(s::ZMQ.Socket, handler::ModelHandler, f) + try + ret = wflow_bmi(f, handler.model) + if typeof(ret) <: Wflow.Model # update of Wflow model + handler.model = ret + response(s) + elseif isnothing(ret) # for SetValue and SetValueAtIndices + response(s) + else + @info "Send response including output from Wflow function `$(f.fn)`" + ZMQ.send(s, JSON3.write(ret)) + end + catch e + @error "Wflow function `$(f.fn)` failed" exception = (e, catch_backtrace()) + err = string( + "Wflow function `$(f.fn)` failed\n exception =\n ", + sprint(showerror, e, catch_backtrace()), + ) + response(err, s) + end +end + +main() = main(ARGS) + +""" + main(ARGS::Vector{String}) + main() + +This is the main entry point of the Wflow ZMQ Server. Performs argument parsing and starts +the Wflow ZMQ Server, with `WflowServer.start(port::Int)`. +""" +function main(ARGS::Vector{String}) + n = length(ARGS) + if n == 0 + port = 5555 + elseif startswith(ARGS[1], "--port") + if occursin("=", ARGS[1]) + port = parse(Int, split(ARGS[1], "=")[2]) + else + port = parse(Int, ARGS[2]) + end + else + throw( + ArgumentError( + "One argument is allowed to specify the port number: `--port=` or `--port `, + where `` refers to the port number.", + ), + ) + end + start(port) +end + +""" + start(port::Int) + +Start the Wflow ZMQ Server using port number `port`. +""" +function start(port::Int) + @info "Start Wflow ZMQ Server..." + + # initialize Wflow model handler + handler = ModelHandler(nothing) + + # set up a ZMQ context, with optional port number (default = 5555) argument + context = ZMQ.Context() + socket = ZMQ.Socket(context, ZMQ.REP) + ZMQ.bind(socket, "tcp://*:$port") + + try + while true + # Wait for next request from client + req = ZMQ.recv(socket) + json = JSON3.read(req) + @info "Received request to run function `$(json.fn)`..." + + if haskey(map_structs, json.fn) + v = valid_request(json) + if isnothing(v) + f = StructTypes.constructfrom(map_structs[json.fn], json) + wflow_bmi(socket, handler, f) + else + err = ( + """At least one required argument name (`$v`) not available for function: `$(json.fn)`""" + ) + @error err + response(err, socket) + end + elseif json.fn === "shutdown" + response(socket) + break + else + err = "Received invalid Wflow function: `$(json.fn)`" + @error err + response(err, socket) + end + end + catch e + err = "Wflow ZMQ Server: exception in process" + @error err + response(err, socket) + finally + shutdown(socket, context) + end +end diff --git a/server/test/client.jl b/server/test/client.jl new file mode 100644 index 000000000..41fc1182d --- /dev/null +++ b/server/test/client.jl @@ -0,0 +1,170 @@ +# start Wflow ZMQ server (@async) +@async begin + WflowServer.main() +end + +# Connecting to the Wflow ZMQ Server +context = ZMQ.Context() +socket = ZMQ.Socket(context, ZMQ.REQ) +ZMQ.connect(socket, "tcp://localhost:5555") + +function request(message) + ZMQ.send(socket, JSON3.write(message)) + ret_value = JSON3.read(ZMQ.recv(socket), Dict) + return ret_value +end + +@testset "initialization and time functions" begin + msg = + (fn = "initialize", config_file = joinpath(@__DIR__, "sbm_config.toml")) + @test request(msg) == Dict("status" => "OK") + @test request((fn = "get_end_time",)) == Dict("end_time" => 2678400) + @test request((fn = "get_start_time",)) == Dict("start_time" => 0) + @test request((fn = "get_start_unix_time",)) == Dict("start_unix_time" => 946684800) + @test request((fn = "get_time_step",)) == Dict("time_step" => 86400) + @test request((fn = "get_time_units",)) == Dict("time_units" => "s") +end + +@testset "update functions" begin + @test request((fn = "update_until", time = 86400.0)) == Dict("status" => "OK") + @test request((fn = "get_current_time",)) == Dict("current_time" => 86400) + @test request((fn = "update",)) == Dict("status" => "OK") +end + +@testset "model information functions" begin + @test request((fn = "get_component_name",)) == Dict("component_name" => "sbm") + @test request((fn = "get_input_item_count",)) == Dict("input_item_count" => 181) + @test request((fn = "get_output_item_count",)) == Dict("output_item_count" => 181) + @test request((fn = "get_input_var_names",))["input_var_names"][[1, 5, 151, 175]] == [ + "vertical.nlayers", + "vertical.θᵣ", + "lateral.river.q", + "lateral.river.reservoir.outflow", + ] + @test request((fn = "get_output_var_names",))["output_var_names"][[1, 5, 151, 175]] == [ + "vertical.nlayers", + "vertical.θᵣ", + "lateral.river.q", + "lateral.river.reservoir.outflow", + ] +end + +zi_size = 0 +vwc_1_size = 0 +@testset "variable information and get and set functions" begin + @test request((fn = "get_var_itemsize", name = "lateral.subsurface.ssf")) == + Dict("var_itemsize" => sizeof(Wflow.Float)) + @test request((fn = "get_var_type", name = "vertical.n"))["status"] == "ERROR" + @test request((fn = "get_var_units", name = "vertical.θₛ")) == Dict("var_units" => "-") + @test request((fn = "get_var_location", name = "lateral.river.q")) == + Dict("var_location" => "node") + zi_nbytes = request((fn = "get_var_nbytes", name = "vertical.zi"))["var_nbytes"] + @test zi_nbytes == 400504 + zi_itemsize = request((fn = "get_var_itemsize", name = "vertical.zi"))["var_itemsize"] + zi_size = Int(zi_nbytes / zi_itemsize) + vwc_1_nbytes = request((fn = "get_var_nbytes", name = "vertical.vwc[1]"))["var_nbytes"] + @test vwc_1_nbytes == 400504 + vwc_1_itemsize = + request((fn = "get_var_itemsize", name = "vertical.vwc[1]"))["var_itemsize"] + vwc_1_size = Int(vwc_1_nbytes / vwc_1_itemsize) + @test request((fn = "get_var_grid", name = "lateral.river.h")) == Dict("var_grid" => 3) + msg = (fn = "get_value", name = "vertical.zi", dest = fill(0.0, zi_size)) + @test mean(request(msg)["value"]) ≈ 278.1510965581235 + msg = (fn = "get_value_ptr", name = "vertical.θₛ") + @test mean(request(msg)["value_ptr"]) ≈ 0.4409211971535584 + msg = ( + fn = "get_value_at_indices", + name = "lateral.river.q", + dest = [0.0, 0.0, 0.0], + inds = [1, 5, 10], + ) + @test request(msg)["value_at_indices"] ≈ + [2.1901434445889123, 2.6778265820894545, 3.470059871798756] + msg = (fn = "set_value", name = "vertical.zi", src = fill(300.0, zi_size)) + @test request(msg) == Dict("status" => "OK") + msg = (fn = "get_value", name = "vertical.zi", dest = fill(0.0, zi_size)) + @test mean(request(msg)["value"]) == 300.0 + msg = ( + fn = "set_value_at_indices", + name = "vertical.zi", + src = [250.0, 350.0], + inds = [1, 2], + ) + @test request(msg) == Dict("status" => "OK") + msg = ( + fn = "get_value_at_indices", + name = "vertical.zi", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] == [250.0, 350.0, 300.0] + msg = (fn = "get_value", name = "vertical.vwc[1]", dest = fill(0.0, vwc_1_size)) + @test mean(request(msg)["value"]) ≈ 0.1845159140308566f0 + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] ≈ + [0.12089607119560242f0, 0.11967185322648062f0, 0.14503555864288548f0] + msg = (fn = "set_value", name = "vertical.vwc[1]", src = fill(0.3, vwc_1_size)) + @test request(msg) == Dict("status" => "OK") + msg = (fn = "get_value", name = "vertical.vwc[1]", dest = fill(0.0, vwc_1_size)) + @test mean(request(msg)["value"]) ≈ 0.3f0 + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] == [0.3, 0.3, 0.3] + msg = ( + fn = "set_value_at_indices", + name = "vertical.vwc[1]", + src = [0.1, 0.25], + inds = [1, 2], + ) + @test request(msg) == Dict("status" => "OK") + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0], + inds = [1, 2], + ) + @test request(msg)["value_at_indices"] == [0.1, 0.25] +end + +@testset "model grid functions" begin + @test request((fn = "get_grid_type", grid = 0)) == Dict("grid_type" => "points") + @test request((fn = "get_grid_rank", grid = 0)) == Dict("grid_rank" => 2) + grid_size = request((fn = "get_grid_size", grid = 4))["grid_size"] + @test grid_size == 50063 + msg = (fn = "get_grid_x", grid = 4, x = fill(0.0, grid_size)) + @test request(msg)["grid_x"][1:3] ≈ + [6.826666666666673, 6.810000000000006, 6.81833333333334] + msg = (fn = "get_grid_y", grid = 4, y = fill(0.0, grid_size)) + @test request(msg)["grid_y"][1:3] ≈ [47.8175, 47.825833333333335, 47.825833333333335] + @test request((fn = "get_grid_node_count", grid = 0)) == Dict("grid_node_count" => 2) + @test request((fn = "get_grid_edge_count", grid = 3))["grid_edge_count"] == 5808 + msg = (fn = "get_grid_edge_nodes", grid = 3, edge_nodes = fill(0, 2 * 5808)) + @test request(msg)["grid_edge_nodes"][1:6] == [1, 5, 2, 1, 3, 2] +end + +@testset "model states and finalize functions" begin + @test request((fn = "load_state",)) == Dict("status" => "OK") + @test request((fn = "save_state",)) == Dict("status" => "OK") + @test request((fn = "finalize",)) == Dict("status" => "OK") +end + +@testset "Error handling and shutdown" begin + @test request((fn = "not_existing_function",)) == Dict( + "status" => "ERROR", + "error" => "Received invalid Wflow function: `not_existing_function`", + ) + @test request((fn = "initialize",)) == Dict( + "status" => "ERROR", + "error" => "At least one required argument name (`config_file`) not available for function: `initialize`", + ) + @test request((fn = "shutdown",)) == Dict("status" => "OK") +end diff --git a/server/test/runtests.jl b/server/test/runtests.jl new file mode 100644 index 000000000..8d016a2b6 --- /dev/null +++ b/server/test/runtests.jl @@ -0,0 +1,35 @@ +import ZMQ +import JSON3 +import StructTypes +import Wflow +import WflowServer +import Statistics: mean +import Logging: with_logger, NullLogger +import Test: @testset, @test +import Downloads + +# ensure test data is present +testdir = @__DIR__ +datadir = joinpath(testdir, "data") +inputdir = joinpath(datadir, "input") +isdir(inputdir) || mkpath(inputdir) + +"Download a test data file if it does not already exist" +function testdata(version, source_filename, target_filename) + target_path = joinpath(inputdir, target_filename) + base_url = "https://github.com/visr/wflow-artifacts/releases/download" + url = string(base_url, '/', string('v', version), '/', source_filename) + isfile(target_path) || Downloads.download(url, target_path) + return target_path +end + +staticmaps_moselle_path = + testdata(v"0.2.9", "staticmaps-moselle.nc", "staticmaps-moselle.nc") +forcing_moselle_path = testdata(v"0.2.6", "forcing-moselle.nc", "forcing-moselle.nc") +instates_moselle_path = testdata(v"0.2.6", "instates-moselle.nc", "instates-moselle.nc") + +with_logger(NullLogger()) do + @testset "Test client server Wflow ZMQ Server" begin + include("client.jl") + end +end diff --git a/server/test/sbm_config.toml b/server/test/sbm_config.toml new file mode 100644 index 000000000..3ab5c92d0 --- /dev/null +++ b/server/test/sbm_config.toml @@ -0,0 +1,232 @@ +# This is a TOML configuration file for Wflow. +# Relative file paths are interpreted as being relative to this TOML file. +# Wflow documentation https://deltares.github.io/Wflow.jl/dev/ +# TOML documentation: https://github.com/toml-lang/toml + +calendar = "proleptic_gregorian" +endtime = 2000-02-01T00:00:00 +starttime = 2000-01-01T00:00:00 +time_units = "days since 1900-01-01 00:00:00" +timestepsecs = 86400 +dir_input = "data/input" +dir_output = "data/output" +loglevel = "info" + +[state] +path_input = "instates-moselle.nc" +path_output = "outstates-moselle.nc" + +# if listed, the variable must be present in the NetCDF or error +# if not listed, the variable can get a default value if it has one + +[state.vertical] +canopystorage = "canopystorage" +satwaterdepth = "satwaterdepth" +snow = "snow" +snowwater = "snowwater" +tsoil = "tsoil" +ustorelayerdepth = "ustorelayerdepth" + +[state.lateral.river] +h = "h_river" +h_av = "h_av_river" +q = "q_river" + +[state.lateral.river.reservoir] +volume = "volume_reservoir" + +[state.lateral.subsurface] +ssf = "ssf" + +[state.lateral.land] +h = "h_land" +h_av = "h_av_land" +q = "q_land" + +[input] +path_forcing = "forcing-moselle.nc" +path_static = "staticmaps-moselle.nc" + +# these are not directly part of the model +gauges = "wflow_gauges_grdc" +ldd = "wflow_ldd" +river_location = "wflow_river" +subcatchment = "wflow_subcatch" + +# specify the internal IDs of the parameters which vary over time +# the external name mapping needs to be below together with the other mappings +forcing = [ + "vertical.precipitation", + "vertical.temperature", + "vertical.potential_evaporation", +] + +cyclic = ["vertical.leaf_area_index"] + +[input.vertical] +c = "c" +cf_soil = "cf_soil" +cfmax = "Cfmax" +e_r = "EoverR" +f = "f" +infiltcappath = "InfiltCapPath" +infiltcapsoil = "InfiltCapSoil" +kext = "Kext" +leaf_area_index = "LAI" +maxleakage = "MaxLeakage" +pathfrac = "PathFrac" +potential_evaporation = "pet" +precipitation = "precip" +rootdistpar = "rootdistpar" +rootingdepth = "RootingDepth" +soilthickness = "SoilThickness" +specific_leaf = "Sl" +storage_wood = "Swood" +temperature = "temp" +tt = "TT" +tti = "TTI" +ttm = "TTM" +water_holding_capacity = "WHC" +waterfrac = "WaterFrac" +theta_r = "thetaR" +theta_s = "thetaS" + +[input.vertical.kv_0] +netcdf.variable.name = "KsatVer" +scale = 1.0 +offset = 0.0 + +[input.lateral.river] +length = "wflow_riverlength" +n = "N_River" +slope = "RiverSlope" +width = "wflow_riverwidth" +bankfull_elevation = "RiverZ" +bankfull_depth = "RiverDepth" + +[input.lateral.river.reservoir] +area = "ResSimpleArea" +areas = "wflow_reservoirareas" +demand = "ResDemand" +locs = "wflow_reservoirlocs" +maxrelease = "ResMaxRelease" +maxvolume = "ResMaxVolume" +targetfullfrac = "ResTargetFullFrac" +targetminfrac = "ResTargetMinFrac" + +[input.lateral.subsurface] +ksathorfrac = "KsatHorFrac" + +[input.lateral.land] +n = "N" +slope = "Slope" + +[model] +kin_wave_iteration = true +masswasting = true +reinit = true +reservoirs = true +snow = true +thicknesslayers = [100, 300, 800] +type = "sbm" +min_streamorder_river = 6 +min_streamorder_land = 5 + +[output] +path = "output_moselle.nc" + +[output.vertical] +canopystorage = "canopystorage" +satwaterdepth = "satwaterdepth" +snow = "snow" +snowwater = "snowwater" +tsoil = "tsoil" +ustorelayerdepth = "ustorelayerdepth" + +[output.lateral.river] +h = "h_river" +q = "q_river" + +[output.lateral.river.reservoir] +volume = "volume_reservoir" + +[output.lateral.subsurface] +ssf = "ssf" + +[output.lateral.land] +h = "h_land" +q = "q_land" + +[netcdf] +path = "output_scalar_moselle.nc" + +[[netcdf.variable]] +name = "Q" +map = "gauges" +parameter = "lateral.river.q" + +[[netcdf.variable]] +coordinate.x = 6.255 +coordinate.y = 50.012 +name = "temp_coord" +location = "temp_bycoord" +parameter = "vertical.temperature" + +[[netcdf.variable]] +location = "temp_byindex" +name = "temp_index" +index.x = 100 +index.y = 264 +parameter = "vertical.temperature" + +[csv] +path = "output_moselle.csv" + +[[csv.column]] +header = "Q" +parameter = "lateral.river.q" +reducer = "maximum" + +[[csv.column]] +header = "volume" +index = 1 +parameter = "lateral.river.reservoir.volume" + +[[csv.column]] +coordinate.x = 6.255 +coordinate.y = 50.012 +header = "temp_bycoord" +parameter = "vertical.temperature" + +[[csv.column]] +coordinate.x = 6.255 +coordinate.y = 50.012 +header = "vwc_layer2_bycoord" +parameter = "vertical.vwc" +layer = 2 + +[[csv.column]] +header = "temp_byindex" +index.x = 100 +index.y = 264 +parameter = "vertical.temperature" + +[[csv.column]] +header = "Q" +map = "gauges" +parameter = "lateral.river.q" + +[[csv.column]] +header = "recharge" +map = "subcatchment" +parameter = "vertical.recharge" +reducer = "mean" + +[API] +components = [ + "vertical", + "lateral.subsurface", + "lateral.land", + "lateral.river", + "lateral.river.reservoir", +] diff --git a/src/flow.jl b/src/flow.jl index c0c0a1515..c9c8d25e8 100644 --- a/src/flow.jl +++ b/src/flow.jl @@ -283,7 +283,6 @@ function update(sf::SurfaceFlowRiver, network, inflow_wb, doy) for (n, v) in zip(indices_subdomain[m], topo_subdomain[m]) # sf.qin by outflow from upstream reservoir or lake location is added sf.qin[v] += sum_at(sf.q, upstream_nodes[n]) - # Inflow supply/abstraction is added to qlat (divide by flow length) # If inflow < 0, abstraction is limited if sf.inflow[v] < 0.0 @@ -349,7 +348,6 @@ function update(sf::SurfaceFlowRiver, network, inflow_wb, doy) # update h crossarea = sf.α[v] * pow(sf.q[v], sf.β) sf.h[v] = crossarea / sf.width[v] - sf.q_av[v] += sf.q[v] sf.h_av[v] += sf.h[v] end diff --git a/src/sediment_model.jl b/src/sediment_model.jl index aea2ceb83..6e1f1397d 100644 --- a/src/sediment_model.jl +++ b/src/sediment_model.jl @@ -149,8 +149,8 @@ function initialize_sediment_model(config::Config) ) model = set_states(model) - @info "Initialized model" + return model end