Skip to content

Commit

Permalink
shopfloor_base: improve lock for update action
Browse files Browse the repository at this point in the history
Add the skip locked option.
  • Loading branch information
TDu committed Nov 27, 2023
1 parent 40348b7 commit 7ec0522
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
25 changes: 21 additions & 4 deletions shopfloor_base/actions/lock.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright 2022 Michael Tietz (MT Software) <mtietz@mt-software.de>
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
import hashlib
import struct
Expand Down Expand Up @@ -26,7 +27,23 @@ def advisory(self, name):
self.env.cr.execute("SELECT pg_advisory_xact_lock(%s);", (int_lock,))
self.env.cr.fetchone()[0]

def for_update(self, records, log_exceptions=False):
"""Lock a table FOR UPDATE"""
sql = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE" % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=False)
def for_update(self, records, log_exceptions=False, skip_locked=False):
"""Lock rows for update on a specific table.
This function will try to obtain a lock on the rows (records parameter) and
wait until they are available for update.
Using the SKIP LOCKED parameter (better used with only one record), it will
not wait for the row to be available but return False if the lock could not
be obtained.
"""
query = "SELECT id FROM %s WHERE ID IN %%s FOR UPDATE"
if skip_locked:
query += " SKIP LOCKED"
sql = query % records._table
self.env.cr.execute(sql, (tuple(records.ids),), log_exceptions=log_exceptions)
if skip_locked:
rows = self.env.cr.fetchall()
return len(rows) == len(records)
return True
1 change: 1 addition & 0 deletions shopfloor_base/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from . import test_actions_data
from . import test_actions_lock
from . import test_menu_service
from . import test_profile_service
from . import test_scan_anything_service
Expand Down
29 changes: 29 additions & 0 deletions shopfloor_base/tests/test_actions_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2023 Camptocamp SA (http://www.camptocamp.com)
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html).
from contextlib import closing

from .common import CommonCase


class ActionsLockCase(CommonCase):
@classmethod
def setUpClassBaseData(cls):
super().setUpClassBaseData()
cls.partner = cls.env.ref("base.res_partner_12")
with cls.work_on_actions(cls) as work:
cls.lock = work.component(usage="lock")

def test_select_for_update_skip_locked_ok(self):
"""Check the lock is obtained and True is returned."""
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertTrue(result)

def test_select_for_update_skip_locked_not_ok(self):
"""Check the lock is NOT obtained and False is returned."""
with closing(self.registry.cursor()) as cr:
# Simulate another user locked a row
cr.execute(
"SELECT id FROM res_partner WHERE id=%s FOR UPDATE", (self.partner.id,)
)
result = self.lock.for_update(self.partner, skip_locked=True)
self.assertFalse(result)

0 comments on commit 7ec0522

Please sign in to comment.