diff --git a/test/storage-luatest/service_info_test.lua b/test/storage-luatest/service_info_test.lua
new file mode 100644
index 00000000..ac2ac34a
--- /dev/null
+++ b/test/storage-luatest/service_info_test.lua
@@ -0,0 +1,129 @@
+local t = require('luatest')
+local vtest = require('test.luatest_helpers.vtest')
+local vutil = require('vshard.util')
+
+local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}
+
+if vutil.feature.memtx_mvcc then
+    table.insert(group_config, {
+        engine = 'memtx', memtx_use_mvcc_engine = true
+    })
+    table.insert(group_config, {
+        engine = 'vinyl', memtx_use_mvcc_engine = true
+    })
+end
+
+local test_group = t.group('storage', group_config)
+
+local cfg_template = {
+    sharding = {
+        {
+            replicas = {
+                replica_1_a = {
+                    master = true,
+                },
+            },
+        },
+        {
+            replicas = {
+                replica_2_a = {
+                    master = true,
+                },
+            },
+        },
+    },
+    bucket_count = 10
+}
+local global_cfg
+
+test_group.before_all(function(g)
+    cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
+    global_cfg = vtest.config_new(cfg_template)
+
+    vtest.cluster_new(g, global_cfg)
+    vtest.cluster_bootstrap(g, global_cfg)
+    vtest.cluster_rebalancer_disable(g)
+end)
+
+test_group.after_all(function(g)
+    g.cluster:drop()
+end)
+
+--
+-- Test that services for all background fibers are created
+-- and work properly (gh-107).
+--
+test_group.test_basic_storage_service_info = function(g)
+    local uuid = g.replica_1_a:exec(function()
+        -- Test that all services save states
+        local info = ivshard.storage.info({with_services = true})
+        ilt.assert_not_equals(info.services, nil)
+        ilt.assert_not_equals(info.services.gc, nil)
+        ilt.assert_not_equals(info.services.recovery, nil)
+        ilt.assert_not_equals(info.services.rebalancer, nil)
+        -- Routes applier service is created as soon as it's needed
+        ilt.assert_equals(info.services.routes_applier, nil)
+
+        -- Forbid routes_apply service to die
+        local internal = ivshard.storage.internal
+        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
+        -- Break timeout in order to get error
+        rawset(_G, 'chunk_timeout', ivconst.REBALANCER_CHUNK_TIMEOUT)
+        ivconst.REBALANCER_CHUNK_TIMEOUT = 1e-6
+        return ivutil.replicaset_uuid()
+    end)
+
+    g.replica_2_a:exec(function(uuid)
+        -- Send bucket to create disbalance in
+        -- order to test routes applier service
+        local bid = _G.get_first_bucket()
+        local ok, err = ivshard.storage.bucket_send(bid, uuid)
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+    end, {uuid})
+
+    vtest.cluster_rebalancer_enable(g)
+
+    g.replica_1_a:exec(function()
+        local internal = ivshard.storage.internal
+        local applier_name = 'routes_applier_service'
+        ivtest.wait_for_not_nil(internal, applier_name)
+        local service = internal[applier_name]
+        ivtest.service_wait_for_error(service, 'Timed?[Oo]ut')
+
+        -- Restore everything
+        ivconst.REBALANCER_CHUNK_TIMEOUT = _G.chunk_timeout
+        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
+        ivtest.wait_for_nil(internal, applier_name)
+        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
+
+        -- All buckets must be recovered to the ACTIVE state,
+        -- otherwise rebalancer won't work.
+        ivshard.storage.recovery_wakeup()
+        service = ivshard.storage.internal.recovery_service
+        ivtest.service_wait_for_new_ok(service)
+    end)
+
+    g.replica_2_a:exec(function()
+        ivshard.storage.recovery_wakeup()
+        local service = ivshard.storage.internal.recovery_service
+        ivtest.service_wait_for_new_ok(service)
+    end)
+
+    g.replica_1_a:exec(function()
+        local internal = ivshard.storage.internal
+        local applier_name = 'routes_applier_service'
+        ivtest.wait_for_not_nil(internal, applier_name,
+                                {on_yield = ivshard.storage.rebalancer_wakeup})
+
+        -- Everything is all right now
+        ivtest.service_wait_for_ok(internal[applier_name])
+        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
+
+        ivtest.service_wait_for_ok(internal.rebalancer_service,
+                                   {on_yield = ivshard.storage.rebalancer_wakeup})
+    end)
+
+    -- Cleanup
+    vtest.cluster_rebalancer_disable(g)
+end
diff --git a/test/storage-luatest/storage_1_1_test.lua b/test/storage-luatest/storage_1_1_test.lua
index 53ebbc95..0350164d 100644
--- a/test/storage-luatest/storage_1_1_test.lua
+++ b/test/storage-luatest/storage_1_1_test.lua
@@ -263,80 +263,86 @@ test_group.test_on_bucket_event = function(g)
 end
 
 --
--- Test that services for all background fibers are created
--- and work properly (gh-107).
+-- gh-434: bucket_send() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
 --
