From a65f81ea97d2b464255a7927facd1775a3da1a98 Mon Sep 17 00:00:00 2001 From: better0fdead Date: Wed, 5 Jul 2023 11:25:20 +0300 Subject: [PATCH] driver: fix duplicate id error with mvvc on put and take Taking the maximum or minimum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'put' or 'take' calls with mvcc enabled. It is hapenning because 'max' or 'min' for several puts in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Current fix does not resolve that bug in situations when we already are in transaction since it will open nested transactions. Part of #207 --- queue/abstract/driver/fifo.lua | 42 ++++++++++++++++- queue/abstract/driver/fifottl.lua | 41 ++++++++++++++--- queue/abstract/driver/utube.lua | 41 ++++++++++++++++- queue/abstract/driver/utubettl.lua | 42 ++++++++++++++++- t/220-mvcc.t | 72 ++++++++++++++++++++++++++++++ 5 files changed, 227 insertions(+), 11 deletions(-) create mode 100644 t/220-mvcc.t diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index 4093059c..bc723e6c 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -68,7 +68,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, data} self.on_task_change(task, 'put') @@ -77,7 +97,25 @@ end -- take task function method.take(self) - local task = self.space.index.status:min{state.READY} + local task + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + task = self.space.index.status:min{state.READY} + box.commit() + else + task = self.space.index.status:min{state.READY} + end + if task ~= nil and task[2] == state.READY then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 496a324c..d2b60809 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -209,7 +209,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status @@ -265,10 +285,21 @@ end -- take task function method.take(self) local task = nil - for _, t in self.space.index.status:pairs({state.READY}) do - if not is_expired(t) then - task = t - break + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end + end + box.commit() + else + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end end end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 588ced48..4ad0dfa4 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -75,7 +75,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[1] + 1 or 0 local task = self.space:insert{id, state.READY, tostring(opts.utube), data} self.on_task_change(task, 'put') @@ -89,8 +109,25 @@ function method.take(self) if task[2] ~= state.READY then break end + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, task[3]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, task[3]} + end - local taken = self.space.index.utube:min{state.TAKEN, task[3]} if taken == nil or taken[2] ~= state.TAKEN then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index e2c887dd..d9cb95bf 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -217,7 +217,27 @@ end -- put task in space function method.put(self, data, opts) - local max = self.space.index.task_id:max() + local max + + -- Taking the maximum of the index is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled. + -- It is hapenning because 'max' for several puts in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + + if box.cfg.memtx_use_mvcc_engine and not box.is_in_txn() then + box.begin({txn_isolation = 'read-committed'}) + max = self.space.index.task_id:max() + box.commit() + else + max = self.space.index.task_id:max() + end + local id = max and max[i_id] + 1 or 0 local status @@ -280,7 +300,25 @@ function method.take(self) break elseif not is_expired(t) then local next_event = util.time() + t[i_ttr] - local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + end + if taken == nil or taken[i_status] ~= state.TAKEN then t = self.space:update(t[1], { { '=', i_status, state.TAKEN }, diff --git a/t/220-mvcc.t b/t/220-mvcc.t new file mode 100644 index 00000000..293255bf --- /dev/null +++ b/t/220-mvcc.t @@ -0,0 +1,72 @@ +#!/usr/bin/env tarantool +local qc = require('queue.compat') +local log = require('log') +if not qc.check_version({2, 6, 1}) then + log.info('Tests skipped, tarantool version < 2.6.1') + return +end +local yaml = require('yaml') +local fiber = require('fiber') + +local test = require('tap').test() +test:plan(6) + +local queue = require('queue') +local state = require('queue.abstract.state') + +local tnt = require('t.tnt') +tnt.cfg{memtx_use_mvcc_engine = true} + +local engine = 'memtx' + +test:ok(rawget(box, 'space'), 'box started') +test:ok(queue, 'queue is loaded') + +local tube = queue.create_tube('test', 'fifo', { engine = engine, temporary = false}) +test:ok(tube, 'test tube created') +test:is(tube.name, 'test', 'tube.name') +test:is(tube.type, 'fifo', 'tube.type') + + +--- That test checks that https://github.com/tarantool/queue/pull/211 is fixed. +-- Previously trying to make parallel 'put' or 'take' calls failed with mvcc enabled. +test:test('concurent put and take with mvcc', function(test) + test:plan(6) + -- channels are used to wait for fibers to finish + -- and check results of the 'take'/'put'. + local channel_put = fiber.channel(2) + test:ok(channel_put, 'channel created') + local channel_take = fiber.channel(2) + test:ok(channel_take, 'channel created') + + for i = 1, 2 do + fiber.create(function(i) + local err = pcall(tube.put, tube, i) + channel_put:put(err) + end, i) + end + + for i = 1, 2 do + local res = channel_put:get(1) + test:ok(res, 'task ' .. i .. ' was put') + end + + for i = 1, 2 do + fiber.create(function(i) + local err = pcall(tube.take, tube) + channel_take:put(err) + end, i) + end + + for i = 1, 2 do + local res = channel_take:get() + test:ok(res, 'task ' .. i .. ' was taken') + end + + +end) + + +tnt.finish() +os.exit(test:check() and 0 or 1) +-- vim: set ft=lua: