From a9c8927f5d499bbf03eaa334c5fe8c743852d82d Mon Sep 17 00:00:00 2001
From: Kenneth Yang <82800265+kjy5@users.noreply.github.com>
Date: Thu, 14 Dec 2023 16:16:47 -0800
Subject: [PATCH] 267 change communication to use pure json strings (#274)
* Update python version in env
* Converted server into a class
* Fixed server class
* Respond with JSON strings
* Fix documentation
* Make catch all return "UNKNOWN_EVENT"
* Default configure for sensapex
* Read and parse input as strings
---
.idea/codeStyles/Project.xml | 4 +-
.idea/codeStyles/codeStyleConfig.xml | 1 -
.idea/misc.xml | 3 +
pyproject.toml | 1 +
src/ephys_link/__main__.py | 9 +-
src/ephys_link/common.py | 25 +
src/ephys_link/gui.py | 1 -
src/ephys_link/platform_handler.py | 2 +-
src/ephys_link/server.py | 783 +++++++++++++--------------
9 files changed, 418 insertions(+), 411 deletions(-)
diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
index a4cb319..2096fd2 100644
--- a/.idea/codeStyles/Project.xml
+++ b/.idea/codeStyles/Project.xml
@@ -1,8 +1,8 @@
-
-
+
+
\ No newline at end of file
diff --git a/.idea/codeStyles/codeStyleConfig.xml b/.idea/codeStyles/codeStyleConfig.xml
index 6e6eec1..79ee123 100644
--- a/.idea/codeStyles/codeStyleConfig.xml
+++ b/.idea/codeStyles/codeStyleConfig.xml
@@ -1,6 +1,5 @@
-
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index ed0007e..7b1598c 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -4,4 +4,7 @@
+
+
+
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index bbde077..1d46b10 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,7 @@ path = "src/ephys_link/__about__.py"
exclude = ["/.github", "/.idea"]
[tool.hatch.envs.default]
+python = "3.12"
dependencies = [
"coverage[toml]>=6.5",
"pytest",
diff --git a/src/ephys_link/__main__.py b/src/ephys_link/__main__.py
index 92568b7..11f2c3a 100644
--- a/src/ephys_link/__main__.py
+++ b/src/ephys_link/__main__.py
@@ -1,20 +1,23 @@
import argparse
import signal
import time
-from importlib import metadata
from threading import Event, Thread
import serial
import serial.tools.list_ports as ports
from ephys_link import common as com
-from ephys_link import server
+from ephys_link.__about__ import __version__ as version
+from ephys_link.server import Server
# Setup Arduino serial port (emergency stop)
poll_rate = 0.05
kill_serial_event = Event()
poll_serial_thread: Thread
+# Create Server
+server = Server()
+
def poll_serial(kill_event: Event, serial_port: str) -> None:
"""Continuously poll serial port for data
@@ -100,7 +103,7 @@ def close_serial(_, __) -> None:
"-v",
"--version",
action="version",
- version=f"Electrophysiology Manipulator Link v{metadata.version('ephys_link')}",
+ version=f"Electrophysiology Manipulator Link v{version}",
help="Print version and exit",
)
diff --git a/src/ephys_link/common.py b/src/ephys_link/common.py
index a9e358d..ae4ee4d 100644
--- a/src/ephys_link/common.py
+++ b/src/ephys_link/common.py
@@ -6,6 +6,7 @@
from __future__ import annotations
+import json
from typing import TypedDict
# Debugging flag
@@ -81,6 +82,10 @@ def __init__(self, manipulators: list, num_axes: int, dimensions: list, error: s
error=error,
)
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
+
class PositionalOutputData(dict):
"""Output format for (position, error)
@@ -98,6 +103,10 @@ def __init__(self, position: list, error: str) -> None:
"""Constructor"""
super().__init__(position=position, error=error)
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
+
class AngularOutputData(dict):
"""Output format for (angles, error)
@@ -112,6 +121,10 @@ def __init__(self, angles: list, error: str) -> None:
"""Constructor"""
super().__init__(angles=angles, error=error)
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
+
class ShankCountOutputData(dict):
"""Output format for (num_shanks, error)
@@ -126,6 +139,10 @@ def __init__(self, shank_count: int, error: str) -> None:
"""Constructor"""
super().__init__(shank_count=shank_count, error=error)
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
+
class DriveToDepthOutputData(dict):
"""Output format for depth driving (depth, error)
@@ -142,6 +159,10 @@ def __init__(self, depth: float, error: str) -> None:
"""Create drive to depth output data dictionary"""
super().__init__(depth=depth, error=error)
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
+
class StateOutputData(dict):
"""Output format for (state, error)
@@ -157,3 +178,7 @@ class StateOutputData(dict):
def __init__(self, state: bool, error: str) -> None:
"""Create state output data dictionary"""
super().__init__(state=state, error=error)
+
+ def json(self) -> str:
+ """Return JSON string"""
+ return json.dumps(self)
diff --git a/src/ephys_link/gui.py b/src/ephys_link/gui.py
index f2064b9..b0868e6 100644
--- a/src/ephys_link/gui.py
+++ b/src/ephys_link/gui.py
@@ -1,4 +1,3 @@
-# pylama:skip=1
import socket
from argparse import Namespace
from threading import Event, Thread
diff --git a/src/ephys_link/platform_handler.py b/src/ephys_link/platform_handler.py
index 3ff208a..cfbeb9b 100644
--- a/src/ephys_link/platform_handler.py
+++ b/src/ephys_link/platform_handler.py
@@ -98,7 +98,6 @@ def register_manipulator(self, manipulator_id: str) -> str:
try:
# Register manipulator
self._register_manipulator(manipulator_id)
- com.dprint(f"[SUCCESS]\t Registered manipulator: {manipulator_id}\n")
except ValueError as ve:
# Manipulator not found in UMP
print(f"[ERROR]\t\t Manipulator not found: {manipulator_id}: {ve}\n")
@@ -109,6 +108,7 @@ def register_manipulator(self, manipulator_id: str) -> str:
print(f"{type(e)}: {e}\n")
return "Error registering manipulator"
else:
+ com.dprint(f"[SUCCESS]\t Registered manipulator: {manipulator_id}\n")
return ""
def unregister_manipulator(self, manipulator_id: str) -> str:
diff --git a/src/ephys_link/server.py b/src/ephys_link/server.py
index e571e08..7877511 100644
--- a/src/ephys_link/server.py
+++ b/src/ephys_link/server.py
@@ -10,9 +10,9 @@
"""
import importlib
+import json
import sys
-from importlib import metadata
-from typing import Any
+from typing import TYPE_CHECKING, Any
import socketio
from aiohttp import web
@@ -20,409 +20,386 @@
from pythonnet import load
from ephys_link import common as com
-from ephys_link.platform_handler import PlatformHandler
+from ephys_link.__about__ import __version__ as version
+
+if TYPE_CHECKING:
+ from ephys_link.platform_handler import PlatformHandler
# Setup server
load("netfx")
-sio = socketio.AsyncServer()
-app = web.Application()
-sio.attach(app)
-is_connected = False
-
-# Declare platform handler
-platform: PlatformHandler
-
-# Is the server running
-is_running = False
-
-
-# Handle connection events
-
-
-@sio.event
-async def connect(sid, _, __) -> bool:
- """Acknowledge connection to the server
-
- :param sid: Socket session ID
- :type sid: str
- :param _: WSGI formatted dictionary with request info (unused)
- :type _: dict
- :param __: Authentication details (unused)
- :type __: dict
- :return: False on error to refuse connection. None otherwise.
- :rtype: bool
- """
- print(f"[CONNECTION REQUEST]:\t\t {sid}\n")
-
- global is_connected
- if not is_connected:
- print(f"[CONNECTION GRANTED]:\t\t {sid}\n")
- is_connected = True
- return True
-
- print(f"[CONNECTION DENIED]:\t\t {sid}: another client is already connected\n")
- return False
-
-
-@sio.event
-async def disconnect(sid) -> None:
- """Acknowledge disconnection from the server
-
- :param sid: Socket session ID
- :type sid: str
- :return: None
- """
- print(f"[DISCONNECTION]:\t {sid}\n")
-
- platform.reset()
- global is_connected
- is_connected = False
-
-
-# Events
-
-
-@sio.event
-async def get_version(_) -> str:
- """Get the version number of the server
-
- :param _: Socket session ID (unused)
- :type _: str
- :return: Version number as defined in __version__
- :rtype: str
- """
- return metadata.version("ephys_link")
-
-
-@sio.event
-async def get_manipulators(_) -> com.GetManipulatorsOutputData:
- """Get the list of discoverable manipulators
-
- :param _: Socket session ID (unused)
- :type _: str
- :return: Callback parameters (manipulators, error message)
- :rtype: :class:`ephys_link.common.GetManipulatorsOutputData`
- """
- com.dprint("[EVENT]\t\t Get discoverable manipulators")
-
- return platform.get_manipulators()
-
-
-@sio.event
-async def register_manipulator(_, manipulator_id: str) -> str:
- """Register a manipulator with the server
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of the manipulator to register
- :type manipulator_id: str
- :return: Callback parameter (Error message (on error))
- :rtype: str
- """
- com.dprint(f"[EVENT]\t\t Register manipulator: {manipulator_id}")
-
- return platform.register_manipulator(manipulator_id)
-
-
-@sio.event
-async def unregister_manipulator(_, manipulator_id: str) -> str:
- """Unregister a manipulator from the server
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of the manipulator to unregister
- :type manipulator_id: str
- :return: Callback parameter (Error message (on error))
- :rtype: str
- """
- com.dprint(f"[EVENT]\t\t Unregister manipulator: {manipulator_id}")
-
- return platform.unregister_manipulator(manipulator_id)
-
-
-@sio.event
-async def get_pos(_, manipulator_id: str) -> com.PositionalOutputData:
- """Position of manipulator request
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of manipulator to pull position from
- :type manipulator_id: str
- :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an empty
- array on error) in mm, error message)
- :rtype: :class:`ephys_link.common.PositionalOutputData`
- """
- # com.dprint(f"[EVENT]\t\t Get position of manipulator" f" {manipulator_id}")
-
- return platform.get_pos(manipulator_id)
-
-
-@sio.event
-async def get_angles(_, manipulator_id: str) -> com.AngularOutputData:
- """Angles of manipulator request
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of manipulator to pull angles from
- :type manipulator_id: str
- :return: Callback parameters (manipulator ID, angles in (yaw, pitch, roll) (or an empty
- array on error) in degrees, error message)
- :rtype: :class:`ephys_link.common.AngularOutputData`
- """
-
- return platform.get_angles(manipulator_id)
-
-
-@sio.event
-async def get_shank_count(_, manipulator_id: str) -> com.ShankCountOutputData:
- """Number of shanks of manipulator request
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of manipulator to pull number of shanks from
- :type manipulator_id: str
- :return: Callback parameters (manipulator ID, number of shanks (or -1 on error), error
- message)
- :rtype: :class:`ephys_link.common.ShankCountOutputData`
- """
-
- return platform.get_shank_count(manipulator_id)
-
-
-@sio.event
-async def goto_pos(_, data: com.GotoPositionInputDataFormat) -> com.PositionalOutputData:
- """Move manipulator to position
-
- :param _: Socket session ID (unused)
- :type _: str
- :param data: Data containing manipulator ID, position in mm, and speed in mm/s
- :type data: :class:`ephys_link.common.GotoPositionInputDataFormat`
- :return: Callback parameters (manipulator ID, position in (x, y, z, w) (or an empty
- tuple on error) in mm, error message)
- :rtype: :class:`ephys_link.common.PositionalOutputData`
- """
- try:
- manipulator_id = data["manipulator_id"]
- pos = data["pos"]
- speed = data["speed"]
-
- except KeyError:
- manipulator_id = data["manipulator_id"] if "manipulator_id" in data else -1
- print(f"[ERROR]\t\t Invalid data for manipulator {manipulator_id}\n")
- return com.PositionalOutputData([], "Invalid data format")
-
- except Exception as e:
- print(f"[ERROR]\t\t Error in goto_pos: {e}\n")
- return com.PositionalOutputData([], "Error in goto_pos")
-
- com.dprint(f"[EVENT]\t\t Move manipulator {manipulator_id} " f"to position {pos}")
-
- return await platform.goto_pos(manipulator_id, pos, speed)
-
-
-@sio.event
-async def drive_to_depth(_, data: com.DriveToDepthInputDataFormat) -> com.DriveToDepthOutputData:
- """Drive to depth
-
- :param _: Socket session ID (unused)
- :type _: str
- :param data: Data containing manipulator ID, depth in mm, and speed in mm/s
- :type data: :class:`ephys_link.common.DriveToDepthInputDataFormat`
- :return: Callback parameters (manipulator ID, depth (or -1 on error) in mm, error message
- )
- :rtype: :class:`ephys_link.common.DriveToDepthOutputData`
- """
- try:
- manipulator_id = data["manipulator_id"]
- depth = data["depth"]
- speed = data["speed"]
-
- except KeyError:
- manipulator_id = data["manipulator_id"] if "manipulator_id" in data else -1
- print(f"[ERROR]\t\t Invalid data for manipulator {manipulator_id}\n")
- return com.DriveToDepthOutputData(-1, "Invalid data " "format")
-
- except Exception as e:
- print(f"[ERROR]\t\t Error in drive_to_depth: {e}\n")
- return com.DriveToDepthOutputData(-1, "Error in drive_to_depth")
-
- com.dprint(f"[EVENT]\t\t Drive manipulator {manipulator_id} " f"to depth {depth}")
-
- return await platform.drive_to_depth(manipulator_id, depth, speed)
-
-
-@sio.event
-async def set_inside_brain(_, data: com.InsideBrainInputDataFormat) -> com.StateOutputData:
- """Set the inside brain state
-
- :param _: Socket session ID (unused)
- :type _: str
- :param data: Data containing manipulator ID and inside brain state
- :type data: :class:`ephys_link.common.InsideBrainInputDataFormat`
- :return: Callback parameters (manipulator ID, inside, error message)
- :rtype: :class:`ephys_link.common.StateOutputData`
- """
- try:
- manipulator_id = data["manipulator_id"]
- inside = data["inside"]
-
- except KeyError:
- manipulator_id = data["manipulator_id"] if "manipulator_id" in data else -1
- print(f"[ERROR]\t\t Invalid data for manipulator {manipulator_id}\n")
- return com.StateOutputData(False, "Invalid data format")
- except Exception as e:
- print(f"[ERROR]\t\t Error in inside_brain: {e}\n")
- return com.StateOutputData(False, "Error in set_inside_brain")
-
- com.dprint(f"[EVENT]\t\t Set manipulator {manipulator_id} inside brain to " f'{"true" if inside else "false"}')
-
- return platform.set_inside_brain(manipulator_id, inside)
-
-
-@sio.event
-async def calibrate(_, manipulator_id: str) -> str:
- """Calibrate manipulator
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of manipulator to calibrate
- :type manipulator_id: str
- :return: Callback parameters (manipulator ID, error message)
- :rtype: str
- """
- com.dprint(f"[EVENT]\t\t Calibrate manipulator" f" {manipulator_id}")
-
- return await platform.calibrate(manipulator_id, sio)
-
-
-@sio.event
-async def bypass_calibration(_, manipulator_id: str) -> str:
- """Bypass calibration of manipulator
-
- :param _: Socket session ID (unused)
- :type _: str
- :param manipulator_id: ID of manipulator to bypass calibration
- :type manipulator_id: str
- :return: Callback parameters (manipulator ID, error message)
- :rtype: str
- """
- com.dprint(f"[EVENT]\t\t Bypass calibration of manipulator" f" {manipulator_id}")
-
- return platform.bypass_calibration(manipulator_id)
-
-
-@sio.event
-async def set_can_write(_, data: com.CanWriteInputDataFormat) -> com.StateOutputData:
- """Set manipulator can_write state
-
- :param _: Socket session ID (unused)
- :type _: str
- :param data: Data containing manipulator ID and can_write brain state
- :type data: :class:`ephys_link.common.CanWriteInputDataFormat`
- :return: Callback parameters (manipulator ID, can_write, error message)
- :rtype: :class:`ephys_link.common.StateOutputData`
- """
- try:
- manipulator_id = data["manipulator_id"]
- can_write = data["can_write"]
- hours = data["hours"]
-
- except KeyError:
- manipulator_id = data["manipulator_id"] if "manipulator_id" in data else -1
- print(f"[ERROR]\t\t Invalid data for manipulator {manipulator_id}\n")
- return com.StateOutputData(False, "Invalid data " "format")
-
- except Exception as e:
- print(f"[ERROR]\t\t Error in inside_brain: {e}\n")
- return com.StateOutputData(False, "Error in set_can_write")
-
- com.dprint(
- f"[EVENT]\t\t Set manipulator {manipulator_id} can_write state to " f'{"true" if can_write else "false"}'
- )
-
- return platform.set_can_write(manipulator_id, can_write, hours, sio)
-
-
-@sio.event
-def stop(_) -> bool:
- """Stop all manipulators
-
- :param _: Socket session ID (unused)
- :type _: str
- :return: True if successful, False otherwise
- :rtype: bool
- """
- com.dprint("[EVENT]\t\t Stop all manipulators")
-
- return platform.stop()
-
-
-@sio.on("*")
-async def catch_all(_, __, data: Any) -> None:
- """Catch all event
-
- :param _: Socket session ID (unused)
- :type _: str
- :param __: Client ID (unused)
- :type __: str
- :param data: Data received from client
- :type data: Any
- :return: None
- """
- print(f"[UNKNOWN EVENT]:\t {data}")
-
-
-# Handle server start and end
-def launch_server(platform_type: str, server_port: int, pathfinder_port: int) -> None:
- """Launch the server
-
- :param platform_type: Parsed argument for platform type
- :type platform_type: str
- :param server_port: HTTP port to serve the server
- :type server_port: int
- :param pathfinder_port: Port New Scale Pathfinder's server is on
- :type pathfinder_port: int
- :return: None
- """
-
- # Import correct manipulator handler
- global platform
- if platform_type == "sensapex":
- platform = importlib.import_module("ephys_link.platforms.sensapex_handler").SensapexHandler()
- elif platform_type == "ump3":
- platform = importlib.import_module("ephys_link.platforms.ump3_handler").UMP3Handler()
- elif platform_type == "new_scale":
- platform = importlib.import_module("ephys_link.platforms.new_scale_handler").NewScaleHandler()
- elif platform_type == "new_scale_pathfinder":
- platform = importlib.import_module(
- "ephys_link.platforms.new_scale_pathfinder_handler"
- ).NewScalePathfinderHandler(pathfinder_port)
- else:
- sys.exit(f"[ERROR]\t\t Invalid manipulator type: {platform_type}")
-
- # Preamble
- print(f"=== Ephys Link v{metadata.version('ephys_link')} ===")
-
- # List available manipulators
- print("Available Manipulators:")
- print(platform.get_manipulators()["manipulators"])
-
- print("\n(Shutdown server with CTRL+Pause/Break)\n")
-
- # Mark that server is running
- global is_running
- is_running = True
- web.run_app(app, port=server_port)
-
-
-def close_server(_, __) -> None:
- """Close the server"""
- print("[INFO]\t\t Closing server")
-
- # Stop movement
- platform.stop()
-
- # Exit
- raise GracefulExit
+class Server:
+ def __init__(self):
+ # Server and Socketio
+ self.sio = socketio.AsyncServer()
+ self.app = web.Application()
+
+ # Is there a client connected?
+ self.is_connected = False
+
+ # Is the server running?
+ self.is_running = False
+
+ # Current platform handler
+ self.platform: PlatformHandler = importlib.import_module(
+ "ephys_link.platforms.sensapex_handler"
+ ).SensapexHandler()
+
+ # Attach server to the web app
+ self.sio.attach(self.app)
+
+ # Declare events
+ self.sio.on("connect", self.connect)
+ self.sio.on("disconnect", self.disconnect)
+ self.sio.on("get_version", self.get_version)
+ self.sio.on("get_manipulators", self.get_manipulators)
+ self.sio.on("register_manipulator", self.register_manipulator)
+ self.sio.on("unregister_manipulator", self.unregister_manipulator)
+ self.sio.on("get_pos", self.get_pos)
+ self.sio.on("get_angles", self.get_angles)
+ self.sio.on("get_shank_count", self.get_shank_count)
+ self.sio.on("goto_pos", self.goto_pos)
+ self.sio.on("drive_to_depth", self.drive_to_depth)
+ self.sio.on("set_inside_brain", self.set_inside_brain)
+ self.sio.on("calibrate", self.calibrate)
+ self.sio.on("bypass_calibration", self.bypass_calibration)
+ self.sio.on("set_can_write", self.set_can_write)
+ self.sio.on("stop", self.stop)
+ self.sio.on("*", self.catch_all)
+
+ async def connect(self, sid, _, __) -> bool:
+ """Acknowledge connection to the server
+
+ :param sid: Socket session ID
+ :type sid: str
+ :param _: WSGI formatted dictionary with request info (unused)
+ :type _: dict
+ :param __: Authentication details (unused)
+ :type __: dict
+ :return: False on error to refuse connection. True otherwise.
+ :rtype: bool
+ """
+ print(f"[CONNECTION REQUEST]:\t\t {sid}\n")
+
+ if not self.is_connected:
+ print(f"[CONNECTION GRANTED]:\t\t {sid}\n")
+ self.is_connected = True
+ return True
+
+ print(f"[CONNECTION DENIED]:\t\t {sid}: another client is already connected\n")
+ return False
+
+ async def disconnect(self, sid) -> None:
+ """Acknowledge disconnection from the server
+
+ :param sid: Socket session ID
+ :type sid: str
+ :return: None
+ """
+ print(f"[DISCONNECTION]:\t {sid}\n")
+
+ self.platform.reset()
+ self.is_connected = False
+
+ # Events
+
+ @staticmethod
+ async def get_version(_) -> str:
+ """Get the version number of the server
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :return: Version number as defined in __version__
+ :rtype: str
+ """
+ return version
+
+ async def get_manipulators(self, _) -> str:
+ """Get the list of discoverable manipulators
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :return: Callback as :class:`ephys_link.common.GetManipulatorsOutputData`
+ :rtype: str
+ """
+ com.dprint("[EVENT]\t\t Get discoverable manipulators")
+
+ return self.platform.get_manipulators().json()
+
+ async def register_manipulator(self, _, manipulator_id: str) -> str:
+ """Register a manipulator with the server
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of the manipulator to register
+ :type manipulator_id: str
+ :return: Error message (on error)
+ :rtype: str
+ """
+ com.dprint(f"[EVENT]\t\t Register manipulator: {manipulator_id}")
+
+ return self.platform.register_manipulator(manipulator_id)
+
+ async def unregister_manipulator(self, _, manipulator_id: str) -> str:
+ """Unregister a manipulator from the server
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of the manipulator to unregister
+ :type manipulator_id: str
+ :return: Error message (on error)
+ :rtype: str
+ """
+ com.dprint(f"[EVENT]\t\t Unregister manipulator: {manipulator_id}")
+
+ return self.platform.unregister_manipulator(manipulator_id)
+
+ async def get_pos(self, _, manipulator_id: str) -> str:
+ """Position of manipulator request
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of manipulator to pull position from
+ :type manipulator_id: str
+ :return: Callback as :class:`ephys_link.common.PositionalOutputData`
+ :rtype: str
+ """
+ # com.dprint(f"[EVENT]\t\t Get position of manipulator" f" {manipulator_id}")
+
+ return self.platform.get_pos(manipulator_id).json()
+
+ async def get_angles(self, _, manipulator_id: str):
+ """Angles of manipulator request
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of manipulator to pull angles from
+ :type manipulator_id: str
+ :return: Callback as :class:`ephys_link.common.AngularOutputData`
+ :rtype: str
+ """
+
+ return self.platform.get_angles(manipulator_id).json()
+
+ async def get_shank_count(self, _, manipulator_id: str) -> str:
+ """Number of shanks of manipulator request
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of manipulator to pull number of shanks from
+ :type manipulator_id: str
+ :return: Callback as :class:`ephys_link.common.ShankCountOutputData`
+ :rtype: str
+ """
+
+ return self.platform.get_shank_count(manipulator_id).json()
+
+ async def goto_pos(self, _, data: str) -> str:
+ """Move manipulator to position
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param data: Data containing manipulator ID, position in mm, and speed in mm/s
+ :type data: :class:`ephys_link.common.GotoPositionInputDataFormat`
+ :return: Callback as :class:`ephys_link.common.PositionalOutputData`
+ :rtype: str
+ """
+ try:
+ parsed_data: com.GotoPositionInputDataFormat = json.loads(data)
+ manipulator_id = parsed_data["manipulator_id"]
+ pos = parsed_data["pos"]
+ speed = parsed_data["speed"]
+ except KeyError:
+ print(f"[ERROR]\t\t Invalid goto_pos data: {data}\n")
+ return com.PositionalOutputData([], "Invalid data format").json()
+ except Exception as e:
+ print(f"[ERROR]\t\t Error in goto_pos: {e}\n")
+ return com.PositionalOutputData([], "Error in goto_pos").json()
+ else:
+ com.dprint(f"[EVENT]\t\t Move manipulator {manipulator_id} " f"to position {pos}")
+ goto_result = await self.platform.goto_pos(manipulator_id, pos, speed)
+ return goto_result.json()
+
+ async def drive_to_depth(self, _, data: str) -> str:
+ """Drive to depth
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param data: Data containing manipulator ID, depth in mm, and speed in mm/s
+ :type data: :class:`ephys_link.common.DriveToDepthInputDataFormat`
+ :return: Callback as :class:`ephys_link.common.DriveToDepthOutputData`
+ :rtype: str
+ """
+ try:
+ parsed_data: com.DriveToDepthInputDataFormat = json.loads(data)
+ manipulator_id = parsed_data["manipulator_id"]
+ depth = parsed_data["depth"]
+ speed = parsed_data["speed"]
+ except KeyError:
+ print(f"[ERROR]\t\t Invalid drive_to_depth data: {data}\n")
+ return com.DriveToDepthOutputData(-1, "Invalid data " "format").json()
+ except Exception as e:
+ print(f"[ERROR]\t\t Error in drive_to_depth: {e}\n")
+ return com.DriveToDepthOutputData(-1, "Error in drive_to_depth").json()
+ else:
+ com.dprint(f"[EVENT]\t\t Drive manipulator {manipulator_id} " f"to depth {depth}")
+ drive_result = await self.platform.drive_to_depth(manipulator_id, depth, speed)
+ return drive_result.json()
+
+ async def set_inside_brain(self, _, data: str) -> str:
+ """Set the inside brain state
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param data: Data containing manipulator ID and inside brain state
+ :type data: :class:`ephys_link.common.InsideBrainInputDataFormat`
+ :return: Callback as :class:`ephys_link.common.StateOutputData`:
+ :rtype: str
+ """
+ try:
+ parsed_data: com.InsideBrainInputDataFormat = json.loads(data)
+ manipulator_id = parsed_data["manipulator_id"]
+ inside = parsed_data["inside"]
+ except KeyError:
+ print(f"[ERROR]\t\t Invalid set_inside_brain data: {data}\n")
+ return com.StateOutputData(False, "Invalid data format").json()
+ except Exception as e:
+ print(f"[ERROR]\t\t Error in inside_brain: {e}\n")
+ return com.StateOutputData(False, "Error in set_inside_brain").json()
+ else:
+ com.dprint(f"[EVENT]\t\t Set manipulator {manipulator_id} inside brain to {"true" if inside else "false"}")
+ return self.platform.set_inside_brain(manipulator_id, inside).json()
+
+ async def calibrate(self, _, manipulator_id: str) -> str:
+ """Calibrate manipulator
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of manipulator to calibrate
+ :type manipulator_id: str
+ :return: Error message (on error)
+ :rtype: str
+ """
+ com.dprint(f"[EVENT]\t\t Calibrate manipulator" f" {manipulator_id}")
+
+ return await self.platform.calibrate(manipulator_id, self.sio)
+
+ async def bypass_calibration(self, _, manipulator_id: str) -> str:
+ """Bypass calibration of manipulator
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param manipulator_id: ID of manipulator to bypass calibration
+ :type manipulator_id: str
+ :return: Error message (on error)
+ :rtype: str
+ """
+ com.dprint(f"[EVENT]\t\t Bypass calibration of manipulator" f" {manipulator_id}")
+
+ return self.platform.bypass_calibration(manipulator_id)
+
+ async def set_can_write(self, _, data: str) -> str:
+ """Set manipulator can_write state
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param data: Data containing manipulator ID and can_write brain state
+ :type data: :class:`ephys_link.common.CanWriteInputDataFormat`
+ :return: Callback as :class:`ephys_link.common.StateOutputData`
+ :rtype: str
+ """
+ try:
+ parsed_data: com.CanWriteInputDataFormat = json.loads(data)
+ manipulator_id = parsed_data["manipulator_id"]
+ can_write = parsed_data["can_write"]
+ hours = parsed_data["hours"]
+ except KeyError:
+ print(f"[ERROR]\t\t Invalid set_can_write data: {data}\n")
+ return com.StateOutputData(False, "Invalid data " "format").json()
+ except Exception as e:
+ print(f"[ERROR]\t\t Error in inside_brain: {e}\n")
+ return com.StateOutputData(False, "Error in set_can_write").json()
+ else:
+ com.dprint(
+ f"[EVENT]\t\t Set manipulator {manipulator_id} can_write state to {"true" if can_write else "false"}"
+ )
+ return self.platform.set_can_write(manipulator_id, can_write, hours, self.sio).json()
+
+ def stop(self, _) -> bool:
+ """Stop all manipulators
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :return: True if successful, False otherwise
+ :rtype: bool
+ """
+ com.dprint("[EVENT]\t\t Stop all manipulators")
+
+ return self.platform.stop()
+
+ @staticmethod
+ async def catch_all(_, __, data: Any) -> str:
+ """Catch all event
+
+ :param _: Socket session ID (unused)
+ :type _: str
+ :param __: Client ID (unused)
+ :type __: str
+ :param data: Data received from client
+ :type data: Any
+ :return: "UNKNOWN_EVENT" response message
+ :rtype: str
+ """
+ print(f"[UNKNOWN EVENT]:\t {data}")
+ return "UNKNOWN_EVENT"
+
+ def launch_server(self, platform_type: str, server_port: int, pathfinder_port: int) -> None:
+ """Launch the server
+
+ :param platform_type: Parsed argument for platform type
+ :type platform_type: str
+ :param server_port: HTTP port to serve the server
+ :type server_port: int
+ :param pathfinder_port: Port New Scale Pathfinder's server is on
+ :type pathfinder_port: int
+ :return: None
+ """
+
+ # Import correct manipulator handler
+ if platform_type == "sensapex":
+ # Already imported (was the default)
+ pass
+ elif platform_type == "ump3":
+ self.platform = importlib.import_module("ephys_link.platforms.ump3_handler").UMP3Handler()
+ elif platform_type == "new_scale":
+ self.platform = importlib.import_module("ephys_link.platforms.new_scale_handler").NewScaleHandler()
+ elif platform_type == "new_scale_pathfinder":
+ self.platform = importlib.import_module(
+ "ephys_link.platforms.new_scale_pathfinder_handler"
+ ).NewScalePathfinderHandler(pathfinder_port)
+ else:
+ sys.exit(f"[ERROR]\t\t Invalid manipulator type: {platform_type}")
+
+ # Preamble
+ print(f"=== Ephys Link v{version} ===")
+
+ # List available manipulators
+ print("Available Manipulators:")
+ print(self.platform.get_manipulators()["manipulators"])
+
+ print("\n(Shutdown server with CTRL+Pause/Break)\n")
+
+ # Mark that server is running
+ self.is_running = True
+ web.run_app(self.app, port=server_port)
+
+ def close_server(self, _, __) -> None:
+ """Close the server"""
+ print("[INFO]\t\t Closing server")
+
+ # Stop movement
+ self.platform.stop()
+
+ # Exit
+ raise GracefulExit