diff --git a/ParcelRegistry.sln.DotSettings b/ParcelRegistry.sln.DotSettings
index c38e744a..37da9169 100644
--- a/ParcelRegistry.sln.DotSettings
+++ b/ParcelRegistry.sln.DotSettings
@@ -14,6 +14,7 @@
True
True
<Policy Inspect="True" Prefix="" Suffix="" Style="AaBb"><ExtraRule Prefix="" Suffix="" Style="aa_bb" /></Policy>
+ <Policy><Descriptor Staticness="Any" AccessRightKinds="Any" Description="Types and namespaces"><ElementKinds><Kind Name="NAMESPACE" /><Kind Name="CLASS" /><Kind Name="STRUCT" /><Kind Name="ENUM" /><Kind Name="DELEGATE" /></ElementKinds></Descriptor><Policy Inspect="True" Prefix="" Suffix="" Style="AaBb"><ExtraRule Prefix="" Suffix="" Style="aa_bb" /></Policy></Policy>
True
Never
Never
@@ -26,4 +27,5 @@
True
True
True
+ True
diff --git a/paket.dependencies b/paket.dependencies
index 50282344..6331fee1 100755
--- a/paket.dependencies
+++ b/paket.dependencies
@@ -79,16 +79,16 @@ nuget Be.Vlaanderen.Basisregisters.Projector 15.0.0
nuget Be.Vlaanderen.Basisregisters.Crab 4.0.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Common 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Notifications 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.1.0
-nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.1.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Common 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Notifications 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.6.0
+nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.6.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.0.1
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Simple 5.0.1
diff --git a/paket.lock b/paket.lock
index d5c41856..632c6073 100644
--- a/paket.lock
+++ b/paket.lock
@@ -253,20 +253,18 @@ NUGET
Autofac.Extensions.DependencyInjection (>= 9.0)
Be.Vlaanderen.Basisregisters.EventHandling (5.0)
Be.Vlaanderen.Basisregisters.Generators.Guid.Deterministic (4.0)
- Be.Vlaanderen.Basisregisters.GrAr.Common (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Common (21.6)
Be.Vlaanderen.Basisregisters.AggregateSource (>= 9.0.1)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
NetTopologySuite (>= 2.5)
NodaTime (>= 3.1.11)
- Be.Vlaanderen.Basisregisters.GrAr.Contracts (21.1)
- Be.Vlaanderen.Basisregisters.AggregateSource (>= 9.0.1)
- NodaTime (>= 3.1.11)
- Be.Vlaanderen.Basisregisters.GrAr.Edit (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Contracts (21.6)
+ Be.Vlaanderen.Basisregisters.GrAr.Edit (21.6)
NetTopologySuite (>= 2.5)
- Be.Vlaanderen.Basisregisters.GrAr.Extracts (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Extracts (21.6)
Be.Vlaanderen.Basisregisters.Api (>= 21.0)
Be.Vlaanderen.Basisregisters.Shaperon (>= 10.0.2)
- Be.Vlaanderen.Basisregisters.GrAr.Import (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Import (21.6)
Autofac (>= 8.0)
Be.Vlaanderen.Basisregisters.AggregateSource.SqlStreamStore (>= 9.0.1)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
@@ -281,31 +279,31 @@ NUGET
Serilog (>= 3.1.1)
Serilog.Extensions.Logging (>= 8.0)
System.Threading.Tasks.Dataflow (>= 8.0)
- Be.Vlaanderen.Basisregisters.GrAr.Legacy (21.1)
- Be.Vlaanderen.Basisregisters.GrAr.Common (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Legacy (21.6)
+ Be.Vlaanderen.Basisregisters.GrAr.Common (21.6)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Newtonsoft.Json (>= 13.0.3)
- Be.Vlaanderen.Basisregisters.GrAr.Notifications (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Notifications (21.6)
AWSSDK.SimpleNotificationService (>= 3.7.301.3)
System.Text.Json (>= 8.0.3)
- Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.6)
Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json (>= 5.0)
- Be.Vlaanderen.Basisregisters.GrAr.Common (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Common (21.6)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Microsoft.Extensions.Configuration (>= 8.0)
Microsoft.Extensions.Http.Polly (>= 8.0.3)
Newtonsoft.Json (>= 13.0.3)
- Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.6)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
Be.Vlaanderen.Basisregisters.Crab (>= 4.0)
- Be.Vlaanderen.Basisregisters.GrAr.Common (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Common (21.6)
Microsoft.CSharp (>= 4.7)
- Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm (21.6)
Be.Vlaanderen.Basisregisters.Auth.AcmIdm (>= 2.0)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
Be.Vlaanderen.Basisregisters.Crab (>= 4.0)
- Be.Vlaanderen.Basisregisters.GrAr.Common (21.1)
- Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.1)
+ Be.Vlaanderen.Basisregisters.GrAr.Common (21.6)
+ Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.6)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.0.1)
AWSSDK.Core (>= 3.7.302.15)
diff --git a/src/ParcelRegistry.Consumer.Address.Console/Program.cs b/src/ParcelRegistry.Consumer.Address.Console/Program.cs
index 864cb345..85f79982 100644
--- a/src/ParcelRegistry.Consumer.Address.Console/Program.cs
+++ b/src/ParcelRegistry.Consumer.Address.Console/Program.cs
@@ -9,11 +9,11 @@ namespace ParcelRegistry.Consumer.Address.Console
using Autofac;
using Autofac.Extensions.DependencyInjection;
using Be.Vlaanderen.Basisregisters.Aws.DistributedMutex;
+ using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer;
using Destructurama;
- using Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -25,6 +25,7 @@ namespace ParcelRegistry.Consumer.Address.Console
using Serilog;
using Serilog.Debugging;
using Serilog.Extensions.Logging;
+ using MigrationsHelper = Infrastructure.MigrationsHelper;
public sealed class Program
{
@@ -145,6 +146,18 @@ public static async Task Main(string[] args)
.As>()
.SingleInstance();
+ services.ConfigureIdempotency(
+ hostContext.Configuration.GetSection(IdempotencyConfiguration.Section)
+ .Get()!.ConnectionString!,
+ new IdempotencyMigrationsTableInfo(Schema.Import),
+ new IdempotencyTableInfo(Schema.Import),
+ loggerFactory);
+
+ builder.RegisterType()
+ .As()
+ .AsSelf()
+ .InstancePerLifetimeScope();
+
builder
.RegisterModule(new EditModule(hostContext.Configuration))
.RegisterModule(new BackOfficeModule(hostContext.Configuration, services, loggerFactory, ServiceLifetime.Transient));
diff --git a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json
index ae773a67..1ce8dbbe 100644
--- a/src/ParcelRegistry.Consumer.Address.Console/appsettings.json
+++ b/src/ParcelRegistry.Consumer.Address.Console/appsettings.json
@@ -7,6 +7,10 @@
"BackOffice": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;"
},
+ "Idempotency": {
+ "ConnectionString": "Server=(localdb)\\mssqllocaldb;Database=EFProviders.InMemory.ParcelRegistry;Trusted_Connection=True;TrustServerCertificate=True;"
+ },
+
"Kafka": {
"BootstrapServers": "localhost:29092/"
},
diff --git a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs
index dd484088..79641938 100644
--- a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs
+++ b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs
@@ -10,6 +10,7 @@ namespace ParcelRegistry.Consumer.Address
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
+ using Parcel;
using Projections;
public sealed class BackOfficeConsumer : BackgroundService
@@ -17,6 +18,7 @@ public sealed class BackOfficeConsumer : BackgroundService
private readonly ILifetimeScope _lifetimeScope;
private readonly IHostApplicationLifetime _hostApplicationLifetime;
private readonly IDbContextFactory _backOfficeContextFactory;
+ private readonly IParcels _parcels;
private readonly ILoggerFactory _loggerFactory;
private readonly IIdempotentConsumer _kafkaIdemIdompotencyConsumer;
private readonly ILogger _logger;
@@ -25,12 +27,14 @@ public BackOfficeConsumer(
ILifetimeScope lifetimeScope,
IHostApplicationLifetime hostApplicationLifetime,
IDbContextFactory backOfficeContextFactory,
+ IParcels parcels,
ILoggerFactory loggerFactory,
IIdempotentConsumer kafkaIdemIdompotencyConsumer)
{
_lifetimeScope = lifetimeScope;
_hostApplicationLifetime = hostApplicationLifetime;
_backOfficeContextFactory = backOfficeContextFactory;
+ _parcels = parcels;
_loggerFactory = loggerFactory;
_kafkaIdemIdompotencyConsumer = kafkaIdemIdompotencyConsumer;
@@ -45,7 +49,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
var commandHandlingProjector = new ConnectedProjector(
Resolve.WhenEqualToHandlerMessageType(
- new CommandHandlingKafkaProjection(_backOfficeContextFactory).Handlers));
+ new CommandHandlingKafkaProjection(_backOfficeContextFactory, _parcels).Handlers));
var commandHandler = new CommandHandler(_lifetimeScope, _loggerFactory);
diff --git a/src/ParcelRegistry.Consumer.Address/Projections/CommandHandler.cs b/src/ParcelRegistry.Consumer.Address/Projections/CommandHandler.cs
index 2f3a07cb..ba54f438 100644
--- a/src/ParcelRegistry.Consumer.Address/Projections/CommandHandler.cs
+++ b/src/ParcelRegistry.Consumer.Address/Projections/CommandHandler.cs
@@ -1,9 +1,11 @@
namespace ParcelRegistry.Consumer.Address.Projections
{
+ using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using Be.Vlaanderen.Basisregisters.CommandHandling;
+ using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Microsoft.Extensions.Logging;
@@ -30,5 +32,22 @@ public virtual async Task Handle(T command, CancellationToken cancellationTok
_logger.LogDebug($"Handled {command.GetType().FullName}");
}
+
+ public virtual async Task HandleIdempotent(T command, CancellationToken cancellationToken)
+ where T : class, IHasCommandProvenance
+ {
+ _logger.LogDebug($"Idempotently handling {command.GetType().FullName}");
+
+ await using var scope = _container.BeginLifetimeScope();
+
+ var resolver = scope.Resolve();
+ _ = await resolver.Dispatch(
+ command.CreateCommandId(),
+ command,
+ new Dictionary(),
+ cancellationToken: cancellationToken);
+
+ _logger.LogDebug($"Idempotently handled {command.GetType().FullName}");
+ }
}
}
diff --git a/src/ParcelRegistry.Consumer.Address/Projections/CommandHandlingKafkaProjection.cs b/src/ParcelRegistry.Consumer.Address/Projections/CommandHandlingKafkaProjection.cs
index 3126ad0a..3ad02e32 100644
--- a/src/ParcelRegistry.Consumer.Address/Projections/CommandHandlingKafkaProjection.cs
+++ b/src/ParcelRegistry.Consumer.Address/Projections/CommandHandlingKafkaProjection.cs
@@ -5,6 +5,7 @@ namespace ParcelRegistry.Consumer.Address.Projections
using System.Threading;
using System.Threading.Tasks;
using Api.BackOffice.Abstractions;
+ using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.GrAr.Contracts.AddressRegistry;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
@@ -19,7 +20,9 @@ public sealed class CommandHandlingKafkaProjection : ConnectedProjection _backOfficeContextFactory;
- public CommandHandlingKafkaProjection(IDbContextFactory backOfficeContextFactory)
+ public CommandHandlingKafkaProjection(
+ IDbContextFactory backOfficeContextFactory,
+ IParcels parcels)
{
_backOfficeContextFactory = backOfficeContextFactory;
@@ -159,26 +162,82 @@ await DetachBecauseRemoved(
ct);
});
- When(async (commandHandler, message, ct) =>
+ When(async (commandHandler, message, ct) =>
{
await using var backOfficeContext = await _backOfficeContextFactory.CreateDbContextAsync(ct);
- await ReplaceBecauseOfReaddress(
- commandHandler,
- backOfficeContext,
- message.ReaddressedHouseNumber,
- message.Provenance,
- ct);
+ var readdresses = message.ReaddressedHouseNumbers
+ .Select(x => new ReaddressData(
+ new AddressPersistentLocalId(x.ReaddressedHouseNumber.SourceAddressPersistentLocalId),
+ new AddressPersistentLocalId(x.ReaddressedHouseNumber.DestinationAddressPersistentLocalId)))
+ .Concat(
+ message.ReaddressedHouseNumbers
+ .SelectMany(x => x.ReaddressedBoxNumbers)
+ .Select(boxNumberAddress => new ReaddressData(
+ new AddressPersistentLocalId(boxNumberAddress.SourceAddressPersistentLocalId),
+ new AddressPersistentLocalId(boxNumberAddress.DestinationAddressPersistentLocalId))))
+ .ToList();
+
+ var sourceAddressPersistentLocalIds = readdresses
+ .Select(x => (int)x.SourceAddressPersistentLocalId)
+ .ToList();
- foreach (var readdressedBoxNumber in message.ReaddressedBoxNumbers)
+ var sourceAddressParcelRelations = await backOfficeContext.ParcelAddressRelations
+ .AsNoTracking()
+ .Where(x => sourceAddressPersistentLocalIds.Contains(x.AddressPersistentLocalId))
+ .ToListAsync(cancellationToken: ct);
+
+ var commandByParcels = sourceAddressParcelRelations
+ .GroupBy(
+ relation => relation.ParcelId,
+ relation => readdresses.Where(x => x.SourceAddressPersistentLocalId == relation.AddressPersistentLocalId))
+ .Select(x => new ReaddressAddresses(
+ new ParcelId(x.Key),
+ x.SelectMany(a => a),
+ FromProvenance(message.Provenance)))
+ .ToList();
+
+ foreach (var command in commandByParcels)
{
- await ReplaceBecauseOfReaddress(
- commandHandler,
- backOfficeContext,
- readdressedBoxNumber,
- message.Provenance,
- ct);
+ try
+ {
+ await commandHandler.HandleIdempotent(command, ct);
+ }
+ catch (IdempotencyException)
+ {
+ // do nothing
+ }
+ }
+
+ await backOfficeContext.Database.BeginTransactionAsync();
+
+ foreach (var parcelId in commandByParcels.Select(x => x.ParcelId))
+ {
+ var parcel = await parcels.GetAsync(new ParcelStreamId(parcelId), ct);
+
+ var backOfficeAddresses = (await backOfficeContext.ParcelAddressRelations
+ .AsNoTracking()
+ .Where(x => x.ParcelId == parcelId)
+ .Select(x => x.AddressPersistentLocalId)
+ .ToListAsync(cancellationToken: ct))
+ .Select(x => new AddressPersistentLocalId(x))
+ .ToList();
+
+ var addressesToRemove = backOfficeAddresses.Except(parcel.AddressPersistentLocalIds).ToList();
+ var addressesToAdd = parcel.AddressPersistentLocalIds.Except(backOfficeAddresses).ToList();
+
+ foreach (var addressPersistentLocalId in addressesToRemove)
+ {
+ await backOfficeContext.RemoveIdempotentParcelAddressRelation(parcelId, addressPersistentLocalId, ct);
+ }
+
+ foreach (var addressPersistentLocalId in addressesToAdd)
+ {
+ await backOfficeContext.AddIdempotentParcelAddressRelation(parcelId, addressPersistentLocalId, ct);
+ }
}
+
+ await backOfficeContext.Database.CommitTransactionAsync();
});
When(async (commandHandler, message, ct) =>
@@ -190,7 +249,6 @@ await DetachBecauseRejected(
ct);
});
-
When(async (commandHandler, message, ct) =>
{
await DetachBecauseRetired(
@@ -201,37 +259,6 @@ await DetachBecauseRetired(
});
}
- private async Task ReplaceBecauseOfReaddress(
- CommandHandler commandHandler,
- BackOfficeContext backOfficeContext,
- ReaddressedAddressData readdressedAddress,
- Contracts.Provenance provenance,
- CancellationToken ct)
- {
- var relations = backOfficeContext.ParcelAddressRelations
- .AsNoTracking()
- .Where(x =>
- x.AddressPersistentLocalId == readdressedAddress.SourceAddressPersistentLocalId)
- .ToList();
-
- foreach (var relation in relations)
- {
- var command = new ReplaceAttachedAddressBecauseAddressWasReaddressed(
- new ParcelId(relation.ParcelId),
- newAddressPersistentLocalId: new AddressPersistentLocalId(readdressedAddress.DestinationAddressPersistentLocalId),
- previousAddressPersistentLocalId: new AddressPersistentLocalId(readdressedAddress.SourceAddressPersistentLocalId),
- FromProvenance(provenance));
-
- await commandHandler.Handle(command, ct);
-
- // This should only be handled by the back office projections to prevent conflicts. Else a relation is added or removed twice.
- // await backOfficeContext.RemoveIdempotentParcelAddressRelation(
- // command.ParcelId, new AddressPersistentLocalId(readdressedAddress.SourceAddressPersistentLocalId), ct);
- // await backOfficeContext.AddIdempotentParcelAddressRelation(
- // command.ParcelId, new AddressPersistentLocalId(readdressedAddress.DestinationAddressPersistentLocalId), ct);
- }
- }
-
private async Task DetachBecauseRemoved(
CommandHandler commandHandler,
AddressPersistentLocalId addressPersistentLocalId,
@@ -264,9 +291,9 @@ private async Task DetachBecauseRetired(
{
await using var backOfficeContext = await _backOfficeContextFactory.CreateDbContextAsync(ct);
var relations = backOfficeContext.ParcelAddressRelations
- .AsNoTracking()
- .Where(x => x.AddressPersistentLocalId == addressPersistentLocalId)
- .ToList();
+ .AsNoTracking()
+ .Where(x => x.AddressPersistentLocalId == addressPersistentLocalId)
+ .ToList();
foreach (var relation in relations)
{
diff --git a/src/ParcelRegistry.Producer.Snapshot.Oslo/ProducerProjections.cs b/src/ParcelRegistry.Producer.Snapshot.Oslo/ProducerProjections.cs
index 3057fc3c..60414eb9 100644
--- a/src/ParcelRegistry.Producer.Snapshot.Oslo/ProducerProjections.cs
+++ b/src/ParcelRegistry.Producer.Snapshot.Oslo/ProducerProjections.cs
@@ -120,6 +120,20 @@ await snapshotManager.FindMatchingSnapshot(
ct);
});
+ When>(async (_, message, ct) =>
+ {
+ await FindAndProduce(async () =>
+ await snapshotManager.FindMatchingSnapshot(
+ message.Message.CaPaKey,
+ message.Message.Provenance.Timestamp,
+ message.Message.GetHash(),
+ message.Position,
+ throwStaleWhenGone: false,
+ ct),
+ message.Position,
+ ct);
+ });
+
When>(async (_, message, ct) =>
{
await FindAndProduce(async () =>
diff --git a/src/ParcelRegistry.Producer/Extensions/MessageExtensions.cs b/src/ParcelRegistry.Producer/Extensions/MessageExtensions.cs
index c32fca9e..4c88662a 100644
--- a/src/ParcelRegistry.Producer/Extensions/MessageExtensions.cs
+++ b/src/ParcelRegistry.Producer/Extensions/MessageExtensions.cs
@@ -1,5 +1,6 @@
namespace ParcelRegistry.Producer.Extensions
{
+ using System.Linq;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Contracts = Be.Vlaanderen.Basisregisters.GrAr.Contracts.ParcelRegistry;
using Legacy = Legacy.Events;
@@ -64,6 +65,16 @@ public static Contracts.ParcelAddressWasReplacedBecauseAddressWasReaddressed ToC
message.PreviousAddressPersistentLocalId,
message.Provenance.ToContract());
+ public static Contracts.ParcelAddressesWereReaddressed ToContract(this ParcelAggregate.ParcelAddressesWereReaddressed message) =>
+ new Contracts.ParcelAddressesWereReaddressed(
+ message.ParcelId.ToString("D"),
+ message.CaPaKey,
+ message.AttachedAddressPersistentLocalIds,
+ message.DetachedAddressPersistentLocalIds,
+ message.AddressRegistryReaddresses.Select(x =>
+ new Contracts.AddressRegistryReaddress(x.SourceAddressPersistentLocalId, x.SourceAddressPersistentLocalId)),
+ message.Provenance.ToContract());
+
public static Contracts.ParcelWasMigrated ToContract(this ParcelAggregate.ParcelWasMigrated message) =>
new Contracts.ParcelWasMigrated(
message.OldParcelId.ToString("D"),
diff --git a/src/ParcelRegistry.Producer/ProducerMigrateProjections.cs b/src/ParcelRegistry.Producer/ProducerMigrateProjections.cs
index 844a5f9c..ca6de405 100644
--- a/src/ParcelRegistry.Producer/ProducerMigrateProjections.cs
+++ b/src/ParcelRegistry.Producer/ProducerMigrateProjections.cs
@@ -52,6 +52,11 @@ public ProducerMigrateProjections(IProducer producer)
await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
});
+ When>(async (_, message, ct) =>
+ {
+ await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
+ });
+
When>(async (_, message, ct) =>
{
await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
diff --git a/src/ParcelRegistry.Producer/ProducerProjections.cs b/src/ParcelRegistry.Producer/ProducerProjections.cs
index 3969213b..31f54aa7 100644
--- a/src/ParcelRegistry.Producer/ProducerProjections.cs
+++ b/src/ParcelRegistry.Producer/ProducerProjections.cs
@@ -106,6 +106,11 @@ public ProducerProjections(IProducer producer)
await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
});
+ When>(async (_, message, ct) =>
+ {
+ await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
+ });
+
When>(async (_, message, ct) =>
{
await Produce(message.Message.ParcelId, message.Message.ToContract(), message.Position, ct);
diff --git a/src/ParcelRegistry.Projections.BackOffice/BackOfficeProjections.cs b/src/ParcelRegistry.Projections.BackOffice/BackOfficeProjections.cs
index 73890a48..0aaf132b 100644
--- a/src/ParcelRegistry.Projections.BackOffice/BackOfficeProjections.cs
+++ b/src/ParcelRegistry.Projections.BackOffice/BackOfficeProjections.cs
@@ -1,5 +1,6 @@
-namespace ParcelRegistry.Projections.BackOffice
+namespace ParcelRegistry.Projections.BackOffice
{
+ using System.Threading.Tasks;
using Api.BackOffice.Abstractions;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore;
@@ -106,6 +107,12 @@ await backOfficeContext.AddIdempotentParcelAddressRelation(
newAddress.Count += 1;
}
});
+
+ When>((_, message, cancellationToken) =>
+ {
+ // Do nothing
+ return Task.CompletedTask;
+ });
}
}
}
diff --git a/src/ParcelRegistry.Projections.Extract/ParcelLinkExtractWithCount/ParcelLinkExtractProjections.cs b/src/ParcelRegistry.Projections.Extract/ParcelLinkExtractWithCount/ParcelLinkExtractProjections.cs
index 0b2c57c7..9f369c25 100644
--- a/src/ParcelRegistry.Projections.Extract/ParcelLinkExtractWithCount/ParcelLinkExtractProjections.cs
+++ b/src/ParcelRegistry.Projections.Extract/ParcelLinkExtractWithCount/ParcelLinkExtractProjections.cs
@@ -7,7 +7,6 @@ namespace ParcelRegistry.Projections.Extract.ParcelLinkExtractWithCount
using Be.Vlaanderen.Basisregisters.GrAr.Extracts;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore;
- using Microsoft.Extensions.Options;
using Parcel.Events;
[ConnectedProjectionName("Extract perceelkoppelingen met adres")]
@@ -18,7 +17,7 @@ public sealed class ParcelLinkExtractProjections : ConnectedProjection extractConfig, Encoding encoding)
+ public ParcelLinkExtractProjections(Encoding encoding)
{
_encoding = encoding ?? throw new ArgumentNullException(nameof(encoding));
@@ -130,6 +129,33 @@ await context
newAddress.Count += 1;
}
});
+
+ When>(async (context, message, ct) =>
+ {
+ foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelLinkExtractWithCount
+ .FindAsync([message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is not null)
+ {
+ context.ParcelLinkExtractWithCount.Remove(relation);
+ }
+ }
+
+ foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelLinkExtractWithCount
+ .FindAsync([message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is not null)
+ {
+ await context.ParcelLinkExtractWithCount.AddAsync(relation, ct);
+ }
+ }
+ });
}
private static async Task RemoveParcelLink(
diff --git a/src/ParcelRegistry.Projections.Integration/ParcelLatestItem/ParcelLatestItemProjections.cs b/src/ParcelRegistry.Projections.Integration/ParcelLatestItem/ParcelLatestItemProjections.cs
index ab53af92..bc5faccf 100644
--- a/src/ParcelRegistry.Projections.Integration/ParcelLatestItem/ParcelLatestItemProjections.cs
+++ b/src/ParcelRegistry.Projections.Integration/ParcelLatestItem/ParcelLatestItemProjections.cs
@@ -137,6 +137,38 @@ await context
}
});
+ When>(async (context, message, ct) =>
+ {
+ foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelLatestItemAddresses
+ .FindAsync([message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is not null)
+ {
+ context.ParcelLatestItemAddresses.Remove(relation);
+ }
+ }
+
+ foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelLatestItemAddresses
+ .FindAsync([message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is null)
+ {
+ await context.ParcelLatestItemAddresses.AddAsync(
+ new ParcelLatestItemAddress(
+ message.Message.ParcelId,
+ addressPersistentLocalId,
+ message.Message.CaPaKey),
+ ct);
+ }
+ }
+ });
+
When>(async (context, message, ct) =>
{
var latestItemAddress = await context
diff --git a/src/ParcelRegistry.Projections.Integration/ParcelVersion/ParcelVersionProjections.cs b/src/ParcelRegistry.Projections.Integration/ParcelVersion/ParcelVersionProjections.cs
index 85ad4e5c..f75d995b 100644
--- a/src/ParcelRegistry.Projections.Integration/ParcelVersion/ParcelVersionProjections.cs
+++ b/src/ParcelRegistry.Projections.Integration/ParcelVersion/ParcelVersionProjections.cs
@@ -160,6 +160,44 @@ await context
}
});
+ When>(async (context, message, ct) =>
+ {
+ await context.CreateNewParcelVersion(
+ message.Message.ParcelId,
+ message,
+ _ => { }, ct);
+
+ foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelVersionAddresses
+ .FindAsync([message.Position, message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is not null)
+ {
+ context.ParcelVersionAddresses.Remove(relation);
+ }
+ }
+
+ foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
+ {
+ var relation = await context
+ .ParcelVersionAddresses
+ .FindAsync([message.Position, message.Message.ParcelId, addressPersistentLocalId], ct);
+
+ if (relation is null)
+ {
+ await context.ParcelVersionAddresses.AddAsync(
+ new ParcelVersionAddress(
+ message.Position,
+ message.Message.ParcelId,
+ addressPersistentLocalId,
+ message.Message.CaPaKey),
+ ct);
+ }
+ }
+ });
+
When>(async (context, message, ct) =>
{
await context.CreateNewParcelVersion(
diff --git a/src/ParcelRegistry.Projections.LastChangedList/LastChangedListProjections.cs b/src/ParcelRegistry.Projections.LastChangedList/LastChangedListProjections.cs
index f5cf0ccd..959af620 100644
--- a/src/ParcelRegistry.Projections.LastChangedList/LastChangedListProjections.cs
+++ b/src/ParcelRegistry.Projections.LastChangedList/LastChangedListProjections.cs
@@ -114,6 +114,11 @@ public LastChangedListProjections(ICacheValidator cacheValidator)
await GetLastChangedRecordsAndUpdatePosition(message.Message.ParcelId.ToString(), message.Position, context, ct);
});
+ When>(async (context, message, ct) =>
+ {
+ await GetLastChangedRecordsAndUpdatePosition(message.Message.ParcelId.ToString(), message.Position, context, ct);
+ });
+
When>(async (context, message, ct) =>
{
var records = await GetLastChangedRecordsAndUpdatePosition(message.Message.ParcelId.ToString(), message.Position, context, ct);
diff --git a/src/ParcelRegistry.Projections.Legacy/ParcelDetailWithCountV2/ParcelDetailV2Projections.cs b/src/ParcelRegistry.Projections.Legacy/ParcelDetailWithCountV2/ParcelDetailV2Projections.cs
index bc1b1ca6..d16c4a2d 100644
--- a/src/ParcelRegistry.Projections.Legacy/ParcelDetailWithCountV2/ParcelDetailV2Projections.cs
+++ b/src/ParcelRegistry.Projections.Legacy/ParcelDetailWithCountV2/ParcelDetailV2Projections.cs
@@ -211,6 +211,45 @@ await context.FindAndUpdateParcelDetail(
ct);
});
+ When>(async (context, message, ct) =>
+ {
+ await context.FindAndUpdateParcelDetail(
+ message.Message.ParcelId,
+ entity =>
+ {
+ context.Entry(entity).Collection(x => x.Addresses).Load();
+
+
+ foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
+ {
+ var relation = entity.Addresses.SingleOrDefault(parcelAddress =>
+ parcelAddress.AddressPersistentLocalId == addressPersistentLocalId
+ && parcelAddress.ParcelId == message.Message.ParcelId);
+
+ if (relation is not null)
+ {
+ entity.Addresses.Remove(relation);
+ }
+ }
+
+ foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
+ {
+ var relation = entity.Addresses.SingleOrDefault(parcelAddress =>
+ parcelAddress.AddressPersistentLocalId == addressPersistentLocalId
+ && parcelAddress.ParcelId == message.Message.ParcelId);
+
+ if (relation is null)
+ {
+ entity.Addresses.Add(new ParcelDetailAddressV2(message.Message.ParcelId, addressPersistentLocalId));
+ }
+ }
+
+ UpdateHash(entity, message);
+ UpdateVersionTimestamp(entity, message.Message.Provenance.Timestamp);
+ },
+ ct);
+ });
+
When>(async (context, message, ct) =>
{
var (geometryType, gml) = ToGml(message.Message.ExtendedWkbGeometry);
diff --git a/src/ParcelRegistry.Projections.Legacy/ParcelSyndication/ParcelSyndicationProjections.cs b/src/ParcelRegistry.Projections.Legacy/ParcelSyndication/ParcelSyndicationProjections.cs
index c1c5ac8a..ae2dc595 100755
--- a/src/ParcelRegistry.Projections.Legacy/ParcelSyndication/ParcelSyndicationProjections.cs
+++ b/src/ParcelRegistry.Projections.Legacy/ParcelSyndication/ParcelSyndicationProjections.cs
@@ -5,7 +5,6 @@ namespace ParcelRegistry.Projections.Legacy.ParcelSyndication
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore;
using Be.Vlaanderen.Basisregisters.Utilities.HexByteConvertor;
- using NetTopologySuite.IO;
using Parcel;
using Parcel.Events;
using ParcelRegistry.Legacy.Events;
@@ -273,6 +272,26 @@ await context.CreateNewParcelSyndicationItem(
},
ct);
});
+
+ When>(async (context, message, ct) =>
+ {
+ await context.CreateNewParcelSyndicationItem(
+ message.Message.ParcelId,
+ message,
+ x =>
+ {
+ foreach (var addressPersistentLocalId in message.Message.DetachedAddressPersistentLocalIds)
+ {
+ x.RemoveAddressPersistentLocalId(addressPersistentLocalId);
+ }
+
+ foreach (var addressPersistentLocalId in message.Message.AttachedAddressPersistentLocalIds)
+ {
+ x.AddAddressPersistentLocalId(addressPersistentLocalId);
+ }
+ },
+ ct);
+ });
}
private static async Task DoNothing()
diff --git a/src/ParcelRegistry.Projector/Infrastructure/Modules/ApiModule.cs b/src/ParcelRegistry.Projector/Infrastructure/Modules/ApiModule.cs
index 84aa01c7..f87ea993 100644
--- a/src/ParcelRegistry.Projector/Infrastructure/Modules/ApiModule.cs
+++ b/src/ParcelRegistry.Projector/Infrastructure/Modules/ApiModule.cs
@@ -102,9 +102,7 @@ private void RegisterExtractV2Projections(ContainerBuilder builder)
DbaseCodePage.Western_European_ANSI.ToEncoding()),
ConnectedProjectionSettings.Default)
.RegisterProjections(
- context => new ParcelLinkExtractWithCountProjections(
- context.Resolve>(),
- DbaseCodePage.Western_European_ANSI.ToEncoding()),
+ context => new ParcelLinkExtractWithCountProjections(DbaseCodePage.Western_European_ANSI.ToEncoding()),
ConnectedProjectionSettings.Default);
}
diff --git a/src/ParcelRegistry/Parcel/AddressCommandHandlerModule.cs b/src/ParcelRegistry/Parcel/AddressCommandHandlerModule.cs
index 02890c65..c0c9f85d 100644
--- a/src/ParcelRegistry/Parcel/AddressCommandHandlerModule.cs
+++ b/src/ParcelRegistry/Parcel/AddressCommandHandlerModule.cs
@@ -82,18 +82,16 @@ public AddressCommandHandlerModule(
parcel.DetachAddressBecauseAddressWasRetired(message.Command.AddressPersistentLocalId);
});
- For()
+ For()
.AddSqlStreamStore(getStreamStore, getUnitOfWork, eventMapping, eventSerializer, getSnapshotStore)
- .AddEventHash(getUnitOfWork)
+ .AddEventHash(getUnitOfWork)
.AddProvenance(getUnitOfWork, provenanceFactory)
.Handle(async (message, ct) =>
{
var streamId = new ParcelStreamId(message.Command.ParcelId);
var parcel = await parcelRepository().GetAsync(streamId, ct);
- parcel.ReplaceAttachedAddressBecauseAddressWasReaddressed(
- message.Command.NewAddressPersistentLocalId,
- message.Command.PreviousAddressPersistentLocalId);
+ parcel.ReaddressAddresses(message.Command.Readdresses);
});
}
}
diff --git a/src/ParcelRegistry/Parcel/Commands/ReaddressAddresses.cs b/src/ParcelRegistry/Parcel/Commands/ReaddressAddresses.cs
new file mode 100644
index 00000000..edfa3633
--- /dev/null
+++ b/src/ParcelRegistry/Parcel/Commands/ReaddressAddresses.cs
@@ -0,0 +1,63 @@
+namespace ParcelRegistry.Parcel.Commands
+{
+ using System;
+ using System.Collections.Generic;
+ using System.Linq;
+ using Be.Vlaanderen.Basisregisters.Generators.Guid;
+ using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
+ using Be.Vlaanderen.Basisregisters.Utilities;
+
+ public class ReaddressAddresses : IHasCommandProvenance
+ {
+ private static readonly Guid Namespace = new Guid("646d3ef7-6cbc-4b33-b75f-e5d72e48c356");
+ public ParcelId ParcelId { get; }
+ public IReadOnlyList Readdresses { get; }
+ public Provenance Provenance { get; }
+
+ public ReaddressAddresses(
+ ParcelId parcelId,
+ IEnumerable readdresses,
+ Provenance provenance)
+ {
+ ParcelId = parcelId;
+ Readdresses = readdresses.ToList();
+ Provenance = provenance;
+ }
+
+ public Guid CreateCommandId()
+ => Deterministic.Create(Namespace, $"ReaddressAddresses-{ToString()}");
+
+ public override string? ToString()
+ => ToStringBuilder.ToString(IdentityFields());
+
+ private IEnumerable