Skip to content

Commit

Permalink
Update proto to include remove sub, move to rpc based operations (#5168)
Browse files Browse the repository at this point in the history
* Update proto to include remove sub, move to rpc based operations

* dont add a breaking change

* mypy fix
  • Loading branch information
jackgerrits authored Jan 23, 2025
1 parent c3e84dc commit 44b9bff
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 32 deletions.
31 changes: 25 additions & 6 deletions protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ message Event {
}

message RegisterAgentTypeRequest {
string request_id = 1;
string request_id = 1; // TODO: remove once message based requests are removed
string type = 2;
}

message RegisterAgentTypeResponse {
string request_id = 1;
string request_id = 1; // TODO: remove once message based requests are removed
bool success = 2;
optional string error = 3;
}
Expand All @@ -69,27 +69,46 @@ message TypePrefixSubscription {
}

message Subscription {
string id = 1;
oneof subscription {
TypeSubscription typeSubscription = 1;
TypePrefixSubscription typePrefixSubscription = 2;
TypeSubscription typeSubscription = 2;
TypePrefixSubscription typePrefixSubscription = 3;
}
}

message AddSubscriptionRequest {
string request_id = 1;
string request_id = 1; // TODO: remove once message based requests are removed
Subscription subscription = 2;
}

message AddSubscriptionResponse {
string request_id = 1;
string request_id = 1; // TODO: remove once message based requests are removed
bool success = 2;
optional string error = 3;
}

message RemoveSubscriptionRequest {
string id = 1;
}

message RemoveSubscriptionResponse {
bool success = 1;
optional string error = 2;
}

message GetSubscriptionsRequest {}
message GetSubscriptionsResponse {
repeated Subscription subscriptions = 1;
}

service AgentRpc {
rpc OpenChannel (stream Message) returns (stream Message);
rpc GetState(AgentId) returns (GetStateResponse);
rpc SaveState(AgentState) returns (SaveStateResponse);
rpc RegisterAgent(RegisterAgentTypeRequest) returns (RegisterAgentTypeResponse);
rpc AddSubscription(AddSubscriptionRequest) returns (AddSubscriptionResponse);
rpc RemoveSubscription(RemoveSubscriptionRequest) returns (RemoveSubscriptionResponse);
rpc GetSubscriptions(GetSubscriptionsRequest) returns (GetSubscriptionsResponse);
}

message AgentState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ class TypePrefixSubscription(Subscription):
agent_type (str): Agent type to handle this subscription
"""

def __init__(self, topic_type_prefix: str, agent_type: str | AgentType):
def __init__(self, topic_type_prefix: str, agent_type: str | AgentType, id: str | None = None):
self._topic_type_prefix = topic_type_prefix
if isinstance(agent_type, AgentType):
self._agent_type = agent_type.type
else:
self._agent_type = agent_type
self._id = str(uuid.uuid4())
self._id = id or str(uuid.uuid4())

@property
def id(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ class TypeSubscription(Subscription):
agent_type (str): Agent type to handle this subscription
"""

def __init__(self, topic_type: str, agent_type: str | AgentType):
def __init__(self, topic_type: str, agent_type: str | AgentType, id: str | None = None):
self._topic_type = topic_type
if isinstance(agent_type, AgentType):
self._agent_type = agent_type.type
else:
self._agent_type = agent_type
self._id = str(uuid.uuid4())
self._id = id or str(uuid.uuid4())

@property
def id(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,25 +807,27 @@ async def add_subscription(self, subscription: Subscription) -> None:
request_id = await self._get_new_request_id()

match subscription:
case TypeSubscription(topic_type=topic_type, agent_type=agent_type):
case TypeSubscription(topic_type=topic_type, agent_type=agent_type, id=id):
message = agent_worker_pb2.Message(
addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
request_id=request_id,
subscription=agent_worker_pb2.Subscription(
id=id,
typeSubscription=agent_worker_pb2.TypeSubscription(
topic_type=topic_type, agent_type=agent_type
)
),
),
)
)
case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type):
case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type, id=id):
message = agent_worker_pb2.Message(
addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
request_id=request_id,
subscription=agent_worker_pb2.Subscription(
id=id,
typePrefixSubscription=agent_worker_pb2.TypePrefixSubscription(
topic_type_prefix=topic_type_prefix, agent_type=agent_type
)
),
),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ async def _process_add_subscription_request(
add_subscription_req.subscription.typeSubscription
)
subscription = TypeSubscription(
topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type
topic_type=type_subscription_msg.topic_type,
agent_type=type_subscription_msg.agent_type,
id=add_subscription_req.subscription.id,
)

case "typePrefixSubscription":
Expand All @@ -237,6 +239,7 @@ async def _process_add_subscription_request(
subscription = TypePrefixSubscription(
topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix,
agent_type=type_prefix_subscription_msg.agent_type,
id=add_subscription_req.subscription.id,
)
case None:
logger.warning("Received empty subscription message")
Expand All @@ -260,6 +263,42 @@ async def _process_add_subscription_request(
)
)

def RegisterAgent( # type: ignore
self,
request: agent_worker_pb2.RegisterAgentTypeRequest,
context: grpc.aio.ServicerContext[
agent_worker_pb2.RegisterAgentTypeRequest, agent_worker_pb2.RegisterAgentTypeResponse
],
) -> agent_worker_pb2.RegisterAgentTypeResponse:
raise NotImplementedError("Method not implemented.")

def AddSubscription( # type: ignore
self,
request: agent_worker_pb2.AddSubscriptionRequest,
context: grpc.aio.ServicerContext[
agent_worker_pb2.AddSubscriptionRequest, agent_worker_pb2.AddSubscriptionResponse
],
) -> agent_worker_pb2.AddSubscriptionResponse:
raise NotImplementedError("Method not implemented.")

def RemoveSubscription( # type: ignore
self,
request: agent_worker_pb2.RemoveSubscriptionRequest,
context: grpc.aio.ServicerContext[
agent_worker_pb2.RemoveSubscriptionRequest, agent_worker_pb2.RemoveSubscriptionResponse
],
) -> agent_worker_pb2.RemoveSubscriptionResponse:
raise NotImplementedError("Method not implemented.")

def GetSubscriptions( # type: ignore
self,
request: agent_worker_pb2.GetSubscriptionsRequest,
context: grpc.aio.ServicerContext[
agent_worker_pb2.GetSubscriptionsRequest, agent_worker_pb2.GetSubscriptionsResponse
],
) -> agent_worker_pb2.GetSubscriptionsResponse:
raise NotImplementedError("Method not implemented.")

async def GetState( # type: ignore
self,
request: agent_worker_pb2.AgentId,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 44b9bff

Please sign in to comment.