diff --git a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs index 09bd4105d..430d51a09 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/IManagedMqttClientStorage.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information. using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; namespace MQTTnet.Extensions.ManagedClient @@ -11,6 +12,6 @@ public interface IManagedMqttClientStorage { Task SaveQueuedMessagesAsync(IList messages); - Task> LoadQueuedMessagesAsync(); + IAsyncEnumerable LoadQueuedMessagesAsync(CancellationToken cancellationToken = default); } -} +} \ No newline at end of file diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs index 109eb69a3..b08207b1f 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs @@ -249,17 +249,6 @@ public async Task StartAsync(ManagedMqttClientOptions options) Options = options; - if (options.Storage != null) - { - _storageManager = new ManagedMqttClientStorageManager(options.Storage); - var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false); - - foreach (var message in messages) - { - _messageQueue.Enqueue(message); - } - } - var cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = cancellationTokenSource.Token; _connectionCancellationToken = cancellationTokenSource; @@ -444,6 +433,48 @@ async Task MaintainConnectionAsync(CancellationToken cancellationToken) } } + async Task PublishStoredMessagesAsync(CancellationToken cancellationToken) + { + + + try + { + _storageManager = new ManagedMqttClientStorageManager(Options.Storage); + + while (!cancellationToken.IsCancellationRequested && InternalClient.IsConnected) + { + + cancellationToken.ThrowIfCancellationRequested(); + + if (Options.Storage != null) + { + + await foreach (var msg in _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false)) + { + + await EnqueueAsync(msg); + + cancellationToken.ThrowIfCancellationRequested(); + + await TryPublishQueuedMessageAsync(msg, cancellationToken).ConfigureAwait(false); + } + } + + } + } + catch (OperationCanceledException) + { + } + catch (Exception exception) + { + _logger.Error(exception, "Error while publishing stored application messages."); + } + finally + { + _logger.Verbose("Stopped publishing stored messages."); + } + } + async Task PublishQueuedMessagesAsync(CancellationToken cancellationToken) { try @@ -665,7 +696,7 @@ async Task SendSubscribeUnsubscribe(List PublishQueuedMessagesAsync(cancellationToken), cancellationToken).RunInBackground(_logger); } @@ -717,11 +749,11 @@ async Task TryMaintainConnectionAsync(CancellationToken cancellationToken) else if (connectionState == ReconnectionResult.Reconnected) { await PublishReconnectSubscriptionsAsync(cancellationToken).ConfigureAwait(false); - StartPublishing(); + await StartPublishing(); } else if (connectionState == ReconnectionResult.Recovered) { - StartPublishing(); + await StartPublishing(); } else if (connectionState == ReconnectionResult.StillConnected) { diff --git a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs index bd1c48c20..fe027bece 100644 --- a/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs +++ b/Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClientStorageManager.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using MQTTnet.Internal; @@ -22,12 +23,13 @@ public ManagedMqttClientStorageManager(IManagedMqttClientStorage storage) _storage = storage ?? throw new ArgumentNullException(nameof(storage)); } - public async Task> LoadQueuedMessagesAsync() + public async IAsyncEnumerable LoadQueuedMessagesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default) { - var loadedMessages = await _storage.LoadQueuedMessagesAsync().ConfigureAwait(false); - _messages.AddRange(loadedMessages); - - return _messages; + await foreach (var message in _storage.LoadQueuedMessagesAsync(cancellationToken).ConfigureAwait(false)) + { + _messages.Add(message); + yield return message; + } } public async Task AddAsync(ManagedMqttApplicationMessage applicationMessage) @@ -63,4 +65,4 @@ private Task SaveAsync() return _storage.SaveQueuedMessagesAsync(_messages); } } -} +} \ No newline at end of file