storage: fix send/recv unprotected when 2 at once #441

Merged · 2 commits · Nov 3, 2023
129 changes: 129 additions & 0 deletions test/storage-luatest/service_info_test.lua
@@ -0,0 +1,129 @@
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local vutil = require('vshard.util')

local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}

if vutil.feature.memtx_mvcc then
    table.insert(group_config, {
        engine = 'memtx', memtx_use_mvcc_engine = true
    })
    table.insert(group_config, {
        engine = 'vinyl', memtx_use_mvcc_engine = true
    })
end

local test_group = t.group('storage', group_config)

local cfg_template = {
    sharding = {
        {
            replicas = {
                replica_1_a = {
                    master = true,
                },
            },
        },
        {
            replicas = {
                replica_2_a = {
                    master = true,
                },
            },
        },
    },
    bucket_count = 10
}
local global_cfg

test_group.before_all(function(g)
    cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
    global_cfg = vtest.config_new(cfg_template)

    vtest.cluster_new(g, global_cfg)
    vtest.cluster_bootstrap(g, global_cfg)
    vtest.cluster_rebalancer_disable(g)
end)

test_group.after_all(function(g)
    g.cluster:drop()
end)

--
-- Test that services for all background fibers are created
-- and work properly (gh-107).
--
test_group.test_basic_storage_service_info = function(g)
    local uuid = g.replica_1_a:exec(function()
        -- Test that all services save states
        local info = ivshard.storage.info({with_services = true})
        ilt.assert_not_equals(info.services, nil)
        ilt.assert_not_equals(info.services.gc, nil)
        ilt.assert_not_equals(info.services.recovery, nil)
        ilt.assert_not_equals(info.services.rebalancer, nil)
        -- Routes applier service is created as soon as it's needed
        ilt.assert_equals(info.services.routes_applier, nil)

        -- Forbid routes_apply service to die
        local internal = ivshard.storage.internal
        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
        -- Break timeout in order to get error
        rawset(_G, 'chunk_timeout', ivconst.REBALANCER_CHUNK_TIMEOUT)
        ivconst.REBALANCER_CHUNK_TIMEOUT = 1e-6
        return ivutil.replicaset_uuid()
    end)

    g.replica_2_a:exec(function(uuid)
        -- Send bucket to create disbalance in
        -- order to test routes applier service
        local bid = _G.get_first_bucket()
        local ok, err = ivshard.storage.bucket_send(bid, uuid)
        ilt.assert_equals(err, nil)
        ilt.assert(ok)
    end, {uuid})

    vtest.cluster_rebalancer_enable(g)

    g.replica_1_a:exec(function()
        local internal = ivshard.storage.internal
        local applier_name = 'routes_applier_service'
        ivtest.wait_for_not_nil(internal, applier_name)
        local service = internal[applier_name]
        ivtest.service_wait_for_error(service, 'Timed?[Oo]ut')

        -- Restore everything
        ivconst.REBALANCER_CHUNK_TIMEOUT = _G.chunk_timeout
        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
        ivtest.wait_for_nil(internal, applier_name)
        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true

        -- All buckets must be recovered to the ACTIVE state,
        -- otherwise rebalancer won't work.
        ivshard.storage.recovery_wakeup()
        service = ivshard.storage.internal.recovery_service
        ivtest.service_wait_for_new_ok(service)
    end)

    g.replica_2_a:exec(function()
        ivshard.storage.recovery_wakeup()
        local service = ivshard.storage.internal.recovery_service
        ivtest.service_wait_for_new_ok(service)
    end)

    g.replica_1_a:exec(function()
        local internal = ivshard.storage.internal
        local applier_name = 'routes_applier_service'
        ivtest.wait_for_not_nil(internal, applier_name,
                                {on_yield = ivshard.storage.rebalancer_wakeup})

        -- Everything is all right now
        ivtest.service_wait_for_ok(internal[applier_name])
        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false

        ivtest.service_wait_for_ok(internal.rebalancer_service,
                                   {on_yield = ivshard.storage.rebalancer_wakeup})
    end)

    -- Cleanup
    vtest.cluster_rebalancer_disable(g)
end
144 changes: 75 additions & 69 deletions test/storage-luatest/storage_1_1_test.lua
@@ -263,80 +263,86 @@ test_group.test_on_bucket_event = function(g)
 end
 
 --
--- Test that services for all background fibers are created
--- and work properly (gh-107).
+-- gh-434: bucket_send() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
 --
-test_group.test_basic_storage_service_info = function(g)
-    local uuid = g.replica_1_a:exec(function()
-        -- Test that all services save states
-        local info = ivshard.storage.info({with_services = true})
-        ilt.assert_not_equals(info.services, nil)
-        ilt.assert_not_equals(info.services.gc, nil)
-        ilt.assert_not_equals(info.services.recovery, nil)
-        ilt.assert_not_equals(info.services.rebalancer, nil)
-        -- Routes applier service is created as soon as it's needed
-        ilt.assert_equals(info.services.routes_applier, nil)
-
-        -- Forbid routes_apply service to die
-        local internal = ivshard.storage.internal
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
-        -- Break timeout in order to get error
-        rawset(_G, 'chunk_timeout', ivconst.REBALANCER_CHUNK_TIMEOUT)
-        ivconst.REBALANCER_CHUNK_TIMEOUT = 1e-6
-        return ivutil.replicaset_uuid()
-    end)
-
-    g.replica_2_a:exec(function(uuid)
-        -- Send bucket to create disbalance in
-        -- order to test routes applier service
-        local bid = _G.get_first_bucket()
-        local ok, err = ivshard.storage.bucket_send(bid, uuid)
-        ilt.assert_equals(err, nil)
-        ilt.assert(ok)
-    end, {uuid})
-
-    vtest.cluster_rebalancer_enable(g)
-
-    g.replica_1_a:exec(function()
-        local internal = ivshard.storage.internal
-        local applier_name = 'routes_applier_service'
-        ivtest.wait_for_not_nil(internal, applier_name)
-        local service = internal[applier_name]
-        ivtest.service_wait_for_error(service, 'Timed?[Oo]ut')
-
-        -- Restore everything
-        ivconst.REBALANCER_CHUNK_TIMEOUT = _G.chunk_timeout
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
-        ivtest.wait_for_nil(internal, applier_name)
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = true
-
-        -- All buckets must be recovered to the ACTIVE state,
-        -- otherwise rebalancer won't work.
-        ivshard.storage.recovery_wakeup()
-        service = ivshard.storage.internal.recovery_service
-        ivtest.service_wait_for_new_ok(service)
-    end)
-
-    g.replica_2_a:exec(function()
-        ivshard.storage.recovery_wakeup()
-        local service = ivshard.storage.internal.recovery_service
-        ivtest.service_wait_for_new_ok(service)
-    end)
-
-    g.replica_1_a:exec(function()
-        local internal = ivshard.storage.internal
-        local applier_name = 'routes_applier_service'
-        ivtest.wait_for_not_nil(internal, applier_name,
-                                {on_yield = ivshard.storage.rebalancer_wakeup})
-
-        -- Everything is all right now
-        ivtest.service_wait_for_ok(internal[applier_name])
-        internal.errinj.ERRINJ_APPLY_ROUTES_STOP_DELAY = false
-
-        ivtest.service_wait_for_ok(internal.rebalancer_service,
-                                   {on_yield = ivshard.storage.rebalancer_wakeup})
-    end)
-
-    -- Cleanup
-    vtest.cluster_rebalancer_disable(g)
+test_group.test_bucket_double_send = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = true
+    end)
+    local bid = g.replica_1_a:exec(function(uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local bid = _G.get_first_bucket()
+        local f = ifiber.create(ivshard.storage.bucket_send, bid, uuid,
+                                {timeout = 100000})
+        f:set_joinable(true)
+        rawset(_G, 'test_f', f)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_send(bid, uuid)
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        -- Before the bug was fixed, the second send would clear the flag, thus
+        -- leaving the first sending unprotected.
+        ilt.assert(transfer_flags[bid])
+        return bid
+    end, {g.replica_2_a:replicaset_uuid()})
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_LAST_RECEIVE_DELAY = false
+    end)
+    g.replica_1_a:exec(function(bid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = _G.test_f
+        _G.test_f = nil
+        local f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        ilt.assert(not transfer_flags[bid])
+        _G.bucket_gc_wait()
+    end, {bid})
+    --
+    -- Cleanup.
+    --
+    g.replica_2_a:exec(function(bid, uuid)
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                     {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {bid, g.replica_1_a:replicaset_uuid()})
+end
+
+--
+-- gh-434: bucket_recv() shouldn't change the transfer flags if a transfer for
+-- the same bucket is already in progress.
+--
+test_group.test_bucket_double_recv = function(g)
+    g.replica_2_a:exec(function(bid, uuid)
+        local transfer_flags =
+            ivshard.storage.internal.rebalancer_transfering_buckets
+        local f = ifiber.create(ivshard.storage.bucket_recv, bid, uuid, {})
+        f:set_joinable(true)
+        ilt.assert(transfer_flags[bid])
+        local ok, err = ivshard.storage.bucket_recv(bid, uuid, {})
+        -- Before the bug was fixed, the second recv would clear the flag, thus
+        -- leaving the first recv unprotected.
+        ilt.assert(transfer_flags[bid])
+        ilt.assert_equals(err.code, iverror.code.WRONG_BUCKET)
+        ilt.assert(not ok)
+        local f_ok
+        f_ok, ok, err = f:join(iwait_timeout)
+        ilt.assert(not transfer_flags[bid])
+        ilt.assert_equals(err, nil)
+        ilt.assert(f_ok)
+        ilt.assert(ok)
+        --
+        -- Cleanup.
+        --
+        _G.bucket_recovery_wait()
+        _G.bucket_gc_wait()
+        ilt.assert_equals(box.space._bucket:get{bid}, nil)
+    end, {vtest.storage_first_bucket(g.replica_1_a),
+          g.replica_1_a:replicaset_uuid()})
 end
36 changes: 29 additions & 7 deletions vshard/storage/init.lua
@@ -328,6 +328,20 @@ local function bucket_generation_increment()
     M.bucket_generation_cond:broadcast()
 end
 
+local function bucket_transfer_start(bid)
+    if M.rebalancer_transfering_buckets[bid] then
+        return nil, lerror.vshard(lerror.code.WRONG_BUCKET, bid,
+                                  'transfer is already in progress', nil)
+    end
+    M.rebalancer_transfering_buckets[bid] = true
+    return true
+end
+
+local function bucket_transfer_end(bid)
+    assert(M.rebalancer_transfering_buckets[bid])
+    M.rebalancer_transfering_buckets[bid] = nil
+end
+
 --
 -- Handle a bad update of _bucket space.
 --
@@ -1763,9 +1777,13 @@ local function bucket_recv(bucket_id, from, data, opts)
     while opts and opts.is_last and M.errinj.ERRINJ_LAST_RECEIVE_DELAY do
         lfiber.sleep(0.01)
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
-    local status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
+    status, ret, err = pcall(bucket_recv_xc, bucket_id, from, data, opts)
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
@@ -2155,14 +2173,18 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
-    M.rebalancer_transfering_buckets[bucket_id] = true
+    local status, ret, err
+    status, err = bucket_transfer_start(bucket_id)
+    if not status then
+        return nil, err
+    end
     local exception_guard = {}
-    local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
-                                   exception_guard)
+    status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
+                             exception_guard)
     if exception_guard.drop_rw_lock then
         exception_guard.ref.rw_lock = false
     end
-    M.rebalancer_transfering_buckets[bucket_id] = nil
+    bucket_transfer_end(bucket_id)
     if status then
         if ret then
             return ret
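
For illustration only, not part of this PR's diff: a minimal, self-contained Lua sketch of the guard pattern the patch introduces. The names below (flags, transfer_start, transfer_end, transfer, work) are hypothetical stand-ins for M.rebalancer_transfering_buckets, bucket_transfer_start(), bucket_transfer_end() and the real send/recv bodies.

-- Hypothetical sketch; not vshard code. `flags` plays the role of
-- M.rebalancer_transfering_buckets.
local flags = {}

local function transfer_start(bid)
    if flags[bid] then
        return nil, 'transfer is already in progress'
    end
    flags[bid] = true
    return true
end

local function transfer_end(bid)
    assert(flags[bid])
    flags[bid] = nil
end

-- Guarded transfer, mirroring the flow bucket_send()/bucket_recv() follow
-- after the fix: only the call that successfully took the flag may clear it.
local function transfer(bid, work)
    local ok, err = transfer_start(bid)
    if not ok then
        -- A duplicate call bails out here without touching the flag, so the
        -- first, still-running transfer keeps its protection.
        return nil, err
    end
    local status, ret = pcall(work, bid)
    transfer_end(bid)
    if not status then
        return nil, ret
    end
    return ret
end

-- While the first transfer holds the flag, a duplicate is rejected and the
-- flag survives; before the fix the duplicate cleared it on its error path.
assert(transfer_start(1))
local ok, err = transfer(1, function() return true end)
assert(ok == nil and err == 'transfer is already in progress')
assert(flags[1])
transfer_end(1)
assert(transfer(1, function() return true end))

The key property, also asserted by the new tests above, is that the losing call never modifies the flag, so the winning transfer stays protected for its whole duration.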