diff --git a/test/rebalancer/rebalancer.result b/test/rebalancer/rebalancer.result
index 8a6ebf9f..6309ac33 100644
--- a/test/rebalancer/rebalancer.result
+++ b/test/rebalancer/rebalancer.result
@@ -51,6 +51,15 @@ test_run:switch('box_1_a')
 ---
 - true
 ...
+-- The ratelimiter object keeps its state for the entire lifetime of the
+-- rebalancer service. As a result, after the rebalancer is disabled, the
+-- ratelimiter keeps suppressing errors whose errcodes were seen before the
+-- rebalancer was disabled. Because of this we can't test the 6th scenario
+-- ("Replica _ has receiving buckets" would be hidden by the "Rebalancer is
+-- not active ..." error). To avoid that, turn the ratelimiter off.
+vshard.consts.LOG_RATELIMIT_INTERVAL = 0
+---
+...
 _bucket = box.space._bucket
 ---
 ...
@@ -149,7 +158,7 @@ test_run:switch('box_1_a')
 vshard.storage.rebalancer_enable()
 ---
 ...
-wait_rebalancer_state("Rebalance routes are sent", test_run)
+wait_rebalancer_state("The following rebalancer routes were sent", test_run)
 ---
 ...
 wait_rebalancer_state('The cluster is balanced ok', test_run)
 ---
 ...
@@ -239,7 +248,7 @@ cfg.rebalancer_disbalance_threshold = 0.01
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
 ---
 ...
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 ---
 ...
 wait_rebalancer_state('The cluster is balanced ok', test_run)
 ---
 ...
@@ -318,12 +327,16 @@ _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
 ---
 - [150, 'receiving']
 ...
-wait_rebalancer_state("Some buckets are not active", test_run)
+-- We should not check for a specific bucket status (e.g. receiving), because
+-- in rare cases we may get a different one. For example, we wait for the
+-- "receiving" status but get "garbage" due to a previous rebalancer error.
+err_msg = string.format('Replica .* has receiving buckets during rebalancing')
 ---
 ...
+wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
+                      err_msg, test_run) \
 _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
 ---
-- [150, 'active']
 ...
 vshard.storage.sync()
 ---
diff --git a/test/rebalancer/rebalancer.test.lua b/test/rebalancer/rebalancer.test.lua
index ec7ebcf2..0fc893e8 100644
--- a/test/rebalancer/rebalancer.test.lua
+++ b/test/rebalancer/rebalancer.test.lua
@@ -32,6 +32,13 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'%s\'
 --
 test_run:switch('box_1_a')
+-- The ratelimiter object keeps its state for the entire lifetime of the
+-- rebalancer service. As a result, after the rebalancer is disabled, the
+-- ratelimiter keeps suppressing errors whose errcodes were seen before the
+-- rebalancer was disabled. Because of this we can't test the 6th scenario
+-- ("Replica _ has receiving buckets" would be hidden by the "Rebalancer is
+-- not active ..." error). To avoid that, turn the ratelimiter off.
+vshard.consts.LOG_RATELIMIT_INTERVAL = 0
 _bucket = box.space._bucket
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
@@ -78,7 +85,7 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, true)
 test_run:switch('box_1_a')
 vshard.storage.rebalancer_enable()
-wait_rebalancer_state("Rebalance routes are sent", test_run)
+wait_rebalancer_state("The following rebalancer routes were sent", test_run)
 wait_rebalancer_state('The cluster is balanced ok', test_run)
 _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
@@ -118,7 +125,7 @@ _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
 -- Return 1%.
 cfg.rebalancer_disbalance_threshold = 0.01
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 wait_rebalancer_state('The cluster is balanced ok', test_run)
 _bucket.index.status:count({vshard.consts.BUCKET.ACTIVE})
 _bucket.index.status:min({vshard.consts.BUCKET.ACTIVE})
@@ -156,7 +163,12 @@ util.map_bucket_protection(test_run, {REPLICASET_1}, false)
 test_run:switch('box_1_a')
 vshard.storage.rebalancer_enable()
 _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.RECEIVING}})
-wait_rebalancer_state("Some buckets are not active", test_run)
+-- We should not check for a specific bucket status (e.g. receiving), because
+-- in rare cases we may get a different one. For example, we wait for the
+-- "receiving" status but get "garbage" due to a previous rebalancer error.
+err_msg = string.format('Replica .* has receiving buckets during rebalancing')
+wait_rebalancer_state('Error during downloading rebalancer states:.*' .. \
+                      err_msg, test_run) \
 _bucket:update({150}, {{'=', 2, vshard.consts.BUCKET.ACTIVE}})
 vshard.storage.sync()
diff --git a/test/rebalancer/stress_add_remove_several_rs.result b/test/rebalancer/stress_add_remove_several_rs.result
index 6a9b0ffb..194c99c4 100644
--- a/test/rebalancer/stress_add_remove_several_rs.result
+++ b/test/rebalancer/stress_add_remove_several_rs.result
@@ -175,7 +175,7 @@ add_replicaset()
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
 ---
 ...
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 ---
 ...
 -- Now, add a second replicaset.
@@ -422,7 +422,7 @@ remove_second_replicaset_first_stage()
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
 ---
 ...
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 ---
 ...
 -- Rebalancing has been started - now remove second replicaset.
diff --git a/test/rebalancer/stress_add_remove_several_rs.test.lua b/test/rebalancer/stress_add_remove_several_rs.test.lua
index f62400f2..7c1ae3bc 100644
--- a/test/rebalancer/stress_add_remove_several_rs.test.lua
+++ b/test/rebalancer/stress_add_remove_several_rs.test.lua
@@ -71,7 +71,7 @@ fiber.sleep(0.5)
 test_run:switch('box_1_a')
 add_replicaset()
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 -- Now, add a second replicaset.
@@ -153,7 +153,7 @@ fiber.sleep(0.5)
 test_run:switch('box_1_a')
 remove_second_replicaset_first_stage()
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
-wait_rebalancer_state('Rebalance routes are sent', test_run)
+wait_rebalancer_state('The following rebalancer routes were sent', test_run)
 -- Rebalancing has been started - now remove second replicaset.
 remove_replicaset_first_stage()
 vshard.storage.cfg(cfg, util.name_to_uuid.box_1_a)
diff --git a/test/storage-luatest/storage_1_1_1_test.lua b/test/storage-luatest/storage_1_1_1_test.lua
index 694baa79..ff25070c 100644
--- a/test/storage-luatest/storage_1_1_1_test.lua
+++ b/test/storage-luatest/storage_1_1_1_test.lua
@@ -51,6 +51,18 @@ test_group.before_all(function(g)
     vtest.cluster_new(g, global_cfg)
     vtest.cluster_bootstrap(g, global_cfg)
     vtest.cluster_rebalancer_disable(g)
+    vtest.cluster_wait_vclock_all(g)
+
+    vtest.cluster_exec_each_master(g, function()
+        box.schema.create_space('test_space')
+        box.space.test_space:format({
+            {name = 'pk', type = 'unsigned'},
+            {name = 'bucket_id', type = 'unsigned'},
+        })
+        box.space.test_space:create_index('primary', {parts = {'pk'}})
+        box.space.test_space:create_index(
+            'bucket_id', {parts = {'bucket_id'}, unique = false})
+    end)
 end)
 test_group.after_all(function(g)
@@ -101,3 +113,144 @@ test_group.test_manual_bucket_send_doubled_buckets = function(g)
         ilt.assert_equals(box.space._bucket:get(bid), nil)
     end, {bid})
 end
