Skip to content

Commit

Permalink
feat: create oslo snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
jvandaal committed May 27, 2024
1 parent ffa1c5a commit 1a47eea
Show file tree
Hide file tree
Showing 28 changed files with 821 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace ParcelRegistry.Api.BackOffice.Abstractions.Requests
{
using System.Collections.Generic;

public class CreateOsloSnapshotsRequest
{
public List<string> CaPaKeys { get; set; }

public string Reden { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace ParcelRegistry.Api.BackOffice.Abstractions.SqsRequests
{
using Be.Vlaanderen.Basisregisters.Sqs.Requests;
using Requests;

public class CreateOsloSnapshotsSqsRequest : SqsRequest
{
public CreateOsloSnapshotsRequest Request { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace ParcelRegistry.Api.BackOffice.Handlers.Lambda.Handlers
{
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Handlers;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Infrastructure;
using Requests;
using TicketingService.Abstractions;

public sealed class CreateOsloSnapshotsLambdaHandler : SqsLambdaHandlerBase<CreateOsloSnapshotsLambdaRequest>
{
public CreateOsloSnapshotsLambdaHandler(
ICustomRetryPolicy retryPolicy,
ITicketing ticketing,
IIdempotentCommandHandler idempotentCommandHandler)
: base(retryPolicy, ticketing, idempotentCommandHandler)
{
}

protected override async Task<object> InnerHandle(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
var cmd = request.ToCommand();

try
{
await IdempotentCommandHandler.Dispatch(
cmd.CreateCommandId(),
cmd,
request.Metadata!,
cancellationToken);
}
catch (IdempotencyException)
{
// Idempotent: Do Nothing return last etag
}

return "done";
}

protected override TicketError? MapDomainException(DomainException exception, CreateOsloSnapshotsLambdaRequest request) => null;

protected override Task HandleAggregateIdIsNotFoundException(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

protected override Task ValidateIfMatchHeaderValue(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ await mediator.Send(
cancellationToken);
break;

case CreateOsloSnapshotsSqsRequest request:
await mediator.Send(
new CreateOsloSnapshotsLambdaRequest(messageMetadata.MessageGroupId!, request),
cancellationToken);
break;

default:
throw new NotImplementedException(
$"{sqsRequest.GetType().Name} has no corresponding SqsLambdaRequest defined.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace ParcelRegistry.Api.BackOffice.Handlers.Lambda.Requests
{
using Abstractions.Requests;
using Abstractions.SqsRequests;
using AllStream.Commands;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Requests;

public sealed record CreateOsloSnapshotsLambdaRequest : SqsLambdaRequest
{
public CreateOsloSnapshotsRequest Request { get; }


public CreateOsloSnapshotsLambdaRequest(
string messageGroupId,
CreateOsloSnapshotsSqsRequest sqsRequest)
: base(
messageGroupId,
sqsRequest.TicketId,
null,
sqsRequest.ProvenanceData.ToProvenance(),
sqsRequest.Metadata)
{
Request = sqsRequest.Request;
}

/// <summary>
/// Map to CreateOsloSnapshots command
/// </summary>
/// <returns>CreateOsloSnapshots</returns>
public CreateOsloSnapshots ToCommand()
{
return new CreateOsloSnapshots(
Request.CaPaKeys.Select(x => new VbrCaPaKey(x)),
Provenance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace ParcelRegistry.Api.BackOffice.Handlers
using Parcel;
using TicketingService.Abstractions;

public class AttachAddressHandler : SqsHandler<AttachAddressSqsRequest>
public sealed class AttachAddressHandler : SqsHandler<AttachAddressSqsRequest>
{
public const string Action = "AttachAddressParcel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace ParcelRegistry.Api.BackOffice.Handlers
{
using System.Collections.Generic;
using Abstractions.SqsRequests;
using AllStream;
using Be.Vlaanderen.Basisregisters.Sqs;
using Be.Vlaanderen.Basisregisters.Sqs.Handlers;
using TicketingService.Abstractions;

public sealed class CreateOsloSnapshotsHandler : SqsHandler<CreateOsloSnapshotsSqsRequest>
{
public const string Action = "CreateOsloSnapshots";

public CreateOsloSnapshotsHandler(
ISqsQueue sqsQueue,
ITicketing ticketing,
ITicketingUrl ticketingUrl) : base(sqsQueue, ticketing, ticketingUrl)
{ }

protected override string? WithAggregateId(CreateOsloSnapshotsSqsRequest request)
{
return AllStreamId.Instance;
}

protected override IDictionary<string, string> WithTicketMetadata(string aggregateId, CreateOsloSnapshotsSqsRequest sqsRequest)
{
return new Dictionary<string, string>
{
{ RegistryKey, nameof(ParcelRegistry) },
{ ActionKey, Action },
{ AggregateIdKey, aggregateId }
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace ParcelRegistry.Api.BackOffice.Handlers
using Parcel;
using TicketingService.Abstractions;

public class DetachAddressHandler : SqsHandler<DetachAddressSqsRequest>
public sealed class DetachAddressHandler : SqsHandler<DetachAddressSqsRequest>
{
public const string Action = "DetachAddressParcel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace ParcelRegistry.Api.BackOffice
{
using System.Threading;
using System.Threading.Tasks;
using Abstractions.Requests;
using Abstractions.SqsRequests;
using Be.Vlaanderen.Basisregisters.Auth.AcmIdm;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;

public partial class ParcelController
{
/// <summary>
/// Creëer nieuwe OSLO snapshots.
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("acties/oslosnapshots")]
[Authorize(AuthenticationSchemes = JwtBearerDefaults.AuthenticationScheme, Policy = PolicyNames.Adres.InterneBijwerker)]
public async Task<IActionResult> CreateOsloSnapshots(
[FromBody] CreateOsloSnapshotsRequest request,
CancellationToken cancellationToken = default)
{
var provenance = _provenanceFactory.Create(new Reason(request.Reden), Modification.Unknown);

var sqsRequest = new CreateOsloSnapshotsSqsRequest
{
Request = request,
Metadata = GetMetadata(),
ProvenanceData = new ProvenanceData(provenance)
};

var sqsResult = await _mediator.Send(sqsRequest, cancellationToken);

return Accepted(sqsResult);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
namespace ParcelRegistry.Infrastructure.Modules
{
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.CommandHandling;
using Autofac;
using Be.Vlaanderen.Basisregisters.CommandHandling;
using Legacy;
using Microsoft.Extensions.Configuration;
using Parcel;

public class CommandHandlingModule : Module
{
Expand All @@ -17,8 +16,9 @@ protected override void Load(ContainerBuilder builder)
{
builder.RegisterModule(new AggregateSourceModule(_configuration));

Legacy.CommandHandlerModules.Register(builder);
CommandHandlerModules.Register(builder);
ParcelRegistry.Parcel.CommandHandlerModules.Register(builder);
AllStream.CommandHandlerModules.Register(builder);

builder
.RegisterType<CommandHandlerResolver>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ParcelRegistry.Infrastructure.Modules
{
using AllStream;
using Autofac;
using Parcel;
using Repositories;
Expand All @@ -16,6 +17,10 @@ protected override void Load(ContainerBuilder builder)
builder
.RegisterType<Parcels>()
.As<IParcels>();

builder
.RegisterType<AllStreamRepository>()
.As<IAllStreamRepository>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ParcelRegistry.Infrastructure.Repositories
{
using AllStream;
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.AggregateSource.SqlStreamStore;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Parcel;
using SqlStreamStore;

public class AllStreamRepository : Repository<AllStream, AllStreamId>, IAllStreamRepository
{
public AllStreamRepository(
ConcurrentUnitOfWork unitOfWork,
IStreamStore eventStore,
EventMapping eventMapping,
EventDeserializer eventDeserializer)
: base(() => new AllStream(), unitOfWork, eventStore, eventMapping, eventDeserializer) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void RegisterProjections(ContainerBuilder builder)
_loggerFactory)
.RegisterProjections<ProducerProjections, ProducerContext>(c =>
{
var bootstrapServers = _configuration["Kafka:BootstrapServers"];
var bootstrapServers = _configuration["Kafka:BootstrapServers"]!;
var topic = $"{_configuration[ProducerProjections.TopicKey]}" ?? throw new ArgumentException($"Configuration has no value for {ProducerProjections.TopicKey}");
var producerOptions = new ProducerOptions(
new BootstrapServers(bootstrapServers),
Expand All @@ -95,18 +95,21 @@ private void RegisterProjections(ContainerBuilder builder)
&& !string.IsNullOrEmpty(_configuration["Kafka:SaslPassword"]))
{
producerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
_configuration["Kafka:SaslUserName"],
_configuration["Kafka:SaslPassword"]));
_configuration["Kafka:SaslUserName"]!,
_configuration["Kafka:SaslPassword"]!));
}

var osloProxy = c.Resolve<IOsloProxy>();

return new ProducerProjections(
new Producer(producerOptions),
new SnapshotManager(
c.Resolve<ILoggerFactory>(),
c.Resolve<IOsloProxy>(),
osloProxy,
SnapshotManagerOptions.Create(
_configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"],
_configuration["RetryPolicy:RetryBackoffFactor"])));
_configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"]!,
_configuration["RetryPolicy:RetryBackoffFactor"]!)),
osloProxy);
},
connectedProjectionSettings);
}
Expand Down
21 changes: 20 additions & 1 deletion src/ParcelRegistry.Producer.Snapshot.Oslo/ProducerProjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace ParcelRegistry.Producer.Snapshot.Oslo
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AllStream.Events;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
Expand All @@ -18,10 +19,28 @@ public sealed class ProducerProjections : ConnectedProjection<ProducerContext>

private readonly IProducer _producer;

public ProducerProjections(IProducer producer, ISnapshotManager snapshotManager)
public ProducerProjections(
IProducer producer,
ISnapshotManager snapshotManager,
IOsloProxy osloProxy)
{
_producer = producer;

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<ParcelOsloSnapshotsWereRequested>>(async (_, message, ct) =>
{
foreach (var caPaKey in message.Message.ParcelIdsWithCapaKey.Values)
{
if (ct.IsCancellationRequested)
{
break;
}
await FindAndProduce(async () =>
await osloProxy.GetSnapshot(caPaKey, ct),
message.Position,
ct);
}
});

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<ParcelWasMigrated>>(async (_, message, ct) =>
{
await FindAndProduce(async () =>
Expand Down
17 changes: 17 additions & 0 deletions src/ParcelRegistry/AllStream/AllStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ParcelRegistry.AllStream
{
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Events;
using Parcel;

public sealed class AllStream : AggregateRootEntity
{
public void CreateOsloSnapshots(IReadOnlyList<VbrCaPaKey> caPaKeys)
{
ApplyChange(new ParcelOsloSnapshotsWereRequested(
caPaKeys.ToDictionary(ParcelId.CreateFor, x => x)));
}
}
}
Loading

0 comments on commit 1a47eea

Please sign in to comment.