api: add more metrics
* Metric `tnt_sharded_queue_api_role_stats` is a summary[1] with
  quantiles of `sharded_queue.api` role API calls.
  The metric includes a counter of API calls and errors.
  The metric contains labels in the following format:
  `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

* Metric `tnt_sharded_queue_storage_role_stats` is a summary[1] with
  quantiles of `sharded_queue.storage` role API calls.
  The metric includes a counter of API calls and errors.
  The metric contains labels in the following format:
  `{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

* Metric `tnt_sharded_queue_storage_statistics_calls_total` as
  an equivalent of `tnt_sharded_queue_api_statistics_calls_total`
  for the `sharded_queue.storage` role.
  Values have the same meaning as the `queue` statistics `calls`
  table[2].
  The metric contains labels in the following format:
  `{name = "tube_name", state = "call_type"}`

* Metric `tnt_sharded_queue_storage_statistics_tasks` as an
  equivalent of `tnt_sharded_queue_api_statistics_tasks` for
  the `sharded_queue.storage` role.
  Values have the same meaning as the `queue` statistics `tasks`
  table[2].
  The metric contains labels in the following format:
  `{name = "tube_name", state = "task_state"}`

* Metric `sharded_queue_calls` renamed to
  `tnt_sharded_queue_api_statistics_calls_total`. The metric now has
  labels in the format `{name = "tube_name", state = "call_type"}`
  instead of `{name = "tube_name", status = "call_type"}`.

* Metric `sharded_queue_tasks` renamed to
  `tnt_sharded_queue_api_statistics_tasks`. The metric now has labels
  in the format `{name = "tube_name", state = "task_state"}` instead of
  `{name = "tube_name", status = "task_state"}`.

1. https://github.com/tarantool/metrics/?tab=readme-ov-file#summary
2. https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics

Closes #69
Closes #71
oleg-jukovec committed Mar 22, 2024
1 parent 5e5fd9f commit f887616
Showing 8 changed files with 590 additions and 134 deletions.
37 changes: 37 additions & 0 deletions CHANGELOG.md
Expand Up @@ -9,8 +9,42 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.api` role API calls (#71).
The metric includes a counter of API calls and errors.
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.storage` role API calls (#71).
The metric includes a counter of API calls and errors.
The metric contains labels in the following format:
`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`
- Metric `tnt_sharded_queue_storage_statistics_calls_total` as an equivalent of
`tnt_sharded_queue_api_statistics_calls_total` for the
`sharded_queue.storage` role (#71).
Values have the same meaning as the [`queue` statistics][queue-statistics]
`calls` table.
The metric contains labels in the following format:
`{name = "tube_name", state = "call_type"}`
- Metric `tnt_sharded_queue_storage_statistics_tasks` as an equivalent of
`tnt_sharded_queue_api_statistics_tasks` for the `sharded_queue.storage`
role (#71).
Values have the same meaning as the [`queue` statistics][queue-statistics]
`tasks` table.
The metric contains labels in the following format:
`{name = "tube_name", state = "task_state"}`

### Changed

- Metric `sharded_queue_calls` renamed to
`tnt_sharded_queue_api_statistics_calls_total` (#71). The metric now has
labels in the format `{name = "tube_name", state = "call_type"}` instead of
`{name = "tube_name", status = "call_type"}`.
- Metric `sharded_queue_tasks` renamed to
`tnt_sharded_queue_api_statistics_tasks` (#71). The metric now has labels
in the format `{name = "tube_name", state = "task_state"}` instead of
`{name = "tube_name", status = "task_state"}`.

### Fixed

- Data race with fifo driver for put()/take() methods with vinyl
Expand Down Expand Up @@ -47,3 +81,6 @@ different shards (over the whole cluster).
- Testing CI (#53).
- Linter check on CI (#18).
- Publish CI (#54).

[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
70 changes: 70 additions & 0 deletions README.md
Expand Up @@ -166,6 +166,73 @@ make deps
make test
```

## Metrics

The module exports several metrics if the module `metrics` >= 0.11 is
installed and the feature is not disabled by the configuration.
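
The configuration check described above can be sketched in plain Lua. This is an illustration modelled on the `validate_config` logic visible in this commit, not the module's actual code: `validate_metrics_cfg` and its `is_supported` callback are hypothetical names, and the callback stands in for whatever detects a `metrics` module of version 0.11 or newer.

```lua
-- Hypothetical stand-alone validator (not part of sharded_queue).
-- `is_supported` is an assumed callback that reports whether a suitable
-- `metrics` module (>= 0.11) is available.
local function validate_metrics_cfg(conf, is_supported)
    local cfg = conf['cfg']
    if cfg == nil then
        -- Nothing configured: defaults apply.
        return true
    end
    if type(cfg) ~= 'table' then
        error('"cfg" must be a table')
    end
    if cfg.metrics ~= nil and type(cfg.metrics) ~= 'boolean' then
        error('"cfg.metrics" must be a boolean')
    end
    if cfg.metrics == true and not is_supported() then
        error('metrics >= 0.11.0 is required')
    end
    return true
end

-- Metrics explicitly disabled: accepted even when the module is missing.
local ok_disabled = pcall(validate_metrics_cfg,
    { cfg = { metrics = false } }, function() return false end)

-- Metrics requested but unsupported: rejected.
local ok_unsupported = pcall(validate_metrics_cfg,
    { cfg = { metrics = true } }, function() return false end)

-- Non-boolean value: rejected.
local ok_bad_type = pcall(validate_metrics_cfg,
    { cfg = { metrics = 'yes' } }, function() return true end)
```

Passing the support check in as a callback keeps the sketch runnable without Tarantool; in the module itself the check is presumably made against the installed `metrics` package.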

### Role sharded_queue.api

* Metric `tnt_sharded_queue_api_statistics_calls_total` is a counter with
the number of requests broken down by [the type of request][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "request_type"}`

A list of possible request types: `done`, `take`, `kick`, `bury`, `put`,
`delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.api`
role accumulates values from all buckets.

* Metric `tnt_sharded_queue_api_statistics_tasks` is a gauge with
the number of tasks in a queue broken down by [a task state][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "task_state"}`

A list of possible task states: `taken`, `buried`, `ready`, `done`,
`delayed`, `total`. The metric on the `sharded_queue.api` role accumulates
values from all buckets.

* Metric `tnt_sharded_queue_api_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.api` role API calls. The metric includes a
counter of API calls and errors and has labels in the following format:

`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

A list of possible call methods: `put`, `take`, `delete`, `release`, `touch`,
`ack`, `bury`, `kick`, `peek`, `drop`.
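
Because queue statistics report cumulative call counts while a counter collector is advanced by increments, one way to feed the counter is to remember the previous value and add only the difference. The following is a minimal sketch of that idea, loosely following the callback removed from `api.lua` in this commit. `new_fake_counter`, `update_calls`, and the tube name `my_tube` are hypothetical; the fake counter replaces the real `metrics` collector so that the example runs in plain Lua.

```lua
-- Minimal stand-in for a counter collector; the real `metrics` module is
-- not used here so the sketch runs in plain Lua.
local function new_fake_counter()
    local counter = { values = {} }
    function counter:inc(num, labels)
        local key = labels.name .. '/' .. labels.state
        self.values[key] = (self.values[key] or 0) + num
    end
    return counter
end

-- Last seen cumulative value per tube and call type.
local previous = {}

-- `calls` is assumed to look like the `calls` table of queue statistics,
-- for example { put = 10, take = 7 }.
local function update_calls(counter, tube_name, calls)
    previous[tube_name] = previous[tube_name] or {}
    for call_type, value in pairs(calls) do
        local prev = previous[tube_name][call_type] or 0
        -- Add only the growth since the previous observation.
        counter:inc(value - prev, { name = tube_name, state = call_type })
        previous[tube_name][call_type] = value
    end
end

local counter = new_fake_counter()
update_calls(counter, 'my_tube', { put = 10, take = 7 })
update_calls(counter, 'my_tube', { put = 15, take = 7 })
```

After the second update the fake counter holds the latest cumulative totals, since each call added only the change since the previous one.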

### Role sharded_queue.storage

* Metric `tnt_sharded_queue_storage_statistics_calls_total` is a counter with
the number of requests broken down by [the type of request][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "request_type"}`

A list of possible request types: `done`, `take`, `kick`, `bury`, `put`,
`delete`, `touch`, `ack`, `release`. The metric on the `sharded_queue.storage`
role shows actual values on the instance.

* Metric `tnt_sharded_queue_storage_statistics_tasks` is a gauge with
the number of tasks in a queue broken down by [a task state][queue-statistics].
The metric has labels in the following format:

`{name = "tube_name", state = "task_state"}`

A list of possible task states: `taken`, `buried`, `ready`, `done`,
`delayed`, `total`. The metric on the `sharded_queue.storage` role shows
actual values on the instance.

* Metric `tnt_sharded_queue_storage_role_stats` is a [summary][metrics-summary]
with quantiles of `sharded_queue.storage` role API calls. The metric includes a
counter of API calls and errors and has labels in the following format:

`{name = "tube_name", method = "api_call_method", status = "ok" or "error"}`

A list of possible call methods: `statistic`, `put`, `take`, `delete`,
`release`, `touch`, `ack`, `bury`, `kick`, `peek`, `drop`.
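
The `*_role_stats` summaries record how long each API call takes and whether it succeeded. A rough sketch of such a timing wrapper is shown here, in the spirit of `wrap_sharded_queue_call_counters` from this commit. It is an illustration under assumptions: `observe` and `wrap_with_timing` are hypothetical names, `os.clock` replaces `fiber.clock` so that the code runs outside Tarantool, and observations go into a plain table instead of a real summary collector.

```lua
-- Fake observer that records one entry per call instead of updating a
-- real summary collector.
local observed = {}
local function observe(duration, tube_name, method, ok)
    table.insert(observed, {
        duration = duration,
        name = tube_name,
        method = method,
        status = ok and 'ok' or 'error',
    })
end

-- Wraps `fun` so that every call is timed and labelled with its outcome.
-- Like the wrapper in the commit, it passes through a single return value.
local function wrap_with_timing(method, fun)
    return function(self, ...)
        local before = os.clock()
        local ok, ret = pcall(fun, self, ...)
        observe(os.clock() - before, self.tube_name, method, ok)
        if not ok then
            -- Level 0 re-raises the message without adding position info again.
            error(ret, 0)
        end
        return ret
    end
end

local tube = { tube_name = 'my_tube' }
tube.put = wrap_with_timing('put', function(_, data) return data end)
tube.take = wrap_with_timing('take', function() error('no tasks') end)

local put_result = tube:put('payload')
local take_ok = pcall(tube.take, tube)
```

The observation is recorded before the error is re-raised, so failed calls are still counted under `status = "error"`.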

## API extensions (compared to tarantool/queue)

* ``tube:take`` method has additional table argument ``options``. It may be used to provide additional logic in some
Expand Down Expand Up @@ -198,3 +265,6 @@ make test
```

If you use **fifottl** driver (default), you can log driver's method calls with `log_request` (log router's and storage's operations).

[metrics-summary]: https://www.tarantool.io/en/doc/latest/book/monitoring/api_reference/#summary
[queue-statistics]: https://github.com/tarantool/queue?tab=readme-ov-file#getting-statistics
1 change: 1 addition & 0 deletions sharded-queue-scm-1.rockspec
Expand Up @@ -26,6 +26,7 @@ build = {
['sharded_queue.drivers.fifottl'] = 'sharded_queue/drivers/fifottl.lua',
['sharded_queue.time'] = 'sharded_queue/time.lua',
['sharded_queue.utils'] = 'sharded_queue/utils.lua',
['sharded_queue.metrics'] = 'sharded_queue/metrics.lua',
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
Expand Down
123 changes: 33 additions & 90 deletions sharded_queue/api.lua
Expand Up @@ -3,18 +3,18 @@ local vshard = require('vshard')
local fiber = require('fiber')
local log = require('log')

local metrics = require('sharded_queue.metrics')
local stash = require('sharded_queue.stash')
local state = require('sharded_queue.state')
local time = require('sharded_queue.time')
local utils = require('sharded_queue.utils')

local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')
local is_metrics_package, metrics = pcall(require, "metrics")

local stash_names = {
cfg = '__sharded_queue_cfg',
metrics_stats = '__sharded_queue_metrics_stats',
cfg = '__sharded_queue_api_cfg',
metrics_stats = '__sharded_queue_api_metrics_stats',
}
stash.setup(stash_names)

Expand Down Expand Up @@ -403,91 +403,49 @@ end
local sharded_queue = {
tube = {},
cfg = stash.get(stash_names.cfg),
metrics_stats = stash.get(stash_names.metrics_stats),
metrics_stats = metrics.init(stash.get(stash_names.metrics_stats)),
}

if sharded_queue.cfg.metrics == nil then
sharded_queue.cfg.metrics = true
end

local function is_metrics_v_0_11_installed()
if not is_metrics_package or metrics.unregister_callback == nil then
return false
end
local counter = require('metrics.collectors.counter')
return counter.remove and true or false
if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = metrics.is_supported()
end

local function metrics_create_collectors()
return {
calls = {
collector = metrics.counter(
"sharded_queue_calls",
"sharded_queue's number of calls"
),
values = {},
},
tasks = {
collector = metrics.gauge(
"sharded_queue_tasks",
"sharded_queue's number of tasks"
)
},
}
end
local function wrap_sharded_queue_call_counters(call, fun)
return function(self, ...)
local before = fiber.clock()
local ok, ret = pcall(fun, self, ...)
local after = fiber.clock()

local function metrics_disable()
if sharded_queue.metrics_stats.callback then
metrics.unregister_callback(sharded_queue.metrics_stats.callback)
end
sharded_queue.metrics_stats.callback = nil
if sharded_queue.cfg.metrics then
sharded_queue.metrics_stats:observe(after - before,
self.tube_name, call, ok)
end

if sharded_queue.metrics_stats.collectors then
for _, c in pairs(sharded_queue.metrics_stats.collectors) do
metrics.registry:unregister(c.collector)
if not ok then
error(ret)
end

return ret
end
sharded_queue.metrics_stats.collectors = nil
end

for call, fun in pairs(sharded_tube) do
sharded_tube[call] = wrap_sharded_queue_call_counters(call, fun)
end

local function metrics_enable()
-- Drop all collectors and a callback.
metrics_disable()

-- Set all collectors and the callback.
sharded_queue.metrics_stats.collectors = metrics_create_collectors()
local callback = function()
local metrics_stats = sharded_queue.metrics_stats
for tube_name, _ in pairs(sharded_queue.tube) do
local stat = sharded_queue.statistics(tube_name)
local collectors = metrics_stats.collectors
if collectors.calls.values[tube_name] == nil then
collectors.calls.values[tube_name] = {}
end
for k, v in pairs(stat.calls) do
local prev = metrics_stats.collectors.calls.values[tube_name][k] or 0
local inc = v - prev
metrics_stats.collectors.calls.collector:inc(inc, {
name = tube_name,
status = k,
})
metrics_stats.collectors.calls.values[tube_name][k] = v
end
for k, v in pairs(stat.tasks) do
metrics_stats.collectors.tasks.collector:set(v, {
name = tube_name,
status = k,
})
end
end
local get_statistic = function(tube)
return sharded_queue.statistics(tube)
end

metrics.register_callback(callback)
sharded_queue.metrics_stats.callback = callback
return true
sharded_queue.metrics_stats:enable('api', sharded_queue.tube, get_statistic)
end

if sharded_queue.cfg.metrics then
sharded_queue.cfg.metrics = is_metrics_v_0_11_installed()
local function metrics_disable()
sharded_queue.metrics_stats:disable()
end

function sharded_queue.cfg_call(_, options)
Expand Down Expand Up @@ -526,7 +484,6 @@ function sharded_queue.statistics(tube_name)

local stats_collection, err = cartridge_pool.map_call('tube_statistic',
{{ tube_name = tube_name }}, {uri_list=storages})

if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -585,22 +542,7 @@ local function init(opts)
end

local function validate_config(cfg)
if cfg['cfg'] == nil then
return
end

cfg = cfg['cfg']
if type(cfg) ~= 'table' then
error('"cfg" must be a table')
end
if cfg.metrics and type(cfg.metrics) ~= 'boolean' then
error('"cfg.metrics" must be a boolean')
end
if cfg.metrics and cfg.metrics == true then
if not is_metrics_v_0_11_installed() then
error("metrics >= 0.11.0 is required")
end
end
return utils.validate_config_cfg(cfg)
end

local function apply_config(cfg, opts)
Expand All @@ -618,7 +560,7 @@ local function apply_config(cfg, opts)
wait_factor = options.wait_factor or time.DEFAULT_WAIT_FACTOR,
log_request = utils.normalize.log_request(options.log_request),
}, {
__index = sharded_tube
__index = sharded_tube,
})
sharded_queue.tube[tube_name] = self
end
Expand All @@ -645,14 +587,15 @@ local function queue_action_wrapper(action)
if not sharded_queue.tube[name] then
return nil, string.format('No queue "%s" initialized yet', name)
end

return sharded_queue.tube[name][action](sharded_queue.tube[name], ...)
end
end

return {
init = init,
apply_config = apply_config,
validate_config = validate_config,

put = queue_action_wrapper('put'),
take = queue_action_wrapper('take'),
delete = queue_action_wrapper('delete'),