Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
attdona committed Jun 6, 2024
1 parent 0f040ae commit 15b2ca8
Show file tree
Hide file tree
Showing 34 changed files with 254 additions and 180 deletions.
7 changes: 2 additions & 5 deletions src/Rembus.jl
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,7 @@ end

Base.show(io::IO, call::CastCall) = print(io, call.topic)

rembus_dir() = joinpath(CONFIG.home, ".config", "rembus")

broker_dir() = CONFIG.db

keystore_dir() = get(ENV, "REMBUS_KEYSTORE", joinpath(broker_dir(), "keystore"))
rembus_dir() = joinpath(CONFIG.root_dir, "rembus")

request_timeout() = parse(Float32, get(ENV, "REMBUS_TIMEOUT", "10"))

Expand Down Expand Up @@ -1482,6 +1478,7 @@ function authenticate(rb)
msg = IdentityMsg(rb.client.id)
response = response_or_timeout(rb, msg, request_timeout())
if (response.status == STS_GENERIC_ERROR)
close(rb.socket)
throw(AlreadyConnected(rb.client.id))
elseif (response.status == STS_CHALLENGE)
msg = attestate(rb, response)
Expand Down
66 changes: 35 additions & 31 deletions src/broker.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mutable struct Embedded <: AbstractRouter
topic_function::Dict{String,Function}
id_twin::Dict{String,Twin}
context::Any
process::Visor.Process
process::Visor.Supervisor
ws_server::Sockets.TCPServer
Embedded(context=nothing) = new(Dict(), Dict(), context)
end
Expand Down Expand Up @@ -189,7 +189,8 @@ function create_twin(id, router::Embedded, queue=Queue{PubSubMsg}())
else
twin = Twin(router, id, Channel())
spec = process(id, twin_task, args=(twin,))
startup(from("server.twins"), spec)
sv = router.process
startup(Visor.from_supervisor(sv, "twins"), spec)
router.id_twin[id] = twin
return twin
end
Expand Down Expand Up @@ -282,7 +283,7 @@ end
function verify_signature(twin, msg)
challenge = pop!(twin.session, "challenge")
@debug "verify signature, challenge $challenge"
file = pubkey_file(msg.cid)
file = pubkey_file(twin.router, msg.cid)

try
ctx = MbedTLS.parse_public_keyfile(file)
Expand Down Expand Up @@ -428,15 +429,15 @@ function register(router, twin, msg)
if token === nothing
sts = STS_GENERIC_ERROR
reason = "wrong pin"
elseif isregistered(msg.cid)
elseif isregistered(router, msg.cid)
sts = STS_NAME_ALREADY_TAKEN
reason = "name $(msg.cid) not available for registration"
else
save_pubkey(msg.cid, msg.pubkey)
save_pubkey(router, msg.cid, msg.pubkey)
if !(msg.cid in router.component_owner.component)
push!(router.component_owner, [msg.userid, msg.cid])
end
save_token_app(router.component_owner)
save_token_app(router, router.component_owner)
end
response = ResMsg(msg.id, sts, reason)
#@mlog("[$twin] -> $response")
Expand All @@ -460,9 +461,9 @@ function unregister(router, twin, msg)
sts = STS_GENERIC_ERROR
reason = "invalid cid"
else
remove_pubkey(msg.cid)
remove_pubkey(router, msg.cid)
deleteat!(router.component_owner, router.component_owner.component .== msg.cid)
save_token_app(router.component_owner)
save_token_app(router, router.component_owner)
end
response = ResMsg(msg.id, sts, reason)
#@mlog("[$twin] -> $response")
Expand Down Expand Up @@ -778,7 +779,7 @@ function zeromq_receiver(router::Router)
if isa(msg, IdentityMsg)
@debug "[$twin] auth identity: $(msg.cid)"
# check if cid is registered
rembus_login = isfile(key_file(msg.cid))
rembus_login = isfile(key_file(router, msg.cid))
if rembus_login
# authentication mode, send the challenge
response = challenge(router, twin, msg)
Expand All @@ -794,7 +795,7 @@ function zeromq_receiver(router::Router)
# broker restarted
# start the authentication flow if cid is registered
@debug "lost connection to broker: restarting $(msg.cid)"
rembus_login = isfile(key_file(msg.cid))
rembus_login = isfile(key_file(router, msg.cid))
if rembus_login
if haskey(twin.session, "challenge")
# challenge already sent
Expand Down Expand Up @@ -869,7 +870,7 @@ end

