diff --git a/roles/sharded-queue-router.lua b/roles/sharded-queue-router.lua new file mode 100644 index 0000000..8543eb5 --- /dev/null +++ b/roles/sharded-queue-router.lua @@ -0,0 +1,67 @@ +local config = require('sharded_queue.router.config') +local is_metrics_supported = require('sharded_queue.metrics').is_supported +local metrics = require('sharded_queue.router.metrics') +local roles = require('sharded_queue.roles') +local utils = require('sharded_queue.utils') +local queue = require('sharded_queue.router.queue') + +local role_name = "roles.sharded-queue-router" + +local function validate(conf) + conf = conf or {} + if not roles.is_sharding_role_enabled('router') then + error(role_name .. ": instance must be a sharding router to use the role") + end + + local ok, err = utils.validate_tubes(conf.tubes or {}, false) + if not ok then + error(role_name .. ": " .. err) + end + ok, err = utils.validate_cfg(conf.cfg or {}) + if not ok then + error(role_name .. ": " .. err) + end + return true +end + +local function apply(conf) + conf = conf or {} + + queue.export_globals() + + local conf_tubes = conf.tubes or {} + local conf_cfg = conf.cfg or {} + if conf_cfg.metrics ~= nil then + config.metrics = conf_cfg.metrics and true or false + else + config.metrics = is_metrics_supported() + end + + for tube_name, options in pairs(conf_tubes) do + if queue.map()[tube_name] == nil then + queue.add(tube_name, metrics, options) + end + end + + for tube_name, _ in pairs(queue.map()) do + if conf_tubes[tube_name] == nil then + queue.remove(tube_name) + end + end + + if config.metrics then + metrics.enable(queue) + else + metrics.disable() + end +end + +local function stop() + queue.clear_globals() +end + +return { + validate = validate, + apply = apply, + stop = stop, +} diff --git a/roles/sharded-queue-storage.lua b/roles/sharded-queue-storage.lua new file mode 100644 index 0000000..43c01b4 --- /dev/null +++ b/roles/sharded-queue-storage.lua @@ -0,0 +1,79 @@ +local config = require('sharded_queue.storage.config') +local is_metrics_supported = require('sharded_queue.metrics').is_supported +local methods = require('sharded_queue.storage.methods') +local metrics = require('sharded_queue.storage.metrics') +local roles = require('sharded_queue.roles') +local stats_storage = require('sharded_queue.stats.storage') +local tubes = require('sharded_queue.storage.tubes').new() +local utils = require('sharded_queue.utils') + +local role_name = "roles.sharded-queue-storage" +local watcher = nil + +local function validate(conf) + conf = conf or {} + + if not roles.is_sharding_role_enabled('storage') then + error(role_name .. ": instance must be a sharding storage to use the role") + end + + local ok, err = utils.validate_tubes(conf.tubes or {}, true) + if not ok then + error(role_name .. ": " .. err) + end + ok, err = utils.validate_cfg(conf.cfg or {}) + if not ok then + error(role_name .. ": " .. err) + end + return true +end + +local function apply(conf) + conf = conf or {} + + local conf_tubes = conf.tubes or {} + local conf_cfg = conf.cfg or {} + if conf_cfg.metrics ~= nil then + config.metrics = conf_cfg.metrics and true or false + else + config.metrics = is_metrics_supported() + end + + if watcher ~= nil then + watcher:unregister() + end + watcher = box.watch('box.status', function(_, status) + if status.is_ro == false then + stats_storage.init() + + local new = tubes:update(conf_tubes) + for _, tube in ipairs(new) do + stats_storage.reset(tube) + end + + if config.metrics then + metrics.enable(tubes:map()) + end + methods.init(metrics, tubes) + end + end) + + if config.metrics then + metrics.enable(tubes:map()) + else + metrics.disable() + end +end + +local function stop() + if watcher ~= nil then + watcher:unregister() + watcher = nil + end +end + +return { + validate = validate, + apply = apply, + stop = stop, +} diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec index 7b8dcfb..b2796c8 100755 --- a/sharded-queue-scm-1.rockspec +++ b/sharded-queue-scm-1.rockspec @@ -20,6 +20,8 @@ build = { build_target = 'all', install = { lua = { + ["roles.sharded-queue-router"] = "roles/sharded-queue-router.lua", + ["roles.sharded-queue-storage"] = "roles/sharded-queue-storage.lua", ['sharded_queue.api'] = 'sharded_queue/api.lua', ['sharded_queue.storage'] = 'sharded_queue/storage.lua', ['sharded_queue.drivers.fifo'] = 'sharded_queue/drivers/fifo.lua', @@ -27,6 +29,7 @@ build = { ['sharded_queue.time'] = 'sharded_queue/time.lua', ['sharded_queue.utils'] = 'sharded_queue/utils.lua', ['sharded_queue.metrics'] = 'sharded_queue/metrics.lua', + ['sharded_queue.roles'] = 'sharded_queue/roles.lua', ['sharded_queue.stash'] = 'sharded_queue/stash.lua', ['sharded_queue.state'] = 'sharded_queue/state.lua', ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua', @@ -39,6 +42,7 @@ build = { ['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua', ['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua', ['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua', + ['sharded_queue.storage.vshard_utils'] = 'sharded_queue/storage/vshard_utils.lua', ['sharded_queue.version'] = 'sharded_queue/version.lua', }, }, diff --git a/sharded_queue/drivers/fifo.lua b/sharded_queue/drivers/fifo.lua index edb2ff7..16bf653 100644 --- a/sharded_queue/drivers/fifo.lua +++ b/sharded_queue/drivers/fifo.lua @@ -2,6 +2,7 @@ local state = require('sharded_queue.state') local utils = require('sharded_queue.utils') local log = require('log') -- luacheck: ignore local stats = require('sharded_queue.stats.storage') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local function update_stat(tube_name, name) stats.update(tube_name, name, '+', 1) @@ -10,6 +11,7 @@ end local method = {} local function tube_create(args) + local user = vshard_utils.get_this_replica_user() or 'guest' local space_options = {} local if_not_exists = args.options.if_not_exists or true space_options.if_not_exists = if_not_exists @@ -47,6 +49,9 @@ local function tube_create(args) if_not_exists = if_not_exists }) + box.schema.user.grant(user, 'read,write', 'space', args.name, + {if_not_exists = true}) + return space end diff --git a/sharded_queue/drivers/fifottl.lua b/sharded_queue/drivers/fifottl.lua index e6afca2..c7be755 100644 --- a/sharded_queue/drivers/fifottl.lua +++ b/sharded_queue/drivers/fifottl.lua @@ -3,6 +3,7 @@ local state = require('sharded_queue.state') local utils = require('sharded_queue.utils') local stats = require('sharded_queue.stats.storage') local time = require('sharded_queue.time') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local log = require('log') -- luacheck: ignore local index = { @@ -136,6 +137,7 @@ end -- QUEUE METHODs -- local function tube_create(args) + local user = vshard_utils.get_this_replica_user() or 'guest' local space_options = {} local if_not_exists = args.options.if_not_exists or true space_options.if_not_exists = if_not_exists @@ -201,6 +203,9 @@ local function tube_create(args) if_not_exists = if_not_exists }) + box.schema.user.grant(user, 'read,write', 'space', args.name, + {if_not_exists = true}) + -- run fiber for tracking event fiber.create(fiber_common, args.name) end diff --git a/sharded_queue/roles.lua b/sharded_queue/roles.lua new file mode 100644 index 0000000..72fcc1c --- /dev/null +++ b/sharded_queue/roles.lua @@ -0,0 +1,15 @@ +local config = require('config') + +local function is_sharding_role_enabled(expected) + local sharding_roles = config:get('sharding.roles') + for _, role in ipairs(sharding_roles or {}) do + if role == expected then + return true + end + end + return false +end + +return { + is_sharding_role_enabled = is_sharding_role_enabled, +} diff --git a/sharded_queue/router/queue.lua b/sharded_queue/router/queue.lua index c92d86f..accccb7 100644 --- a/sharded_queue/router/queue.lua +++ b/sharded_queue/router/queue.lua @@ -46,35 +46,7 @@ end -- The Tarantool 3.0 does not support to update dinamically a configuration, so -- a user must update the configuration by itself. if is_cartridge_package then - local function validate_options(options) - if not options then return true end - - if options.wait_factor then - if type(options.wait_factor) ~= 'number' - or options.wait_factor < 1 - then - return false, "wait_factor must be number greater than or equal to 1" - end - end - - local _, err = utils.normalize.log_request(options.log_request) - if err then - return false, err - end - - if options.wait_max ~= nil then - local err - options.wait_max, err = utils.normalize.wait_max(options.wait_max) - if err ~= nil then - return false, err - end - end - - return true - end - queue_global.create_tube = function(tube_name, options) - require('log').info("CREATE TUBE") local tubes = cartridge.config_get_deepcopy('tubes') or {} if tube_name == 'cfg' then @@ -85,7 +57,7 @@ if is_cartridge_package then return nil end - local ok , err = validate_options(options) + local ok, err = utils.validate_options(options) if not ok then error(err) end options = table.deepcopy(options or {}) @@ -107,6 +79,10 @@ local function export_globals() rawset(_G, 'queue', queue_global) end +local function clear_globals() + rawset(_G, 'queue', nil) +end + local function add(name, metrics, options) queue_global.tube[name] = tube.new(name, metrics, options) end diff --git a/sharded_queue/stats/storage.lua b/sharded_queue/stats/storage.lua index d2184e8..320e972 100644 --- a/sharded_queue/stats/storage.lua +++ b/sharded_queue/stats/storage.lua @@ -1,6 +1,7 @@ ---- Module used to store storage-specific statistics. local state = require('sharded_queue.state') +local vshard_utils = require('sharded_queue.storage.vshard_utils') local statistics = {} @@ -17,6 +18,7 @@ local actions = { } function statistics.init() + local user = vshard_utils.get_this_replica_user() or 'guest' local space_stat = box.schema.space.create('_queue_statistics', { if_not_exists = true }) space_stat:format({ @@ -39,6 +41,9 @@ function statistics.init() }, if_not_exists = true }) + + box.schema.user.grant(user, 'read,write', 'space', '_queue_statistics', + {if_not_exists = true}) end function statistics.update(tube_name, stat_name, operation, value) diff --git a/sharded_queue/storage/methods.lua b/sharded_queue/storage/methods.lua index 63e8283..6791888 100644 --- a/sharded_queue/storage/methods.lua +++ b/sharded_queue/storage/methods.lua @@ -1,4 +1,6 @@ local fiber = require('fiber') +local vshard_utils = require('sharded_queue.storage.vshard_utils') + local stats_storage = require('sharded_queue.stats.storage') local methods = { @@ -15,10 +17,15 @@ local methods = { } local function init(metrics, tubes) + local user = vshard_utils.get_this_replica_user() or 'guest' + for _, method in pairs(methods) do local func = function(args) args = args or {} args.options = tubes:get_options(args.tube_name) or {} + if args.options.priority == nil and args.options.pri ~= nil then + args.options.priority = args.options.pri + end local tube_name = args.tube_name local before = fiber.clock() @@ -37,6 +44,8 @@ local function init(metrics, tubes) local global_name = 'tube_' .. method rawset(_G, global_name, func) box.schema.func.create(global_name, { if_not_exists = true }) + box.schema.user.grant(user, 'execute', 'function', global_name, + {if_not_exists = true}) end local tube_statistic_func = function(args) @@ -55,6 +64,8 @@ local function init(metrics, tubes) rawset(_G, 'tube_statistic', tube_statistic_func) box.schema.func.create('tube_statistic', { if_not_exists = true }) + box.schema.user.grant(user, 'execute', 'function', 'tube_statistic', + {if_not_exists = true}) end local function get_list() diff --git a/sharded_queue/storage/vshard_utils.lua b/sharded_queue/storage/vshard_utils.lua new file mode 100644 index 0000000..98f359d --- /dev/null +++ b/sharded_queue/storage/vshard_utils.lua @@ -0,0 +1,93 @@ +-- The source is copied from: +-- https://github.com/tarantool/crud/blob/99315a53ef75056ac057b63f5ce1bbadef6a1418/crud/common/vshard_utils.lua + +local luri = require('uri') + +local vshard = require('vshard') + +local vshard_utils = {} + +function vshard_utils.get_self_vshard_replicaset() + local box_info = box.info() + + local ok, storage_info = pcall(vshard.storage.info) + assert(ok, 'vshard.storage.cfg() must be called first') + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + local replicaset_name = box_info.replicaset.name + + return replicaset_name, storage_info.replicasets[replicaset_name] + else + local replicaset_uuid + if box_info.replicaset ~= nil then + replicaset_uuid = box_info.replicaset.uuid + else + replicaset_uuid = box_info.cluster.uuid + end + + return replicaset_uuid, storage_info.replicasets[replicaset_uuid] + end +end + +function vshard_utils.get_self_vshard_replica_id() + local box_info = box.info() + + if vshard_utils.get_vshard_identification_mode() == 'name_as_key' then + return box_info.name + else + return box_info.uuid + end +end + +function vshard_utils.get_replicaset_id(vshard_router, replicaset) + -- https://github.com/tarantool/vshard/issues/460. + local known_replicasets = vshard_router:routeall() + + for known_replicaset_id, known_replicaset in pairs(known_replicasets) do + if known_replicaset == replicaset then + return known_replicaset_id + end + end + + return nil +end + +function vshard_utils.get_vshard_identification_mode() + -- https://github.com/tarantool/vshard/issues/460. + assert(vshard.storage.internal.current_cfg ~= nil, 'available only on vshard storage') + return vshard.storage.internal.current_cfg.identification_mode +end + +function vshard_utils.get_this_replica_user() + local replicaset_key, replicaset = vshard_utils.get_self_vshard_replicaset() + + if replicaset == nil or replicaset.master == nil then + error(string.format( + 'Failed to find a vshard configuration ' .. + 'for storage replicaset with key %q.', + replicaset_key)) + end + + local uri + if replicaset.master == 'auto' then + -- https://github.com/tarantool/vshard/issues/467. + uri = vshard.storage.internal.this_replica.uri + else + uri = replicaset.master.uri + end + + return luri.parse(uri).login +end + +function vshard_utils.get_replicaset_master(replicaset, opts) + opts = opts or {} + local cached = opts.cached or false + + if (not cached) and replicaset.locate_master ~= nil then + replicaset:locate_master() + end + + return replicaset.master +end + +return vshard_utils diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua index e1999e3..8437bca 100644 --- a/sharded_queue/utils.lua +++ b/sharded_queue/utils.lua @@ -74,6 +74,33 @@ function utils.normalize.wait_max(wait_max) return wait_max end +function utils.validate_options(options) + if not options then return true end + + if options.wait_factor then + if type(options.wait_factor) ~= 'number' + or options.wait_factor < 1 + then + return false, "wait_factor must be number greater than or equal to 1" + end + end + + local _, err = utils.normalize.log_request(options.log_request) + if err then + return false, err + end + + if options.wait_max ~= nil then + local err + options.wait_max, err = utils.normalize.wait_max(options.wait_max) + if err ~= nil then + return false, err + end + end + + return true +end + function utils.validate_tubes(tubes, on_storage) for tube_name, tube_opts in pairs(tubes) do if tube_opts.driver ~= nil then @@ -89,6 +116,10 @@ function utils.validate_tubes(tubes, on_storage) end end end + local ok, err = utils.validate_options(tube_opts) + if not ok then + return nil, err + end end return true diff --git a/test/api_test.lua b/test/api_test.lua index fdf7794..1740266 100644 --- a/test/api_test.lua +++ b/test/api_test.lua @@ -4,19 +4,20 @@ local t = require('luatest') local g = t.group('api') local api = require('sharded_queue.api') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local tube = require('sharded_queue.router.tube') local is_metrics_supported = utils.is_metrics_supported() g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box - g.queue_conn_ro = config.cluster:server('queue-router-1').net_box - g.cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg") + t.skip_if(utils.is_tarantool_3(), 'the role is available only for Cartridge') + g.queue_conn = helper.get_evaler('queue-router') + g.queue_conn_ro = helper.get_evaler('queue-router-1') + g.cfg = helper.get_cfg() end) g.after_each(function() - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg}) + helper.set_cfg(g.cfg) end) g.test_exported_api = function() @@ -115,8 +116,8 @@ g.test_role_statistics_read_only_router = function() end g.test_api_version = function() - local api_conn = config.cluster:server('queue-router').net_box - local storage_conn = config.cluster:server('queue-storage-1-0').net_box + local api_conn = helper.get_evaler('queue-router') + local storage_conn = helper.get_evaler('queue-storage-1-0') local api_version = api_conn:eval( "return require('sharded_queue.api')._VERSION" diff --git a/test/create_test.lua b/test/create_test.lua index fc8bcb2..6826a7f 100644 --- a/test/create_test.lua +++ b/test/create_test.lua @@ -1,11 +1,10 @@ local t = require('luatest') local g = t.group('create_test') -local config = require('test.helper.config') +local helper = require('test.helper') g.before_all(function() - g.api = config.cluster:server('queue-router').net_box - g.storage = config.cluster:server('queue-storage-1-0').net_box + g.storage = helper.get_evaler('queue-storage-1-0') end) for test_name, options in pairs({ @@ -19,9 +18,7 @@ for test_name, options in pairs({ }) do g['test_create_tube_defauls_' .. test_name] = function() local tube_name = 'creates_tube_defaults_' .. test_name .. '_test' - g.api:call('queue.create_tube', { - tube_name, options - }) + helper.create_tube(tube_name, options) local space = g.storage:eval(string.format([[ local space = box.space.%s @@ -61,9 +58,7 @@ for test_name, options in pairs({ }) do g['test_create_tube_opts' .. test_name] = function() local tube_name = 'create_tube_opts_' .. test_name .. '_test' - g.api:call('queue.create_tube', { - tube_name, options - }) + helper.create_tube(tube_name, options) local space = g.storage:eval(string.format([[ local space = box.space.%s diff --git a/test/drop_test.lua b/test/drop_test.lua index baeb65d..2256f0d 100644 --- a/test/drop_test.lua +++ b/test/drop_test.lua @@ -1,20 +1,18 @@ local t = require('luatest') local g = t.group('drop_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) function g.test_drop_empty() local tube_name = 'drop_empty_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) - g.queue_conn:call(utils.shape_cmd(tube_name, 'drop')) + helper.create_tube(tube_name) + helper.drop_tube(tube_name) local cur_stat = g.queue_conn:call('queue.statistics', { tube_name }) t.assert_equals(cur_stat, nil) @@ -23,20 +21,15 @@ end function g.test_drop_and_recreate() local tube_name = 'drop_and_recreate_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } ) - g.queue_conn:call(utils.shape_cmd(tube_name, 'drop')) + helper.drop_tube(tube_name) - -- recreate tube with same name - t.assert(g.queue_conn:call('queue.create_tube', { - tube_name - })) + -- Recreate tube with same name. + helper.create_tube(tube_name) local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { '*' } ) t.assert_equals(task[utils.index.data], '*') t.assert_equals(task[utils.index.status], utils.state.READY) - -end \ No newline at end of file +end diff --git a/test/entrypoint/config.yml b/test/entrypoint/config.yml new file mode 100644 index 0000000..0fa59d4 --- /dev/null +++ b/test/entrypoint/config.yml @@ -0,0 +1,71 @@ +credentials: + users: + guest: + roles: [super] + storage: + roles: [sharding] + password: "storage" + +iproto: + listen: + - uri: 'unix/:./{{ instance_name }}.iproto' + advertise: + sharding: + login: 'storage' + password: 'storage' + +sharding: + bucket_count: 1234 + sched_ref_quota: 258 + +groups: + group-001: + replicasets: + replicaset-001: + sharding: + roles: [router] + roles: + - roles.sharded-queue-router + database: + replicaset_uuid: 'aaaaaaaa-0000-4000-b000-000000000000' + instances: + queue-router: + database: + instance_uuid: 'aaaaaaaa-aaaa-4000-b000-000000000001' + mode: rw + queue-router-1: + database: + instance_uuid: 'aaaaaaaa-aaaa-4000-b000-000000000002' + mode: rw + replicaset-002: + sharding: + roles: [storage] + roles: + - roles.sharded-queue-storage + database: + replicaset_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000001' + instances: + queue-storage-1-0: + database: + instance_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000001' + mode: rw + queue-storage-1-1: + database: + instance_uuid: 'bbbbbbbb-bbbb-4000-b000-000000000002' + mode: ro + replicaset-003: + sharding: + roles: [storage] + roles: + - roles.sharded-queue-storage + database: + replicaset_uuid: 'cccccccc-0000-4000-b000-000000000000' + instances: + queue-storage-2-0: + database: + instance_uuid: 'cccccccc-cccc-4000-b000-000000000001' + mode: rw + queue-storage-2-1: + database: + instance_uuid: 'cccccccc-cccc-4000-b000-000000000002' + mode: ro diff --git a/test/helper/config.lua b/test/helper/config_cartridge.lua similarity index 77% rename from test/helper/config.lua rename to test/helper/config_cartridge.lua index fec4c82..9bc88c5 100644 --- a/test/helper/config.lua +++ b/test/helper/config_cartridge.lua @@ -1,6 +1,8 @@ local fio = require('fio') local t = require('luatest') local cartridge_helpers = require('cartridge.test-helpers') +local utils = require('test.helper.utils') + require('json').cfg { encode_use_tostring = true } local config = {} @@ -82,10 +84,41 @@ config.cluster = cartridge_helpers.Cluster:new({ } }) -t.before_suite(function () +function config.get_cfg() + return config.eval('queue-router', "return require('sharded_queue.api').cfg") +end + +function config.set_cfg(cfg) + config.eval('queue-router', "require('sharded_queue.api').cfg(...)", + {cfg}) +end + +function config.create_tube(tube_name, options) + config.eval('queue-router', "queue.create_tube(...)", {tube_name, options}) +end + +function config.drop_tube(tube_name) + pcall(function() + config.eval('queue-router', utils.shape_cmd(tube_name, 'drop') .. "()") + end) +end + +function config.eval(server, ...) + return config.cluster:server(server).net_box:eval(...) +end + +function config.get_evaler(server) + return config.cluster:server(server).net_box +end + +t.before_suite(function() fio.rmtree(config.datadir) fio.mktree(config.datadir) config.cluster:start() + config.servers = {} + for _, srv in pairs(config.cluster.servers) do + config.servers[srv.alias] = srv + end fio.mktree(config.unitdir) box.cfg{ @@ -94,6 +127,8 @@ t.before_suite(function () } end) -t.after_suite(function () config.cluster:stop() end) +t.after_suite(function() + config.cluster:stop() +end) return config diff --git a/test/helper/config_tarantool.lua b/test/helper/config_tarantool.lua new file mode 100644 index 0000000..a5df9c4 --- /dev/null +++ b/test/helper/config_tarantool.lua @@ -0,0 +1,188 @@ +local fio = require('fio') +local t = require('luatest') +local fun = require('fun') +local yaml = require('yaml') + +local server = require('test.helper.server') + +local config = {} + +local roles = { + 'roles.sharded-queue-router', + 'roles.sharded-queue-storage', +} +config.root = fio.dirname(fio.abspath(package.search('init'))) +config.datadir = fio.pathjoin(config.root, 'dev') +config.unitdir = fio.pathjoin(config.datadir, 'unit') +config.configpath = fio.pathjoin(config.root, 'test', 'entrypoint', 'config.yml') +config.devconfigpath = fio.pathjoin(config.root, 'dev', 'config.yml') + +function config.start_example_replicaset() + local opts = { + config_file = config.devconfigpath, + chdir = config.datadir, + } + local servers = { + "queue-router", + "queue-router-1", + "queue-storage-1-0", + "queue-storage-1-1", + "queue-storage-2-0", + "queue-storage-2-1", + } + config.servers = {} + for _, server_name in ipairs(servers) do + local server_opts = fun.chain(opts, {alias = server_name}):tomap() + config.servers[server_name] = server:new(server_opts) + end + + for _, srv in pairs(config.servers) do + srv:start({wait_until_ready = false}) + end + + for _, srv in pairs(config.servers) do + srv:wait_until_ready() + end + config.servers['queue-router']:eval("require('vshard').router.bootstrap()") + config.servers['queue-router-1']:eval("require('vshard').router.bootstrap()") +end + +function config.stop_example_replicaset() + for _, srv in pairs(config.servers) do + srv:drop() + end +end + +function config.reload() + for _, srv in pairs(config.servers) do + srv:eval("require('config'):reload()") + end + for _, srv in pairs(config.servers) do + srv:wait_until_ready() + end + config.servers['queue-router']:eval("require('vshard').router.bootstrap()") + config.servers['queue-router-1']:eval("require('vshard').router.bootstrap()") +end + +local function read_config(path) + local src = fio.open(path) + local data = src:read() + src:close() + return yaml.decode(data) +end + +local function write_config(path, decoded) + local dst = fio.open(path, {'O_CREAT', 'O_WRONLY', 'O_TRUNC'}) + dst:write(yaml.encode(decoded)) + dst:close() +end + +function config.get_cfg() + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] ~= nil and decoded['roles_cfg'][roles[1]] then + return decoded['roles_cfg'][roles[1]]['cfg'] + end + return nil +end + +function config.set_cfg(cfg) + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + decoded['roles_cfg'] = {} + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + decoded['roles_cfg'][role] = {} + end + decoded['roles_cfg'][role]['cfg'] = cfg + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.create_tube(tube_name, options) + options = options or {} + + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + decoded['roles_cfg'] = {} + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + decoded['roles_cfg'][role] = {} + end + if decoded['roles_cfg'][role]['tubes'] == nil then + decoded['roles_cfg'][role]['tubes'] = {} + end + decoded['roles_cfg'][role]['tubes'][tube_name] = {} + for k, v in pairs(options or {}) do + decoded['roles_cfg'][role]['tubes'][tube_name][k] = v + end + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.drop_tube(tube_name) + local decoded = read_config(config.devconfigpath) + + if decoded['roles_cfg'] == nil then + return + end + for _, role in ipairs(roles) do + if decoded['roles_cfg'][role] == nil then + return + end + if decoded['roles_cfg'][role]['tubes'] == nil then + return + end + if decoded['roles_cfg'][role]['tubes'][tube_name] == nil then + return + end + decoded['roles_cfg'][role]['tubes'][tube_name] = nil + end + + write_config(config.devconfigpath, decoded) + + config.reload() +end + +function config.eval(server_name, ...) + return config.servers[server_name]:eval(...) +end + +function config.get_evaler(server_name) + return config.servers[server_name] +end + +t.before_suite(function() + fio.rmtree(config.datadir) + fio.mktree(config.datadir) + + fio.copyfile(config.configpath, config.devconfigpath) + fio.copytree(fio.pathjoin(config.root, 'roles'), + fio.pathjoin(config.datadir, 'roles')) + fio.copytree(fio.pathjoin(config.root, 'sharded_queue'), + fio.pathjoin(config.datadir, 'sharded_queue')) + + config.start_example_replicaset() + + fio.mktree(config.unitdir) + box.cfg{ + work_dir=config.unitdir, + wal_mode='none' + } +end) + +t.after_suite(function() + config.stop_example_replicaset() +end) + +return config diff --git a/test/helper/init.lua b/test/helper/init.lua new file mode 100644 index 0000000..8057cc0 --- /dev/null +++ b/test/helper/init.lua @@ -0,0 +1,7 @@ +local utils = require('test.helper.utils') + +if utils.is_tarantool_3() then + return require('test.helper.config_tarantool') +else + return require('test.helper.config_cartridge') +end diff --git a/test/helper/server.lua b/test/helper/server.lua new file mode 100644 index 0000000..7cc5f2c --- /dev/null +++ b/test/helper/server.lua @@ -0,0 +1,194 @@ +-- https://github.com/tarantool/tarantool/blob/5040fba9cf1da942371721e36e81c7372699600c/test/luatest_helpers/server.lua +local fun = require('fun') +local yaml = require('yaml') +local urilib = require('uri') +local fio = require('fio') +local luatest = require('luatest') + +-- Join paths in an intuitive way. +-- +-- If a component is nil, it is skipped. +-- +-- If a component is an absolute path, it skips all the previous +-- components. +-- +-- The wrapper is written for two components for simplicity. +local function pathjoin(a, b) + -- No first path -- skip it. + if a == nil then + return b + end + -- No second path -- skip it. + if b == nil then + return a + end + -- The absolute path is checked explicitly due to gh-8816. + if b:startswith('/') then + return b + end + return fio.pathjoin(a, b) +end + +-- Determine advertise URI for given instance from a cluster +-- configuration. +local function find_advertise_uri(config, instance_name, dir) + if config == nil or next(config) == nil then + return nil + end + + -- Determine listen and advertise options that are in effect + -- for the given instance. + local advertise + local listen + + for _, group in pairs(config.groups or {}) do + for _, replicaset in pairs(group.replicasets or {}) do + local instance = (replicaset.instances or {})[instance_name] + if instance == nil then + break + end + if instance.iproto ~= nil then + if instance.iproto.advertise ~= nil then + advertise = advertise or instance.iproto.advertise.client + end + listen = listen or instance.iproto.listen + end + if replicaset.iproto ~= nil then + if replicaset.iproto.advertise ~= nil then + advertise = advertise or replicaset.iproto.advertise.client + end + listen = listen or replicaset.iproto.listen + end + if group.iproto ~= nil then + if group.iproto.advertise ~= nil then + advertise = advertise or group.iproto.advertise.client + end + listen = listen or group.iproto.listen + end + end + end + + if config.iproto ~= nil then + if config.iproto.advertise ~= nil then + advertise = advertise or config.iproto.advertise.client + end + listen = listen or config.iproto.listen + end + + local uris + if advertise ~= nil then + uris = {{uri = advertise}} + else + uris = listen + end + + for _, uri in ipairs(uris or {}) do + uri = table.copy(uri) + uri.uri = uri.uri:gsub('{{ *instance_name *}}', instance_name) + uri.uri = uri.uri:gsub('unix/:%./', ('unix/:%s/'):format(dir)) + local u = urilib.parse(uri) + if u.ipv4 ~= '0.0.0.0' and u.ipv6 ~= '::' and u.service ~= '0' then + return uri + end + end + error('No suitable URI to connect is found') +end + +local Server = luatest.Server:inherit({}) + +-- Adds the following options: +-- +-- * config_file (string) +-- +-- An argument of the `--config <...>` CLI option. +-- +-- Used to deduce advertise URI to connect net.box to the +-- instance. +-- +-- The special value '' means running without `--config <...>` +-- CLI option (but still pass `--name `). +-- * remote_config (table) +-- +-- If `config_file` is not passed, this config value is used to +-- deduce the advertise URI to connect net.box to the instance. +Server.constructor_checks = fun.chain(Server.constructor_checks, { + config_file = 'string', + remote_config = '?table', +}):tomap() + +function Server:initialize() + if self.config_file ~= nil then + self.command = arg[-1] + + self.args = fun.chain(self.args or {}, { + '--name', self.alias + }):totable() + + if self.config_file ~= '' then + table.insert(self.args, '--config') + table.insert(self.args, self.config_file) + + -- Take into account self.chdir to calculate a config + -- file path. + local config_file_path = pathjoin(self.chdir, self.config_file) + + -- Read the provided config file. + local fh, err = fio.open(config_file_path, {'O_RDONLY'}) + if fh == nil then + error(('Unable to open file %q: %s'):format(config_file_path, + err)) + end + self.config = yaml.decode(fh:read()) + fh:close() + end + + if self.net_box_uri == nil then + local config = self.config or self.remote_config + + -- NB: listen and advertise URIs are relative to + -- process.work_dir, which, in turn, is relative to + -- self.chdir. + local work_dir + if config.process ~= nil and config.process.work_dir ~= nil then + work_dir = config.process.work_dir + end + local dir = pathjoin(self.chdir, work_dir) + self.net_box_uri = find_advertise_uri(config, self.alias, dir) + end + end + getmetatable(getmetatable(self)).initialize(self) +end + +function Server:connect_net_box() + getmetatable(getmetatable(self)).connect_net_box(self) + + if self.config_file == nil then + return + end + + if not self.net_box then + return + end + + -- Replace the ready condition. + local saved_eval = self.net_box.eval + self.net_box.eval = function(self, expr, args, opts) -- luacheck: ignore + if expr == 'return _G.ready' then + expr = "return require('config'):info().status == 'ready' or " .. + "require('config'):info().status == 'check_warnings'" + end + return saved_eval(self, expr, args, opts) + end +end + +-- Enable the startup waiting if the advertise URI of the instance +-- is determined. +function Server:start(opts) + opts = opts or {} + if self.config_file and opts.wait_until_ready == nil then + opts.wait_until_ready = self.net_box_uri ~= nil + end + getmetatable(getmetatable(self)).start(self, opts) +end + +return Server diff --git a/test/helper/utils.lua b/test/helper/utils.lua index 7b5c8fd..f3b2342 100644 --- a/test/helper/utils.lua +++ b/test/helper/utils.lua @@ -48,4 +48,20 @@ function utils.is_metrics_supported() return metrics.unregister_callback and counter.remove and true or false end +local function get_tarantool_version() + local version_parts = rawget(_G, '_TARANTOOL'):split('-', 3) + + local major_minor_patch_parts = version_parts[1]:split('.', 2) + local major = tonumber(major_minor_patch_parts[1]) + local minor = tonumber(major_minor_patch_parts[2]) + local patch = tonumber(major_minor_patch_parts[3]) + + return major, minor, patch +end + +function utils.is_tarantool_3() + local major = get_tarantool_version() + return major == 3 +end + return utils diff --git a/test/metrics_test.lua b/test/metrics_test.lua index 09c8082..5ff028b 100644 --- a/test/metrics_test.lua +++ b/test/metrics_test.lua @@ -1,13 +1,13 @@ local t = require('luatest') local g = t.group('metrics_test') -local config = require('test.helper.config') +local helper = require('test.helper') local json = require('json') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box - g.cfg = g.queue_conn:eval("return require('sharded_queue.api').cfg") + g.queue_conn = helper.get_evaler('queue-router') + g.cfg = helper.get_cfg() end) g.before_each(function() @@ -17,7 +17,7 @@ g.before_each(function() end) g.after_each(function() - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg}) + helper.set_cfg(g.cfg) end) local function filter_metrics(metrics, labels) @@ -45,10 +45,11 @@ local metrics = require('metrics') metrics.invoke_callbacks() return metrics.collect() ]] + if instance == nil then metrics = g.queue_conn:eval(eval) else - metrics = config.cluster:server(instance).net_box:eval(eval) + metrics = helper.eval(instance, eval) end for _, v in ipairs(metrics) do @@ -67,6 +68,9 @@ local function merge_metrics(first, second) local found = false for _, f in pairs(first) do if f.metric_name == s.metric_name then + -- It clears the storage name. + f.label_pairs.alias = nil + s.label_pairs.alias = nil if json.encode(f.label_pairs) == json.encode(s.label_pairs) then found = true f.value = f.value + s.value @@ -111,7 +115,6 @@ local function assert_metric(metrics, name, label, values, filters) for k, v in pairs(values) do local filtered = filter_metrics(metric, {[label] = k}) - json.encode(filtered) t.assert_equals(#filtered, 1, label .. "_" .. k) t.assert_equals(filtered[1].value, v, label .. "_" .. k) end @@ -119,9 +122,8 @@ end g.test_metrics_api = function() local tube_name = 'metrics_api_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) + g.queue_conn = helper.get_evaler('queue-router') local task_count = 64 for i = 1, task_count do @@ -191,12 +193,8 @@ end g.test_metrics_api_disabled = function() local tube_name = 'metrics_api_disabled_test' - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", - {{metrics = false}}) - - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.set_cfg({metrics = false}) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} @@ -214,9 +212,7 @@ end g.test_metrics_api_disable = function() local tube_name = 'metrics_api_disable_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} @@ -243,8 +239,7 @@ g.test_metrics_api_disable = function() t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) - g.queue_conn:eval("require('sharded_queue.api').cfg(...)", - {{metrics = false}}) + helper.set_cfg({metrics = false}) metrics = get_router_metrics(tube_name) t.assert_equals(metrics, {}) @@ -259,11 +254,9 @@ end g.test_metrics_storage = function() local tube_name = 'metrics_storage_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) - local storage = config.cluster:server('queue-storage-1-0').net_box + local storage = helper.get_evaler('queue-storage-1-0') local methods = { 'statistic', 'put', diff --git a/test/simple_test.lua b/test/simple_test.lua index eea6a7e..d4ce502 100644 --- a/test/simple_test.lua +++ b/test/simple_test.lua @@ -1,11 +1,11 @@ local t = require('luatest') local g = t.group('simple_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) for test_name, options in pairs({ @@ -18,10 +18,7 @@ for test_name, options in pairs({ g['test_put_taken_' .. test_name] = function() local tube_name = 'put_taken_test_' .. test_name - g.queue_conn:call('queue.create_tube', { - tube_name, - options - }) + helper.create_tube(tube_name, options) -- tasks data for putting local task_count = 100 @@ -36,12 +33,10 @@ for test_name, options in pairs({ local task_ids = {} for _, data in pairs(tasks_data) do local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { data }) - local peek_task = g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), { task[utils.index.task_id] }) - t.assert_equals(peek_task[utils.index.status], utils.state.READY) task_ids[task[utils.index.task_id]] = true end @@ -73,13 +68,12 @@ end g.test_take_with_options = function() local tube_name = 'test_take_with_options' - g.queue_conn:call('queue.create_tube', { - tube_name, + helper.create_tube(tube_name, { temporary = true, driver = 'sharded_queue.drivers.fifo', } - }) + ) local options, timeout, data = {}, 1, 'data' for _, take_args in pairs({{}, {timeout}, {timeout, options}, {box.NULL, options}}) do @@ -90,20 +84,19 @@ g.test_take_with_options = function() end function g.test_invalid_driver() - t.assert_error_msg_contains('Driver unexistent could not be loaded', function() g.queue_conn:call('queue.create_tube', { + t.assert_error_msg_contains('Driver unexistent could not be loaded', function() helper.create_tube( 'invalid', { driver = 'unexistent' } - }) + ) end) + helper.drop_tube('invalid') end function g.test_delete() local tube_name = 'delete_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) -- task data for putting local task_count = 20 @@ -167,9 +160,7 @@ end function g.test_release() local tube_name = 'release_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local task_count = 10 local tasks_data = {} @@ -227,9 +218,7 @@ end function g.test_bury_kick() local tube_name = 'bury_kick_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local cur_stat diff --git a/test/statistics_test.lua b/test/statistics_test.lua index 30d81f8..c7b5ac3 100644 --- a/test/statistics_test.lua +++ b/test/statistics_test.lua @@ -1,19 +1,17 @@ local t = require('luatest') local g = t.group('statistics_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) function g.test_statistics() local tube_name = 'statistics_test' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local task_count = 64 local middle = 32 diff --git a/test/storage_test.lua b/test/storage_test.lua index 521a3d5..5f9311e 100644 --- a/test/storage_test.lua +++ b/test/storage_test.lua @@ -1,14 +1,12 @@ -#!/usr/bin/env tarantool - local t = require('luatest') local g = t.group('storage') -local config = require('test.helper.config') +local helper = require('test.helper') local methods = require('sharded_queue.storage.methods') g.before_all(function() - g.storage_master = config.cluster:server('queue-storage-1-0').net_box - g.storage_ro = config.cluster:server('queue-storage-1-1').net_box + g.storage_master = helper.get_evaler('queue-storage-1-0') + g.storage_ro = helper.get_evaler('queue-storage-1-1') end) g.test_storage_methods = function() diff --git a/test/take_exp_backoff_test.lua b/test/take_exp_backoff_test.lua index b910171..851c2d7 100644 --- a/test/take_exp_backoff_test.lua +++ b/test/take_exp_backoff_test.lua @@ -1,18 +1,12 @@ local t = require('luatest') local g = t.group('exponential_backoff_test') -local log = require('log') -- luacheck: ignore - -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - --- Workaround for https://github.com/tarantool/cartridge/issues/462 - config.cluster:server('queue-router').net_box:close() - config.cluster:server('queue-router').net_box = nil - config.cluster:server('queue-router'):connect_net_box() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) local function task_take(tube_name, timeout, channel, options) @@ -25,9 +19,7 @@ end function g.test_default_wait_factor() local tube_name = 'test_default_wait_factor' - g.queue_conn:call('queue.create_tube', { - tube_name - }) + helper.create_tube(tube_name) local timeout = 10 -- second local attemts = 8 -- attempts count @@ -62,12 +54,7 @@ function g.test_success() -- expected time is 1.56 in case wait_factor = 5 local tube_name = 'test_success_exp_backoff' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 5, - } - }) + helper.create_tube(tube_name, {wait_factor = 5}) local timeout = 10 @@ -88,53 +75,35 @@ function g.test_success() end function g.test_invalid_factors() - local tube_name = 'test_tinvalid_factors' - t.assert_error_msg_contains('wait_factor', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_factor = 0.5, - } - }) + t.assert_error_msg_contains('wait_factor', helper.create_tube, + tube_name, {wait_factor = 0.5} + ) - t.assert_error_msg_contains('wait_factor', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_factor = 'not factor', - } - }) + t.assert_error_msg_contains('wait_factor', helper.create_tube, + tube_name, {wait_factor = 'not factor'} + ) + + helper.drop_tube(tube_name) end function g.test_invalid_wait_max() - local tube_name = 'test_invalid_wait_max' - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = -8, - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = -8} + ) - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = 0, - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = 0} + ) - t.assert_error_msg_contains('wait_max', g.queue_conn.call, - g.queue_conn, - 'queue.create_tube', - { tube_name, { - wait_max = 'not number', - } - }) + t.assert_error_msg_contains('wait_max', helper.create_tube, + tube_name, {wait_max = 'not number'} + ) + + helper.drop_tube(tube_name) end function g.test_invalid_wait_max_on_take() @@ -151,13 +120,13 @@ function g.test_invalid_wait_max_on_take() -- expected time is 3.31 in case wait_factor = 5 local tube_name = 'test_invalid_wait_max_on_take' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 5, wait_max = 1, } - }) + ) local timeout = 10 @@ -197,12 +166,7 @@ function g.test_wait_max_on_tube() -- expected time is 3.27 local tube_name = 'test_wait_max_on_take_tube' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_max = 1, - } - }) + helper.create_tube(tube_name, {wait_max = 1}) local timeout = 10 local channel = fiber.channel(2) @@ -240,13 +204,12 @@ function g.test_wait_max_in_take() -- expected time is 3.27 local tube_name = 'test_wait_max_in_take_tube' - g.queue_conn:call('queue.create_tube', { - tube_name, + helper.create_tube(tube_name, { wait_factor = 2, wait_max = 100, } - }) + ) local timeout = 10 local channel = fiber.channel(2) diff --git a/test/timeout_test.lua b/test/timeout_test.lua index 2d6f093..9c1ce84 100644 --- a/test/timeout_test.lua +++ b/test/timeout_test.lua @@ -1,18 +1,12 @@ local t = require('luatest') local g = t.group('timeout_test') -local log = require('log') -- luacheck: ignore - -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - --- Workaround for https://github.com/tarantool/cartridge/issues/462 - config.cluster:server('queue-router').net_box:close() - config.cluster:server('queue-router').net_box = nil - config.cluster:server('queue-router'):connect_net_box() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) local function task_take(tube_name, timeout, channel) @@ -31,12 +25,7 @@ function g.test_try_waiting() -- CHECK uptime and value - nil local tube_name = 'try_waiting_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local timeout = 3 -- second @@ -62,12 +51,7 @@ function g.test_wait_put_taking() -- CHEK what was taken successfully local tube_name = 'wait_put_taking_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local timeout = 3 diff --git a/test/ttl_test.lua b/test/ttl_test.lua index 746f90c..846faef 100644 --- a/test/ttl_test.lua +++ b/test/ttl_test.lua @@ -1,22 +1,25 @@ local t = require('luatest') local g = t.group('ttl_test') -local config = require('test.helper.config') +local helper = require('test.helper') local utils = require('test.helper.utils') local fiber = require('fiber') g.before_all(function() - g.queue_conn = config.cluster:server('queue-router').net_box + g.queue_conn = helper.get_evaler('queue-router') end) -local function lookup_task(task_id, tube_name, cluster) +local function lookup_task(task_id, tube_name) local call_string = ("return box.space.%s:get(%s):tomap({names_only=true})"):format(tube_name, task_id) local ok, stored_task - for _, server in pairs(cluster.servers) do + for server, _ in pairs(helper.servers) do ok, stored_task = pcall(function() - return server.net_box:eval(call_string) + local evaler = helper.get_evaler(server) + return evaler:eval(call_string) end) - if ok then break end + if ok then + break + end end return stored_task end @@ -24,16 +27,13 @@ end function g.test_fifottl_config() local tube_name = 'test_fifottl_config' local tube_options = { ttl = 43, ttr = 15, priority = 17, wait_factor = 1 } - g.queue_conn:call('queue.create_tube', { - tube_name, - tube_options - }) + helper.create_tube(tube_name, tube_options) local task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', })[1] - local stored_task = lookup_task(task_id, tube_name, config.cluster) + local stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.ttl, tube_options.ttl * 1000000) t.assert_equals(stored_task.ttr, tube_options.ttr * 1000000) @@ -43,31 +43,23 @@ end function g.test_fifottl_config_pri() local tube_name = 'test_fifottl_config_pri' local tube_options = { ttl = 43, ttr = 15, pri = 15, wait_factor = 1 } - g.queue_conn:call('queue.create_tube', { - tube_name, - tube_options - }) + helper.create_tube(tube_name, tube_options) local task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', })[1] - local stored_task = lookup_task(task_id, tube_name, config.cluster) + local stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.priority, tube_options.pri) task_id = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', {pri = 18}})[1] - stored_task = lookup_task(task_id, tube_name, config.cluster) + stored_task = lookup_task(task_id, tube_name) t.assert_equals(stored_task.priority, 18) end function g.test_touch_task() local tube_name = 'touch_task_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -98,12 +90,7 @@ end function g.test_delayed_tasks() local tube_name = 'delayed_tasks_test' - g.queue_conn:call('queue.create_tube', { - tube_name, - { - wait_factor = 1, - } - }) + helper.create_tube(tube_name, {wait_factor = 1}) -- task delayed for 0.1 sec local task = g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -153,14 +140,14 @@ end function g.test_ttr_release_no_delete_task() local tube_name = 'ttr_release_no_delete_task_test' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 1, ttr = 0.2, log_request = true, } - }) + ) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data', @@ -183,14 +170,14 @@ end function g.test_ttr_bury_no_delete_task() local tube_name = 'ttr_bury_no_delete_task_test' - g.queue_conn:call('queue.create_tube', { + helper.create_tube( tube_name, { wait_factor = 1, ttr = 0.2, log_request = true, } - }) + ) g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 'simple data',