diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 4093059c..bc723e6c 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -68,7 +68,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, data} self.on_task_change(task, 'put') @@ -77,7 +97,25 @@ end -- take task function method.take(self) - local task = self.space.index.status:min{state.READY} + local task + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + task = self.space.index.status:min{state.READY} + box.commit() + else + task = self.space.index.status:min{state.READY} + end + if task ~= nil and task[2] == state.READY then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 496a324c..d2b60809 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,7 +209,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status @@ -265,10 +285,21 @@ end -- take task function method.take(self) local task = nil - for _, t in self.space.index.status:pairs({state.READY}) do - if not is_expired(t) then - task = t - break + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end + end + box.commit() + else + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end end end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 588ced48..4ad0dfa4 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,7 +75,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} self.on_task_change(task, 'put') @@ -89,8 +109,25 @@ function method.take(self) if task[2] ~= state.READY then break end + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, task[3]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, task[3]} + end - local taken = self.space.index.utube:min{state.TAKEN, task[3]} if taken == nil or taken[2] ~= state.TAKEN then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index e2c887dd..d9cb95bf 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -217,7 +217,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status @@ -280,7 +300,25 @@ function method.take(self) break elseif not is_expired(t) then local next_event = util.time() + t[i_ttr] - local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + end + if taken == nil or taken[i_status] ~= state.TAKEN then t = self.space:update(t[1], { { '=', i_status, state.TAKEN }, diff --git a/t/220-mvcc.t b/t/220-mvcc.t new file mode 100644 index 00000000..286a06f1 --- /dev/null +++ b/t/220-mvcc.t @@ -0,0 +1,53 @@ +#!/usr/bin/env tarantool +local qc = require('queue.compat') +local log = require('log') +if not qc.check_version({2, 6, 1}) then + log.info('Tests skipped, tarantool version < 2.6.1') + return +end +local yaml = require('yaml') +local fiber = require('fiber') + +local test = require('tap').test() +test:plan(6) + +local queue = require('queue') +local state = require('queue.abstract.state') + +local tnt = require('t.tnt') +tnt.cfg{memtx_use_mvcc_engine = true} + +local engine = 'memtx' + +test:ok(rawget(box, 'space'), 'box started') +test:ok(queue, 'queue is loaded') + +local tube = queue.create_tube('test', 'fifo', { engine = engine }) +test:ok(tube, 'test tube created') +test:is(tube.name, 'test', 'tube.name') +test:is(tube.type, 'fifo', 'tube.type') + +test:test('concurent put and take with mvcc', function(test) + test:plan(5) + + local channel = fiber.channel(1000) + test:ok(channel, 'channel created') + + for i = 1, 2 do + fiber.create(function(i) + test:ok(tube:put(i), 'task ' .. i .. ' was put') + end, i) + end + + for i = 1, 2 do + fiber.create(function(i) + test:ok(tube:take(i), 'task was taken ' .. i) + end, i) + end + +end) + + +tnt.finish() +os.exit(test:check() and 0 or 1) +-- vim: set ft=lua: