Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possible driver manager concept #182

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

clint-lawrence
Copy link
Collaborator

This is pretty scrappy, but mostly trying to test out the big picture idea.

Have a look at test-script.py. Here is the general concept:

# 1. Define a class which is your "driver manager"
@dataclasses.dataclass
class Jig123DriverManager:
    dmm: dmm.DMM = dataclasses.field(default_factory=dmm.open)


# 2. TestList and TestClass are generic. You need connect them to the driver manager type
class Jig123TestList(TestList[Jig123DriverManager]):
    pass


class Jig123TestClass(TestClass[Jig123DriverManager]):
    pass

# 3. Define your tests & lists as normal. But now the driver manager will be passed in by the
# sequencer to test, set_up, tear_down, enter and exit. By annotating the dm attribute, you
# get reliable autocomplete in the IDE.
class Test(Jig123TestClass):
    def test(self, dm: Jig123DriverManager):
        value = dm.dmm.measure()
        ...

# 4. Instead of only specifying a top level test list to the sequencer, we will now
# pass in a `TestScript`, which tell the sequencer how to create a driver manager to pass
# into the TestClass/TestList methods.
TEST_SCRIPT = TestScript(
    test_list=Jig123TestList(... Test list goes here ... ),
    dm_type=Jig123DriverManager,
)

The doesn't work 100% yet, the example with the "dummy" driver works. obviously we will also need to think about backwards compatibility.

@clint-lawrence
Copy link
Collaborator Author

clint-lawrence commented May 28, 2024

At the moment, we sometimes create drivers that depend on other drivers. Here is one example:

dm = JigDriverManager()

class DriverManager:
    def __init__(self):
        pub.subscribe(self.test_exception, "Test_Exception")
        logging.debug("Driver Manager Init")
        self._dmm = None
        self._pps = None
        self._relay_matrix = None
        self._jig_J513 = None

class FTDIAddressHandlerJ513(AddressHandler):
    pin_list = (
        "1K1",
        ...
        "3K16",
    )

    def update_output(self, value):
        dm.relay_matrix.serial_shift_bit_bang(value)

Note the reference to the global dm in the update_output method. The way the address handlers get created and added to a JigDriver subclass makes this a challenge to untangle. The whole switching/VirtualMux/AddressHandler code is in need of a birthday anyway and perhaps that will need to have before the changes proposed here are useful.

Happy to be proven wrong on that :)

@clint-lawrence
Copy link
Collaborator Author

... The whole switching/VirtualMux/AddressHandler code is in need of a birthday anyway and perhaps that will need to have before the changes proposed here are useful.

Starting to chip away at that problem over here master...clint-lawrence:Fixate:jig-mux-refactor

@clint-lawrence clint-lawrence removed the request for review from John2202W June 20, 2024 03:46
@John2202W
Copy link
Collaborator

Another aspect that would be helpful to tie into a rework of the driver manager is a way to decouple or delay the execution of opening drivers until they are used.

I.e. for IPM3 we are testing a bunch of comms channels and using a mux to switch in either serial uart or RS485 converters. Would be ideal to have a simple test list such as:

CommsTests(
        [
            SerialCommTest("OTS-BT", dm.rs422_A, dm.rs422_B),
            SerialCommTest("OTS-IF", dm.rs422_A, dm.serial_uart),
            SerialCommTest("PROT-IF", dm.rs422_A, dm.serial_uart),
            SerialCommTest("NFC", dm.rs422_A, dm.serial_uart),
        ]
    ),

With a generic test class:

class SerialCommTest(TestClass):
    def __init__(
        self, mux: str, PortA: serial.Serial, PortB: serial.Serial, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.mux = mux
        self.PortA = PortA
        self.PortB = PortB

    def test(self):
        dm.jig_j511.mux(self.mux)
        self.PortA.write(self.Message)
        message1 = self.PortB.readline()
        chk_true(message1 == self.Message, f"Comms channel: {self.mux}")

        message_return = self.Message + "-reverse".encode("utf-8")
        self.PortB.write(message_return)
        message2 = self.PortA.readline()
        chk_true(
            message2 == message_return, f"Comms channel: {self.mux} return"
        )

With each of the serial ports being a property of the Driver Manager:

    @property
    def rs422_A(self):
        """RS422 converters for serial comms"""
        if self._rs422_A is None:
            self._rs422_A = serial.Serial(
                findftdi.by_description("J511-RS422-A"),
                baudrate=57600,
                timeout=0.1,
            )
        return self._rs422_A

    @property
    def rs422_B(self):
        """RS422 converters for serial comms"""
        if self._rs422_B is None:
            self._rs422_B = serial.Serial(
                findftdi.by_description("J511-RS422-B"),
                baudrate=57600,
                timeout=0.1
            )
        return self._rs422_B

    @property
    def serial_uart(self):
        """FT232 for Serial UART"""
        if self._serial_uart is None:
            self._serial_uart = serial.Serial(
                findftdi.by_description("J511-Serial"), baudrate=57600, timeout=0.1
            )
        return self._serial_uart

Since at import time the serial ports are trying to be accessed, if the device is not connected an exception will be raised before the sequencer and UI are up and running.

@pazzarpj
Copy link
Collaborator

pazzarpj commented Jul 16, 2024

Hey, I know I'm very out of the loop with current practices. Written on phone so excuse code mistakes.

But is is better to do something like dependency injection and do away with the driver manager altogether as a global accessor. But rather just use as a context manager and to inject from it to the methods.

Would give you the delayed initialisation. And makes type checking easier and local to the the test function.

class SerialCommTest(TestClass):
    def __init__(
        self, mux: str, PortA: serial.Serial, PortB: serial.Serial, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.mux = mux
        self.port_a = PortA
        self.port_b = PortB

    def setup(self, jig_j511: JigJ511):
        jig_j511.mux(self.mux)

    def test(self, port_a: serial.Serial, port_b: serial.Serial):
        port_a.write(self.Message)
        message1 = port_b.readline()
        chk_true(message1 == self.Message, f"Comms channel: {self.mux}")

        message_return = self.Message + "-reverse".encode("utf-8")
        port_b.write(message_return)
        message2 = port_b.readline()
        chk_true(
            message2 == message_return, f"Comms channel: {self.mux} return"
        )

@pazzarpj
Copy link
Collaborator

pazzarpj commented Jul 16, 2024

from contextlib import contextmanager
class DriverManager:
   stack: ExitStack
   ...

    def register(name:str, driver:Generator):
       self.drivers[name] = contextmanager(driver)

    def __enter__():
        self.stack = ExitStack()
        return self

    ...

    def enter_context(name:str):
        self.active[name] = self.stack.enter_context(self.drivers[name])
        return self.active[name]
        
    # properties for backwards compatibility 
    @property
    def rs422_A(self) -> serial.Serial:
        """RS422 converters for serial comms"""
        return self.active.get("rs422_A",  self.enter_context("rs422_A"))

@pazzarpj
Copy link
Collaborator

pazzarpj commented Jul 16, 2024

Although, I guess the enter_context and stack should live on the test list, or sequencer not the driver manager. So that you could have set up and teardown of drivers at the same level required by the test lists. Or the sequencer could inject it into the test list etc.

Either way, I think this approach simplifies specifying type annotations, at the cost of a "little bit of magic" with using the inspect module to find the appropriate driver for each test list / test by looking at the function signature and matching it by name.

The driver manager also becomes generic instead of a per test script implementation. Not relying on dataclass field default factories, allows lazy loading. And provides actual clear definitions on when to clean up the driver.

Eg.

def dmm_factory():
   dmm = DMM.open()
   yield dmm
   dmm.close()

def serial_factory(description:str, baud:int):
   @wraps
   def tmp():
       device = serial.Serial(
                findftdi.by_description(description),
                baudrate=baud,
                timeout=0.1,
            )
        yield device
        device.close()
    return tmp
   
dm = DriverManager()
dm.register("dmm", dmm_factory)
dm.register("rs422_A", serial_factory("J511-RS422-A",57600))
dm.register("rs422_B", serial_factory("J511-RS422-B",57600))

You could specify in the top level list if you need the dmm for all tests, or you could get down to specifying only a single mux for a test instead of jig drivers.mux

Forgive me if I'm way off base, haven't really looked at it the past 5 years

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants