Skip to content

Commit

Permalink
Supervised api
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Jun 21, 2024
1 parent 910d24b commit 1bf0b68
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 10 deletions.
10 changes: 10 additions & 0 deletions examples/hierarchy_subscriber.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
using Rembus
using UUIDs

consume(topic, value) = @info "$topic = $value"

url = isempty(ARGS) ? string(uuid4()) : ARGS[1]
rb = component(url)
subscribe(rb, "a/*/c", consume, true)

forever(rb)
111 changes: 105 additions & 6 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export @forever
export @terminate

# rembus client api
export component
export connect
export isauthenticated
export server
Expand All @@ -65,6 +66,7 @@ export rembus
export shared
export set_balancer
export forever
export terminate

# broker api
export caronte, session, republish, msg_payload
Expand Down Expand Up @@ -919,20 +921,37 @@ struct SetHolder
end

struct AddImpl
topic::String
fn::Function
AddImpl(fn::Function) = new(string(fn), fn)
AddImpl(topic::AbstractString, fn::Function) = new(topic, fn)
end

struct RemoveImpl
fn::Function
fn::String
RemoveImpl(fn::AbstractString) = new(fn)
RemoveImpl(fn::Function) = new(string(fn))
end

struct AddInterest
topic::String
fn::Function
retroactive::Bool
AddInterest(
topic::AbstractString,
fn::Function,
retroactive::Bool
) = new(topic, fn, retroactive)
AddInterest(
fn::Function,
retroactive::Bool
) = new(string(fn), fn, retroactive)
end

struct RemoveInterest
fn::Function
fn::String
RemoveInterest(fn::AbstractString) = new(fn)
RemoveInterest(fn::Function) = new(string(fn))
end