+
+local function start_partial_bucket_move(src_storage, dest_storage, bucket_id)
+    src_storage:exec(function(bucket_id, replicaset_id)
+        local res, err = ivshard.storage.bucket_send(bucket_id, replicaset_id)
+        t.assert_not(res)
+        t.assert(err)
+        -- The bucket on src_storage must be in "sending" state. The
+        -- recovery service on src_storage should not erase this bucket.
+        t.assert_equals(box.space._bucket:get(bucket_id).status, 'sending')
+    end, {bucket_id, dest_storage:replicaset_uuid()})
+
+    dest_storage:exec(function(bucket_id)
+        t.helpers.retrying({timeout = 10}, function()
+            -- The recovery service on dest_storage should clear this bucket.
+            t.assert_equals(box.space._bucket:select(bucket_id), {})
+        end)
+    end, {bucket_id})
+end
+
+local function wait_for_bucket_is_transferred(src_storage, dest_storage,
+                                              bucket_id)
+    src_storage:exec(function(bucket_id)
+        t.helpers.retrying({timeout = 10}, function()
+            t.assert_equals(box.space._bucket:select(bucket_id), {})
+        end)
+    end, {bucket_id})
+    dest_storage:exec(function(bucket_id)
+        t.helpers.retrying({timeout = 10}, function()
+            t.assert_equals(box.space._bucket:get(bucket_id).status, 'active')
+        end)
+    end, {bucket_id})
+end
+
+local function move_bucket(src_storage, dest_storage, bucket_id)
+    src_storage:exec(function(bucket_id, replicaset_id)
+        t.helpers.retrying({timeout = 60}, function()
+            local res, err = ivshard.storage.bucket_send(bucket_id,
+                                                         replicaset_id)
+            t.assert_not(err)
+            t.assert(res)
+        end)
+    end, {bucket_id, dest_storage:replicaset_uuid()})
+    wait_for_bucket_is_transferred(src_storage, dest_storage, bucket_id)
+end
+
+--
+-- Reduce spam of "Finish bucket recovery step" logs and add logging of
+-- recovered buckets in recovery service (gh-212).
+--
+test_group.test_no_logs_while_unsuccess_recovery = function(g)
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = true
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'recovery_bucket_stat' then
+                return error('TimedOut')
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+    local hung_bucket_id_1 = vtest.storage_first_bucket(g.replica_1_a)
+    start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hung_bucket_id_1)
+    local hung_bucket_id_2 = vtest.storage_first_bucket(g.replica_1_a)
+    start_partial_bucket_move(g.replica_1_a, g.replica_2_a, hung_bucket_id_2)
+    t.helpers.retrying({}, function()
+        g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
+        t.assert(g.replica_1_a:grep_log('Error during recovery of bucket'))
+    end)
+    t.assert_not(g.replica_1_a:grep_log('Finish bucket recovery step, 0'))
+    g.replica_2_a:exec(function()
+        ivshard.storage.internal.errinj.ERRINJ_RECEIVE_PARTIALLY = false
+        ivshard.storage._call = _G.old_call
+    end)
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_2_a:exec(function()
+            ivshard.storage.garbage_collector_wakeup()
+            ivshard.storage.recovery_wakeup()
+        end)
+        g.replica_1_a:exec(function() ivshard.storage.recovery_wakeup() end)
+        -- In rare cases the recovery service recovers the buckets one by one.
+        -- As a result we get multiple "Finish bucket recovery" and "Recovered
+        -- buckets" logs with different bucket ids and bucket counts. That is
+        -- why we grep for the generic messages without bucket counts and ids
+        -- to avoid flakiness.
+        t.assert(g.replica_1_a:grep_log('Finish bucket recovery step'))
+        t.assert(g.replica_1_a:grep_log('Recovered buckets'))
+    end)
+    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
+                                   hung_bucket_id_1)
+    wait_for_bucket_is_transferred(g.replica_2_a, g.replica_1_a,
+                                   hung_bucket_id_2)
+end
+
+--
+-- Add logging of routes in rebalancer service (gh-212).
+--
+test_group.test_rebalancer_routes_logging = function(g)
+    local moved_bucket_from_2 = vtest.storage_first_bucket(g.replica_2_a)
+    move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket_from_2)
+    local moved_bucket_from_3 = vtest.storage_first_bucket(g.replica_3_a)
+    move_bucket(g.replica_3_a, g.replica_1_a, moved_bucket_from_3)
+    vtest.cluster_rebalancer_enable(g)
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
+        t.assert(g.replica_1_a:grep_log('Apply rebalancer routes with 1 ' ..
+                                        'workers'))
+    end)
+    local rebalancer_routes_msg = string.format(
+        'The following rebalancer routes were sent: {\"%s\":',
+        g.replica_1_a:replicaset_uuid())
+    local rebalancer_transfer_msg = string.format(
+        '1 buckets were successfully sent to %s',
+        g.replica_2_a:replicaset_uuid())
+    t.helpers.retrying({}, function()
+        t.assert(g.replica_1_a:grep_log(rebalancer_routes_msg))
+        t.assert(g.replica_1_a:grep_log(rebalancer_transfer_msg))
+        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
+        g.replica_1_a:grep_log('The cluster is balanced ok.')
+    end)
+    vtest.cluster_rebalancer_disable(g)
+end
+
+--
+-- Add replicaset.id to rebalancer_request_state errors (gh-212).
+--
+test_group.test_no_log_spam_when_buckets_no_active = function(g)
+    vtest.cluster_rebalancer_enable(g)
+    local moved_bucket = vtest.storage_first_bucket(g.replica_1_a)
+    move_bucket(g.replica_1_a, g.replica_2_a, moved_bucket)
+    vtest.storage_stop(g.replica_2_a)
+    local err_log = string.format('Error during downloading rebalancer ' ..
+                                  'states:.*"replicaset_id":"%s"',
+                                  g.replica_2_a:replicaset_uuid())
+    t.helpers.retrying({timeout = 60}, function()
+        g.replica_1_a:exec(function() ivshard.storage.rebalancer_wakeup() end)
+        t.assert(g.replica_1_a:grep_log(err_log))
+    end)
+    vtest.storage_start(g.replica_2_a, global_cfg)
+    move_bucket(g.replica_2_a, g.replica_1_a, moved_bucket)
+    vtest.cluster_rebalancer_disable(g)
+end
diff --git a/test/unit/rebalancer.result b/test/unit/rebalancer.result
index d312a411..80b8b648 100644
--- a/test/unit/rebalancer.result
+++ b/test/unit/rebalancer.result
@@ -25,6 +25,9 @@ rlist = vshard.storage.internal.rlist
 consts = vshard.consts
 ---
 ...
