Skip to content

Commit

Permalink
internal: replace cartridge calls by vshard calls
Browse files Browse the repository at this point in the history
Calls on vshard replicasets at first glance do not bring additional
load, but at the same time they make the code more general and portable
to Tarantool 3.0.

Part of #68
  • Loading branch information
oleg-jukovec committed Apr 3, 2024
1 parent 7d16a5c commit 0ce2cf0
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 36 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ cartridge.cfg({
...
})
```
3. Queue API will be available on all nodes where sharded_queue.api is enabled

3. Enable the `sharded_queue.storage` role on all storage nodes. Be careful,
there should be no replicasets with `cartridge.roles.vshard-storage` role,
but without the `sharded_queue.storage` role.
4. Queue API will be available on all nodes where the `sharded_queue.api` role
is enabled.

## Usage as a ready-to-deploy service

Expand Down
66 changes: 32 additions & 34 deletions sharded_queue/api.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,14 @@ local state = require('sharded_queue.state')
local time = require('sharded_queue.time')
local utils = require('sharded_queue.utils')

local cartridge_pool = require('cartridge.pool')
local cartridge_rpc = require('cartridge.rpc')

local stash_names = {
cfg = '__sharded_queue_api_cfg',
metrics_stats = '__sharded_queue_api_metrics_stats',
}
stash.setup(stash_names)

local remote_call = function(method, instance_uri, args, timeout)
local conn = cartridge_pool.connect(instance_uri)
return conn:call(method, { args }, { timeout = timeout })
local remote_call = function(method, replicaset, args, timeout)
return replicaset:callrw(method, { args }, { timeout = timeout })
end

local function validate_options(options)
Expand Down Expand Up @@ -89,16 +85,16 @@ function sharded_tube.put(self, data, options)
end

-- function for try get task from instance --
local function take_task(storages, options, take_timeout, call_timeout)
for _, instance_uri in ipairs(storages) do
local function take_task(replicasets, options, take_timeout, call_timeout)
for _, replicaset in ipairs(replicasets) do
if take_timeout == 0 then
break
end
local begin = time.cur()

-- try take task from all instance
local ok, ret = pcall(remote_call, 'tube_take',
instance_uri,
replicaset,
options,
call_timeout
)
Expand Down Expand Up @@ -148,13 +144,19 @@ function sharded_tube.take(self, timeout, options)
while take_timeout ~= 0 do
local begin = time.cur()

local storages = cartridge_rpc.get_candidates(
'sharded_queue.storage',
{ leader_only = true })
local shards, err = vshard.router.routeall()
if err ~= nil then
error(err)
end

utils.array_shuffle(storages)
local replicasets = {}
for _, replicaset in pairs(shards) do
table.insert(replicasets, replicaset)
end
utils.array_shuffle(replicasets)

local task = take_task(storages, options, take_timeout, remote_call_timeout)
local task = take_task(replicasets,
options, take_timeout, remote_call_timeout)

if task ~= nil then
if options.extra.log_request then
Expand Down Expand Up @@ -340,19 +342,18 @@ end
function sharded_tube.kick(self, count, options)
-- try kick few tasks --

local storages = cartridge_rpc.get_candidates(
'sharded_queue.storage',
{
leader_only = true
})

local kicked_count = 0 -- count kicked task
for _, instance_uri in ipairs(storages) do
local shards, err = vshard.router.routeall()
if err ~= nil then
error(err)
end

for _, replicaset in pairs(shards) do
local opts = table.deepcopy(options or {})
opts.tube_name = self.tube_name
opts.count = count - kicked_count

local ok, k = pcall(remote_call, 'tube_kick', instance_uri, opts)
local ok, k = pcall(remote_call, 'tube_kick', replicaset, opts)
if not ok then
log.error(k)
return kicked_count
Expand Down Expand Up @@ -475,15 +476,9 @@ function sharded_queue.statistics(tube_name)
if not tube_name then
return
end
-- collect stats from all storages
local storages = cartridge_rpc.get_candidates(
'sharded_queue.storage',
{
leader_only = true
})

local stats_collection, err = cartridge_pool.map_call('tube_statistic',
{{ tube_name = tube_name }}, {uri_list=storages})
local stats_collection, err = vshard.router.map_callrw('tube_statistic',
{{ tube_name = tube_name }})
if err ~= nil then
return nil, err
end
Expand All @@ -496,13 +491,16 @@ function sharded_queue.statistics(tube_name)
return nil
end

-- merge
local stat = { tasks = {}, calls = {} }
for _, uri_stat in pairs(stats_collection) do
for name, count in pairs(uri_stat.tasks) do
for _, replicaset_stats in pairs(stats_collection) do
if type(replicaset_stats) ~= 'table' or next(replicaset_stats) == nil then
return nil, 'Invalid stats'
end

for name, count in pairs(replicaset_stats[1].tasks) do
stat.tasks[name] = (stat.tasks[name] or 0) + count
end
for name, count in pairs(uri_stat.calls) do
for name, count in pairs(replicaset_stats[1].calls) do
stat.calls[name] = (stat.calls[name] or 0) + count
end
end
Expand Down

0 comments on commit 0ce2cf0

Please sign in to comment.