Skip to content

Commit

Permalink
Add compatibility for Tarantool 1.7 + if_not_exists fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bigbes committed Nov 15, 2016
1 parent 793f728 commit 7fd0fd7
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 63 deletions.
24 changes: 15 additions & 9 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
local log = require('log')
local fun = require('fun')
local fiber = require('fiber')
local log = require('log')
local fun = require('fun')
local fiber = require('fiber')

local state = require('queue.abstract.state')
local state = require('queue.abstract.state')
local num_type = require('queue.compat').num_type

local session = box.session

Expand Down Expand Up @@ -317,7 +318,7 @@ function method.start()
_queue:create_index('tube',
{ type = 'tree', parts = { 1, 'str' }, unique = true})
_queue:create_index('tube_id',
{ type = 'tree', parts = { 2, 'num' }, unique = true })
{ type = 'tree', parts = { 2, num_type() }, unique = true })
end

local _cons = box.space._queue_consumers
Expand All @@ -326,20 +327,25 @@ function method.start()
_cons = box.schema
.create_space('_queue_consumers', { temporary = true })
_cons:create_index('pk',
{ type = 'tree', parts = { 1, 'num', 2, 'num' }, unique = true })
{ type = 'tree', parts = { 1, num_type(),
2, num_type() }, unique = true })
_cons:create_index('consumer',
{ type = 'tree', parts = { 3, 'num', 4, 'num' }, unique = false})
{ type = 'tree', parts = { 3, num_type(),
4, num_type() }, unique = false})
end

local _taken = box.space._queue_taken
if _taken == nil then
-- session_id, tube_id, task_id, time
_taken = box.schema.create_space('_queue_taken', { temporary = true })
_taken:create_index('pk',
{ type = 'tree', parts = { 1, 'num', 2, 'num', 3, 'num'}, unique = true})
{ type = 'tree', parts = { 1, num_type(),
2, num_type(),
3, num_type() }, unique = true})

_taken:create_index('task',
{ type = 'tree', parts = { 2, 'num', 3, 'num' }, unique = true })
{ type = 'tree', parts = { 2, num_type(),
3, num_type() }, unique = true })
end

for _, tube_rc in _queue:pairs() do
Expand Down
25 changes: 18 additions & 7 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
local state = require 'queue.abstract.state'
local state = require('queue.abstract.state')

local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local tube = {}
local method = {}

-- create space
function tube.create_space(space_name, opts)
local space_opts = {}
if opts.temporary then
space_opts.temporary = true
end
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists

local space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', { type = 'tree', parts = { 1, 'num' }})
space:create_index('status', { type = 'tree', parts = { 2, 'str', 1, 'num' }})
space:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
if_not_exists = if_not_exists
})
space:create_index('status', {
type = 'tree',
parts = {2, str_type(), 1, num_type()},
if_not_exists = if_not_exists
})
return space
end

Expand Down
38 changes: 26 additions & 12 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
local log = require 'log'
local fiber = require 'fiber'
local state = require 'queue.abstract.state'
local log = require('log')
local fiber = require('fiber')
local state = require('queue.abstract.state')

local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local tube = {}
local method = {}
Expand Down Expand Up @@ -39,20 +42,31 @@ function tube.create_space(space_name, opts)
opts.ttr = opts.ttr or opts.ttl
opts.pri = opts.pri or 0

local space_opts = {}
space_opts.temporary = opts.temporary
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists

-- 1 2 3 4 5 6 7, 8
-- task_id, status, next_event, ttl, ttr, pri, created, data
local space = box.schema.create_space(space_name, space_opts)

space:create_index('task_id', { type = 'tree', parts = { i_id, 'num' }})
space:create_index('status',
{ type = 'tree',
parts = { i_status, 'str', i_pri, 'num', i_id, 'num' }})
space:create_index('watch',
{ type = 'tree', parts = { i_status, 'str', i_next_event, 'num' },
unique = false})
space:create_index('task_id', {
type = 'tree',
parts = {i_id, num_type()},
if_not_exists = if_not_exists
})
space:create_index('status', {
type = 'tree',
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()},
if_not_exists = if_not_exists
})
space:create_index('watch', {
type = 'tree',
parts = {i_status, str_type(), i_next_event, num_type()},
unique = false,
if_not_exists = if_not_exists
})
return space
end

Expand Down
32 changes: 22 additions & 10 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
local state = require 'queue.abstract.state'
local state = require('queue.abstract.state')
local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local tube = {}
local method = {}
Expand All @@ -7,18 +9,28 @@ local i_status = 2

-- create space
function tube.create_space(space_name, opts)
local space_opts = {}
if opts.temporary then
space_opts.temporary = true
end
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists

-- id, status, utube, data
local space = box.schema.create_space(space_name, space_opts)
space:create_index('task_id', { type = 'tree', parts = { 1, 'num' }})
space:create_index('status',
{ type = 'tree', parts = { 2, 'str', 1, 'num' }})
space:create_index('utube',
{ type = 'tree', parts = { 2, 'str', 3, 'str', 1, 'num' }})
space:create_index('task_id', {
type = 'tree',
parts = {1, num_type()},
if_not_exists = if_not_exists
})
space:create_index('status', {
type = 'tree',
parts = {2, str_type(), 1, num_type()},
if_not_exists = if_not_exists
})
space:create_index('utube', {
type = 'tree',
parts = {2, str_type(), 3, str_type(), 1, num_type()},
if_not_exists = if_not_exists
})
return space
end