+util = require('util')
+---
+...
 --
 -- Test adding two new replicasets.
 --
@@ -290,9 +293,16 @@ build_routes(replicasets)
 vshard.storage.internal.is_master = true
 ---
 ...
-get_state = vshard.storage._rebalancer_request_state
+-- We need to initialize a minimal replica object in order to
+-- have a meaningful replica.id in rebalancer_request_state's error.
+vshard.storage.internal.this_replica = {id = util.name_to_uuid.box_1_a}
 ---
 ...
+get_state = function() \
+    local res, err = vshard.storage._rebalancer_request_state() \
+    if res == nil then err.trace = nil end \
+    return res, err \
+end \
 _bucket = box.schema.create_space('_bucket')
 ---
 ...
@@ -314,21 +324,40 @@ _bucket:replace{3, consts.BUCKET.SENT}
 ---
 - [3, 'sent']
 ...
-get_state()
+res, err = get_state()
 ---
-- bucket_active_count: 2
-  bucket_pinned_count: 0
+...
+assert(res.bucket_active_count == 2)
+---
+- true
+...
+assert(res.bucket_pinned_count == 0)
+---
+- true
+...
+assert(not err)
+---
+- true
 ...
 _bucket:replace{1, consts.BUCKET.RECEIVING}
 ---
 - [1, 'receiving']
 ...
-get_state()
+res, err = get_state()
+---
+...
+assert(not res)
 ---
