Skip to content

Commit

Permalink
Mock subprocess in runner tests
Browse files Browse the repository at this point in the history
- Mock subprocess in runner tests
- Remove ability of runner to invoke function in the same process, this was previously only used in tests
- Expose _rpc function as import_and_run_function, write separate tests for it as it was no longer behind the mock
  • Loading branch information
callumforrester committed Nov 29, 2024
1 parent 879ccee commit ce35c84
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 85 deletions.
44 changes: 15 additions & 29 deletions src/blueapi/service/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,19 @@ class WorkerDispatcher:

_config: ApplicationConfig
_subprocess: PoolClass | None
_use_subprocess: bool
_state: EnvironmentResponse

def __init__(
self,
config: ApplicationConfig | None = None,
use_subprocess: bool = True,
subprocess_factory: Callable[[], PoolClass] | None = None,
) -> None:
def default_subprocess_factory():
return Pool(initializer=_init_worker, processes=1)

self._config = config or ApplicationConfig()
self._subprocess = None
self._use_subprocess = use_subprocess
self._subprocess_factory = subprocess_factory or default_subprocess_factory
self._state = EnvironmentResponse(
initialized=False,
)
Expand All @@ -68,12 +70,9 @@ def reload(self):

@start_as_current_span(TRACER)
def start(self):
add_span_attributes(
{"_use_subprocess": self._use_subprocess, "_config": str(self._config)}
)
add_span_attributes({"_use_subprocess": True, "_config": str(self._config)})
try:
if self._use_subprocess:
self._subprocess = Pool(initializer=_init_worker, processes=1)
self._subprocess = self._subprocess_factory()
self.run(setup, self._config)
self._state = EnvironmentResponse(initialized=True)
except Exception as e:
Expand Down Expand Up @@ -107,40 +106,27 @@ def run(
function: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> T:
"""Calls the supplied function, which is modified to accept a dict as it's new
first param, before being passed to the subprocess runner, or just run in place.
"""
add_span_attributes({"use_subprocess": self._use_subprocess})
if self._use_subprocess:
return self._run_in_subprocess(function, *args, **kwargs)
else:
return function(*args, **kwargs)

@start_as_current_span(TRACER, "function", "args", "kwargs")
def _run_in_subprocess(
self,
function: Callable[P, T],
*args: P.args,
**kwargs: P.kwargs,
) -> T:
"""Call the supplied function, passing the current Span ID, if one
exists,from the observability context inro the _rpc caller function.
exists,from the observability context inro the import_and_run_function
caller function.
When this is deserialized in and run by the subprocess, this will allow
its functions to use the corresponding span as their parent span."""

add_span_attributes({"use_subprocess": True})

if self._subprocess is None:
raise InvalidRunnerStateError("Subprocess runner has not been started")
if not (hasattr(function, "__name__") and hasattr(function, "__module__")):
raise RpcError(f"{function} is anonymous, cannot be run in subprocess")
if not callable(function):
raise RpcError(f"{function} is not Callable, cannot be run in subprocess")
try:
return_type = inspect.signature(function).return_annotation
except TypeError:
return_type = None

return self._subprocess.apply(
_rpc,
import_and_run_function,
(
function.__module__,
function.__name__,
Expand All @@ -164,7 +150,7 @@ def __init__(self, message):
class RpcError(Exception): ...


def _rpc(
def import_and_run_function(
module_name: str,
function_name: str,
expected_type: type[T] | None,
Expand Down
117 changes: 61 additions & 56 deletions tests/unit_tests/service/test_runner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from multiprocessing.pool import Pool as PoolClass
from typing import Any, Generic, TypeVar
from unittest import mock
from unittest.mock import MagicMock, patch
from unittest.mock import MagicMock, Mock, patch

import pytest
from observability_utils.tracing import (
Expand All @@ -16,17 +16,19 @@
InvalidRunnerStateError,
RpcError,
WorkerDispatcher,
import_and_run_function,
)


@pytest.fixture
def local_runner():
return WorkerDispatcher(use_subprocess=False)
def mock_subprocess() -> Mock:
subprocess = Mock(spec=PoolClass)
return subprocess


@pytest.fixture
def runner():
return WorkerDispatcher()
def runner(mock_subprocess: Mock):
return WorkerDispatcher(subprocess_factory=lambda: mock_subprocess)


@pytest.fixture
Expand All @@ -36,13 +38,22 @@ def started_runner(runner: WorkerDispatcher):
runner.stop()


def test_initialize(runner: WorkerDispatcher):
def test_initialize(runner: WorkerDispatcher, mock_subprocess: Mock):
mock_subprocess.apply.return_value = None

assert runner.state.error_message is None
assert not runner.state.initialized
runner.start()

assert runner.state.error_message is None
assert runner.state.initialized

# Run a single call to the runner for coverage of dispatch to subprocess
assert runner.run(interface.get_worker_state)
mock_subprocess.apply.return_value = 123
assert runner.run(interface.get_worker_state) == 123
runner.stop()

assert runner.state.error_message is None
assert not runner.state.initialized


Expand All @@ -59,22 +70,20 @@ def test_raises_if_used_before_started(runner: WorkerDispatcher):
runner.run(interface.get_plans)


def test_error_on_runner_setup(local_runner: WorkerDispatcher):
def test_error_on_runner_setup(runner: WorkerDispatcher, mock_subprocess: Mock):
error_message = "Intentional start_worker exception"
expected_state = EnvironmentResponse(
initialized=False,
error_message="Intentional start_worker exception",
error_message=error_message,
)
mock_subprocess.apply.side_effect = Exception(error_message)

with mock.patch(
"blueapi.service.runner.setup",
side_effect=Exception("Intentional start_worker exception"),
):
# Calling reload here instead of start also indirectly
# tests that stop() doesn't raise if there is no error message
# and the runner is not yet initialised
local_runner.reload()
state = local_runner.state
assert state == expected_state
# Calling reload here instead of start also indirectly
# tests that stop() doesn't raise if there is no error message
# and the runner is not yet initialised
runner.reload()
state = runner.state
assert state == expected_state


def start_worker_mock():
Expand All @@ -99,7 +108,7 @@ def test_can_reload_after_an_error(pool_mock: MagicMock):

another_mock.apply.side_effect = subprocess_calls_return_values

runner = WorkerDispatcher(use_subprocess=True)
runner = WorkerDispatcher()
runner.start()

assert runner.state == EnvironmentResponse(
Expand All @@ -111,55 +120,51 @@ def test_can_reload_after_an_error(pool_mock: MagicMock):
assert runner.state == EnvironmentResponse(initialized=True, error_message=None)


def test_function_not_findable_on_subprocess(started_runner: WorkerDispatcher):
from tests.unit_tests.core.fake_device_module import fake_motor_y

# Valid target on main but not sub process
# Change in this process not reflected in subprocess
fake_motor_y.__name__ = "not_exported"
def test_clear_message_for_anonymous_function(started_runner: WorkerDispatcher):
non_fetchable_callable = MagicMock()

with pytest.raises(
RpcError, match="not_exported: No such function in subprocess API"
RpcError,
match="<MagicMock id='[0-9]+'> is anonymous, cannot be run in subprocess",
):
started_runner.run(fake_motor_y)
started_runner.run(non_fetchable_callable)


def test_non_callable_excepts_in_main_process(started_runner: WorkerDispatcher):
# Not a valid target on main or sub process
from tests.unit_tests.core.fake_device_module import fetchable_non_callable

with pytest.raises(
RpcError,
match="<NonCallableMock id='[0-9]+'> is not Callable, "
+ "cannot be run in subprocess",
):
started_runner.run(fetchable_non_callable)
def test_function_not_findable_on_subprocess():
with pytest.raises(RpcError, match="unknown: No such function in subprocess API"):
import_and_run_function("blueapi", "unknown", None, {})


def test_non_callable_excepts_in_sub_process(started_runner: WorkerDispatcher):
# Valid target on main but finds non-callable in sub process
from tests.unit_tests.core.fake_device_module import (
fetchable_callable,
fetchable_non_callable,
)
def test_module_not_findable_on_subprocess():
with pytest.raises(ModuleNotFoundError):
import_and_run_function("unknown", "unknown", None, {})

fetchable_callable.__name__ = fetchable_non_callable.__name__

with pytest.raises(
RpcError,
match="fetchable_non_callable: Object in subprocess is not a function",
):
started_runner.run(fetchable_callable)
def run_rpc_function(
func: Callable[..., Any],
expected_type: type[Any],
*args: Any,
**kwargs: Any,
) -> Any:
import_and_run_function(
func.__module__,
func.__name__,
expected_type,
{},
*args,
**kwargs,
)


def test_clear_message_for_anonymous_function(started_runner: WorkerDispatcher):
non_fetchable_callable = MagicMock()
def test_non_callable_excepts(started_runner: WorkerDispatcher):
# Not a valid target on main or sub process
from tests.unit_tests.core.fake_device_module import fetchable_non_callable

with pytest.raises(
RpcError,
match="<MagicMock id='[0-9]+'> is anonymous, cannot be run in subprocess",
match="fetchable_non_callable: Object in subprocess is not a function",
):
started_runner.run(non_fetchable_callable)
run_rpc_function(fetchable_non_callable, Mock)


def test_clear_message_for_wrong_return(started_runner: WorkerDispatcher):
Expand All @@ -169,7 +174,7 @@ def test_clear_message_for_wrong_return(started_runner: WorkerDispatcher):
ValidationError,
match="1 validation error for int",
):
started_runner.run(wrong_return_type)
run_rpc_function(wrong_return_type, int)


T = TypeVar("T")
Expand Down

0 comments on commit ce35c84

Please sign in to comment.