Skip to content

Commit

Permalink
Review
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Jun 21, 2024
1 parent 3a9a75d commit 910d24b
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 14 deletions.
3 changes: 2 additions & 1 deletion examples/data_hierarchy.jl → examples/hierarchy_broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,5 @@ end

end

caronte(plugin=CarontePlugin, context=Ctx(), args=Dict("ws" => 8000))
#caronte(plugin=CarontePlugin, context=Ctx(), args=Dict("ws" => 8000))
caronte(plugin=CarontePlugin, context=Ctx())
10 changes: 3 additions & 7 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ end

struct CloseConnection end

struct ConnectionClosed <: Exception
# Wrong tcp packet received.
struct WrongTcpPacket <: Exception
end

# A message error received from the broker.
Expand Down Expand Up @@ -999,11 +1000,6 @@ function rembus_task(pd, rb, protocol=:ws)
if isshutdown(msg)
return
elseif isa(msg, Exception)
if isa(msg, ConnectionClosed) && isconnected(rb)
@debug "[$pd] ignoring connection closed message"
continue
end
@info "[$pd] rembus task: $msg"
throw(msg)
elseif isrequest(msg)
req = msg.request
Expand Down Expand Up @@ -1057,7 +1053,7 @@ function rembus_task(pd, rb, protocol=:ws)
if isa(e, HTTP.Exceptions.ConnectError)
msg = "[$pd]: $(e.url) connection error"
else
msg = "[$pd]: $e"
msg = "[$pd] component: $e"
end

if last_error.msg !== msg
Expand Down
8 changes: 6 additions & 2 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -879,7 +879,7 @@ function close_is_ok(ws::WebSockets.WebSocket, e)
end

function close_is_ok(ws::TCPSocket, e)
isa(e, ConnectionClosed)
isa(e, WrongTcpPacket)
end

close_is_ok(::Nothing, e) = true
Expand Down Expand Up @@ -2049,7 +2049,11 @@ function boot(router)
return nothing
end

init_log() = logging()
function init_log()
if !haskey(ENV, "JULIA_DEBUG")
logging()
end
end

function init(router)
init_log()
Expand Down
4 changes: 2 additions & 2 deletions src/transport.jl
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ function transport_read(sock)
len = Int(lb[1]) << 24 + Int(lb[2]) << 16 + Int(lb[3]) << 8 + lb[4]
else
@error "tcp channel invalid header value [$type]"
throw(ConnectionClosed())
throw(WrongTcpPacket())
end
payload = read(sock, len)
@rawlog("in: $payload")
Expand All @@ -958,7 +958,7 @@ function isconnectionerror(ws::WebSockets.WebSocket, e)
end

function isconnectionerror(ws, e)
return isa(e, EOFError) || isa(e, Base.IOError) || isa(e, ConnectionClosed)
return isa(e, EOFError) || isa(e, Base.IOError) || isa(e, WrongTcpPacket)
end

Base.isopen(ws::WebSockets.WebSocket) = Base.isopen(ws.io)
2 changes: 1 addition & 1 deletion test/integration/test_rembus_task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ function run()

proc = from("rembus")

Rembus.processput!(proc, Rembus.ConnectionClosed())
Rembus.processput!(proc, Rembus.WrongTcpPacket())
sleep(0.1)
Rembus.processput!(proc, ErrorException("boom"))
sleep(1)
Expand Down
5 changes: 4 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,12 @@ const GROUP = get(ENV, "GROUP", "all")
end
end
if GROUP == "all" || GROUP == "tcp"
@time @safetestset "zmq" begin
@time @safetestset "tcp" begin
include("tcp/test_tcp.jl")
end
@time @safetestset "wrong_response" begin
include("tcp/test_wrong_response.jl")
end
end
if GROUP == "all" || GROUP == "broker_plugin"
@time @safetestset "error_plugin" begin
Expand Down
25 changes: 25 additions & 0 deletions test/tcp/test_wrong_response.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
include("../utils.jl")

function run()
cid = "myc"
@component "tcp://:8001/$cid"

@rpc version()

twin = from("$BROKER_NAME.twins.$cid")

socket = twin.args[1].socket

# send a wrong packet
write(socket, UInt8[1, 2, 3])

# the connection is closed
@test eof(socket)

sleep(3)
# reconnected
@test isopen(twin.args[1].socket)

end

execute(run, "test_wrong_response")

0 comments on commit 910d24b

Please sign in to comment.