From f88761686dd697608fdb434ed014232004677bf4 Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 19 Mar 2024 15:48:39 +0300 Subject: [PATCH] api: add more metrics * Metric `tnt_sharded_queue_api_role_stats` is a summary[1] with quantiles of `sharded_queue.api` role API calls. The metric includes a counter of API calls and errors. The metric contains labels in the following format: `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` * Metric `tnt_sharded_queue_storage_role_stats` is a summary[1] with quantiles of `sharded_queue.storage` role API calls. The metric includes a counter of API calls and errors. The metric contains labels in the following format: `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` * Metric `tnt_sharded_queue_storage_statistics_calls_total` as an equivalent of `tnt_sharded_queue_api_statistics_calls_total` for the `sharded_queue.storage` role. Values have the same meaning as the `queue` statistics `calls` table[2]. The metric contains labels in the following format: `{name = "tube_name", state = "call_type"}` * Metric `tnt_sharded_queue_storage_statistics_tasks` as an equivalent of `tnt_sharded_queue_api_statistics_tasks` for the `sharded_queue.storage` role. Values have the same meaning as the `queue` statistics `tasks` table[2]. The metric contains labels in the following format: `{name = "tube_name", state = "task_state"}` * Metric `sharded_queue_calls` renamed to `tnt_sharded_queue_api_statistics_calls_total` The metric now has labels in the format `{name = "tube_name", state = "call_type"}` instead of `{name = "tube_name", status = "call_type"}`. * Metric `sharded_queue_tasks` renamed to `tnt_sharded_queue_api_statistics_tasks`. The metric now has labels in the format `{name = "tube_name", state = "task_state"}` instead of `{name = "tube_name", status = "task_state"}`. 1. https://github.com/tarantool/metrics/?tab=readme-ov-file#summary 2. 
https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics Closes #69 Closes #71 --- CHANGELOG.md | 37 ++++++ README.md | 70 +++++++++++ sharded-queue-scm-1.rockspec | 1 + sharded_queue/api.lua | 123 +++++------------- sharded_queue/metrics.lua | 122 ++++++++++++++++++ sharded_queue/storage.lua | 112 ++++++++++++++--- sharded_queue/utils.lua | 24 ++++ test/metrics_test.lua | 235 +++++++++++++++++++++++++++++++---- 8 files changed, 590 insertions(+), 134 deletions(-) create mode 100644 sharded_queue/metrics.lua diff --git a/CHANGELOG.md b/CHANGELOG.md index 88647ac..739c3ac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,8 +9,42 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added +- Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary] + with quantiles of `sharded_queue.api` role API calls (#71). + The metric includes a counter of API calls and errors. + The metric contains labels in the following format: + `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` +- Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary] + with quantiles of `sharded_queue.storage` role API calls (#71). + The metric includes a counter of API calls and errors. + The metric contains labels in the following format: + `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` +- Metric `tnt_sharded_queue_storage_statistics_calls_total` as an equivalent of + `tnt_sharded_queue_api_statistics_calls_total` for the + `sharded_queue.storage` role (#71). + Values have the same meaning as the [`queue` statistics][queue-statistics] + `calls` table. + The metric contains labels in the following format: + `{name = "tube_name", state = "call_type"}` +- Metric `tnt_sharded_queue_storage_statistics_tasks` as an equivalent of + `tnt_sharded_queue_api_statistics_tasks` for the `sharded_queue.storage` + role (#71). 
+ Values have the same meaning as the [`queue` statistics][queue-statistics] + `tasks` table. + The metric contains labels in the following format: + `{name = "tube_name", state = "task_state"}` + ### Changed +- Metric `sharded_queue_calls` renamed to + `tnt_sharded_queue_api_statistics_calls_total` (#71). The metric now has + labels in the format `{name = "tube_name", state = "call_type"}` instead of + `{name = "tube_name", status = "call_type"}`. +- Metric `sharded_queue_tasks` renamed to + `tnt_sharded_queue_api_statistics_tasks` (#71). The metric now has labels + in the format `{name = "tube_name", state = "task_state"}` instead of + `{name = "tube_name", status = "task_state"}`. + ### Fixed - Data race with fifo driver for put()/take() methods with vinyl @@ -47,3 +81,6 @@ different shards (over the whole cluster). - Testing CI (#53). - Linter check on CI (#18). - Publish CI (#54). + +[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary +[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics diff --git a/README.md b/README.md index cc35888..803ca6f 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,73 @@ make deps make test ``` +## Metrics + +The module exports several metrics if the module `metrics` >= 0.11 is +installed and the feature is not disabled by the configuration. + +### Role sharded_queue.api + +* Metric `tnt_sharded_queue_api_statistics_calls_total` is a counter with + the number of requests broken down by [the type of request][queue-statistics]. + The metric has labels in the following format: + + `{name = "tube_name", state = "request_type"}` + + A list of possible request types: `done`, `take`, `kick`, `bury`, `put`, + `delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.api` + role accumulates values from all buckets. 
+ +* Metric `tnt_sharded_queue_api_statistics_tasks` is a gauge with + the number of tasks in a queue broken down by [a task state][queue-statistics]. + The metric has labels in the following format: + + `{name = "tube_name", state = "task_state"}` + + A list of possible task states: `taken`, `buried`, `ready`, `done`, + `delayed`, `total`. The metric on the `sharded_queue.api` role accumulates + values from all buckets. + +* Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary] + with quantiles of `sharded_queue.api` role API calls. The metric includes a + counter of API calls and errors and has labels in the following format: + + `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` + + A list of possible call methods: `put`, `take`, `delete`, `release`, `touch`, + `ack`, `bury`, `kick`, `peek`, `drop`. + +### Role sharded_queue.storage + +* Metric `tnt_sharded_queue_storage_statistics_calls_total` is a counter with + the number of requests broken down by [the type of request][queue-statistics]. + The metric has labels in the following format: + + `{name = "tube_name", state = "request_type"}` + + A list of possible request types: `done`, `take`, `kick`, `bury`, `put`, + `delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.storage` + role shows actual values on the instance. + +* Metric `tnt_sharded_queue_storage_statistics_tasks` is a gauge with + the number of tasks in a queue broken down by [a task state][queue-statistics]. + The metric has labels in the following format: + + `{name = "tube_name", state = "task_state"}` + + A list of possible task states: `taken`, `buried`, `ready`, `done`, + `delayed`, `total`. The metric on the `sharded_queue.storage` role shows + actual values on the instance. + +* Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary] + with quantiles of `sharded_queue.storage` role API calls.
The metric includes a + counter of API calls and errors and has labels in the following format: + + `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}` + + A list of possible call methods: `statistic`, `put`, `take`, `delete`, + `release`, `touch`, `ack`, `bury`, `kick`, `peek`, `drop`. + ## API extensions (compared to tarantool/queue) * ``tube:take`` method has additional table argument ``options``. It may be used to provide additional logic in some @@ -198,3 +265,6 @@ make test ``` If you use **fifottl** driver (default), you can log driver's method calls with `log_request` (log router's and storage's operations). + +[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary +[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics diff --git a/sharded-queue-scm-1.rockspec b/sharded-queue-scm-1.rockspec index be11c3d..b4210c4 100755 --- a/sharded-queue-scm-1.rockspec +++ b/sharded-queue-scm-1.rockspec @@ -26,6 +26,7 @@ build = { ['sharded_queue.drivers.fifottl'] = 'sharded_queue/drivers/fifottl.lua', ['sharded_queue.time'] = 'sharded_queue/time.lua', ['sharded_queue.utils'] = 'sharded_queue/utils.lua', + ['sharded_queue.metrics'] = 'sharded_queue/metrics.lua', ['sharded_queue.stash'] = 'sharded_queue/stash.lua', ['sharded_queue.state'] = 'sharded_queue/state.lua', ['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua', diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua index bb1eb06..548d3da 100644 --- a/sharded_queue/api.lua +++ b/sharded_queue/api.lua @@ -3,6 +3,7 @@ local vshard = require('vshard') local fiber = require('fiber') local log = require('log') +local metrics = require('sharded_queue.metrics') local stash = require('sharded_queue.stash') local state = require('sharded_queue.state') local time = require('sharded_queue.time') @@ -10,11 +11,10 @@ local utils = require('sharded_queue.utils') local cartridge_pool = 
require('cartridge.pool') local cartridge_rpc = require('cartridge.rpc') -local is_metrics_package, metrics = pcall(require, "metrics") local stash_names = { - cfg = '__sharded_queue_cfg', - metrics_stats = '__sharded_queue_metrics_stats', + cfg = '__sharded_queue_api_cfg', + metrics_stats = '__sharded_queue_api_metrics_stats', } stash.setup(stash_names) @@ -403,91 +403,49 @@ end local sharded_queue = { tube = {}, cfg = stash.get(stash_names.cfg), - metrics_stats = stash.get(stash_names.metrics_stats), + metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), } + if sharded_queue.cfg.metrics == nil then sharded_queue.cfg.metrics = true end -local function is_metrics_v_0_11_installed() - if not is_metrics_package or metrics.unregister_callback == nil then - return false - end - local counter = require('metrics.collectors.counter') - return counter.remove and true or false +if sharded_queue.cfg.metrics then + sharded_queue.cfg.metrics = metrics.is_supported() end -local function metrics_create_collectors() - return { - calls = { - collector = metrics.counter( - "sharded_queue_calls", - "sharded_queue's number of calls" - ), - values = {}, - }, - tasks = { - collector = metrics.gauge( - "sharded_queue_tasks", - "sharded_queue's number of tasks" - ) - }, - } -end +local function wrap_sharded_queue_call_counters(call, fun) + return function(self, ...) + local before = fiber.clock() + local ok, ret = pcall(fun, self, ...) 
+ local after = fiber.clock() -local function metrics_disable() - if sharded_queue.metrics_stats.callback then - metrics.unregister_callback(sharded_queue.metrics_stats.callback) - end - sharded_queue.metrics_stats.callback = nil + if sharded_queue.cfg.metrics then + sharded_queue.metrics_stats:observe(after - before, + self.tube_name, call, ok) + end - if sharded_queue.metrics_stats.collectors then - for _, c in pairs(sharded_queue.metrics_stats.collectors) do - metrics.registry:unregister(c.collector) + if not ok then + error(ret) end + + return ret end - sharded_queue.metrics_stats.collectors = nil +end + +for call, fun in pairs(sharded_tube) do + sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun) end local function metrics_enable() - -- Drop all collectors and a callback. - metrics_disable() - - -- Set all collectors and the callback. - sharded_queue.metrics_stats.collectors = metrics_create_collectors() - local callback = function() - local metrics_stats = sharded_queue.metrics_stats - for tube_name, _ in pairs(sharded_queue.tube) do - local stat = sharded_queue.statistics(tube_name) - local collectors = metrics_stats.collectors - if collectors.calls.values[tube_name] == nil then - collectors.calls.values[tube_name] = {} - end - for k, v in pairs(stat.calls) do - local prev = metrics_stats.collectors.calls.values[tube_name][k] or 0 - local inc = v - prev - metrics_stats.collectors.calls.collector:inc(inc, { - name = tube_name, - status = k, - }) - metrics_stats.collectors.calls.values[tube_name][k] = v - end - for k, v in pairs(stat.tasks) do - metrics_stats.collectors.tasks.collector:set(v, { - name = tube_name, - status = k, - }) - end - end + local get_statistic = function(tube) + return sharded_queue.statistics(tube) end - - metrics.register_callback(callback) - sharded_queue.metrics_stats.callback = callback - return true + sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic) end -if sharded_queue.cfg.metrics then - 
sharded_queue.cfg.metrics = is_metrics_v_0_11_installed() +local function metrics_disable() + sharded_queue.metrics_stats:disable() end function sharded_queue.cfg_call(_, options) @@ -526,7 +484,6 @@ function sharded_queue.statistics(tube_name) local stats_collection, err = cartridge_pool.map_call('tube_statistic', {{ tube_name = tube_name }}, {uri_list=storages}) - if err ~= nil then return nil, err end @@ -585,22 +542,7 @@ local function init(opts) end local function validate_config(cfg) - if cfg['cfg'] == nil then - return - end - - cfg = cfg['cfg'] - if type(cfg) ~= 'table' then - error('"cfg" must be a table') - end - if cfg.metrics and type(cfg.metrics) ~= 'boolean' then - error('"cfg.metrics" must be a boolean') - end - if cfg.metrics and cfg.metrics == true then - if not is_metrics_v_0_11_installed() then - error("metrics >= 0.11.0 is required") - end - end + return utils.validate_config_cfg(cfg) end local function apply_config(cfg, opts) @@ -618,7 +560,7 @@ local function apply_config(cfg, opts) wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR, log_request = utils.normalize.log_request(options.log_request), }, { - __index = sharded_tube + __index = sharded_tube, }) sharded_queue.tube[tube_name] = self end @@ -645,7 +587,6 @@ local function queue_action_wrapper(action) if not sharded_queue.tube[name] then return nil, string.format('No queue "%s" initialized yet', name) end - return sharded_queue.tube[name][action](sharded_queue.tube[name], ...) end end @@ -653,6 +594,8 @@ end return { init = init, apply_config = apply_config, + validate_config = validate_config, + put = queue_action_wrapper('put'), take = queue_action_wrapper('take'), delete = queue_action_wrapper('delete'), diff --git a/sharded_queue/metrics.lua b/sharded_queue/metrics.lua new file mode 100644 index 0000000..5d19a02 --- /dev/null +++ b/sharded_queue/metrics.lua @@ -0,0 +1,122 @@ +---- Module with metrics helper code. 
+local is_metrics_package, metrics = pcall(require, "metrics") + +local function create_collectors(self, role) + local role_full = "sharded_queue." .. role + local collectors = {} + + -- Unique names help to avoid clashes on instances with both roles. + collectors.api_stats = { + collector = metrics.summary( + string.format("tnt_sharded_queue_%s_role_stats", role), + string.format("sharded_queue's number of %s API calls", role_full), + {[0.5]=0.01, [0.95]=0.01, [0.99]=0.01}, + {max_age_time = 60, age_buckets_count = 5} + ), + values = {}, + } + + collectors.calls = { + collector = metrics.counter( + string.format("tnt_sharded_queue_%s_statistics_calls_total", role), + string.format("sharded_queue's number of calls on %s", role_full) + ), + values = {}, + } + collectors.tasks = { + collector = metrics.gauge( + string.format("tnt_sharded_queue_%s_statistics_tasks", role), + string.format("sharded_queue's number of task states on %s", + role_full) + ), + } + self.collectors = collectors +end + +local function enable(self, role, tubes, get_statistic_callback) + -- Drop all collectors and a callback. + self:disable(self) + + -- Set all collectors and the callback. 
+ create_collectors(self, role) + local callback = function() + for tube_name, _ in pairs(tubes) do + local statistics = get_statistic_callback(tube_name) + + if statistics ~= nil then + local collectors = self.collectors + if collectors.calls.values[tube_name] == nil then + collectors.calls.values[tube_name] = {} + end + + for k, v in pairs(statistics.calls) do + local prev = collectors.calls.values[tube_name][k] or 0 + local inc = v - prev + collectors.calls.collector:inc(inc, { + name = tube_name, + state = k, + }) + collectors.calls.values[tube_name][k] = v + end + for k, v in pairs(statistics.tasks) do + collectors.tasks.collector:set(v, { + name = tube_name, + state = k, + }) + end + end + end + end + + metrics.register_callback(callback) + self.callback = callback + return true +end + +local function disable(self) + if self.callback then + metrics.unregister_callback(self.callback) + end + self.callback = nil + + if self.collectors then + for _, c in pairs(self.collectors) do + metrics.registry:unregister(c.collector) + end + end + self.collectors = nil +end + +local function observe(self, latency, tube, method, ok) + if self.collectors ~= nil then + local status = ok and 'ok' or 'error' + self.collectors.api_stats.collector:observe( + latency, {name = tube, method = method, status = status}) + end +end + +local mt = { + __index = { + enable = enable, + disable = disable, + observe = observe, + } +} + +local function is_v_0_11_installed() + if not is_metrics_package or metrics.unregister_callback == nil then + return false + end + local counter = require('metrics.collectors.counter') + return counter.remove and true or false +end + +local function init(stash) + setmetatable(stash, mt) + return stash +end + +return { + is_supported = is_v_0_11_installed, + init = init, +} diff --git a/sharded_queue/storage.lua b/sharded_queue/storage.lua index 84381cb..c3ecdb5 100644 --- a/sharded_queue/storage.lua +++ b/sharded_queue/storage.lua @@ -1,13 +1,49 @@ -local log 
= require('log') +local fiber = require('fiber') local json = require('json') +local log = require('log') local cartridge = require('cartridge') +local metrics = require('sharded_queue.metrics') +local stash = require('sharded_queue.stash') local state = require('sharded_queue.state') local stats_storage = require('sharded_queue.stats.storage') +local utils = require('sharded_queue.utils') local DEFAULT_DRIVER = 'sharded_queue.drivers.fifottl' +local stash_names = { + cfg = '__sharded_queue_storage_cfg', + metrics_stats = '__sharded_queue_storage_metrics_stats', +} +stash.setup(stash_names) + +local methods = { + 'statistic', + 'put', + 'take', + 'delete', + 'touch', + 'ack', + 'peek', + 'release', + 'bury', + 'kick', +} + +local storage = { + cfg = stash.get(stash_names.cfg), + metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)), +} + +if storage.cfg.metrics == nil then + storage.cfg.metrics = true +end + +if storage.cfg.metrics then + storage.cfg.metrics = metrics.is_supported() +end + local queue_drivers = {} local function get_driver(driver_name) if queue_drivers[driver_name] == nil then @@ -30,8 +66,20 @@ local function map_tubes(cfg_tubes) return result end -local function validate_config(conf_new, _) - local cfg_tubes = conf_new.tubes or {} +local function metrics_enable() + local get_statistic = function(tube) + return stats_storage.get(tube) + end + + storage.metrics_stats:enable('storage', tubes, get_statistic) +end + +local function metrics_disable() + storage.metrics_stats:disable() +end + +local function validate_config(cfg) + local cfg_tubes = cfg.tubes or {} for tube_name, tube_opts in pairs(cfg_tubes) do if tube_opts.driver ~= nil then if type('tube_opts.driver') ~= 'string' then @@ -43,27 +91,21 @@ local function validate_config(conf_new, _) end end end - return true -end -local methods = { - 'statistic', - 'put', - 'take', - 'delete', - 'touch', - 'ack', - 'peek', - 'release', - 'bury', - 'kick', -} + return 
utils.validate_config_cfg(cfg) +end local function apply_config(cfg, opts) if opts.is_master then stats_storage.init() local cfg_tubes = cfg.tubes or {} + if cfg_tubes['cfg'] ~= nil then + local options = cfg_tubes['cfg'] + if options.metrics ~= nil then + storage.cfg.metrics = options.metrics and true or false + end + end local existing_tubes = tubes @@ -95,7 +137,21 @@ local function apply_config(cfg, opts) local tube_name = args.tube_name if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end - return tubes[tube_name].method[name](args) + + local before = fiber.clock() + local ok, ret, err = pcall(tubes[tube_name].method[name], args) + local after = fiber.clock() + + if storage.cfg.metrics then + storage.metrics_stats:observe(after - before, + tube_name, name, ok and err == nil) + end + + if not ok then + error(ret) + end + + return ret, err end local global_name = 'tube_' .. name @@ -104,13 +160,31 @@ local function apply_config(cfg, opts) end local tube_statistic_func = function(args) - return stats_storage.get(args.tube_name) + local before = fiber.clock() + local ok, ret, err = pcall(stats_storage.get, args.tube_name) + local after = fiber.clock() + if storage.cfg.metrics then + storage.metrics_stats:observe(after - before, + args.tube_name, 'statistic', ok and err == nil) + end + + if not ok then + error(ret) + end + + return ret, err end rawset(_G, 'tube_statistic', tube_statistic_func) box.schema.func.create('tube_statistic', { if_not_exists = true }) end + if storage.cfg.metrics then + metrics_enable() + else + metrics_disable() + end + return true end diff --git a/sharded_queue/utils.lua b/sharded_queue/utils.lua index 06b47f0..3f483b4 100644 --- a/sharded_queue/utils.lua +++ b/sharded_queue/utils.lua @@ -1,5 +1,7 @@ local fiber = require('fiber') +local metrics = require('sharded_queue.metrics') + local utils = {} local function atomic_tail(status, ...) 
@@ -72,4 +74,26 @@ function utils.normalize.wait_max(wait_max) return wait_max end +function utils.validate_config_cfg(cfg) + cfg = cfg.tubes or {} + if cfg['cfg'] == nil then + return true + end + + cfg = cfg['cfg'] + if type(cfg) ~= 'table' then + return nil, '"cfg" must be a table' + end + if cfg.metrics and type(cfg.metrics) ~= 'boolean' then + return nil, '"cfg.metrics" must be a boolean' + end + if cfg.metrics and cfg.metrics == true then + if not metrics.is_supported() then + return nil, "metrics >= 0.11.0 is required" + end + end + + return true +end + return utils diff --git a/test/metrics_test.lua b/test/metrics_test.lua index e46f20f..09c8082 100644 --- a/test/metrics_test.lua +++ b/test/metrics_test.lua @@ -2,6 +2,7 @@ local t = require('luatest') local g = t.group('metrics_test') local config = require('test.helper.config') +local json = require('json') local utils = require('test.helper.utils') g.before_all(function() @@ -19,27 +20,77 @@ g.after_each(function() g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {g.cfg}) end) -local function filter_metrics(metrics, label, value) +local function filter_metrics(metrics, labels) local filtered = {} for _, v in pairs(metrics) do - if v.label_pairs and v.label_pairs[label] and v.label_pairs[label] == value then - table.insert(filtered, v) - end + local found = true + for label, value in pairs(labels) do + if not v.label_pairs + or not v.label_pairs[label] + or v.label_pairs[label] ~= value then + found = false + end + end + if found then + table.insert(filtered, v) + end end return filtered end -local function get_metrics(tube_name) - local metrics = g.queue_conn:eval([[ +local function get_metrics(tube_name, instance) + local metrics + local eval = [[ local metrics = require('metrics') metrics.invoke_callbacks() return metrics.collect() -]]) +]] + if instance == nil then + metrics = g.queue_conn:eval(eval) + else + metrics = config.cluster:server(instance).net_box:eval(eval) + end + for _, v in 
ipairs(metrics) do v.timestamp = nil end - return filter_metrics(metrics, "name", tube_name) + return filter_metrics(metrics, {name = tube_name}) +end + +local function get_router_metrics(tube_name) + return get_metrics(tube_name, nil) +end + +local function merge_metrics(first, second) + for _, s in pairs(second) do + local found = false + for _, f in pairs(first) do + if f.metric_name == s.metric_name then + if json.encode(f.label_pairs) == json.encode(s.label_pairs) then + found = true + f.value = f.value + s.value + break + end + end + end + if not found then + table.insert(first, s) + end + end + return first +end + +-- Values on storages could be random, so we are interested in an accumulated +-- value. +local function get_storages_metrics(tube_name) + local masters = {"queue-storage-1-0", "queue-storage-2-0"} + local metrics = {} + for _, instance in ipairs(masters) do + local instance_metrics = get_metrics(tube_name, instance) + metrics = merge_metrics(metrics, instance_metrics) + end + return metrics end local function get_metric(metrics, metric_name) @@ -52,17 +103,22 @@ local function get_metric(metrics, metric_name) return filtered end -local function assert_metric(metrics, name, label, values) +local function assert_metric(metrics, name, label, values, filters) local metric = get_metric(metrics, name) + if filters ~= nil then + metric = filter_metrics(metric, filters) + end + for k, v in pairs(values) do - local filtered = filter_metrics(metric, label, k) + local filtered = filter_metrics(metric, {[label] = k}) + json.encode(filtered) t.assert_equals(#filtered, 1, label .. "_" .. k) t.assert_equals(filtered[1].value, v, label .. "_" ..
k) end end -g.test_metrics = function() - local tube_name = 'metrics_test' +g.test_metrics_api = function() + local tube_name = 'metrics_api_test' g.queue_conn:call('queue.create_tube', { tube_name }) @@ -73,9 +129,43 @@ g.test_metrics = function() i, { delay = 3 , ttl = 3, ttr = 1} }) end - -- check all putten task - local metrics = get_metrics(tube_name) - assert_metric(metrics, "sharded_queue_calls", "status", { + t.assert_error(function() + g.queue_conn:call(utils.shape_cmd(tube_name, 'peek'), {'invalid'}) + end) + + -- Check metrics on the router. + local metrics = get_router_metrics(tube_name) + assert_metric(metrics, "tnt_sharded_queue_api_role_stats_count", "method", { + put = task_count, + }, {status = "ok"}) + assert_metric(metrics, "tnt_sharded_queue_api_statistics_calls_total", "state", { + done = 0, + take = 0, + kick = 0, + bury = 0, + put = task_count, + delete = 0, + touch = 0, + ack = 0, + release = 0, + }) + assert_metric(metrics, "tnt_sharded_queue_api_statistics_tasks", "state", { + ready = 0, + taken = 0, + done = 0, + buried = 0, + delayed = task_count, + total = task_count, + }) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_api_role_stats"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_api_role_stats_sum"), {}) + + -- Check metrics on storages. 
+ metrics = get_storages_metrics(tube_name) + assert_metric(metrics, "tnt_sharded_queue_storage_role_stats_count", "method", { + put = task_count, + }, {status = "ok"}) + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_calls_total", "state", { done = 0, take = 0, kick = 0, @@ -86,7 +176,7 @@ g.test_metrics = function() ack = 0, release = 0, }) - assert_metric(metrics, "sharded_queue_tasks", "status", { + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", { ready = 0, taken = 0, done = 0, @@ -94,10 +184,13 @@ g.test_metrics = function() delayed = task_count, total = task_count, }) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) end -g.test_metrics_disabled = function() - local tube_name = 'metrics_disabled_test' +g.test_metrics_api_disabled = function() + local tube_name = 'metrics_api_disabled_test' g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {{metrics = false}}) @@ -108,12 +201,19 @@ g.test_metrics_disabled = function() g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} }) - local metrics = get_metrics(tube_name) + local metrics = get_router_metrics(tube_name) t.assert_equals(metrics, {}) + + metrics = get_storages_metrics(tube_name) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_statistics_calls_total"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_errors"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) end -g.test_metrics_disable = function() - local tube_name = 'metrics_disable_test' 
+g.test_metrics_api_disable = function() + local tube_name = 'metrics_api_disable_test' g.queue_conn:call('queue.create_tube', { tube_name }) @@ -121,17 +221,102 @@ g.test_metrics_disable = function() g.queue_conn:call(utils.shape_cmd(tube_name, 'put'), { 1, { delay = 3 , ttl = 3, ttr = 1} }) - local metrics = get_metrics(tube_name) - assert_metric(metrics, "sharded_queue_calls", "status", { + local metrics = get_router_metrics(tube_name) + assert_metric(metrics, "tnt_sharded_queue_api_statistics_calls_total", "state", { + put = 1, + }) + assert_metric(metrics, "tnt_sharded_queue_api_statistics_tasks", "state", { + delayed = 1, + }) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_api_role_stats"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_api_role_stats_sum"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_api_role_stats_count"), {}) + + metrics = get_storages_metrics(tube_name) + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_calls_total", "state", { put = 1, }) - assert_metric(metrics, "sharded_queue_tasks", "status", { + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", { delayed = 1, }) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {{metrics = false}}) - metrics = get_metrics(tube_name) + metrics = get_router_metrics(tube_name) t.assert_equals(metrics, {}) + + metrics = get_storages_metrics(tube_name) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_statistics_calls_total"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats"), {}) + 
t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) + t.assert_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_count"), {}) +end + +g.test_metrics_storage = function() + local tube_name = 'metrics_storage_test' + g.queue_conn:call('queue.create_tube', { + tube_name + }) + + local storage = config.cluster:server('queue-storage-1-0').net_box + local methods = { + 'statistic', + 'put', + 'take', + 'delete', + 'touch', + 'ack', + 'peek', + 'release', + 'bury', + 'kick', + } + + -- Some of them will fail, some of them not - it does not matter. We just + -- ensure that metrics work on the storage. + local oks = {} + local errors = {} + for _, method in ipairs(methods) do + local ok, _, err = pcall(function() + return storage:call("tube_" .. method, {{tube_name = tube_name}}) + end) + if ok and err == nil then + oks[method] = 1 + else + errors[method] = 1 + end + end + + local metrics = get_metrics(tube_name, 'queue-storage-1-0') + + assert_metric(metrics, "tnt_sharded_queue_storage_role_stats_count", "method", + oks, {status = "ok"}) + assert_metric(metrics, "tnt_sharded_queue_storage_role_stats_count", "method", + errors, {status = "error"}) + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_calls_total", "state", { + done = 0, + take = 0, + kick = 0, + bury = 1, + put = 0, + delete = 0, + touch = 0, + ack = 0, + release = 0, + }) + assert_metric(metrics, "tnt_sharded_queue_storage_statistics_tasks", "state", { + ready = 0, + taken = 0, + done = 0, + buried = 0, + delayed = 0, + total = 0, + }) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats"), {}) + t.assert_not_equals(get_metric(metrics, "tnt_sharded_queue_storage_role_stats_sum"), {}) end