Skip to content

Commit

Permalink
router: improve master connection parallelism in map-reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
darthunix authored and Gerold103 committed Sep 18, 2024
1 parent e89f444 commit e48b75d
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
1 change: 1 addition & 0 deletions vshard/replicaset.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,7 @@ local replicaset_mt = {
up_replica_priority = replicaset_up_replica_priority;
wait_connected = replicaset_wait_connected,
wait_connected_all = replicaset_wait_connected_all,
wait_master = replicaset_wait_master,
call = replicaset_master_call;
callrw = replicaset_master_call;
callro = replicaset_template_multicallro(false, false);
Expand Down
71 changes: 64 additions & 7 deletions vshard/router/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,47 @@ local function router_call(router, bucket_id, opts, ...)
...)
end

--
-- Wait until all masters in the list are connected.
--
-- @param replicasets List of replicasets.
-- @param timeout Timeout in seconds.
--
-- @return In case of success - remaining timeout.
--
-- @return In case of error - nil, err, UUID of the failed replicaset.
--
local function wait_connected_to_masters(replicasets, timeout)
local master, id
local err, err_id

-- Start connecting to all masters in parallel.
local deadline = fiber_clock() + timeout
for _, replicaset in pairs(replicasets) do
master, err = replicaset:wait_master(timeout)
id = replicaset.id
if not master then
err_id = id
goto fail
end
replicaset:connect_replica(master)
timeout = deadline - fiber_clock()
end

-- Wait until all connections are established.
for _, replicaset in pairs(replicasets) do
timeout, err = replicaset:wait_connected(timeout)
if not timeout then
err_id = replicaset.id
goto fail
end
end
do return timeout end

::fail::
return nil, err, err_id
end

--
-- Consistent Map-Reduce. The given function is called on all masters in the
-- cluster with a guarantee that in case of success it was executed with all
Expand Down Expand Up @@ -868,14 +909,17 @@ local function router_map_callrw(router, func, args, opts)
--
-- Ref stage: send.
--
-- Netbox async requests work only with active connections. Need to wait
-- for the connection explicitly.
local rs_list = table_new(0, #replicasets)
for _, rs in pairs(replicasets) do
table.insert(rs_list, rs)
end
timeout, err, err_id = wait_connected_to_masters(rs_list, timeout)
if not timeout then
goto fail
end
for id, rs in pairs(replicasets) do
-- Netbox async requests work only with active connections. Need to wait
-- for the connection explicitly.
timeout, err = rs:wait_connected(timeout)
if timeout == nil then
err_id = id
goto fail
end
res, err = rs:callrw('vshard.storage._call',
{'storage_ref', rid, timeout}, opts_ref)
if res == nil then
Expand Down Expand Up @@ -1038,6 +1082,7 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts)
local grouped_buckets
local err, err_id, res, ok, map
local call_args
local rs_list
local replicasets = {}
local preallocated = false
local futures = {}
Expand All @@ -1060,6 +1105,18 @@ local function router_map_part_callrw(router, bucket_ids, func, args, opts)
end
timeout = deadline - fiber_clock()

-- Netbox async requests work only with active connections.
-- So, first need to wait for the master connection explicitly.
rs_list = table_new(0, #grouped_buckets)
for uuid, _ in pairs(grouped_buckets) do
replicaset = router.replicasets[uuid]
table.insert(rs_list, replicaset)
end
timeout, err, err_id = wait_connected_to_masters(rs_list, timeout)
if not timeout then
goto fail
end

-- Send ref requests with timeouts to the replicasets.
futures = table_new(0, #grouped_buckets)
for id, buckets in pairs(grouped_buckets) do
Expand Down

0 comments on commit e48b75d

Please sign in to comment.