From 3f1c3a35d91e894718267793841a9c3c7974e282 Mon Sep 17 00:00:00 2001 From: Kenneth Yang <82800265+kjy5@users.noreply.github.com> Date: Fri, 15 Dec 2023 12:02:23 -0800 Subject: [PATCH] 233 fix docstrings (#275) * Fix boolean print * Fixed common * Fixed platform handler * Added data info for input format, fixed server * Fixed platform comments, use subclassing for uMp3-3 --- src/ephys_link/common.py | 90 ++++++--- src/ephys_link/platform_handler.py | 171 +++++------------- .../platforms/new_scale_manipulator.py | 54 +++--- .../platforms/new_scale_pathfinder_handler.py | 43 ++--- src/ephys_link/platforms/sensapex_handler.py | 7 +- .../platforms/sensapex_manipulator.py | 47 +++-- src/ephys_link/platforms/ump3_handler.py | 102 +---------- src/ephys_link/platforms/ump3_manipulator.py | 124 ++----------- src/ephys_link/server.py | 150 ++++++++------- 9 files changed, 273 insertions(+), 515 deletions(-) diff --git a/src/ephys_link/common.py b/src/ephys_link/common.py index ae4ee4d..e11b63d 100644 --- a/src/ephys_link/common.py +++ b/src/ephys_link/common.py @@ -14,9 +14,9 @@ def dprint(message: str) -> None: - """Print message if debug is enabled + """Print message if debug is enabled. - :param message: Message to print + :param message: Message to print. :type message: str :return: None """ @@ -26,30 +26,60 @@ def dprint(message: str) -> None: # Input data formats class GotoPositionInputDataFormat(TypedDict): - """Data format for :func:`server.goto_pos`""" + """Data format for positional requests. + + :param manipulator_id: ID of the manipulator to move. + :type manipulator_id: str + :param pos: Position to move to in mm (X, Y, Z, W). + :type pos: list[float] + :param speed: Speed to move at in mm/s. + :type speed: float + """ manipulator_id: str pos: list[float] - speed: int + speed: float class InsideBrainInputDataFormat(TypedDict): - """Data format for :func:`server.set_inside_brain`""" + """Data format for setting inside brain state. + + :param manipulator_id: ID of the manipulator to move. + :type manipulator_id: str + :param inside: Whether the manipulator is inside the brain. + :type inside: bool + """ manipulator_id: str inside: bool class DriveToDepthInputDataFormat(TypedDict): - """Data format for :func:`server.drive_to_depth`""" + """Data format for depth driving requests. + + :param manipulator_id: ID of the manipulator to move. + :type manipulator_id: str + :param depth: Depth to drive to in mm. + :type depth: float + :param speed: Speed to drive at in mm/s. + :type speed: float + """ manipulator_id: str depth: float - speed: int + speed: float class CanWriteInputDataFormat(TypedDict): - """Data format for :func:`server.set_can_write`""" + """Data format for setting can write state. + + :param manipulator_id: ID of the manipulator to move. + :type manipulator_id: str + :param can_write: Whether the manipulator can write. + :type can_write: bool + :param hours: Number of hours the manipulator can write for. + :type hours: float + """ manipulator_id: str can_write: bool @@ -58,19 +88,19 @@ class CanWriteInputDataFormat(TypedDict): # Output data dictionaries class GetManipulatorsOutputData(dict): - """Output format for (manipulators) + """Output format for get manipulators request. - :param manipulators: Tuple of manipulator IDs (as strings) + :param manipulators: List of manipulator IDs (as strings). :type manipulators: list - :param num_axes: Number of axes this manipulator has + :param num_axes: Number of axes this manipulator has. :type num_axes: int - :param dimensions: Size of the movement space in mm (first 3 axes) + :param dimensions: Size of the movement space in mm (first 3 axes). :type dimensions: list - :param error: Error message + :param error: Error message. :type error: str :example: Example generated dictionary - :code:`{"manipulators": ["1", "2"], "error": ""}` + :code:`{"manipulators": ["1", "2"], "num_axes": 4, "dimensions": [20, 20, 20], "error": ""}` """ def __init__(self, manipulators: list, num_axes: int, dimensions: list, error: str) -> None: @@ -88,11 +118,11 @@ def json(self) -> str: class PositionalOutputData(dict): - """Output format for (position, error) + """Output format for positional requests. - :param position: Position in mm (as a tuple, can be empty) in X, Y, Z, W order + :param position: Position in mm (as a list, empty on error) in X, Y, Z, W order. :type position: list - :param error: Error message + :param error: Error message. :type error: str :example: Example generated dictionary @@ -109,11 +139,11 @@ def json(self) -> str: class AngularOutputData(dict): - """Output format for (angles, error) + """Output format for manipulator angle requests. - :param angles: Angles in degrees (as a tuple, can be empty) in yaw, pitch, roll order + :param angles: Angles in degrees (as a list, can be empty) in yaw, pitch, roll order. :type angles: list - :param error: Error message + :param error: Error message. :type error: str """ @@ -127,11 +157,11 @@ def json(self) -> str: class ShankCountOutputData(dict): - """Output format for (num_shanks, error) + """Output format for number of shanks. - :param shank_count: Number of shanks on the probe + :param shank_count: Number of shanks on the probe (-1 if error). :type shank_count: int - :param error: Error message + :param error: Error message. :type error: str """ @@ -145,14 +175,14 @@ def json(self) -> str: class DriveToDepthOutputData(dict): - """Output format for depth driving (depth, error) + """Output format for depth driving. - :param depth: Depth in mm + :param depth: Depth in mm (0 on error). :type depth: float - :param error: Error message + :param error: Error message. :type error: str - :example: Example generated dictionary :code:`{"depth": 0.123, "error": ""}` + :example: Example generated dictionary :code:`{"depth": 1.23, "error": ""}` """ def __init__(self, depth: float, error: str) -> None: @@ -165,11 +195,11 @@ def json(self) -> str: class StateOutputData(dict): - """Output format for (state, error) + """Output format for boolean state requests. - :param state: State of the event + :param state: State of the event. :type state: bool - :param error: Error message + :param error: Error message. :type error: str :example: Example generated dictionary :code:`{"state": True, "error": ""}` diff --git a/src/ephys_link/platform_handler.py b/src/ephys_link/platform_handler.py index cfbeb9b..1f7b7b0 100644 --- a/src/ephys_link/platform_handler.py +++ b/src/ephys_link/platform_handler.py @@ -40,9 +40,9 @@ def __init__(self): # Platform Handler Methods def reset(self) -> bool: - """Reset handler + """Reset handler. - :return: True if successful, False otherwise + :return: True if successful, False otherwise. :rtype: bool """ stop_result = self.stop() @@ -50,9 +50,9 @@ def reset(self) -> bool: return stop_result def stop(self) -> bool: - """Stop handler + """Stop handler. - :return: True if successful, False otherwise + :return: True if successful, False otherwise. :rtype: bool """ try: @@ -66,28 +66,27 @@ def stop(self) -> bool: return True def get_manipulators(self) -> com.GetManipulatorsOutputData: - """Get all registered manipulators + """Get all registered manipulators. - :return: Callback parameters (manipulators, error) + :return: Result of connected manipulators, platform information, and error message (if any). :rtype: :class:`ephys_link.common.GetManipulatorsOutputData` """ - devices = [] error = "Error getting manipulators" try: - # noinspection PyUnresolvedReferences devices = self._get_manipulators() error = "" except Exception as e: print(f"[ERROR]\t\t Getting manipulators: {type(e)}: {e}\n") + return com.GetManipulatorsOutputData([], self.num_axes, self.dimensions, error) else: return com.GetManipulatorsOutputData(devices, self.num_axes, self.dimensions, error) def register_manipulator(self, manipulator_id: str) -> str: - """Register a manipulator + """Register a manipulator. :param manipulator_id: The ID of the manipulator to register. :type manipulator_id: str - :return: Callback parameter (Error message (on error)) + :return: Error message on error, empty string otherwise. :rtype: str """ # Check if manipulator is already registered @@ -112,11 +111,12 @@ def register_manipulator(self, manipulator_id: str) -> str: return "" def unregister_manipulator(self, manipulator_id: str) -> str: - """Unregister a manipulator + """Unregister a manipulator. :param manipulator_id: The ID of the manipulator to unregister. :type manipulator_id: str - :return: Callback parameters (error message (on error)) + :return: Error message on error, empty string otherwise. + :rtype: str """ # Check if manipulator is not registered if manipulator_id not in self.manipulators: @@ -126,23 +126,21 @@ def unregister_manipulator(self, manipulator_id: str) -> str: try: # Unregister manipulator self._unregister_manipulator(manipulator_id) - - com.dprint(f"[SUCCESS]\t Unregistered manipulator: {manipulator_id}\n") except Exception as e: # Other error print(f"[ERROR]\t\t Unregistering manipulator: {manipulator_id}") print(f"{e}\n") return "Error unregistering manipulator" else: + com.dprint(f"[SUCCESS]\t Unregistered manipulator: {manipulator_id}\n") return "" def get_pos(self, manipulator_id: str) -> com.PositionalOutputData: - """Get the current position of a manipulator + """Get the current position of a manipulator. :param manipulator_id: The ID of the manipulator to get the position of. :type manipulator_id: str - :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an - empty array on error) in mm, error message) + :return: Positional information for the manipulator and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: @@ -165,19 +163,17 @@ def get_pos(self, manipulator_id: str) -> com.PositionalOutputData: if manipulator_pos["error"] != "": return manipulator_pos return com.PositionalOutputData(self._platform_space_to_unified_space(manipulator_pos["position"]), "") - except KeyError: # Manipulator not found in registered manipulators print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}") return com.PositionalOutputData([], "Manipulator not registered") def get_angles(self, manipulator_id: str) -> com.AngularOutputData: - """Get the current position of a manipulator + """Get the current position of a manipulator. - :param manipulator_id: The ID of the manipulator to get the position of. + :param manipulator_id: The ID of the manipulator to get the angles of. :type manipulator_id: str - :return: Callback parameters (angles in (yaw, pitch, roll) or an - empty array on error in degrees, error message) + :return: Angular information for the manipulator and error message (if any). :rtype: :class:`ephys_link.common.AngularOutputData` """ try: @@ -202,11 +198,12 @@ def get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData: :param manipulator_id: The ID of the manipulator to get the number of shanks of. :type manipulator_id: str - :return: Callback parameters (number of shanks or -1 on error, error message) + :return: Number of shanks on the probe. + :rtype: :class:`ephys_link.common.ShankCountOutputData` """ return self._get_shank_count(manipulator_id) - async def goto_pos(self, manipulator_id: str, position: list[float], speed: int) -> com.PositionalOutputData: + async def goto_pos(self, manipulator_id: str, position: list[float], speed: float) -> com.PositionalOutputData: """Move manipulator to position :param manipulator_id: The ID of the manipulator to move @@ -214,9 +211,8 @@ async def goto_pos(self, manipulator_id: str, position: list[float], speed: int) :param position: The position to move to in (x, y, z, w) in mm :type position: list[float] :param speed: The speed to move at (in mm/s) - :type speed: int - :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an - empty array on error) in mm, error message) + :type speed: float + :return: Resulting position of the manipulator and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: @@ -242,7 +238,7 @@ async def goto_pos(self, manipulator_id: str, position: list[float], speed: int) print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n") return com.PositionalOutputData([], "Manipulator not registered") - async def drive_to_depth(self, manipulator_id: str, depth: float, speed: int) -> com.DriveToDepthOutputData: + async def drive_to_depth(self, manipulator_id: str, depth: float, speed: float) -> com.DriveToDepthOutputData: """Drive manipulator to depth :param manipulator_id: The ID of the manipulator to drive @@ -250,9 +246,8 @@ async def drive_to_depth(self, manipulator_id: str, depth: float, speed: int) -> :param depth: The depth to drive to in mm :type depth: float :param speed: The speed to drive at (in mm/s) - :type speed: int - :return: Callback parameters (manipulator ID, depth (or 0 on error) in mm, error - message) + :type speed: float + :return: Resulting depth of the manipulator and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ try: @@ -290,7 +285,7 @@ def set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutput :type manipulator_id: str :param inside: True if inside brain, False if outside :type inside: bool - :return: Callback parameters (manipulator ID, inside, error message) + :return: New inside brain state of the manipulator and error message (if any). :rtype: :class:`ephys_link.common.StateOutputData` """ try: @@ -322,7 +317,7 @@ async def calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str :type manipulator_id: str :param sio: SocketIO object (to call sleep) :type sio: :class:`socketio.AsyncServer` - :return: Callback parameters (manipulator ID, error message) + :return: Error message on error, empty string otherwise. :rtype: str """ try: @@ -349,7 +344,7 @@ def bypass_calibration(self, manipulator_id: str) -> str: :param manipulator_id: ID of manipulator to bypass calibration :type manipulator_id: str - :return: Callback parameters (manipulator ID, error message) + :return: Error message on error, empty string otherwise. :rtype: str """ try: @@ -384,7 +379,7 @@ def set_can_write( :type hours: float :param sio: SocketIO object from server to emit reset event :type sio: :class:`socketio.AsyncServer` - :return: Callback parameters (manipulator ID, can_write, error message) + :return: New can_write state of the manipulator and error message (if any). :rtype: :class:`ephys_link.common.StateOutputData` """ try: @@ -393,7 +388,6 @@ def set_can_write( # Manipulator not found in registered manipulators print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n") return com.StateOutputData(False, "Manipulator not " "registered") - except Exception as e: # Other error print(f"[ERROR]\t\t Set manipulator {manipulator_id} can_write state") @@ -404,102 +398,39 @@ def set_can_write( @abstractmethod def _get_manipulators(self) -> list: - """Get all registered manipulators - - :return: List of manipulator IDs - :rtype: list - """ + raise NotImplementedError @abstractmethod def _register_manipulator(self, manipulator_id: str) -> None: - """Register a manipulator - - :param manipulator_id: The ID of the manipulator to register. - :type manipulator_id: str - :return: None - """ + raise NotImplementedError @abstractmethod def _unregister_manipulator(self, manipulator_id: str) -> None: - """Unregister a manipulator - - :param manipulator_id: The ID of the manipulator to unregister. - :type manipulator_id: str - :return: None - """ + raise NotImplementedError @abstractmethod def _get_pos(self, manipulator_id: str) -> com.PositionalOutputData: - """Get the current position of a manipulator - - :param manipulator_id: The ID of the manipulator to get the position of. - :type manipulator_id: int - :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an - empty array on error) in mm, error message) - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ + raise NotImplementedError @abstractmethod def _get_angles(self, manipulator_id: str) -> com.AngularOutputData: - """Get the current angles of a manipulator - - :param manipulator_id: The ID of the manipulator to get the position of. - :type manipulator_id: int - :return: Callback parameters (manipulator ID, position in (yaw, pitch, roll) (or an - empty array on error) in degrees, error message) - :rtype: :class:`ephys_link.common.AngularOutputData` - """ + raise NotImplementedError @abstractmethod def _get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData: - """Get the number of shanks on the probe - - :param manipulator_id: The ID of the manipulator to get the number of shanks of. - :type manipulator_id: str - :return: Callback parameters (number of shanks or -1 on error, error message) - """ + raise NotImplementedError @abstractmethod async def _goto_pos(self, manipulator_id: str, position: list[float], speed: int) -> com.PositionalOutputData: - """Move manipulator to position - - :param manipulator_id: The ID of the manipulator to move - :type manipulator_id: str - :param position: The position to move to in mm - :type position: list[float] - :param speed: The speed to move at (in mm/s) - :type speed: int - :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an - empty array on error) in mm, error message) - :rtype: :class:`ephys_link.common.PositionalOutputData` - """ + raise NotImplementedError @abstractmethod async def _drive_to_depth(self, manipulator_id: str, depth: float, speed: int) -> com.DriveToDepthOutputData: - """Drive manipulator to depth - - :param manipulator_id: The ID of the manipulator to drive - :type manipulator_id: str - :param depth: The depth to drive to in mm - :type depth: float - :param speed: The speed to drive at (in mm/s) - :type speed: int - :return: Callback parameters (manipulator ID, depth (or 0 on error), error - message) - :rtype: :class:`ephys_link.common.DriveToDepthOutputData` - """ + raise NotImplementedError @abstractmethod def _set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutputData: - """Set manipulator inside brain state (restricts motion) - - :param manipulator_id: The ID of the manipulator to set the state of - :type manipulator_id: str - :param inside: True if inside brain, False if outside - :type inside: bool - :return: Callback parameters (manipulator ID, inside, error message) - :rtype: :class:`ephys_link.common.StateOutputData` - """ + raise NotImplementedError @abstractmethod async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: @@ -515,13 +446,7 @@ async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> st @abstractmethod def _bypass_calibration(self, manipulator_id: str) -> str: - """Bypass calibration of manipulator - - :param manipulator_id: ID of manipulator to bypass calibration - :type manipulator_id: str - :return: Callback parameters (manipulator ID, error message) - :rtype: str - """ + raise NotImplementedError @abstractmethod def _set_can_write( @@ -531,19 +456,7 @@ def _set_can_write( hours: float, sio: socketio.AsyncServer, ) -> com.StateOutputData: - """Set manipulator can_write state (enables/disabled moving manipulator) - - :param manipulator_id: The ID of the manipulator to set the state of - :type manipulator_id: str - :param can_write: True if allowed to move, False if outside - :type can_write: bool - :param hours: The number of hours to allow writing (0 = forever) - :type hours: float - :param sio: SocketIO object from server to emit reset event - :type sio: :class:`socketio.AsyncServer` - :return: Callback parameters (manipulator ID, can_write, error message) - :rtype: :class:`ephys_link.common.StateOutputData` - """ + raise NotImplementedError @abstractmethod def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]: @@ -554,6 +467,7 @@ def _platform_space_to_unified_space(self, platform_position: list[float]) -> li :return: Position in unified manipulator space (x, y, z, w) in mm :rtype: list[float] """ + raise NotImplementedError @abstractmethod def _unified_space_to_platform_space(self, unified_position: list[float]) -> list[float]: @@ -564,3 +478,4 @@ def _unified_space_to_platform_space(self, unified_position: list[float]) -> lis :return: Position in platform space (x, y, z, w) in mm :rtype: list[float] """ + raise NotImplementedError diff --git a/src/ephys_link/platforms/new_scale_manipulator.py b/src/ephys_link/platforms/new_scale_manipulator.py index c21731e..fad857a 100644 --- a/src/ephys_link/platforms/new_scale_manipulator.py +++ b/src/ephys_link/platforms/new_scale_manipulator.py @@ -65,10 +65,9 @@ def query_all_axes(self): axis.QueryPosStatus() def get_pos(self) -> com.PositionalOutputData: - """Get the current position of the manipulator and convert it into mm + """Get the current position of the manipulator and convert it into mm. - :return: Callback parameters (position in (x, y, z, 0) (or an empty array on - error) in mm, error message) + :return: Position of manipulator in (x, y, z, z) in mm (or an empty array on error) and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ self.query_all_axes() @@ -89,14 +88,14 @@ def get_pos(self) -> com.PositionalOutputData: return com.PositionalOutputData([], "Error getting position") async def goto_pos(self, position: list[float], speed: float) -> com.PositionalOutputData: - """Move manipulator to position + """Move manipulator to position. - :param position: The position to move to in mm + :param position: The position to move to in mm. :type position: list[float] - :param speed: The speed to move at (in mm/s) + :param speed: The speed to move at (in mm/s). :type speed: float - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error), error message) + :return: Resulting position of manipulator in (x, y, z, z) in mm (or an empty array on error) + and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write @@ -159,14 +158,14 @@ async def goto_pos(self, position: list[float], speed: float) -> com.PositionalO print(f"{e}\n") return com.PositionalOutputData([], "Error moving manipulator") - async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutputData: - """Drive the manipulator to a certain depth + async def drive_to_depth(self, depth: float, speed: float) -> com.DriveToDepthOutputData: + """Drive the manipulator to a certain depth. - :param depth: The depth to drive to in mm + :param depth: The depth to drive to in mm. :type depth: float - :param speed: The speed to drive at in mm/s - :type speed: int - :return: Callback parameters (depth (or 0 on error), error message) + :param speed: The speed to drive at in mm/s. + :type speed: float + :return: Resulting depth of manipulator in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Check if able to write @@ -220,7 +219,7 @@ async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutp return com.DriveToDepthOutputData(0, "Error driving " "manipulator") def calibrate(self) -> bool: - """Calibrate the manipulator + """Calibrate the manipulator. :return: None """ @@ -229,7 +228,7 @@ def calibrate(self) -> bool: def get_calibrated(self) -> bool: """Return the calibration state of the manipulator. - :return: True if the manipulator is calibrated, False otherwise + :return: True if the manipulator is calibrated, False otherwise. :rtype: bool """ return self._calibrated @@ -242,14 +241,13 @@ def set_calibrated(self) -> None: self._calibrated = True def set_can_write(self, can_write: bool, hours: float, sio: socketio.AsyncServer) -> None: - """Set if the manipulator can move + """Set if the manipulator can move. - :param can_write: True if the manipulator can move, False otherwise + :param can_write: True if the manipulator can move, False otherwise. :type can_write: bool - :param hours: The number of hours to allow the manipulator to move (0 = - forever) + :param hours: The number of hours to allow the manipulator to move (0 = forever). :type hours: float - :param sio: SocketIO object from server to emit reset event + :param sio: SocketIO object from server to emit reset event. :type sio: :class:`socketio.AsyncServer` :return: None """ @@ -262,17 +260,17 @@ def set_can_write(self, can_write: bool, hours: float, sio: socketio.AsyncServer self._reset_timer.start() def get_can_write(self) -> bool: - """Return if the manipulator can move + """Return if the manipulator can move. - :return: True if the manipulator can move, False otherwise + :return: True if the manipulator can move, False otherwise. :rtype: bool """ return self._can_write def reset_can_write(self, sio: socketio.AsyncServer) -> None: - """Reset the :attr:`can_write` flag + """Reset the :attr:`can_write` flag. - :param sio: SocketIO object from server to emit reset event + :param sio: SocketIO object from server to emit reset event. :type sio: :class:`socketio.AsyncServer` :return: None """ @@ -280,16 +278,16 @@ def reset_can_write(self, sio: socketio.AsyncServer) -> None: asyncio.run(sio.emit("write_disabled", self._id)) def set_inside_brain(self, inside: bool) -> None: - """Set if the manipulator is inside the brain + """Set if the manipulator is inside the brain. - :param inside: True if the manipulator is inside the brain, False otherwise + :param inside: True if the manipulator is inside the brain, False otherwise. :type inside: bool :return: None """ self._inside_brain = inside def stop(self) -> None: - """Stop all axes on manipulator + """Stop all axes on manipulator. :returns None """ diff --git a/src/ephys_link/platforms/new_scale_pathfinder_handler.py b/src/ephys_link/platforms/new_scale_pathfinder_handler.py index 408266c..7e6553a 100644 --- a/src/ephys_link/platforms/new_scale_pathfinder_handler.py +++ b/src/ephys_link/platforms/new_scale_pathfinder_handler.py @@ -7,7 +7,7 @@ from __future__ import annotations -from json import loads +import json from typing import TYPE_CHECKING from urllib import request @@ -87,21 +87,23 @@ def __init__(self, port: int = 8080) -> None: raise ValueError(msg) from e def query_data(self) -> dict: - """Query New Scale HTTP server for data and return as dict + """Query New Scale HTTP server for data and return as dict. - :return: dict of data (originally in JSON) + :return: Parsed JSON data from New Scale HTTP server. + :rtype: dict """ try: - return loads(request.urlopen(f"http://localhost:{self.port}").read()) + return json.loads(request.urlopen(f"http://localhost:{self.port}").read()) except Exception as e: print(f"[ERROR]\t\t Unable to query for New Scale data: {type(e)} {e}\n") def query_manipulator_data(self, manipulator_id: str) -> dict: - """Query New Scale HTTP server for data on a specific manipulator + """Query New Scale HTTP server for data on a specific manipulator. - :param manipulator_id: manipulator ID - :return: dict of data (originally in JSON) - :raises ValueError: if manipulator ID is not found in query + :param manipulator_id: manipulator ID. + :return: Parsed JSON data for a particular manipulator. + :rtype: dict + :raises ValueError: if manipulator ID is not found in query. """ data_query = self.query_data()["ProbeArray"] manipulator_data = data_query[self.manipulators[manipulator_id]] @@ -176,12 +178,6 @@ def _get_pos(self, manipulator_id: str) -> com.PositionalOutputData: ) def _get_angles(self, manipulator_id: str) -> com.AngularOutputData: - """Get the current angles of the manipulator in degrees - - :param manipulator_id: manipulator ID - :return: Callback parameters (angles in (yaw, pitch, roll) (or an empty array on - error) in degrees, error message) - """ manipulator_data = self.query_manipulator_data(manipulator_id) return com.AngularOutputData( @@ -194,11 +190,6 @@ def _get_angles(self, manipulator_id: str) -> com.AngularOutputData: ) def _get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData: - """Get the number of shanks on the probe - - :param manipulator_id: manipulator ID - :return: Callback parameters (number of shanks (or -1 on error), error message) - """ for probe in self.query_data()["ProbeArray"]: if probe["Id"] == manipulator_id: return com.ShankCountOutputData(probe["ShankCount"], "") @@ -206,16 +197,16 @@ def _get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData: return com.ShankCountOutputData(-1, "Unable to find manipulator") async def _goto_pos(self, manipulator_id: str, position: list[float], speed: int) -> com.PositionalOutputData: - pass + raise NotImplementedError async def _drive_to_depth(self, manipulator_id: str, depth: float, speed: int) -> com.DriveToDepthOutputData: - pass + raise NotImplementedError def _set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutputData: - pass + raise NotImplementedError async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: - pass + raise NotImplementedError def _bypass_calibration(self, manipulator_id: str) -> str: return "" @@ -227,10 +218,10 @@ def _set_can_write( hours: float, sio: socketio.AsyncServer, ) -> com.StateOutputData: - pass + raise NotImplementedError def _unified_space_to_platform_space(self, unified_position: list[float]) -> list[float]: - pass + raise NotImplementedError def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]: - pass + raise NotImplementedError diff --git a/src/ephys_link/platforms/sensapex_handler.py b/src/ephys_link/platforms/sensapex_handler.py index 3b5fd16..c803eb6 100644 --- a/src/ephys_link/platforms/sensapex_handler.py +++ b/src/ephys_link/platforms/sensapex_handler.py @@ -1,10 +1,13 @@ """Handle communications with Sensapex uMp API +This supports the uMp-4 manipulator. Any Sensapex variants should extend this class. + Implements Sensapex uMp specific API calls including coordinating the usage of the :class:`ephys_link.platforms.sensapex_manipulator.SensapexManipulator` class. This is a subclass of :class:`ephys_link.platform_handler.PlatformHandler`. """ + from __future__ import annotations from pathlib import Path @@ -21,9 +24,9 @@ class SensapexHandler(PlatformHandler): - """Handler for Sensapex platform""" + """Handler for Sensapex platform.""" - def __init__(self): + def __init__(self) -> None: super().__init__() # Establish connection to Sensapex API (exit if connection fails) diff --git a/src/ephys_link/platforms/sensapex_manipulator.py b/src/ephys_link/platforms/sensapex_manipulator.py index 0b2f53c..2cab038 100644 --- a/src/ephys_link/platforms/sensapex_manipulator.py +++ b/src/ephys_link/platforms/sensapex_manipulator.py @@ -42,10 +42,9 @@ def __init__(self, device: SensapexDevice) -> None: # Device functions def get_pos(self) -> com.PositionalOutputData: - """Get the current position of the manipulator and convert it into mm + """Get the current position of the manipulator and convert it into mm. - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error) in mm, error message) + :return: Position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: @@ -58,14 +57,13 @@ def get_pos(self) -> com.PositionalOutputData: return com.PositionalOutputData([], "Error getting position") async def goto_pos(self, position: list[float], speed: float) -> com.PositionalOutputData: - """Move manipulator to position + """Move manipulator to position. :param position: The position to move to in mm :type position: list[float] :param speed: The speed to move at (in mm/s) :type speed: float - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error), error message) + :return: Resulting position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write @@ -114,14 +112,14 @@ async def goto_pos(self, position: list[float], speed: float) -> com.PositionalO print(f"{e}\n") return com.PositionalOutputData([], "Error moving manipulator") - async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutputData: - """Drive the manipulator to a certain depth + async def drive_to_depth(self, depth: float, speed: float) -> com.DriveToDepthOutputData: + """Drive the manipulator to a certain depth. - :param depth: The depth to drive to in mm + :param depth: The depth to drive to in mm. :type depth: float :param speed: The speed to drive at in mm/s - :type speed: int - :return: Callback parameters (depth (or 0 on error), error message) + :type speed: float + :return: Resulting depth in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Get position before this movement @@ -138,33 +136,32 @@ async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutp return com.DriveToDepthOutputData(0, "Error driving " "manipulator") def set_inside_brain(self, inside: bool) -> None: - """Set if the manipulator is inside the brain + """Set if the manipulator is inside the brain. Used to signal that the brain should move at :const:`INSIDE_BRAIN_SPEED_LIMIT` - :param inside: True if the manipulator is inside the brain, False otherwise + :param inside: True if the manipulator is inside the brain, False otherwise. :type inside: bool :return: None """ self._inside_brain = inside def get_can_write(self) -> bool: - """Return if the manipulator can move + """Return if the manipulator can move. - :return: True if the manipulator can move, False otherwise + :return: True if the manipulator can move, False otherwise. :rtype: bool """ return self._can_write def set_can_write(self, can_write: bool, hours: float, sio: socketio.AsyncServer) -> None: - """Set if the manipulator can move + """Set if the manipulator can move. - :param can_write: True if the manipulator can move, False otherwise + :param can_write: True if the manipulator can move, False otherwise. :type can_write: bool - :param hours: The number of hours to allow the manipulator to move (0 = - forever) + :param hours: The number of hours to allow the manipulator to move (0 = forever). :type hours: float - :param sio: SocketIO object from server to emit reset event + :param sio: SocketIO object from server to emit reset event. :type sio: :class:`socketio.AsyncServer` :return: None """ @@ -177,9 +174,9 @@ def set_can_write(self, can_write: bool, hours: float, sio: socketio.AsyncServer self._reset_timer.start() def reset_can_write(self, sio: socketio.AsyncServer) -> None: - """Reset the :attr:`can_write` flag + """Reset the :attr:`can_write` flag. - :param sio: SocketIO object from server to emit reset event + :param sio: SocketIO object from server to emit reset event. :type sio: :class:`socketio.AsyncServer` :return: None """ @@ -188,7 +185,7 @@ def reset_can_write(self, sio: socketio.AsyncServer) -> None: # Calibration def call_calibrate(self) -> None: - """Calibrate the manipulator + """Calibrate the manipulator. :return: None """ @@ -197,13 +194,13 @@ def call_calibrate(self) -> None: def get_calibrated(self) -> bool: """Return the calibration state of the manipulator. - :return: True if the manipulator is calibrated, False otherwise + :return: True if the manipulator is calibrated, False otherwise. :rtype: bool """ return self._calibrated def set_calibrated(self) -> None: - """Set the manipulator to calibrated + """Set the manipulator to be calibrated. :return: None """ diff --git a/src/ephys_link/platforms/ump3_handler.py b/src/ephys_link/platforms/ump3_handler.py index cf4d07a..7da5e6c 100644 --- a/src/ephys_link/platforms/ump3_handler.py +++ b/src/ephys_link/platforms/ump3_handler.py @@ -1,35 +1,24 @@ -from __future__ import annotations +"""Handle communications with Sensapex uMp-3 API -from pathlib import Path -from typing import TYPE_CHECKING +Uses the Sensapex uMp handler which implements for the uMp-4 manipulator +and extends it to support the uMp-3 manipulator. -from sensapex import UMP, UMError +This is a subclass of :class:`ephys_link.platforms.sensapex_handler`. +""" -from ephys_link import common as com -from ephys_link.platform_handler import PlatformHandler -from ephys_link.platforms.ump3_manipulator import UMP3Manipulator +from __future__ import annotations -if TYPE_CHECKING: - import socketio +from ephys_link.platforms.sensapex_handler import SensapexHandler +from ephys_link.platforms.ump3_manipulator import UMP3Manipulator -class UMP3Handler(PlatformHandler): +class UMP3Handler(SensapexHandler): def __init__(self): super().__init__() self.num_axes = 3 self.dimensions = [20, 20, 20] - # Establish connection to Sensapex API (exit if connection fails) - UMP.set_library_path(str(Path(__file__).parent.parent.absolute()) + "/resources/") - self.ump = UMP.get_ump() - if self.ump is None: - msg = "Unable to connect to uMp" - raise ValueError(msg) - - def _get_manipulators(self) -> list: - return list(map(str, self.ump.list_devices())) - def _register_manipulator(self, manipulator_id: str) -> None: if not manipulator_id.isnumeric(): msg = "Manipulator ID must be numeric" @@ -37,79 +26,6 @@ def _register_manipulator(self, manipulator_id: str) -> None: self.manipulators[manipulator_id] = UMP3Manipulator(self.ump.get_device(int(manipulator_id))) - def _unregister_manipulator(self, manipulator_id: str) -> None: - del self.manipulators[manipulator_id] - - def _get_pos(self, manipulator_id: str) -> com.PositionalOutputData: - return self.manipulators[manipulator_id].get_pos() - - def _get_angles(self, manipulator_id: str) -> com.AngularOutputData: - raise NotImplementedError - - def _get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData: - raise NotImplementedError - - async def _goto_pos(self, manipulator_id: str, position: list[float], speed: int) -> com.PositionalOutputData: - return await self.manipulators[manipulator_id].goto_pos(position, speed) - - async def _drive_to_depth(self, manipulator_id: str, depth: float, speed: int) -> com.DriveToDepthOutputData: - return await self.manipulators[manipulator_id].drive_to_depth(depth, speed) - - def _set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutputData: - self.manipulators[manipulator_id].set_inside_brain(inside) - com.dprint(f"[SUCCESS]\t Set inside brain state for manipulator:" f" {manipulator_id}\n") - return com.StateOutputData(inside, "") - - async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str: - try: - # Move manipulator to max position - await self.manipulators[manipulator_id].goto_pos([20000, 20000, 20000], 2000) - - # Call calibrate - self.manipulators[manipulator_id].call_calibrate() - - # Wait for calibration to complete - still_working = True - while still_working: - cur_pos = self.manipulators[manipulator_id].get_pos()["position"] - - # Check difference between current and target position - for prev, cur in zip([10000, 10000, 10000], cur_pos): - if abs(prev - cur) > 1: - still_working = True - break - still_working = False - - # Sleep for a bit - await sio.sleep(0.5) - - # Calibration complete - self.manipulators[manipulator_id].set_calibrated() - com.dprint(f"[SUCCESS]\t Calibrated manipulator {manipulator_id}\n") - except UMError as e: - # SDK call error - print(f"[ERROR]\t\t Calling calibrate manipulator {manipulator_id}") - print(f"{e}\n") - return "Error calling calibrate" - else: - return "" - - def _bypass_calibration(self, manipulator_id: str) -> str: - self.manipulators[manipulator_id].set_calibrated() - com.dprint(f"[SUCCESS]\t Bypassed calibration for manipulator" f" {manipulator_id}\n") - return "" - - def _set_can_write( - self, - manipulator_id: str, - can_write: bool, - hours: float, - sio: socketio.AsyncServer, - ) -> com.StateOutputData: - self.manipulators[manipulator_id].set_can_write(can_write, hours, sio) - com.dprint(f"[SUCCESS]\t Set can_write state for manipulator" f" {manipulator_id}\n") - return com.StateOutputData(can_write, "") - def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]: # unified <- platform # +x <- +y diff --git a/src/ephys_link/platforms/ump3_manipulator.py b/src/ephys_link/platforms/ump3_manipulator.py index b5a6be6..ccc5e6f 100644 --- a/src/ephys_link/platforms/ump3_manipulator.py +++ b/src/ephys_link/platforms/ump3_manipulator.py @@ -1,30 +1,25 @@ -"""Sensapex Manipulator class +"""Sensapex uMp-3 Manipulator class -Handles logic for calling Sensapex API functions. Also includes extra logic for safe -function calls, error handling, managing per-manipulator attributes, and returning the -appropriate callback parameters like in :mod:`ephys_link.sensapex_handler`. +Extends from :class:`ephys_link.platforms.sensapex_manipulator.SensapexManipulator` to support the uMp-3 manipulator. """ from __future__ import annotations import asyncio -import threading from typing import TYPE_CHECKING import ephys_link.common as com from ephys_link.platform_manipulator import ( - HOURS_TO_SECONDS, MM_TO_UM, POSITION_POLL_DELAY, - PlatformManipulator, ) +from ephys_link.platforms.sensapex_manipulator import SensapexManipulator if TYPE_CHECKING: - import socketio from sensapex import SensapexDevice -class UMP3Manipulator(PlatformManipulator): +class UMP3Manipulator(SensapexManipulator): """Representation of a single Sensapex manipulator :param device: A Sensapex device @@ -36,16 +31,13 @@ def __init__(self, device: SensapexDevice) -> None: :param device: A Sensapex device """ - super().__init__() - self._device = device - self._id = device.dev_id + super().__init__(device) # Device functions def get_pos(self) -> com.PositionalOutputData: - """Get the current position of the manipulator and convert it into mm + """Get the current position of the manipulator and convert it into mm. - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error) in mm, error message) + :return: Position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: @@ -62,14 +54,13 @@ def get_pos(self) -> com.PositionalOutputData: return com.PositionalOutputData([], "Error getting position") async def goto_pos(self, position: list[float], speed: float) -> com.PositionalOutputData: - """Move manipulator to position + """Move manipulator to position. - :param position: The position to move to in mm + :param position: The position to move to in mm. :type position: list[float] - :param speed: The speed to move at (in mm/s) + :param speed: The speed to move at (in mm/s). :type speed: float - :return: Callback parameters (position in (x, y, z, w) (or an empty array on - error), error message) + :return: Resulting position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write @@ -118,14 +109,14 @@ async def goto_pos(self, position: list[float], speed: float) -> com.PositionalO print(f"{e}\n") return com.PositionalOutputData([], "Error moving manipulator") - async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutputData: - """Drive the manipulator to a certain depth + async def drive_to_depth(self, depth: float, speed: float) -> com.DriveToDepthOutputData: + """Drive the manipulator to a certain depth. - :param depth: The depth to drive to in mm + :param depth: The depth to drive to in mm. :type depth: float - :param speed: The speed to drive at in mm/s - :type speed: int - :return: Callback parameters (depth (or 0 on error), error message) + :param speed: The speed to drive at in mm/s. + :type speed: float + :return: Resulting depth in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Get position before this movement @@ -140,84 +131,3 @@ async def drive_to_depth(self, depth: float, speed: int) -> com.DriveToDepthOutp # Return 0 and error message on failure return com.DriveToDepthOutputData(0, "Error driving " "manipulator") - - def set_inside_brain(self, inside: bool) -> None: - """Set if the manipulator is inside the brain - - Used to signal that the brain should move at :const:`INSIDE_BRAIN_SPEED_LIMIT` - - :param inside: True if the manipulator is inside the brain, False otherwise - :type inside: bool - :return: None - """ - self._inside_brain = inside - - def get_can_write(self) -> bool: - """Return if the manipulator can move - - :return: True if the manipulator can move, False otherwise - :rtype: bool - """ - return self._can_write - - def set_can_write(self, can_write: bool, hours: float, sio: socketio.AsyncServer) -> None: - """Set if the manipulator can move - - :param can_write: True if the manipulator can move, False otherwise - :type can_write: bool - :param hours: The number of hours to allow the manipulator to move (0 = - forever) - :type hours: float - :param sio: SocketIO object from server to emit reset event - :type sio: :class:`socketio.AsyncServer` - :return: None - """ - self._can_write = can_write - - if can_write and hours > 0: - if self._reset_timer: - self._reset_timer.cancel() - self._reset_timer = threading.Timer(hours * HOURS_TO_SECONDS, self.reset_can_write, [sio]) - self._reset_timer.start() - - def reset_can_write(self, sio: socketio.AsyncServer) -> None: - """Reset the :attr:`can_write` flag - - :param sio: SocketIO object from server to emit reset event - :type sio: :class:`socketio.AsyncServer` - :return: None - """ - self._can_write = False - asyncio.run(sio.emit("write_disabled", self._id)) - - # Calibration - def call_calibrate(self) -> None: - """Calibrate the manipulator - - :return: None - """ - self._device.calibrate_zero_position() - - def get_calibrated(self) -> bool: - """Return the calibration state of the manipulator. - - :return: True if the manipulator is calibrated, False otherwise - :rtype: bool - """ - return self._calibrated - - def set_calibrated(self) -> None: - """Set the manipulator to calibrated - - :return: None - """ - self._calibrated = True - - def stop(self) -> None: - """Stop the manipulator - - :return: None - """ - self._can_write = False - self._device.stop() - com.dprint(f"[SUCCESS]\t Stopped manipulator {self._id}") diff --git a/src/ephys_link/server.py b/src/ephys_link/server.py index 7877511..2dfd966 100644 --- a/src/ephys_link/server.py +++ b/src/ephys_link/server.py @@ -41,7 +41,7 @@ def __init__(self): # Is the server running? self.is_running = False - # Current platform handler + # Current platform handler (defaults to Sensapex) self.platform: PlatformHandler = importlib.import_module( "ephys_link.platforms.sensapex_handler" ).SensapexHandler() @@ -69,13 +69,13 @@ def __init__(self): self.sio.on("*", self.catch_all) async def connect(self, sid, _, __) -> bool: - """Acknowledge connection to the server + """Acknowledge connection to the server. - :param sid: Socket session ID + :param sid: Socket session ID. :type sid: str - :param _: WSGI formatted dictionary with request info (unused) + :param _: WSGI formatted dictionary with request info (unused). :type _: dict - :param __: Authentication details (unused) + :param __: Authentication details (unused). :type __: dict :return: False on error to refuse connection. True otherwise. :rtype: bool @@ -91,9 +91,9 @@ async def connect(self, sid, _, __) -> bool: return False async def disconnect(self, sid) -> None: - """Acknowledge disconnection from the server + """Acknowledge disconnection from the server. - :param sid: Socket session ID + :param sid: Socket session ID. :type sid: str :return: None """ @@ -106,21 +106,21 @@ async def disconnect(self, sid) -> None: @staticmethod async def get_version(_) -> str: - """Get the version number of the server + """Get the version number of the server. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :return: Version number as defined in __version__ + :return: Version number as defined in :mod:`ephys_link.__about__`. :rtype: str """ return version async def get_manipulators(self, _) -> str: - """Get the list of discoverable manipulators + """Get the list of discoverable manipulators. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :return: Callback as :class:`ephys_link.common.GetManipulatorsOutputData` + :return: :class:`ephys_link.common.GetManipulatorsOutputData` as JSON formatted string. :rtype: str """ com.dprint("[EVENT]\t\t Get discoverable manipulators") @@ -128,13 +128,13 @@ async def get_manipulators(self, _) -> str: return self.platform.get_manipulators().json() async def register_manipulator(self, _, manipulator_id: str) -> str: - """Register a manipulator with the server + """Register a manipulator with the server. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of the manipulator to register + :param manipulator_id: ID of the manipulator to register. :type manipulator_id: str - :return: Error message (on error) + :return: Error message on error, empty string otherwise. :rtype: str """ com.dprint(f"[EVENT]\t\t Register manipulator: {manipulator_id}") @@ -142,13 +142,13 @@ async def register_manipulator(self, _, manipulator_id: str) -> str: return self.platform.register_manipulator(manipulator_id) async def unregister_manipulator(self, _, manipulator_id: str) -> str: - """Unregister a manipulator from the server + """Unregister a manipulator from the server. :param _: Socket session ID (unused) :type _: str - :param manipulator_id: ID of the manipulator to unregister + :param manipulator_id: ID of the manipulator to unregister. :type manipulator_id: str - :return: Error message (on error) + :return: Error message on error, empty string otherwise. :rtype: str """ com.dprint(f"[EVENT]\t\t Unregister manipulator: {manipulator_id}") @@ -156,13 +156,13 @@ async def unregister_manipulator(self, _, manipulator_id: str) -> str: return self.platform.unregister_manipulator(manipulator_id) async def get_pos(self, _, manipulator_id: str) -> str: - """Position of manipulator request + """Position of manipulator request. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of manipulator to pull position from + :param manipulator_id: ID of manipulator to pull position from. :type manipulator_id: str - :return: Callback as :class:`ephys_link.common.PositionalOutputData` + :return: :class:`ephys_link.common.PositionalOutputData` as JSON formatted string. :rtype: str """ # com.dprint(f"[EVENT]\t\t Get position of manipulator" f" {manipulator_id}") @@ -170,39 +170,39 @@ async def get_pos(self, _, manipulator_id: str) -> str: return self.platform.get_pos(manipulator_id).json() async def get_angles(self, _, manipulator_id: str): - """Angles of manipulator request + """Angles of manipulator request. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of manipulator to pull angles from + :param manipulator_id: ID of manipulator to pull angles from. :type manipulator_id: str - :return: Callback as :class:`ephys_link.common.AngularOutputData` + :return: :class:`ephys_link.common.AngularOutputData` as JSON formatted string. :rtype: str """ return self.platform.get_angles(manipulator_id).json() async def get_shank_count(self, _, manipulator_id: str) -> str: - """Number of shanks of manipulator request + """Number of shanks of manipulator request. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of manipulator to pull number of shanks from + :param manipulator_id: ID of manipulator to pull number of shanks from. :type manipulator_id: str - :return: Callback as :class:`ephys_link.common.ShankCountOutputData` + :return: :class:`ephys_link.common.ShankCountOutputData` as JSON formatted string. :rtype: str """ return self.platform.get_shank_count(manipulator_id).json() async def goto_pos(self, _, data: str) -> str: - """Move manipulator to position + """Move manipulator to position. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param data: Data containing manipulator ID, position in mm, and speed in mm/s - :type data: :class:`ephys_link.common.GotoPositionInputDataFormat` - :return: Callback as :class:`ephys_link.common.PositionalOutputData` + :param data: :class:`ephys_link.common.GotoPositionInputDataFormat` as JSON formatted string. + :type data: str + :return: :class:`ephys_link.common.PositionalOutputData` as JSON formatted string. :rtype: str """ try: @@ -222,13 +222,13 @@ async def goto_pos(self, _, data: str) -> str: return goto_result.json() async def drive_to_depth(self, _, data: str) -> str: - """Drive to depth + """Drive to depth. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param data: Data containing manipulator ID, depth in mm, and speed in mm/s - :type data: :class:`ephys_link.common.DriveToDepthInputDataFormat` - :return: Callback as :class:`ephys_link.common.DriveToDepthOutputData` + :param data: :class:`ephys_link.common.DriveToDepthInputDataFormat` as JSON formatted string. + :type data: str + :return: :class:`ephys_link.common.DriveToDepthOutputData` as JSON formatted string. :rtype: str """ try: @@ -248,13 +248,13 @@ async def drive_to_depth(self, _, data: str) -> str: return drive_result.json() async def set_inside_brain(self, _, data: str) -> str: - """Set the inside brain state + """Set the inside brain state. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param data: Data containing manipulator ID and inside brain state - :type data: :class:`ephys_link.common.InsideBrainInputDataFormat` - :return: Callback as :class:`ephys_link.common.StateOutputData`: + :param data: :class:`ephys_link.common.InsideBrainInputDataFormat` as JSON formatted string. + :type data: str + :return: :class:`ephys_link.common.StateOutputData` as JSON formatted string. :rtype: str """ try: @@ -268,17 +268,17 @@ async def set_inside_brain(self, _, data: str) -> str: print(f"[ERROR]\t\t Error in inside_brain: {e}\n") return com.StateOutputData(False, "Error in set_inside_brain").json() else: - com.dprint(f"[EVENT]\t\t Set manipulator {manipulator_id} inside brain to {"true" if inside else "false"}") + com.dprint(f"[EVENT]\t\t Set manipulator {manipulator_id} inside brain to {inside}") return self.platform.set_inside_brain(manipulator_id, inside).json() async def calibrate(self, _, manipulator_id: str) -> str: - """Calibrate manipulator + """Calibrate manipulator. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of manipulator to calibrate + :param manipulator_id: ID of manipulator to calibrate. :type manipulator_id: str - :return: Error message (on error) + :return: Error message on error, empty string otherwise. :rtype: str """ com.dprint(f"[EVENT]\t\t Calibrate manipulator" f" {manipulator_id}") @@ -286,13 +286,13 @@ async def calibrate(self, _, manipulator_id: str) -> str: return await self.platform.calibrate(manipulator_id, self.sio) async def bypass_calibration(self, _, manipulator_id: str) -> str: - """Bypass calibration of manipulator + """Bypass calibration of manipulator. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param manipulator_id: ID of manipulator to bypass calibration + :param manipulator_id: ID of manipulator to bypass calibration. :type manipulator_id: str - :return: Error message (on error) + :return: Error message on error, empty string otherwise. :rtype: str """ com.dprint(f"[EVENT]\t\t Bypass calibration of manipulator" f" {manipulator_id}") @@ -300,13 +300,13 @@ async def bypass_calibration(self, _, manipulator_id: str) -> str: return self.platform.bypass_calibration(manipulator_id) async def set_can_write(self, _, data: str) -> str: - """Set manipulator can_write state + """Set manipulator can_write state. :param _: Socket session ID (unused) :type _: str - :param data: Data containing manipulator ID and can_write brain state - :type data: :class:`ephys_link.common.CanWriteInputDataFormat` - :return: Callback as :class:`ephys_link.common.StateOutputData` + :param data: :class:`ephys_link.common.CanWriteInputDataFormat` as JSON formatted string. + :type data: str + :return: :class:`ephys_link.common.StateOutputData` as JSON formatted string. :rtype: str """ try: @@ -321,17 +321,15 @@ async def set_can_write(self, _, data: str) -> str: print(f"[ERROR]\t\t Error in inside_brain: {e}\n") return com.StateOutputData(False, "Error in set_can_write").json() else: - com.dprint( - f"[EVENT]\t\t Set manipulator {manipulator_id} can_write state to {"true" if can_write else "false"}" - ) + com.dprint(f"[EVENT]\t\t Set manipulator {manipulator_id} can_write state to {can_write}") return self.platform.set_can_write(manipulator_id, can_write, hours, self.sio).json() def stop(self, _) -> bool: - """Stop all manipulators + """Stop all manipulators. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :return: True if successful, False otherwise + :return: True if successful, False otherwise. :rtype: bool """ com.dprint("[EVENT]\t\t Stop all manipulators") @@ -340,28 +338,28 @@ def stop(self, _) -> bool: @staticmethod async def catch_all(_, __, data: Any) -> str: - """Catch all event + """Catch all event. - :param _: Socket session ID (unused) + :param _: Socket session ID (unused). :type _: str - :param __: Client ID (unused) + :param __: Client ID (unused). :type __: str - :param data: Data received from client + :param data: Data received from client. :type data: Any - :return: "UNKNOWN_EVENT" response message + :return: "UNKNOWN_EVENT" response message. :rtype: str """ print(f"[UNKNOWN EVENT]:\t {data}") return "UNKNOWN_EVENT" def launch_server(self, platform_type: str, server_port: int, pathfinder_port: int) -> None: - """Launch the server + """Launch the server. - :param platform_type: Parsed argument for platform type + :param platform_type: Parsed argument for platform type. :type platform_type: str - :param server_port: HTTP port to serve the server + :param server_port: HTTP port to serve the server. :type server_port: int - :param pathfinder_port: Port New Scale Pathfinder's server is on + :param pathfinder_port: Port New Scale Pathfinder's server is on. :type pathfinder_port: int :return: None """ @@ -395,7 +393,7 @@ def launch_server(self, platform_type: str, server_port: int, pathfinder_port: i web.run_app(self.app, port=server_port) def close_server(self, _, __) -> None: - """Close the server""" + """Close the server.""" print("[INFO]\t\t Closing server") # Stop movement