Skip to content

Commit

Permalink
api: add latency metrics for API calls
Browse files Browse the repository at this point in the history
* Metric `sharded_queue_api_role_latency` is a histogram for latency
  fo API calls on an instance with `sharded_queue.api` role.
* Metric `sharded_queue_storage_role_latency` is a histogram for
  latency fo API calls on an instance with `sharded_queue.storage`
  role.

Closes #71
  • Loading branch information
oleg-jukovec committed Mar 19, 2024
1 parent c2d3242 commit 9ad99b1
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
instance with `sharded_queue.api` role (#69).
- Metric `sharded_queue_api_role_errors` is a counter of API calls that failed
on an instance with `sharded_queue.api` role (#69).
- Metric `sharded_queue_api_role_latency` is a histogram for latency fo API
calls on an instance with `sharded_queue.api` role (#71).
- Metric `sharded_queue_storage_role_calls` is a counter of API calls on an
instance with `sharded_queue.storage` role (#69).
- Metric `sharded_queue_storage_role_errors` is a counter of API calls that
Expand All @@ -23,6 +25,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- Metric `sharded_queue_storage_tasks` as an equivalent of
`sharded_queue_tasks` for an instance with `sharded_queue.storage`
role (#69).
- Metric `sharded_queue_storage_role_latency` is a histogram for latency fo API
calls on an instance with `sharded_queue.storage` role (#71).

### Changed

Expand Down
12 changes: 12 additions & 0 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,24 @@ end

local function wrap_sharded_queue_call_counters(call, fun)
return function(self, ...)
local before = fiber.clock()
sharded_queue.api_call_stats:increment(self.tube_name, call)

local ok, ret = pcall(fun, self, ...)

local after = fiber.clock()
if sharded_queue.cfg.metrics
and sharded_queue.metrics_stats.collectors ~= nil then
sharded_queue.metrics_stats.collectors.latency.collector:observe(
after - before,
{name = self.tube_name, status = call}
)
end
if not ok then
sharded_queue.api_error_stats:increment(self.tube_name, call)
error(ret)
end

return ret
end
end
Expand Down
8 changes: 8 additions & 0 deletions sharded_queue/metrics.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ local function create_collectors(role, metrics_stats)
values = {},
}

collectors.latency = {
collector = metrics.histogram(
string.format("sharded_queue_%s_role_latency", role),
string.format("sharded_queue's latency of API calls to %s",
role_full)
),
}

if role == "api" then
-- Backward compatibility for the sharded_queue.api role.
collectors.calls = {
Expand Down
15 changes: 14 additions & 1 deletion sharded_queue/storage.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local log = require('log')
local fiber = require('fiber')
local json = require('json')
local log = require('log')

local cartridge = require('cartridge')

Expand Down Expand Up @@ -149,11 +150,23 @@ local function apply_config(cfg, opts)
local tube_name = args.tube_name
if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end

local before = fiber.clock()
storage.api_call_stats:increment(tube_name, name)

local ok, ret, err = pcall(tubes[tube_name].method[name], args)

local after = fiber.clock()
if storage.cfg.metrics
and storage.metrics_stats.collectors ~= nil then
storage.metrics_stats.collectors.latency.collector:observe(
after - before,
{name = tube_name, status = name}
)
end
if not ok or err ~= nil then
storage.api_error_stats:increment(tube_name, name)
end

if not ok then
error(ret)
end
Expand Down
23 changes: 23 additions & 0 deletions test/metrics_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ g.test_metrics_api = function()
delayed = task_count,
total = task_count,
})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_bucket"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_sum"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_count"), {})

-- Check metrics on storages.
metrics = get_storages_metrics(tube_name)
Expand Down Expand Up @@ -209,6 +212,9 @@ g.test_metrics_api = function()
delayed = task_count,
total = task_count,
})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {})
end

g.test_metrics_api_disabled = function()
Expand All @@ -231,6 +237,9 @@ g.test_metrics_api_disabled = function()
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_errors"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_calls"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_tasks"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {})
end

g.test_metrics_api_disable = function()
Expand All @@ -255,6 +264,10 @@ g.test_metrics_api_disable = function()
assert_metric(metrics, "sharded_queue_tasks", "status", {
delayed = 1,
})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_bucket"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_sum"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_count"), {})

metrics = get_storages_metrics(tube_name)
assert_metric(metrics, "sharded_queue_storage_role_calls", "status", {
put = 1,
Expand All @@ -268,6 +281,9 @@ g.test_metrics_api_disable = function()
assert_metric(metrics, "sharded_queue_storage_tasks", "status", {
delayed = 1,
})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {})

g.queue_conn:eval("require('sharded_queue.api').cfg(...)",
{{metrics = false}})
Expand All @@ -280,6 +296,9 @@ g.test_metrics_api_disable = function()
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_errors"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_calls"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_tasks"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {})
t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {})
end

g.test_metrics_storage = function()
Expand Down Expand Up @@ -313,6 +332,7 @@ g.test_metrics_storage = function()
end

local metrics = get_metrics(tube_name, 'queue-storage-1-0')

assert_metric(metrics, "sharded_queue_storage_role_calls", "status", {
put = 1,
take = 1,
Expand Down Expand Up @@ -344,4 +364,7 @@ g.test_metrics_storage = function()
delayed = 0,
total = 0,
})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {})
t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {})
end

0 comments on commit 9ad99b1

Please sign in to comment.