close_is_ok(::Nothing, e) = true

page_file(twin) = joinpath(twins_dir(), twin.id, string(twin.pager.ts))
page_file(twin) = joinpath(twins_dir(twin.router), twin.id, string(twin.pager.ts))

#=
detach(twin)
Expand Down Expand Up @@ -1059,10 +1060,11 @@ function command_line()
return parse_args(s)
end

function caronte_reset()
rm(twins_dir(), force=true, recursive=true)
if isdir(root_dir())
foreach(rm, filter(isfile, readdir(root_dir(), join=true)))
function caronte_reset(broker_name="caronte")
rm(twins_dir(broker_name), force=true, recursive=true)
bdir = broker_dir(broker_name)
if isdir(bdir)
foreach(rm, filter(isfile, readdir(bdir, join=true)))
end
end

Expand All @@ -1080,15 +1082,15 @@ function caronte(; wait=true, plugin=nothing, context=nothing, args=Dict())
args = command_line()
end

sv_name = get(args, "sv_name", "caronte")
sv_name = get(args, "broker", "caronte")

setup(CONFIG, sv_name)
setup(CONFIG)
if haskey(args, "debug") && args["debug"] === true
CONFIG.debug = true
end

if haskey(args, "reset") && args["reset"] === true
Rembus.caronte_reset()
Rembus.caronte_reset(sv_name)
end

