Skip to content

Commit

Permalink
make commands idempontent
Browse files Browse the repository at this point in the history
  • Loading branch information
rikdepeuter committed May 17, 2024
1 parent 26f4807 commit 36377ca
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 86 deletions.
15 changes: 14 additions & 1 deletion src/ParcelRegistry.Consumer.Address.Console/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace ParcelRegistry.Consumer.Address.Console
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Be.Vlaanderen.Basisregisters.Aws.DistributedMutex;
using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Destructurama;
using Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
Expand All @@ -25,6 +25,7 @@ namespace ParcelRegistry.Consumer.Address.Console
using Serilog;
using Serilog.Debugging;
using Serilog.Extensions.Logging;
using MigrationsHelper = Infrastructure.MigrationsHelper;

public sealed class Program
{
Expand Down Expand Up @@ -145,6 +146,18 @@ public static async Task Main(string[] args)
.As<IIdempotentConsumer<ConsumerAddressContext>>()
.SingleInstance();

services.ConfigureIdempotency(
hostContext.Configuration.GetSection(IdempotencyConfiguration.Section)
.Get<IdempotencyConfiguration>()!.ConnectionString!,
new IdempotencyMigrationsTableInfo(Schema.Import),
new IdempotencyTableInfo(Schema.Import),
loggerFactory);

builder.RegisterType<IdempotentCommandHandler>()
.As<IIdempotentCommandHandler>()
.AsSelf()
.InstancePerLifetimeScope();

builder
.RegisterModule(new EditModule(hostContext.Configuration))
.RegisterModule(new BackOfficeModule(hostContext.Configuration, services, loggerFactory, ServiceLifetime.Transient));
Expand Down
4 changes: 4 additions & 0 deletions src/ParcelRegistry.Consumer.Address.Console/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
"BackOffice": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;"
},

"Idempotency": {
"ConnectionString": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;"
},

"Kafka": {
"BootstrapServers": "localhost:29092/"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address.Projections
using System.Threading;
using System.Threading.Tasks;
using Api.BackOffice.Abstractions;
using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.GrAr.Contracts.AddressRegistry;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
Expand Down Expand Up @@ -198,9 +199,18 @@ await DetachBecauseRemoved(

foreach (var command in commandByParcels)
{
await commandHandler.Handle(command, ct);
try
{
await commandHandler.HandleIdempotent(command, ct);
}
catch (IdempotencyException)
{
// do nothing
}
}

await backOfficeContext.Database.BeginTransactionAsync();

foreach (var parcelId in commandByParcels.Select(x => x.ParcelId))
{
var parcel = await parcels.GetAsync(new ParcelStreamId(parcelId), ct);
Expand All @@ -226,6 +236,8 @@ await DetachBecauseRemoved(
await backOfficeContext.AddIdempotentParcelAddressRelation(parcelId, addressPersistentLocalId, ct);
}
}

await backOfficeContext.Database.CommitTransactionAsync();
});

When<AddressWasRejectedBecauseOfReaddress>(async (commandHandler, message, ct) =>
Expand Down
40 changes: 5 additions & 35 deletions src/ParcelRegistry.Projections.BackOffice/BackOfficeProjections.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ParcelRegistry.Projections.BackOffice
namespace ParcelRegistry.Projections.BackOffice
{
using System.Threading.Tasks;
using Api.BackOffice.Abstractions;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore;
Expand Down Expand Up @@ -107,41 +108,10 @@ await backOfficeContext.AddIdempotentParcelAddressRelation(
}
});

When<Envelope<ParcelAddressesWereReaddressed>>(async (_, message, cancellationToken) =>
When<Envelope<ParcelAddressesWereReaddressed>>((_, message, cancellationToken) =>
{
await using var backOfficeContext = await backOfficeContextFactory.CreateDbContextAsync(cancellationToken);

foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
{
var relation = await backOfficeContext.FindParcelAddressRelation(
new ParcelId(message.Message.ParcelId),
new AddressPersistentLocalId(addressPersistentLocalId),
cancellationToken);

if (relation is not null)
{
await backOfficeContext.RemoveIdempotentParcelAddressRelation(
new ParcelId(message.Message.ParcelId),
new AddressPersistentLocalId(addressPersistentLocalId),
cancellationToken);
}
}

foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
{
var relation = await backOfficeContext.FindParcelAddressRelation(
new ParcelId(message.Message.ParcelId),
new AddressPersistentLocalId(addressPersistentLocalId),
cancellationToken);

if (relation is null)
{
await backOfficeContext.AddIdempotentParcelAddressRelation(
new ParcelId(message.Message.ParcelId),
new AddressPersistentLocalId(addressPersistentLocalId),
cancellationToken);
}
}
// Do nothing
return Task.CompletedTask;
});
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace ParcelRegistry.Tests.ProjectionTests.BackOffice
namespace ParcelRegistry.Tests.ProjectionTests.BackOffice
{
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -91,51 +91,5 @@ await Sut
newAddressParcelRelation!.Count.Should().Be(2);
});
}

[Fact]
public async Task GivenParcelAddressesWereReaddressed_ThenAddressesAreAttachedAndDetached()
{
_fixture.Customize(new WithFixedParcelId());

var attachedAddressPersistentLocalIds = new[] { 1, 2, 3 };
var detachedAddressPersistentLocalIds = new[] { 4, 5, 6 };

var eventBuilder = new ParcelAddressesWereReaddressedBuilder(_fixture);

foreach (var addressPersistentLocalId in attachedAddressPersistentLocalIds)
{
eventBuilder.WithAttachedAddress(addressPersistentLocalId);
}

foreach (var addressPersistentLocalId in detachedAddressPersistentLocalIds)
{
await AddRelation(_fixture.Create<ParcelId>(), addressPersistentLocalId);
eventBuilder.WithDetachedAddress(addressPersistentLocalId);
}

var @event = eventBuilder.Build();

await Sut
.Given(@event)
.Then(async _ =>
{
foreach (var addressPersistentLocalId in attachedAddressPersistentLocalIds)
{
var parcelAddressRelation = await _backOfficeContext.ParcelAddressRelations.FindAsync(
@event.ParcelId,
addressPersistentLocalId);
parcelAddressRelation.Should().NotBeNull();
parcelAddressRelation!.Count.Should().Be(1);
}

foreach (var addressPersistentLocalId in detachedAddressPersistentLocalIds)
{
var parcelAddressRelation = await _backOfficeContext.ParcelAddressRelations.FindAsync(
@event.ParcelId,
addressPersistentLocalId);
parcelAddressRelation.Should().BeNull();
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public async Task AttachAndDetachAddressesWhenStreetNameWasReaddressed()
await Then(async _ =>
{
_mockCommandHandler.Verify(x =>
x.Handle(
x.HandleIdempotent(
It.Is<ReaddressAddresses>(y =>
y.ParcelId == parcelOneId
&& y.Readdresses.Count == 2
Expand All @@ -97,7 +97,7 @@ await Then(async _ =>
CancellationToken.None),
Times.Once);
_mockCommandHandler.Verify(x =>
x.Handle(
x.HandleIdempotent(
It.Is<ReaddressAddresses>(y =>
y.ParcelId == parcelTwoId
&& y.Readdresses.Count == 1
Expand Down

0 comments on commit 36377ca

Please sign in to comment.