From da4b966a8171b3743df669ce34b2eb9eedf4dab4 Mon Sep 17 00:00:00 2001
From: Oleg Jukovec
Date: Wed, 3 Apr 2024 11:36:48 +0300
Subject: [PATCH] internal: split api.lua to logical blocks

The patch splits the large file `api.lua`, which implements a Cartridge
role, into a few smaller files based on their functionality. This will
help to create a Tarantool 3.0 role from those blocks instead of
copy-pasting the code.

Part of #68
---
 sharded-queue-scm-1.rockspec     |   4 +
 sharded_queue/api.lua            | 562 ++-----------------------------
 sharded_queue/router/config.lua  |  19 ++
 sharded_queue/router/metrics.lua |  31 ++
 sharded_queue/router/queue.lua   | 142 ++++++++
 sharded_queue/router/tube.lua    | 415 +++++++++++++++++++++++
 test/api_test.lua                |   3 +-
 7 files changed, 632 insertions(+), 544 deletions(-)
 create mode 100644 sharded_queue/router/config.lua
 create mode 100644 sharded_queue/router/metrics.lua
 create mode 100644 sharded_queue/router/queue.lua
 create mode 100644 sharded_queue/router/tube.lua

diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec
index 3e53455..7b8dcfb 100755
--- a/sharded-queue-scm-1.rockspec
+++ b/sharded-queue-scm-1.rockspec
@@ -30,6 +30,10 @@ build = {
         ['sharded_queue.stash'] = 'sharded_queue/stash.lua',
         ['sharded_queue.state'] = 'sharded_queue/state.lua',
         ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
+        ['sharded_queue.router.config'] = 'sharded_queue/router/config.lua',
+        ['sharded_queue.router.metrics'] = 'sharded_queue/router/metrics.lua',
+        ['sharded_queue.router.queue'] = 'sharded_queue/router/queue.lua',
+        ['sharded_queue.router.tube'] = 'sharded_queue/router/tube.lua',
         ['sharded_queue.storage.config'] = 'sharded_queue/storage/config.lua',
         ['sharded_queue.storage.drivers'] = 'sharded_queue/storage/drivers.lua',
         ['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua',
diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua
index 53a6587..e8a076e 100644
--- a/sharded_queue/api.lua
+++ b/sharded_queue/api.lua
@@ -1,455 +1,11 @@
 local cartridge = require('cartridge')
-local vshard = require('vshard')
-local fiber = require('fiber')
-local log = require('log')
 
-local metrics = require('sharded_queue.metrics')
-local stash = require('sharded_queue.stash')
-local state = require('sharded_queue.state')
-local time = require('sharded_queue.time')
+local config = require('sharded_queue.router.config')
+local metrics = require('sharded_queue.router.metrics')
 local utils = require('sharded_queue.utils')
+local queue = require('sharded_queue.router.queue')
 
-local stash_names = {
-    cfg = '__sharded_queue_api_cfg',
-    metrics_stats = '__sharded_queue_api_metrics_stats',
-}
-stash.setup(stash_names)
-
-local remote_call = function(method, replicaset, args, timeout)
-    return replicaset:callrw(method, { args }, { timeout = timeout })
-end
-
-local function validate_options(options)
-    if not options then return true end
-
-    if options.wait_factor then
-        if type(options.wait_factor) ~= 'number'
-            or options.wait_factor < 1
-        then
-            return false, "wait_factor must be number greater than or equal to 1"
-        end
-    end
-
-    local _, err = utils.normalize.log_request(options.log_request)
-    if err then
-        return false, err
-    end
-
-    if options.wait_max ~= nil then
-        local err
-        options.wait_max, err = utils.normalize.wait_max(options.wait_max)
-        if err ~= nil then
-            return false, err
-        end
-    end
-
-    return true
-end
-
-local function log_task(op_name, task)
-    local task_id = type(task) == 'table' and task[1] or task
-    log.info(string.format([[Router[%d] %s: task %s]], 
fiber.self():id(), op_name, task_id)) -end - -local sharded_tube = {} - -function sharded_tube.put(self, data, options) - local bucket_count = vshard.router.bucket_count() - local bucket_id = math.random(bucket_count) - - options = table.deepcopy(options or {}) - - if options.priority == nil and options.pri ~= nil then - options.priority = options.pri - end - - options.data = data - options.tube_name = self.tube_name - options.bucket_id = bucket_id - options.bucket_count = bucket_count - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, - 'write', 'tube_put', { options }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('put', task) - end - - return task -end - --- function for try get task from instance -- -local function take_task(replicasets, options, take_timeout, call_timeout) - for _, replicaset in ipairs(replicasets) do - if take_timeout == 0 then - break - end - local begin = time.cur() - - -- try take task from all instance - local ok, ret = pcall(remote_call, 'tube_take', - replicaset, - options, - call_timeout - ) - - if ret ~= nil and ok then - return ret - end - - local duration = time.cur() - begin - take_timeout = take_timeout > duration and take_timeout - duration or 0 - end -end - -function sharded_tube.take(self, timeout, options) - -- take task from tube -- - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local remote_call_timeout = time.MIN_NET_BOX_CALL_TIMEOUT - if timeout ~= nil and timeout > time.MIN_NET_BOX_CALL_TIMEOUT then - remote_call_timeout = timeout - end - - local take_timeout = time.nano(timeout) or time.TIMEOUT_INFINITY - - local frequency = 1000 - local wait_part = 0.01 -- maximum waiting time in second - local wait_max = utils.normalize.wait_max(options.wait_max) - or self.wait_max or time.MAX_TIMEOUT - - local wait_factor = self.wait_factor - - local calc_part = time.sec(take_timeout / frequency) - - if calc_part < wait_part then - wait_part = tonumber(calc_part) - end - - if options.extra.log_request then - log.info(("Router[%d] take: start attempts"):format(fiber.self():id())) - end - - while take_timeout ~= 0 do - local begin = time.cur() - - local shards, err = vshard.router.routeall() - if err ~= nil then - error(err) - end - - local replicasets = {} - for _, replicaset in pairs(shards) do - table.insert(replicasets, replicaset) - end - utils.array_shuffle(replicasets) - - local task = take_task(replicasets, - options, take_timeout, remote_call_timeout) - - if task ~= nil then - if options.extra.log_request then - log_task('take', task) - end - return task - end - - if take_timeout < time.nano(wait_part) then - if options.extra.log_request then - log.info(("Router[%d] take: next attemt will be after timeout") - :format(fiber.self():id())) - end - return nil - end - - fiber.sleep(wait_part) - - wait_part = wait_part * wait_factor - if wait_part > wait_max then - wait_part = wait_max - end - - local duration = time.cur() - begin - - take_timeout = take_timeout > duration and take_timeout - duration or 0 - end - - if options.extra.log_request then - log.info(("Router[%d] take: timeout"):format(fiber.self():id())) - end -end - -function sharded_tube.delete(self, task_id, options) - -- task delete from tube -- - - local bucket_count = 
vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_delete', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('delete', task) - end - - return task -end - -function sharded_tube.release(self, task_id, options) - -- task release from tube -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_release', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('release', task) - end - - return task -end - -function sharded_tube.touch(self, task_id, delta, options) - if delta == nil or delta <= 0 then - return - end - - if delta >= time.MAX_TIMEOUT then - delta = time.TIMEOUT_INFINITY - else - delta = time.nano(delta) - end - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - options.delta = delta - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_touch', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('touch', task) - end - - return task -end - -function sharded_tube.ack(self, task_id, options) - -- task delete from tube -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - if options.extra.log_request then - log.info(("Router[%d] ack: call id %d, bucket %d") - :format(fiber.self():id(), task_id, bucket_id)) - end - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_ack', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('ack', task) - end - - return task -end - -function sharded_tube.bury(self, task_id, options) - -- task bury -- - - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_bury', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('bury', task) - end - - return 
task -end - -function sharded_tube.kick(self, count, options) - -- try kick few tasks -- - - local kicked_count = 0 -- count kicked task - local shards, err = vshard.router.routeall() - if err ~= nil then - error(err) - end - - for _, replicaset in pairs(shards) do - local opts = table.deepcopy(options or {}) - opts.tube_name = self.tube_name - opts.count = count - kicked_count - - local ok, k = pcall(remote_call, 'tube_kick', replicaset, opts) - if not ok then - log.error(k) - return kicked_count - end - - kicked_count = kicked_count + k - - if kicked_count == count then - break - end - end - - return kicked_count -end - -function sharded_tube.peek(self, task_id, options) - local bucket_count = vshard.router.bucket_count() - local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count) - - options = table.deepcopy(options or {}) - options.tube_name = self.tube_name - options.task_id = task_id - - options.extra = { - log_request = utils.normalize.log_request(options.log_request) or self.log_request, - } - - local task, err = vshard.router.call(bucket_id, 'write', 'tube_peek', { - options - }) - -- re-raise storage errors - if err ~= nil then error(err) end - - if options.extra.log_request then - log_task('peek', task) - end - - return task -end - -function sharded_tube.drop(self) - local tubes = cartridge.config_get_deepcopy('tubes') or {} - - tubes[self.tube_name] = nil - cartridge.config_patch_clusterwide({ tubes = tubes }) -end - -local sharded_queue = { - tube = {}, - cfg = stash.get(stash_names.cfg), - metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), -} - -if sharded_queue.cfg.metrics == nil then - sharded_queue.cfg.metrics = true -end - -if sharded_queue.cfg.metrics then - sharded_queue.cfg.metrics = metrics.is_supported() -end - -local function wrap_sharded_queue_call_counters(call, fun) - return function(self, ...) - local before = fiber.clock() - local ok, ret = pcall(fun, self, ...) 
-        local after = fiber.clock()
-
-        if sharded_queue.cfg.metrics then
-            sharded_queue.metrics_stats:observe(after - before,
-                self.tube_name, call, ok)
-        end
-
-        if not ok then
-            error(ret)
-        end
-
-        return ret
-    end
-end
-
-for call, fun in pairs(sharded_tube) do
-    sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun)
-end
-
-local function metrics_enable()
-    local get_statistic = function(tube)
-        return sharded_queue.statistics(tube)
-    end
-    sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic)
-end
-
-local function metrics_disable()
-    sharded_queue.metrics_stats:disable()
-end
-
-function sharded_queue.cfg_call(_, options)
+local function cfg_call(_, options)
     options = options or {}
     if options.metrics == nil then
         return
@@ -459,7 +15,7 @@ function sharded_queue.cfg_call(_, options)
         error('"metrics" must be a boolean')
     end
 
-    if sharded_queue.cfg.metrics ~= options.metrics then
+    if config.metrics ~= options.metrics then
         local tubes = cartridge.config_get_deepcopy('tubes') or {}
 
         if tubes['cfg'] ~= nil and tubes['cfg'].metrics == nil then
@@ -472,71 +28,8 @@ function sharded_queue.cfg_call(_, options)
     end
 end
 
-function sharded_queue.statistics(tube_name)
-    if not tube_name then
-        return
-    end
-
-    local stats_collection, err = vshard.router.map_callrw('tube_statistic',
-        {{ tube_name = tube_name }})
-    if err ~= nil then
-        return nil, err
-    end
-
-    if type(stats_collection) ~= 'table' then
-        return nil, 'No stats retrieved'
-    end
-
-    if next(stats_collection) == nil then
-        return nil
-    end
-
-    local stat = { tasks = {}, calls = {} }
-    for _, replicaset_stats in pairs(stats_collection) do
-        if type(replicaset_stats) ~= 'table' or next(replicaset_stats) == nil then
-            return nil, 'Invalid stats'
-        end
-
-        for name, count in pairs(replicaset_stats[1].tasks) do
-            stat.tasks[name] = (stat.tasks[name] or 0) + count
-        end
-        for name, count in pairs(replicaset_stats[1].calls) do
-            stat.calls[name] = (stat.calls[name] or 0) + count
-        end
-    end
-
-    return stat
-end
-
-function sharded_queue.create_tube(tube_name, options)
-    local tubes = cartridge.config_get_deepcopy('tubes') or {}
-
-    if tube_name == 'cfg' then
-        error('a tube name "cfg" is reserved')
-    end
-
-    if tubes[tube_name] ~= nil then
-        -- already exist --
-        return nil
-    end
-
-    local ok , err = validate_options(options)
-    if not ok then error(err) end
-
-    options = table.deepcopy(options or {})
-    if options.priority == nil and options.pri ~= nil then
-        options.priority = options.pri
-    end
-
-    tubes[tube_name] = options
-    ok, err = cartridge.config_patch_clusterwide({ tubes = tubes })
-    if not ok then error(err) end
-
-    return sharded_queue.tube[tube_name]
-end
-
 local function init(opts)
-    rawset(_G, 'queue', sharded_queue)
+    queue.export_globals()
 end
 
 local function validate_config(cfg)
@@ -554,43 +47,30 @@
     for tube_name, options in pairs(cfg_tubes) do
         if tube_name == 'cfg' then
             if options.metrics ~= nil then
-                sharded_queue.cfg.metrics = options.metrics and true or false
+                config.metrics = options.metrics and true or false
             end
-        elseif sharded_queue.tube[tube_name] == nil then
-            local self = setmetatable({
-                tube_name = tube_name,
-                wait_max = options.wait_max,
-                wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR,
-                log_request = utils.normalize.log_request(options.log_request),
-            }, {
-                __index = sharded_tube,
-            })
-            sharded_queue.tube[tube_name] = self
+        elseif queue.map()[tube_name] == nil then
+            queue.add(tube_name, metrics, options)
         end
     end
 
     -- try drop tubes 
-- - for tube_name, _ in pairs(sharded_queue.tube) do + for tube_name, _ in pairs(queue.map()) do if tube_name ~= 'cfg' and cfg_tubes[tube_name] == nil then - setmetatable(sharded_queue.tube[tube_name], nil) - sharded_queue.tube[tube_name] = nil + queue.remove(tube_name) end end - if sharded_queue.cfg.metrics then - metrics_enable() + if config.metrics then + metrics.enable(queue) else - metrics_disable() + metrics.disable() end end --- FIXME: Remove when https://github.com/tarantool/cartridge/issues/308 resolved local function queue_action_wrapper(action) return function(name, ...) - if not sharded_queue.tube[name] then - return nil, string.format('No queue "%s" initialized yet', name) - end - return sharded_queue.tube[name][action](sharded_queue.tube[name], ...) + return queue.call(name, action, ...) end end @@ -611,19 +91,15 @@ return { drop = queue_action_wrapper('drop'), cfg = setmetatable({}, { - __index = sharded_queue.cfg, + __index = config, __newindex = function() error("Use api.cfg() instead", 2) end, - __call = sharded_queue.cfg_call, - __serialize = function() return sharded_queue.cfg end, + __call = cfg_call, + __serialize = function() return config end, }), - statistics = sharded_queue.statistics, + statistics = queue.statistics, _VERSION = require('sharded_queue.version'), dependencies = { 'cartridge.roles.vshard-router', }, - - __private = { - sharded_tube = sharded_tube, - } } diff --git a/sharded_queue/router/config.lua b/sharded_queue/router/config.lua new file mode 100644 index 0000000..26061ca --- /dev/null +++ b/sharded_queue/router/config.lua @@ -0,0 +1,19 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') + +local stash_names = { + config = '__sharded_queue_router_config', +} +stash.setup(stash_names) + +local config = stash.get(stash_names.config) + +if config.metrics == nil then + config.metrics = true +end + +if config.metrics then + config.metrics = metrics.is_supported() +end + +return config diff --git a/sharded_queue/router/metrics.lua b/sharded_queue/router/metrics.lua new file mode 100644 index 0000000..e11361b --- /dev/null +++ b/sharded_queue/router/metrics.lua @@ -0,0 +1,31 @@ +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') +local stats_storage = require('sharded_queue.stats.storage') + +local stash_names = { + metrics_stats = '__sharded_queue_router_metrics_stats', +} +stash.setup(stash_names) + +local metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)) + +local function enable(queue) + local get_statistic = function(tube) + return queue.statistics(tube) + end + metrics_stats:enable('api', queue.map(), get_statistic) +end + +local function observe(latency, tube, method, ok) + metrics_stats:observe(latency, tube, method, ok) +end + +local function disable() + metrics_stats:disable() +end + +return { + enable = enable, + observe = observe, + disable = disable, +} diff --git a/sharded_queue/router/queue.lua b/sharded_queue/router/queue.lua new file mode 100644 index 0000000..c92d86f --- /dev/null +++ b/sharded_queue/router/queue.lua @@ -0,0 +1,142 @@ +local is_cartridge_package, cartridge = pcall(require, 'cartridge') +local vshard = require('vshard') +local tube = require('sharded_queue.router.tube') +local utils = require('sharded_queue.utils') + +local queue_global = { + tube = {}, +} + +function queue_global.statistics(tube_name) + if not tube_name then + return + end + + local stats_collection, err = 
vshard.router.map_callrw('tube_statistic',
+        {{ tube_name = tube_name }})
+    if err ~= nil then
+        return nil, err
+    end
+
+    if type(stats_collection) ~= 'table' then
+        return nil, 'No stats retrieved'
+    end
+
+    if next(stats_collection) == nil then
+        return nil
+    end
+
+    local stat = { tasks = {}, calls = {} }
+    for _, replicaset_stats in pairs(stats_collection) do
+        if type(replicaset_stats) ~= 'table' or next(replicaset_stats) == nil then
+            return nil, 'Invalid stats'
+        end
+
+        for name, count in pairs(replicaset_stats[1].tasks) do
+            stat.tasks[name] = (stat.tasks[name] or 0) + count
+        end
+        for name, count in pairs(replicaset_stats[1].calls) do
+            stat.calls[name] = (stat.calls[name] or 0) + count
+        end
+    end
+
+    return stat
+end
+
+-- Tarantool 3.0 does not support updating the configuration dynamically, so
+-- a user must update the configuration themselves.
+if is_cartridge_package then
+    local function validate_options(options)
+        if not options then return true end
+
+        if options.wait_factor then
+            if type(options.wait_factor) ~= 'number'
+                or options.wait_factor < 1
+            then
+                return false, "wait_factor must be a number greater than or equal to 1"
+            end
+        end
+
+        local _, err = utils.normalize.log_request(options.log_request)
+        if err then
+            return false, err
+        end
+
+        if options.wait_max ~= nil then
+            local err
+            options.wait_max, err = utils.normalize.wait_max(options.wait_max)
+            if err ~= nil then
+                return false, err
+            end
+        end
+
+        return true
+    end
+
+    queue_global.create_tube = function(tube_name, options)
+        local tubes = cartridge.config_get_deepcopy('tubes') or {}
+
+        if tube_name == 'cfg' then
+            error('a tube name "cfg" is reserved')
+        end
+
+        if tubes[tube_name] ~= nil then
+            -- already exists
+            return nil
+        end
+
+        local ok, err = validate_options(options)
+        if not ok then error(err) end
+
+        options = table.deepcopy(options or {})
+        if options.priority == nil and options.pri ~= nil then
+            options.priority = options.pri
+        end
+
+        tubes[tube_name] = options
+        ok, err = cartridge.config_patch_clusterwide({ tubes = tubes })
+        if not ok then
+            error(err)
+        end
+
+        return queue_global.tube[tube_name]
+    end
+end
+
+local function export_globals()
+    rawset(_G, 'queue', queue_global)
+end
+
+local function add(name, metrics, options)
+    queue_global.tube[name] = tube.new(name, metrics, options)
+end
+
+local function call(tube_name, action, ...)
+    if queue_global.tube[tube_name] == nil then
+        return nil, string.format('No queue "%s" initialized yet', tube_name)
+    end
+    if queue_global.tube[tube_name][action] == nil then
+        return nil, string.format('Queue "%s" has no action "%s"', tube_name, action)
+    end
+    return queue_global.tube[tube_name][action](queue_global.tube[tube_name], ...)
+end
+
+local function map()
+    return queue_global.tube
+end
+
+local function remove(tube_name)
+    if queue_global.tube[tube_name] ~= nil then
+        setmetatable(queue_global.tube[tube_name], nil)
+        queue_global.tube[tube_name] = nil
+    end
+end
+
+return {
+    export_globals = export_globals,
+    add = add,
+    call = call,
+    map = map,
+    statistics = queue_global.statistics,
+    remove = remove,
+}
diff --git a/sharded_queue/router/tube.lua b/sharded_queue/router/tube.lua
new file mode 100644
index 0000000..3bb9926
--- /dev/null
+++ b/sharded_queue/router/tube.lua
@@ -0,0 +1,415 @@
+local is_cartridge_package, cartridge = pcall(require, 'cartridge')
+local fiber = require('fiber')
+local vshard = require('vshard')
+local log = require('log')
+
+local time = require('sharded_queue.time')
+local utils = require('sharded_queue.utils')
+
+local function log_task(op_name, task)
+    local task_id = type(task) == 'table' and task[1] or task
+    log.info(string.format([[Router[%d] %s: task %s]], fiber.self():id(), op_name, task_id))
+end
+
+local function remote_call(method, replicaset, args, timeout)
+    return replicaset:callrw(method, { args }, { timeout = timeout })
+end
+
+local function take_task(replicasets, options, take_timeout, call_timeout)
+    for _, replicaset in ipairs(replicasets) do
+        if take_timeout == 0 then
+            break
+        end
+        local begin = time.cur()
+
+        -- Try to take a task from all instances.
+        local ok, ret = pcall(remote_call, 'tube_take',
+            replicaset,
+            options,
+            call_timeout
+        )
+
+        if ret ~= nil and ok then
+            return ret
+        end
+
+        local duration = time.cur() - begin
+        take_timeout = take_timeout > duration and take_timeout - duration or 0
+    end
+end
+
+local function put(self, data, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id = math.random(bucket_count)
+
+    options = table.deepcopy(options or {})
+
+    if options.priority == nil and options.pri ~= nil then
+        options.priority = options.pri
+    end
+
+    options.data = data
+    options.tube_name = self.tube_name
+    options.bucket_id = bucket_id
+    options.bucket_count = bucket_count
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id,
+        'write', 'tube_put', { options })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('put', task)
+    end
+
+    return task
+end
+
+local function take(self, timeout, options)
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local remote_call_timeout = time.MIN_NET_BOX_CALL_TIMEOUT
+    if timeout ~= nil and timeout > time.MIN_NET_BOX_CALL_TIMEOUT then
+        remote_call_timeout = timeout
+    end
+
+    local take_timeout = time.nano(timeout) or time.TIMEOUT_INFINITY
+
+    local frequency = 1000
+    local wait_part = 0.01 -- maximum waiting time in seconds
+    local wait_max = utils.normalize.wait_max(options.wait_max)
+        or self.wait_max or time.MAX_TIMEOUT
+
+    local wait_factor = self.wait_factor
+
+    local calc_part = time.sec(take_timeout / frequency)
+
+    if calc_part < wait_part then
+        wait_part = tonumber(calc_part)
+    end
+
+    if options.extra.log_request then
+        log.info(("Router[%d] take: start attempts"):format(fiber.self():id()))
+    end
+
+    while take_timeout ~= 0 do
+        local begin = time.cur()
+
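+        -- Ask the router for all known replicasets and shuffle the list so
+        -- that repeated take attempts do not always hit the storages in the
+        -- same order.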
+        local shards, err = vshard.router.routeall()
+        if err ~= nil then
+            error(err)
+        end
+
+        local replicasets = {}
+        for _, replicaset in pairs(shards) do
+            table.insert(replicasets, replicaset)
+        end
+        utils.array_shuffle(replicasets)
+
+        local task = take_task(replicasets,
+            options, take_timeout, remote_call_timeout)
+
+        if task ~= nil then
+            if options.extra.log_request then
+                log_task('take', task)
+            end
+            return task
+        end
+
+        if take_timeout < time.nano(wait_part) then
+            if options.extra.log_request then
+                log.info(("Router[%d] take: next attempt will be after timeout")
+                    :format(fiber.self():id()))
+            end
+            return nil
+        end
+
+        fiber.sleep(wait_part)
+
+        wait_part = wait_part * wait_factor
+        if wait_part > wait_max then
+            wait_part = wait_max
+        end
+
+        local duration = time.cur() - begin
+
+        take_timeout = take_timeout > duration and take_timeout - duration or 0
+    end
+
+    if options.extra.log_request then
+        log.info(("Router[%d] take: timeout"):format(fiber.self():id()))
+    end
+end
+
+local function delete(self, task_id, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_delete', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('delete', task)
+    end
+
+    return task
+end
+
+local function release(self, task_id, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_release', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('release', task)
+    end
+
+    return task
+end
+
+local function touch(self, task_id, delta, options)
+    if delta == nil or delta <= 0 then
+        return
+    end
+
+    if delta >= time.MAX_TIMEOUT then
+        delta = time.TIMEOUT_INFINITY
+    else
+        delta = time.nano(delta)
+    end
+
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+    options.delta = delta
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_touch', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('touch', task)
+    end
+
+    return task
+end
+
+local function ack(self, task_id, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    if options.extra.log_request then
+        log.info(("Router[%d] ack: call id %d, bucket %d")
+            :format(fiber.self():id(), task_id, bucket_id))
+    end
+
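+    -- Acknowledge the task on the storage replicaset that owns the bucket;
+    -- the storage removes the task once it is acked.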
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_ack', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('ack', task)
+    end
+
+    return task
+end
+
+local function bury(self, task_id, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_bury', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('bury', task)
+    end
+
+    return task
+end
+
+local function kick(self, count, options)
+    local kicked_count = 0 -- count of kicked tasks
+    local shards, err = vshard.router.routeall()
+    if err ~= nil then
+        error(err)
+    end
+
+    for _, replicaset in pairs(shards) do
+        local opts = table.deepcopy(options or {})
+        opts.tube_name = self.tube_name
+        opts.count = count - kicked_count
+
+        local ok, k = pcall(remote_call, 'tube_kick', replicaset, opts)
+        if not ok then
+            log.error(k)
+            return kicked_count
+        end
+
+        kicked_count = kicked_count + k
+
+        if kicked_count == count then
+            break
+        end
+    end
+
+    return kicked_count
+end
+
+local function peek(self, task_id, options)
+    local bucket_count = vshard.router.bucket_count()
+    local bucket_id, _ = utils.unpack_task_id(task_id, bucket_count)
+
+    options = table.deepcopy(options or {})
+    options.tube_name = self.tube_name
+    options.task_id = task_id
+
+    options.extra = {
+        log_request = utils.normalize.log_request(options.log_request) or self.log_request,
+    }
+
+    local task, err = vshard.router.call(bucket_id, 'write', 'tube_peek', {
+        options
+    })
+    -- re-raise storage errors
+    if err ~= nil then error(err) end
+
+    if options.extra.log_request then
+        log_task('peek', task)
+    end
+
+    return task
+end
+
+local function drop(self)
+    local tubes = cartridge.config_get_deepcopy('tubes') or {}
+    tubes[self.tube_name] = nil
+    cartridge.config_patch_clusterwide({ tubes = tubes })
+end
+
+local methods = {
+    put = put,
+    take = take,
+    delete = delete,
+    release = release,
+    touch = touch,
+    ack = ack,
+    bury = bury,
+    kick = kick,
+    peek = peek,
+}
+
+-- Tarantool 3.0 does not support updating the configuration dynamically, so
+-- a user must update the configuration themselves.
+if is_cartridge_package then
+    methods.drop = drop
+end
+
+local function new_metrics_metatable(metrics)
+    local mt = {
+        __index = {},
+    }
+
+    for call, fun in pairs(methods) do
+        mt.__index[call] = function(self, ...)
+            local before = fiber.clock()
+            local ok, ret = pcall(fun, self, ...)
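+            -- Record the latency observation even when the call failed; the
+            -- error is re-raised only after the metrics are updated.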
+ local latency = fiber.clock() - before + + metrics.observe(latency, self.tube_name, call, ok) + + if not ok then + error(ret) + end + + return ret + end + end + + return mt +end + +local function new(name, metrics, options) + return setmetatable({ + tube_name = name, + wait_max = options.wait_max, + wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR, + log_request = utils.normalize.log_request(options.log_request), + }, new_metrics_metatable(metrics)) +end + +local function get_methods() + local list = {} + for method, _ in pairs(methods) do + table.insert(list, method) + end + return list +end + +return { + new = new, + get_methods = get_methods, +} diff --git a/test/api_test.lua b/test/api_test.lua index e161752..fdf7794 100644 --- a/test/api_test.lua +++ b/test/api_test.lua @@ -6,6 +6,7 @@ local g = t.group('api') local api = require('sharded_queue.api') local config = require('test.helper.config') local utils = require('test.helper.utils') +local tube = require('sharded_queue.router.tube') local is_metrics_supported = utils.is_metrics_supported() g.before_all(function() @@ -19,7 +20,7 @@ g.after_each(function() end) g.test_exported_api = function() - for method, _ in pairs(api.__private.sharded_tube) do + for _, method in pairs(tube.get_methods()) do t.assert_type(api[method], 'function', string.format('api role has method "%s" exported', method)) end
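
As the commit message says, the goal is to reuse these blocks for a future
Tarantool 3.0 role. A minimal sketch of such wiring (hypothetical: the
`apply` helper below is not part of this patch, while every module and
function it uses is introduced by it):

    local config  = require('sharded_queue.router.config')
    local metrics = require('sharded_queue.router.metrics')
    local queue   = require('sharded_queue.router.queue')

    -- Hypothetical role entry point: build tubes from a static config table.
    local function apply(cfg_tubes)
        for tube_name, options in pairs(cfg_tubes) do
            if queue.map()[tube_name] == nil then
                -- Wraps every tube method with latency metrics.
                queue.add(tube_name, metrics, options)
            end
        end
        if config.metrics then
            metrics.enable(queue)
        else
            metrics.disable()
        end
        -- Expose the `queue` API in _G, as the Cartridge role does in init().
        queue.export_globals()
    end

    return { apply = apply }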