diff --git a/link/adapters/controller.py b/link/adapters/controller.py index 4d2289e6..f3bab2a4 100644 --- a/link/adapters/controller.py +++ b/link/adapters/controller.py @@ -1,15 +1,10 @@ """Contains code controlling the execution of use-cases.""" from __future__ import annotations -from typing import Callable, Iterable, Mapping +from typing import Iterable -from link.service.services import ( - DeleteRequest, - ListIdleEntitiesRequest, - PullRequest, - Request, - Services, -) +from link.domain import commands +from link.service.messagebus import MessageBus from .custom_types import PrimaryKey from .identification import IdentificationTranslator @@ -20,21 +15,21 @@ class DJController: def __init__( self, - handlers: Mapping[Services, Callable[[Request], None]], + message_bus: MessageBus, translator: IdentificationTranslator, ) -> None: """Initialize the translator.""" - self.__handlers = handlers - self.__translator = translator + self._message_bus = message_bus + self._translator = translator def pull(self, primary_keys: Iterable[PrimaryKey]) -> None: """Execute the pull use-case.""" - self.__handlers[Services.PULL](PullRequest(frozenset(self.__translator.to_identifiers(primary_keys)))) + self._message_bus.handle(commands.PullEntities(frozenset(self._translator.to_identifiers(primary_keys)))) def delete(self, primary_keys: Iterable[PrimaryKey]) -> None: """Execute the delete use-case.""" - self.__handlers[Services.DELETE](DeleteRequest(frozenset(self.__translator.to_identifiers(primary_keys)))) + self._message_bus.handle(commands.DeleteEntities(frozenset(self._translator.to_identifiers(primary_keys)))) def list_idle_entities(self) -> None: """Execute the use-case that lists idle entities.""" - self.__handlers[Services.LIST_IDLE_ENTITIES](ListIdleEntitiesRequest()) + self._message_bus.handle(commands.ListIdleEntities()) diff --git a/link/adapters/gateway.py b/link/adapters/gateway.py index f219bff2..8e678105 100644 --- a/link/adapters/gateway.py +++ b/link/adapters/gateway.py @@ -5,9 +5,10 @@ from itertools import groupby from typing import Iterable +from link.domain import events from link.domain.custom_types import Identifier from link.domain.link import Link, create_link -from link.domain.state import Commands, Components, Processes, Update +from link.domain.state import Commands, Components, Processes from link.service.gateway import LinkGateway from .custom_types import PrimaryKey @@ -51,10 +52,10 @@ def translate_tainted_primary_keys(primary_keys: Iterable[PrimaryKey]) -> set[Id tainted_identifiers=translate_tainted_primary_keys(self.facade.get_tainted_primary_keys()), ) - def apply(self, updates: Iterable[Update]) -> None: + def apply(self, updates: Iterable[events.StateChanged]) -> None: """Apply updates to the persistent data representing the link.""" - def keyfunc(update: Update) -> int: + def keyfunc(update: events.StateChanged) -> int: assert update.command is not None return update.command.value diff --git a/link/adapters/present.py b/link/adapters/present.py index 28c4e47e..fb20f570 100644 --- a/link/adapters/present.py +++ b/link/adapters/present.py @@ -1,107 +1,40 @@ """Logic associated with presenting information about finished use-cases.""" from __future__ import annotations -from dataclasses import dataclass from typing import Callable, Iterable -from link.service.services import ( - ListIdleEntitiesResponse, - OperationResponse, -) +from link.domain import events from .custom_types import PrimaryKey from .identification import IdentificationTranslator -@dataclass(frozen=True) -class OperationRecord: - """Record of a finished operation.""" - - requests: list[Request] - successes: list[Sucess] - failures: list[Failure] - - -@dataclass(frozen=True) -class Request: - """Record of a request to perform a certain operation on a particular entity.""" - - primary_key: PrimaryKey - operation: str - - -@dataclass(frozen=True) -class Sucess: - """Record of a successful operation on a particular entity.""" - - primary_key: PrimaryKey - operation: str - transition: Transition - - -@dataclass(frozen=True) -class Transition: - """Record of a transition between two states.""" - - old: str - new: str - - -@dataclass(frozen=True) -class Failure: - """Record of a failed operation on a particular entity.""" - - primary_key: PrimaryKey - operation: str - state: str - - -def create_operation_response_presenter( - translator: IdentificationTranslator, show: Callable[[OperationRecord], None] -) -> Callable[[OperationResponse], None]: - """Create a callable that when called presents information about a finished operation.""" - - def get_class_name(obj: type) -> str: - return obj.__name__ - - def present_operation_response(response: OperationResponse) -> None: - show( - OperationRecord( - [ - Request(translator.to_primary_key(identifier), response.operation.name) - for identifier in response.requested - ], - [ - Sucess( - translator.to_primary_key(update.identifier), - operation=response.operation.name, - transition=Transition( - get_class_name(update.transition.current).upper(), - get_class_name(update.transition.new).upper(), - ), - ) - for update in response.updates - ], - [ - Failure( - translator.to_primary_key(error.identifier), - error.operation.name, - get_class_name(error.state).upper(), - ) - for error in response.errors - ], - ) - ) - - return present_operation_response - - def create_idle_entities_updater( translator: IdentificationTranslator, update: Callable[[Iterable[PrimaryKey]], None] -) -> Callable[[ListIdleEntitiesResponse], None]: +) -> Callable[[events.IdleEntitiesListed], None]: """Create a callable that when called updates the list of idle entities.""" - def update_idle_entities(response: ListIdleEntitiesResponse) -> None: + def update_idle_entities(response: events.IdleEntitiesListed) -> None: update(translator.to_primary_key(identifier) for identifier in response.identifiers) return update_idle_entities + + +def create_state_change_logger( + translator: IdentificationTranslator, log: Callable[[str], None] +) -> Callable[[events.StateChanged], None]: + """Create a logger that logs state changes of entities.""" + + def log_state_change(state_change: events.StateChanged) -> None: + context = { + "identifier": translator.to_primary_key(state_change.identifier), + "operation": state_change.operation.name, + "transition": { + "old": state_change.transition.current.__name__, + "new": state_change.transition.new.__name__, + }, + "command": state_change.command.name, + } + log(f"Entity state changed {context}") + + return log_state_change diff --git a/link/domain/commands.py b/link/domain/commands.py new file mode 100644 index 00000000..d6598def --- /dev/null +++ b/link/domain/commands.py @@ -0,0 +1,30 @@ +"""Contains all domain commands.""" +from __future__ import annotations + +from dataclasses import dataclass + +from .custom_types import Identifier + + +@dataclass(frozen=True) +class Command: + """Base class for all commands.""" + + +@dataclass(frozen=True) +class PullEntities(Command): + """Pull the requested entities.""" + + requested: frozenset[Identifier] + + +@dataclass(frozen=True) +class DeleteEntities(Command): + """Delete the requested entities.""" + + requested: frozenset[Identifier] + + +@dataclass(frozen=True) +class ListIdleEntities(Command): + """Start the delete process for the requested entities.""" diff --git a/link/domain/events.py b/link/domain/events.py new file mode 100644 index 00000000..3c517315 --- /dev/null +++ b/link/domain/events.py @@ -0,0 +1,45 @@ +"""Contains all domain events.""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING + +from .custom_types import Identifier + +if TYPE_CHECKING: + from .state import Commands, Operations, State, Transition + + +@dataclass(frozen=True) +class Event: + """Base class for all events.""" + + +@dataclass(frozen=True) +class OperationApplied(Event): + """An operation was applied to an entity.""" + + operation: Operations + identifier: Identifier + + +@dataclass(frozen=True) +class InvalidOperationRequested(OperationApplied): + """An operation that is invalid given the entities current state was requested.""" + + state: type[State] + + +@dataclass(frozen=True) +class StateChanged(OperationApplied): + """The state of an entity changed during the application of an operation.""" + + transition: Transition + command: Commands + + +@dataclass(frozen=True) +class IdleEntitiesListed(Event): + """Idle entities in a link have been listed.""" + + identifiers: frozenset[Identifier] diff --git a/link/domain/link.py b/link/domain/link.py index 6485d21d..942d5422 100644 --- a/link/domain/link.py +++ b/link/domain/link.py @@ -1,21 +1,11 @@ """Contains the link class.""" from __future__ import annotations -from dataclasses import dataclass -from typing import Any, Iterable, Iterator, Mapping, Optional, Set, Tuple, TypeVar +from collections import deque +from typing import Any, Iterable, Iterator, Mapping, Optional, Set, TypeVar from .custom_types import Identifier -from .state import ( - STATE_MAP, - Components, - Entity, - EntityOperationResult, - InvalidOperation, - Operations, - PersistentState, - Processes, - Update, -) +from .state import STATE_MAP, Components, Entity, Idle, PersistentState, Processes, State def create_link( @@ -70,7 +60,7 @@ def create_entity(identifier: Identifier) -> Entity: state=state, current_process=processes_map.get(identifier, Processes.NONE), is_tainted=is_tainted(identifier), - operation_results=tuple(), + events=deque(), ) return {create_entity(identifier) for identifier in assignments[Components.SOURCE]} @@ -94,44 +84,35 @@ def assign_to_component(component: Components) -> set[Entity]: class Link(Set[Entity]): """The state of a link between two databases.""" - def __init__( - self, entities: Iterable[Entity], operation_results: Tuple[LinkOperationResult, ...] = tuple() - ) -> None: + def __init__(self, entities: Iterable[Entity]) -> None: """Initialize the link.""" self._entities = set(entities) - self._operation_results = operation_results @property def identifiers(self) -> frozenset[Identifier]: """Return the identifiers of all entities in the link.""" return frozenset(entity.identifier for entity in self) - @property - def operation_results(self) -> Tuple[LinkOperationResult, ...]: - """Return the results of operations performed on this link.""" - return self._operation_results - - def apply(self, operation: Operations, *, requested: Iterable[Identifier]) -> Link: - """Apply an operation to the requested entities.""" - - def create_operation_result(results: Iterable[EntityOperationResult]) -> LinkOperationResult: - """Create the result of an operation on a link from results of individual entities.""" - results = set(results) - operation = next(iter(results)).operation - return LinkOperationResult( - operation, - updates=frozenset(result for result in results if isinstance(result, Update)), - errors=frozenset(result for result in results if isinstance(result, InvalidOperation)), - ) - - assert requested, "No identifiers requested." + def pull(self, requested: Iterable[Identifier]) -> None: + """Pull the requested entities.""" + requested = set(requested) + self._validate_requested(requested) + for entity in (entity for entity in self if entity.identifier in requested): + entity.pull() + + def delete(self, requested: Iterable[Identifier]) -> None: + """Delete the requested entities.""" + requested = set(requested) + self._validate_requested(requested) + for entity in (entity for entity in self if entity.identifier in requested): + entity.delete() + + def list_idle_entities(self) -> frozenset[Identifier]: + """List the identifiers of all idle entities in the link.""" + return frozenset(entity.identifier for entity in self if entity.state is Idle) + + def _validate_requested(self, requested: Iterable[Identifier]) -> None: assert set(requested) <= self.identifiers, "Requested identifiers not present in link." - changed = {entity.apply(operation) for entity in self if entity.identifier in requested} - unchanged = {entity for entity in self if entity.identifier not in requested} - operation_results = self.operation_results + ( - create_operation_result(entity.operation_results[-1] for entity in changed), - ) - return Link(changed | unchanged, operation_results) def __contains__(self, entity: object) -> bool: """Check if the link contains the given entity.""" @@ -145,17 +126,12 @@ def __len__(self) -> int: """Return the number of entities in the link.""" return len(self._entities) + def __eq__(self, other: object) -> bool: + """Return True if both links have entities with the same identifiers and states.""" + if not isinstance(other, type(self)): + raise NotImplementedError -@dataclass(frozen=True) -class LinkOperationResult: - """Represents the result of an operation on all entities of a link.""" - - operation: Operations - updates: frozenset[Update] - errors: frozenset[InvalidOperation] + def create_identifier_state_pairs(link: Link) -> set[tuple[Identifier, type[State]]]: + return {(entity.identifier, entity.state) for entity in link} - def __post_init__(self) -> None: - """Validate the result.""" - assert all( - result.operation is self.operation for result in (self.updates | self.errors) - ), "Not all results have same operation." + return create_identifier_state_pairs(self) == create_identifier_state_pairs(other) diff --git a/link/domain/state.py b/link/domain/state.py index 7b453232..52f3d680 100644 --- a/link/domain/state.py +++ b/link/domain/state.py @@ -1,48 +1,49 @@ """Contains everything state related.""" from __future__ import annotations -from dataclasses import dataclass, replace +from collections import deque +from dataclasses import dataclass from enum import Enum, auto from functools import partial -from typing import Union from .custom_types import Identifier +from .events import InvalidOperationRequested, OperationApplied, StateChanged class State: """An entity's state.""" @classmethod - def start_pull(cls, entity: Entity) -> Entity: + def start_pull(cls, entity: Entity) -> None: """Return the command needed to start the pull process for the entity.""" return cls._create_invalid_operation(entity, Operations.START_PULL) @classmethod - def start_delete(cls, entity: Entity) -> Entity: + def start_delete(cls, entity: Entity) -> None: """Return the commands needed to start the delete process for the entity.""" return cls._create_invalid_operation(entity, Operations.START_DELETE) @classmethod - def process(cls, entity: Entity) -> Entity: + def process(cls, entity: Entity) -> None: """Return the commands needed to process the entity.""" return cls._create_invalid_operation(entity, Operations.PROCESS) @staticmethod - def _create_invalid_operation(entity: Entity, operation: Operations) -> Entity: - updated = entity.operation_results + (InvalidOperation(operation, entity.identifier, entity.state),) - return replace(entity, operation_results=updated) + def _create_invalid_operation(entity: Entity, operation: Operations) -> None: + entity.events.append(InvalidOperationRequested(operation, entity.identifier, entity.state)) @classmethod def _transition_entity( cls, entity: Entity, operation: Operations, new_state: type[State], *, new_process: Processes | None = None - ) -> Entity: + ) -> None: if new_process is None: new_process = entity.current_process transition = Transition(cls, new_state) - updated_results = entity.operation_results + ( - Update(operation, entity.identifier, transition, TRANSITION_MAP[transition]), + entity.state = transition.new + entity.current_process = new_process + entity.events.append( + StateChanged(operation, entity.identifier, transition, TRANSITION_MAP[transition]), ) - return replace(entity, state=transition.new, current_process=new_process, operation_results=updated_results) class States: @@ -68,7 +69,7 @@ class Idle(State): """The default state of an entity.""" @classmethod - def start_pull(cls, entity: Entity) -> Entity: + def start_pull(cls, entity: Entity) -> None: """Return the command needed to start the pull process for an entity.""" return cls._transition_entity(entity, Operations.START_PULL, Activated, new_process=Processes.PULL) @@ -80,7 +81,7 @@ class Activated(State): """The state of an activated entity.""" @classmethod - def process(cls, entity: Entity) -> Entity: + def process(cls, entity: Entity) -> None: """Return the commands needed to process an activated entity.""" transition_entity = partial(cls._transition_entity, entity, Operations.PROCESS) if entity.is_tainted: @@ -99,7 +100,7 @@ class Received(State): """The state of an received entity.""" @classmethod - def process(cls, entity: Entity) -> Entity: + def process(cls, entity: Entity) -> None: """Return the commands needed to process a received entity.""" transition_entity = partial(cls._transition_entity, entity, Operations.PROCESS) if entity.current_process is Processes.PULL: @@ -119,7 +120,7 @@ class Pulled(State): """The state of an entity that has been copied to the local side.""" @classmethod - def start_delete(cls, entity: Entity) -> Entity: + def start_delete(cls, entity: Entity) -> None: """Return the commands needed to start the delete process for the entity.""" return cls._transition_entity(entity, Operations.START_DELETE, Received, new_process=Processes.DELETE) @@ -131,7 +132,7 @@ class Tainted(State): """The state of an entity that has been flagged as faulty by the source side.""" @classmethod - def start_delete(cls, entity: Entity) -> Entity: + def start_delete(cls, entity: Entity) -> None: """Return the commands needed to start the delete process for the entity.""" return cls._transition_entity(entity, Operations.START_DELETE, Received, new_process=Processes.DELETE) @@ -191,28 +192,6 @@ class Operations(Enum): PROCESS = auto() -@dataclass(frozen=True) -class Update: - """Represents the persistent update needed to transition an entity.""" - - operation: Operations - identifier: Identifier - transition: Transition - command: Commands - - -@dataclass(frozen=True) -class InvalidOperation: - """Represents the result of attempting an operation that is invalid in the entity's current state.""" - - operation: Operations - identifier: Identifier - state: type[State] - - -EntityOperationResult = Union[Update, InvalidOperation] - - class Processes(Enum): """Names for processes that pull/delete entities into/from the local side.""" @@ -282,7 +261,7 @@ class PersistentState: } -@dataclass(frozen=True) +@dataclass class Entity: """An entity in a link.""" @@ -290,9 +269,21 @@ class Entity: state: type[State] current_process: Processes is_tainted: bool - operation_results: tuple[EntityOperationResult, ...] + events: deque[OperationApplied] + + def pull(self) -> None: + """Pull the entity.""" + self._finish_process() + self.apply(Operations.START_PULL) + self._finish_process() - def apply(self, operation: Operations) -> Entity: + def delete(self) -> None: + """Delete the entity.""" + self._finish_process() + self.apply(Operations.START_DELETE) + self._finish_process() + + def apply(self, operation: Operations) -> None: """Apply an operation to the entity.""" if operation is Operations.START_PULL: return self._start_pull() @@ -301,14 +292,28 @@ def apply(self, operation: Operations) -> Entity: if operation is Operations.PROCESS: return self._process() - def _start_pull(self) -> Entity: + def _start_pull(self) -> None: """Start the pull process for the entity.""" return self.state.start_pull(self) - def _start_delete(self) -> Entity: + def _start_delete(self) -> None: """Start the delete process for the entity.""" return self.state.start_delete(self) - def _process(self) -> Entity: + def _process(self) -> None: """Process the entity.""" return self.state.process(self) + + def _finish_process(self) -> None: + while self.current_process is not Processes.NONE: + self.apply(Operations.PROCESS) + + def __hash__(self) -> int: + """Return the hash of this entity.""" + return hash(self.identifier) + + def __eq__(self, other: object) -> bool: + """Return True if both entities are equal.""" + if not isinstance(other, type(self)): + raise NotImplementedError + return hash(self) == hash(other) diff --git a/link/infrastructure/link.py b/link/infrastructure/link.py index 53b727a1..06f0f77d 100644 --- a/link/infrastructure/link.py +++ b/link/infrastructure/link.py @@ -1,6 +1,7 @@ """Contains the link decorator that is used by the user to establish a link.""" from __future__ import annotations +import logging from collections.abc import Callable from functools import partial from typing import Any, Mapping, Optional @@ -9,26 +10,14 @@ from link.adapters.custom_types import PrimaryKey from link.adapters.gateway import DJLinkGateway from link.adapters.identification import IdentificationTranslator -from link.adapters.present import ( - create_idle_entities_updater, - create_operation_response_presenter, -) -from link.service.io import make_responsive -from link.service.services import ( - Services, - delete, - list_idle_entities, - process, - process_to_completion, - pull, - start_delete_process, - start_pull_process, -) +from link.adapters.present import create_idle_entities_updater, create_state_change_logger +from link.domain import commands, events +from link.service.handlers import delete, list_idle_entities, log_state_change, pull +from link.service.messagebus import CommandHandlers, EventHandlers, MessageBus from link.service.uow import UnitOfWork from . import DJConfiguration, create_tables from .facade import DJLinkFacade -from .log import create_operation_logger from .mixin import create_local_endpoint from .sequence import IterationCallbackList, create_content_replacer @@ -58,34 +47,20 @@ def inner(obj: type) -> Any: uow = UnitOfWork(gateway) source_restriction: IterationCallbackList[PrimaryKey] = IterationCallbackList() idle_entities_updater = create_idle_entities_updater(translator, create_content_replacer(source_restriction)) - operation_presenter = create_operation_response_presenter(translator, create_operation_logger()) - process_service = partial(make_responsive(partial(process, uow=uow)), output_port=operation_presenter) - start_pull_process_service = partial( - make_responsive(partial(start_pull_process, uow=uow)), output_port=operation_presenter + logger = logging.getLogger(obj.__name__) + command_handlers: CommandHandlers = {} + command_handlers[commands.PullEntities] = partial(pull, uow=uow) + command_handlers[commands.DeleteEntities] = partial(delete, uow=uow) + command_handlers[commands.ListIdleEntities] = partial( + list_idle_entities, uow=uow, output_port=idle_entities_updater ) - start_delete_process_service = partial( - make_responsive(partial(start_delete_process, uow=uow)), output_port=operation_presenter - ) - process_to_completion_service = partial( - make_responsive(partial(process_to_completion, process_service=process_service)), output_port=lambda x: None - ) - handlers = { - Services.PULL: partial( - pull, - process_to_completion_service=process_to_completion_service, - start_pull_process_service=start_pull_process_service, - output_port=lambda x: None, - ), - Services.DELETE: partial( - delete, - process_to_completion_service=process_to_completion_service, - start_delete_process_service=start_delete_process_service, - output_port=lambda x: None, - ), - Services.PROCESS: partial(process, uow=uow, output_port=operation_presenter), - Services.LIST_IDLE_ENTITIES: partial(list_idle_entities, uow=uow, output_port=idle_entities_updater), - } - controller = DJController(handlers, translator) + event_handlers: EventHandlers = {} + event_handlers[events.StateChanged] = [ + partial(log_state_change, log=create_state_change_logger(translator, logger.info)) + ] + event_handlers[events.InvalidOperationRequested] = [lambda event: None] + bus = MessageBus(uow, command_handlers, event_handlers) + controller = DJController(bus, translator) source_restriction.callback = controller.list_idle_entities return create_local_endpoint(controller, tables, source_restriction) diff --git a/link/infrastructure/log.py b/link/infrastructure/log.py deleted file mode 100644 index b7f4248e..00000000 --- a/link/infrastructure/log.py +++ /dev/null @@ -1,21 +0,0 @@ -"""Logging functionality.""" -import logging -from dataclasses import asdict -from typing import Callable - -from link.adapters.present import OperationRecord - - -def create_operation_logger() -> Callable[[OperationRecord], None]: - """Create a function that logs information about finished operations.""" - logger = logging.getLogger("link[operations]") - - def log(record: OperationRecord) -> None: - for request in record.requests: - logger.info(f"Operation requested {asdict(request)}") - for success in record.successes: - logger.info(f"Operation succeeded {asdict(success)}") - for failure in record.failures: - logger.info(f"Operation failed {asdict(failure)}") - - return log diff --git a/link/service/gateway.py b/link/service/gateway.py index 999b76e8..f7b3d375 100644 --- a/link/service/gateway.py +++ b/link/service/gateway.py @@ -4,8 +4,8 @@ from abc import ABC, abstractmethod from collections.abc import Iterable +from link.domain import events from link.domain.link import Link -from link.domain.state import Update class LinkGateway(ABC): @@ -16,5 +16,5 @@ def create_link(self) -> Link: """Create a link from the persistent data.""" @abstractmethod - def apply(self, updates: Iterable[Update]) -> None: + def apply(self, updates: Iterable[events.StateChanged]) -> None: """Apply updates to the link's persistent data.""" diff --git a/link/service/handlers.py b/link/service/handlers.py new file mode 100644 index 00000000..625c9c7e --- /dev/null +++ b/link/service/handlers.py @@ -0,0 +1,39 @@ +"""Contains code handling domain commands and events.""" +from __future__ import annotations + +from collections.abc import Callable + +from link.domain import commands, events + +from .uow import UnitOfWork + + +def pull(command: commands.PullEntities, *, uow: UnitOfWork) -> None: + """Pull entities across the link.""" + with uow: + uow.link.pull(command.requested) + uow.commit() + + +def delete(command: commands.DeleteEntities, *, uow: UnitOfWork) -> None: + """Delete pulled entities.""" + with uow: + uow.link.delete(command.requested) + uow.commit() + + +def list_idle_entities( + command: commands.ListIdleEntities, + *, + uow: UnitOfWork, + output_port: Callable[[events.IdleEntitiesListed], None], +) -> None: + """List all idle entities.""" + with uow: + idle = uow.link.list_idle_entities() + output_port(events.IdleEntitiesListed(idle)) + + +def log_state_change(event: events.StateChanged, log: Callable[[events.StateChanged], None]) -> None: + """Log the state change of an entity.""" + log(event) diff --git a/link/service/io.py b/link/service/io.py deleted file mode 100644 index cef47d47..00000000 --- a/link/service/io.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Contains logic related to input/output handling to/from services.""" -from __future__ import annotations - -from functools import partial -from typing import Any, Callable, Generic, Protocol, TypeVar - -from .services import Request, Response - -_Response = TypeVar("_Response", bound=Response) - - -class ResponseRelay(Generic[_Response]): - """A relay that makes the response of one service available to another.""" - - def __init__(self) -> None: - """Initialize the relay.""" - self._response: _Response | None = None - - def get_response(self) -> _Response: - """Return the response of the relayed service.""" - assert self._response is not None - return self._response - - def __call__(self, response: _Response) -> None: - """Store the response of the relayed service.""" - self._response = response - - -_Request = TypeVar("_Request", bound=Request) - -_Response_co = TypeVar("_Response_co", bound=Response, covariant=True) - -_Request_contra = TypeVar("_Request_contra", bound=Request, contravariant=True) - - -class Service(Protocol[_Request_contra, _Response_co]): - """Protocol for services.""" - - def __call__(self, request: _Request_contra, *, output_port: Callable[[_Response_co], None], **kwargs: Any) -> None: - """Execute the service.""" - - -class ReturningService(Protocol[_Request_contra, _Response_co]): - """Protocol for services that return their response.""" - - def __call__( - self, request: _Request_contra, *, output_port: Callable[[_Response_co], None], **kwargs: Any - ) -> _Response_co: - """Execute the service.""" - - -def make_responsive(service: Service[_Request, _Response]) -> ReturningService[_Request, _Response]: - """Create a version of the service that returns its response in addition to sending it to the output port.""" - relay: ResponseRelay[_Response] = ResponseRelay() - service = partial(service, output_port=relay) - - def returning_service(request: _Request, *, output_port: Callable[[_Response], None], **kwargs: Any) -> _Response: - service(request, **kwargs) - output_port(relay.get_response()) - return relay.get_response() - - return returning_service diff --git a/link/service/messagebus.py b/link/service/messagebus.py new file mode 100644 index 00000000..3e981805 --- /dev/null +++ b/link/service/messagebus.py @@ -0,0 +1,80 @@ +"""Contains the message bus.""" +from __future__ import annotations + +import logging +from collections import deque +from typing import Callable, Iterable, Protocol, TypeVar, Union + +from link.domain.commands import Command +from link.domain.events import Event + +from .uow import UnitOfWork + +Message = Union[Command, Event] + + +logger = logging.getLogger() + +T = TypeVar("T", bound=Command) + + +class CommandHandlers(Protocol): + """A mapping of command types to handlers.""" + + def __getitem__(self, command_type: type[T]) -> Callable[[T], None]: + """Get the appropriate handler for the given type of command.""" + + def __setitem__(self, command_type: type[T], handler: Callable[[T], None]) -> None: + """Set the appropriate handler for the given type of command.""" + + +V = TypeVar("V", bound=Event) + + +class EventHandlers(Protocol): + """A mapping of event types to handlers.""" + + def __getitem__(self, event_type: type[V]) -> Iterable[Callable[[V], None]]: + """Get the appropriate handlers for the given type of event.""" + + def __setitem__(selG, event_type: type[V], handlers: Iterable[Callable[[V], None]]) -> None: + """Set the appropriate handlers for the given type of event.""" + + +class MessageBus: + """A message bus that dispatches domain messages to their appropriate handlers.""" + + def __init__(self, uow: UnitOfWork, command_handlers: CommandHandlers, event_handlers: EventHandlers) -> None: + """Initialize the bus.""" + self._uow = uow + self._queue: deque[Message] = deque() + self._command_handlers = command_handlers + self._event_handlers = event_handlers + + def handle(self, message: Message) -> None: + """Handle the message.""" + self._queue.append(message) + while self._queue: + message = self._queue.popleft() + if isinstance(message, Command): + self._handle_command(message) + elif isinstance(message, Event): + self._handle_event(message) + else: + raise TypeError(f"Unknown message type {type(message)!r}") + self._queue.extend(self._uow.collect_new_events()) + + def _handle_command(self, command: Command) -> None: + handler = self._command_handlers[type(command)] + try: + handler(command) + except Exception: + logger.exception(f"Error handling command {command!r} with handler {handler!r}") + raise + + def _handle_event(self, event: Event) -> None: + for handler in self._event_handlers[type(event)]: + try: + handler(event) + except Exception: + logger.exception(f"Error handling event {event!r} with handler {handler!r}") diff --git a/link/service/services.py b/link/service/services.py deleted file mode 100644 index a3237f6c..00000000 --- a/link/service/services.py +++ /dev/null @@ -1,202 +0,0 @@ -"""Contains all the services.""" -from __future__ import annotations - -from collections.abc import Callable -from dataclasses import dataclass -from enum import Enum, auto - -from link.domain.custom_types import Identifier -from link.domain.state import InvalidOperation, Operations, Update, states - -from .uow import UnitOfWork - - -class Request: - """Base class for all request models.""" - - -class Response: - """Base class for all response models.""" - - -@dataclass(frozen=True) -class PullRequest(Request): - """Request model for the pull use-case.""" - - requested: frozenset[Identifier] - - -@dataclass(frozen=True) -class PullResponse(Response): - """Response model for the pull use-case.""" - - requested: frozenset[Identifier] - errors: frozenset[InvalidOperation] - - -def pull( - request: PullRequest, - *, - process_to_completion_service: Callable[[ProcessToCompletionRequest], ProcessToCompletionResponse], - start_pull_process_service: Callable[[StartPullProcessRequest], OperationResponse], - output_port: Callable[[PullResponse], None], -) -> None: - """Pull entities across the link.""" - process_to_completion_service(ProcessToCompletionRequest(request.requested)) - response = start_pull_process_service(StartPullProcessRequest(request.requested)) - errors = (error for error in response.errors if error.state is states.Deprecated) - process_to_completion_service(ProcessToCompletionRequest(request.requested)) - output_port(PullResponse(request.requested, errors=frozenset(errors))) - - -@dataclass(frozen=True) -class DeleteRequest(Request): - """Request model for the delete use-case.""" - - requested: frozenset[Identifier] - - -@dataclass(frozen=True) -class DeleteResponse(Response): - """Response model for the delete use-case.""" - - requested: frozenset[Identifier] - - -def delete( - request: DeleteRequest, - *, - process_to_completion_service: Callable[[ProcessToCompletionRequest], ProcessToCompletionResponse], - start_delete_process_service: Callable[[StartDeleteProcessRequest], OperationResponse], - output_port: Callable[[DeleteResponse], None], -) -> None: - """Delete pulled entities.""" - process_to_completion_service(ProcessToCompletionRequest(request.requested)) - start_delete_process_service(StartDeleteProcessRequest(request.requested)) - process_to_completion_service(ProcessToCompletionRequest(request.requested)) - output_port(DeleteResponse(request.requested)) - - -@dataclass(frozen=True) -class ProcessToCompletionRequest(Request): - """Request model for the process to completion use-case.""" - - requested: frozenset[Identifier] - - -@dataclass(frozen=True) -class ProcessToCompletionResponse(Response): - """Response model for the process to completion use-case.""" - - requested: frozenset[Identifier] - - -def process_to_completion( - request: ProcessToCompletionRequest, - *, - process_service: Callable[[ProcessRequest], OperationResponse], - output_port: Callable[[ProcessToCompletionResponse], None], -) -> None: - """Process entities until their processes are complete.""" - while process_service(ProcessRequest(request.requested)).updates: - pass - output_port(ProcessToCompletionResponse(request.requested)) - - -@dataclass(frozen=True) -class OperationResponse(Response): - """Response model for all use-cases that operate on entities.""" - - operation: Operations - requested: frozenset[Identifier] - updates: frozenset[Update] - errors: frozenset[InvalidOperation] - - -@dataclass(frozen=True) -class StartPullProcessRequest(Request): - """Request model for the start-pull-process service.""" - - requested: frozenset[Identifier] - - -def start_pull_process( - request: StartPullProcessRequest, - *, - uow: UnitOfWork, - output_port: Callable[[OperationResponse], None], -) -> None: - """Start the pull process for the requested entities.""" - with uow: - result = uow.link.apply(Operations.START_PULL, requested=request.requested).operation_results[0] - uow.commit() - output_port(OperationResponse(result.operation, request.requested, result.updates, result.errors)) - - -@dataclass(frozen=True) -class StartDeleteProcessRequest(Request): - """Request model for the start-delete-process service.""" - - requested: frozenset[Identifier] - - -def start_delete_process( - request: StartDeleteProcessRequest, - *, - uow: UnitOfWork, - output_port: Callable[[OperationResponse], None], -) -> None: - """Start the delete process for the requested entities.""" - with uow: - result = uow.link.apply(Operations.START_DELETE, requested=request.requested).operation_results[0] - uow.commit() - output_port(OperationResponse(result.operation, request.requested, result.updates, result.errors)) - - -@dataclass(frozen=True) -class ProcessRequest(Request): - """Request model for the process use-case.""" - - requested: frozenset[Identifier] - - -def process(request: ProcessRequest, *, uow: UnitOfWork, output_port: Callable[[OperationResponse], None]) -> None: - """Process entities.""" - with uow: - result = uow.link.apply(Operations.PROCESS, requested=request.requested).operation_results[0] - uow.commit() - output_port(OperationResponse(result.operation, request.requested, result.updates, result.errors)) - - -@dataclass(frozen=True) -class ListIdleEntitiesRequest(Request): - """Request model for the use-case that lists idle entities.""" - - -@dataclass(frozen=True) -class ListIdleEntitiesResponse(Response): - """Response model for the use-case that lists idle entities.""" - - identifiers: frozenset[Identifier] - - -def list_idle_entities( - request: ListIdleEntitiesRequest, - *, - uow: UnitOfWork, - output_port: Callable[[ListIdleEntitiesResponse], None], -) -> None: - """List all idle entities.""" - with uow: - output_port( - ListIdleEntitiesResponse(frozenset(entity.identifier for entity in uow.link if entity.state is states.Idle)) - ) - - -class Services(Enum): - """Names for all available services.""" - - PULL = auto() - DELETE = auto() - PROCESS = auto() - LIST_IDLE_ENTITIES = auto() diff --git a/link/service/uow.py b/link/service/uow.py index 34bbf0ef..3235d467 100644 --- a/link/service/uow.py +++ b/link/service/uow.py @@ -2,13 +2,14 @@ from __future__ import annotations from abc import ABC -from collections import defaultdict, deque +from collections import deque from types import TracebackType -from typing import Callable, Iterable, Protocol +from typing import Callable, Iterable, Iterator, Protocol +from link.domain import events from link.domain.custom_types import Identifier from link.domain.link import Link -from link.domain.state import TRANSITION_MAP, Entity, Operations, Transition, Update +from link.domain.state import TRANSITION_MAP, Entity, Operations, Transition from .gateway import LinkGateway @@ -27,61 +28,40 @@ def __init__(self, gateway: LinkGateway) -> None: """Initialize the unit of work.""" self._gateway = gateway self._link: Link | None = None - self._updates: dict[Identifier, deque[Update]] = defaultdict(deque) + self._updates: deque[events.StateChanged] = deque() + self._events: deque[events.Event] = deque() + self._seen: list[Entity] = [] def __enter__(self) -> UnitOfWork: """Enter the context in which updates to entities can be made.""" - def augment_link(link: Link) -> None: - original = getattr(link, "apply") - augmented = augment_link_apply(link, original) - object.__setattr__(link, "apply", augmented) - object.__setattr__(link, "_is_expired", False) - - def augment_link_apply(current: Link, original: SupportsLinkApply) -> SupportsLinkApply: - def augmented(operation: Operations, *, requested: Iterable[Identifier]) -> Link: - assert hasattr(current, "_is_expired") - if current._is_expired: - raise RuntimeError("Can not apply operation to expired link") - self._link = original(operation, requested=requested) - augment_link(self._link) - object.__setattr__(current, "_is_expired", True) - return self._link - - return augmented - def augment_entity(entity: Entity) -> None: original = getattr(entity, "apply") augmented = augment_entity_apply(entity, original) - object.__setattr__(entity, "apply", augmented) - object.__setattr__(entity, "_is_expired", False) + setattr(entity, "apply", augmented) + setattr(entity, "_is_expired", False) def augment_entity_apply( - current: Entity, original: Callable[[Operations], Entity] - ) -> Callable[[Operations], Entity]: - def augmented(operation: Operations) -> Entity: - assert hasattr(current, "_is_expired") - if current._is_expired is True: - raise RuntimeError("Can not apply operation to expired entity") - new = original(operation) - store_update(operation, current, new) - augment_entity(new) - object.__setattr__(current, "_is_expired", True) - return new + entity: Entity, original: Callable[[Operations], None] + ) -> Callable[[Operations], None]: + def augmented(operation: Operations) -> None: + assert hasattr(entity, "_is_expired") + if entity._is_expired: + raise RuntimeError("Can not apply operation to expired entity.") + if entity not in self._seen: + self._seen.append(entity) + current_state = entity.state + original(operation) + new_state = entity.state + if current_state is new_state: + return + transition = Transition(current_state, new_state) + command = TRANSITION_MAP[transition] + self._updates.append(events.StateChanged(operation, entity.identifier, transition, command)) return augmented - def store_update(operation: Operations, current: Entity, new: Entity) -> None: - assert current.identifier == new.identifier - if current.state is new.state: - return - transition = Transition(current.state, new.state) - self._updates[current.identifier].append( - Update(operation, current.identifier, transition, TRANSITION_MAP[transition]) - ) - self._link = self._gateway.create_link() - augment_link(self._link) for entity in self._link: augment_entity(entity) return self @@ -105,16 +85,24 @@ def commit(self) -> None: if self._link is None: raise RuntimeError("Not available outside of context") while self._updates: - identifier, updates = self._updates.popitem() - while updates: - self._gateway.apply([updates.popleft()]) + self._gateway.apply([self._updates.popleft()]) + for entity in self._seen: + while entity.events: + self._events.append(entity.events.popleft()) self.rollback() def rollback(self) -> None: """Throw away any not yet persisted updates.""" if self._link is None: raise RuntimeError("Not available outside of context") - object.__setattr__(self._link, "_is_expired", True) for entity in self._link: - object.__setattr__(entity, "_is_expired", True) + setattr(entity, "_is_expired", True) self._updates.clear() + self._seen.clear() + + def collect_new_events(self) -> Iterator[events.Event]: + """Collect new events from entities.""" + if self._link is not None: + raise RuntimeError("New events can not be collected when inside context") + while self._events: + yield self._events.popleft() diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py index cc2dc7b6..b01b46f9 100644 --- a/tests/functional/conftest.py +++ b/tests/functional/conftest.py @@ -20,7 +20,7 @@ SCOPE = os.environ.get("SCOPE", "session") REMOVE = True DATABASE_IMAGE = "cblessing24/mariadb:11.1" -MINIO_IMAGE = "minio/minio:latest" +MINIO_IMAGE = "minio/minio:RELEASE.2023-11-01T18-37-25Z" DATABASE_ROOT_PASSWORD = "root" @@ -172,10 +172,10 @@ def get_runner_kwargs(docker_client, spec): ) elif isinstance(spec, MinIOSpec): processed_container_config = dict( - environment=dict(MINIO_ACCESS_KEY=spec.config.access_key, MINIO_SECRET_KEY=spec.config.secret_key), + environment=dict(MINIO_ROOT_USER=spec.config.access_key, MINIO_ROOT_PASSWORD=spec.config.secret_key), command=["server", "/data"], healthcheck=dict( - test=["CMD", "curl", "-f", "127.0.0.1:9000/minio/health/ready"], + test=["CMD", "mc", "ready", "local"], start_period=int(spec.container.health_check.start_period_seconds * 1e9), # nanoseconds interval=int(spec.container.health_check.interval_seconds * 1e9), # nanoseconds retries=spec.container.health_check.max_retries, diff --git a/tests/integration/gateway.py b/tests/integration/gateway.py index e8c1eec9..075c6220 100644 --- a/tests/integration/gateway.py +++ b/tests/integration/gateway.py @@ -3,9 +3,10 @@ from collections.abc import Mapping from typing import Iterable +from link.domain import events from link.domain.custom_types import Identifier from link.domain.link import Link, create_link -from link.domain.state import Commands, Components, Processes, Update +from link.domain.state import Commands, Components, Processes from link.service.gateway import LinkGateway @@ -27,7 +28,7 @@ def __init__( def create_link(self) -> Link: return create_link(self.assignments, tainted_identifiers=self.tainted_identifiers, processes=self.processes) - def apply(self, updates: Iterable[Update]) -> None: + def apply(self, updates: Iterable[events.StateChanged]) -> None: for update in updates: if update.command is Commands.START_PULL_PROCESS: self.processes[Processes.PULL].add(update.identifier) diff --git a/tests/integration/test_datajoint_persistence.py b/tests/integration/test_datajoint_persistence.py index 0de89752..9b1aaff5 100644 --- a/tests/integration/test_datajoint_persistence.py +++ b/tests/integration/test_datajoint_persistence.py @@ -17,6 +17,7 @@ from link.adapters import PrimaryKey from link.adapters.gateway import DJLinkGateway from link.adapters.identification import IdentificationTranslator +from link.domain import events from link.domain.link import create_link from link.domain.state import Components, Operations, Processes from link.infrastructure.facade import DJLinkFacade, Table @@ -340,6 +341,19 @@ def test_link_creation() -> None: ) +def apply_update(gateway: DJLinkGateway, operation: Operations, requested: Iterable[PrimaryKey]) -> None: + link = gateway.create_link() + for entity in link: + if entity.identifier not in {gateway.translator.to_identifier(key) for key in requested}: + continue + entity.apply(operation) + while entity.events: + event = entity.events.popleft() + if not isinstance(event, events.StateChanged): + continue + gateway.apply([event]) + + def test_add_to_local_command() -> None: tables = create_tables( "link", @@ -370,12 +384,7 @@ def test_add_to_local_command() -> None: ), ) - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) assert has_state( tables, @@ -413,12 +422,7 @@ def test_add_to_local_command_with_error() -> None: tables["local"].children(as_objects=True)[0].error_on_insert = RuntimeError try: - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) except RuntimeError: pass @@ -435,12 +439,7 @@ def test_add_to_local_command_with_external_file(tmpdir: Path) -> None: tables["source"].insert([{"a": 0, "external": insert_filepath}]) os.remove(insert_filepath) tables["outbound"].insert([{"a": 0, "process": "PULL", "is_flagged": "FALSE", "is_deprecated": "FALSE"}]) - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) fetch_filepath = Path(tables["local"].fetch(as_dict=True, download_path=str(tmpdir))[0]["external"]) with fetch_filepath.open(mode="rb") as file: assert file.read() == data @@ -459,12 +458,7 @@ def test_remove_from_local_command() -> None: ) with as_stdin(StringIO("y")): - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) assert has_state( tables, @@ -480,12 +474,7 @@ def test_start_pull_process() -> None: "link", primary={"a"}, non_primary={"b"}, initial=State(source=TableState([{"a": 0, "b": 1}])) ) - gateway.apply( - gateway.create_link() - .apply(Operations.START_PULL, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.START_PULL, [{"a": 0}]) assert has_state( tables, @@ -510,12 +499,7 @@ def initial_state() -> State: def test_state_after_command(initial_state: State) -> None: tables, gateway = initialize("link", primary={"a"}, non_primary={"b"}, initial=initial_state) - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) assert has_state( tables, @@ -532,12 +516,7 @@ def test_rollback_on_error(initial_state: State) -> None: tables["outbound"].error_on_insert = RuntimeError try: - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) except RuntimeError: pass @@ -558,12 +537,7 @@ def initial_state() -> State: def test_state_after_command(initial_state: State) -> None: tables, gateway = initialize("link", primary={"a"}, non_primary={"b"}, initial=initial_state) - gateway.apply( - gateway.create_link() - .apply(Operations.START_DELETE, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.START_DELETE, [{"a": 0}]) assert has_state( tables, @@ -580,12 +554,7 @@ def test_rollback_on_error(initial_state: State) -> None: tables["outbound"].error_on_insert = RuntimeError try: - gateway.apply( - gateway.create_link() - .apply(Operations.START_DELETE, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.START_DELETE, [{"a": 0}]) except RuntimeError: pass @@ -603,12 +572,7 @@ def test_finish_delete_process_command() -> None: ), ) - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) assert has_state(tables, State(source=TableState([{"a": 0, "b": 1}]))) @@ -626,12 +590,7 @@ def initial_state() -> State: def test_state_after_command(initial_state: State) -> None: tables, gateway = initialize("link", primary={"a"}, non_primary={"b"}, initial=initial_state) - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) assert has_state( tables, @@ -647,12 +606,7 @@ def test_rollback_on_error(initial_state: State) -> None: tables["outbound"].error_on_insert = RuntimeError try: - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested={gateway.translator.to_identifier({"a": 0})}) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}]) except RuntimeError: pass @@ -677,12 +631,7 @@ def test_applying_multiple_commands() -> None: ) with as_stdin(StringIO("y")): - gateway.apply( - gateway.create_link() - .apply(Operations.PROCESS, requested=gateway.translator.to_identifiers([{"a": 0}, {"a": 1}])) - .operation_results[0] - .updates - ) + apply_update(gateway, Operations.PROCESS, [{"a": 0}, {"a": 1}]) assert has_state( tables, diff --git a/tests/integration/test_services.py b/tests/integration/test_services.py index 444eab47..ee1630b6 100644 --- a/tests/integration/test_services.py +++ b/tests/integration/test_services.py @@ -1,38 +1,19 @@ from __future__ import annotations -from collections.abc import Callable from functools import partial -from typing import Generic, TypedDict, TypeVar +from typing import Callable, Generic, TypedDict, TypeVar import pytest -from link.domain.state import Components, InvalidOperation, Operations, Processes, State, states -from link.service.io import Service, make_responsive -from link.service.services import ( - DeleteRequest, - DeleteResponse, - ListIdleEntitiesRequest, - ListIdleEntitiesResponse, - OperationResponse, - ProcessRequest, - ProcessToCompletionRequest, - PullRequest, - PullResponse, - Response, - delete, - list_idle_entities, - process, - process_to_completion, - pull, - start_delete_process, - start_pull_process, -) +from link.domain import commands, events +from link.domain.state import Components, Processes, State, states +from link.service.handlers import delete, list_idle_entities, pull from link.service.uow import UnitOfWork -from tests.assignments import create_assignments, create_identifier, create_identifiers +from tests.assignments import create_assignments, create_identifiers from .gateway import FakeLinkGateway -T = TypeVar("T", bound=Response) +T = TypeVar("T", bound=events.Event) class FakeOutputPort(Generic[T]): @@ -86,41 +67,17 @@ def create_uow(state: type[State], process: Processes | None = None, is_tainted: ) -def create_process_to_completion_service(uow: UnitOfWork) -> Callable[[ProcessToCompletionRequest], None]: - process_service = partial(make_responsive(partial(process, uow=uow)), output_port=lambda x: None) - return partial( - make_responsive( - partial( - process_to_completion, - process_service=process_service, - ), - ), - output_port=lambda x: None, - ) +_Event_co = TypeVar("_Event_co", bound=events.Event, covariant=True) +_Command_contra = TypeVar("_Command_contra", bound=commands.Command, contravariant=True) -def create_pull_service(uow: UnitOfWork) -> Service[PullRequest, PullResponse]: - process_to_completion_service = create_process_to_completion_service(uow) - start_pull_process_service = partial( - make_responsive(partial(start_pull_process, uow=uow)), output_port=lambda x: None - ) - return partial( - pull, - process_to_completion_service=process_to_completion_service, - start_pull_process_service=start_pull_process_service, - ) +def create_pull_service(uow: UnitOfWork) -> Callable[[commands.PullEntities], None]: + return partial(pull, uow=uow) -def create_delete_service(uow: UnitOfWork) -> Service[DeleteRequest, DeleteResponse]: - process_to_completion_service = create_process_to_completion_service(uow) - start_delete_process_service = partial( - make_responsive(partial(start_delete_process, uow=uow)), output_port=lambda x: None - ) - return partial( - delete, - process_to_completion_service=process_to_completion_service, - start_delete_process_service=start_delete_process_service, - ) + +def create_delete_service(uow: UnitOfWork) -> Callable[[commands.DeleteEntities], None]: + return partial(delete, uow=uow) class EntityConfig(TypedDict): @@ -165,23 +122,11 @@ class EntityConfig(TypedDict): def test_deleted_entity_ends_in_correct_state(state: EntityConfig, expected: type[State]) -> None: uow = create_uow(**state) delete_service = create_delete_service(uow) - delete_service(DeleteRequest(frozenset(create_identifiers("1"))), output_port=lambda x: None) + delete_service(commands.DeleteEntities(frozenset(create_identifiers("1")))) with uow: assert next(iter(uow.link)).state is expected -def test_correct_response_model_gets_passed_to_delete_output_port() -> None: - uow = UnitOfWork( - FakeLinkGateway( - create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}, Components.LOCAL: {"1"}}) - ) - ) - output_port = FakeOutputPort[DeleteResponse]() - delete_service = create_delete_service(uow) - delete_service(DeleteRequest(frozenset(create_identifiers("1"))), output_port=output_port) - assert output_port.response.requested == create_identifiers("1") - - @pytest.mark.parametrize( ("state", "expected"), [ @@ -202,90 +147,17 @@ def test_correct_response_model_gets_passed_to_delete_output_port() -> None: def test_pulled_entity_ends_in_correct_state(state: EntityConfig, expected: type[State]) -> None: uow = create_uow(**state) pull_service = create_pull_service(uow) - pull_service( - PullRequest(frozenset(create_identifiers("1"))), - output_port=lambda x: None, - ) + pull_service(commands.PullEntities(frozenset(create_identifiers("1")))) with uow: assert next(iter(uow.link)).state is expected -@pytest.mark.parametrize( - ("state", "produces_error"), - [ - (STATES[0], False), - (STATES[1], False), - (STATES[2], False), - (STATES[3], True), - (STATES[4], True), - (STATES[5], False), - (STATES[6], False), - (STATES[7], False), - (STATES[8], True), - (STATES[9], False), - (STATES[10], False), - (STATES[11], True), - ], -) -def test_correct_response_model_gets_passed_to_pull_output_port(state: EntityConfig, produces_error: bool) -> None: - if produces_error: - errors = { - InvalidOperation( - operation=Operations.START_PULL, identifier=create_identifier("1"), state=states.Deprecated - ) - } - else: - errors = set() - gateway = create_uow(**state) - output_port = FakeOutputPort[PullResponse]() - pull_service = create_pull_service(gateway) - pull_service( - PullRequest(frozenset(create_identifiers("1"))), - output_port=output_port, - ) - assert output_port.response == PullResponse(requested=frozenset(create_identifiers("1")), errors=frozenset(errors)) - - -def test_entity_undergoing_process_gets_processed() -> None: - uow = UnitOfWork( - FakeLinkGateway( - create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}}), - processes={Processes.PULL: create_identifiers("1")}, - ) - ) - process( - ProcessRequest(frozenset(create_identifiers("1"))), - uow=uow, - output_port=FakeOutputPort[OperationResponse](), - ) - with uow: - entity = next(entity for entity in uow.link if entity.identifier == create_identifier("1")) - assert entity.state is states.Received - - -def test_correct_response_model_gets_passed_to_process_output_port() -> None: - uow = UnitOfWork( - FakeLinkGateway( - create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}}), - processes={Processes.PULL: create_identifiers("1")}, - ) - ) - output_port = FakeOutputPort[OperationResponse]() - process( - ProcessRequest(frozenset(create_identifiers("1"))), - uow=uow, - output_port=output_port, - ) - assert output_port.response.requested == create_identifiers("1") - assert output_port.response.operation is Operations.PROCESS - - def test_correct_response_model_gets_passed_to_list_idle_entities_output_port() -> None: uow = UnitOfWork( FakeLinkGateway( create_assignments({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) ) ) - output_port = FakeOutputPort[ListIdleEntitiesResponse]() - list_idle_entities(ListIdleEntitiesRequest(), uow=uow, output_port=output_port) + output_port = FakeOutputPort[events.IdleEntitiesListed]() + list_idle_entities(commands.ListIdleEntities(), uow=uow, output_port=output_port) assert set(output_port.response.identifiers) == create_identifiers("1") diff --git a/tests/integration/test_uow.py b/tests/integration/test_uow.py index 651e8e44..9aabc0fd 100644 --- a/tests/integration/test_uow.py +++ b/tests/integration/test_uow.py @@ -4,7 +4,8 @@ import pytest -from link.domain.state import Components, Operations, states +from link.domain import events +from link.domain.state import Commands, Components, Operations, Transition, states from link.service.uow import UnitOfWork from tests.assignments import create_assignments, create_identifier, create_identifiers @@ -19,10 +20,8 @@ def initialize(assignments: Mapping[Components, Iterable[str]]) -> tuple[FakeLin def test_updates_are_applied_to_gateway_on_commit() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.apply(Operations.START_PULL, requested=create_identifiers("1")) - uow.link.apply(Operations.START_DELETE, requested=create_identifiers("2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) + uow.link.pull(create_identifiers("1")) + uow.link.delete(create_identifiers("2")) uow.commit() actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Pulled), (create_identifier("2"), states.Idle)} @@ -32,10 +31,8 @@ def test_updates_are_applied_to_gateway_on_commit() -> None: def test_updates_are_discarded_on_context_exit() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.apply(Operations.START_PULL, requested=create_identifiers("1")) - uow.link.apply(Operations.START_DELETE, requested=create_identifiers("2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) + uow.link.pull(create_identifiers("1")) + uow.link.delete(create_identifiers("2")) actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Idle), (create_identifier("2"), states.Pulled)} assert actual == expected @@ -44,10 +41,8 @@ def test_updates_are_discarded_on_context_exit() -> None: def test_updates_are_discarded_on_rollback() -> None: gateway, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) with uow: - uow.link.apply(Operations.START_PULL, requested=create_identifiers("1")) - uow.link.apply(Operations.START_DELETE, requested=create_identifiers("2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) - uow.link.apply(Operations.PROCESS, requested=create_identifiers("1", "2")) + uow.link.pull(create_identifiers("1")) + uow.link.delete(create_identifiers("2")) uow.rollback() actual = {(entity.identifier, entity.state) for entity in gateway.create_link()} expected = {(create_identifier("1"), states.Idle), (create_identifier("2"), states.Pulled)} @@ -100,22 +95,13 @@ def test_entity_expires_when_leaving_context() -> None: entity.apply(Operations.START_PULL) -def test_entity_expires_when_applying_operation() -> None: - _, uow = initialize({Components.SOURCE: {"1"}}) - with uow: - entity = next(entity for entity in uow.link if entity.identifier == create_identifier("1")) - entity.apply(Operations.START_PULL) - with pytest.raises(RuntimeError, match="expired entity"): - entity.apply(Operations.PROCESS) - - def test_link_expires_when_committing() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: link = uow.link uow.commit() - with pytest.raises(RuntimeError, match="expired link"): - link.apply(Operations.START_PULL, requested=create_identifiers("1")) + with pytest.raises(RuntimeError, match="expired entity"): + link.pull(create_identifiers("1")) def test_link_expires_when_rolling_back() -> None: @@ -123,22 +109,86 @@ def test_link_expires_when_rolling_back() -> None: with uow: link = uow.link uow.rollback() - with pytest.raises(RuntimeError, match="expired link"): - link.apply(Operations.START_PULL, requested=create_identifiers("1")) + with pytest.raises(RuntimeError, match="expired entity"): + link.pull(create_identifiers("1")) def test_link_expires_when_exiting_context() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: link = uow.link - with pytest.raises(RuntimeError, match="expired link"): - link.apply(Operations.START_PULL, requested=create_identifiers("1")) + with pytest.raises(RuntimeError, match="expired entity"): + link.pull(create_identifiers("1")) + + +def test_correct_events_are_collected() -> None: + _, uow = initialize({Components.SOURCE: {"1", "2"}, Components.OUTBOUND: {"2"}, Components.LOCAL: {"2"}}) + with uow: + uow.link.pull(create_identifiers("1")) + uow.link.delete(create_identifiers("2")) + uow.commit() + expected = [ + events.StateChanged( + Operations.START_PULL, + create_identifier("1"), + Transition(states.Idle, states.Activated), + Commands.START_PULL_PROCESS, + ), + events.StateChanged( + Operations.PROCESS, + create_identifier("1"), + Transition(states.Activated, states.Received), + Commands.ADD_TO_LOCAL, + ), + events.StateChanged( + Operations.PROCESS, + create_identifier("1"), + Transition(states.Received, states.Pulled), + Commands.FINISH_PULL_PROCESS, + ), + events.StateChanged( + Operations.START_DELETE, + create_identifier("2"), + Transition(states.Pulled, states.Received), + Commands.START_DELETE_PROCESS, + ), + events.StateChanged( + Operations.PROCESS, + create_identifier("2"), + Transition(states.Received, states.Activated), + Commands.REMOVE_FROM_LOCAL, + ), + events.StateChanged( + Operations.PROCESS, + create_identifier("2"), + Transition(states.Activated, states.Idle), + Commands.FINISH_DELETE_PROCESS, + ), + ] + actual = list(uow.collect_new_events()) + assert actual == expected -def test_link_expires_when_applying_operation() -> None: +def test_unit_must_be_committed_to_collect_events() -> None: _, uow = initialize({Components.SOURCE: {"1"}}) with uow: - link = uow.link - link.apply(Operations.START_PULL, requested=create_identifiers("1")) - with pytest.raises(RuntimeError, match="expired link"): - link.apply(Operations.PROCESS, requested=create_identifiers("1")) + uow.link.pull(create_identifiers("1")) + assert list(uow.collect_new_events()) == [] + + +def test_events_can_only_be_collected_once() -> None: + _, uow = initialize({Components.SOURCE: {"1"}}) + with uow: + uow.link.pull(create_identifiers("1")) + uow.commit() + list(uow.collect_new_events()) + assert list(uow.collect_new_events()) == [] + + +def test_events_can_only_be_collected_outside_of_context() -> None: + _, uow = initialize({Components.SOURCE: {"1"}}) + with uow: + uow.link.pull(create_identifiers("1")) + uow.commit() + with pytest.raises(RuntimeError, match="inside context"): + list(uow.collect_new_events()) diff --git a/tests/unit/entities/test_link.py b/tests/unit/entities/test_link.py index 9df845a3..b4bfd4aa 100644 --- a/tests/unit/entities/test_link.py +++ b/tests/unit/entities/test_link.py @@ -6,8 +6,8 @@ import pytest from link.domain.custom_types import Identifier -from link.domain.link import Link, create_link -from link.domain.state import Components, Operations, Processes, State, states +from link.domain.link import create_link +from link.domain.state import Components, Processes, State, states from tests.assignments import create_assignments, create_identifier, create_identifiers @@ -166,80 +166,16 @@ def test_can_get_identifiers_of_entities_in_component( link = create_link(assignments) assert set(link.identifiers) == create_identifiers("1", "2") - -def test_link_is_processed_correctly() -> None: - link = create_link( - create_assignments( - { - Components.SOURCE: {"1", "2", "3", "4", "5"}, - Components.OUTBOUND: {"1", "2", "3", "4", "5"}, - Components.LOCAL: {"2", "4", "5"}, - } - ), - processes={ - Processes.PULL: create_identifiers("1", "2"), - Processes.DELETE: create_identifiers("3", "4", "5"), - }, - ) - actual = { - (entity.identifier, entity.state) - for entity in link.apply(Operations.PROCESS, requested=create_identifiers("1", "2", "3", "4")) - } - expected = { - (create_identifier("1"), states.Received), - (create_identifier("2"), states.Pulled), - (create_identifier("3"), states.Idle), - (create_identifier("4"), states.Activated), - (create_identifier("5"), states.Received), - } - assert actual == expected - - -class TestStartPull: - @staticmethod - @pytest.fixture() - def link() -> Link: - return create_link(create_assignments({Components.SOURCE: {"1"}})) - - @staticmethod - def test_idle_entity_becomes_activated(link: Link) -> None: - link = link.apply(Operations.START_PULL, requested=create_identifiers("1")) - entity = next(iter(link)) - assert entity.identifier == create_identifier("1") - assert entity.state is states.Activated - - @staticmethod - def test_not_specifying_requested_identifiers_raises_error(link: Link) -> None: - with pytest.raises(AssertionError, match="No identifiers requested."): - link.apply(Operations.START_PULL, requested={}) - @staticmethod - def test_specifying_identifiers_not_present_in_link_raises_error(link: Link) -> None: + def test_specifying_identifiers_not_present_in_link_raises_error_when_pulling() -> None: + link = create_link(create_assignments({Components.SOURCE: {"1"}})) with pytest.raises(AssertionError, match="Requested identifiers not present in link."): - link.apply(Operations.START_PULL, requested=create_identifiers("2")) - - -@pytest.fixture() -def link() -> Link: - return create_link( - create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}, Components.LOCAL: {"1"}}) - ) - - -class TestStartDelete: - @staticmethod - def test_pulled_entity_becomes_received(link: Link) -> None: - link = link.apply(Operations.START_DELETE, requested=create_identifiers("1")) - entity = next(iter(link)) - assert entity.identifier == create_identifier("1") - assert entity.state is states.Received - - @staticmethod - def test_not_specifying_requested_identifiers_raises_error(link: Link) -> None: - with pytest.raises(AssertionError, match="No identifiers requested."): - link.apply(Operations.START_DELETE, requested={}) + link.pull(create_identifiers("2")) @staticmethod - def test_specifying_identifiers_not_present_in_link_raises_error(link: Link) -> None: + def test_specifying_identifiers_not_present_in_link_raises_error_when_deleting() -> None: + link = create_link( + create_assignments({Components.SOURCE: {"1"}, Components.OUTBOUND: {"1"}, Components.LOCAL: {"1"}}) + ) with pytest.raises(AssertionError, match="Requested identifiers not present in link."): - link.apply(Operations.START_DELETE, requested=create_identifiers("2")) + link.delete(create_identifiers("2")) diff --git a/tests/unit/entities/test_state.py b/tests/unit/entities/test_state.py index c74e7c79..a097e303 100644 --- a/tests/unit/entities/test_state.py +++ b/tests/unit/entities/test_state.py @@ -1,21 +1,19 @@ from __future__ import annotations -from dataclasses import replace from typing import Iterable import pytest +from link.domain import events from link.domain.custom_types import Identifier from link.domain.link import create_link from link.domain.state import ( Commands, Components, - InvalidOperation, Operations, Processes, State, Transition, - Update, states, ) from tests.assignments import create_assignments, create_identifier, create_identifiers @@ -52,26 +50,25 @@ def test_invalid_transitions_returns_unchanged_entity( ) entity = next(entity for entity in link if entity.identifier == identifier) for operation in operations: - result = InvalidOperation(operation, identifier, state) - assert entity.apply(operation) == replace(entity, operation_results=(result,)) + result = events.InvalidOperationRequested(operation, identifier, state) + entity.apply(operation) + assert entity.events.pop() == result def test_start_pulling_idle_entity_returns_correct_entity() -> None: link = create_link(create_assignments({Components.SOURCE: {"1"}})) entity = next(iter(link)) - assert entity.apply(Operations.START_PULL) == replace( - entity, - state=states.Activated, - current_process=Processes.PULL, - operation_results=( - Update( - Operations.START_PULL, - entity.identifier, - Transition(states.Idle, states.Activated), - Commands.START_PULL_PROCESS, - ), - ), - ) + entity.apply(Operations.START_PULL) + assert entity.state is states.Activated + assert entity.current_process is Processes.PULL + assert list(entity.events) == [ + events.StateChanged( + Operations.START_PULL, + entity.identifier, + Transition(states.Idle, states.Activated), + Commands.START_PULL_PROCESS, + ) + ] @pytest.mark.parametrize( @@ -96,12 +93,12 @@ def test_processing_activated_entity_returns_correct_entity( tainted_identifiers=tainted_identifiers, ) entity = next(iter(link)) - updated_results = entity.operation_results + ( - Update(Operations.PROCESS, entity.identifier, Transition(entity.state, new_state), command), - ) - assert entity.apply(Operations.PROCESS) == replace( - entity, state=new_state, current_process=new_process, operation_results=updated_results + entity.events.append( + events.StateChanged(Operations.PROCESS, entity.identifier, Transition(entity.state, new_state), command), ) + entity.apply(Operations.PROCESS) + assert entity.state == new_state + assert entity.current_process == new_process @pytest.mark.parametrize( @@ -126,10 +123,13 @@ def test_processing_received_entity_returns_correct_entity( tainted_identifiers=tainted_identifiers, ) entity = next(iter(link)) - operation_results = (Update(Operations.PROCESS, entity.identifier, Transition(entity.state, new_state), command),) - assert entity.apply(Operations.PROCESS) == replace( - entity, state=new_state, current_process=new_process, operation_results=operation_results - ) + expected_events = [ + events.StateChanged(Operations.PROCESS, entity.identifier, Transition(entity.state, new_state), command), + ] + entity.apply(Operations.PROCESS) + assert entity.state == new_state + assert entity.current_process == new_process + assert list(entity.events) == expected_events def test_starting_delete_on_pulled_entity_returns_correct_entity() -> None: @@ -138,17 +138,18 @@ def test_starting_delete_on_pulled_entity_returns_correct_entity() -> None: ) entity = next(iter(link)) transition = Transition(states.Pulled, states.Received) - operation_results = ( - Update( + expected_events = [ + events.StateChanged( Operations.START_DELETE, entity.identifier, transition, Commands.START_DELETE_PROCESS, ), - ) - assert entity.apply(Operations.START_DELETE) == replace( - entity, state=transition.new, current_process=Processes.DELETE, operation_results=operation_results - ) + ] + entity.apply(Operations.START_DELETE) + assert entity.state == transition.new + assert entity.current_process == Processes.DELETE + assert list(entity.events) == expected_events def test_starting_delete_on_tainted_entity_returns_correct_commands() -> None: @@ -158,7 +159,10 @@ def test_starting_delete_on_tainted_entity_returns_correct_commands() -> None: ) entity = next(iter(link)) transition = Transition(states.Tainted, states.Received) - operation_results = (Update(Operations.START_DELETE, entity.identifier, transition, Commands.START_DELETE_PROCESS),) - assert entity.apply(Operations.START_DELETE) == replace( - entity, state=transition.new, current_process=Processes.DELETE, operation_results=operation_results - ) + expected_events = [ + events.StateChanged(Operations.START_DELETE, entity.identifier, transition, Commands.START_DELETE_PROCESS), + ] + entity.apply(Operations.START_DELETE) + assert entity.state == transition.new + assert entity.current_process == Processes.DELETE + assert list(entity.events) == expected_events