diff --git a/Manifest/manifest_authors.json b/Manifest/manifest_authors.json
index 6d44ddf68..ba6aa5ee2 100644
--- a/Manifest/manifest_authors.json
+++ b/Manifest/manifest_authors.json
@@ -1,7 +1,7 @@
{
"$schema": "https://developer.microsoft.com/en-us/json-schemas/teams/v1.5/MicrosoftTeams.schema.json",
"manifestVersion": "1.5",
- "version": "4.1.3",
+ "version": "4.1.4",
"id": "1c07cd26-a088-4db8-8928-ace382fa219f",
"packageName": "com.microsoft.teams.companycommunicator.authors",
"developer": {
diff --git a/Manifest/manifest_users.json b/Manifest/manifest_users.json
index c5f5cb6ca..c43d80dd3 100644
--- a/Manifest/manifest_users.json
+++ b/Manifest/manifest_users.json
@@ -1,7 +1,7 @@
{
"$schema": "https://developer.microsoft.com/en-us/json-schemas/teams/v1.5/MicrosoftTeams.schema.json",
"manifestVersion": "1.5",
- "version": "4.1.3",
+ "version": "4.1.4",
"id": "148a66bb-e83d-425a-927d-09f4299a9274",
"packageName": "com.microsoft.teams.companycommunicator",
"developer": {
diff --git a/Source/CompanyCommunicator.Common/Constants.cs b/Source/CompanyCommunicator.Common/Constants.cs
index e3fef60bd..79cce3a14 100644
--- a/Source/CompanyCommunicator.Common/Constants.cs
+++ b/Source/CompanyCommunicator.Common/Constants.cs
@@ -69,5 +69,16 @@ public static class Constants
/// get the OData next page link.
///
public const string ODataNextPageLink = "@odata.nextLink";
+
+ ///
+ /// get the maximum number of recipients in a batch.
+ ///
+ public const int MaximumNumberOfRecipientsInBatch = 1000;
+
+ ///
+ /// get the Microsoft Graph api batch request size.
+ /// https://docs.microsoft.com/en-us/graph/known-issues#limit-on-batch-size.
+ ///
+ public const int MaximumGraphAPIBatchSize = 15;
}
}
diff --git a/Source/CompanyCommunicator.Common/Extensions/EnumerableExtensions.cs b/Source/CompanyCommunicator.Common/Extensions/EnumerableExtensions.cs
index 42de15337..28eaa114d 100644
--- a/Source/CompanyCommunicator.Common/Extensions/EnumerableExtensions.cs
+++ b/Source/CompanyCommunicator.Common/Extensions/EnumerableExtensions.cs
@@ -51,5 +51,38 @@ select Task.Run(async () =>
}
}));
}
+
+ ///
+ /// Extension method to separate a list of objects into batches (a list of lists).
+ ///
+ /// An object type.
+ /// the source list.
+ /// the batch size.
+ /// group list of user id list.
+ public static IEnumerable> AsBatches(this IEnumerable sourceCollection, int batchSize)
+ {
+ _ = sourceCollection ?? throw new ArgumentNullException(nameof(sourceCollection));
+ if (batchSize <= 0)
+ {
+ throw new ArgumentOutOfRangeException(nameof(batchSize));
+ }
+
+ var buffer = new List(batchSize);
+ var sourceList = sourceCollection.ToList();
+ for (int i = 0; i < sourceList.Count; i++)
+ {
+ buffer.Add(sourceList[i]);
+ if (((i + 1) % batchSize) == 0 && buffer.Count > 0)
+ {
+ yield return buffer;
+ buffer = new List(batchSize);
+ }
+ }
+
+ if (buffer.Count > 0)
+ {
+ yield return buffer;
+ }
+ }
}
}
diff --git a/Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtension.cs b/Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtensions.cs
similarity index 91%
rename from Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtension.cs
rename to Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtensions.cs
index 3b49b4521..b8634dd22 100644
--- a/Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtension.cs
+++ b/Source/CompanyCommunicator.Common/Extensions/SendQueueMessageContentExtensions.cs
@@ -1,4 +1,4 @@
-//
+//
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//
@@ -12,7 +12,7 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions
///
/// Extension class for .
///
- public static class SendQueueMessageContentExtension
+ public static class SendQueueMessageContentExtensions
{
///
/// Get service url.
@@ -58,7 +58,7 @@ public static bool IsRecipientGuestUser(this SendQueueMessageContent message)
{
if (string.IsNullOrEmpty(recipient.UserData.UserType))
{
- throw new ArgumentNullException(nameof(recipient.UserData.UserType));
+ throw new InvalidOperationException(nameof(recipient.UserData.UserType));
}
else if (recipient.UserData.UserType.Equals(UserType.Guest, StringComparison.OrdinalIgnoreCase))
{
diff --git a/Source/CompanyCommunicator.Common/Extensions/UserExtensions.cs b/Source/CompanyCommunicator.Common/Extensions/UserExtensions.cs
index 967489378..4ad990e19 100644
--- a/Source/CompanyCommunicator.Common/Extensions/UserExtensions.cs
+++ b/Source/CompanyCommunicator.Common/Extensions/UserExtensions.cs
@@ -6,9 +6,7 @@
namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions
{
using System;
- using System.Collections.Generic;
using Microsoft.Graph;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.UserData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MicrosoftGraph;
///
@@ -16,35 +14,6 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions
///
public static class UserExtensions
{
- ///
- /// this is as per microsoft graph api filter size.
- ///
- private static readonly int MaxGroupSize = 15;
-
- ///
- /// Break the list in groups.
- ///
- /// the user ids.
- /// group list of user id list.
- public static IEnumerable> AsGroups(this IList userIds)
- {
- var buffer = new List(MaxGroupSize);
- for (int i = 0; i < userIds.Count; i++)
- {
- buffer.Add(userIds[i]);
- if (((i + 1) % MaxGroupSize) == 0 && buffer.Count > 0)
- {
- yield return buffer;
- buffer = new List(MaxGroupSize);
- }
- }
-
- if (buffer.Count > 0)
- {
- yield return buffer;
- }
- }
-
///
/// Get the userType for a user.
///
diff --git a/Source/CompanyCommunicator.Common/Services/Recipients/IRecipientsService.cs b/Source/CompanyCommunicator.Common/Services/Recipients/IRecipientsService.cs
new file mode 100644
index 000000000..17c3e9d30
--- /dev/null
+++ b/Source/CompanyCommunicator.Common/Services/Recipients/IRecipientsService.cs
@@ -0,0 +1,26 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Text;
+ using System.Threading.Tasks;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
+
+ ///
+ /// Recipient service.
+ ///
+ public interface IRecipientsService
+ {
+ ///
+ /// Batch the list of recipients.
+ ///
+ /// list of recipients.
+ /// recipients information.
+ Task BatchRecipients(IEnumerable recipients);
+ }
+}
diff --git a/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsInfo.cs b/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsInfo.cs
new file mode 100644
index 000000000..1d4f80de4
--- /dev/null
+++ b/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsInfo.cs
@@ -0,0 +1,54 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients
+{
+ using System;
+ using System.Collections.Generic;
+
+ ///
+ /// Recipient information.
+ ///
+ public class RecipientsInfo
+ {
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// notification id.
+ public RecipientsInfo(string notificationId)
+ {
+ if (string.IsNullOrEmpty(notificationId))
+ {
+ throw new ArgumentNullException(nameof(notificationId));
+ }
+
+ // Initialize properties.
+ this.TotalRecipientCount = 0;
+ this.BatchKeys = new List();
+ this.HasRecipientsPendingInstallation = false;
+ this.NotificationId = notificationId;
+ }
+
+ ///
+ /// Gets the notification id.
+ ///
+ public string NotificationId { get; private set; }
+
+ ///
+ /// Gets or sets the total recipient count of the message.
+ ///
+ public int TotalRecipientCount { get; set; }
+
+ ///
+ /// Gets or sets a value indicating whether there are user app installations pending(recipients who have no conversation id in database) for recipients.
+ ///
+ public bool HasRecipientsPendingInstallation { get; set; }
+
+ ///
+ /// Gets or sets the batch keys of the recipients.
+ ///
+ public List BatchKeys { get; set; }
+ }
+}
diff --git a/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsService.cs b/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsService.cs
new file mode 100644
index 000000000..c3f0853ea
--- /dev/null
+++ b/Source/CompanyCommunicator.Common/Services/Recipients/RecipientsService.cs
@@ -0,0 +1,70 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Utilities;
+
+ ///
+ /// Recipients service.
+ ///
+ public class RecipientsService : IRecipientsService
+ {
+ private readonly ISentNotificationDataRepository sentNotificationDataRepository;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// sent notification data repository.
+ public RecipientsService(ISentNotificationDataRepository sentNotificationDataRepository)
+ {
+ this.sentNotificationDataRepository = sentNotificationDataRepository ?? throw new ArgumentNullException(nameof(sentNotificationDataRepository));
+ }
+
+ ///
+ public async Task BatchRecipients(IEnumerable recipients)
+ {
+ if (recipients == null)
+ {
+ throw new ArgumentNullException(nameof(IEnumerable));
+ }
+
+ var notificationId = recipients.FirstOrDefault().PartitionKey;
+
+ var recipientBatches = recipients.AsBatches(Constants.MaximumNumberOfRecipientsInBatch);
+ var recipientInfo = new RecipientsInfo(notificationId)
+ {
+ TotalRecipientCount = recipients.ToList().Count,
+ };
+ int batchIndex = 1;
+ foreach (var recipientBatch in recipientBatches)
+ {
+ var recipientBatchList = recipientBatch.ToList();
+
+ // Update PartitionKey to Batch Key
+ recipientBatchList.ForEach(s =>
+ {
+ s.PartitionKey = PartitionKeyUtility.CreateBatchPartitionKey(s.PartitionKey, batchIndex);
+
+ // Update if there is any recipient which has no conversation id.
+ recipientInfo.HasRecipientsPendingInstallation |= string.IsNullOrEmpty(s.ConversationId);
+ });
+
+ // Store.
+ await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipientBatch);
+ recipientInfo.BatchKeys.Add(recipientBatch.FirstOrDefault().PartitionKey);
+ batchIndex++;
+ }
+
+ return recipientInfo;
+ }
+ }
+}
diff --git a/Source/CompanyCommunicator.Common/Services/User/UserTypeService.cs b/Source/CompanyCommunicator.Common/Services/User/UserTypeService.cs
index f03e501ad..8bb133ead 100644
--- a/Source/CompanyCommunicator.Common/Services/User/UserTypeService.cs
+++ b/Source/CompanyCommunicator.Common/Services/User/UserTypeService.cs
@@ -75,8 +75,7 @@ public async Task UpdateUserTypeForExistingUserListAsync(IEnumerable user.AadId)
- .ToList()
- .AsGroups());
+ .AsBatches(Common.Constants.MaximumGraphAPIBatchSize));
if (!users.IsNullOrEmpty())
{
diff --git a/Source/CompanyCommunicator.Common/Utilities/PartitionKeyUtility.cs b/Source/CompanyCommunicator.Common/Utilities/PartitionKeyUtility.cs
new file mode 100644
index 000000000..ad1009fcc
--- /dev/null
+++ b/Source/CompanyCommunicator.Common/Utilities/PartitionKeyUtility.cs
@@ -0,0 +1,58 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Common.Utilities
+{
+ using System;
+
+ ///
+ /// Partition Key utility.
+ ///
+ public static class PartitionKeyUtility
+ {
+ ///
+ /// Create the partition key from notification id.
+ ///
+ /// notification id.
+ /// batch index.
+ /// partition key.
+ public static string CreateBatchPartitionKey(string notificationId, int batchIndex)
+ {
+ return $"{notificationId}:{batchIndex}";
+ }
+
+ ///
+ /// Get the notification id from partition key.
+ ///
+ /// partition key.
+ /// notification id.
+ public static string GetNotificationIdFromBatchPartitionKey(string partitionKey)
+ {
+ var result = partitionKey.Split(":");
+ if (result.Length != 2)
+ {
+ throw new FormatException("Invalid format of batch partition key");
+ }
+
+ return result[0];
+ }
+
+ ///
+ /// Get the notification id from partition key.
+ ///
+ /// partition key.
+ /// notification id.
+ public static string GetBatchIdFromBatchPartitionKey(string partitionKey)
+ {
+ var result = partitionKey.Split(":");
+ if (result.Length != 2)
+ {
+ throw new FormatException("Invalid format of batch partition key");
+ }
+
+ return result[1];
+ }
+ }
+}
diff --git a/Source/CompanyCommunicator.Prep.Func/Export/Streams/DataStreamFacade.cs b/Source/CompanyCommunicator.Prep.Func/Export/Streams/DataStreamFacade.cs
index dd381ea78..c37224cd3 100644
--- a/Source/CompanyCommunicator.Prep.Func/Export/Streams/DataStreamFacade.cs
+++ b/Source/CompanyCommunicator.Prep.Func/Export/Streams/DataStreamFacade.cs
@@ -85,8 +85,7 @@ public async IAsyncEnumerable> GetUserDataStreamAsync(stri
// Group the recipients as per the Graph batch api.
var groupRecipientsByAadId = recipients?
.Select(notitification => notitification.RowKey)
- .ToList()
- .AsGroups();
+ .AsBatches(Common.Constants.MaximumGraphAPIBatchSize);
if (!groupRecipientsByAadId.IsNullOrEmpty())
{
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/GetRecipientsActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/GetRecipientsActivity.cs
deleted file mode 100644
index 1d20d06c2..000000000
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/GetRecipientsActivity.cs
+++ /dev/null
@@ -1,123 +0,0 @@
-//
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-//
-
-namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
-{
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading.Tasks;
- using Microsoft.Azure.Cosmos.Table;
- using Microsoft.Azure.WebJobs;
- using Microsoft.Azure.WebJobs.Extensions.DurableTask;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
-
- ///
- /// Reads all the recipients from Sent notification table.
- ///
- public class GetRecipientsActivity
- {
- // Recommended data count size that should be returned from activity function to orchestrator.
- // Please note that increasing this value can cause OutOfMemoryException.
- private const int MaxResultSize = 100000;
-
- // Maximum record count that Table storage returns.
- private const int UserCount = 1000;
- private readonly ISentNotificationDataRepository sentNotificationDataRepository;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// The sent notification data repository.
- public GetRecipientsActivity(ISentNotificationDataRepository sentNotificationDataRepository)
- {
- this.sentNotificationDataRepository = sentNotificationDataRepository ?? throw new ArgumentNullException(nameof(sentNotificationDataRepository));
- }
-
- ///
- /// Reads all the recipients from Sent notification table.
- ///
- /// notification.
- /// A representing the asynchronous operation.
- [FunctionName(FunctionNames.GetRecipientsActivity)]
- public async Task<(IEnumerable, TableContinuationToken)> GetRecipientsAsync([ActivityTrigger] NotificationDataEntity notification)
- {
- if (notification == null)
- {
- throw new ArgumentNullException(nameof(notification));
- }
-
- var results = await this.sentNotificationDataRepository.GetPagedAsync(notification.Id, UserCount);
- var recipients = new List();
- if (results.Item1 != null)
- {
- recipients.AddRange(results.Item1);
- }
-
- while (results.Item2 != null && recipients.Count < MaxResultSize)
- {
- results = await this.sentNotificationDataRepository.GetPagedAsync(notification.Id, UserCount, results.Item2);
- if (results.Item1 != null)
- {
- recipients.AddRange(results.Item1);
- }
- }
-
- return (recipients, results.Item2);
- }
-
- ///
- /// Reads all the recipients from Sent notification table.
- ///
- /// Input containing notification id and continuation token.
- /// A representing the asynchronous operation.
- [FunctionName(FunctionNames.GetRecipientsByTokenActivity)]
- public async Task<(IEnumerable, TableContinuationToken)> GetRecipientsByTokenAsync(
- [ActivityTrigger](string notificationId, TableContinuationToken tableContinuationToken) input)
- {
- if (input.notificationId == null)
- {
- throw new ArgumentNullException(nameof(input.notificationId));
- }
-
- if (input.tableContinuationToken == null)
- {
- throw new ArgumentNullException(nameof(input.tableContinuationToken));
- }
-
- var recipients = new List();
- while (input.tableContinuationToken != null && recipients.Count < MaxResultSize)
- {
- var results = await this.sentNotificationDataRepository.GetPagedAsync(input.notificationId, UserCount, input.tableContinuationToken);
- if (results.Item1 != null)
- {
- recipients.AddRange(results.Item1);
- }
-
- input.tableContinuationToken = results.Item2;
- }
-
- return (recipients, input.tableContinuationToken);
- }
-
- ///
- /// Reads all the recipients from Sent notification table who do not have conversation details.
- ///
- /// notification.
- /// A representing the asynchronous operation.
- [FunctionName(FunctionNames.GetPendingRecipientsActivity)]
- public async Task> GetPendingRecipientsAsync([ActivityTrigger] NotificationDataEntity notification)
- {
- if (notification == null)
- {
- throw new ArgumentNullException(nameof(notification));
- }
-
- var recipients = await this.sentNotificationDataRepository.GetAllAsync(notification.Id);
- return recipients.Where(recipient => string.IsNullOrEmpty(recipient.ConversationId));
- }
- }
-}
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/RecipientsActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/RecipientsActivity.cs
new file mode 100644
index 000000000..467657a43
--- /dev/null
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/RecipientsActivity.cs
@@ -0,0 +1,77 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using Microsoft.Azure.WebJobs;
+ using Microsoft.Azure.WebJobs.Extensions.DurableTask;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
+
+ ///
+ /// Reads all the recipients from Sent notification table.
+ ///
+ public class RecipientsActivity
+ {
+ private readonly ISentNotificationDataRepository sentNotificationDataRepository;
+ private readonly IRecipientsService recipientsService;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The sent notification data repository.
+ /// The recipients service.
+ public RecipientsActivity(
+ ISentNotificationDataRepository sentNotificationDataRepository,
+ IRecipientsService recipientsService)
+ {
+ this.sentNotificationDataRepository = sentNotificationDataRepository ?? throw new ArgumentNullException(nameof(sentNotificationDataRepository));
+ this.recipientsService = recipientsService ?? throw new ArgumentNullException(nameof(recipientsService));
+ }
+
+ ///
+ /// Reads all the batched recipients from Sent notification table.
+ ///
+ /// notification batch key.
+ /// A representing the asynchronous operation.
+ [FunctionName(FunctionNames.GetRecipientsActivity)]
+ public async Task> GetRecipientsAsync([ActivityTrigger] string notificationBatchKey)
+ {
+ _ = notificationBatchKey ?? throw new ArgumentNullException(nameof(notificationBatchKey));
+ return await this.sentNotificationDataRepository.GetAllAsync(notificationBatchKey);
+ }
+
+ ///
+ /// Reads all the batched recipients from Sent notification table who do not have conversation details.
+ ///
+ /// notification batch key.
+ /// A representing the asynchronous operation.
+ [FunctionName(FunctionNames.GetPendingRecipientsActivity)]
+ public async Task> GetPendingRecipientsAsync([ActivityTrigger] string notificationBatchKey)
+ {
+ _ = notificationBatchKey ?? throw new ArgumentNullException(nameof(notificationBatchKey));
+ var recipients = await this.sentNotificationDataRepository.GetAllAsync(notificationBatchKey);
+ return recipients.Where(recipient => string.IsNullOrEmpty(recipient.ConversationId));
+ }
+
+ ///
+ /// Batch all the recipient from Sent notification table.
+ ///
+ /// notification id.
+ /// A representing the asynchronous operation.
+ [FunctionName(FunctionNames.BatchRecipientsActivity)]
+ public async Task BatchRecipientsAsync([ActivityTrigger] string notificationId)
+ {
+ _ = notificationId ?? throw new ArgumentNullException(nameof(notificationId));
+ var recipients = await this.sentNotificationDataRepository.GetAllAsync(notificationId);
+ return await this.recipientsService.BatchRecipients(recipients);
+ }
+ }
+}
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SendBatchMessagesActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SendBatchMessagesActivity.cs
index 1b958dc7d..506cb6231 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SendBatchMessagesActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SendBatchMessagesActivity.cs
@@ -11,7 +11,6 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.TeamData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.UserData;
@@ -41,11 +40,11 @@ public SendBatchMessagesActivity(
/// A representing the asynchronous operation.
[FunctionName(FunctionNames.SendBatchMessagesActivity)]
public async Task RunAsync(
- [ActivityTrigger](NotificationDataEntity notification, List batch) input)
+ [ActivityTrigger](string notificationId, List batch) input)
{
- if (input.notification == null)
+ if (input.notificationId == null)
{
- throw new ArgumentNullException(nameof(input.notification));
+ throw new ArgumentNullException(nameof(input.notificationId));
}
if (input.batch == null)
@@ -58,7 +57,7 @@ public async Task RunAsync(
{
return new SendQueueMessageContent()
{
- NotificationId = input.notification.Id,
+ NotificationId = input.notificationId,
RecipientData = this.ConvertToRecipientData(recipient),
};
});
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncAllUsersActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncAllUsersActivity.cs
index 72c265ca9..8eb69b800 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncAllUsersActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncAllUsersActivity.cs
@@ -21,6 +21,7 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.UserData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Resources;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MicrosoftGraph;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.User;
using Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend.Extensions;
@@ -34,6 +35,7 @@ public class SyncAllUsersActivity
private readonly IUsersService usersService;
private readonly INotificationDataRepository notificationDataRepository;
private readonly IUserTypeService userTypeService;
+ private readonly IRecipientsService recipientsService;
private readonly IStringLocalizer localizer;
///
@@ -44,6 +46,7 @@ public class SyncAllUsersActivity
/// Users service.
/// Notification data entity repository.
/// User type service.
+ /// The recipients service.
/// Localization service.
public SyncAllUsersActivity(
IUserDataRepository userDataRepository,
@@ -51,6 +54,7 @@ public SyncAllUsersActivity(
IUsersService usersService,
INotificationDataRepository notificationDataRepository,
IUserTypeService userTypeService,
+ IRecipientsService recipientsService,
IStringLocalizer localizer)
{
this.userDataRepository = userDataRepository ?? throw new ArgumentNullException(nameof(userDataRepository));
@@ -58,6 +62,7 @@ public SyncAllUsersActivity(
this.usersService = usersService ?? throw new ArgumentNullException(nameof(usersService));
this.notificationDataRepository = notificationDataRepository ?? throw new ArgumentNullException(nameof(notificationDataRepository));
this.userTypeService = userTypeService ?? throw new ArgumentNullException(nameof(userTypeService));
+ this.recipientsService = recipientsService ?? throw new ArgumentNullException(nameof(recipientsService));
this.localizer = localizer ?? throw new ArgumentNullException(nameof(localizer));
}
@@ -68,7 +73,7 @@ public SyncAllUsersActivity(
/// Logging service.
/// A representing the asynchronous operation.
[FunctionName(FunctionNames.SyncAllUsersActivity)]
- public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification, ILogger log)
+ public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification, ILogger log)
{
if (notification == null)
{
@@ -85,13 +90,18 @@ public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification
await this.userTypeService.UpdateUserTypeForExistingUserListAsync(users);
users = await this.userDataRepository.GetAllAsync();
- if (!users.IsNullOrEmpty())
+ if (users.IsNullOrEmpty())
{
- // Store in sent notification table.
- var recipients = users.Select(
- user => user.CreateInitialSentNotificationDataEntity(partitionKey: notification.Id));
- await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
+ return new RecipientsInfo(notification.Id);
}
+
+ // Store in sent notification table.
+ var recipients = users.Select(
+ user => user.CreateInitialSentNotificationDataEntity(partitionKey: notification.Id));
+ await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
+
+ // Store in batches and return batch info.
+ return await this.recipientsService.BatchRecipients(recipients);
}
///
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncGroupMembersActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncGroupMembersActivity.cs
index 713d8d476..6c99ba510 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncGroupMembersActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncGroupMembersActivity.cs
@@ -71,20 +71,9 @@ public SyncGroupMembersActivity(
public async Task RunAsync(
[ActivityTrigger](string notificationId, string groupId) input, ILogger log)
{
- if (input.notificationId == null)
- {
- throw new ArgumentNullException(nameof(input.notificationId));
- }
-
- if (input.groupId == null)
- {
- throw new ArgumentNullException(nameof(input.groupId));
- }
-
- if (log == null)
- {
- throw new ArgumentNullException(nameof(log));
- }
+ _ = input.notificationId ?? throw new ArgumentNullException(nameof(input.notificationId));
+ _ = input.groupId ?? throw new ArgumentNullException(nameof(input.groupId));
+ _ = log ?? throw new ArgumentNullException(nameof(log));
var notificationId = input.notificationId;
var groupId = input.groupId;
@@ -97,11 +86,8 @@ public async Task RunAsync(
// Convert to Recipients
var recipients = await this.GetRecipientsAsync(notificationId, users);
- if (!recipients.IsNullOrEmpty())
- {
- // Store.
- await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
- }
+ // Store.
+ await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
}
catch (Exception ex)
{
@@ -127,7 +113,7 @@ await Task.WhenAll(users.ForEachAsync(maxParallelism, async user =>
{
var userEntity = await this.userDataRepository.GetAsync(UserDataTableNames.UserDataPartition, user.Id);
- // This is to set the type of user(exisiting only, new ones will be skipped) to identify later if it is member or guest.
+ // This is to set the type of user(existing only, new ones will be skipped) to identify later if it is member or guest.
var userType = user.UserPrincipalName.GetUserType();
if (userEntity == null && userType.Equals(UserType.Guest, StringComparison.OrdinalIgnoreCase))
{
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamMembersActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamMembersActivity.cs
index f6fda7cdd..e988acf71 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamMembersActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamMembersActivity.cs
@@ -116,11 +116,8 @@ public async Task RunAsync(
// Convert to Recipients.
var recipients = await this.GetRecipientsAsync(notificationId, userEntities);
- if (!recipients.IsNullOrEmpty())
- {
- // Store.
- await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
- }
+ // Store.
+ await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
}
catch (Exception ex)
{
@@ -151,7 +148,7 @@ await Task.WhenAll(users.ForEachAsync(maxParallelism, async user =>
return;
}
- // This is to set the type of user(exisiting only, new ones will be skipped) to identify later if it is member or guest.
+ // This is to set the type of user(existing only, new ones will be skipped) to identify later if it is member or guest.
await this.userTypeService.UpdateUserTypeForExistingUserAsync(userEntity, user.UserType);
user.ConversationId ??= userEntity?.ConversationId;
recipients.Add(user.CreateInitialSentNotificationDataEntity(partitionKey: notificationId));
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamsActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamsActivity.cs
index 7f32a18f9..d26faf0fa 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamsActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/SyncTeamsActivity.cs
@@ -17,6 +17,8 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.TeamData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Resources;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Utilities;
///
/// Sync teams data to Sent notification table.
@@ -27,6 +29,7 @@ public class SyncTeamsActivity
private readonly ISentNotificationDataRepository sentNotificationDataRepository;
private readonly IStringLocalizer localizer;
private readonly INotificationDataRepository notificationDataRepository;
+ private readonly IRecipientsService recipientsService;
///
/// Initializes a new instance of the class.
@@ -35,16 +38,19 @@ public class SyncTeamsActivity
/// Sent notification data repository.
/// Localization service.
/// Notification data entity repository.
+ /// Recipients service.
public SyncTeamsActivity(
ITeamDataRepository teamDataRepository,
ISentNotificationDataRepository sentNotificationDataRepository,
IStringLocalizer localizer,
- INotificationDataRepository notificationDataRepository)
+ INotificationDataRepository notificationDataRepository,
+ IRecipientsService recipientsService)
{
this.teamDataRepository = teamDataRepository ?? throw new ArgumentNullException(nameof(teamDataRepository));
this.sentNotificationDataRepository = sentNotificationDataRepository ?? throw new ArgumentNullException(nameof(sentNotificationDataRepository));
this.localizer = localizer ?? throw new ArgumentNullException(nameof(localizer));
this.notificationDataRepository = notificationDataRepository ?? throw new ArgumentNullException(nameof(notificationDataRepository));
+ this.recipientsService = recipientsService ?? throw new ArgumentNullException(nameof(recipientsService));
}
///
@@ -54,7 +60,7 @@ public SyncTeamsActivity(
/// Logging service.
/// A representing the asynchronous operation.
[FunctionName(FunctionNames.SyncTeamsActivity)]
- public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification, ILogger log)
+ public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification, ILogger log)
{
if (notification == null)
{
@@ -74,6 +80,9 @@ public async Task RunAsync([ActivityTrigger] NotificationDataEntity notification
// Store.
await this.sentNotificationDataRepository.BatchInsertOrMergeAsync(recipients);
+
+ // Store in batches and return batch info.
+ return await this.recipientsService.BatchRecipients(recipients);
}
///
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/TeamsConversationActivity.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/TeamsConversationActivity.cs
index 202b60925..fd5a8a543 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/TeamsConversationActivity.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Activities/TeamsConversationActivity.cs
@@ -82,7 +82,7 @@ public TeamsConversationActivity(
/// A representing the asynchronous operation.
[FunctionName(FunctionNames.TeamsConversationActivity)]
public async Task CreateConversationAsync(
- [ActivityTrigger](string notificationId, SentNotificationDataEntity recipient) input,
+ [ActivityTrigger](string notificationId, string batchKey, SentNotificationDataEntity recipient) input,
ILogger log)
{
if (input.notificationId == null)
@@ -95,6 +95,11 @@ public async Task CreateConversationAsync(
throw new ArgumentNullException(nameof(input.recipient));
}
+ if (string.IsNullOrEmpty(input.batchKey))
+ {
+ throw new ArgumentNullException(nameof(input.batchKey));
+ }
+
if (log == null)
{
throw new ArgumentNullException(nameof(log));
@@ -108,13 +113,15 @@ public async Task CreateConversationAsync(
return;
}
- // Skip Guest users.
+ // No-op for null user type.
if (string.IsNullOrEmpty(recipient.UserType))
{
- throw new ArgumentNullException(nameof(recipient.UserType));
+ log.LogInformation("Unknown User Type.");
+ return;
}
else if (recipient.UserType.Equals(UserType.Guest, StringComparison.OrdinalIgnoreCase))
{
+ // Skip guest users.
return;
}
@@ -155,6 +162,10 @@ public async Task CreateConversationAsync(
// Update sent notification and user entity.
await this.sentNotificationDataRepository.InsertOrMergeAsync(recipient);
await this.UpdateUserEntityAsync(recipient);
+
+ // Update Batch entry.
+ recipient.PartitionKey = input.batchKey;
+ await this.sentNotificationDataRepository.InsertOrMergeAsync(recipient);
}
private async Task CreateConversationWithTeamsUser(
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/FunctionNames.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/FunctionNames.cs
index 0d9e1089a..566a8457b 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/FunctionNames.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/FunctionNames.cs
@@ -46,32 +46,32 @@ public static class FunctionNames
public const string SyncAllUsersActivity = nameof(SyncAllUsersActivity);
///
- /// Sync Team members acitivity function.
+ /// Sync Team members activity function.
///
public const string SyncTeamMembersActivity = nameof(SyncTeamMembersActivity);
///
- /// Sync group members acitivity function.
+ /// Sync group members activity function.
///
public const string SyncGroupMembersActivity = nameof(SyncGroupMembersActivity);
///
- /// Sync Teams acitivity function.
+ /// Sync Teams activity function.
///
public const string SyncTeamsActivity = nameof(SyncTeamsActivity);
///
- /// Get recipients acitvity function.
+ /// Get recipients activity function.
///
public const string GetRecipientsActivity = nameof(GetRecipientsActivity);
///
- /// Get recipients acitvity by token function.
+ /// Batch recipients activity function.
///
- public const string GetRecipientsByTokenActivity = nameof(GetRecipientsByTokenActivity);
+ public const string BatchRecipientsActivity = nameof(BatchRecipientsActivity);
///
- /// Get pending recipients (ie recipients with no conversation id in the database) acitvity function.
+ /// Get pending recipients (ie recipients with no conversation id in the database) activity function.
///
public const string GetPendingRecipientsActivity = nameof(GetPendingRecipientsActivity);
@@ -126,7 +126,7 @@ public static class FunctionNames
public const string HandleExportFailureActivity = nameof(HandleExportFailureActivity);
///
- /// Export orechestration function.
+ /// Export orchestration function.
///
public const string ExportOrchestration = nameof(ExportOrchestration);
}
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/PrepareToSendOrchestrator.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/PrepareToSendOrchestrator.cs
index f820282dd..6de5734e9 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/PrepareToSendOrchestrator.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/PrepareToSendOrchestrator.cs
@@ -6,11 +6,13 @@
namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
{
using System;
+ using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
///
/// Prepare to Send orchestrator.
@@ -21,7 +23,8 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
/// 1. Stores the message in sending notification table.
/// 2. Syncs recipients information to sent notification table.
/// 3. Creates teams conversation with recipients if required.
- /// 4. Starts Send Queue orchestration.
+ /// 4. Starts Data aggregation.
+ /// 5. Starts Send Queue orchestration.
///
public static class PrepareToSendOrchestrator
{
@@ -61,30 +64,48 @@ await context.CallActivityWithRetryAsync(
log.LogInformation("About to sync recipients.");
}
- await context.CallSubOrchestratorWithRetryAsync(
+ var recipientsInfo = await context.CallSubOrchestratorWithRetryAsync(
FunctionNames.SyncRecipientsOrchestrator,
FunctionSettings.DefaultRetryOptions,
notificationDataEntity);
- if (!context.IsReplaying)
+ // Proactive Installation
+ if (recipientsInfo.HasRecipientsPendingInstallation)
{
- log.LogInformation("About to create conversation for recipients if required.");
- }
+ if (!context.IsReplaying)
+ {
+ log.LogInformation("About to create 1:1 conversations for recipients if required.");
+ }
- await context.CallSubOrchestratorWithRetryAsync(
- FunctionNames.TeamsConversationOrchestrator,
- FunctionSettings.DefaultRetryOptions,
- notificationDataEntity);
+ // Update notification status.
+ await context.CallActivityWithRetryAsync(
+ FunctionNames.UpdateNotificationStatusActivity,
+ FunctionSettings.DefaultRetryOptions,
+ (recipientsInfo.NotificationId, NotificationStatus.InstallingApp));
+
+ // Fan Out/Fan In Conversation orchestrator.
+ await FanOutFanInSubOrchestratorAsync(context, FunctionNames.TeamsConversationOrchestrator, recipientsInfo);
+ }
if (!context.IsReplaying)
{
log.LogInformation("About to send messages to send queue.");
}
- await context.CallSubOrchestratorWithRetryAsync(
- FunctionNames.SendQueueOrchestrator,
+ // Update notification status.
+ await context.CallActivityWithRetryAsync(
+ FunctionNames.UpdateNotificationStatusActivity,
FunctionSettings.DefaultRetryOptions,
- notificationDataEntity);
+ (notificationDataEntity.Id, NotificationStatus.Sending));
+
+ // Update Total recipient count.
+ await context.CallActivityWithRetryAsync(
+ FunctionNames.DataAggregationTriggerActivity,
+ FunctionSettings.DefaultRetryOptions,
+ (notificationDataEntity.Id, recipientsInfo.TotalRecipientCount));
+
+ // Fan-out/ Fan-in send queue orchestrator.
+ await FanOutFanInSubOrchestratorAsync(context, FunctionNames.SendQueueOrchestrator, recipientsInfo);
log.LogInformation($"PrepareToSendOrchestrator successfully completed for notification: {notificationDataEntity.Id}!");
}
@@ -99,5 +120,23 @@ await context.CallActivityWithRetryAsync(
(notificationDataEntity, ex));
}
}
+
+ private static async Task FanOutFanInSubOrchestratorAsync(IDurableOrchestrationContext context, string functionName, RecipientsInfo recipientsInfo)
+ {
+ var tasks = new List();
+
+ // Fan-out
+ foreach (var batchKey in recipientsInfo.BatchKeys)
+ {
+ var task = context.CallSubOrchestratorWithRetryAsync(
+ functionName,
+ FunctionSettings.DefaultRetryOptions,
+ batchKey);
+ tasks.Add(task);
+ }
+
+ // Fan-in
+ await Task.WhenAll(tasks);
+ }
}
}
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SendQueueOrchestrator.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SendQueueOrchestrator.cs
index 2efa24a58..96a2e151d 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SendQueueOrchestrator.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SendQueueOrchestrator.cs
@@ -1,37 +1,35 @@
-//
+//
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
//
-namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
+namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend.Orchestrators
{
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
- using Microsoft.Azure.Cosmos.Table;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MessageQueues.SendQueue;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Utilities;
///
/// Send Queue orchestrator.
///
/// Does following:
/// 1. Reads all the recipients from Sent notification tables.
- /// 2. Starts data aggregation.
- /// 3. Sends messages to Send Queue in batches.
+ /// 2. Sends messages to Send Queue in batches.
///
public static class SendQueueOrchestrator
{
///
- /// SendQueueOrchestrator function.
+ /// SendQueueSubOrchestrator function.
/// Does following:
- /// 1. Reads all the recipients from Sent notification tables.
- /// 2. Starts data aggregation.
- /// 3. Sends messages to Send Queue in batches.
+ /// 1. Reads the batch recipients from Sent notification tables.
+ /// 2. Sends messages to Send Queue in batches.
///
/// Durable orchestration context.
/// Logger.
@@ -41,53 +39,23 @@ public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
- var notification = context.GetInput();
-
- // Update notification status.
- await context.CallActivityWithRetryAsync(
- FunctionNames.UpdateNotificationStatusActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, NotificationStatus.Sending));
+ var batchPartitionKey = context.GetInput();
+ var notificationId = PartitionKeyUtility.GetNotificationIdFromBatchPartitionKey(batchPartitionKey);
+ var batchId = PartitionKeyUtility.GetBatchIdFromBatchPartitionKey(batchPartitionKey);
if (!context.IsReplaying)
{
- log.LogInformation("About to get all recipients.");
+ log.LogInformation($"About to get recipients from batch {batchId}.");
}
- var results = await context.CallActivityWithRetryAsync<(IEnumerable, TableContinuationToken)>(
+ var recipients = await context.CallActivityWithRetryAsync>(
FunctionNames.GetRecipientsActivity,
FunctionSettings.DefaultRetryOptions,
- notification);
-
- var recipientsList = new List();
- if (results.Item1 != null)
- {
- recipientsList.AddRange(results.Item1.ToList());
- }
-
- while (results.Item2 != null)
- {
- results = await context.CallActivityWithRetryAsync<(IEnumerable, TableContinuationToken)>(
- FunctionNames.GetRecipientsByTokenActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, results.Item2));
- if (results.Item1 != null)
- {
- recipientsList.AddRange(results.Item1);
- }
- }
-
- if (!context.IsReplaying)
- {
- log.LogInformation("About to send data aggregration message to data queue.");
- }
-
- await context.CallActivityWithRetryAsync(
- FunctionNames.DataAggregationTriggerActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, recipientsList.Count));
+ batchPartitionKey);
- var batches = SeparateIntoBatches(recipientsList);
+ // Use the SendQueue's maximum number of messages in a batch request number because
+ // the list is being broken into batches in order to be added to that queue.
+ var batches = recipients.AsBatches(SendQueue.MaxNumberOfMessagesInBatchRequest).ToList();
var totalBatchCount = batches.Count;
if (!context.IsReplaying)
@@ -106,7 +74,7 @@ await context.CallActivityWithRetryAsync(
var task = context.CallActivityWithRetryAsync(
FunctionNames.SendBatchMessagesActivity,
FunctionSettings.DefaultRetryOptions,
- (notification, batches[batchIndex]));
+ (notificationId, batches[batchIndex]));
tasks.Add(task);
}
@@ -114,43 +82,5 @@ await context.CallActivityWithRetryAsync(
// Fan-out Fan-in
await Task.WhenAll(tasks);
}
-
- ///
- /// Separate a list of recipients into batches (a list of lists).
- /// The size of the batch is determined by the maximum allowed size of a batch
- /// request to the Send queue service bus queue.
- ///
- /// The list to break into batches.
- /// The batches (a list of lists).
- private static List> SeparateIntoBatches(List sourceList)
- {
- var batches = new List>();
-
- var totalNumberOfEntities = sourceList.Count;
-
- // Use the SendQueue's maximum number of messages in a batch request number because
- // the list is being broken into batches in order to be added to that queue.
- var batchSize = SendQueue.MaxNumberOfMessagesInBatchRequest;
- var numberOfCompleteBatches = totalNumberOfEntities / batchSize;
- var numberOfEntitiesInIncompleteBatch = totalNumberOfEntities % batchSize;
-
- for (var i = 0; i < numberOfCompleteBatches; i++)
- {
- var startingIndex = i * batchSize;
- var batch = sourceList.GetRange(startingIndex, batchSize);
- batches.Add(batch);
- }
-
- if (numberOfEntitiesInIncompleteBatch != 0)
- {
- var incompleteBatchStartingIndex = numberOfCompleteBatches * batchSize;
- var incompleteBatch = sourceList.GetRange(
- incompleteBatchStartingIndex,
- numberOfEntitiesInIncompleteBatch);
- batches.Add(incompleteBatch);
- }
-
- return batches;
- }
}
-}
\ No newline at end of file
+}
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SyncRecipientsOrchestrator.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SyncRecipientsOrchestrator.cs
index 71a040ce7..071223dc1 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SyncRecipientsOrchestrator.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/SyncRecipientsOrchestrator.cs
@@ -13,6 +13,7 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
///
/// Syncs target set of recipients to Sent notification table.
@@ -26,7 +27,7 @@ public static class SyncRecipientsOrchestrator
/// Logging service.
/// representing the asynchronous operation.
[FunctionName(FunctionNames.SyncRecipientsOrchestrator)]
- public static async Task RunOrchestrator(
+ public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
@@ -41,58 +42,31 @@ await context.CallActivityWithRetryAsync(
// All users.
if (notification.AllUsers)
{
- await context.CallActivityWithRetryAsync(
+ return await context.CallActivityWithRetryAsync(
FunctionNames.SyncAllUsersActivity,
FunctionSettings.DefaultRetryOptions,
notification);
- return;
}
// Members of specific teams.
if (notification.Rosters.Any())
{
- var tasks = new List();
- foreach (var teamId in notification.Rosters)
- {
- var task = context.CallActivityWithRetryAsync(
- FunctionNames.SyncTeamMembersActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, teamId));
- tasks.Add(task);
- }
-
- // Fan-Out Fan-In.
- await Task.WhenAll(tasks);
- return;
+ return await FanOutFanInActivityAsync(context, FunctionNames.SyncTeamMembersActivity, notification.Rosters, notification.Id);
}
// Members of M365 groups, DG or SG.
if (notification.Groups.Any())
{
- var tasks = new List();
- foreach (var groupId in notification.Groups)
- {
- var task = context.CallActivityWithRetryAsync(
- FunctionNames.SyncGroupMembersActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, groupId));
-
- tasks.Add(task);
- }
-
- // Fan-Out Fan-In
- await Task.WhenAll(tasks);
- return;
+ return await FanOutFanInActivityAsync(context, FunctionNames.SyncGroupMembersActivity, notification.Groups, notification.Id);
}
// General channel of teams.
if (notification.Teams.Any())
{
- await context.CallActivityWithRetryAsync(
+ return await context.CallActivityWithRetryAsync(
FunctionNames.SyncTeamsActivity,
FunctionSettings.DefaultRetryOptions,
notification);
- return;
}
// Invalid audience.
@@ -100,5 +74,40 @@ await context.CallActivityWithRetryAsync(
log.LogError(errorMessage);
throw new ArgumentException(errorMessage);
}
+
+ ///
+ /// Fan out Fan in activities.
+ ///
+ /// durable orchestration context.
+ /// activity name.
+ /// entities e.g. groups or teams.
+ /// notification id.
+ /// recipient information.
+ private static async Task FanOutFanInActivityAsync(IDurableOrchestrationContext context, string functionName, IEnumerable entities, string notificationId)
+ {
+ var tasks = new List();
+ int index = 1;
+
+ // Fan-out
+ foreach (var entityId in entities)
+ {
+ var task = context.CallActivityWithRetryAsync(
+ functionName,
+ FunctionSettings.DefaultRetryOptions,
+ (notificationId, entityId, index));
+
+ tasks.Add(task);
+ index++;
+ }
+
+ // Fan-In
+ await Task.WhenAll(tasks);
+
+ // Batch recipients.
+ return await context.CallActivityWithRetryAsync(
+ FunctionNames.BatchRecipientsActivity,
+ FunctionSettings.DefaultRetryOptions,
+ notificationId);
+ }
}
}
\ No newline at end of file
diff --git a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/TeamsConversationOrchestrator.cs b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/TeamsConversationOrchestrator.cs
index bc0d6db66..800575a39 100644
--- a/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/TeamsConversationOrchestrator.cs
+++ b/Source/CompanyCommunicator.Prep.Func/PreparingToSend/Orchestrators/TeamsConversationOrchestrator.cs
@@ -11,13 +11,13 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.PreparingToSend
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
- using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.NotificationData;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Repositories.SentNotificationData;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Utilities;
///
/// Teams conversation orchestrator.
/// Does following:
- /// 1. Gets all the recipients for whom we do not have conversation Id.
+ /// 1. Gets the batch recipients for whom we do not have conversation Id.
/// 2. Creates conversation with each recipient.
///
public static class TeamsConversationOrchestrator
@@ -36,41 +36,44 @@ public static async Task RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context,
ILogger log)
{
- var notification = context.GetInput();
+ var batchPartitionKey = context.GetInput();
+ var notificationId = PartitionKeyUtility.GetNotificationIdFromBatchPartitionKey(batchPartitionKey);
if (!context.IsReplaying)
{
- log.LogInformation($"About to get pending recipients (with no conversation id in database.");
+ log.LogInformation($"About to get pending recipients (with no conversation id in database).");
}
var recipients = await context.CallActivityWithRetryAsync>(
- FunctionNames.GetPendingRecipientsActivity,
- FunctionSettings.DefaultRetryOptions,
- notification);
+ FunctionNames.GetPendingRecipientsActivity,
+ FunctionSettings.DefaultRetryOptions,
+ batchPartitionKey);
- var count = recipients.Count();
- if (!context.IsReplaying)
+ var count = recipients.ToList().Count;
+ if (count == 0)
{
- log.LogInformation($"About to create conversation with {count} recipients.");
+ log.LogInformation("No pending recipients.");
+ return;
}
- if (count > 0)
+ if (!context.IsReplaying)
{
- // Update notification status.
- await context.CallActivityWithRetryAsync(
- FunctionNames.UpdateNotificationStatusActivity,
- FunctionSettings.DefaultRetryOptions,
- (notification.Id, NotificationStatus.InstallingApp));
+ log.LogInformation($"About to create 1:1 conversations with {count} recipients.");
}
// Create conversation.
var tasks = new List();
foreach (var recipient in recipients)
{
+ // Update batch partition key to actual notification Id.
+ // Because batch partition key is used only for batching data.
+ // Actual state and data is stored against the notification id record in SentNotificationData Table.
+ recipient.PartitionKey = notificationId;
+
var task = context.CallActivityWithRetryAsync(
FunctionNames.TeamsConversationActivity,
FunctionSettings.DefaultRetryOptions,
- (notification.Id, recipient));
+ (notificationId, batchPartitionKey, recipient));
tasks.Add(task);
}
diff --git a/Source/CompanyCommunicator.Prep.Func/Startup.cs b/Source/CompanyCommunicator.Prep.Func/Startup.cs
index 946e3a000..10e490d13 100644
--- a/Source/CompanyCommunicator.Prep.Func/Startup.cs
+++ b/Source/CompanyCommunicator.Prep.Func/Startup.cs
@@ -34,6 +34,7 @@ namespace Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MessageQueues.ExportQueue;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MessageQueues.SendQueue;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.MicrosoftGraph;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Recipients;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.Teams;
using Microsoft.Teams.Apps.CompanyCommunicator.Common.Services.User;
using Microsoft.Teams.Apps.CompanyCommunicator.Prep.Func.Export.Streams;
@@ -129,6 +130,7 @@ public override void Configure(IFunctionsHostBuilder builder)
builder.Services.AddTransient();
builder.Services.AddTransient();
builder.Services.AddTransient();
+ builder.Services.AddTransient();
// Add Teams services.
builder.Services.AddTransient();
diff --git a/Source/CompanyCommunicator.Prep.Func/host.json b/Source/CompanyCommunicator.Prep.Func/host.json
index d92266801..a8122dec9 100644
--- a/Source/CompanyCommunicator.Prep.Func/host.json
+++ b/Source/CompanyCommunicator.Prep.Func/host.json
@@ -14,8 +14,8 @@
},
"extendedSessionsEnabled": true,
"extendedSessionIdleTimeoutInSeconds": 60,
- "maxConcurrentOrchestratorFunctions": 5,
- "maxConcurrentActivityFunctions": 30
+ "maxConcurrentOrchestratorFunctions": 3,
+ "maxConcurrentActivityFunctions": 10
}
},
"functionTimeout": "01:00:00",
diff --git a/Source/CompanyCommunicator/ClientApp/package-lock.json b/Source/CompanyCommunicator/ClientApp/package-lock.json
index 68e7743ab..593b7edd9 100644
--- a/Source/CompanyCommunicator/ClientApp/package-lock.json
+++ b/Source/CompanyCommunicator/ClientApp/package-lock.json
@@ -1,6 +1,6 @@
{
"name": "company-communicator",
- "version": "4.1.3",
+ "version": "4.1.4",
"lockfileVersion": 1,
"requires": true,
"dependencies": {
@@ -12087,9 +12087,9 @@
"integrity": "sha1-QRyttXTFoUDTpLGRDUDYDMn0C0A="
},
"path-parse": {
- "version": "1.0.6",
- "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.6.tgz",
- "integrity": "sha512-GSmOT2EbHrINBf9SR7CDELwlJ8AENk3Qn7OikK4nFYAu3Ote2+JYNVvkpAEQm3/TLNEJFD/xZJjzyxg3KBWOzw=="
+ "version": "1.0.7",
+ "resolved": "https://registry.npmjs.org/path-parse/-/path-parse-1.0.7.tgz",
+ "integrity": "sha512-LDJzPVEEEPR+y48z93A0Ed0yXb8pAByGWo/k5YYdYgpY2/2EsOsksJrq7lOHxryrVOn1ejG6oAp8ahvOIQD8sw=="
},
"path-to-regexp": {
"version": "0.1.7",
@@ -15809,9 +15809,9 @@
"integrity": "sha512-4WK/bYZmj8xLr+HUCODHGF1ZFzsYffasLUgEiMBY4fgtltdO6B4WJtlSbPaDTLpYTcGVwM2qLnFTICEcNxs3kA=="
},
"tar": {
- "version": "6.1.0",
- "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.0.tgz",
- "integrity": "sha512-DUCttfhsnLCjwoDoFcI+B2iJgYa93vBnDUATYEeRx6sntCTdN01VnqsIuTlALXla/LWooNg0yEGeB+Y8WdFxGA==",
+ "version": "6.1.11",
+ "resolved": "https://registry.npmjs.org/tar/-/tar-6.1.11.tgz",
+ "integrity": "sha512-an/KZQzQUkZCkuoAA64hM92X0Urb6VpRhAFllDzz44U2mcD5scmT3zBc4VgVpkugF580+DQn8eAFSyoQt0tznA==",
"requires": {
"chownr": "^2.0.0",
"fs-minipass": "^2.0.0",
@@ -16488,9 +16488,9 @@
}
},
"url-parse": {
- "version": "1.5.1",
- "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.1.tgz",
- "integrity": "sha512-HOfCOUJt7iSYzEx/UqgtwKRMC6EU91NFhsCHMv9oM03VJcVo2Qrp8T8kI9D7amFf1cu+/3CEhgb3rF9zL7k85Q==",
+ "version": "1.5.3",
+ "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.3.tgz",
+ "integrity": "sha512-IIORyIQD9rvj0A4CLWsHkBBJuNqWpFQe224b6j9t/ABmquIS0qDU2pY6kl6AuOrL5OkCXHMCFNe1jBcuAggjvQ==",
"requires": {
"querystringify": "^2.1.1",
"requires-port": "^1.0.0"
@@ -18160,4 +18160,4 @@
"integrity": "sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q=="
}
}
-}
\ No newline at end of file
+}
diff --git a/Source/CompanyCommunicator/ClientApp/package.json b/Source/CompanyCommunicator/ClientApp/package.json
index 3836e9c1e..0b76cc079 100644
--- a/Source/CompanyCommunicator/ClientApp/package.json
+++ b/Source/CompanyCommunicator/ClientApp/package.json
@@ -1,6 +1,6 @@
{
"name": "company-communicator",
- "version": "4.1.3",
+ "version": "4.1.4",
"private": true,
"dependencies": {
"@fluentui/react-northstar": "^0.52.0",
diff --git a/Source/Test/CompanyCommunicator.Common.Test/Extensions/EnumerableExtensionsTest.cs b/Source/Test/CompanyCommunicator.Common.Test/Extensions/EnumerableExtensionsTest.cs
new file mode 100644
index 000000000..121b26cd3
--- /dev/null
+++ b/Source/Test/CompanyCommunicator.Common.Test/Extensions/EnumerableExtensionsTest.cs
@@ -0,0 +1,99 @@
+//
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+//
+
+namespace Microsoft.Teams.App.CompanyCommunicator.Common.Test.Extensions
+{
+ using System.Collections.Generic;
+ using System.Linq;
+ using Microsoft.Teams.Apps.CompanyCommunicator.Common.Extensions;
+ using Xunit;
+
+ ///
+ /// Enumerable Extensions Test.
+ ///
+ public class EnumerableExtensionsTest
+ {
+ ///
+ /// Gets data for AsBatches Test.
+ /// Format: { each batch size, expected count of batches, list of data }.
+ ///
+ public static IEnumerable