Skip to content

Commit

Permalink
Work in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed May 8, 2024
1 parent 05149ac commit efee234
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 84 deletions.
56 changes: 20 additions & 36 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ request_timeout() = parse(Float32, get(ENV, "REMBUS_TIMEOUT", "5"))
getcomponent() = Component(Rembus.CONFIG.cid)

function name2proc(name::AbstractString, startproc=false, setanonymous=false)
cmp = Component(name)
if from(cmp.id) === nothing
throw(ErrorException("unknown process $(cmp.id)"))
end
return name2proc(Component(name), startproc, setanonymous)
end

Expand All @@ -373,8 +377,6 @@ function name2proc(cmp::Component, startproc=false, setanonymous=false)
if proc === nothing
if setanonymous && CONFIG.cid == "rembus"
proc = startup(rembus())
else
throw(Visor.UnknownProcess(cmp.id))
end
end
if startproc && !isdefined(proc, :task)
Expand Down Expand Up @@ -403,6 +405,8 @@ macro component(name)
end
end

Visor.shutdown(nothing) = nothing

"""
@terminate
Expand Down Expand Up @@ -1195,7 +1199,6 @@ function handle_input(rb, msg)
yield()

while notify(rb.out[msg.id], msg) == 0
@info "$msg: notifying too early"
sleep(0.001)
end
else
Expand Down Expand Up @@ -1240,7 +1243,7 @@ function parse_msg(rb, response)
msg = connected_socket_load(response)
handle_input(rb, msg)
catch e
@error "parse_msg: $e\n"
@error "message decoding: $e"
@showerror e
end

Expand Down Expand Up @@ -1296,10 +1299,6 @@ function read_socket(socket, process, rb, isconnected::Condition)
end

function write_task(rb::RBConnection)
# manage reconnections events
if !isopen(rb.msgch)
rb.msgch = Channel(MESSAGE_CHANNEL_SZ)
end

for msg in rb.msgch
if isa(msg, CloseConnection)
Expand All @@ -1314,6 +1313,8 @@ function write_task(rb::RBConnection)
end
catch e
@warn "[$(rb.client.id)] close: $e"
finally
rb.socket = nothing
end
close(rb.msgch)
else
Expand Down Expand Up @@ -1361,7 +1362,7 @@ function zmq_receive(rb)
if !isopen(rb.socket)
break
else
@error "[zmq_receive] error: $e"
@error "zmq message decoding: $e"
@showerror e
end
end
Expand All @@ -1373,17 +1374,9 @@ function zmq_connect(rb)
rb.context = ZMQ.Context()
rb.socket = ZMQ.Socket(rb.context, DEALER)
rb.socket.linger = 1
try
url = brokerurl(rb.client)
ZMQ.connect(rb.socket, url)
@async zmq_receive(rb)
catch e
@showerror e
close(rb.socket)
close(rb.context)
rethrow()
end

url = brokerurl(rb.client)
ZMQ.connect(rb.socket, url)
@async zmq_receive(rb)
return nothing
end

Expand Down Expand Up @@ -1440,16 +1433,6 @@ function pkfile(name)
return joinpath(cfgdir, name)
end

function loadkey(name::AbstractString)
file = pkfile(name)
@debug "keyfile: $file"
if isfile(file)
return MbedTLS.parse_keyfile(file)
end

return missing
end

function resend_attestate(rb, response)
try
msg = attestate(rb, response)
Expand Down Expand Up @@ -1526,6 +1509,11 @@ function connect_timeout(rb, isconnected)
end

function _connect(rb, process)
# manage reconnections events
if !isopen(rb.msgch)
rb.msgch = Channel(MESSAGE_CHANNEL_SZ)
end

if rb.client.protocol === :ws || rb.client.protocol === :wss
isconnected = Condition()
t = Timer((tim) -> connect_timeout(rb, isconnected), request_timeout())
Expand Down Expand Up @@ -1582,7 +1570,7 @@ end

function isconnected(rb::RBConnection)
if rb.socket === nothing
false
return false
else
if isa(rb.socket, WebSockets.WebSocket)
return isopen(rb.socket.io)
Expand Down Expand Up @@ -1701,11 +1689,7 @@ function connect(rb::RBPool)
try
connect(c)
catch e
if isa(e, RembusError)
@warn "error: $e"
else
@warn "[$(c.client.id)] connection failed: $e"
end
@warn "[$(c.client.id)] error: $e"
end
end

Expand Down
17 changes: 5 additions & 12 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ function attestation(router::Embedded, twin, msg)

response = ResMsg(msg.id, sts, reason)
#@mlog("[$twin] -> $response")
#transport_send(twin, twin.sock, response)
put!(twin.process.inbox, response)
transport_send(twin, twin.sock, response)
if sts !== STS_SUCCESS
detach(twin)
end
Expand Down Expand Up @@ -578,10 +577,8 @@ function receiver_exception(router, twin, e)
end
elseif isa(e, InterruptException)
rethrow()
elseif isa(e, ArgumentError)
@error "[$twin] invalid message format: $e"
else
@error "[$twin] internal error: $e"
@error "[$twin] receiver error: $e"
end
end

Expand Down Expand Up @@ -814,12 +811,8 @@ Disconnect the twin from the ws/tcp channel.
=#
function detach(twin)
if twin.sock !== nothing
try
if !isa(twin.sock, ZMQ.Socket)
close(twin.sock)
end
catch e
@debug "error closing websocket: $e"
if !isa(twin.sock, ZMQ.Socket)
close(twin.sock)
end
twin.sock = nothing
end
Expand Down Expand Up @@ -872,7 +865,7 @@ function handle_ack_timeout(tim, twin, msg, msgid)
try
twin.router.park(CONFIG.broker_ctx, twin, msg)
catch e
@error "[$twin] ack_timeout: $e"
@error "[$twin] park (ack timeout): $e"
end
end
delete!(twin.acktimer, msgid)
Expand Down
36 changes: 8 additions & 28 deletions src/transport.jl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ The decoding is performed at the client side.
function zmq_load(socket::ZMQ.Socket)

pkt = zmq_message(socket)

header = pkt.header
data::Vector{UInt8} = pkt.data

Expand Down Expand Up @@ -54,7 +53,10 @@ function zmq_load(socket::ZMQ.Socket)
catch e
return ResMsg(id, STS_GENERIC_ERROR, "$e", flags)
end
else
throw(ErrorException("unknown packet type $ptype"))
end

msg
end

Expand Down Expand Up @@ -491,7 +493,11 @@ data2message(data::ZMQ.Message) = Vector{UInt8}(data)

transport_send(twin::Twin, ::Nothing, msg) = error("$twin connection closed")

function transport_send(twin::Twin, ws::Union{WebSockets.WebSocket,TCPSocket}, msg::PubSubMsg)
function transport_send(
twin::Twin,
ws::Union{WebSockets.WebSocket,TCPSocket},
msg::PubSubMsg
)
if twin.qos === with_ack

msg.flags |= ACK_FLAG
Expand Down Expand Up @@ -856,32 +862,6 @@ function transport_write(sock, llmsg)
tcp_write(sock, payload)
end

##function transport_read(sock::MbedTLS.SSLContext)
## while true
## headers = read(sock, 1)
## if isempty(headers)
## MbedTLS.ssl_session_reset(sock)
## throw(ConnectionClosed())
## end
## type = headers[1]
## if type === HEADER_LEN1
## len = read(sock, 1)[1]
## elseif type === HEADER_LEN2
## lb = read(sock, 2)
## len = Int(lb[1]) << 8 + lb[2]
## elseif type === HEADER_LEN4
## lb = read(sock, 4)
## len = Int(lb[1]) << 24 + Int(lb[2]) << 16 + Int(lb[3]) << 8 + lb[4]
## else
## @error "tcp channel invalid header format"
## throw(ConnectionClosed())
## end
## payload = read(sock, len)
## @rawlog("in: $payload")
## return payload
## end
##end

function transport_read(sock)
headers = read(sock, 1)
if isempty(headers)
Expand Down
14 changes: 9 additions & 5 deletions test/api/test_component.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ function atopic(ctx, x)
ctx.data = x
end

function aservice(ctx, x, y)
return x + y
end

function run()
ctx = Ctx(nothing)

@component "mycomponent"
sleep(1)

@test_throws ErrorException @subscribe atopic invalid_mode

@subscribe atopic
@shared ctx
@reactive
Expand All @@ -37,7 +36,7 @@ function run()

@unsubscribe atopic

@expose aservice
@expose aservice(ctx, x, y) = x + y

res = rpc(rb, "aservice", [1, 2])
@test res == 3
Expand All @@ -46,6 +45,11 @@ function run()
@test_throws RpcMethodUnavailable rpc(rb, "aservice", [1, 2])

close(rb)

# test unknown process
#@test_throws Visor.UnknownProcess @publish "fabulous" foo()
@publish "fabulous" foo()

end

execute(run, "test_component")
10 changes: 9 additions & 1 deletion test/broker_plugin/test_plugin.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ test_name = "test_plugin"

# set a mismatched shared secret
function init(ok_cid, ko_cid)
mkpath(Rembus.keys_dir())
# component side
for cid in [ok_cid, ko_cid]
pkfile = Rembus.pkfile(cid)
Expand All @@ -17,7 +18,6 @@ function init(ok_cid, ko_cid)
open(fn, create=true, write=true) do f
write(f, "aaa")
end

fn = Rembus.key_file(ko_cid)
open(fn, create=true, write=true) do f
write(f, "bbb")
Expand All @@ -31,6 +31,7 @@ using Rembus #needed for session()

export add_interest
export challenge
export save_configuration
export login
export myfunction

Expand Down Expand Up @@ -65,6 +66,10 @@ function unpark(ctx, twin, msg)
@info "[$twin] park: $msg"
end

function save_configuration(ctx, router)
@info "CarontePlugin::save_configuration"
end

#
# Called when the twin startup.
#
Expand All @@ -84,6 +89,9 @@ function test_plugin_topic()
end

function run(ok_cid, ko_cid)
# wait for secret files creation
sleep(1)

# store test related info
ctx = Dict()

Expand Down
38 changes: 38 additions & 0 deletions test/connect/test_reconnect.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
include("../utils.jl")

mutable struct Ctx
data::Any
end

function foo(ctx, x)
@info "[test_simple_publish] foo=$x"
ctx.data = x
end

function bar(ctx, x)
@info "[test_reconnect] bar: $x"
end

function run()
try
@component "myserver"
@subscribe foo
@expose bar
@reactive

myserver = from("myserver")

# force a process restart
Rembus.processput!(myserver, ErrorException("boom"))
sleep(2)
res = @rpc version()
@info "version: $res"
@test isa(res, String)
catch e
@error "[test_reconnect] error: $e"
@test false
end
@info "shutting down"
end

execute(run, "test_reconnect")
1 change: 1 addition & 0 deletions test/coverage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ for dir in [
"test/auth",
"test/connect",
"test/embedded",
"test/errors",
"test/integration",
"test/park",
"test/private",
Expand Down
Loading

0 comments on commit efee234

Please sign in to comment.