From eef3d1f574cd514921f5a94182e4f558d0597a37 Mon Sep 17 00:00:00 2001 From: Willem van Verseveld Date: Tue, 27 Feb 2024 09:52:47 +0100 Subject: [PATCH] Wflow ZMQ Server (#356) * Start with Wflow (BMI) ZMQ server initialization of Wflow model * improve error handling and check request client * add `get_start_time` function also changed the `ModelHandler` so the same `wflow_bmi` function can be used for initialization and other BMI functions. * add `get_end_time` function * add functions: `finalize` and `update_until` * fix shutdown of server * add remaining BMI functions and tests * run tests from runtests.jl * renamed bmi_api.jl to bmi_service.jl should have been part of previous 2 commits... * set states during initialization Move set states functionality from initialization function to a separate `set_states` function for each Model type. It also makes coupling to OpenDA more convenient (load and save states from OpenDA). * add functions `load_state` and `save_state` * improve and fix bmi - fix list of exchange variables, if a model does not include reservoirs or lakes, these variables should be left out. - add grid types "scalar" (for uniform/constant variables) and "none" (for variables not related to a model grid) - add "flextopo" model * split struct `SurfaceFlow` SurfaceFlow is used for both kinematic wave river and overland flow. This is now split into structs SurfaceFlowRiver and SurfaceFlowLand. It simplifies the orginal update function (split into two, logic easier to follow), and dispatching on these types is easier (e.g. for BMI to extract relevant properties as grid type). * fix tests and code because of update BMI - use simulation starttime as t = 0 (not 1970). - BMI.get_time_units: return "s" (instead of "seconds since 1970-01-01T00:00:00"). - BMI.get_time_units now gets called on the model rather than the type. * add `get_start_unix_time` function to get a reference datetime. Model start time (BMI) returns always 0.0. * Flexible port number ZMQ port number can be provided as command line argument (default = 5555). * update test change of `starttime` definition * Fix bmi.jl File was not up-to-date with Master. * Align with BMI fixes Master branch * Update tests Scalar variables are not exposed through BMI. * Add doc-strings functions, README.md, comments Also update docs Wflow.jl (mention possibilty to run Wflow as a server). * Rename module and add start function server * Use relative paths in README * Update README * Update docs (typo OpenDA) Co-authored-by: Martijn Visser * Update docs Co-authored-by: Martijn Visser * Update server/README.md Co-authored-by: Martijn Visser * Update server/README.md Co-authored-by: Martijn Visser * Do not export `start` Co-authored-by: Martijn Visser * Update README.md Co-authored-by: Martijn Visser * Update flow.jl (remove trailing spaces) Co-authored-by: Martijn Visser * Address review comments * Fix import for tests * Make testing work from `Pkg` - Project.toml needs uuid - Logging dep was missing * Run CI for Wflow Server * Fix missing yml extension * Delete CIWflowServer * Fix syntax `workflow_run` * Update CIWflowServer.yml * Remove `workflow_run` (only works on the main branch) CI workflow for Wflow ZMQ Server now also triggered by PR (and push to Master) as CI workflow for Wflow. Wflow test model (Moselle) for Wflow ZMQ Server is downloaded to Wflow\server\test\. * Fix paths * Add `julia-runtest@v1` * Replace `julia-runtest@v1` (runs Wflow tests) * Add `sbm_config.toml` for tests * Local dev install of Wflow * Update test WflowServer (merge master) * Remove exit from CIWflowServer * Remove threads from name CI WflowServer * Update codecov-action * Add dependabot githubactions * Specify directory processcoverage CI WflowServer --------- Co-authored-by: Martijn Visser --- .github/dependabot.yml | 7 + .github/workflows/CI.yml | 5 +- .github/workflows/CIWflowServer.yml | 56 ++++ docs/src/user_guide/additional_options.md | 6 + server/Project.toml | 25 ++ server/README.md | 51 ++++ server/run_server.jl | 3 + server/src/WflowServer.jl | 9 + server/src/bmi_service.jl | 357 ++++++++++++++++++++++ server/src/server.jl | 185 +++++++++++ server/test/client.jl | 170 +++++++++++ server/test/runtests.jl | 35 +++ server/test/sbm_config.toml | 232 ++++++++++++++ src/flow.jl | 2 - src/sediment_model.jl | 2 +- 15 files changed, 1140 insertions(+), 5 deletions(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/CIWflowServer.yml create mode 100644 server/Project.toml create mode 100644 server/README.md create mode 100644 server/run_server.jl create mode 100644 server/src/WflowServer.jl create mode 100644 server/src/bmi_service.jl create mode 100644 server/src/server.jl create mode 100644 server/test/client.jl create mode 100644 server/test/runtests.jl create mode 100644 server/test/sbm_config.toml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..700707ced --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 5a94f1f7a..ca916af89 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -44,6 +44,7 @@ jobs: env: JULIA_NUM_THREADS: ${{ matrix.threads }} - uses: julia-actions/julia-processcoverage@v1 - - uses: codecov/codecov-action@v3 + - uses: codecov/codecov-action@v4 with: - file: lcov.info + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.github/workflows/CIWflowServer.yml b/.github/workflows/CIWflowServer.yml new file mode 100644 index 000000000..89e442d80 --- /dev/null +++ b/.github/workflows/CIWflowServer.yml @@ -0,0 +1,56 @@ +name: Wflow Server CI +on: + pull_request: + push: + branches: + - master + tags: '*' +jobs: + test: + name: WflowServer Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }} - ${{ github.event_name }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + version: + - '1.6' + - '1' + os: + - ubuntu-latest + - windows-latest + arch: + - x64 + steps: + - uses: actions/checkout@v2 + - uses: julia-actions/setup-julia@v1 + with: + version: ${{ matrix.version }} + arch: ${{ matrix.arch }} + - uses: actions/cache@v1 + env: + cache-name: cache-artifacts + with: + path: ~/.julia/artifacts + key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }} + restore-keys: | + ${{ runner.os }}-test-${{ env.cache-name }}- + ${{ runner.os }}-test- + ${{ runner.os }}- + - name: Install Julia dependencies + shell: julia --project=server {0} + run: | + using Pkg; + # dev install Wflow + pkg"dev ." + - name: Run the Wflow Server tests + continue-on-error: true + run: > + julia --color=yes --project=server -e 'using Pkg; Pkg.test("WflowServer", coverage=true)' + shell: bash + - uses: julia-actions/julia-processcoverage@v1 + with: + directories: server/src + - uses: codecov/codecov-action@v4 + with: + files: lcov.info + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/docs/src/user_guide/additional_options.md b/docs/src/user_guide/additional_options.md index 7862a5f7c..1f2bdcf57 100644 --- a/docs/src/user_guide/additional_options.md +++ b/docs/src/user_guide/additional_options.md @@ -289,3 +289,9 @@ expectations, which then can get parsed with these Delft-FEWS log parsing settin * [Debug] * ``` + +## [Run Wflow as a ZMQ Server] +It is possible to run Wflow as a ZMQ Server, for example for the coupling to the +[OpenDA](https://openda.org/) software for data-assimilation. The code for the Wflow ZMQ +Server is not part of the Wflow.jl package, and is located +[here](https://github.com/Deltares/Wflow.jl/tree/zmq_server/server). diff --git a/server/Project.toml b/server/Project.toml new file mode 100644 index 000000000..38bcca654 --- /dev/null +++ b/server/Project.toml @@ -0,0 +1,25 @@ +name = "WflowServer" +uuid = "6f3aa2dc-f527-41f4-85fc-b3cbabe74cf6" +authors = ["Deltares and contributors"] +version = "0.1.0" + +[deps] +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" +StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" +Wflow = "d48b7d99-76e7-47ae-b1d5-ff0c1cf9a818" +ZMQ = "c2297ded-f4af-51ae-bb23-16f91089e4e1" + +[compat] +JSON3 = "1.14" +StructTypes = "1.10" +ZMQ = "1.2" +julia = "1.6" + +[extras] +Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6" +Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" +Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[targets] +test = ["Downloads", "Statistics", "Test"] diff --git a/server/README.md b/server/README.md new file mode 100644 index 000000000..8be13597a --- /dev/null +++ b/server/README.md @@ -0,0 +1,51 @@ +# Wflow ZMQ Server +Call [Wflow](https://github.com/Deltares/Wflow.jl) functions exposed through the Basic Model +Interface (BMI) implementation of Wflow and three additonal Wflow functions related to model +states and start time of the model in Unix time, using [ZeroMQ](https://zeromq.org/) with +TCP data transport. The Wflow ZMQ Server allows users to interact with a Wflow model from +many other programming languages with ZeroMQ bindings. An example is the use of +[OpenDA](https://openda.org/) Java software for data-assimilation. The coupling of OpenDA +and Wflow through a black box model approach is too slow because it requires a restart of +Wflow (multiple times), while this is not required with a client-server approach. + +The Wflow ZMQ Server can be started through an interactive Julia version, using different +functions. First, start the `WflowServer` environment on startup, in the current directory: +``` +julia --project=. +``` +then start the Wflow ZMQ Server, listening on port 5556 with the `start` function as +follows: +```julia-repl +julia> using WflowServer +julia> WflowServer.start(5556) +``` +or start the Wflow ZMQ Server, listening on port 5556 with the `main` function: +```julia-repl +julia> using WflowServer +julia> WflowServer.main(["port=5556"]) +``` +or start the Wflow ZMQ Server with the `main` function using the default port number 5555: +```julia-repl +julia> using WflowServer +julia> WflowServer.main() +``` +Finally, it is also possible to start the Wflow ZMQ server directly from the command line +with the Julia script `run_server.jl` in the current directory, as follows: +``` +julia --project=. run_server.jl --port 5556 +``` + +## JSON +JSON is used for data serialization. The Wflow ZMQ Server maps a JSON message to a Julia +structure directly. Below examples of two messages that provide a function name `fn` and +other arguments required by the exposed Wflow functions (mostly through BMI): + +``` +# initialize a Wflow model through the configuration file `sbm_config.toml` +"""{"fn": "initialize", "config_file": "sbm_config.toml"}""" +# update model until time 86400.0 +"""{"fn": "update_until", "time": 86400.0}""" +``` + +The [tests](/server/test/) provide further examples of the interface to the Wflow (BMI) and +server functions. diff --git a/server/run_server.jl b/server/run_server.jl new file mode 100644 index 000000000..421c1f4c5 --- /dev/null +++ b/server/run_server.jl @@ -0,0 +1,3 @@ +using WflowServer + +WflowServer.main() diff --git a/server/src/WflowServer.jl b/server/src/WflowServer.jl new file mode 100644 index 000000000..c96c7731e --- /dev/null +++ b/server/src/WflowServer.jl @@ -0,0 +1,9 @@ +module WflowServer +import ZMQ +import JSON3 +import StructTypes +import Wflow + +include("bmi_service.jl") +include("server.jl") +end diff --git a/server/src/bmi_service.jl b/server/src/bmi_service.jl new file mode 100644 index 000000000..3a344179a --- /dev/null +++ b/server/src/bmi_service.jl @@ -0,0 +1,357 @@ +# Structs that contain function name `fn` and possible other function arguments. These +# structs are used in `wflow_bmi` functions (below these structs), that run Wflow.BMI (and +# Wflow) functions. + +struct Initialize + config_file::String + fn::String +end + +struct GetComponentName + fn::String +end + +struct GetInputItemCount + fn::String +end + +struct GetOutputItemCount + fn::String +end + +struct GetStartTime + fn::String +end + +struct GetStartUnixTime + fn::String +end + +struct GetEndTime + fn::String +end + +struct GetTimeStep + fn::String +end + +struct GetTimeUnits + fn::String +end + +struct GetCurrentTime + fn::String +end + +struct Update + fn::String +end + +struct UpdateUntil + fn::String + time::Float64 +end + +struct GetInputVarNames + fn::String +end + +struct GetOutputVarNames + fn::String +end + +struct GetVarItemSize + fn::String + name::String +end + +struct GetVarType + fn::String + name::String +end + +struct GetVarUnits + fn::String + name::String +end + +struct GetVarNbytes + fn::String + name::String +end + +struct GetVarLocation + fn::String + name::String +end + +struct GetValue + fn::String + name::String + dest::Vector{Wflow.Float} +end + +struct GetValuePtr + fn::String + name::String +end + +struct GetValueAtIndices + fn::String + inds::Vector{Int} + dest::Vector{Wflow.Float} + name::String +end + +struct SetValue + fn::String + name::String + src::Vector{Wflow.Float} +end + +struct SetValueAtIndices + fn::String + inds::Vector{Int} + name::String + src::Vector{Wflow.Float} +end + +struct GetGridType + fn::String + grid::Int +end + +struct GetGridRank + fn::String + grid::Int +end + +struct GetGridSize + fn::String + grid::Int +end + +struct GetGridX + fn::String + grid::Int + x::Vector{Wflow.Float} +end + +struct GetGridY + fn::String + grid::Int + y::Vector{Wflow.Float} +end + +struct GetGridNodeCount + fn::String + grid::Int +end + +struct GetGridEdgeCount + fn::String + grid::Int +end + +struct GetGridEdgeNodes + fn::String + grid::Int + edge_nodes::Vector{Int} +end + +struct GetVarGrid + fn::String + name::String +end + +struct Finalize + fn::String +end + +struct LoadState + fn::String +end + +struct SaveState + fn::String +end + +function wflow_bmi(m::Initialize, model::Union{Wflow.Model,Nothing}) + model = getfield(Wflow.BMI, Symbol(m.fn))(Wflow.Model, m.config_file) + return model +end + +function wflow_bmi(m::GetComponentName, model::Wflow.Model) + component_name = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("component_name" => component_name) +end + +function wflow_bmi(m::GetInputItemCount, model::Wflow.Model) + input_item_count = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("input_item_count" => input_item_count) +end + +function wflow_bmi(m::GetOutputItemCount, model::Wflow.Model) + output_item_count = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("output_item_count" => output_item_count) +end + +function wflow_bmi(m::GetStartTime, model::Wflow.Model) + start_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("start_time" => start_time) +end + +function wflow_bmi(m::GetStartUnixTime, model::Wflow.Model) + start_unix_time = getfield(Wflow, Symbol(m.fn))(model) + return Dict("start_unix_time" => start_unix_time) +end + +function wflow_bmi(m::GetEndTime, model::Wflow.Model) + end_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("end_time" => end_time) +end + +function wflow_bmi(m::GetTimeStep, model::Wflow.Model) + time_step = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("time_step" => time_step) +end + +function wflow_bmi(m::GetTimeUnits, model::Wflow.Model) + time_units = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("time_units" => time_units) +end + +function wflow_bmi(m::GetCurrentTime, model::Wflow.Model) + current_time = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("current_time" => current_time) +end + +function wflow_bmi(m::UpdateUntil, model::Wflow.Model) + model = getfield(Wflow.BMI, Symbol(m.fn))(model, m.time) + return model +end + +function wflow_bmi(m::Update, model::Wflow.Model) + model = getfield(Wflow.BMI, Symbol(m.fn))(model) + return model +end + +function wflow_bmi(m::GetInputVarNames, model::Wflow.Model) + input_var_names = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("input_var_names" => input_var_names) +end + +function wflow_bmi(m::GetOutputVarNames, model::Wflow.Model) + output_var_names = getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("output_var_names" => output_var_names) +end + +function wflow_bmi(m::GetVarItemSize, model::Wflow.Model) + var_itemsize = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_itemsize" => var_itemsize) +end + +function wflow_bmi(m::GetVarType, model::Wflow.Model) + var_type = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_type" => var_type) +end + +function wflow_bmi(m::GetVarUnits, model::Wflow.Model) + var_units = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_units" => var_units) +end + +function wflow_bmi(m::GetVarNbytes, model::Wflow.Model) + var_nbytes = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_nbytes" => var_nbytes) +end + +function wflow_bmi(m::GetVarLocation, model::Wflow.Model) + var_location = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_location" => var_location) +end + +function wflow_bmi(m::GetValue, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.dest) + return Dict("value" => m.dest) +end + +function wflow_bmi(m::GetValuePtr, model::Wflow.Model) + value_ptr = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("value_ptr" => value_ptr) +end + +function wflow_bmi(m::GetValueAtIndices, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.dest, m.inds) + return Dict("value_at_indices" => m.dest) +end + +function wflow_bmi(m::SetValue, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.src) + return nothing +end + +function wflow_bmi(m::SetValueAtIndices, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.name, m.inds, m.src) + return nothing +end + +function wflow_bmi(m::GetGridType, model::Wflow.Model) + grid_type = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_type" => grid_type) +end + +function wflow_bmi(m::GetGridRank, model::Wflow.Model) + grid_rank = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_rank" => grid_rank) +end + +function wflow_bmi(m::GetGridSize, model::Wflow.Model) + grid_size = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_size" => grid_size) +end + +function wflow_bmi(m::GetGridX, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.x) + return Dict("grid_x" => m.x) +end + +function wflow_bmi(m::GetGridY, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.y) + return Dict("grid_y" => m.y) +end + +function wflow_bmi(m::GetGridNodeCount, model::Wflow.Model) + grid_node_count = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_node_count" => grid_node_count) +end + +function wflow_bmi(m::GetGridEdgeCount, model::Wflow.Model) + grid_edge_count = getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid) + return Dict("grid_edge_count" => grid_edge_count) +end + +function wflow_bmi(m::GetGridEdgeNodes, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model, m.grid, m.edge_nodes) + return Dict("grid_edge_nodes" => m.edge_nodes) +end + +function wflow_bmi(m::GetVarGrid, model::Wflow.Model) + var_grid = getfield(Wflow.BMI, Symbol(m.fn))(model, m.name) + return Dict("var_grid" => var_grid) +end + +function wflow_bmi(m::Finalize, model::Wflow.Model) + getfield(Wflow.BMI, Symbol(m.fn))(model) + return Dict("status" => "OK") +end + +function wflow_bmi(m::LoadState, model::Wflow.Model) + model = getfield(Wflow, Symbol(m.fn))(model) + return model +end + +function wflow_bmi(m::SaveState, model::Wflow.Model) + getfield(Wflow, Symbol(m.fn))(model) + return Dict("status" => "OK") +end diff --git a/server/src/server.jl b/server/src/server.jl new file mode 100644 index 000000000..4e183af90 --- /dev/null +++ b/server/src/server.jl @@ -0,0 +1,185 @@ +# map JSON function name to Struct (bmi_service.jl) +const map_structs = Dict( + "initialize" => Initialize, + "get_component_name" => GetComponentName, + "get_input_item_count" => GetInputItemCount, + "get_output_item_count" => GetOutputItemCount, + "get_start_time" => GetStartTime, + "get_start_unix_time" => GetStartUnixTime, + "get_end_time" => GetEndTime, + "get_time_step" => GetTimeStep, + "get_time_units" => GetTimeUnits, + "get_current_time" => GetCurrentTime, + "update_until" => UpdateUntil, + "update" => Update, + "get_input_var_names" => GetInputVarNames, + "get_output_var_names" => GetOutputVarNames, + "get_var_itemsize" => GetVarItemSize, + "get_var_type" => GetVarType, + "get_var_units" => GetVarUnits, + "get_var_location" => GetVarLocation, + "get_var_nbytes" => GetVarNbytes, + "get_value" => GetValue, + "get_value_ptr" => GetValuePtr, + "get_value_at_indices" => GetValueAtIndices, + "set_value" => SetValue, + "set_value_at_indices" => SetValueAtIndices, + "get_grid_type" => GetGridType, + "get_var_grid" => GetVarGrid, + "get_grid_rank" => GetGridRank, + "get_grid_size" => GetGridSize, + "get_grid_x" => GetGridX, + "get_grid_y" => GetGridY, + "get_grid_node_count" => GetGridNodeCount, + "get_grid_edge_count" => GetGridEdgeCount, + "get_grid_edge_nodes" => GetGridEdgeNodes, + "finalize" => Finalize, + "load_state" => LoadState, + "save_state" => SaveState, +) + +mutable struct ModelHandler + model::Union{Wflow.Model,Nothing} +end + +"Shutdown ZMQ server" +function shutdown(s::ZMQ.Socket, ctx::ZMQ.Context) + @info "Shutting down Wflow ZMQ server on request..." + ZMQ.close(s) + ZMQ.close(ctx) +end + +"Error response ZMQ server" +function response(err::AbstractString, s::ZMQ.Socket) + @info "Send error response" + resp = Dict{String,String}("status" => "ERROR", "error" => err) + ZMQ.send(s, JSON3.write(resp)) +end + +"Status response ZMQ server" +function response(s::ZMQ.Socket) + @info "Send status response" + resp = Dict{String,String}("status" => "OK") + ZMQ.send(s, JSON3.write(resp)) +end + +"Validate JSON request against mapped Struct" +function valid_request(json) + for f in fieldnames(map_structs[json.fn]) + if f ∉ keys(json) + return f + break + end + end +end + +""" + wflow_bmi(s::Socket, handler::ModelHandler, f) + +Run a Wflow function through `wflow.bmi(f, handler.model)` and update Wflow Model `handler` +if required, depending on return type of `wflow.bmi(f, handler.model)`. +""" +function wflow_bmi(s::ZMQ.Socket, handler::ModelHandler, f) + try + ret = wflow_bmi(f, handler.model) + if typeof(ret) <: Wflow.Model # update of Wflow model + handler.model = ret + response(s) + elseif isnothing(ret) # for SetValue and SetValueAtIndices + response(s) + else + @info "Send response including output from Wflow function `$(f.fn)`" + ZMQ.send(s, JSON3.write(ret)) + end + catch e + @error "Wflow function `$(f.fn)` failed" exception = (e, catch_backtrace()) + err = string( + "Wflow function `$(f.fn)` failed\n exception =\n ", + sprint(showerror, e, catch_backtrace()), + ) + response(err, s) + end +end + +main() = main(ARGS) + +""" + main(ARGS::Vector{String}) + main() + +This is the main entry point of the Wflow ZMQ Server. Performs argument parsing and starts +the Wflow ZMQ Server, with `WflowServer.start(port::Int)`. +""" +function main(ARGS::Vector{String}) + n = length(ARGS) + if n == 0 + port = 5555 + elseif startswith(ARGS[1], "--port") + if occursin("=", ARGS[1]) + port = parse(Int, split(ARGS[1], "=")[2]) + else + port = parse(Int, ARGS[2]) + end + else + throw( + ArgumentError( + "One argument is allowed to specify the port number: `--port=` or `--port `, + where `` refers to the port number.", + ), + ) + end + start(port) +end + +""" + start(port::Int) + +Start the Wflow ZMQ Server using port number `port`. +""" +function start(port::Int) + @info "Start Wflow ZMQ Server..." + + # initialize Wflow model handler + handler = ModelHandler(nothing) + + # set up a ZMQ context, with optional port number (default = 5555) argument + context = ZMQ.Context() + socket = ZMQ.Socket(context, ZMQ.REP) + ZMQ.bind(socket, "tcp://*:$port") + + try + while true + # Wait for next request from client + req = ZMQ.recv(socket) + json = JSON3.read(req) + @info "Received request to run function `$(json.fn)`..." + + if haskey(map_structs, json.fn) + v = valid_request(json) + if isnothing(v) + f = StructTypes.constructfrom(map_structs[json.fn], json) + wflow_bmi(socket, handler, f) + else + err = ( + """At least one required argument name (`$v`) not available for function: `$(json.fn)`""" + ) + @error err + response(err, socket) + end + elseif json.fn === "shutdown" + response(socket) + break + else + err = "Received invalid Wflow function: `$(json.fn)`" + @error err + response(err, socket) + end + end + catch e + err = "Wflow ZMQ Server: exception in process" + @error err + response(err, socket) + finally + shutdown(socket, context) + end +end diff --git a/server/test/client.jl b/server/test/client.jl new file mode 100644 index 000000000..41fc1182d --- /dev/null +++ b/server/test/client.jl @@ -0,0 +1,170 @@ +# start Wflow ZMQ server (@async) +@async begin + WflowServer.main() +end + +# Connecting to the Wflow ZMQ Server +context = ZMQ.Context() +socket = ZMQ.Socket(context, ZMQ.REQ) +ZMQ.connect(socket, "tcp://localhost:5555") + +function request(message) + ZMQ.send(socket, JSON3.write(message)) + ret_value = JSON3.read(ZMQ.recv(socket), Dict) + return ret_value +end + +@testset "initialization and time functions" begin + msg = + (fn = "initialize", config_file = joinpath(@__DIR__, "sbm_config.toml")) + @test request(msg) == Dict("status" => "OK") + @test request((fn = "get_end_time",)) == Dict("end_time" => 2678400) + @test request((fn = "get_start_time",)) == Dict("start_time" => 0) + @test request((fn = "get_start_unix_time",)) == Dict("start_unix_time" => 946684800) + @test request((fn = "get_time_step",)) == Dict("time_step" => 86400) + @test request((fn = "get_time_units",)) == Dict("time_units" => "s") +end + +@testset "update functions" begin + @test request((fn = "update_until", time = 86400.0)) == Dict("status" => "OK") + @test request((fn = "get_current_time",)) == Dict("current_time" => 86400) + @test request((fn = "update",)) == Dict("status" => "OK") +end + +@testset "model information functions" begin + @test request((fn = "get_component_name",)) == Dict("component_name" => "sbm") + @test request((fn = "get_input_item_count",)) == Dict("input_item_count" => 181) + @test request((fn = "get_output_item_count",)) == Dict("output_item_count" => 181) + @test request((fn = "get_input_var_names",))["input_var_names"][[1, 5, 151, 175]] == [ + "vertical.nlayers", + "vertical.θᵣ", + "lateral.river.q", + "lateral.river.reservoir.outflow", + ] + @test request((fn = "get_output_var_names",))["output_var_names"][[1, 5, 151, 175]] == [ + "vertical.nlayers", + "vertical.θᵣ", + "lateral.river.q", + "lateral.river.reservoir.outflow", + ] +end + +zi_size = 0 +vwc_1_size = 0 +@testset "variable information and get and set functions" begin + @test request((fn = "get_var_itemsize", name = "lateral.subsurface.ssf")) == + Dict("var_itemsize" => sizeof(Wflow.Float)) + @test request((fn = "get_var_type", name = "vertical.n"))["status"] == "ERROR" + @test request((fn = "get_var_units", name = "vertical.θₛ")) == Dict("var_units" => "-") + @test request((fn = "get_var_location", name = "lateral.river.q")) == + Dict("var_location" => "node") + zi_nbytes = request((fn = "get_var_nbytes", name = "vertical.zi"))["var_nbytes"] + @test zi_nbytes == 400504 + zi_itemsize = request((fn = "get_var_itemsize", name = "vertical.zi"))["var_itemsize"] + zi_size = Int(zi_nbytes / zi_itemsize) + vwc_1_nbytes = request((fn = "get_var_nbytes", name = "vertical.vwc[1]"))["var_nbytes"] + @test vwc_1_nbytes == 400504 + vwc_1_itemsize = + request((fn = "get_var_itemsize", name = "vertical.vwc[1]"))["var_itemsize"] + vwc_1_size = Int(vwc_1_nbytes / vwc_1_itemsize) + @test request((fn = "get_var_grid", name = "lateral.river.h")) == Dict("var_grid" => 3) + msg = (fn = "get_value", name = "vertical.zi", dest = fill(0.0, zi_size)) + @test mean(request(msg)["value"]) ≈ 278.1510965581235 + msg = (fn = "get_value_ptr", name = "vertical.θₛ") + @test mean(request(msg)["value_ptr"]) ≈ 0.4409211971535584 + msg = ( + fn = "get_value_at_indices", + name = "lateral.river.q", + dest = [0.0, 0.0, 0.0], + inds = [1, 5, 10], + ) + @test request(msg)["value_at_indices"] ≈ + [2.1901434445889123, 2.6778265820894545, 3.470059871798756] + msg = (fn = "set_value", name = "vertical.zi", src = fill(300.0, zi_size)) + @test request(msg) == Dict("status" => "OK") + msg = (fn = "get_value", name = "vertical.zi", dest = fill(0.0, zi_size)) + @test mean(request(msg)["value"]) == 300.0 + msg = ( + fn = "set_value_at_indices", + name = "vertical.zi", + src = [250.0, 350.0], + inds = [1, 2], + ) + @test request(msg) == Dict("status" => "OK") + msg = ( + fn = "get_value_at_indices", + name = "vertical.zi", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] == [250.0, 350.0, 300.0] + msg = (fn = "get_value", name = "vertical.vwc[1]", dest = fill(0.0, vwc_1_size)) + @test mean(request(msg)["value"]) ≈ 0.1845159140308566f0 + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] ≈ + [0.12089607119560242f0, 0.11967185322648062f0, 0.14503555864288548f0] + msg = (fn = "set_value", name = "vertical.vwc[1]", src = fill(0.3, vwc_1_size)) + @test request(msg) == Dict("status" => "OK") + msg = (fn = "get_value", name = "vertical.vwc[1]", dest = fill(0.0, vwc_1_size)) + @test mean(request(msg)["value"]) ≈ 0.3f0 + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0, 0.0], + inds = [1, 2, 3], + ) + @test request(msg)["value_at_indices"] == [0.3, 0.3, 0.3] + msg = ( + fn = "set_value_at_indices", + name = "vertical.vwc[1]", + src = [0.1, 0.25], + inds = [1, 2], + ) + @test request(msg) == Dict("status" => "OK") + msg = ( + fn = "get_value_at_indices", + name = "vertical.vwc[1]", + dest = [0.0, 0.0], + inds = [1, 2], + ) + @test request(msg)["value_at_indices"] == [0.1, 0.25] +end + +@testset "model grid functions" begin + @test request((fn = "get_grid_type", grid = 0)) == Dict("grid_type" => "points") + @test request((fn = "get_grid_rank", grid = 0)) == Dict("grid_rank" => 2) + grid_size = request((fn = "get_grid_size", grid = 4))["grid_size"] + @test grid_size == 50063 + msg = (fn = "get_grid_x", grid = 4, x = fill(0.0, grid_size)) + @test request(msg)["grid_x"][1:3] ≈ + [6.826666666666673, 6.810000000000006, 6.81833333333334] + msg = (fn = "get_grid_y", grid = 4, y = fill(0.0, grid_size)) + @test request(msg)["grid_y"][1:3] ≈ [47.8175, 47.825833333333335, 47.825833333333335] + @test request((fn = "get_grid_node_count", grid = 0)) == Dict("grid_node_count" => 2) + @test request((fn = "get_grid_edge_count", grid = 3))["grid_edge_count"] == 5808 + msg = (fn = "get_grid_edge_nodes", grid = 3, edge_nodes = fill(0, 2 * 5808)) + @test request(msg)["grid_edge_nodes"][1:6] == [1, 5, 2, 1, 3, 2] +end + +@testset "model states and finalize functions" begin + @test request((fn = "load_state",)) == Dict("status" => "OK") + @test request((fn = "save_state",)) == Dict("status" => "OK") + @test request((fn = "finalize",)) == Dict("status" => "OK") +end + +@testset "Error handling and shutdown" begin + @test request((fn = "not_existing_function",)) == Dict( + "status" => "ERROR", + "error" => "Received invalid Wflow function: `not_existing_function`", + ) + @test request((fn = "initialize",)) == Dict( + "status" => "ERROR", + "error" => "At least one required argument name (`config_file`) not available for function: `initialize`", + ) + @test request((fn = "shutdown",)) == Dict("status" => "OK") +end diff --git a/server/test/runtests.jl b/server/test/runtests.jl new file mode 100644 index 000000000..8d016a2b6 --- /dev/null +++ b/server/test/runtests.jl @@ -0,0 +1,35 @@ +import ZMQ +import JSON3 +import StructTypes +import Wflow +import WflowServer +import Statistics: mean +import Logging: with_logger, NullLogger +import Test: @testset, @test +import Downloads + +# ensure test data is present +testdir = @__DIR__ +datadir = joinpath(testdir, "data") +inputdir = joinpath(datadir, "input") +isdir(inputdir) || mkpath(inputdir) + +"Download a test data file if it does not already exist" +function testdata(version, source_filename, target_filename) + target_path = joinpath(inputdir, target_filename) + base_url = "https://github.com/visr/wflow-artifacts/releases/download" + url = string(base_url, '/', string('v', version), '/', source_filename) + isfile(target_path) || Downloads.download(url, target_path) + return target_path +end + +staticmaps_moselle_path = + testdata(v"0.2.9", "staticmaps-moselle.nc", "staticmaps-moselle.nc") +forcing_moselle_path = testdata(v"0.2.6", "forcing-moselle.nc", "forcing-moselle.nc") +instates_moselle_path = testdata(v"0.2.6", "instates-moselle.nc", "instates-moselle.nc") + +with_logger(NullLogger()) do + @testset "Test client server Wflow ZMQ Server" begin + include("client.jl") + end +end diff --git a/server/test/sbm_config.toml b/server/test/sbm_config.toml new file mode 100644 index 000000000..3ab5c92d0 --- /dev/null +++ b/server/test/sbm_config.toml @@ -0,0 +1,232 @@ +# This is a TOML configuration file for Wflow. +# Relative file paths are interpreted as being relative to this TOML file. +# Wflow documentation https://deltares.github.io/Wflow.jl/dev/ +# TOML documentation: https://github.com/toml-lang/toml + +calendar = "proleptic_gregorian" +endtime = 2000-02-01T00:00:00 +starttime = 2000-01-01T00:00:00 +time_units = "days since 1900-01-01 00:00:00" +timestepsecs = 86400 +dir_input = "data/input" +dir_output = "data/output" +loglevel = "info" + +[state] +path_input = "instates-moselle.nc" +path_output = "outstates-moselle.nc" + +# if listed, the variable must be present in the NetCDF or error +# if not listed, the variable can get a default value if it has one + +[state.vertical] +canopystorage = "canopystorage" +satwaterdepth = "satwaterdepth" +snow = "snow" +snowwater = "snowwater" +tsoil = "tsoil" +ustorelayerdepth = "ustorelayerdepth" + +[state.lateral.river] +h = "h_river" +h_av = "h_av_river" +q = "q_river" + +[state.lateral.river.reservoir] +volume = "volume_reservoir" + +[state.lateral.subsurface] +ssf = "ssf" + +[state.lateral.land] +h = "h_land" +h_av = "h_av_land" +q = "q_land" + +[input] +path_forcing = "forcing-moselle.nc" +path_static = "staticmaps-moselle.nc" + +# these are not directly part of the model +gauges = "wflow_gauges_grdc" +ldd = "wflow_ldd" +river_location = "wflow_river" +subcatchment = "wflow_subcatch" + +# specify the internal IDs of the parameters which vary over time +# the external name mapping needs to be below together with the other mappings +forcing = [ + "vertical.precipitation", + "vertical.temperature", + "vertical.potential_evaporation", +] + +cyclic = ["vertical.leaf_area_index"] + +[input.vertical] +c = "c" +cf_soil = "cf_soil" +cfmax = "Cfmax" +e_r = "EoverR" +f = "f" +infiltcappath = "InfiltCapPath" +infiltcapsoil = "InfiltCapSoil" +kext = "Kext" +leaf_area_index = "LAI" +maxleakage = "MaxLeakage" +pathfrac = "PathFrac" +potential_evaporation = "pet" +precipitation = "precip" +rootdistpar = "rootdistpar" +rootingdepth = "RootingDepth" +soilthickness = "SoilThickness" +specific_leaf = "Sl" +storage_wood = "Swood" +temperature = "temp" +tt = "TT" +tti = "TTI" +ttm = "TTM" +water_holding_capacity = "WHC" +waterfrac = "WaterFrac" +theta_r = "thetaR" +theta_s = "thetaS" + +[input.vertical.kv_0] +netcdf.variable.name = "KsatVer" +scale = 1.0 +offset = 0.0 + +[input.lateral.river] +length = "wflow_riverlength" +n = "N_River" +slope = "RiverSlope" +width = "wflow_riverwidth" +bankfull_elevation = "RiverZ" +bankfull_depth = "RiverDepth" + +[input.lateral.river.reservoir] +area = "ResSimpleArea" +areas = "wflow_reservoirareas" +demand = "ResDemand" +locs = "wflow_reservoirlocs" +maxrelease = "ResMaxRelease" +maxvolume = "ResMaxVolume" +targetfullfrac = "ResTargetFullFrac" +targetminfrac = "ResTargetMinFrac" + +[input.lateral.subsurface] +ksathorfrac = "KsatHorFrac" + +[input.lateral.land] +n = "N" +slope = "Slope" + +[model] +kin_wave_iteration = true +masswasting = true +reinit = true +reservoirs = true +snow = true +thicknesslayers = [100, 300, 800] +type = "sbm" +min_streamorder_river = 6 +min_streamorder_land = 5 + +[output] +path = "output_moselle.nc" + +[output.vertical] +canopystorage = "canopystorage" +satwaterdepth = "satwaterdepth" +snow = "snow" +snowwater = "snowwater" +tsoil = "tsoil" +ustorelayerdepth = "ustorelayerdepth" + +[output.lateral.river] +h = "h_river" +q = "q_river" + +[output.lateral.river.reservoir] +volume = "volume_reservoir" + +[output.lateral.subsurface] +ssf = "ssf" + +[output.lateral.land] +h = "h_land" +q = "q_land" + +[netcdf] +path = "output_scalar_moselle.nc" + +[[netcdf.variable]] +name = "Q" +map = "gauges" +parameter = "lateral.river.q" + +[[netcdf.variable]] +coordinate.x = 6.255 +coordinate.y = 50.012 +name = "temp_coord" +location = "temp_bycoord" +parameter = "vertical.temperature" + +[[netcdf.variable]] +location = "temp_byindex" +name = "temp_index" +index.x = 100 +index.y = 264 +parameter = "vertical.temperature" + +[csv] +path = "output_moselle.csv" + +[[csv.column]] +header = "Q" +parameter = "lateral.river.q" +reducer = "maximum" + +[[csv.column]] +header = "volume" +index = 1 +parameter = "lateral.river.reservoir.volume" + +[[csv.column]] +coordinate.x = 6.255 +coordinate.y = 50.012 +header = "temp_bycoord" +parameter = "vertical.temperature" + +[[csv.column]] +coordinate.x = 6.255 +coordinate.y = 50.012 +header = "vwc_layer2_bycoord" +parameter = "vertical.vwc" +layer = 2 + +[[csv.column]] +header = "temp_byindex" +index.x = 100 +index.y = 264 +parameter = "vertical.temperature" + +[[csv.column]] +header = "Q" +map = "gauges" +parameter = "lateral.river.q" + +[[csv.column]] +header = "recharge" +map = "subcatchment" +parameter = "vertical.recharge" +reducer = "mean" + +[API] +components = [ + "vertical", + "lateral.subsurface", + "lateral.land", + "lateral.river", + "lateral.river.reservoir", +] diff --git a/src/flow.jl b/src/flow.jl index c0c0a1515..c9c8d25e8 100644 --- a/src/flow.jl +++ b/src/flow.jl @@ -283,7 +283,6 @@ function update(sf::SurfaceFlowRiver, network, inflow_wb, doy) for (n, v) in zip(indices_subdomain[m], topo_subdomain[m]) # sf.qin by outflow from upstream reservoir or lake location is added sf.qin[v] += sum_at(sf.q, upstream_nodes[n]) - # Inflow supply/abstraction is added to qlat (divide by flow length) # If inflow < 0, abstraction is limited if sf.inflow[v] < 0.0 @@ -349,7 +348,6 @@ function update(sf::SurfaceFlowRiver, network, inflow_wb, doy) # update h crossarea = sf.α[v] * pow(sf.q[v], sf.β) sf.h[v] = crossarea / sf.width[v] - sf.q_av[v] += sf.q[v] sf.h_av[v] += sf.h[v] end diff --git a/src/sediment_model.jl b/src/sediment_model.jl index aea2ceb83..6e1f1397d 100644 --- a/src/sediment_model.jl +++ b/src/sediment_model.jl @@ -149,8 +149,8 @@ function initialize_sediment_model(config::Config) ) model = set_states(model) - @info "Initialized model" + return model end