+- true
 ...
+assert(err.message == string.format('Replica %s has receiving buckets ' .. \
+    'during rebalancing', util.name_to_uuid.box_1_a)) \
 vshard.storage.internal.is_master = false
 ---
 ...
+vshard.storage.internal.this_replica = nil
+---
+...
 assert(not vshard.storage.internal.this_replicaset)
 ---
 - true
diff --git a/test/unit/rebalancer.test.lua b/test/unit/rebalancer.test.lua
index e6d54b81..4481236d 100644
--- a/test/unit/rebalancer.test.lua
+++ b/test/unit/rebalancer.test.lua
@@ -7,6 +7,7 @@ calc_etalon = require('vshard.replicaset').calculate_etalon_balance
 dispenser = vshard.storage.internal.route_dispenser
 rlist = vshard.storage.internal.rlist
 consts = vshard.consts
+util = require('util')
 --
 -- Test adding two new replicasets.
 --
@@ -76,18 +77,32 @@ build_routes(replicasets)
 -- Test rebalancer local state.
 --
 vshard.storage.internal.is_master = true
-get_state = vshard.storage._rebalancer_request_state
+-- We need to initialize a minimal replica object in order to
+-- have a meaningful replica.id in rebalancer_request_state's error.
+vshard.storage.internal.this_replica = {id = util.name_to_uuid.box_1_a}
+get_state = function() \
+    local res, err = vshard.storage._rebalancer_request_state() \
+    if res == nil then err.trace = nil end \
+    return res, err \
+end \
 _bucket = box.schema.create_space('_bucket')
 pk = _bucket:create_index('pk')
 status = _bucket:create_index('status', {parts = {{2, 'string'}}, unique = false})
 _bucket:replace{1, consts.BUCKET.ACTIVE}
 _bucket:replace{2, consts.BUCKET.ACTIVE}
 _bucket:replace{3, consts.BUCKET.SENT}
-get_state()
+res, err = get_state()
+assert(res.bucket_active_count == 2)
+assert(res.bucket_pinned_count == 0)
+assert(not err)
 _bucket:replace{1, consts.BUCKET.RECEIVING}
-get_state()
+res, err = get_state()
+assert(not res)
+assert(err.message == string.format('Replica %s has receiving buckets ' .. \
+    'during rebalancing', util.name_to_uuid.box_1_a)) \
 vshard.storage.internal.is_master = false
+vshard.storage.internal.this_replica = nil
 assert(not vshard.storage.internal.this_replicaset)
 vshard.storage.internal.this_replicaset = { \
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 97642250..df4e96f4 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -5,6 +5,7 @@ local lmsgpack = require('msgpack')
 local netbox = require('net.box') -- for net.box:self()
 local trigger = require('internal.trigger')
 local ffi = require('ffi')
+local json_encode = require('json').encode
 local yaml_encode = require('yaml').encode
 local fiber_clock = lfiber.clock
 local fiber_yield = lfiber.yield
@@ -922,6 +923,12 @@ local function recovery_local_bucket_is_active(local_bucket, remote_bucket)
     return status == BSENT or status == BGARBAGE
 end
 
+local function recovery_save_recovered(dict, id, status)
+    local ids = dict[status] or {}
+    table.insert(ids, id)
+    dict[status] = ids
+end
+
 --
 -- Check status of each transferring bucket. Resolve status where
 -- possible.
@@ -932,6 +939,7 @@ local function recovery_step_by_type(type, limiter)
     local recovered = 0
     local total = 0
     local start_format = 'Starting %s buckets recovery step'
+    local recovered_buckets = {}
     for _, bucket in _bucket.index.status:pairs(type) do
         lfiber.testcancel()
         total = total + 1
@@ -992,12 +1000,15 @@ local function recovery_step_by_type(type, limiter)
         if recovery_local_bucket_is_sent(bucket, remote_bucket) then
             _bucket:update({bucket_id}, {{'=', 2, BSENT}})
             recovered = recovered + 1
+            recovery_save_recovered(recovered_buckets, bucket_id, BSENT)
         elseif recovery_local_bucket_is_garbage(bucket, remote_bucket) then
             _bucket:update({bucket_id}, {{'=', 2, BGARBAGE}})
             recovered = recovered + 1
+            recovery_save_recovered(recovered_buckets, bucket_id, BGARBAGE)
         elseif recovery_local_bucket_is_active(bucket, remote_bucket) then
             _bucket:replace({bucket_id, BACTIVE})
             recovered = recovered + 1
+            recovery_save_recovered(recovered_buckets, bucket_id, BACTIVE)
        elseif is_step_empty then
             log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
                      bucket_id, bucket.status, remote_bucket.status, peer_id)
