diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index a27412f..bc723e6 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -97,7 +97,25 @@ end -- take task function method.take(self) - local task = self.space.index.status:min{state.READY} + local task + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + task = self.space.index.status:min{state.READY} + box.commit() + else + task = self.space.index.status:min{state.READY} + end + if task ~= nil and task[2] == state.READY then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 4782f91..d2b6080 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -285,10 +285,21 @@ end -- take task function method.take(self) local task = nil - for _, t in self.space.index.status:pairs({state.READY}) do - if not is_expired(t) then - task = t - break + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end + end + box.commit() + else + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end end end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 38f4232..4ad0dfa 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -109,8 +109,25 @@ function method.take(self) if task[2] ~= state.READY then break end + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, task[3]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, task[3]} + end - local taken = self.space.index.utube:min{state.TAKEN, task[3]} if taken == nil or taken[2] ~= state.TAKEN then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 1677d74..d9cb95b 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -300,7 +300,25 @@ function method.take(self) break elseif not is_expired(t) then local next_event = util.time() + t[i_ttr] - local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + end + if taken == nil or taken[i_status] ~= state.TAKEN then t = self.space:update(t[1], { { '=', i_status, state.TAKEN },