From 8db176b45e0f9739e3edd5694aafc1fdbf03b8a0 Mon Sep 17 00:00:00 2001 From: better0fdead Date: Thu, 10 Aug 2023 18:49:16 +0300 Subject: [PATCH] driver: fix duplicate id error with mvvc on on take Taking the minimum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. It is hapenning because 'min' for several takes in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Current fix does not resolve that bug in situations when we already are in transaction since it will open nested transactions. Part of #207 --- queue/abstract/driver/fifo.lua | 20 +++++++++++++++++++- queue/abstract/driver/fifottl.lua | 19 +++++++++++++++---- queue/abstract/driver/utube.lua | 19 ++++++++++++++++++- queue/abstract/driver/utubettl.lua | 20 +++++++++++++++++++- 4 files changed, 71 insertions(+), 7 deletions(-) diff --git a/queue/abstract/driver/fifo.lua b/queue/abstract/driver/fifo.lua index a27412f3..bc723e6c 100644 --- a/queue/abstract/driver/fifo.lua +++ b/queue/abstract/driver/fifo.lua @@ -97,7 +97,25 @@ end -- take task function method.take(self) - local task = self.space.index.status:min{state.READY} + local task + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + task = self.space.index.status:min{state.READY} + box.commit() + else + task = self.space.index.status:min{state.READY} + end + if task ~= nil and task[2] == state.READY then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/fifottl.lua b/queue/abstract/driver/fifottl.lua index 4782f912..d2b60809 100644 --- a/queue/abstract/driver/fifottl.lua +++ b/queue/abstract/driver/fifottl.lua @@ -285,10 +285,21 @@ end -- take task function method.take(self) local task = nil - for _, t in self.space.index.status:pairs({state.READY}) do - if not is_expired(t) then - task = t - break + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end + end + box.commit() + else + for _, t in self.space.index.status:pairs({state.READY}) do + if not is_expired(t) then + task = t + break + end end end diff --git a/queue/abstract/driver/utube.lua b/queue/abstract/driver/utube.lua index 38f42328..4ad0dfa4 100644 --- a/queue/abstract/driver/utube.lua +++ b/queue/abstract/driver/utube.lua @@ -109,8 +109,25 @@ function method.take(self) if task[2] ~= state.READY then break end + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, task[3]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, task[3]} + end - local taken = self.space.index.utube:min{state.TAKEN, task[3]} if taken == nil or taken[2] ~= state.TAKEN then task = self.space:update(task[1], { { '=', 2, state.TAKEN } }) self.on_task_change(task, 'take') diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 1677d745..d9cb95bf 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -300,7 +300,25 @@ function method.take(self) break elseif not is_expired(t) then local next_event = util.time() + t[i_ttr] - local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + local taken + -- Taking the minimum is an implicit transactions, so it is + -- always done with 'read-confirmed' mvcc isolation level. + -- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled. + -- It is hapenning because 'min' for several takes in parallel will be the same since + -- read confirmed isolation level makes visible all transactions that finished the commit. + -- To fix it we wrap it with box.begin/commit and set right isolation level. + -- Current fix does not resolve that bug in situations when we already are in transaction + -- since it will open nested transactions. + -- See https://github.com/tarantool/queue/issues/207 + -- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/ + if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then + box.begin({txn_isolation = 'read-committed'}) + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + box.commit() + else + taken = self.space.index.utube:min{state.TAKEN, t[i_utube]} + end + if taken == nil or taken[i_status] ~= state.TAKEN then t = self.space:update(t[1], { { '=', i_status, state.TAKEN },