Skip to content

Commit

Permalink
fixed a race condition in the table update (#127)
Browse files Browse the repository at this point in the history
Introduced a thread lock to fix a race condition in the table update
  • Loading branch information
evalott100 authored Jul 3, 2024
1 parent 6ffd344 commit fdc904e
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 13 deletions.
51 changes: 39 additions & 12 deletions src/pandablocks_ioc/_tables.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# IOC Table record support

import logging
import threading
import typing
from collections import OrderedDict
from dataclasses import dataclass
Expand Down Expand Up @@ -184,6 +185,9 @@ def __init__(
self.client = client
self.table_name = table_name
self.field_info = field_info
self._mode_lock = threading.Lock()
self._sent_data: List[str] = []
self._update_in_progress = False
pva_table_name = RecordName(table_name)

# Make a labels field
Expand Down Expand Up @@ -298,6 +302,7 @@ def __init__(
DESC="Controls PandA <-> EPICS data interface",
initial_value=TableModeEnum.VIEW.value,
on_update=self.update_mode,
validate=self._wait_for_mode_lock,
)
pvi_name = epics_to_pvi_name(mode_record_name)
Pvi.add_pvi_info(
Expand Down Expand Up @@ -340,6 +345,16 @@ def __init__(
},
)

def __del__(self):
self._mode_lock.release()

def _wait_for_mode_lock(self, record: RecordWrapper, new_val):
mode = TableModeEnum(new_val)
with self._mode_lock:
if mode == TableModeEnum.EDIT and self._update_in_progress:
return False
return True

def validate_waveform(self, record: RecordWrapper, new_val) -> bool:
"""Controls whether updates to the waveform records are processed, based on the
value of the MODE record.
Expand All @@ -355,7 +370,7 @@ def validate_waveform(self, record: RecordWrapper, new_val) -> bool:
record_val = self.mode_record_info.record.get()

if record_val == TableModeEnum.VIEW.value:
logging.debug(
logging.error(
f"{self.table_name} MODE record is VIEW, stopping update "
f"to {record.name}"
)
Expand All @@ -370,14 +385,14 @@ def validate_waveform(self, record: RecordWrapper, new_val) -> bool:
# SUBMIT only present when currently writing out data to PandA.
logging.warning(
f"Update of record {record.name} attempted while MODE was SUBMIT."
"New value will be discarded"
"New will be discarded"
)
return False
elif record_val == TableModeEnum.DISCARD.value:
# DISCARD only present when currently overriding local data with PandA data
logging.warning(
f"Update of record {record.name} attempted while MODE was DISCARD."
"New value will be discarded"
"New will be discarded"
)
return False
else:
Expand Down Expand Up @@ -411,6 +426,7 @@ async def update_mode(self, new_val: int):

panda_field_name = epics_to_panda_name(self.table_name)
await self.client.send(Put(panda_field_name, packed_data))
self._sent_data = packed_data

except Exception:
logging.exception(
Expand Down Expand Up @@ -491,9 +507,23 @@ def update_table(self, new_values: List[str]) -> None:
new_values: The list of new values from the PandA
"""

curr_mode = TableModeEnum(self.mode_record_info.record.get())
if self._sent_data == new_values:
# Received changes back from the panda that were updated
# from this method already
return

if curr_mode == TableModeEnum.VIEW:
self._sent_data = []

with self._mode_lock:
if TableModeEnum(self.mode_record_info.record.get()) == TableModeEnum.EDIT:
logging.warning(
f"Update of table {self.table_name} attempted when MODE "
"was not VIEW. New value will be discarded"
)
else:
self._update_in_progress = True

if self._update_in_progress:
field_data = words_to_table(new_values, self.field_info)

for field_name, field_record in self.table_fields_records.items():
Expand All @@ -503,12 +533,9 @@ def update_table(self, new_values: List[str]) -> None:
field_data, field_name, field_record.field
)

# Must skip processing as the validate method would reject the update
# Must skip processing as the validate method would
# reject the update
field_record.record_info.record.set(waveform_val, process=False)

else:
# No other mode allows PandA updates to EPICS records
logging.warning(
f"Update of table {self.table_name} attempted when MODE "
"was not VIEW. New value will be discarded"
)
with self._mode_lock:
self._update_in_progress = False
64 changes: 63 additions & 1 deletion tests/test_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pandablocks.asyncio import AsyncioClient
from pandablocks.commands import GetMultiline, Put
from pandablocks.responses import TableFieldDetails, TableFieldInfo
from pandablocks.utils import words_to_table
from softioc import alarm

from fixtures.mocked_panda import TIMEOUT, command_to_key, multiprocessing_queue_to_list
Expand Down Expand Up @@ -275,7 +276,9 @@ def test_table_updater_validate_mode_discard(table_updater: TableUpdater):
def test_table_updater_validate_mode_unknown(table_updater: TableUpdater):
"""Test the validate method when mode is unknown"""

table_updater.mode_record_info.record.get = MagicMock(return_value="UnknownValue")
table_updater.mode_record_info.record.get = MagicMock(
return_value=12
) # Unknown Value
table_updater.mode_record_info.record.set_alarm = MagicMock()

record = MagicMock()
Expand Down Expand Up @@ -485,3 +488,62 @@ def test_table_updater_update_table_not_view(
record_info = table_updater.table_fields_records[field_name].record_info
assert record_info
record_info.record.set.assert_not_called()


async def test_table_update_skips_data_sent_from_ioc_once_received_back(
table_updater: TableUpdater,
table_data_1: List[str],
):
"""A test that once the ioc sets table values, it won't attempt to
set the same values once they come back from the panda"""

await table_updater.update_mode(TableModeEnum.VIEW.value)
await table_updater.update_mode(TableModeEnum.EDIT.value)
table_updater.update_table(table_data_1)
await table_updater.update_mode(TableModeEnum.SUBMIT.value)
table_updater.update_table(table_data_1)
assert table_updater._sent_data == table_data_1
for field_record in table_updater.table_fields_records.values():
assert isinstance(field_record.record_info, RecordInfo)
field_record.record_info.record.set.assert_called_once()


async def test_table_update_mode_thread_lock(table_updater):
async def some_other_mode_update_from_panda():
with table_updater._mode_lock:
table_updater._update_in_progress = True
await asyncio.sleep(0.1)

async def ioc_update_mode_record():
# Enough time for the above coroutine to grab the lock
await asyncio.sleep(0.05)
assert table_updater._wait_for_mode_lock(None, TableModeEnum.EDIT) is False

assert table_updater._wait_for_mode_lock(None, TableModeEnum.EDIT) is True
await asyncio.gather(some_other_mode_update_from_panda(), ioc_update_mode_record())


async def test_table_changed_back_correctly(table_updater, table_data_1, table_data_2):
table_unpacked_data_1 = words_to_table(
table_data_1, table_updater.field_info, convert_enum_indices=True
)
table_unpacked_data_2 = words_to_table(
table_data_2, table_updater.field_info, convert_enum_indices=True
)

def assert_last_table_data_set_equaled(table_unpacked_data):
for row, field_record in zip(
table_unpacked_data.values(), table_updater.table_fields_records.values()
):
assert numpy.array_equal(
row, field_record.record_info.record.set.call_args[0][-1]
)

table_updater.update_table(table_data_1)
await table_updater.update_mode(TableModeEnum.SUBMIT.value)
table_updater.update_table(table_data_1)
assert_last_table_data_set_equaled(table_unpacked_data_1)
table_updater.update_table(table_data_2)
assert_last_table_data_set_equaled(table_unpacked_data_2)
table_updater.update_table(table_data_1)
assert_last_table_data_set_equaled(table_unpacked_data_1)

0 comments on commit fdc904e

Please sign in to comment.