issecure = get(args, "secure", false)
Expand Down Expand Up @@ -1152,9 +1154,9 @@ Start an embedded server and accept connections.
"""
function serve(
server::Embedded, port=parse(UInt16, get(ENV, "BROKER_WS_PORT", "8000")),
; wait=true, secure=false
; wait=true, name="server", secure=false
)
embedded_sv = from("server")
embedded_sv = from(name)
if embedded_sv === nothing
# first server process
setup(CONFIG)
Expand All @@ -1170,7 +1172,7 @@ function serve(
),
]
sv = supervise(
[supervisor("server", tasks, strategy=:one_for_one)],
[supervisor(name, tasks, strategy=:one_for_one)],
intensity=5,
wait=wait
)
Expand All @@ -1185,6 +1187,7 @@ function serve(
Visor.add_node(embedded_sv, p)
Visor.start(p)
end
server.process = from(name)
return nothing
end

Expand Down Expand Up @@ -1243,7 +1246,7 @@ function identity_check(router, twin, msg; paging=true)
respond(router, ResMsg(msg.id, STS_GENERIC_ERROR, "already connected"), twin)
else
# check if cid is registered
rembus_login = isfile(key_file(msg.cid))
rembus_login = isfile(key_file(router, msg.cid))
if rembus_login
# authentication mode, send the challenge
response = challenge(router, twin, msg)
Expand Down Expand Up @@ -1300,8 +1303,8 @@ function client_receiver(router::Embedded, ws)
return nothing
end

function secure_config()
trust_store = keystore_dir()
function secure_config(router)
trust_store = keystore_dir(router)

entropy = MbedTLS.Entropy()
rng = MbedTLS.CtrDrbg()
Expand Down Expand Up @@ -1345,7 +1348,7 @@ function serve_ws(td, router, port, issecure=false)
sslconfig = nothing
try
if issecure
sslconfig = secure_config()
sslconfig = secure_config(router)
end

listener(td, port, router, sslconfig)
Expand Down Expand Up @@ -1401,7 +1404,7 @@ function serve_tcp(pd, router, caronte_port, issecure=false)

if issecure
proto = "tls"
sslconfig = secure_config()
sslconfig = secure_config(router)
end

server = Sockets.listen(Sockets.InetAddr(parse(IPAddr, IP), caronte_port))
Expand Down Expand Up @@ -1781,16 +1784,17 @@ end
Setup the router.
=#
function boot(router)
if !isdir(CONFIG.db)
mkpath(CONFIG.db)
dir = broker_dir(router)
if !isdir(dir)
mkpath(dir)
end

appdir = keys_dir()
appdir = keys_dir(router)
if !isdir(appdir)
mkdir(appdir)
end

twin_dir = twins_dir()
twin_dir = twins_dir(router)
if !isdir(twin_dir)
mkdir(twin_dir)
end
Expand All @@ -1804,7 +1808,7 @@ init_log() = logging()
function init(router)
init_log()
boot(router)
@debug "broker datadir: $(CONFIG.db)"
@debug "broker datadir: $(broker_dir(router))"

if router.plugin !== nothing
if isdefined(router.plugin, :park) &&
Expand Down
22 changes: 8 additions & 14 deletions src/configuration.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ mutable struct Settings
zmq_ping_interval::Float32
ws_ping_interval::Float32
balancer::String
home::String
db::String
root_dir::String
log::String
debug::Bool
overwrite_connection::Bool
Expand All @@ -38,21 +37,17 @@ mutable struct Settings
broker_plugin::Union{Nothing,Module}
context::Any
page_size::UInt
Settings(rootdir=nothing) = begin
if rootdir === nothing
if Sys.iswindows()
home = get(ENV, "USERPROFILE", ".")
else
home = get(ENV, "HOME", ".")
end
Settings() = begin
if Sys.iswindows()
home = get(ENV, "USERPROFILE", ".")
else
home = rootdir
home = get(ENV, "HOME", ".")
end

zmq_ping_interval = 0
ws_ping_interval = 0
balancer = "first_up"
db = joinpath(home, ".config", "caronte")
root_dir = joinpath(home, ".config")
log = "stdout"
overwrite_connection = true
stacktrace = false
Expand All @@ -62,7 +57,7 @@ mutable struct Settings
connection_retry_period = 2.0
debug = false
page_size = get(ENV, "REMBUS_PAGE_SIZE", REMBUS_PAGE_SIZE)
new(zmq_ping_interval, ws_ping_interval, balancer, home, db, log, debug,
new(zmq_ping_interval, ws_ping_interval, balancer, root_dir, log, debug,
overwrite_connection, stacktrace, metering, rawdump, cid,
connection_retry_period, nothing, nothing, page_size)
end
Expand All @@ -71,7 +66,6 @@ end
set_balancer(policy::AbstractString) = set_balancer(CONFIG, policy)

function set_balancer(setting, policy)
#balancer = get(cfg, "balancer", get(ENV, "BROKER_BALANCER", "first_up"))
if !(policy in ["first_up", "less_busy", "round_robin"])
error("wrong balancer, must be one of first_up, less_busy, round_robin")
end
Expand All @@ -89,7 +83,7 @@ function setup(setting)
setting.ws_ping_interval = get(cfg, "ws_ping_interval",
parse(Float32, get(ENV, "REMBUS_WS_PING_INTERVAL", "120")))

setting.db = get(cfg, "db", get(ENV, "BROKER_DIR", setting.db))
setting.root_dir = get(cfg, "root_dir", get(ENV, "REMBUS_ROOT_DIR", setting.root_dir))
setting.log = get(cfg, "log", get(ENV, "BROKER_LOG", "stdout"))
setting.overwrite_connection = get(cfg, "overwrite_connection", true)
setting.stacktrace = get(cfg, "stacktrace", false)
Expand Down
4 changes: 3 additions & 1 deletion src/register.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ function register(cid::AbstractString, userid::AbstractString, pin::AbstractStri

msg = Register(msgid, cmp.id, userid, pubkey)
response = wait_response(rb, msg, request_timeout())
if (response.status != STS_SUCCESS)
if isa(response, RembusTimeout)
rembuserror(code=STS_TIMEOUT)
elseif (response.status != STS_SUCCESS)
rembuserror(code=response.status, reason=response.data)
end
# finally save the key
Expand Down
Loading

0 comments on commit 15b2ca8

Please sign in to comment.