@@ -1005,9 +1016,10 @@ local function recovery_step_by_type(type, limiter)
         is_step_empty = false
 ::continue::
     end
-    if not is_step_empty then
+    if recovered > 0 then
         log.info('Finish bucket recovery step, %d %s buckets are recovered '..
-                 'among %d', recovered, type, total)
+                 'among %d. Recovered buckets: %s', recovered, type, total,
+                 json_encode(recovered_buckets))
     end
     return total, recovered
 end
@@ -2800,6 +2812,7 @@ local function rebalancer_download_states()
             replicaset, 'vshard.storage.rebalancer_request_state', {},
             {timeout = consts.REBALANCER_GET_STATE_TIMEOUT})
         if state == nil then
+            err.replicaset_id = replicaset.id
             return nil, err
         end
         local bucket_count = state.bucket_active_count +
@@ -2846,11 +2859,9 @@ local function rebalancer_service_f(service, limiter)
         end
         if not status or replicasets == nil then
             local err = status and total_bucket_active_count or replicasets
-            if err then
-                limiter:log_error(err, service:set_status_error(
-                    'Error during downloading rebalancer states: %s', err))
-            end
-            log.info('Some buckets are not active, retry rebalancing later')
+            limiter:log_error(err, service:set_status_error(
+                'Error during downloading rebalancer states: %s, ' ..
+                'retry rebalancing later', err))
             service:set_activity('idling')
             lfiber.testcancel()
             lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
@@ -2903,8 +2914,9 @@ local function rebalancer_service_f(service, limiter)
                 goto continue
             end
         end
-        log.info('Rebalance routes are sent. Schedule next wakeup after '..
-                 '%f seconds', consts.REBALANCER_WORK_INTERVAL)
+        log.info('The following rebalancer routes were sent: %s. ' ..
+                 'Schedule next wakeup after %f seconds', json_encode(routes),
+                 consts.REBALANCER_WORK_INTERVAL)
         service:set_activity('idling')
         lfiber.testcancel()
         lfiber.sleep(consts.REBALANCER_WORK_INTERVAL)
@@ -2938,18 +2950,17 @@ local function rebalancer_request_state()
         return nil, err
     end
     if not M.is_rebalancer_active or rebalancing_is_in_progress() then
-        return
+        return nil, lerror.make('Rebalancer is not active or is in progress')
     end
     local _bucket = box.space._bucket
     local status_index = _bucket.index.status
-    if #status_index:select({BSENDING}, {limit = 1}) > 0 then
-        return
-    end
-    if #status_index:select({BRECEIVING}, {limit = 1}) > 0 then
-        return
-    end
-    if #status_index:select({BGARBAGE}, {limit = 1}) > 0 then
-        return
+    local repl_id = M.this_replica.id
+    for _, status in pairs({BSENDING, BRECEIVING, BGARBAGE}) do
+        if #status_index:select({status}, {limit = 1}) > 0 then
+            local err = string.format('Replica %s has %s buckets during ' ..
+                                      'rebalancing', repl_id, status)
+            return nil, lerror.make(err)
+        end
     end
     return {
         bucket_active_count = status_index:count({BACTIVE}),