Skip to content

Commit

Permalink
417 automatically pull bindings from bindings folder (#419)
Browse files Browse the repository at this point in the history
* Remove old new scale, add const for bindings

* Add platform info

* Move Pinpoint ID to server

* Migrated bindings

* Generate GUI dynamically

* Dynamic binding instancing

* Updated binding documentation
  • Loading branch information
kjy5 authored Jan 4, 2025
1 parent e4543d3 commit f79c1ec
Show file tree
Hide file tree
Showing 14 changed files with 189 additions and 106 deletions.
30 changes: 20 additions & 10 deletions docs/development/adding_a_manipulator.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ read [how the system works first](../home/how_it_works.md) before proceeding.
## Create a Manipulator Binding

Manipulators are added to Ephys Link through bindings. A binding is a Python class that extends the abstract base class
[`BaseBinding`][ephys_link.utils.base_binding] and defines the functions Ephys Link expects from a platform.
[`BaseBinding`][ephys_link.utils.base_binding] and defines the methods Ephys Link expects from a platform.

Create a new Python module in `src/ephys_link/bindings` for your manipulator. Make a class that extends
[`BaseBinding`][ephys_link.utils.base_binding]. Most IDEs will automatically import the necessary classes and tell you
Expand All @@ -23,7 +23,7 @@ descriptions of the expected behavior.
As described in the [system overview](../home/how_it_works.md), Ephys Link converts all manipulator movement into a
common "unified space" which is
the [left-hand cartesian coordinate system](https://www.scratchapixel.com/lessons/mathematics-physics-for-computer-graphics/geometry/coordinate-systems.html).
The two functions [
The two methods [
`platform_space_to_unified_space`](../../reference/ephys_link/utils/base_binding/#ephys_link.utils.base_binding.BaseBinding.platform_space_to_unified_space)
and [
`unified_space_to_platform_space`](../../reference/ephys_link/utils/base_binding/#ephys_link.utils.base_binding.BaseBinding.unified_space_to_platform_space)
Expand All @@ -37,19 +37,29 @@ are used to convert between your manipulator's coordinate system and the unified
the [New Scale Pathfinder MPM](https://github.com/VirtualBrainLab/ephys-link/blob/main/src/ephys_link/bindings/mpm_bindings.py)
binding for an example where the platform uses a REST API to an external provider.

## Register the Binding
### Binding Names

To make Ephys Link aware of your new binding, you'll need to register it in
`src/ephys_link/back_end/platform_handler.py`. In the function [
`_match_platform_type`](https://github.com/VirtualBrainLab/ephys-link/blob/c00be57bb552e5d0466b1cfebd0a54d555f12650/src/ephys_link/back_end/platform_handler.py#L69),
add a new `case` to the `match` statement that returns an instance of your binding when matched to the desired CLI name
for your platform. For example, to use Sensapex's uMp-4 the CLI launch command is `ephys_link.exe -b -t ump-4`,
therefore the matching case statement is `ump-4`.
The two naming methods [
`get_display_name`](../../reference/ephys_link/utils/base_binding/#ephys_link.utils.base_binding.BaseBinding.get_display_name)
and [
`get_cli_name`](../../reference/ephys_link/utils/base_binding/#ephys_link.utils.base_binding.BaseBinding.get_cli_name)
are used to identify the binding in the user interface. As described by their documentation, `get_display_name` should
return a human-readable name for the binding, while `get_cli_name` should return the name used to launch the binding
from the command line (what is passed to the `-t` flag). For example, Sensapex uMp-4 manipulator's `get_cli_name`
returns `ump-4` because the CLI launch command is `ephys_link.exe -b -t ump-4`.

### Custom Additional Arguments

Sometimes you may want to pass extra data to your binding on initialization. For example, New Scale Pathfinder MPM
bindings needs to know what the HTTP server port is. To add custom arguments, define them as arguments on the `__init__`
method of your binding then pass in the appropriate data when the binding is instantiated in the [
`_get_binding_instance`]() method of the [`PlatformHandler`][ephys_link.back_end.platform_handler]. Use [New Scale
Pathfinder MPM's binding][ephys_link.bindings.mpm_binding] as an example for how to do this.

## Test Your Binding

Once you've implemented your binding, you can test it by running Ephys Link using your binding
`ephys_link -b -t <your_manipulator>`. You can interact with it using the Socket.IO API or Pinpoint.
`ephys_link -b -t <cli_name>`. You can interact with it using the [Socket.IO API](socketio_api.md) or Pinpoint.

## Submit Your Changes

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ dependencies = [
"requests==2.32.3",
"sensapex==1.400.3",
"rich==13.9.4",
"vbl-aquarium==1.0.0b1"
"vbl-aquarium==1.0.0b3"
]

[project.urls]
Expand Down Expand Up @@ -80,5 +80,8 @@ dependencies = [
serve = "mkdocs serve"
build = "mkdocs build"

[tool.ruff]
unsafe-fixes = true

[tool.ruff.lint]
extend-ignore = ["DTZ005"]
82 changes: 38 additions & 44 deletions src/ephys_link/back_end/platform_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
Responsible for performing the various manipulator commands.
Instantiates the appropriate bindings based on the platform type and uses them to perform the commands.
Usage: Instantiate PlatformHandler with the platform type and call the desired command.
Usage:
Instantiate PlatformHandler with the platform type and call the desired command.
"""

from uuid import uuid4
Expand All @@ -14,22 +15,18 @@
BooleanStateResponse,
EphysLinkOptions,
GetManipulatorsResponse,
PlatformInfo,
PositionalResponse,
SetDepthRequest,
SetDepthResponse,
SetInsideBrainRequest,
SetPositionRequest,
ShankCountResponse,
)
from vbl_aquarium.models.proxy import PinpointIdResponse
from vbl_aquarium.models.unity import Vector4

from ephys_link.__about__ import __version__
from ephys_link.bindings.fake_binding import FakeBinding
from ephys_link.bindings.mpm_binding import MPMBinding
from ephys_link.bindings.ump_4_binding import Ump4Binding
from ephys_link.utils.base_binding import BaseBinding
from ephys_link.utils.common import vector4_to_array
from ephys_link.utils.common import get_bindings, vector4_to_array
from ephys_link.utils.console import Console


Expand All @@ -50,83 +47,80 @@ def __init__(self, options: EphysLinkOptions, console: Console) -> None:
self._console = console

# Define bindings based on platform type.
self._bindings = self._match_platform_type(options)
self._bindings = self._get_binding_instance(options)

# Record which IDs are inside the brain.
self._inside_brain: set[str] = set()

# Generate a Pinpoint ID for proxy usage.
self._pinpoint_id = str(uuid4())[:8]

def _match_platform_type(self, options: EphysLinkOptions) -> BaseBinding:
def _get_binding_instance(self, options: EphysLinkOptions) -> BaseBinding:
"""Match the platform type to the appropriate bindings.
Args:
options: CLI options.
Raises:
ValueError: If the platform type is not recognized.
Returns:
Bindings for the specified platform type.
"""
match options.type:
case "ump-4":
return Ump4Binding()
case "pathfinder-mpm":
return MPMBinding(options.mpm_port)
case "fake":
return FakeBinding()
case _:
error_message = f'Platform type "{options.type}" not recognized.'
self._console.critical_print(error_message)
raise ValueError(error_message)

# Ephys Link metadata.

@staticmethod
def get_version() -> str:
"""Get Ephys Link's version.
for binding_type in get_bindings():
binding_cli_name = binding_type.get_cli_name()

Returns:
Ephys Link's version.
"""
return __version__
if binding_cli_name == options.type:
# Pass in HTTP port for Pathfinder MPM.
if binding_cli_name == "pathfinder-mpm":
return binding_type(options.mpm_port)

# Otherwise just return the binding.
return binding_type()

def get_pinpoint_id(self) -> PinpointIdResponse:
"""Get the Pinpoint ID for proxy usage.
# Raise an error if the platform type is not recognized.
error_message = f'Platform type "{options.type}" not recognized.'
self._console.critical_print(error_message)
raise ValueError(error_message)

# Platform metadata.

def get_display_name(self) -> str:
"""Get the display name for the platform.
Returns:
Pinpoint ID response.
Display name for the platform.
"""
return PinpointIdResponse(pinpoint_id=self._pinpoint_id, is_requester=False)
return self._bindings.get_display_name()

def get_platform_type(self) -> str:
async def get_platform_info(self) -> PlatformInfo:
"""Get the manipulator platform type connected to Ephys Link.
Returns:
Platform type config identifier (see CLI options for examples).
"""
return str(self._options.type)
return PlatformInfo(
name=self._bindings.get_display_name(),
cli_name=self._bindings.get_cli_name(),
axes_count=await self._bindings.get_axes_count(),
dimensions=await self._bindings.get_dimensions(),
)

# Manipulator commands.

async def get_manipulators(self) -> GetManipulatorsResponse:
"""Get a list of available manipulators on the current handler.
Returns:
List of manipulator IDs, number of axes, dimensions of manipulators (mm), and an error message if any.
List of manipulator IDs or an error message if any.
"""
try:
manipulators = await self._bindings.get_manipulators()
num_axes = await self._bindings.get_axes_count()
dimensions = self._bindings.get_dimensions()
except Exception as e:
self._console.exception_error_print("Get Manipulators", e)
return GetManipulatorsResponse(error=self._console.pretty_exception(e))
else:
return GetManipulatorsResponse(
manipulators=manipulators,
num_axes=num_axes,
dimensions=dimensions,
)
return GetManipulatorsResponse(manipulators=manipulators)

async def get_position(self, manipulator_id: str) -> PositionalResponse:
"""Get the current translation position of a manipulator in unified coordinates (mm).
Expand Down
32 changes: 26 additions & 6 deletions src/ephys_link/back_end/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,22 @@
"""Socket.IO Server.
Responsible to managing the Socket.IO connection and events.
Directs events to the platform handler or handles them directly.
Usage:
Instantiate Server with the appropriate options, platform handler, and console.
Then call `launch()` to start the server.
```python
Server(options, platform_handler, console).launch()
```
"""

from asyncio import get_event_loop, run
from collections.abc import Callable, Coroutine
from json import JSONDecodeError, dumps, loads
from typing import Any
from uuid import uuid4

from aiohttp.web import Application, run_app
from pydantic import ValidationError
Expand All @@ -12,8 +27,10 @@
SetInsideBrainRequest,
SetPositionRequest,
)
from vbl_aquarium.models.proxy import PinpointIdResponse
from vbl_aquarium.utils.vbl_base_model import VBLBaseModel

from ephys_link.__about__ import __version__
from ephys_link.back_end.platform_handler import PlatformHandler
from ephys_link.utils.common import PORT, check_for_updates, server_preamble
from ephys_link.utils.console import Console
Expand Down Expand Up @@ -47,6 +64,9 @@ def __init__(self, options: EphysLinkOptions, platform_handler: PlatformHandler,
# Store connected client.
self._client_sid: str = ""

# Generate Pinpoint ID for proxy usage.
self._pinpoint_id = str(uuid4())[:8]

# Bind events.
self._sio.on("*", self.platform_event_handler)

Expand All @@ -63,15 +83,15 @@ def launch(self) -> None:
check_for_updates()

# List platform and available manipulators.
self._console.info_print("PLATFORM", self._platform_handler.get_platform_type())
self._console.info_print("PLATFORM", self._platform_handler.get_display_name())
self._console.info_print(
"MANIPULATORS",
str(get_event_loop().run_until_complete(self._platform_handler.get_manipulators()).manipulators),
)

# Launch server
if self._options.use_proxy:
self._console.info_print("PINPOINT ID", self._platform_handler.get_pinpoint_id().pinpoint_id)
self._console.info_print("PINPOINT ID", self._pinpoint_id)

async def connect_proxy() -> None:
# noinspection HttpUrlsUsage
Expand Down Expand Up @@ -204,11 +224,11 @@ async def platform_event_handler(self, event: str, *args: tuple[Any]) -> str:
match event:
# Server metadata.
case "get_version":
return self._platform_handler.get_version()
return __version__
case "get_pinpoint_id":
return str(self._platform_handler.get_pinpoint_id().to_json_string())
case "get_platform_type":
return self._platform_handler.get_platform_type()
return PinpointIdResponse(pinpoint_id=self._pinpoint_id, is_requester=False).to_json_string()
case "get_platform_info":
return (await self._platform_handler.get_platform_info()).to_json_string()

# Manipulator commands.
case "get_manipulators":
Expand Down
11 changes: 10 additions & 1 deletion src/ephys_link/bindings/fake_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@


class FakeBinding(BaseBinding):
def __init__(self) -> None:
def __init__(self, *args, **kwargs) -> None:
"""Initialize fake manipulator infos."""

super().__init__(*args, **kwargs)
self._positions = [Vector4() for _ in range(8)]
self._angles = [
Vector3(x=90, y=60, z=0),
Expand All @@ -20,6 +21,14 @@ def __init__(self) -> None:
Vector3(x=-135, y=30, z=0),
]

@staticmethod
def get_display_name() -> str:
return "Fake Manipulator"

@staticmethod
def get_cli_name() -> str:
return "fake"

async def get_manipulators(self) -> list[str]:
return list(map(str, range(8)))

Expand Down
13 changes: 11 additions & 2 deletions src/ephys_link/bindings/mpm_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,31 @@ class MPMBinding(BaseBinding):
COARSE_SPEED_THRESHOLD = 0.1
INSERTION_SPEED_LIMIT = 9_000

def __init__(self, port: int) -> None:
def __init__(self, port: int = 8080, *args, **kwargs) -> None:
"""Initialize connection to MPM HTTP server.
Args:
port: Port number for MPM HTTP server.
"""
super().__init__(*args, **kwargs)
self._url = f"http://localhost:{port}"
self._movement_stopped = False

@staticmethod
def get_display_name() -> str:
return "Pathfinder MPM Control v2.8.8+"

@staticmethod
def get_cli_name() -> str:
return "pathfinder-mpm"

async def get_manipulators(self) -> list[str]:
return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]]

async def get_axes_count(self) -> int:
return 3

def get_dimensions(self) -> Vector4:
async def get_dimensions(self) -> Vector4:
return Vector4(x=15, y=15, z=15, w=15)

async def get_position(self, manipulator_id: str) -> Vector4:
Expand Down
Loading

0 comments on commit f79c1ec

Please sign in to comment.