struct Reactive
Expand Down Expand Up @@ -1007,20 +1026,20 @@ function rembus_task(pd, rb, protocol=:ws)
result = shared(rb, msg.request.shared)
elseif isa(req, AddImpl)
result = expose(
rb, string(msg.request.fn), msg.request.fn, exceptionerror=false
rb, msg.request.topic, msg.request.fn, exceptionerror=false
)
elseif isa(req, RemoveImpl)
result = unexpose(rb, string(msg.request.fn), exceptionerror=false)
result = unexpose(rb, msg.request.fn, exceptionerror=false)
elseif isa(req, AddInterest)
result = subscribe(
rb,
string(msg.request.fn),
msg.request.topic,
msg.request.fn,
msg.request.retroactive,
exceptionerror=false
)
elseif isa(req, RemoveInterest)
result = unsubscribe(rb, string(msg.request.fn), exceptionerror=false)
result = unsubscribe(rb, msg.request.fn, exceptionerror=false)
elseif isa(req, EnableAck)
if req.status
result = enable_ack(rb, exceptionerror=false)
Expand Down Expand Up @@ -1870,6 +1889,74 @@ function subscribe(
return subscribe(rb, string(fn), fn, retroactive; exceptionerror=exceptionerror)
end

"""
component(url)
Connect rembus component defined by `url`.
The connection is supervised and network faults starts connection retries attempts
until successful outcome.
"""
function component(url=getcomponent())
rb = Rembus.RBConnection(url)

p = process(rb.client.id, Rembus.rembus_task,
args=(rb,), debounce_time=2, restart=:transient)

supervise(
p, intensity=3, wait=false
)
return p
end

terminate(proc::Visor.Process) = shutdown(proc)

function expose(proc::Visor.Process, fn::Function)
return call(proc, Rembus.AddImpl(fn), timeout=request_timeout())
end

function expose(proc::Visor.Process, topic::AbstractString, fn::Function)
return call(proc, Rembus.AddImpl(topic, fn), timeout=request_timeout())
end

function unexpose(proc::Visor.Process, fn)
return call(proc, Rembus.RemoveImpl(fn), timeout=request_timeout())
end

function subscribe(proc::Visor.Process, fn::Function, retroactive::Bool=false)
return call(proc, Rembus.AddInterest(fn, retroactive), timeout=request_timeout())
end

function unsubscribe(proc::Visor.Process, fn)
return call(proc, Rembus.RemoveInterest(fn), timeout=request_timeout())
end

function subscribe(
proc::Visor.Process, topic::AbstractString, fn::Function, retroactive::Bool=false
)
return call(proc, Rembus.AddInterest(topic, fn, retroactive), timeout=request_timeout())
end

function reactive(proc::Visor.Process)
return call(proc, Reactive(true), timeout=request_timeout())
end

function unreactive(proc::Visor.Process)
return call(proc, Reactive(false), timeout=request_timeout())
end

function shared(proc::Visor.Process, ctx)
return call(proc, SetHolder(ctx), timeout=request_timeout())
end

function publish(proc::Visor.Process, topic::AbstractString, data=[])
cast(proc, PubSubMsg(topic, data))
end

function rpc(proc::Visor.Process, topic::AbstractString, data=[])
return call(proc, RpcReqMsg(topic, data), timeout=request_timeout())
end

"""
unsubscribe(rb::RBHandle, topic::AbstractString; exceptionerror=true)
unsubscribe(rb::RBHandle, fn::Function; exceptionerror=true)
Expand Down Expand Up @@ -2187,6 +2274,18 @@ function waiter(pd)
@info "forever done"
end

"""
forever(rb::Visor.Process)
Start the event loop awaiting to execute exposed and subscribed methods.
"""
function forever(rb::Visor.Process)
reactive(rb)
if !isinteractive()
wait(Visor.root_supervisor(rb))
end
end

"""
forever(rb::RBHandle)
Expand Down
40 changes: 40 additions & 0 deletions test/api/test_supervised_api.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
include("../utils.jl")

my_topic(ctx) = @info "my_topic called"
my_service(ctx, x, y) = x + y

function run()
ctx = "mystring"

rb = component("abc")

shared(rb, ctx)
subscribe(rb, my_topic, true)
subscribe(rb, "topic", my_topic, true)
expose(rb, "service", my_service)
expose(rb, my_service)
reactive(rb)

client = connect()
publish(client, "topic")
res = rpc(client, "service", [1, 2])
@test res == 3

unsubscribe(rb, "topic")
unexpose(rb, "service")
unreactive(rb)

@test_throws RpcMethodUnavailable rpc(client, "service", [1, 2])

# just for lines coverage
@async forever(rb)

publish(rb, "devnull")
res = rpc(rb, "version")
@test isa(res, String)

close(client)
terminate(rb)
end

execute(run, "test_supervised_api")
2 changes: 0 additions & 2 deletions test/integration/test_rembus_task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ function run()

proc = from("rembus")

Rembus.processput!(proc, Rembus.WrongTcpPacket())
sleep(0.1)
Rembus.processput!(proc, ErrorException("boom"))
sleep(1)
res = @rpc version()
Expand Down
4 changes: 2 additions & 2 deletions test/integration/test_zmq_protocol_errors.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ end

function Rembus.transport_send(::Rembus.RBConnection, socket::ZMQ.Socket, msg::InvalidMsg)
send(socket, Message(), more=true)
send(socket, encode([63]), more=true)
send(socket, encode([104]), more=true)
send(socket, "aaaa", more=true)
send(socket, Rembus.MESSAGE_END, more=false)
end
Expand Down Expand Up @@ -45,7 +45,7 @@ function run()
@debug "expected error: $(e.msg)" _group = :test
@test isa(e, Rembus.RembusTimeout)
end

@test isopen(rb.socket)
version = Rembus.rpc(rb, "version")
@test version === Rembus.VERSION

Expand Down
3 changes: 3 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ const GROUP = get(ENV, "GROUP", "all")
end
end
if GROUP == "all" || GROUP == "api"
@time @safetestset "supervised_api" begin
include("api/test_supervised_api.jl")
end
@time @safetestset "component" begin
include("api/test_component.jl")
end
Expand Down
1 change: 1 addition & 0 deletions test/utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ function execute(
fn()
catch e
@error "[$testname] failed: $e"
@test false
#showerror(stdout, e, catch_backtrace())
finally
shutdown()
Expand Down

0 comments on commit 1bf0b68

Please sign in to comment.