From f94d4868e52c758acac82f1ea8bc87d3330c57f0 Mon Sep 17 00:00:00 2001 From: ArneD Date: Thu, 5 Dec 2024 09:16:24 +0100 Subject: [PATCH] fix(consumer): remove stopping token in messagehandler --- .../BackOfficeConsumer.cs | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs index 79641938..c1714e13 100644 --- a/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs +++ b/src/ParcelRegistry.Consumer.Address/BackOfficeConsumer.cs @@ -57,14 +57,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) => { - _logger.LogInformation("Handling next message"); - - await commandHandlingProjector.ProjectAsync(commandHandler, message, stoppingToken).ConfigureAwait(false); - await backOfficeProjector.ProjectAsync(context, message, stoppingToken).ConfigureAwait(false); - - //CancellationToken.None to prevent halfway consumption - await context.SaveChangesAsync(CancellationToken.None); - + await HandleMessage(commandHandlingProjector, commandHandler, message, backOfficeProjector, context); }, stoppingToken); } catch (Exception) @@ -73,5 +66,17 @@ await _kafkaIdemIdompotencyConsumer.ConsumeContinuously(async (message, context) throw; } } + + private async Task HandleMessage(ConnectedProjector commandHandlingProjector, CommandHandler commandHandler, object message, + ConnectedProjector backOfficeProjector, ConsumerAddressContext context) + { + _logger.LogInformation("Handling next message"); + + await commandHandlingProjector.ProjectAsync(commandHandler, message, CancellationToken.None).ConfigureAwait(false); + await backOfficeProjector.ProjectAsync(context, message, CancellationToken.None).ConfigureAwait(false); + + //CancellationToken.None to prevent halfway consumption + await context.SaveChangesAsync(CancellationToken.None); + } } }