Skip to content

Commit

Permalink
api: add more metrics
Browse files Browse the repository at this point in the history
* Metric `sharded_queue_api_role_stats` is a summary of
  `sharded_queue.api` role API calls.

  The metric includes a counter of API calls and errors[1].
  The metric contains labels in the following format:
  `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

* Metric `sharded_queue_storage_role_stats` is a summary of
  `sharded_queue.storage` role API calls.

  The metric includes a counter of API calls and errors[1].
  The metric contains labels in the following format:
  `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

* Metric `sharded_queue_storage_calls` is an equivalent of
  `sharded_queue_calls` for the `sharded_queue.storage` role.

  Values have the same meaning as the `queue` statistics `calls`
  table[2].
  The metric contains labels in the following format:
  `{name = "tube_name", status = "call_name"}`

* Metric `sharded_queue_storage_tasks` is an equivalent of
  `sharded_queue_tasks` for the `sharded_queue.storage` role.
  Values have the same meaning as the `queue` statistics `tasks`
  table[2].
  The metric contains labels in the following format:
  `{name = "tube_name", status = "task_status"}`

1. https://github.com/tarantool/metrics/?tab=readme-ov-file#summary
2. https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics

Closes #69
Closes #71
  • Loading branch information
oleg-jukovec committed Mar 20, 2024
1 parent 5e5fd9f commit 9e8b9ef
Show file tree
Hide file tree
Showing 7 changed files with 522 additions and 130 deletions.
25 changes: 25 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,31 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Metric `sharded_queue_api_role_stats` is a summary of
`sharded_queue.api` role API calls (#69).
The metric includes a counter of API calls and errors, see:
https://github.com/tarantool/metrics/?tab=readme-ov-file#summary
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `sharded_queue_storage_role_stats` is a summary of
`sharded_queue.storage` role API calls (#69).
The metric includes a counter of API calls and errors, see:
https://github.com/tarantool/metrics/?tab=readme-ov-file#summary
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `sharded_queue_storage_calls` is an equivalent of
`sharded_queue_calls` for the `sharded_queue.storage` role (#69).
Values have the same meaning as the `queue` statistics `calls` table, see:
https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
The metric contains labels in the following format:
`{name = "tube_name", status = "call_name"}`
- Metric `sharded_queue_storage_tasks` is an equivalent of
`sharded_queue_tasks` for the `sharded_queue.storage` role (#69).
Values have the same meaning as the `queue` statistics `tasks` table, see:
https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
The metric contains labels in the following format:
`{name = "tube_name", status = "task_status"}`

### Changed

### Fixed
Expand Down
1 change: 1 addition & 0 deletions sharded-queue-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ build = {
['sharded_queue.drivers.fifottl'] = 'sharded_queue/drivers/fifottl.lua',
['sharded_queue.time'] = 'sharded_queue/time.lua',
['sharded_queue.utils'] = 'sharded_queue/utils.lua',
['sharded_queue.metrics'] = 'sharded_queue/metrics.lua',
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
Expand Down
123 changes: 33 additions & 90 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ local vshard = require('vshard')
local fiber = require('fiber')
local log = require('log')

local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')
local state = require('sharded_queue.state')
local time = require('sharded_queue.time')
local utils = require('sharded_queue.utils')

local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')
local is_metrics_package, metrics = pcall(require, "metrics")

local stash_names = {
cfg = '__sharded_queue_cfg',
metrics_stats = '__sharded_queue_metrics_stats',
cfg = '__sharded_queue_api_cfg',
metrics_stats = '__sharded_queue_api_metrics_stats',
}
stash.setup(stash_names)

Expand Down Expand Up @@ -403,91 +403,49 @@ end
local sharded_queue = {
tube = {},
cfg = stash.get(stash_names.cfg),
metrics_stats = stash.get(stash_names.metrics_stats),
metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)),
}

if sharded_queue.cfg.metrics == nil then
sharded_queue.cfg.metrics = true
end

local function is_metrics_v_0_11_installed()
if not is_metrics_package or metrics.unregister_callback == nil then
return false
end
local counter = require('metrics.collectors.counter')
return counter.remove and true or false
if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = metrics.is_supported()
end

local function metrics_create_collectors()
return {
calls = {
collector = metrics.counter(
"sharded_queue_calls",
"sharded_queue's number of calls"
),
values = {},
},
tasks = {
collector = metrics.gauge(
"sharded_queue_tasks",
"sharded_queue's number of tasks"
)
},
}
end
local function wrap_sharded_queue_call_counters(call, fun)
return function(self, ...)
local before = fiber.clock()
local ok, ret = pcall(fun, self, ...)
local after = fiber.clock()

local function metrics_disable()
if sharded_queue.metrics_stats.callback then
metrics.unregister_callback(sharded_queue.metrics_stats.callback)
end
sharded_queue.metrics_stats.callback = nil
if sharded_queue.cfg.metrics then
sharded_queue.metrics_stats:observe(after - before,
self.tube_name, call, ok)
end

if sharded_queue.metrics_stats.collectors then
for _, c in pairs(sharded_queue.metrics_stats.collectors) do
metrics.registry:unregister(c.collector)
if not ok then
error(ret)
end

return ret
end
sharded_queue.metrics_stats.collectors = nil
end

for call, fun in pairs(sharded_tube) do
sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun)
end

local function metrics_enable()
-- Drop all collectors and a callback.
metrics_disable()

-- Set all collectors and the callback.
sharded_queue.metrics_stats.collectors = metrics_create_collectors()
local callback = function()
local metrics_stats = sharded_queue.metrics_stats
for tube_name, _ in pairs(sharded_queue.tube) do
local stat = sharded_queue.statistics(tube_name)
local collectors = metrics_stats.collectors
if collectors.calls.values[tube_name] == nil then
collectors.calls.values[tube_name] = {}
end
for k, v in pairs(stat.calls) do
local prev = metrics_stats.collectors.calls.values[tube_name][k] or 0
local inc = v - prev
metrics_stats.collectors.calls.collector:inc(inc, {
name = tube_name,
status = k,
})
metrics_stats.collectors.calls.values[tube_name][k] = v
end
for k, v in pairs(stat.tasks) do
metrics_stats.collectors.tasks.collector:set(v, {
name = tube_name,
status = k,
})
end
end
local get_statistic = function(tube)
return sharded_queue.statistics(tube)
end

metrics.register_callback(callback)
sharded_queue.metrics_stats.callback = callback
return true
sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic)
end

if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = is_metrics_v_0_11_installed()
local function metrics_disable()
sharded_queue.metrics_stats:disable()
end

function sharded_queue.cfg_call(_, options)
Expand Down Expand Up @@ -526,7 +484,6 @@ function sharded_queue.statistics(tube_name)

local stats_collection, err = cartridge_pool.map_call('tube_statistic',
{{ tube_name = tube_name }}, {uri_list=storages})

if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -585,22 +542,7 @@ local function init(opts)
end

local function validate_config(cfg)
if cfg['cfg'] == nil then
return
end

cfg = cfg['cfg']
if type(cfg) ~= 'table' then
error('"cfg" must be a table')
end
if cfg.metrics and type(cfg.metrics) ~= 'boolean' then
error('"cfg.metrics" must be a boolean')
end
if cfg.metrics and cfg.metrics == true then
if not is_metrics_v_0_11_installed() then
error("metrics >= 0.11.0 is required")
end
end
return utils.validate_config_cfg(cfg)
end

local function apply_config(cfg, opts)
Expand All @@ -618,7 +560,7 @@ local function apply_config(cfg, opts)
wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR,
log_request = utils.normalize.log_request(options.log_request),
}, {
__index = sharded_tube
__index = sharded_tube,
})
sharded_queue.tube[tube_name] = self
end
Expand All @@ -645,14 +587,15 @@ local function queue_action_wrapper(action)
if not sharded_queue.tube[name] then
return nil, string.format('No queue "%s" initialized yet', name)
end

