diff --git a/test/storage-luatest/storage_1_1_test.lua b/test/storage-luatest/storage_1_1_test.lua
index 740e635c..0350164d 100644
--- a/test/storage-luatest/storage_1_1_test.lua
+++ b/test/storage-luatest/storage_1_1_test.lua
@@ -261,3 +261,88 @@ test_group.test_on_bucket_event = function(g)
         box.space.data2:drop()
     end)
 end
+
+--
+-- gh-434: bucket_send() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_send = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+    end)
+    local bid = g.replica_1_a:exec(function(uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local bid = _G.get_first_bucket()
+        local f = ifiber.create(ivshard.storage.bucket_send, bid, uuid,
+                                {timeout = 100000})
+        f:set_joinable(true)
+        rawset(_G, 'test_f', f)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_send(bid, uuid)
+        ilt.assert(not ok)
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        -- Before the bug was fixed, the second send would clear the flag, thus
+        -- leaving the first send unprotected.
+        ilt.assert(transfer_flags[bid])
+        return bid
+    end, {g.replica_2_a:replicaset_uuid()})
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+    end)
+    g.replica_1_a:exec(function(bid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = _G.test_f
+        _G.test_f = nil
+        local f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        ilt.assert(not transfer_flags[bid])
+        _G.bucket_gc_wait()
+    end, {bid})
+    --
+    -- Cleanup.
+    --
+    g.replica_2_a:exec(function(bid, uuid)
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {bid, g.replica_1_a:replicaset_uuid()})
+end
+
+--
+-- gh-434: bucket_recv() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_recv = function(g)
+    g.replica_2_a:exec(function(bid, uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = ifiber.create(ivshard.storage.bucket_recv, bid, uuid, {})
+        f:set_joinable(true)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_recv(bid, uuid, {})
+        -- Before the bug was fixed, the second recv would clear the flag, thus
+        -- leaving the first recv unprotected.
+        ilt.assert(transfer_flags[bid])
+        ilt.assert(not ok)
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        local f_ok
+        f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert(not transfer_flags[bid])
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        --
+        -- Cleanup.
+        --
+        _G.bucket_recovery_wait()
+        _G.bucket_gc_wait()
+        ilt.assert_equals(box.space._bucket:get{bid}, nil)
+    end, {vtest.storage_first_bucket(g.replica_1_a),
+          g.replica_1_a:replicaset_uuid()})
+end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 2213dc55..899f6f0d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -328,6 +328,29 @@ local function bucket_generation_increment()
     M.bucket_generation_cond:broadcast()
 end
 
+--
+-- Mark the bucket as being transferred. Returns true on success. If the flag
+-- is already set, returns nil and a WRONG_BUCKET error without touching the
+-- flag, so the transfer which set it stays protected.
+--
+local function bucket_transfer_start(bid)
+    if M.rebalancer_transfering_buckets[bid] then
+        return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bid,
+                                  'transfer is already in progress', nil)
+    end
+    M.rebalancer_transfering_buckets[bid] = true
+    return true
+end
+
+--
+-- Clear the transfer flag of the bucket. The flag is asserted to be set, so
+-- each call must be paired with a successful bucket_transfer_start().
+--
+local function bucket_transfer_end(bid)
+    assert(M.rebalancer_transfering_buckets[bid])
+    M.rebalancer_transfering_buckets[bid] = nil
+end
+
 --
 -- Handle a bad update of _bucket space.
 --
@@ -1763,9 +1786,13 @@ local function bucket_recv(bucket_id, from, data, opts)
     while opts and opts.is_last and M.errinj.ERRINJ_LAST_RECEIVE_DELAY do
         lfiber.sleep(0.01)
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
-    local status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
+    status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
@@ -2155,14 +2182,18 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
     local exception_guard = {}
-    local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
-                                   exception_guard)
+    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
+                             exception_guard)
     if exception_guard.drop_rw_lock then
         exception_guard.ref.rw_lock = false
     end
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret