Skip to content

Commit

Permalink
feat: or-1349 add stop-start shards
Browse files Browse the repository at this point in the history
  • Loading branch information
QuintenGreenstack committed Jan 24, 2024
1 parent fef2382 commit 5d183fd
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig

StartRebuild(logger, projectionName: "Detail", rebuildFunc: async () =>
{
await projectionDaemon.RebuildProjection<BeheerVerenigingDetailProjection>(shardTimeout,CancellationToken.None);
await projectionDaemon.StopRebuildStart<BeheerVerenigingDetailProjection>(shardTimeout);
});

StartRebuild(logger, projectionName: "Historiek", rebuildFunc: async () =>
{
await projectionDaemon.RebuildProjection<BeheerVerenigingHistoriekProjection>(shardTimeout,CancellationToken.None);
await projectionDaemon.StopRebuildStart<BeheerVerenigingHistoriekProjection>(shardTimeout);
});

StartRebuild(logger, projectionName: "Search", rebuildFunc: async () =>
Expand All @@ -51,7 +51,7 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
StartRebuild(logger, projectionName: "Detail", rebuildFunc: async () =>
{
var projectionDaemon = await store.BuildProjectionDaemonAsync();
await projectionDaemon.RebuildProjection<BeheerVerenigingDetailProjection>(shardTimeout,CancellationToken.None);
await projectionDaemon.StopRebuildStart<BeheerVerenigingDetailProjection>(shardTimeout);
});

return Results.Accepted();
Expand All @@ -64,7 +64,7 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
StartRebuild(logger, projectionName: "Historiek", rebuildFunc: async () =>
{
var projectionDaemon = await store.BuildProjectionDaemonAsync();
await projectionDaemon.RebuildProjection<BeheerVerenigingHistoriekProjection>(shardTimeout,CancellationToken.None);
await projectionDaemon.StopRebuildStart<BeheerVerenigingHistoriekProjection>(shardTimeout);
});

return Results.Accepted();
Expand Down Expand Up @@ -93,6 +93,15 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
await store.Advanced.AllProjectionProgress(token: cancellationToken));
}

private static async Task StopRebuildStart<TProjection>(this IProjectionDaemon projectionDaemon, TimeSpan shardTimeout)
{
await projectionDaemon.StopShard($"{typeof(TProjection).FullName}:All");
await projectionDaemon.RebuildProjection<TProjection>(shardTimeout, CancellationToken.None);

await projectionDaemon.StartShard($"{typeof(TProjection).FullName}:All",
CancellationToken.None);
}

private static void StartRebuild(ILogger logger, string projectionName, Func<Task> rebuildFunc)
{
_ = Task.Run(async () =>
Expand Down Expand Up @@ -126,7 +135,7 @@ private static async Task RebuildElasticProjections(

await elasticClient.Indices.DeleteAsync(options.Indices.DuplicateDetection, ct: CancellationToken.None).ThrowIfInvalidAsync();
await elasticClient.Indices.CreateDuplicateDetectionIndexAsync(options.Indices.DuplicateDetection).ThrowIfInvalidAsync();
await projectionDaemon.RebuildProjection(ProjectionNames.VerenigingZoeken,shardTimeout, CancellationToken.None);
await projectionDaemon.RebuildProjection(ProjectionNames.VerenigingZoeken, shardTimeout, CancellationToken.None);

await elasticClient.Indices.PutAliasAsync(newIndicesVerenigingen, options.Indices.Verenigingen, ct: CancellationToken.None);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace AssociationRegistry.Public.ProjectionHost.Extensions;
using Infrastructure.ConfigurationBindings;
using Infrastructure.Extensions;
using Marten;
using Marten.Events.Daemon;
using Nest;
using Projections;
using Projections.Detail;
Expand All @@ -24,7 +25,7 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
StartRebuild(logger, projectionName: "Detail", rebuildFunc: async () =>
{
var projectionDaemon = await store.BuildProjectionDaemonAsync();
await projectionDaemon.RebuildProjection<PubliekVerenigingDetailProjection>(shardTimeout, CancellationToken.None);
await projectionDaemon.StopRebuildStart<PubliekVerenigingDetailProjection>(shardTimeout);
});

StartRebuild(logger, projectionName: "Search", rebuildFunc: async () =>
Expand All @@ -49,7 +50,7 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
StartRebuild(logger, projectionName: "Detail", rebuildFunc: async () =>
{
var projectionDaemon = await store.BuildProjectionDaemonAsync();
await projectionDaemon.RebuildProjection<PubliekVerenigingDetailProjection>(shardTimeout, CancellationToken.None);
await projectionDaemon.StopRebuildStart<PubliekVerenigingDetailProjection>(shardTimeout);
});

return Results.Accepted();
Expand Down Expand Up @@ -83,6 +84,14 @@ public static void AddProjectionEndpoints(this WebApplication app, RebuildConfig
=> store.Advanced.AllProjectionProgress(token: cancellationToken));
}

private static async Task StopRebuildStart<TProjection>(this IProjectionDaemon projectionDaemon, TimeSpan shardTimeout)
{
await projectionDaemon.StopShard($"{typeof(TProjection).FullName}:All");
await projectionDaemon.RebuildProjection<TProjection>(shardTimeout, CancellationToken.None);
await projectionDaemon.StartShard($"{typeof(TProjection).FullName}:All",
CancellationToken.None);
}

private static void StartRebuild(ILogger logger, string projectionName, Func<Task> rebuildFunc)
{
_ = Task.Run(async () =>
Expand Down

0 comments on commit 5d183fd

Please sign in to comment.