Skip to content

Commit

Permalink
driver: fix duplicate id error with mvvc on put and take
Browse files Browse the repository at this point in the history
Taking the maximum or minimum of the index is an implicit transactions, so it is
always done with 'read-confirmed' mvcc isolation level.
It can lead to errors when trying to make parallel 'put' or 'take' calls with mvcc enabled.
It is hapenning because 'max' or 'min' for several puts in parallel will be the same since
read confirmed isolation level makes visible all transactions that finished the commit.
To fix it we wrap it with box.begin/commit and set right isolation level.
Current fix does not resolve that bug in situations when we already are in transaction
since it will open nested transactions.

Part of #207
  • Loading branch information
better0fdead committed Aug 15, 2023
1 parent 16e3b74 commit a65f81e
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 11 deletions.
42 changes: 40 additions & 2 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,27 @@ end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- Taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, data}
self.on_task_change(task, 'put')
Expand All @@ -77,7 +97,25 @@ end

-- take task
function method.take(self)
local task = self.space.index.status:min{state.READY}
local task
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
task = self.space.index.status:min{state.READY}
box.commit()
else
task = self.space.index.status:min{state.READY}
end

if task ~= nil and task[2] == state.READY then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
self.on_task_change(task, 'take')
Expand Down
41 changes: 36 additions & 5 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,27 @@ end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- Taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[i_id] + 1 or 0

local status
Expand Down Expand Up @@ -265,10 +285,21 @@ end
-- take task
function method.take(self)
local task = nil
for _, t in self.space.index.status:pairs({state.READY}) do
if not is_expired(t) then
task = t
break
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
for _, t in self.space.index.status:pairs({state.READY}) do
if not is_expired(t) then
task = t
break
end
end
box.commit()
else
for _, t in self.space.index.status:pairs({state.READY}) do
if not is_expired(t) then
task = t
break
end
end
end

Expand Down
41 changes: 39 additions & 2 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,27 @@ end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- Taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[1] + 1 or 0
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
self.on_task_change(task, 'put')
Expand All @@ -89,8 +109,25 @@ function method.take(self)
if task[2] ~= state.READY then
break
end
local taken
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
taken = self.space.index.utube:min{state.TAKEN, task[3]}
box.commit()
else
taken = self.space.index.utube:min{state.TAKEN, task[3]}
end

local taken = self.space.index.utube:min{state.TAKEN, task[3]}
if taken == nil or taken[2] ~= state.TAKEN then
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
self.on_task_change(task, 'take')
Expand Down
42 changes: 40 additions & 2 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,27 @@ end

-- put task in space
function method.put(self, data, opts)
local max = self.space.index.task_id:max()
local max

-- Taking the maximum of the index is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
-- It is hapenning because 'max' for several puts in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/

if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then
box.begin({txn_isolation = 'read-committed'})
max = self.space.index.task_id:max()
box.commit()
else
max = self.space.index.task_id:max()
end

local id = max and max[i_id] + 1 or 0

local status
Expand Down Expand Up @@ -280,7 +300,25 @@ function method.take(self)
break
elseif not is_expired(t) then
local next_event = util.time() + t[i_ttr]
local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
local taken
-- Taking the minimum is an implicit transactions, so it is
-- always done with 'read-confirmed' mvcc isolation level.
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
-- It is hapenning because 'min' for several takes in parallel will be the same since
-- read confirmed isolation level makes visible all transactions that finished the commit.
-- To fix it we wrap it with box.begin/commit and set right isolation level.
-- Current fix does not resolve that bug in situations when we already are in transaction
-- since it will open nested transactions.
-- See https://github.com/tarantool/queue/issues/207
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
box.begin({txn_isolation = 'read-committed'})
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
box.commit()
else
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
end

if taken == nil or taken[i_status] ~= state.TAKEN then
t = self.space:update(t[1], {
{ '=', i_status, state.TAKEN },
Expand Down
72 changes: 72 additions & 0 deletions t/220-mvcc.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env tarantool
local qc = require('queue.compat')
local log = require('log')
if not qc.check_version({2, 6, 1}) then
log.info('Tests skipped, tarantool version < 2.6.1')
return
end
local yaml = require('yaml')
local fiber = require('fiber')

local test = require('tap').test()
test:plan(6)

local queue = require('queue')
local state = require('queue.abstract.state')

local tnt = require('t.tnt')
tnt.cfg{memtx_use_mvcc_engine = true}

local engine = 'memtx'

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('test', 'fifo', { engine = engine, temporary = false})
test:ok(tube, 'test tube created')
test:is(tube.name, 'test', 'tube.name')
test:is(tube.type, 'fifo', 'tube.type')


--- That test checks that https://github.com/tarantool/queue/pull/211 is fixed.
-- Previously trying to make parallel 'put' or 'take' calls failed with mvcc enabled.
test:test('concurent put and take with mvcc', function(test)
test:plan(6)
-- channels are used to wait for fibers to finish
-- and check results of the 'take'/'put'.
local channel_put = fiber.channel(2)
test:ok(channel_put, 'channel created')
local channel_take = fiber.channel(2)
test:ok(channel_take, 'channel created')

for i = 1, 2 do
fiber.create(function(i)
local err = pcall(tube.put, tube, i)
channel_put:put(err)
end, i)
end

for i = 1, 2 do
local res = channel_put:get(1)
test:ok(res, 'task ' .. i .. ' was put')
end

for i = 1, 2 do
fiber.create(function(i)
local err = pcall(tube.take, tube)
channel_take:put(err)
end, i)
end

for i = 1, 2 do
local res = channel_take:get()
test:ok(res, 'task ' .. i .. ' was taken')
end


end)


tnt.finish()
os.exit(test:check() and 0 or 1)
-- vim: set ft=lua:

0 comments on commit a65f81e

Please sign in to comment.