Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: create oslo snapshots #779

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace ParcelRegistry.Api.BackOffice.Abstractions.Requests
{
using System.Collections.Generic;

public class CreateOsloSnapshotsRequest
{
public List<string> CaPaKeys { get; set; }

public string Reden { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace ParcelRegistry.Api.BackOffice.Abstractions.SqsRequests
{
using Be.Vlaanderen.Basisregisters.Sqs.Requests;
using Requests;

public class CreateOsloSnapshotsSqsRequest : SqsRequest
{
public CreateOsloSnapshotsRequest Request { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
namespace ParcelRegistry.Api.BackOffice.Handlers.Lambda.Handlers
{
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.CommandHandling.Idempotency;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Handlers;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Infrastructure;
using Requests;
using TicketingService.Abstractions;

public sealed class CreateOsloSnapshotsLambdaHandler : SqsLambdaHandlerBase<CreateOsloSnapshotsLambdaRequest>
{
public CreateOsloSnapshotsLambdaHandler(
ICustomRetryPolicy retryPolicy,
ITicketing ticketing,
IIdempotentCommandHandler idempotentCommandHandler)
: base(retryPolicy, ticketing, idempotentCommandHandler)
{
}

protected override async Task<object> InnerHandle(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
var cmd = request.ToCommand();

try
{
await IdempotentCommandHandler.Dispatch(
cmd.CreateCommandId(),
cmd,
request.Metadata!,
cancellationToken);
}
catch (IdempotencyException)
{
// Idempotent: Do Nothing return last etag
}

return "done";
}

protected override TicketError? MapDomainException(DomainException exception, CreateOsloSnapshotsLambdaRequest request) => null;

protected override Task HandleAggregateIdIsNotFoundException(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}

protected override Task ValidateIfMatchHeaderValue(CreateOsloSnapshotsLambdaRequest request, CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ await mediator.Send(
cancellationToken);
break;

case CreateOsloSnapshotsSqsRequest request:
await mediator.Send(
new CreateOsloSnapshotsLambdaRequest(messageMetadata.MessageGroupId!, request),
cancellationToken);
break;

default:
throw new NotImplementedException(
$"{sqsRequest.GetType().Name} has no corresponding SqsLambdaRequest defined.");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
namespace ParcelRegistry.Api.BackOffice.Handlers.Lambda.Requests
{
using Abstractions.Requests;
using Abstractions.SqsRequests;
using AllStream.Commands;
using Be.Vlaanderen.Basisregisters.Sqs.Lambda.Requests;

public sealed record CreateOsloSnapshotsLambdaRequest : SqsLambdaRequest
{
public CreateOsloSnapshotsRequest Request { get; }


public CreateOsloSnapshotsLambdaRequest(
string messageGroupId,
CreateOsloSnapshotsSqsRequest sqsRequest)
: base(
messageGroupId,
sqsRequest.TicketId,
null,
sqsRequest.ProvenanceData.ToProvenance(),
sqsRequest.Metadata)
{
Request = sqsRequest.Request;
}

/// <summary>
/// Map to CreateOsloSnapshots command
/// </summary>
/// <returns>CreateOsloSnapshots</returns>
public CreateOsloSnapshots ToCommand()
{
return new CreateOsloSnapshots(
Request.CaPaKeys.Select(x => new VbrCaPaKey(x)),
Provenance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace ParcelRegistry.Api.BackOffice.Handlers
using Parcel;
using TicketingService.Abstractions;

public class AttachAddressHandler : SqsHandler<AttachAddressSqsRequest>
public sealed class AttachAddressHandler : SqsHandler<AttachAddressSqsRequest>
{
public const string Action = "AttachAddressParcel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
namespace ParcelRegistry.Api.BackOffice.Handlers
{
using System.Collections.Generic;
using Abstractions.SqsRequests;
using AllStream;
using Be.Vlaanderen.Basisregisters.Sqs;
using Be.Vlaanderen.Basisregisters.Sqs.Handlers;
using TicketingService.Abstractions;

public sealed class CreateOsloSnapshotsHandler : SqsHandler<CreateOsloSnapshotsSqsRequest>
{
public const string Action = "CreateOsloSnapshots";

public CreateOsloSnapshotsHandler(
ISqsQueue sqsQueue,
ITicketing ticketing,
ITicketingUrl ticketingUrl) : base(sqsQueue, ticketing, ticketingUrl)
{ }

protected override string? WithAggregateId(CreateOsloSnapshotsSqsRequest request)
{
return AllStreamId.Instance;
}

protected override IDictionary<string, string> WithTicketMetadata(string aggregateId, CreateOsloSnapshotsSqsRequest sqsRequest)
{
return new Dictionary<string, string>
{
{ RegistryKey, nameof(ParcelRegistry) },
{ ActionKey, Action },
{ AggregateIdKey, aggregateId }
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace ParcelRegistry.Api.BackOffice.Handlers
using Parcel;
using TicketingService.Abstractions;

public class DetachAddressHandler : SqsHandler<DetachAddressSqsRequest>
public sealed class DetachAddressHandler : SqsHandler<DetachAddressSqsRequest>
{
public const string Action = "DetachAddressParcel";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
namespace ParcelRegistry.Api.BackOffice
{
using System.Threading;
using System.Threading.Tasks;
using Abstractions.Requests;
using Abstractions.SqsRequests;
using Be.Vlaanderen.Basisregisters.Auth.AcmIdm;
using Be.Vlaanderen.Basisregisters.GrAr.Provenance;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;

public partial class ParcelController
{
/// <summary>
/// Creëer nieuwe OSLO snapshots.
/// </summary>
/// <param name="request"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("acties/oslosnapshots")]
[Authorize(AuthenticationSchemes = JwtBearerDefaults.AuthenticationScheme, Policy = PolicyNames.Adres.InterneBijwerker)]
public async Task<IActionResult> CreateOsloSnapshots(
[FromBody] CreateOsloSnapshotsRequest request,
CancellationToken cancellationToken = default)
{
var provenance = _provenanceFactory.Create(new Reason(request.Reden), Modification.Unknown);

var sqsRequest = new CreateOsloSnapshotsSqsRequest
{
Request = request,
Metadata = GetMetadata(),
ProvenanceData = new ProvenanceData(provenance)
};

var sqsResult = await _mediator.Send(sqsRequest, cancellationToken);

return Accepted(sqsResult);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
namespace ParcelRegistry.Infrastructure.Modules
{
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.CommandHandling;
using Autofac;
using Be.Vlaanderen.Basisregisters.CommandHandling;
using Legacy;
using Microsoft.Extensions.Configuration;
using Parcel;

public class CommandHandlingModule : Module
{
Expand All @@ -17,8 +16,9 @@ protected override void Load(ContainerBuilder builder)
{
builder.RegisterModule(new AggregateSourceModule(_configuration));

Legacy.CommandHandlerModules.Register(builder);
CommandHandlerModules.Register(builder);
ParcelRegistry.Parcel.CommandHandlerModules.Register(builder);
AllStream.CommandHandlerModules.Register(builder);

builder
.RegisterType<CommandHandlerResolver>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace ParcelRegistry.Infrastructure.Modules
{
using AllStream;
using Autofac;
using Parcel;
using Repositories;
Expand All @@ -16,6 +17,10 @@ protected override void Load(ContainerBuilder builder)
builder
.RegisterType<Parcels>()
.As<IParcels>();

builder
.RegisterType<AllStreamRepository>()
.As<IAllStreamRepository>();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace ParcelRegistry.Infrastructure.Repositories
{
using AllStream;
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Be.Vlaanderen.Basisregisters.AggregateSource.SqlStreamStore;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Parcel;
using SqlStreamStore;

public class AllStreamRepository : Repository<AllStream, AllStreamId>, IAllStreamRepository
{
public AllStreamRepository(
ConcurrentUnitOfWork unitOfWork,
IStreamStore eventStore,
EventMapping eventMapping,
EventDeserializer eventDeserializer)
: base(() => new AllStream(), unitOfWork, eventStore, eventMapping, eventDeserializer) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ private void RegisterProjections(ContainerBuilder builder)
_loggerFactory)
.RegisterProjections<ProducerProjections, ProducerContext>(c =>
{
var bootstrapServers = _configuration["Kafka:BootstrapServers"];
var bootstrapServers = _configuration["Kafka:BootstrapServers"]!;
var topic = $"{_configuration[ProducerProjections.TopicKey]}" ?? throw new ArgumentException($"Configuration has no value for {ProducerProjections.TopicKey}");
var producerOptions = new ProducerOptions(
new BootstrapServers(bootstrapServers),
Expand All @@ -95,18 +95,21 @@ private void RegisterProjections(ContainerBuilder builder)
&& !string.IsNullOrEmpty(_configuration["Kafka:SaslPassword"]))
{
producerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
_configuration["Kafka:SaslUserName"],
_configuration["Kafka:SaslPassword"]));
_configuration["Kafka:SaslUserName"]!,
_configuration["Kafka:SaslPassword"]!));
}

var osloProxy = c.Resolve<IOsloProxy>();

return new ProducerProjections(
new Producer(producerOptions),
new SnapshotManager(
c.Resolve<ILoggerFactory>(),
c.Resolve<IOsloProxy>(),
osloProxy,
SnapshotManagerOptions.Create(
_configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"],
_configuration["RetryPolicy:RetryBackoffFactor"])));
_configuration["RetryPolicy:MaxRetryWaitIntervalSeconds"]!,
_configuration["RetryPolicy:RetryBackoffFactor"]!)),
osloProxy);
},
connectedProjectionSettings);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace ParcelRegistry.Producer.Snapshot.Oslo
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using AllStream.Events;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
Expand All @@ -18,10 +19,28 @@ public sealed class ProducerProjections : ConnectedProjection<ProducerContext>

private readonly IProducer _producer;

public ProducerProjections(IProducer producer, ISnapshotManager snapshotManager)
public ProducerProjections(
IProducer producer,
ISnapshotManager snapshotManager,
IOsloProxy osloProxy)
{
_producer = producer;

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<ParcelOsloSnapshotsWereRequested>>(async (_, message, ct) =>
{
foreach (var caPaKey in message.Message.ParcelIdsWithCapaKey.Values)
{
if (ct.IsCancellationRequested)
{
break;
}
await FindAndProduce(async () =>
await osloProxy.GetSnapshot(caPaKey, ct),
message.Position,
ct);
}
});

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<ParcelWasMigrated>>(async (_, message, ct) =>
{
await FindAndProduce(async () =>
Expand Down
17 changes: 17 additions & 0 deletions src/ParcelRegistry/AllStream/AllStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace ParcelRegistry.AllStream
{
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.AggregateSource;
using Events;
using Parcel;

public sealed class AllStream : AggregateRootEntity
{
public void CreateOsloSnapshots(IReadOnlyList<VbrCaPaKey> caPaKeys)
{
ApplyChange(new ParcelOsloSnapshotsWereRequested(
caPaKeys.ToDictionary(ParcelId.CreateFor, x => x)));
}
}
}
Loading
Loading