return sharded_queue.tube[name][action](sharded_queue.tube[name], ...)
end
end

return {
init = init,
apply_config = apply_config,
validate_config = validate_config,

put = queue_action_wrapper('put'),
take = queue_action_wrapper('take'),
delete = queue_action_wrapper('delete'),
Expand Down
140 changes: 140 additions & 0 deletions sharded_queue/metrics.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
---- Module with metrics helper code.
local is_metrics_package, metrics = pcall(require, "metrics")

--- Register the metric collectors for a role and store them on `self`.
--
-- Three collectors are registered, in this order: a summary of the role's
-- API calls, a counter of queue calls and a gauge of queue tasks.
--
-- @param self  metrics state table; its `collectors` field is overwritten.
-- @param role  short role name (for example "api"), used in metric names.
local function create_collectors(self, role)
    local full_role_name = "sharded_queue." .. role

    -- The "api" role keeps the legacy, unprefixed names for backward
    -- compatibility. Any other role gets role-specific names so that an
    -- instance running both roles does not get name clashes.
    local calls_name, calls_help, tasks_name, tasks_help
    if role == "api" then
        calls_name = "sharded_queue_calls"
        calls_help = "sharded_queue's number of calls"
        tasks_name = "sharded_queue_tasks"
        tasks_help = "sharded_queue's number of tasks"
    else
        calls_name = string.format("sharded_queue_%s_calls", role)
        calls_help = string.format(
            "sharded_queue's number of calls on %s", full_role_name)
        tasks_name = string.format("sharded_queue_%s_tasks", role)
        tasks_help = string.format(
            "sharded_queue's number of tasks on %s", full_role_name)
    end

    -- The summary name always contains the role, so it is unique per role.
    local role_summary = metrics.summary(
        string.format("sharded_queue_%s_role_stats", role),
        string.format("sharded_queue's number of %s API calls", full_role_name),
        {[0.5]=0.01, [0.95]=0.01, [0.99]=0.01},
        {max_age_time = 60, age_buckets_count = 5}
    )
    local calls_counter = metrics.counter(calls_name, calls_help)
    local tasks_gauge = metrics.gauge(tasks_name, tasks_help)

    self.collectors = {
        api_stats = {collector = role_summary, values = {}},
        -- `values` remembers the last seen value per tube and call name.
        calls = {collector = calls_counter, values = {}},
        tasks = {collector = tasks_gauge},
    }
end

--- Enable metrics collection for a role.
--
-- Any previously registered callback and collectors are dropped first, then
-- fresh collectors are created and a callback is registered with the
-- `metrics` module. Each time the callback runs it walks `tubes` and
-- publishes the `calls` and `tasks` statistics of every tube.
--
-- @param self    metrics state table (see `init`).
-- @param role    short role name passed to `create_collectors`.
-- @param tubes   table keyed by tube name; it is read on every callback
--                invocation, not copied at enable time.
-- @param get_statistic_callback  function(tube_name) returning a table with
--                `calls` and `tasks` sub-tables, or nil to skip the tube.
-- @return true
local function enable(self, role, tubes, get_statistic_callback)
    -- Drop all collectors and a callback. The colon call already passes
    -- `self`, so no explicit argument is needed.
    self:disable()

    -- Set all collectors and the callback.
    create_collectors(self, role)
    local callback = function()
        for tube_name, _ in pairs(tubes) do
            local statistics = get_statistic_callback(tube_name)

            -- A nil result (e.g. a failed statistics request) skips the tube.
            if statistics ~= nil then
                local collectors = self.collectors
                if collectors.calls.values[tube_name] == nil then
                    collectors.calls.values[tube_name] = {}
                end

                -- The counter takes increments, while statistics report
                -- absolute values, so publish the delta from the last value.
                -- NOTE(review): if a reported value ever decreases, `inc`
                -- becomes negative; confirm how the counter collector
                -- handles that.
                for k, v in pairs(statistics.calls) do
                    local prev = collectors.calls.values[tube_name][k] or 0
                    local inc = v - prev
                    collectors.calls.collector:inc(inc, {
                        name = tube_name,
                        status = k,
                    })
                    collectors.calls.values[tube_name][k] = v
                end
                for k, v in pairs(statistics.tasks) do
                    collectors.tasks.collector:set(v, {
                        name = tube_name,
                        status = k,
                    })
                end
            end
        end
    end

    metrics.register_callback(callback)
    -- Keep the callback so that `disable` can unregister it later.
    self.callback = callback
    return true
end

--- Disable metrics collection.
--
-- Unregisters the stored callback (if any) and every stored collector
-- (if any), then clears both fields on `self`. Does nothing beyond clearing
-- the fields when metrics were never enabled.
--
-- @param self  metrics state table (see `init`).
local function disable(self)
    local registered_callback = self.callback
    if registered_callback then
        metrics.unregister_callback(registered_callback)
    end
    self.callback = nil

    local registered_collectors = self.collectors
    if registered_collectors then
        for _, entry in pairs(registered_collectors) do
            metrics.registry:unregister(entry.collector)
        end
    end
    self.collectors = nil
end

--- Record one API call in the role's summary collector.
--
-- Does nothing when no collectors are set on `self`.
--
-- @param self     metrics state table (see `init`).
-- @param latency  value passed to the collector as the observation.
-- @param tube     tube name, published as the `name` label.
-- @param method   API method name, published as the `method` label.
-- @param ok       truthy for a successful call, falsy for a failed one.
local function observe(self, latency, tube, method, ok)
    local collectors = self.collectors
    if collectors == nil then
        return
    end

    local labels = {
        name = tube,
        method = method,
        status = ok and 'ok' or 'error',
    }
    collectors.api_stats.collector:observe(latency, labels)
end

-- Methods exposed on a metrics state table once `init` has been applied.
local methods = {
    enable = enable,
    disable = disable,
    observe = observe,
}

-- Metatable that routes method lookups to the table above.
local mt = {__index = methods}

--- Check that a usable `metrics` package is installed.
--
-- The package must load, expose `unregister_callback`, and its counter
-- collector must provide `remove`. Presumably these features identify
-- metrics >= 0.11.0, as the function name suggests (confirm against the
-- metrics changelog).
--
-- @return true when all checks pass, false otherwise.
local function is_v_0_11_installed()
    if not is_metrics_package then
        return false
    end
    if metrics.unregister_callback == nil then
        return false
    end

    local counter = require('metrics.collectors.counter')
    if counter.remove then
        return true
    end
    return false
end

--- Turn a stash table into a metrics state object.
--
-- Attaches the module metatable so that `enable`, `disable` and `observe`
-- can be called as methods on the table.
--
-- @param stash  table to decorate; it is modified in place.
-- @return the same `stash` table.
local function init(stash)
    -- setmetatable returns its first argument.
    return setmetatable(stash, mt)
end

-- Public interface of the module.
return {
-- Returns true when the installed `metrics` package passes the checks in
-- is_v_0_11_installed.
is_supported = is_v_0_11_installed,
-- Attaches the metrics methods to a stash table and returns it.
init = init,
}
Loading

0 comments on commit 9e8b9ef

Please sign in to comment.