Skip to content

Commit

Permalink
storage: fix send/recv unprotected when 2 at once
Browse files Browse the repository at this point in the history
If more than one bucket_send/recv would try to start on the same
bucket on the same storage, it could lead to the bucket recovery
or GC when there would be no need for it. Data couldn't be lost,
and it wouldn't occur during automatic rebalancing, but manual
usage of those functions could fail when it shouldn't have.

Closes #434

NO_DOC=bugfix
  • Loading branch information
Gerold103 committed Nov 3, 2023
1 parent 5cb1df1 commit 2e3c583
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 7 deletions.
85 changes: 85 additions & 0 deletions test/storage-luatest/storage_1_1_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,88 @@ test_group.test_on_bucket_event = function(g)
box.space.data2:drop()
end)
end

--
-- gh-434: bucket_send() shouldn't change the transfer flags if a transfer for
-- the same bucket is already in progress.
--
test_group.test_bucket_double_send = function(g)
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
end)
local bid = g.replica_1_a:exec(function(uuid)
local transfer_flags =
ivshard.storage.internal.rebalancer_transfering_buckets
local bid = _G.get_first_bucket()
local f = ifiber.create(ivshard.storage.bucket_send, bid, uuid,
{timeout = 100000})
f:set_joinable(true)
rawset(_G, 'test_f', f)
ilt.assert(transfer_flags[bid])
local ok, err = ivshard.storage.bucket_send(bid, uuid)
ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
ilt.assert(not ok)
-- Before the bug was fixed, the second send would clear the flag, thus
-- leaving the first sending unprotected.
ilt.assert(transfer_flags[bid])
return bid
end, {g.replica_2_a:replicaset_uuid()})
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
end)
g.replica_1_a:exec(function(bid)
local transfer_flags =
ivshard.storage.internal.rebalancer_transfering_buckets
local f = _G.test_f
_G.test_f = nil
local f_ok, ok, err = f:join(iwait_timeout)
ilt.assert_equals(err, nil)
ilt.assert(f_ok)
ilt.assert(ok)
ilt.assert(not transfer_flags[bid])
_G.bucket_gc_wait()
end, {bid})
--
-- Cleanup.
--
g.replica_2_a:exec(function(bid, uuid)
local ok, err = ivshard.storage.bucket_send(bid, uuid,
{timeout = iwait_timeout})
ilt.assert_equals(err, nil)
ilt.assert(ok)
_G.bucket_gc_wait()
end, {bid, g.replica_1_a:replicaset_uuid()})
end

--
-- gh-434: bucket_recv() shouldn't change the transfer flags if a transfer for
-- the same bucket is already in progress.
--
test_group.test_bucket_double_recv = function(g)
g.replica_2_a:exec(function(bid, uuid)
local transfer_flags =
ivshard.storage.internal.rebalancer_transfering_buckets
local f = ifiber.create(ivshard.storage.bucket_recv, bid, uuid, {})
f:set_joinable(true)
ilt.assert(transfer_flags[bid])
local ok, err = ivshard.storage.bucket_recv(bid, uuid, {})
-- Before the bug was fixed, the second recv would clear the flag, thus
-- leaving the first recv unprotected.
ilt.assert(transfer_flags[bid])
ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
ilt.assert(not ok)
local f_ok
f_ok, ok, err = f:join(iwait_timeout)
ilt.assert(not transfer_flags[bid])
ilt.assert_equals(err, nil)
ilt.assert(f_ok)
ilt.assert(ok)
--
-- Cleanup.
--
_G.bucket_recovery_wait()
_G.bucket_gc_wait()
ilt.assert_equals(box.space._bucket:get{bid}, nil)
end, {vtest.storage_first_bucket(g.replica_1_a),
g.replica_1_a:replicaset_uuid()})
end
36 changes: 29 additions & 7 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,20 @@ local function bucket_generation_increment()
M.bucket_generation_cond:broadcast()
end

local function bucket_transfer_start(bid)
if M.rebalancer_transfering_buckets[bid] then
return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bid,
'transfer is already in progress', nil)
end
M.rebalancer_transfering_buckets[bid] = true
return true
end

local function bucket_transfer_end(bid)
assert(M.rebalancer_transfering_buckets[bid])
M.rebalancer_transfering_buckets[bid] = nil
end

--
-- Handle a bad update of _bucket space.
--
Expand Down Expand Up @@ -1763,9 +1777,13 @@ local function bucket_recv(bucket_id, from, data, opts)
while opts and opts.is_last and M.errinj.ERRINJ_LAST_RECEIVE_DELAY do
lfiber.sleep(0.01)
end
M.rebalancer_transfering_buckets[bucket_id] = true
local status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
M.rebalancer_transfering_buckets[bucket_id] = nil
local status, ret, err
status, err = bucket_transfer_start(bucket_id)
if not status then
return nil, err
end
status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
bucket_transfer_end(bucket_id)
if status then
if ret then
return ret
Expand Down Expand Up @@ -2155,14 +2173,18 @@ local function bucket_send(bucket_id, destination, opts)
if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
error('Usage: bucket_send(bucket_id, destination)')
end
M.rebalancer_transfering_buckets[bucket_id] = true
local status, ret, err
status, err = bucket_transfer_start(bucket_id)
if not status then
return nil, err
end
local exception_guard = {}
local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
exception_guard)
status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
exception_guard)
if exception_guard.drop_rw_lock then
exception_guard.ref.rw_lock = false
end
M.rebalancer_transfering_buckets[bucket_id] = nil
bucket_transfer_end(bucket_id)
if status then
if ret then
return ret
Expand Down

0 comments on commit 2e3c583

Please sign in to comment.