Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-jukovec committed Apr 8, 2024
1 parent 9f2ceed commit a00dcb5
Show file tree
Hide file tree
Showing 27 changed files with 940 additions and 237 deletions.
67 changes: 67 additions & 0 deletions roles/sharded-queue-router.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
local config = require('sharded_queue.router.config')
local is_metrics_supported = require('sharded_queue.metrics').is_supported
local metrics = require('sharded_queue.router.metrics')
local roles = require('sharded_queue.roles')
local utils = require('sharded_queue.utils')
local queue = require('sharded_queue.router.queue')

local role_name = "roles.sharded-queue-router"

local function validate(conf)
conf = conf or {}
if not roles.is_sharding_role_enabled('router') then
error(role_name .. ": instance must be a sharding router to use the role")
end

local ok, err = utils.validate_tubes(conf.tubes or {}, false)
if not ok then
error(role_name .. ": " .. err)
end
ok, err = utils.validate_cfg(conf.cfg or {})
if not ok then
error(role_name .. ": " .. err)
end
return true
end

local function apply(conf)
conf = conf or {}

queue.export_globals()

local conf_tubes = conf.tubes or {}
local conf_cfg = conf.cfg or {}
if conf_cfg.metrics ~= nil then
config.metrics = conf_cfg.metrics and true or false
else
config.metrics = is_metrics_supported()
end

for tube_name, options in pairs(conf_tubes) do
if queue.map()[tube_name] == nil then
queue.add(tube_name, metrics, options)
end
end

for tube_name, _ in pairs(queue.map()) do
if conf_tubes[tube_name] == nil then
queue.remove(tube_name)
end
end

if config.metrics then
metrics.enable(queue)
else
metrics.disable()
end
end

local function stop()
queue.clear_globals()
end

return {
validate = validate,
apply = apply,
stop = stop,
}
79 changes: 79 additions & 0 deletions roles/sharded-queue-storage.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
local config = require('sharded_queue.storage.config')
local is_metrics_supported = require('sharded_queue.metrics').is_supported
local methods = require('sharded_queue.storage.methods')
local metrics = require('sharded_queue.storage.metrics')
local roles = require('sharded_queue.roles')
local stats_storage = require('sharded_queue.stats.storage')
local tubes = require('sharded_queue.storage.tubes').new()
local utils = require('sharded_queue.utils')

local role_name = "roles.sharded-queue-storage"
local watcher = nil

local function validate(conf)
conf = conf or {}

if not roles.is_sharding_role_enabled('storage') then
error(role_name .. ": instance must be a sharding storage to use the role")
end

local ok, err = utils.validate_tubes(conf.tubes or {}, true)
if not ok then
error(role_name .. ": " .. err)
end
ok, err = utils.validate_cfg(conf.cfg or {})
if not ok then
error(role_name .. ": " .. err)
end
return true
end

local function apply(conf)
conf = conf or {}

local conf_tubes = conf.tubes or {}
local conf_cfg = conf.cfg or {}
if conf_cfg.metrics ~= nil then
config.metrics = conf_cfg.metrics and true or false
else
config.metrics = is_metrics_supported()
end

if watcher ~= nil then
watcher:unregister()
end
watcher = box.watch('box.status', function(_, status)
if status.is_ro == false then
stats_storage.init()

local new = tubes:update(conf_tubes)
for _, tube in ipairs(new) do
stats_storage.reset(tube)
end

if config.metrics then
metrics.enable(tubes:map())
end
methods.init(metrics, tubes)
end
end)

if config.metrics then
metrics.enable(tubes:map())
else
metrics.disable()
end
end

local function stop()
if watcher ~= nil then
watcher:unregister()
watcher = nil
end
end

return {
validate = validate,
apply = apply,
stop = stop,
}
4 changes: 4 additions & 0 deletions sharded-queue-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ build = {
build_target = 'all',
install = {
lua = {
["roles.sharded-queue-router"] = "roles/sharded-queue-router.lua",
["roles.sharded-queue-storage"] = "roles/sharded-queue-storage.lua",
['sharded_queue.api'] = 'sharded_queue/api.lua',
['sharded_queue.storage'] = 'sharded_queue/storage.lua',
['sharded_queue.drivers.fifo'] = 'sharded_queue/drivers/fifo.lua',
['sharded_queue.drivers.fifottl'] = 'sharded_queue/drivers/fifottl.lua',
['sharded_queue.time'] = 'sharded_queue/time.lua',
['sharded_queue.utils'] = 'sharded_queue/utils.lua',
['sharded_queue.metrics'] = 'sharded_queue/metrics.lua',
['sharded_queue.roles'] = 'sharded_queue/roles.lua',
['sharded_queue.stash'] = 'sharded_queue/stash.lua',
['sharded_queue.state'] = 'sharded_queue/state.lua',
['sharded_queue.stats.storage'] = 'sharded_queue/stats/storage.lua',
Expand All @@ -39,6 +42,7 @@ build = {
['sharded_queue.storage.methods'] = 'sharded_queue/storage/methods.lua',
['sharded_queue.storage.metrics'] = 'sharded_queue/storage/metrics.lua',
['sharded_queue.storage.tubes'] = 'sharded_queue/storage/tubes.lua',
['sharded_queue.storage.vshard_utils'] = 'sharded_queue/storage/vshard_utils.lua',
['sharded_queue.version'] = 'sharded_queue/version.lua',
},
},
Expand Down
5 changes: 5 additions & 0 deletions sharded_queue/drivers/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local state = require('sharded_queue.state')
local utils = require('sharded_queue.utils')
local log = require('log') -- luacheck: ignore
local stats = require('sharded_queue.stats.storage')
local vshard_utils = require('sharded_queue.storage.vshard_utils')

local function update_stat(tube_name, name)
stats.update(tube_name, name, '+', 1)
Expand All @@ -10,6 +11,7 @@ end
local method = {}

local function tube_create(args)
local user = vshard_utils.get_this_replica_user() or 'guest'
local space_options = {}
local if_not_exists = args.options.if_not_exists or true
space_options.if_not_exists = if_not_exists
Expand Down Expand Up @@ -47,6 +49,9 @@ local function tube_create(args)
if_not_exists = if_not_exists
})

