Skip to content

Commit

Permalink
experimental multiplexer feature
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Jun 14, 2024
1 parent 01ef6f3 commit 3d602d9
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 49 deletions.
4 changes: 1 addition & 3 deletions examples/custom_broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,4 @@ unsubscribe(ctx, router, component, msg) = @info "[$component] unsubscribing $(m

end

Rembus.set_broker_plugin(CarontePlugin)

caronte()
caronte(plugin=CarontePlugin)
6 changes: 1 addition & 5 deletions examples/data_hierarchy.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,4 @@ end

end


Rembus.set_context(Ctx())
Rembus.set_broker_plugin(CarontePlugin)

caronte(args=Dict("ws" => 8000))
caronte(plugin=CarontePlugin, context=Ctx(), args=Dict("ws" => 8000))
71 changes: 54 additions & 17 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ function rpc_response(router, twin, msg)

delete!(twin.sent, msg.id)
else
@error "[$twin] unexpected response: $msg"
@debug "[$twin] unexpected response: $msg"
end
end

Expand Down Expand Up @@ -916,7 +916,7 @@ function twin_task(self, twin)
end
catch e
@error "twin_task: $e" exception = (e, catch_backtrace())
#### rethrow()
rethrow()
finally
cleanup(twin, twin.router)
if isa(twin.socket, WebSockets.WebSocket)
Expand Down Expand Up @@ -1073,7 +1073,7 @@ function caronte_reset(broker_name="caronte")
end

