From 0d6e522f2702659be8abaf14f45ce0219eeb2a65 Mon Sep 17 00:00:00 2001 From: Koen Metsu Date: Sat, 13 Jan 2024 12:35:10 +0100 Subject: [PATCH] fix: use hotcold and retry mechanism --- .../Extensions/MartenExtensions.cs | 4 +++- .../ConfigureMartenExtensions.cs | 7 +++++-- .../Projections/Search/MartenSubscription.cs | 17 +++++++++++------ .../paket.references | 2 +- .../ConfigureMartenExtensions.cs | 6 ++++-- .../Projections/Search/MartenSubscription.cs | 14 ++++++++++++-- .../paket.references | 2 +- 7 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/AssociationRegistry.Acm.Api/Infrastructure/Extensions/MartenExtensions.cs b/src/AssociationRegistry.Acm.Api/Infrastructure/Extensions/MartenExtensions.cs index ecad0e639..450e0a554 100644 --- a/src/AssociationRegistry.Acm.Api/Infrastructure/Extensions/MartenExtensions.cs +++ b/src/AssociationRegistry.Acm.Api/Infrastructure/Extensions/MartenExtensions.cs @@ -33,6 +33,8 @@ public static IServiceCollection AddMarten( opts.Events.MetadataConfig.EnableAll(); opts.AddPostgresProjections(serviceProvider); + opts.Projections.DaemonLockId = 2; + opts.RegisterDocumentType(); opts.RegisterDocumentType(); @@ -52,7 +54,7 @@ public static IServiceCollection AddMarten( martenConfiguration.ApplyAllDatabaseChangesOnStartup(); if (configuration["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true") - martenConfiguration.AddAsyncDaemon(DaemonMode.Solo); + martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold); return services; } diff --git a/src/AssociationRegistry.Admin.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs b/src/AssociationRegistry.Admin.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs index 90a050f31..d72540372 100644 --- a/src/AssociationRegistry.Admin.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs +++ b/src/AssociationRegistry.Admin.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs @@ -15,6 +15,7 @@ namespace AssociationRegistry.Admin.ProjectionHost.Infrastructure.Program.WebApp using Projections.Detail; using Projections.Historiek; using Projections.Search; +using Projections.Search.Zoeken; using Schema.Detail; using Schema.Historiek; using System.Configuration; @@ -33,7 +34,7 @@ public static IServiceCollection ConfigureProjectionsWithMarten( var martenConfiguration = AddMarten(source, configurationManager); if (configurationManager["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true") - martenConfiguration.AddAsyncDaemon(DaemonMode.Solo); + martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold); return source; } @@ -80,7 +81,9 @@ static JsonNetSerializer CreateCustomMartenSerializer() opts.Events.MetadataConfig.EnableAll(); - opts.Projections.OnException(_ => true).Stop(); + opts.Projections.OnException(_ => true).RetryLater(TimeSpan.FromSeconds(2)); + + opts.Projections.DaemonLockId = 1; opts.Projections.AsyncListeners.Add( new ProjectionStateListener(serviceProvider.GetRequiredService())); diff --git a/src/AssociationRegistry.Admin.ProjectionHost/Projections/Search/MartenSubscription.cs b/src/AssociationRegistry.Admin.ProjectionHost/Projections/Search/MartenSubscription.cs index 36abb9e61..403ec9886 100644 --- a/src/AssociationRegistry.Admin.ProjectionHost/Projections/Search/MartenSubscription.cs +++ b/src/AssociationRegistry.Admin.ProjectionHost/Projections/Search/MartenSubscription.cs @@ -1,20 +1,25 @@ namespace AssociationRegistry.Admin.ProjectionHost.Projections.Search; -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; using Marten; using Marten.Events; using Marten.Events.Projections; +using Polly; +using Polly.Retry; public class MartenSubscription : IProjection { private readonly IMartenEventsConsumer _consumer; + private readonly AsyncRetryPolicy _retryPolicy; public MartenSubscription(IMartenEventsConsumer consumer) { _consumer = consumer; + var maxDelay = TimeSpan.FromSeconds(30); // Set the maximum delay limit here + + _retryPolicy = Policy + .Handle() + .WaitAndRetryForeverAsync(retryAttempt => + TimeSpan.FromSeconds(Math.Min(Math.Pow(x: 2, retryAttempt), maxDelay.TotalSeconds))); } public void Apply( @@ -25,10 +30,10 @@ IReadOnlyList streams throw new NotSupportedException("Subscription should be only run asynchronously"); } - public Task ApplyAsync( + public async Task ApplyAsync( IDocumentOperations operations, IReadOnlyList streams, CancellationToken ct ) - => _consumer.ConsumeAsync(streams); + => await _retryPolicy.ExecuteAsync(() => _consumer.ConsumeAsync(streams)); } diff --git a/src/AssociationRegistry.Admin.ProjectionHost/paket.references b/src/AssociationRegistry.Admin.ProjectionHost/paket.references index 8b009d5f5..abbb685a3 100755 --- a/src/AssociationRegistry.Admin.ProjectionHost/paket.references +++ b/src/AssociationRegistry.Admin.ProjectionHost/paket.references @@ -32,7 +32,7 @@ System.Configuration.ConfigurationManager Destructurama.JsonNet FluentValidation - +Polly Marten diff --git a/src/AssociationRegistry.Public.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs b/src/AssociationRegistry.Public.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs index c0cb5ff79..16dc043e1 100644 --- a/src/AssociationRegistry.Public.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs +++ b/src/AssociationRegistry.Public.ProjectionHost/Infrastructure/Program/WebApplicationBuilder/ConfigureMartenExtensions.cs @@ -29,7 +29,7 @@ public static IServiceCollection ConfigureProjectionsWithMarten( var martenConfiguration = AddMarten(source, configurationManager); if (configurationManager["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true") - martenConfiguration.AddAsyncDaemon(DaemonMode.Solo); + martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold); return source; } @@ -73,9 +73,11 @@ static JsonNetSerializer CreateCustomMartenSerializer() opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.DaemonLockId = 3; + opts.Events.MetadataConfig.EnableAll(); - opts.Projections.OnException(_ => true).Stop(); + opts.Projections.OnException(_ => true).RetryLater(TimeSpan.FromSeconds(2)); opts.Projections.Add(ProjectionLifecycle.Async); diff --git a/src/AssociationRegistry.Public.ProjectionHost/Projections/Search/MartenSubscription.cs b/src/AssociationRegistry.Public.ProjectionHost/Projections/Search/MartenSubscription.cs index 5b184f43d..aef096c4b 100644 --- a/src/AssociationRegistry.Public.ProjectionHost/Projections/Search/MartenSubscription.cs +++ b/src/AssociationRegistry.Public.ProjectionHost/Projections/Search/MartenSubscription.cs @@ -7,14 +7,24 @@ namespace AssociationRegistry.Public.ProjectionHost.Projections.Search; using Marten; using Marten.Events; using Marten.Events.Projections; +using Polly; +using Polly.Retry; public class MartenSubscription : IProjection { private readonly IMartenEventsConsumer _consumer; + private readonly AsyncRetryPolicy _retryPolicy; public MartenSubscription(IMartenEventsConsumer consumer) { _consumer = consumer; + + var maxDelay = TimeSpan.FromSeconds(30); // Set the maximum delay limit here + _retryPolicy = Policy + .Handle() + .WaitAndRetryForeverAsync(retryAttempt => + TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryAttempt), maxDelay.TotalSeconds))); + } public void Apply( @@ -25,10 +35,10 @@ IReadOnlyList streams throw new NotSupportedException("Subscription should be only run asynchronously"); } - public Task ApplyAsync( + public async Task ApplyAsync( IDocumentOperations operations, IReadOnlyList streams, CancellationToken ct ) - => _consumer.ConsumeAsync(streams); + => await _retryPolicy.ExecuteAsync(() => _consumer.ConsumeAsync(streams)); } diff --git a/src/AssociationRegistry.Public.ProjectionHost/paket.references b/src/AssociationRegistry.Public.ProjectionHost/paket.references index ab3066584..12f0dd953 100755 --- a/src/AssociationRegistry.Public.ProjectionHost/paket.references +++ b/src/AssociationRegistry.Public.ProjectionHost/paket.references @@ -30,7 +30,7 @@ Serilog.Sinks.File Destructurama.JsonNet FluentValidation - +Polly Marten