diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 4093059c..a27412f3 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -68,7 +68,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, data} self.on_task_change(task, 'put') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 496a324c..4782f912 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,7 +209,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 588ced48..38f42328 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,7 +75,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} self.on_task_change(task, 'put') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index e2c887dd..1677d745 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -217,7 +217,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status diff --git a/t/220-mvcc.t b/t/220-mvcc.t new file mode 100644 index 00000000..c51e4593 --- /dev/null +++ b/t/220-mvcc.t @@ -0,0 +1,63 @@ +#!/usr/bin/env tarantool +local qc = require('queue.compat') +local log = require('log') +if not qc.check_version({2, 6, 1}) then + log.info('Tests skipped, tarantool version < 2.6.1') + return +end +local yaml = require('yaml') +local fiber = require('fiber') + +local test = require('tap').test() +test:plan(6) + +local queue = require('queue') +local state = require('queue.abstract.state') + +local tnt = require('t.tnt') +tnt.cfg{memtx_use_mvcc_engine = true} + +local engine = 'memtx' + +test:ok(rawget(box, 'space'), 'box started') +test:ok(queue, 'queue is loaded') + +local tube = queue.create_tube('test', 'fifo', { engine = engine }) +test:ok(tube, 'test tube created') +test:is(tube.name, 'test', 'tube.name') +test:is(tube.type, 'fifo', 'tube.type') + +test:test('concurent take', function(test) + test:plan(151) + + local channel = fiber.channel(1000) + test:ok(channel, 'channel created') + + local res = {} + for i = 1, 50 do + fiber.create(function(i) + local taken = tube:take(1) + test:ok(taken, 'Task was taken ' .. i) + table.insert(res, { taken }) + channel:put(true) + end, i) + end + + fiber.sleep(.1) + test:ok(tube:put(1), 'task 1 was put') + + for i = 2, 50 do + fiber.create(function(i) + test:ok(tube:put(i), 'task ' .. i .. ' was put') + end, i) + end + fiber.sleep(.1) + for i = 1, 50 do + test:ok(channel:get(1 / i), 'take was done ' .. i) + end +end) + + +tnt.finish() +os.exit(test:check() and 0 or 1) +-- vim: set ft=lua: