Skip to content

Commit

Permalink
Merge pull request #53 from christoph-blessing/integrate_messagebus
Browse files Browse the repository at this point in the history
Integrate message bus
  • Loading branch information
christoph-blessing authored Nov 6, 2023
2 parents ba9e0bb + eed964d commit 776d3ee
Show file tree
Hide file tree
Showing 22 changed files with 553 additions and 959 deletions.
23 changes: 9 additions & 14 deletions link/adapters/controller.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
"""Contains code controlling the execution of use-cases."""
from __future__ import annotations

from typing import Callable, Iterable, Mapping
from typing import Iterable

from link.service.services import (
DeleteRequest,
ListIdleEntitiesRequest,
PullRequest,
Request,
Services,
)
from link.domain import commands
from link.service.messagebus import MessageBus

from .custom_types import PrimaryKey
from .identification import IdentificationTranslator
Expand All @@ -20,21 +15,21 @@ class DJController:

def __init__(
self,
handlers: Mapping[Services, Callable[[Request], None]],
message_bus: MessageBus,
translator: IdentificationTranslator,
) -> None:
"""Initialize the translator."""
self.__handlers = handlers
self.__translator = translator
self._message_bus = message_bus
self._translator = translator

def pull(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Execute the pull use-case."""
self.__handlers[Services.PULL](PullRequest(frozenset(self.__translator.to_identifiers(primary_keys))))
self._message_bus.handle(commands.PullEntities(frozenset(self._translator.to_identifiers(primary_keys))))

def delete(self, primary_keys: Iterable[PrimaryKey]) -> None:
"""Execute the delete use-case."""
self.__handlers[Services.DELETE](DeleteRequest(frozenset(self.__translator.to_identifiers(primary_keys))))
self._message_bus.handle(commands.DeleteEntities(frozenset(self._translator.to_identifiers(primary_keys))))

def list_idle_entities(self) -> None:
"""Execute the use-case that lists idle entities."""
self.__handlers[Services.LIST_IDLE_ENTITIES](ListIdleEntitiesRequest())
self._message_bus.handle(commands.ListIdleEntities())
7 changes: 4 additions & 3 deletions link/adapters/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from itertools import groupby
from typing import Iterable

from link.domain import events
from link.domain.custom_types import Identifier
from link.domain.link import Link, create_link
from link.domain.state import Commands, Components, Processes, Update
from link.domain.state import Commands, Components, Processes
from link.service.gateway import LinkGateway

from .custom_types import PrimaryKey
Expand Down Expand Up @@ -51,10 +52,10 @@ def translate_tainted_primary_keys(primary_keys: Iterable[PrimaryKey]) -> set[Id
tainted_identifiers=translate_tainted_primary_keys(self.facade.get_tainted_primary_keys()),
)

def apply(self, updates: Iterable[Update]) -> None:
def apply(self, updates: Iterable[events.StateChanged]) -> None:
"""Apply updates to the persistent data representing the link."""

def keyfunc(update: Update) -> int:
def keyfunc(update: events.StateChanged) -> int:
assert update.command is not None
return update.command.value

Expand Down
113 changes: 23 additions & 90 deletions link/adapters/present.py
Original file line number Diff line number Diff line change
@@ -1,107 +1,40 @@
"""Logic associated with presenting information about finished use-cases."""
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Iterable

from link.service.services import (
ListIdleEntitiesResponse,
OperationResponse,
)
from link.domain import events

from .custom_types import PrimaryKey
from .identification import IdentificationTranslator


@dataclass(frozen=True)
class OperationRecord:
"""Record of a finished operation."""

requests: list[Request]
successes: list[Sucess]
failures: list[Failure]


@dataclass(frozen=True)
class Request:
"""Record of a request to perform a certain operation on a particular entity."""

primary_key: PrimaryKey
operation: str


@dataclass(frozen=True)
class Sucess:
"""Record of a successful operation on a particular entity."""

primary_key: PrimaryKey
operation: str
transition: Transition


@dataclass(frozen=True)
class Transition:
"""Record of a transition between two states."""

old: str
new: str


@dataclass(frozen=True)
class Failure:
"""Record of a failed operation on a particular entity."""

primary_key: PrimaryKey
operation: str
state: str


def create_operation_response_presenter(
translator: IdentificationTranslator, show: Callable[[OperationRecord], None]
) -> Callable[[OperationResponse], None]:
"""Create a callable that when called presents information about a finished operation."""

def get_class_name(obj: type) -> str:
return obj.__name__

def present_operation_response(response: OperationResponse) -> None:
show(
OperationRecord(
[
Request(translator.to_primary_key(identifier), response.operation.name)
for identifier in response.requested
],
[
Sucess(
translator.to_primary_key(update.identifier),
operation=response.operation.name,
transition=Transition(
get_class_name(update.transition.current).upper(),
get_class_name(update.transition.new).upper(),
),
)
for update in response.updates
],
[
Failure(
translator.to_primary_key(error.identifier),
error.operation.name,
get_class_name(error.state).upper(),
)
for error in response.errors
],
)
)

return present_operation_response


def create_idle_entities_updater(
translator: IdentificationTranslator, update: Callable[[Iterable[PrimaryKey]], None]
) -> Callable[[ListIdleEntitiesResponse], None]:
) -> Callable[[events.IdleEntitiesListed], None]:
"""Create a callable that when called updates the list of idle entities."""

def update_idle_entities(response: ListIdleEntitiesResponse) -> None:
def update_idle_entities(response: events.IdleEntitiesListed) -> None:
update(translator.to_primary_key(identifier) for identifier in response.identifiers)

return update_idle_entities


def create_state_change_logger(
translator: IdentificationTranslator, log: Callable[[str], None]
) -> Callable[[events.StateChanged], None]:
"""Create a logger that logs state changes of entities."""

def log_state_change(state_change: events.StateChanged) -> None:
context = {
"identifier": translator.to_primary_key(state_change.identifier),
"operation": state_change.operation.name,
"transition": {
"old": state_change.transition.current.__name__,
"new": state_change.transition.new.__name__,
},
"command": state_change.command.name,
}
log(f"Entity state changed {context}")

return log_state_change
30 changes: 30 additions & 0 deletions link/domain/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""Contains all domain commands."""
from __future__ import annotations

from dataclasses import dataclass

from .custom_types import Identifier


@dataclass(frozen=True)
class Command:
"""Base class for all commands."""


@dataclass(frozen=True)
class PullEntities(Command):
"""Pull the requested entities."""

requested: frozenset[Identifier]


@dataclass(frozen=True)
class DeleteEntities(Command):
"""Delete the requested entities."""

requested: frozenset[Identifier]


@dataclass(frozen=True)
class ListIdleEntities(Command):
"""Start the delete process for the requested entities."""
45 changes: 45 additions & 0 deletions link/domain/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Contains all domain events."""
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING

from .custom_types import Identifier

if TYPE_CHECKING:
from .state import Commands, Operations, State, Transition


@dataclass(frozen=True)
class Event:
"""Base class for all events."""


@dataclass(frozen=True)
class OperationApplied(Event):
"""An operation was applied to an entity."""

operation: Operations
identifier: Identifier


@dataclass(frozen=True)
class InvalidOperationRequested(OperationApplied):
"""An operation that is invalid given the entities current state was requested."""

state: type[State]


@dataclass(frozen=True)
class StateChanged(OperationApplied):
"""The state of an entity changed during the application of an operation."""

transition: Transition
command: Commands


@dataclass(frozen=True)
class IdleEntitiesListed(Event):
"""Idle entities in a link have been listed."""

identifiers: frozenset[Identifier]
Loading

0 comments on commit 776d3ee

Please sign in to comment.