Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed May 3, 2024
1 parent c8f7367 commit 6bc7c5e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
30 changes: 21 additions & 9 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ function holder_expr(shared, cid=getcomponent())
))
ex.args[3].args[2] = shared
ex.args[2].args[2] = cid
ex
return ex
end

"""
Expand Down Expand Up @@ -497,7 +497,7 @@ function publish_expr(topic, cid=getcomponent())

args = topic.args[2:end]
ext.args[3].args[3].args = args
ext
return ext
end

"""
Expand Down Expand Up @@ -551,7 +551,7 @@ function rpc_expr(topic, cid=getcomponent())

args = topic.args[2:end]
ext.args[3].args[3].args = args
ext
return ext
end

"""
Expand Down Expand Up @@ -619,7 +619,7 @@ function expose_expr(fn, cid=getcomponent())
))
ex.args[3].args[2] = fnname(fn)
ex.args[2].args[2] = cid
ex
return ex
end

function subscribe_expr(fn, mode::Symbol, cid=getcomponent())
Expand All @@ -639,7 +639,7 @@ function subscribe_expr(fn, mode::Symbol, cid=getcomponent())
))
ex.args[3].args[2] = fnname(fn)
ex.args[2].args[2] = cid
ex
return ex
end

"""
Expand Down Expand Up @@ -851,7 +851,7 @@ function reactive_expr(reactive, cid=nothing)
timeout=Rembus.request_timeout()
))
ex.args[2].args[2] = id
ex
return ex
end

function enable_ack_expr(enable, cid=nothing)
Expand All @@ -866,7 +866,7 @@ function enable_ack_expr(enable, cid=nothing)
timeout=Rembus.request_timeout()
))
ex.args[2].args[2] = id
ex
return ex
end

"""
Expand Down Expand Up @@ -922,7 +922,7 @@ This feature assure that messages get delivered at least one to the
subscribed component.
=#
macro disable_ack(cid=nothing)
ex = disable_ack_expr(false, cid)
ex = enable_ack_expr(false, cid)
quote
$(esc(ex))
nothing
Expand Down Expand Up @@ -1054,6 +1054,12 @@ function rembus_task(pd, rb, protocol=:ws)
)
elseif isa(req, RemoveInterest)
result = unsubscribe(rb, string(msg.request.fn), exceptionerror=false)
elseif isa(req, EnableAck)
if req.status
result = enable_ack(rb, exceptionerror=false)
else
result = disable_ack(rb, exceptionerror=false)
end
elseif isa(req, Reactive)
if req.status
result = reactive(rb, exceptionerror=false)
Expand Down Expand Up @@ -1588,6 +1594,7 @@ function rembus_block_write(rb::RBHandle, msg, cond)
return nothing
end

#=
function configure(rb::RBHandle, retroactives=Dict(), interests=Dict(), impls=Dict())
for (topic, fn) in retroactives
subscribe(rb, topic, fn, true)
Expand All @@ -1601,6 +1608,7 @@ function configure(rb::RBHandle, retroactives=Dict(), interests=Dict(), impls=Di
return rb
end
=#

function isconnected(rb::RBConnection)
if rb.socket === nothing
Expand Down Expand Up @@ -1739,6 +1747,7 @@ function connect(urls::Vector)
return connect(pool)
end

#=
function login(rb::RBHandle, cid::AbstractString, secret::AbstractString)
try
challenge = rpc(rb, "challenge")
Expand All @@ -1751,6 +1760,7 @@ function login(rb::RBHandle, cid::AbstractString, secret::AbstractString)
return nothing
end
=#

function Base.close(rb::RBPool)
for c in rb.connections
Expand All @@ -1769,11 +1779,13 @@ function Base.close(rb::RBConnection)
return nothing
end

#=
function assert_rembus(process::Visor.Process)
if length(process.args) == 0 || !isa(process.args[1], RBHandle)
throw(ErrorException("invalid $process process: not a rembus process"))
end
end
=#

function enable_debug(rb::RBHandle; exceptionerror=true)
return rpcreq(
Expand Down Expand Up @@ -1831,7 +1843,7 @@ function disable_ack(rb::RBHandle; exceptionerror=true)
)
end

function enable_ack(rb::RBHandle, timeout=5; exceptionerror=true)
function enable_ack(rb::RBHandle; exceptionerror=true)
return rpcreq(
rb,
AdminReqMsg(BROKER_CONFIG, Dict(COMMAND => ENABLE_ACK_CMD)),
Expand Down
2 changes: 2 additions & 0 deletions test/api/test_mixed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ function run()

@subscribe "subscriber" mymethod
@shared "subscriber" bag
@enable_ack "subscriber"
@reactive "subscriber"

invalue = "pippo"
Expand All @@ -40,6 +41,7 @@ function run()
@test bag.request_arg == invalue

@unexpose "exposer" mymethod
@disable_ack "subscriber"

@terminate "client"
@terminate "subscriber"
Expand Down
2 changes: 1 addition & 1 deletion test/api/test_types.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ function run()
@publish requestor bar(msg)
end

sleep(1)
sleep(1.5)
@test bag.handler_called == length(bag.valuemap)
end

Expand Down

0 comments on commit 6bc7c5e

Please sign in to comment.