21 changes: 17 additions & 4 deletions test/rebalancer/rebalancer.result
@@ -51,6 +51,15 @@ test_run:switch('box_1_a')
---
- true
...
-- The ratelimiter object keeps its state for the whole lifetime of the
-- rebalancer service. So after the rebalancer is disabled, the ratelimiter
-- keeps hiding errors whose errcodes were seen before the disabling. Because
-- of that we can't test the 6th scenario: the "Replica _ has receiving
-- buckets" error would be hidden by the "Rebalancer is not active ..." one.
-- To avoid this, turn the ratelimiter off.
vshard.consts.LOG_RATELIMIT_INTERVAL = 0
---
...
_bucket = box.space._bucket
---
...
@@ -149,7 +158,7 @@ test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
---
...
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
@@ -239,7 +248,7 @@ cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
wait_rebalancer_state('The cluster is balanced ok', test_run)
@@ -318,12 +327,16 @@ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
---
- [150, 'receiving']
...
wait_rebalancer_state("Some buckets are not active", test_run)
-- We should not check for a specific bucket status (e.g. receiving), because
-- in rare cases we can accidentally get the wrong one: e.g. we wait for the
-- "receiving" status but get "garbage" due to a previous rebalancer error.
err_msg = string.format('Replica .* has receiving buckets during rebalancing')
---
...
wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
err_msg, test_run) \
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
---
- [150, 'active']
...
vshard.storage.sync()
---
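
The comment introduced above relies on the errcode-keyed log ratelimiter that keeps its state across rebalancer restarts. Below is a minimal sketch of how such a ratelimiter could behave; the structure and names are assumptions for illustration, not vshard's actual implementation, but they show why an interval of 0 effectively turns the suppression off.

-- A hypothetical errcode-keyed ratelimiter, for illustration only: it
-- remembers when each errcode was last reported and hides repeats that arrive
-- within `interval` seconds. With interval = 0 the age check never triggers,
-- so no error is ever hidden - which is what the test needs after setting
-- vshard.consts.LOG_RATELIMIT_INTERVAL = 0.
local clock = require('clock')
local log = require('log')

local function new_ratelimiter(interval)
    return {
        interval = interval,
        -- errcode -> timestamp of the last emitted message.
        last_log = {},
        log_error = function(self, errcode, msg)
            local now = clock.monotonic()
            local prev = self.last_log[errcode]
            if prev ~= nil and now - prev < self.interval then
                -- The same errcode was reported recently - hide the message.
                return false
            end
            self.last_log[errcode] = now
            log.error(msg)
            return true
        end,
    }
end

local rl = new_ratelimiter(0)
rl:log_error(1, 'Rebalancer is not active')
-- With interval = 0 this second message is not suppressed either.
rl:log_error(2, 'Replica xxx has receiving buckets during rebalancing')
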
18 changes: 15 additions & 3 deletions test/rebalancer/rebalancer.test.lua
@@ -32,6 +32,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'%s\'
--

test_run:switch('box_1_a')
-- The ratelimiter object keeps its state for the whole lifetime of the
-- rebalancer service. So after the rebalancer is disabled, the ratelimiter
-- keeps hiding errors whose errcodes were seen before the disabling. Because
-- of that we can't test the 6th scenario: the "Replica _ has receiving
-- buckets" error would be hidden by the "Rebalancer is not active ..." one.
-- To avoid this, turn the ratelimiter off.
vshard.consts.LOG_RATELIMIT_INTERVAL = 0
_bucket = box.space._bucket
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)

@@ -78,7 +85,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true)

test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
wait_rebalancer_state("Rebalance routes are sent", test_run)
wait_rebalancer_state("The following rebalancer routes were sent", test_run)

wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
@@ -118,7 +125,7 @@ _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
-- Return 1%.
cfg.rebalancer_disbalance_threshold = 0.01
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
wait_rebalancer_state('The cluster is balanced ok', test_run)
_bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
_bucket.index.status:min({vshard.consts.BUCKET.ACTIVE})
@@ -156,7 +163,12 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, false)
test_run:switch('box_1_a')
vshard.storage.rebalancer_enable()
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
wait_rebalancer_state("Some buckets are not active", test_run)
-- We should not check for a specific bucket status (e.g. receiving), because
-- in rare cases we can accidentally get the wrong one: e.g. we wait for the
-- "receiving" status but get "garbage" due to a previous rebalancer error.
err_msg = string.format('Replica .* has receiving buckets during rebalancing')
wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
err_msg, test_run) \
_bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
vshard.storage.sync()

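
The wait above combines the generic 'Error during downloading rebalancer states:' prefix with a Lua pattern for the per-replica error, so any bucket status that produces a matching error satisfies the check. For illustration, here is a rough sketch of a log-polling helper in the spirit of wait_rebalancer_state; it is an assumption about how such a wait could be implemented, not the test harness's real code.

-- Hypothetical helper, for illustration only: poll a log file until a line
-- matches the given Lua pattern or the timeout expires.
local fiber = require('fiber')

local function wait_for_log(log_path, pattern, timeout)
    local deadline = fiber.clock() + (timeout or 10)
    while fiber.clock() < deadline do
        local f = io.open(log_path, 'r')
        if f ~= nil then
            for line in f:lines() do
                if line:match(pattern) then
                    f:close()
                    return true
                end
            end
            f:close()
        end
        fiber.sleep(0.1)
    end
    return false
end

-- The pattern assembled in the test above would then match lines such as:
--   Error during downloading rebalancer states: ... Replica <uuid> has
--   receiving buckets during rebalancing
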
4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.result
@@ -175,7 +175,7 @@ add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Now, add a second replicaset.
@@ -422,7 +422,7 @@ remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
---
...
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
---
...
-- Rebalancing has been started - now remove the second replicaset.
4 changes: 2 additions & 2 deletions test/rebalancer/stress_add_remove_several_rs.test.lua
@@ -71,7 +71,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
add_replicaset()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)

-- Now, add a second replicaset.

@@ -153,7 +153,7 @@ fiber.sleep(0.5)
test_run:switch('box_1_a')
remove_second_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
wait_rebalancer_state('Rebalance routes are sent', test_run)
wait_rebalancer_state('The following rebalancer routes were sent', test_run)
-- Rebalancing has been started - now remove the second replicaset.
remove_replicaset_first_stage()
vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
153 changes: 153 additions & 0 deletions test/storage-luatest/storage_1_1_1_test.lua
@@ -51,6 +51,18 @@ test_group.before_all(function(g)
vtest.cluster_new(g, global_cfg)
vtest.cluster_bootstrap(g, global_cfg)
vtest.cluster_rebalancer_disable(g)
vtest.cluster_wait_vclock_all(g)

vtest.cluster_exec_each_master(g, function()
box.schema.create_space('test_space')
box.space.test_space:format({
{name = 'pk', type = 'unsigned'},
{name = 'bucket_id', type = 'unsigned'},
})
box.space.test_space:create_index('primary', {parts = {'pk'}})
box.space.test_space:create_index(
'bucket_id', {parts = {'bucket_id'}, unique = false})
end)
end)

test_group.after_all(function(g)
@@ -101,3 +113,144 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
ilt.assert_equals(box.space._bucket:get(bid), nil)
end, {bid})
end

local function start_partial_bucket_move(src_storage, dest_storage, bucket_id)
src_storage:exec(function(bucket_id, replicaset_id)
local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
t.assert_not(res)
t.assert(err)
-- The bucket on src_storage must be in "sending" state. The
-- recovery service on src_storage should not erase this bucket.
t.assert_equals(box.space._bucket:get(bucket_id).status, 'sending')
end, {bucket_id, dest_storage:replicaset_uuid()})

dest_storage:exec(function(bucket_id)
t.helpers.retrying({timeout = 10}, function()
-- The recovery service on dest_storage should clear this bucket.
t.assert_equals(box.space._bucket:select(bucket_id), {})
end)
end, {bucket_id})
end

local function wait_for_bucket_is_transferred(src_storage, dest_storage,
bucket_id)
src_storage:exec(function(bucket_id)
t.helpers.retrying({timeout = 10}, function()
t.assert_equals(box.space._bucket:select(bucket_id), {})
end)
end, {bucket_id})
dest_storage:exec(function(bucket_id)
t.helpers.retrying({timeout = 10}, function()
t.assert_equals(box.space._bucket:get(bucket_id).status, 'active')
end)
end, {bucket_id})
end

local function move_bucket(src_storage, dest_storage, bucket_id)
src_storage:exec(function(bucket_id, replicaset_id)
t.helpers.retrying({timeout = 60}, function()
local res, err = ivshard.storage.bucket_send(bucket_id,
replicaset_id)
t.assert_not(err)
t.assert(res)
end)
end, {bucket_id, dest_storage:replicaset_uuid()})
wait_for_bucket_is_transferred(src_storage, dest_storage, bucket_id)
end

--
-- Reduce spam of "Finish bucket recovery step" logs and add logging of
-- recovered buckets in the recovery service (gh-212).
--
test_group.test_no_logs_while_unsuccess_recovery = function(g)
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
rawset(_G, 'old_call', ivshard.storage._call)
ivshard.storage._call = function(service_name, ...)
if service_name == 'recovery_bucket_stat' then
return error('TimedOut')
end
return _G.old_call(service_name, ...)
end
end)
local hung_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hung_bucket_id_1)
local hung_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hung_bucket_id_2)
t.helpers.retrying({}, function()
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
t.assert(g.replica_1_a:grep_log('Error during recovery of bucket'))
end)
t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
g.replica_2_a:exec(function()
ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
ivshard.storage._call = _G.old_call
end)
t.helpers.retrying({timeout = 60}, function()
g.replica_2_a:exec(function()
ivshard.storage.garbage_collector_wakeup()
ivshard.storage.recovery_wakeup()
end)
g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
-- In some rare cases the recovery service recovers the buckets one by
-- one. As a result we get multiple "Finish bucket recovery" and
-- "Recovered buckets" log lines with different bucket ids and bucket
-- counts. That is why we grep for the generic messages, without the
-- bucket counts and bucket ids, to avoid flakiness.
t.assert(g.replica_1_a:grep_log('Finish bucket recovery step'))
t.assert(g.replica_1_a:grep_log('Recovered buckets'))
end)
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hung_bucket_id_1)
wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
hung_bucket_id_2)
end

--
-- Add logging of routes in the rebalancer service (gh-212).
--
test_group.test_rebalancer_routes_logging = function(g)
local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a)
move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket_from_2)
local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a)
move_bucket(g.replica_3_a, g.replica_1_a, moved_bucket_from_3)
vtest.cluster_rebalancer_enable(g)
t.helpers.retrying({timeout = 60}, function()
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' ..
'workers'))
end)
local rebalancer_routes_msg = string.format(
'The following rebalancer routes were sent: {\"%s\":',
g.replica_1_a:replicaset_uuid())
local rebalancer_transfer_msg = string.format(
'1 buckets were successfully sent to %s',
g.replica_2_a:replicaset_uuid())
t.helpers.retrying({}, function()
t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg))
t.assert(g.replica_1_a:grep_log(rebalancer_transfer_msg))
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
t.assert(g.replica_1_a:grep_log('The cluster is balanced ok.'))
end)
vtest.cluster_rebalancer_disable(g)
end

--
-- Add replicaset.id to rebalancer_request_state errors (gh-212).
--
test_group.test_no_log_spam_when_buckets_no_active = function(g)
vtest.cluster_rebalancer_enable(g)
local moved_bucket = vtest.storage_first_bucket(g.replica_1_a)
move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket)
vtest.storage_stop(g.replica_2_a)
local err_log = string.format('Error during downloading rebalancer ' ..
'states:.*"replicaset_id":"%s"',
g.replica_2_a:replicaset_uuid())
t.helpers.retrying({timeout = 60}, function()
g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
t.assert(g.replica_1_a:grep_log(err_log))
end)
vtest.storage_start(g.replica_2_a, global_cfg)
move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket)
vtest.cluster_rebalancer_disable(g)
end
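
test_rebalancer_routes_logging above greps the log only for the 'The following rebalancer routes were sent: {"<uuid>":' prefix rather than for a full routes dump. As a rough illustration of where such a line could come from (the routes table layout and the helper below are assumptions, not vshard internals), JSON-encoding a routes table keyed by replicaset UUID produces exactly that shape.

-- Hypothetical sketch of how the grepped line could be produced. The routes
-- table layout is an assumption; the test only relies on the
-- 'The following rebalancer routes were sent: {"<uuid>":' prefix.
local json = require('json')
local log = require('log')

local function log_sent_routes(routes)
    log.info('The following rebalancer routes were sent: %s',
             json.encode(routes))
end

-- Placeholder UUIDs, for illustration only.
log_sent_routes({
    ['aaaaaaaa-0000-0000-0000-000000000001'] = {
        ['aaaaaaaa-0000-0000-0000-000000000002'] = 1,
    },
})
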
39 changes: 34 additions & 5 deletions test/unit/rebalancer.result
@@ -25,6 +25,9 @@ rlist = vshard.storage.internal.rlist
consts = vshard.consts
---
...
util = require('util')
---
...
--
-- Test adding two new replicasets.
--
@@ -290,9 +293,16 @@ build_routes(replicasets)
vshard.storage.internal.is_master = true
---
...
get_state = vshard.storage._rebalancer_request_state
-- We need to initialize a minimal replica object in order to have a
-- meaningful replica id in rebalancer_request_state's error.
vshard.storage.internal.this_replica = {id = util.name_to_uuid.box_1_a}
---
...
get_state = function() \
local res, err = vshard.storage._rebalancer_request_state() \
if res == nil then err.trace = nil end \
return res, err \
end \
_bucket = box.schema.create_space('_bucket')
---
...
@@ -314,21 +324,40 @@ _bucket:replace{3, consts.BUCKET.SENT}
---
- [3, 'sent']
...
get_state()
res, err = get_state()
---
- bucket_active_count: 2
bucket_pinned_count: 0
...
assert(res.bucket_active_count == 2)
---
- true
...
assert(res.bucket_pinned_count == 0)
---
- true
...
assert(not err)
---
- true
...
_bucket:replace{1, consts.BUCKET.RECEIVING}
---
- [1, 'receiving']
...
get_state()
res, err = get_state()
---
...
assert(not res)
---
- true
...
assert(err.message == string.format('Replica %s has receiving buckets ' .. \
'during rebalancing', util.name_to_uuid.box_1_a)) \
vshard.storage.internal.is_master = false
---
...
vshard.storage.internal.this_replica = nil
---
...
assert(not vshard.storage.internal.this_replicaset)
---
- true