Skip to content

Commit

Permalink
fix: use hotcold and retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
koenmetsu committed Jan 13, 2024
1 parent da8803b commit 00a1351
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public static IServiceCollection AddMarten(
opts.Events.MetadataConfig.EnableAll();
opts.AddPostgresProjections(serviceProvider);

opts.Projections.DaemonLockId = 2;

opts.RegisterDocumentType<VerenigingenPerInszDocument>();
opts.RegisterDocumentType<VerenigingDocument>();

Expand All @@ -52,7 +54,7 @@ public static IServiceCollection AddMarten(
martenConfiguration.ApplyAllDatabaseChangesOnStartup();

if (configuration["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true")
martenConfiguration.AddAsyncDaemon(DaemonMode.Solo);
martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold);

return services;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace AssociationRegistry.Admin.ProjectionHost.Infrastructure.Program.WebApp
using Projections.Detail;
using Projections.Historiek;
using Projections.Search;
using Projections.Search.Zoeken;
using Schema.Detail;
using Schema.Historiek;
using System.Configuration;
Expand All @@ -33,7 +34,7 @@ public static IServiceCollection ConfigureProjectionsWithMarten(
var martenConfiguration = AddMarten(source, configurationManager);

if (configurationManager["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true")
martenConfiguration.AddAsyncDaemon(DaemonMode.Solo);
martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold);

return source;
}
Expand Down Expand Up @@ -80,7 +81,9 @@ static JsonNetSerializer CreateCustomMartenSerializer()

opts.Events.MetadataConfig.EnableAll();

opts.Projections.OnException(_ => true).Stop();
opts.Projections.OnException(_ => true).RetryLater(TimeSpan.FromSeconds(2));

opts.Projections.DaemonLockId = 1;

opts.Projections.AsyncListeners.Add(
new ProjectionStateListener(serviceProvider.GetRequiredService<AdminInstrumentation>()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
namespace AssociationRegistry.Admin.ProjectionHost.Projections.Search;

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Polly;
using Polly.Retry;

public class MartenSubscription : IProjection
{
private readonly IMartenEventsConsumer _consumer;
private readonly AsyncRetryPolicy _retryPolicy;

public MartenSubscription(IMartenEventsConsumer consumer)
{
_consumer = consumer;
var maxDelay = TimeSpan.FromSeconds(30); // Set the maximum delay limit here

_retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryForeverAsync(retryAttempt =>
TimeSpan.FromSeconds(Math.Min(Math.Pow(x: 2, retryAttempt), maxDelay.TotalSeconds)));
}

public void Apply(
Expand All @@ -25,10 +30,10 @@ IReadOnlyList<StreamAction> streams
throw new NotSupportedException("Subscription should be only run asynchronously");
}

public Task ApplyAsync(
public async Task ApplyAsync(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams,
CancellationToken ct
)
=> _consumer.ConsumeAsync(streams);
=> await _retryPolicy.ExecuteAsync(() => _consumer.ConsumeAsync(streams));
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ System.Configuration.ConfigurationManager
Destructurama.JsonNet

FluentValidation

Polly


Marten
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static IServiceCollection ConfigureProjectionsWithMarten(
var martenConfiguration = AddMarten(source, configurationManager);

if (configurationManager["ProjectionDaemonDisabled"]?.ToLowerInvariant() != "true")
martenConfiguration.AddAsyncDaemon(DaemonMode.Solo);
martenConfiguration.AddAsyncDaemon(DaemonMode.HotCold);

return source;
}
Expand Down Expand Up @@ -73,9 +73,11 @@ static JsonNetSerializer CreateCustomMartenSerializer()

opts.Events.StreamIdentity = StreamIdentity.AsString;

opts.Projections.DaemonLockId = 3;

opts.Events.MetadataConfig.EnableAll();

opts.Projections.OnException(_ => true).Stop();
opts.Projections.OnException(_ => true).RetryLater(TimeSpan.FromSeconds(2));

opts.Projections.Add<PubliekVerenigingDetailProjection>(ProjectionLifecycle.Async);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,24 @@ namespace AssociationRegistry.Public.ProjectionHost.Projections.Search;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Polly;
using Polly.Retry;

public class MartenSubscription : IProjection
{
private readonly IMartenEventsConsumer _consumer;
private readonly AsyncRetryPolicy _retryPolicy;

public MartenSubscription(IMartenEventsConsumer consumer)
{
_consumer = consumer;

var maxDelay = TimeSpan.FromSeconds(30); // Set the maximum delay limit here
_retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryForeverAsync(retryAttempt =>
TimeSpan.FromSeconds(Math.Min(Math.Pow(2, retryAttempt), maxDelay.TotalSeconds)));

}

public void Apply(
Expand All @@ -25,10 +35,10 @@ IReadOnlyList<StreamAction> streams
throw new NotSupportedException("Subscription should be only run asynchronously");
}

public Task ApplyAsync(
public async Task ApplyAsync(
IDocumentOperations operations,
IReadOnlyList<StreamAction> streams,
CancellationToken ct
)
=> _consumer.ConsumeAsync(streams);
=> await _retryPolicy.ExecuteAsync(() => _consumer.ConsumeAsync(streams));
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Serilog.Sinks.File
Destructurama.JsonNet

FluentValidation

Polly


Marten
Expand Down

0 comments on commit 00a1351

Please sign in to comment.