"""
caronte(; wait=true, args=Dict())
caronte(; wait=true, plugin=nothing, context=nothing, args=Dict())
Start the broker.
Expand Down Expand Up @@ -1920,7 +1920,7 @@ function broker(self, router)
# process control messages
!isshutdown(msg) || break

@debug "[broker] recv $(typeof(msg)): $msg"
@debug "[broker] recv [type=$(msg.ptype)]: $msg"
if isa(msg, Msg)
if msg.ptype == TYPE_PUB
if !caronte_embedded_method(router, msg.twchannel, msg.content)
Expand Down Expand Up @@ -2036,7 +2036,9 @@ function init(router)
return nothing
end

function ws_connect(twin::Twin, broker::Component, isconnected::Condition)
function ws_connect(
egress::Visor.Process, twin::Twin, broker::Component, isconnected::Condition
)
try
url = brokerurl(broker)
uri = URI(url)
Expand All @@ -2051,12 +2053,14 @@ function ws_connect(twin::Twin, broker::Component, isconnected::Condition)
twin.socket = socket
notify(isconnected)
twin_receiver(twin.router, twin)
put!(egress.inbox, "connection closed")
end, url)
elseif uri.scheme == "ws"
HTTP.WebSockets.open(socket -> begin
twin.socket = socket
notify(isconnected)
twin_receiver(twin.router, twin)
put!(egress.inbox, "connection closed")
end, url, idle_timeout=1, forcenew=true)
else
error("ws endpoint: wrong $(uri.scheme) scheme")
Expand All @@ -2067,12 +2071,12 @@ function ws_connect(twin::Twin, broker::Component, isconnected::Condition)
end
end

function wait_response(twin::Twin, msg::RembusMsg, timeout)
mid::UInt128 = msg.id
function wait_response(twin::Twin, msg::Msg, timeout)
mid::UInt128 = msg.content.id
resp_cond = Threads.Condition()
twin.out[mid] = resp_cond
t = Timer((tim) -> response_timeout(resp_cond, msg), timeout)
signal!(twin, Msg(TYPE_RPC, msg, twin))
t = Timer((tim) -> response_timeout(resp_cond, msg.content), timeout)
signal!(twin, msg)
lock(resp_cond)
res = wait(resp_cond)
unlock(resp_cond)
Expand All @@ -2081,20 +2085,53 @@ function wait_response(twin::Twin, msg::RembusMsg, timeout)
return res
end

function broker_twin(router::Router, broker_url::AbstractString)
broker = Component(broker_url)
twin = create_twin(broker.id, router)
twin.hasname = true
twin.pager = Pager(twin)
function egress_task(proc, twin::Twin, broker::Component)
isconnected = Condition()
t = Timer((tim) -> connect_timeout(twin, isconnected), connect_request_timeout())
@async ws_connect(twin, broker, isconnected)
@async ws_connect(proc, twin, broker, isconnected)
wait(isconnected)
close(t)
msg = IdentityMsg(broker.id)
wait_response(twin, msg, request_timeout())
wait_response(twin, Msg(TYPE_IDENTITY, msg, twin), request_timeout())

return twin
# The context to pass to the plugin callbacks.
twin.router.context = twin.socket

for msg in proc.inbox
if isshutdown(msg)
# close the connection
close(twin.socket)
break
else
# the only message is an error condition
error(msg)
end
end
@debug "[$proc] egress done"
end

#=
Connect this broker as a component to broker extracted from remote_url.
=#
function egress(
remote_url::AbstractString, broker_name::AbstractString="caronte"
)
proc = from("$broker_name.broker")

# setup the twin
router = proc.args[1]
broker = Component(remote_url)
twin = create_twin(broker.id, router)
twin.hasname = true
twin.pager = Pager(twin)

# start the egress process
startup(
proc.supervisor.supervisor,
process(remote_url, egress_task, args=(twin, broker), debounce_time=6)
)

return nothing
end

#=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
using Rembus
using Test

struct Ctx
broker::Rembus.Twin
end

module Broker

using Rembus

function expose_handler(ctx, router, component, message)
Rembus.transport_send(component, ctx.broker.socket, message)
function expose_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end

function subscribe_handler(ctx, router, component, message)
Rembus.transport_send(component, ctx.broker.socket, message)
function subscribe_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end

function reactive_handler(ctx, router, component, message)
Rembus.transport_send(component, ctx.broker.socket, message)
function reactive_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end


Expand Down Expand Up @@ -47,9 +43,6 @@ function run(exposer_url, secure=false)
args=Dict("broker" => "edge_broker", "ws" => 9000, "secure" => secure))
yield()

proc = from("edge_broker.broker")
router = proc.args[1]

if secure
cli_url = "wss://:8000/client"
broker_url = "wss://:8000/combo"
Expand All @@ -58,8 +51,7 @@ function run(exposer_url, secure=false)
broker_url = "ws://:8000/combo"
end

twin = Rembus.broker_twin(router, broker_url)
router.context = Ctx(twin)
Rembus.egress(broker_url, "edge_broker")

@component exposer_url
@expose foo
Expand All @@ -71,21 +63,19 @@ function run(exposer_url, secure=false)
@test res == 2

publish(cli, "subscriber", 2.0)
sleep(2)

@terminate
close(cli)

return twin
end

ENV["REMBUS_CONNECT_TIMEOUT"] = 20
ENV["REMBUS_CONNECT_TIMEOUT"] = 10

run("ws://:9000/server")
shutdown()
Visor.dump()

if Base.Sys.iswindows()
@info "Windows platform detected: skipping test_combo"
@info "Windows platform detected: skipping test_multiplexer"
else
# create keystore
test_keystore = "/tmp/keystore"
Expand All @@ -96,13 +86,14 @@ else
Base.run(`$script -k $test_keystore`)
run("wss://:9000/server", true)
shutdown()

Visor.dump()
# unsetting HTTP_CA_BUNDLE implies that ws_connect throws an error
delete!(ENV, "HTTP_CA_BUNDLE")
run("wss://:9000/server", true)

catch e
@error "[test_combo]: $e"
@error "[test_multiplexer]: $e"
showerror(stdout, e, stacktrace())
finally
shutdown()
delete!(ENV, "REMBUS_KEYSTORE")
Expand Down
73 changes: 73 additions & 0 deletions test/broker_plugin/test_multiplexer_fault.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using Rembus
using Test

module Broker

using Rembus

function expose_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end

function subscribe_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end

function reactive_handler(sock, router, component, message)
Rembus.transport_send(component, sock, message)
end


end # module Broker

function foo(x)
@info "[foo] arg:$x"
return x + 1
end

function subscriber(x)
@info "[subscriber] arg: $x"
end

function run(exposer_url)
# main broker
caronte(
wait=false,
args=Dict("broker" => "main_broker", "reset" => true)
)
yield()

caronte(
wait=false,
plugin=Broker,
args=Dict("broker" => "edge_broker", "ws" => 9000))
yield()

# cli_url = "ws://:8000/client"
broker_url = "ws://:8000/combo"

Rembus.egress(broker_url, "edge_broker")

# shutting down the main_broker triggers an error on the edge_broker
# and the supervisor
main_broker = from("main_broker")
shutdown(main_broker)
sleep(1)

p = from(broker_url)
@test p.status === Visor.failed

# reconnect
caronte(
wait=false,
args=Dict("broker" => "main_broker", "reset" => true)
)
sleep(5)
p = from(broker_url)
@test p.status === Visor.running
end

ENV["REMBUS_CONNECT_TIMEOUT"] = 20
run("ws://:9000/server")
shutdown()
delete!(ENV, "REMBUS_CONNECT_TIMEOUT")
7 changes: 5 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,11 @@ const GROUP = get(ENV, "GROUP", "all")
@time @safetestset "error_plugin" begin
include("broker_plugin/test_error_plugin.jl")
end
@time @safetestset "combo" begin
include("broker_plugin/test_combo.jl")
@time @safetestset "multiplexer" begin
include("broker_plugin/test_multiplexer.jl")
end
@time @safetestset "multiplexer_fault" begin
include("broker_plugin/test_multiplexer_fault.jl")
end
@time @safetestset "broker_plugin" begin
include("broker_plugin/test_plugin.jl")
Expand Down

0 comments on commit 3d602d9

Please sign in to comment.