box.schema.user.grant(user, 'read,write', 'space', args.name,
{if_not_exists = true})

return space
end

Expand Down
5 changes: 5 additions & 0 deletions sharded_queue/drivers/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local state = require('sharded_queue.state')
local utils = require('sharded_queue.utils')
local stats = require('sharded_queue.stats.storage')
local time = require('sharded_queue.time')
local vshard_utils = require('sharded_queue.storage.vshard_utils')
local log = require('log') -- luacheck: ignore

local index = {
Expand Down Expand Up @@ -136,6 +137,7 @@ end
-- QUEUE METHODs --

local function tube_create(args)
local user = vshard_utils.get_this_replica_user() or 'guest'
local space_options = {}
local if_not_exists = args.options.if_not_exists or true
space_options.if_not_exists = if_not_exists
Expand Down Expand Up @@ -201,6 +203,9 @@ local function tube_create(args)
if_not_exists = if_not_exists
})

box.schema.user.grant(user, 'read,write', 'space', args.name,
{if_not_exists = true})

-- run fiber for tracking event
fiber.create(fiber_common, args.name)
end
Expand Down
15 changes: 15 additions & 0 deletions sharded_queue/roles.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
local config = require('config')

local function is_sharding_role_enabled(expected)
local sharding_roles = config:get('sharding.roles')
for _, role in ipairs(sharding_roles or {}) do
if role == expected then
return true
end
end
return false
end

return {
is_sharding_role_enabled = is_sharding_role_enabled,
}
34 changes: 5 additions & 29 deletions sharded_queue/router/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,7 @@ end
-- The Tarantool 3.0 does not support to update dinamically a configuration, so
-- a user must update the configuration by itself.
if is_cartridge_package then
local function validate_options(options)
if not options then return true end

if options.wait_factor then
if type(options.wait_factor) ~= 'number'
or options.wait_factor < 1
then
return false, "wait_factor must be number greater than or equal to 1"
end
end

local _, err = utils.normalize.log_request(options.log_request)
if err then
return false, err
end

if options.wait_max ~= nil then
local err
options.wait_max, err = utils.normalize.wait_max(options.wait_max)
if err ~= nil then
return false, err
end
end

return true
end

queue_global.create_tube = function(tube_name, options)
require('log').info("CREATE TUBE")
local tubes = cartridge.config_get_deepcopy('tubes') or {}

if tube_name == 'cfg' then
Expand All @@ -85,7 +57,7 @@ if is_cartridge_package then
return nil
end

local ok , err = validate_options(options)
local ok, err = utils.validate_options(options)
if not ok then error(err) end

options = table.deepcopy(options or {})
Expand All @@ -107,6 +79,10 @@ local function export_globals()
rawset(_G, 'queue', queue_global)
end

local function clear_globals()
rawset(_G, 'queue', nil)
end

local function add(name, metrics, options)
queue_global.tube[name] = tube.new(name, metrics, options)
end
Expand Down
5 changes: 5 additions & 0 deletions sharded_queue/stats/storage.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
---- Module used to store storage-specific statistics.

local state = require('sharded_queue.state')
local vshard_utils = require('sharded_queue.storage.vshard_utils')

local statistics = {}

Expand All @@ -17,6 +18,7 @@ local actions = {
}

function statistics.init()
local user = vshard_utils.get_this_replica_user() or 'guest'
local space_stat = box.schema.space.create('_queue_statistics',
{ if_not_exists = true })
space_stat:format({
Expand All @@ -39,6 +41,9 @@ function statistics.init()
},
if_not_exists = true
})

box.schema.user.grant(user, 'read,write', 'space', '_queue_statistics',
{if_not_exists = true})
end

function statistics.update(tube_name, stat_name, operation, value)
Expand Down
11 changes: 11 additions & 0 deletions sharded_queue/storage/methods.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
local fiber = require('fiber')
local vshard_utils = require('sharded_queue.storage.vshard_utils')

local stats_storage = require('sharded_queue.stats.storage')

local methods = {
Expand All @@ -15,10 +17,15 @@ local methods = {
}

local function init(metrics, tubes)
local user = vshard_utils.get_this_replica_user() or 'guest'

for _, method in pairs(methods) do
local func = function(args)
args = args or {}
args.options = tubes:get_options(args.tube_name) or {}
if args.options.priority == nil and args.options.pri ~= nil then
args.options.priority = args.options.pri
end

local tube_name = args.tube_name
local before = fiber.clock()
Expand All @@ -37,6 +44,8 @@ local function init(metrics, tubes)
local global_name = 'tube_' .. method
rawset(_G, global_name, func)
box.schema.func.create(global_name, { if_not_exists = true })
box.schema.user.grant(user, 'execute', 'function', global_name,
{if_not_exists = true})
end

local tube_statistic_func = function(args)
Expand All @@ -55,6 +64,8 @@ local function init(metrics, tubes)

rawset(_G, 'tube_statistic', tube_statistic_func)
box.schema.func.create('tube_statistic', { if_not_exists = true })
box.schema.user.grant(user, 'execute', 'function', 'tube_statistic',
{if_not_exists = true})
end

local function get_list()
Expand Down
Loading

0 comments on commit a00dcb5

Please sign in to comment.