From 9ad99b1a21906f21e3f3a4bd30ce94026622c9cd Mon Sep 17 00:00:00 2001 From: Oleg Jukovec Date: Tue, 19 Mar 2024 15:48:39 +0300 Subject: [PATCH] api: add latency metrics for API calls * Metric `sharded_queue_api_role_latency` is a histogram for latency fo API calls on an instance with `sharded_queue.api` role. * Metric `sharded_queue_storage_role_latency` is a histogram for latency fo API calls on an instance with `sharded_queue.storage` role. Closes #71 --- CHANGELOG.md | 4 ++++ sharded_queue/api.lua | 12 ++++++++++++ sharded_queue/metrics.lua | 8 ++++++++ sharded_queue/storage.lua | 15 ++++++++++++++- test/metrics_test.lua | 23 +++++++++++++++++++++++ 5 files changed, 61 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2de9caf..534c61a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. instance with `sharded_queue.api` role (#69). - Metric `sharded_queue_api_role_errors` is a counter of API calls that failed on an instance with `sharded_queue.api` role (#69). +- Metric `sharded_queue_api_role_latency` is a histogram for latency fo API + calls on an instance with `sharded_queue.api` role (#71). - Metric `sharded_queue_storage_role_calls` is a counter of API calls on an instance with `sharded_queue.storage` role (#69). - Metric `sharded_queue_storage_role_errors` is a counter of API calls that @@ -23,6 +25,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. - Metric `sharded_queue_storage_tasks` as an equivalent of `sharded_queue_tasks` for an instance with `sharded_queue.storage` role (#69). +- Metric `sharded_queue_storage_role_latency` is a histogram for latency fo API + calls on an instance with `sharded_queue.storage` role (#71). ### Changed diff --git a/sharded_queue/api.lua b/sharded_queue/api.lua index 5bb8af9..2e959eb 100644 --- a/sharded_queue/api.lua +++ b/sharded_queue/api.lua @@ -426,12 +426,24 @@ end local function wrap_sharded_queue_call_counters(call, fun) return function(self, ...) + local before = fiber.clock() sharded_queue.api_call_stats:increment(self.tube_name, call) + local ok, ret = pcall(fun, self, ...) + + local after = fiber.clock() + if sharded_queue.cfg.metrics + and sharded_queue.metrics_stats.collectors ~= nil then + sharded_queue.metrics_stats.collectors.latency.collector:observe( + after - before, + {name = self.tube_name, status = call} + ) + end if not ok then sharded_queue.api_error_stats:increment(self.tube_name, call) error(ret) end + return ret end end diff --git a/sharded_queue/metrics.lua b/sharded_queue/metrics.lua index e13f19a..7490b46 100644 --- a/sharded_queue/metrics.lua +++ b/sharded_queue/metrics.lua @@ -31,6 +31,14 @@ local function create_collectors(role, metrics_stats) values = {}, } + collectors.latency = { + collector = metrics.histogram( + string.format("sharded_queue_%s_role_latency", role), + string.format("sharded_queue's latency of API calls to %s", + role_full) + ), + } + if role == "api" then -- Backward compatibility for the sharded_queue.api role. collectors.calls = { diff --git a/sharded_queue/storage.lua b/sharded_queue/storage.lua index d03c8cf..f88b42c 100644 --- a/sharded_queue/storage.lua +++ b/sharded_queue/storage.lua @@ -1,5 +1,6 @@ -local log = require('log') +local fiber = require('fiber') local json = require('json') +local log = require('log') local cartridge = require('cartridge') @@ -149,11 +150,23 @@ local function apply_config(cfg, opts) local tube_name = args.tube_name if tubes[tube_name].method[name] == nil then error(('Method %s not implemented in tube %s'):format(name, tube_name)) end + local before = fiber.clock() storage.api_call_stats:increment(tube_name, name) + local ok, ret, err = pcall(tubes[tube_name].method[name], args) + + local after = fiber.clock() + if storage.cfg.metrics + and storage.metrics_stats.collectors ~= nil then + storage.metrics_stats.collectors.latency.collector:observe( + after - before, + {name = tube_name, status = name} + ) + end if not ok or err ~= nil then storage.api_error_stats:increment(tube_name, name) end + if not ok then error(ret) end diff --git a/test/metrics_test.lua b/test/metrics_test.lua index be08985..23e98ec 100644 --- a/test/metrics_test.lua +++ b/test/metrics_test.lua @@ -165,6 +165,9 @@ g.test_metrics_api = function() delayed = task_count, total = task_count, }) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_bucket"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_sum"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_count"), {}) -- Check metrics on storages. metrics = get_storages_metrics(tube_name) @@ -209,6 +212,9 @@ g.test_metrics_api = function() delayed = task_count, total = task_count, }) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {}) end g.test_metrics_api_disabled = function() @@ -231,6 +237,9 @@ g.test_metrics_api_disabled = function() t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_errors"), {}) t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_calls"), {}) t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_tasks"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {}) end g.test_metrics_api_disable = function() @@ -255,6 +264,10 @@ g.test_metrics_api_disable = function() assert_metric(metrics, "sharded_queue_tasks", "status", { delayed = 1, }) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_bucket"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_sum"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_api_role_latency_count"), {}) + metrics = get_storages_metrics(tube_name) assert_metric(metrics, "sharded_queue_storage_role_calls", "status", { put = 1, @@ -268,6 +281,9 @@ g.test_metrics_api_disable = function() assert_metric(metrics, "sharded_queue_storage_tasks", "status", { delayed = 1, }) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {}) g.queue_conn:eval("require('sharded_queue.api').cfg(...)", {{metrics = false}}) @@ -280,6 +296,9 @@ g.test_metrics_api_disable = function() t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_errors"), {}) t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_calls"), {}) t.assert_equals(get_metric(metrics, "sharded_queue_storage_storage_tasks"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {}) + t.assert_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {}) end g.test_metrics_storage = function() @@ -313,6 +332,7 @@ g.test_metrics_storage = function() end local metrics = get_metrics(tube_name, 'queue-storage-1-0') + assert_metric(metrics, "sharded_queue_storage_role_calls", "status", { put = 1, take = 1, @@ -344,4 +364,7 @@ g.test_metrics_storage = function() delayed = 0, total = 0, }) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_bucket"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_sum"), {}) + t.assert_not_equals(get_metric(metrics, "sharded_queue_storage_role_latency_count"), {}) end