From 1bf0b68138ea5dea95a3d094175e3ef9a857d8c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attilio=20Don=C3=A0?= Date: Fri, 21 Jun 2024 16:16:35 +0200 Subject: [PATCH] Supervised api --- examples/hierarchy_subscriber.jl | 10 ++ src/Rembus.jl | 111 ++++++++++++++++++- test/api/test_supervised_api.jl | 40 +++++++ test/integration/test_rembus_task.jl | 2 - test/integration/test_zmq_protocol_errors.jl | 4 +- test/runtests.jl | 3 + test/utils.jl | 1 + 7 files changed, 161 insertions(+), 10 deletions(-) create mode 100644 examples/hierarchy_subscriber.jl create mode 100644 test/api/test_supervised_api.jl diff --git a/examples/hierarchy_subscriber.jl b/examples/hierarchy_subscriber.jl new file mode 100644 index 0000000..e1a653d --- /dev/null +++ b/examples/hierarchy_subscriber.jl @@ -0,0 +1,10 @@ +using Rembus +using UUIDs + +consume(topic, value) = @info "$topic = $value" + +url = isempty(ARGS) ? string(uuid4()) : ARGS[1] +rb = component(url) +subscribe(rb, "a/*/c", consume, true) + +forever(rb) diff --git a/src/Rembus.jl b/src/Rembus.jl index 0f48cbf..330b26d 100644 --- a/src/Rembus.jl +++ b/src/Rembus.jl @@ -45,6 +45,7 @@ export @forever export @terminate # rembus client api +export component export connect export isauthenticated export server @@ -65,6 +66,7 @@ export rembus export shared export set_balancer export forever +export terminate # broker api export caronte, session, republish, msg_payload @@ -919,20 +921,37 @@ struct SetHolder end struct AddImpl + topic::String fn::Function + AddImpl(fn::Function) = new(string(fn), fn) + AddImpl(topic::AbstractString, fn::Function) = new(topic, fn) end struct RemoveImpl - fn::Function + fn::String + RemoveImpl(fn::AbstractString) = new(fn) + RemoveImpl(fn::Function) = new(string(fn)) end struct AddInterest + topic::String fn::Function retroactive::Bool + AddInterest( + topic::AbstractString, + fn::Function, + retroactive::Bool + ) = new(topic, fn, retroactive) + AddInterest( + fn::Function, + retroactive::Bool + ) = new(string(fn), fn, retroactive) end struct RemoveInterest - fn::Function + fn::String + RemoveInterest(fn::AbstractString) = new(fn) + RemoveInterest(fn::Function) = new(string(fn)) end struct Reactive @@ -1007,20 +1026,20 @@ function rembus_task(pd, rb, protocol=:ws) result = shared(rb, msg.request.shared) elseif isa(req, AddImpl) result = expose( - rb, string(msg.request.fn), msg.request.fn, exceptionerror=false + rb, msg.request.topic, msg.request.fn, exceptionerror=false ) elseif isa(req, RemoveImpl) - result = unexpose(rb, string(msg.request.fn), exceptionerror=false) + result = unexpose(rb, msg.request.fn, exceptionerror=false) elseif isa(req, AddInterest) result = subscribe( rb, - string(msg.request.fn), + msg.request.topic, msg.request.fn, msg.request.retroactive, exceptionerror=false ) elseif isa(req, RemoveInterest) - result = unsubscribe(rb, string(msg.request.fn), exceptionerror=false) + result = unsubscribe(rb, msg.request.fn, exceptionerror=false) elseif isa(req, EnableAck) if req.status result = enable_ack(rb, exceptionerror=false) @@ -1870,6 +1889,74 @@ function subscribe( return subscribe(rb, string(fn), fn, retroactive; exceptionerror=exceptionerror) end +""" + component(url) + +Connect rembus component defined by `url`. + +The connection is supervised and network faults starts connection retries attempts +until successful outcome. +""" +function component(url=getcomponent()) + rb = Rembus.RBConnection(url) + + p = process(rb.client.id, Rembus.rembus_task, + args=(rb,), debounce_time=2, restart=:transient) + + supervise( + p, intensity=3, wait=false + ) + return p +end + +terminate(proc::Visor.Process) = shutdown(proc) + +function expose(proc::Visor.Process, fn::Function) + return call(proc, Rembus.AddImpl(fn), timeout=request_timeout()) +end + +function expose(proc::Visor.Process, topic::AbstractString, fn::Function) + return call(proc, Rembus.AddImpl(topic, fn), timeout=request_timeout()) +end + +function unexpose(proc::Visor.Process, fn) + return call(proc, Rembus.RemoveImpl(fn), timeout=request_timeout()) +end + +function subscribe(proc::Visor.Process, fn::Function, retroactive::Bool=false) + return call(proc, Rembus.AddInterest(fn, retroactive), timeout=request_timeout()) +end + +function unsubscribe(proc::Visor.Process, fn) + return call(proc, Rembus.RemoveInterest(fn), timeout=request_timeout()) +end + +function subscribe( + proc::Visor.Process, topic::AbstractString, fn::Function, retroactive::Bool=false +) + return call(proc, Rembus.AddInterest(topic, fn, retroactive), timeout=request_timeout()) +end + +function reactive(proc::Visor.Process) + return call(proc, Reactive(true), timeout=request_timeout()) +end + +function unreactive(proc::Visor.Process) + return call(proc, Reactive(false), timeout=request_timeout()) +end + +function shared(proc::Visor.Process, ctx) + return call(proc, SetHolder(ctx), timeout=request_timeout()) +end + +function publish(proc::Visor.Process, topic::AbstractString, data=[]) + cast(proc, PubSubMsg(topic, data)) +end + +function rpc(proc::Visor.Process, topic::AbstractString, data=[]) + return call(proc, RpcReqMsg(topic, data), timeout=request_timeout()) +end + """ unsubscribe(rb::RBHandle, topic::AbstractString; exceptionerror=true) unsubscribe(rb::RBHandle, fn::Function; exceptionerror=true) @@ -2187,6 +2274,18 @@ function waiter(pd) @info "forever done" end +""" + forever(rb::Visor.Process) + + Start the event loop awaiting to execute exposed and subscribed methods. +""" +function forever(rb::Visor.Process) + reactive(rb) + if !isinteractive() + wait(Visor.root_supervisor(rb)) + end +end + """ forever(rb::RBHandle) diff --git a/test/api/test_supervised_api.jl b/test/api/test_supervised_api.jl new file mode 100644 index 0000000..dfb8028 --- /dev/null +++ b/test/api/test_supervised_api.jl @@ -0,0 +1,40 @@ +include("../utils.jl") + +my_topic(ctx) = @info "my_topic called" +my_service(ctx, x, y) = x + y + +function run() + ctx = "mystring" + + rb = component("abc") + + shared(rb, ctx) + subscribe(rb, my_topic, true) + subscribe(rb, "topic", my_topic, true) + expose(rb, "service", my_service) + expose(rb, my_service) + reactive(rb) + + client = connect() + publish(client, "topic") + res = rpc(client, "service", [1, 2]) + @test res == 3 + + unsubscribe(rb, "topic") + unexpose(rb, "service") + unreactive(rb) + + @test_throws RpcMethodUnavailable rpc(client, "service", [1, 2]) + + # just for lines coverage + @async forever(rb) + + publish(rb, "devnull") + res = rpc(rb, "version") + @test isa(res, String) + + close(client) + terminate(rb) +end + +execute(run, "test_supervised_api") diff --git a/test/integration/test_rembus_task.jl b/test/integration/test_rembus_task.jl index eec9554..7353041 100644 --- a/test/integration/test_rembus_task.jl +++ b/test/integration/test_rembus_task.jl @@ -7,8 +7,6 @@ function run() proc = from("rembus") - Rembus.processput!(proc, Rembus.WrongTcpPacket()) - sleep(0.1) Rembus.processput!(proc, ErrorException("boom")) sleep(1) res = @rpc version() diff --git a/test/integration/test_zmq_protocol_errors.jl b/test/integration/test_zmq_protocol_errors.jl index 353edb3..c59d244 100644 --- a/test/integration/test_zmq_protocol_errors.jl +++ b/test/integration/test_zmq_protocol_errors.jl @@ -14,7 +14,7 @@ end function Rembus.transport_send(::Rembus.RBConnection, socket::ZMQ.Socket, msg::InvalidMsg) send(socket, Message(), more=true) - send(socket, encode([63]), more=true) + send(socket, encode([104]), more=true) send(socket, "aaaa", more=true) send(socket, Rembus.MESSAGE_END, more=false) end @@ -45,7 +45,7 @@ function run() @debug "expected error: $(e.msg)" _group = :test @test isa(e, Rembus.RembusTimeout) end - + @test isopen(rb.socket) version = Rembus.rpc(rb, "version") @test version === Rembus.VERSION diff --git a/test/runtests.jl b/test/runtests.jl index 3903c68..bf93106 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -145,6 +145,9 @@ const GROUP = get(ENV, "GROUP", "all") end end if GROUP == "all" || GROUP == "api" + @time @safetestset "supervised_api" begin + include("api/test_supervised_api.jl") + end @time @safetestset "component" begin include("api/test_component.jl") end diff --git a/test/utils.jl b/test/utils.jl index cdcafa1..f42638e 100755 --- a/test/utils.jl +++ b/test/utils.jl @@ -74,6 +74,7 @@ function execute( fn() catch e @error "[$testname] failed: $e" + @test false #showerror(stdout, e, catch_backtrace()) finally shutdown()