Expand Down
46 changes: 31 additions & 15 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
local log = require 'log'
local fiber = require 'fiber'
local state = require 'queue.abstract.state'
local log = require('log')
local fiber = require('fiber')

local state = require('queue.abstract.state')
local num_type = require('queue.compat').num_type
local str_type = require('queue.compat').str_type

local tube = {}
local method = {}
Expand Down Expand Up @@ -40,23 +43,36 @@ function tube.create_space(space_name, opts)
opts.ttr = opts.ttr or opts.ttl
opts.pri = opts.pri or 0

local space_opts = {}
space_opts.temporary = opts.temporary
local space_opts = {}
local if_not_exists = opts.if_not_exists or false
space_opts.temporary = opts.temporary or false
space_opts.if_not_exists = if_not_exists

-- 1 2 3 4 5 6 7, 8
-- task_id, status, next_event, ttl, ttr, pri, created, data
local space = box.schema.create_space(space_name, space_opts)

space:create_index('task_id', { type = 'tree', parts = { i_id, 'num' }})
space:create_index('status',
{ type = 'tree',
parts = { i_status, 'str', i_pri, 'num', i_id, 'num' }})
space:create_index('watch',
{ type = 'tree', parts = { i_status, 'str', i_next_event, 'num' },
unique = false})
space:create_index('utube',
{ type = 'tree',
parts = { i_status, 'str', i_utube, 'str', i_id, 'num' }})
space:create_index('task_id', {
type = 'tree',
parts = {i_id, num_type()},
if_not_exists = if_not_exists
})
space:create_index('status', {
type = 'tree',
parts = {i_status, str_type(), i_pri, num_type(), i_id, num_type()},
if_not_exists = if_not_exists
})
space:create_index('watch', {
type = 'tree',
parts = {i_status, str_type(), i_next_event, num_type()},
unique = false,
if_not_exists = if_not_exists
})
space:create_index('utube', {
type = 'tree',
parts = {i_status, str_type(), i_utube, str_type(), i_id, num_type()},
if_not_exists = if_not_exists
})
return space
end

Expand Down
44 changes: 44 additions & 0 deletions queue/compat.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
local fun = require('fun')
local log = require('log')
local json = require('json')

local iter, op = fun.iter, fun.operator

function split(self, sep)
local sep, fields = sep or ":", {}
local pattern = string.format("([^%s]+)", sep)
self:gsub(pattern, function(c) table.insert(fields, c) end)
return fields
end

local function opge(l, r)
l = type(l) == 'string' and tonumber(l) or l
r = type(r) == 'string' and tonumber(r) or r
return l >= r
end

local function check_version(expected)
local vtable = split(_TARANTOOL, '.')
local vtable2 = split(vtable[3], '-')
vtable[3], vtable[4] = vtable2[1], vtable2[2]
return iter(vtable):zip(expected):every(opge)
end

local function get_actual_numtype()
return check_version{1, 7, 2} and 'unsigned' or 'num'
end

local function get_actual_strtype()
return check_version{1, 7, 2} and 'string' or 'str'
end

local function get_actual_vinylname()
return check_version{1, 7} and 'vinyl' or 'sophia'
end

return {
check_version = check_version,
vinyl_name = get_actual_vinylname,
num_type = get_actual_numtype,
str_type = get_actual_strtype
}
1 change: 0 additions & 1 deletion queue/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ end
local queue = {}
setmetatable(queue, { __index = index_unconfigured })


if rawget(box, 'space') == nil then
local orig_cfg = box.cfg
box.cfg = function(...)
Expand Down
20 changes: 11 additions & 9 deletions t/tnt/init.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
local fio = require 'fio'
local errno = require 'errno'
local yaml = require 'yaml'
local log = require 'log'
local fio = require('fio')
local log = require('log')
local yaml = require('yaml')
local errno = require('errno')

local dir = os.getenv('QUEUE_TMP')
local dir = os.getenv('QUEUE_TMP')
local cleanup = false

local vinyl_name = require('queue.compat').vinyl_name

if dir == nil then
dir = fio.tempdir()
cleanup = true
Expand All @@ -21,10 +23,10 @@ local function tnt_prepare(cfg_args)
end
end

cfg_args['wal_dir'] = dir
cfg_args['snap_dir'] = dir
cfg_args['sophia_dir'] = dir
cfg_args['logger'] = fio.pathjoin(dir, 'tarantool.log')
cfg_args['wal_dir'] = dir
cfg_args['snap_dir'] = dir
cfg_args[vinyl_name() .. '_dir'] = dir
cfg_args['logger'] = fio.pathjoin(dir, 'tarantool.log')

box.cfg (cfg_args)
end
Expand Down

0 comments on commit 7fd0fd7

Please sign in to comment.