-test_group.test_basic_storage_service_info = function(g)
-    local uuid = g.replica_1_a:exec(function()
-        -- Test that all services save states
-        local info = ivshard.storage.info({with_services = true})
-        ilt.assert_not_equals(info.services, nil)
-        ilt.assert_not_equals(info.services.gc, nil)
-        ilt.assert_not_equals(info.services.recovery, nil)
-        ilt.assert_not_equals(info.services.rebalancer, nil)
-        -- Routes applier service is created as soon as it's needed
-        ilt.assert_equals(info.services.routes_applier, nil)
-
-        -- Forbid routes_apply service to die
-        local internal = ivshard.storage.internal
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
-        -- Break timeout in order to get error
-        rawset(_G, 'chunk_timeout', ivconst.REBALANCER_CHUNK_TIMEOUT)
-        ivconst.REBALANCER_CHUNK_TIMEOUT = 1e-6
-        return ivutil.replicaset_uuid()
+test_group.test_bucket_double_send = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
     end)
-
-    g.replica_2_a:exec(function(uuid)
-        -- Send bucket to create disbalance in
-        -- order to test routes applier service
+    local bid = g.replica_1_a:exec(function(uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
         local bid = _G.get_first_bucket()
+        local f = ifiber.create(ivshard.storage.bucket_send, bid, uuid,
+                                {timeout = 100000})
+        f:set_joinable(true)
+        rawset(_G, 'test_f', f)
+        ilt.assert(transfer_flags[bid])
         local ok, err = ivshard.storage.bucket_send(bid, uuid)
-        ilt.assert_equals(err, nil)
-        ilt.assert(ok)
-    end, {uuid})
-
-    vtest.cluster_rebalancer_enable(g)
-
-    g.replica_1_a:exec(function()
-        local internal = ivshard.storage.internal
-        local applier_name = 'routes_applier_service'
-        ivtest.wait_for_not_nil(internal, applier_name)
-        local service = internal[applier_name]
-        ivtest.service_wait_for_error(service, 'Timed?[Oo]ut')
-
-        -- Restore everything
-        ivconst.REBALANCER_CHUNK_TIMEOUT = _G.chunk_timeout
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
-        ivtest.wait_for_nil(internal, applier_name)
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
-
-        -- All buckets must be recovered to the ACTIVE state,
-        -- otherwise rebalancer won't work.
-        ivshard.storage.recovery_wakeup()
-        service = ivshard.storage.internal.recovery_service
-        ivtest.service_wait_for_new_ok(service)
-    end)
-
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        -- Before the bug was fixed, the second send would clear the flag, thus
+        -- leaving the first sending unprotected.
+        ilt.assert(transfer_flags[bid])
+        return bid
+    end, {g.replica_2_a:replicaset_uuid()})
     g.replica_2_a:exec(function()
-        ivshard.storage.recovery_wakeup()
-        local service = ivshard.storage.internal.recovery_service
-        ivtest.service_wait_for_new_ok(service)
-    end)
-
-    g.replica_1_a:exec(function()
-        local internal = ivshard.storage.internal
-        local applier_name = 'routes_applier_service'
-        ivtest.wait_for_not_nil(internal, applier_name,
-                                {on_yield = ivshard.storage.rebalancer_wakeup})
-
-        -- Everything is all right now
-        ivtest.service_wait_for_ok(internal[applier_name])
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
-
-        ivtest.service_wait_for_ok(internal.rebalancer_service,
-                                   {on_yield = ivshard.storage.rebalancer_wakeup})
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
     end)
+    g.replica_1_a:exec(function(bid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = _G.test_f
+        _G.test_f = nil
+        local f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        ilt.assert(not transfer_flags[bid])
+        _G.bucket_gc_wait()
+    end, {bid})
+    --
+    -- Cleanup.
+    --
+    g.replica_2_a:exec(function(bid, uuid)
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {bid, g.replica_1_a:replicaset_uuid()})
+end
 
-    -- Cleanup
-    vtest.cluster_rebalancer_disable(g)
+--
+-- gh-434: bucket_recv() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_recv = function(g)
+    g.replica_2_a:exec(function(bid, uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = ifiber.create(ivshard.storage.bucket_recv, bid, uuid, {})
+        f:set_joinable(true)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_recv(bid, uuid, {})
+        -- Before the bug was fixed, the second recv would clear the flag, thus
+        -- leaving the first recv unprotected.
+        ilt.assert(transfer_flags[bid])
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        local f_ok
+        f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert(not transfer_flags[bid])
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        --
+        -- Cleanup.
+        --
+        _G.bucket_recovery_wait()
+        _G.bucket_gc_wait()
+        ilt.assert_equals(box.space._bucket:get{bid}, nil)
+    end, {vtest.storage_first_bucket(g.replica_1_a),
+          g.replica_1_a:replicaset_uuid()})
 end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 2213dc55..899f6f0d 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -328,6 +328,20 @@ local function bucket_generation_increment()
     M.bucket_generation_cond:broadcast()
 end
 
+local function bucket_transfer_start(bid)
+    if M.rebalancer_transfering_buckets[bid] then
+        return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bid,
+                                  'transfer is already in progress', nil)
+    end
+    M.rebalancer_transfering_buckets[bid] = true
+    return true
+end
+
+local function bucket_transfer_end(bid)
+    assert(M.rebalancer_transfering_buckets[bid])
+    M.rebalancer_transfering_buckets[bid] = nil
+end
+
 --
 -- Handle a bad update of _bucket space.
 --
@@ -1763,9 +1777,13 @@ local function bucket_recv(bucket_id, from, data, opts)
     while opts and opts.is_last and M.errinj.ERRINJ_LAST_RECEIVE_DELAY do
         lfiber.sleep(0.01)
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
-    local status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
+    status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
@@ -2155,14 +2173,18 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
     local exception_guard = {}
-    local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
-                                   exception_guard)
+    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
+                             exception_guard)
     if exception_guard.drop_rw_lock then
         exception_guard.ref.rw_lock = false
     end
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret