From fe3ddee95d1214032c5e8a25349587718909db62 Mon Sep 17 00:00:00 2001 From: better0fdead Date: Wed, 5 Jul 2023 11:25:20 +0300 Subject: [PATCH] driver: fix duplicate id error with mvvc on Taking the maximum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. It is hapenning because 'max' for several puts in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Closes #207 --- queue/abstract/driver/fifo.lua | 19 ++++++++- queue/abstract/driver/fifottl.lua | 19 ++++++++- queue/abstract/driver/utube.lua | 19 ++++++++- queue/abstract/driver/utubettl.lua | 19 ++++++++- t/220-mvcc.t | 63 ++++++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 4 deletions(-) create mode 100644 t/220-mvcc.t diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 4093059c..acd00e12 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -68,7 +68,24 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, data} self.on_task_change(task, 'put') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 496a324c..9ed214ea 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,7 +209,24 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 588ced48..b130a49f 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,7 +75,24 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} self.on_task_change(task, 'put') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index e2c887dd..0e1f0e0f 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -217,7 +217,24 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status diff --git a/t/220-mvcc.t b/t/220-mvcc.t new file mode 100644 index 00000000..c51e4593 --- /dev/null +++ b/t/220-mvcc.t @@ -0,0 +1,63 @@ +#!/usr/bin/env tarantool +local qc = require('queue.compat') +local log = require('log') +if not qc.check_version({2, 6, 1}) then + log.info('Tests skipped, tarantool version < 2.6.1') + return +end +local yaml = require('yaml') +local fiber = require('fiber') + +local test = require('tap').test() +test:plan(6) + +local queue = require('queue') +local state = require('queue.abstract.state') + +local tnt = require('t.tnt') +tnt.cfg{memtx_use_mvcc_engine = true} + +local engine = 'memtx' + +test:ok(rawget(box, 'space'), 'box started') +test:ok(queue, 'queue is loaded') + +local tube = queue.create_tube('test', 'fifo', { engine = engine }) +test:ok(tube, 'test tube created') +test:is(tube.name, 'test', 'tube.name') +test:is(tube.type, 'fifo', 'tube.type') + +test:test('concurent take', function(test) + test:plan(151) + + local channel = fiber.channel(1000) + test:ok(channel, 'channel created') + + local res = {} + for i = 1, 50 do + fiber.create(function(i) + local taken = tube:take(1) + test:ok(taken, 'Task was taken ' .. i) + table.insert(res, { taken }) + channel:put(true) + end, i) + end + + fiber.sleep(.1) + test:ok(tube:put(1), 'task 1 was put') + + for i = 2, 50 do + fiber.create(function(i) + test:ok(tube:put(i), 'task ' .. i .. ' was put') + end, i) + end + fiber.sleep(.1) + for i = 1, 50 do + test:ok(channel:get(1 / i), 'take was done ' .. i) + end +end) + + +tnt.finish() +os.exit(test:check() and 0 or 1) +-- vim: set ft=lua: