Skip to content

Commit

Permalink
WriteAsync must be awaited (#4491)
Browse files Browse the repository at this point in the history
  • Loading branch information
rysweet committed Dec 10, 2024
1 parent 6899ae1 commit d245bf5
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,100 @@ private void DispatchResponse(GrpcWorkerConnection connection, RpcResponse respo
completion.SetResult(response);
}

<<<<<<< HEAD:dotnet/src/Microsoft.AutoGen/Microsoft.AutoGen.Runtime.Grpc/GrpcGateway.cs
=======
// agentype:rpc_request={requesting_agent_id}
// {genttype}:rpc_response={request_id}
private async ValueTask AddSubscriptionAsync(GrpcWorkerConnection connection, AddSubscriptionRequest request)
{
var topic = "";
var agentType = "";
if (request.Subscription.TypePrefixSubscription is not null)
{
topic = request.Subscription.TypePrefixSubscription.TopicTypePrefix;
agentType = request.Subscription.TypePrefixSubscription.AgentType;
}
else if (request.Subscription.TypeSubscription is not null)
{
topic = request.Subscription.TypeSubscription.TopicType;
agentType = request.Subscription.TypeSubscription.AgentType;
}
_subscriptionsByAgentType[agentType] = request.Subscription;
_subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType);
await _subscriptions.SubscribeAsync(topic, agentType);
//var response = new AddSubscriptionResponse { RequestId = request.RequestId, Error = "", Success = true };
Message response = new()
{
AddSubscriptionResponse = new()
{
RequestId = request.RequestId,
Error = "",
Success = true
}
};
await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false);
}
private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, RegisterAgentTypeRequest msg)
{
connection.AddSupportedType(msg.Type);
_supportedAgentTypes.GetOrAdd(msg.Type, _ => []).Add(connection);

await _gatewayRegistry.RegisterAgentType(msg.Type, _reference).ConfigureAwait(true);
Message response = new()
{
RegisterAgentTypeResponse = new()
{
RequestId = msg.RequestId,
Error = "",
Success = true
}
};
await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false);
}
private async ValueTask DispatchEventAsync(CloudEvent evt)
{
// get the event type and then send to all agents that are subscribed to that event type
var eventType = evt.Type;
// ensure that we get agentTypes as an async enumerable list - try to get the value of agentTypes by topic and then cast it to an async enumerable list
if (_subscriptionsByTopic.TryGetValue(eventType, out var agentTypes))
{
await DispatchEventToAgentsAsync(agentTypes, evt);
}
// instead of an exact match, we can also check for a prefix match where key starts with the eventType
else if (_subscriptionsByTopic.Keys.Any(key => key.StartsWith(eventType)))
{
_subscriptionsByTopic.Where(
kvp => kvp.Key.StartsWith(eventType))
.SelectMany(kvp => kvp.Value)
.Distinct()
.ToList()
.ForEach(async agentType =>
{
await DispatchEventToAgentsAsync(new List<string> { agentType }, evt).ConfigureAwait(false);
});
}
else
{
// log that no agent types were found
_logger.LogWarning("No agent types found for event type {EventType}.", eventType);
}
}
private async ValueTask DispatchEventToAgentsAsync(IEnumerable<string> agentTypes, CloudEvent evt)
{
var tasks = new List<Task>(agentTypes.Count());
foreach (var agentType in agentTypes)
{
if (_supportedAgentTypes.TryGetValue(agentType, out var connections))
{
foreach (var connection in connections)
{
tasks.Add(this.SendMessageAsync(connection, evt));
}
}
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
>>>>>>> 79c5aaa1 (WriteAsync must be awaited (#4491)):dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs
private async ValueTask DispatchRequestAsync(GrpcWorkerConnection connection, RpcRequest request)
{
var requestId = request.RequestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace Microsoft.AutoGen.Runtime.Grpc;
public interface ISubscriptionsGrain : IGrainWithIntegerKey
{
ValueTask Subscribe(string agentType, string topic);
ValueTask Unsubscribe(string agentType, string topic);
ValueTask SubscribeAsync(string agentType, string topic);
ValueTask UnsubscribeAsync(string agentType, string topic);
ValueTask<Dictionary<string, List<string>>> GetSubscriptions(string agentType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public ValueTask<Dictionary<string, List<string>>> GetSubscriptions(string? agen
}
return new ValueTask<Dictionary<string, List<string>>>(_subscriptions);
}
public ValueTask Subscribe(string agentType, string topic)
public async ValueTask SubscribeAsync(string agentType, string topic)
{
if (!_subscriptions.TryGetValue(topic, out var subscriptions))
{
Expand All @@ -27,11 +27,9 @@ public ValueTask Subscribe(string agentType, string topic)
}
_subscriptions[topic] = subscriptions;
state.State.Subscriptions = _subscriptions;
state.WriteStateAsync();

return ValueTask.CompletedTask;
await state.WriteStateAsync().ConfigureAwait(false);
}
public ValueTask Unsubscribe(string agentType, string topic)
public async ValueTask UnsubscribeAsync(string agentType, string topic)
{
if (!_subscriptions.TryGetValue(topic, out var subscriptions))
{
Expand All @@ -43,8 +41,7 @@ public ValueTask Unsubscribe(string agentType, string topic)
}
_subscriptions[topic] = subscriptions;
state.State.Subscriptions = _subscriptions;
state.WriteStateAsync();
return ValueTask.CompletedTask;
await state.WriteStateAsync();
}
}
public sealed class SubscriptionsState
Expand Down

0 comments on commit d245bf5

Please sign in to comment.