From 73976b94182ea1a81fb2739edd84715fcf7dfab2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Fri, 8 Dec 2023 16:54:07 +0100 Subject: [PATCH] Publishing hang --- .../eventsourcing/InMemoryEventStore.java | 23 +-- .../ReactorKafkaEventPublisher.java | 16 +- .../KafkaEventPublisherTest.java | 184 +++--------------- 3 files changed, 36 insertions(+), 187 deletions(-) diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index 7429f5da..272df479 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -27,6 +27,8 @@ public class InMemoryEventStore implements EventStore, E, Meta, Context> { ConcurrentHashMap> store = new ConcurrentHashMap<>(); + + AtomicLong sequenceNums = new AtomicLong(0); private final Supplier> markAsPublishedTx; private final Supplier> markAsPublished; @@ -45,6 +47,7 @@ public InMemoryEventStore(EventEnvelope... events) { this(NOOP, NOOP, events); } + @SafeVarargs public static InMemoryEventStore create(EventEnvelope... events) { return new InMemoryEventStore<>(events); } @@ -97,22 +100,6 @@ public CompletionStage> markAsPublished(Transact }); } -// @Override -// public CompletionStage>> markAsPublished(List> eventEnvelopes) { -// return markAsPublished.get().thenCompose(any -> { -// return CompletableFuture.completedStage(eventEnvelopes.map(eventEnvelope -> { -// return store.compute(eventEnvelope.sequenceNum, (k, event) -> { -// if (event == null) { -// return eventEnvelope.copy().withPublished(true).build(); -// } else { -// return event.copy().withPublished(true).build(); -// } -// }); -// })); -// } -// ); -// } - @Override public CompletionStage> markAsPublished(EventEnvelope eventEnvelope) { return markAsPublished.get().thenCompose(any -> @@ -148,7 +135,9 @@ public CompletionStage commitOrRollback(Option of, Transactio @Override public CompletionStage nextSequence(InMemoryEventStore.Transaction tx) { - return CompletableFuture.completedStage(store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1); + long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1; + sequenceNums.incrementAndGet(); + return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max)); } @Override diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 60630d2c..4cca9dfb 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -172,31 +172,17 @@ private Function>, Flux Mono fromCS(Supplier> cs) { return Mono.fromFuture(() -> cs.get().toCompletableFuture()); -// return Mono.create(s -> -// cs.get().whenComplete((r, e) -> { -// if (e != null) { -// s.error(e); -// } else { -// s.success(r); -// } -// }) -// ); } @Override public CompletionStage publish(List> events) { -// LOGGER.debug("Publishing event in memory : \n{} ", events); -// return Mono.fromCallable(() -> { -// reactorQueue.offer(events); -// return Tuple.empty(); -// }).publishOn(Schedulers.boundedElastic()).toFuture(); return Flux .fromIterable(events) .map(t -> { queue.tryEmitNext(t).orThrow(); return Tuple.empty(); }) - .retryWhen(Retry.fixedDelay(50, Duration.ofMillis(2)) + .retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1)) .transientErrors(true) .doBeforeRetry(ctx -> { LOGGER.error("Error publishing events in memory queue for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure()); diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index 28159e8f..0ab5ad1e 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -2,7 +2,9 @@ import com.fasterxml.jackson.databind.JsonNode; import fr.maif.concurrent.CompletionStages; -import fr.maif.eventsourcing.*; +import fr.maif.eventsourcing.Event; +import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.Type; import fr.maif.eventsourcing.format.JacksonEventFormat; import fr.maif.eventsourcing.format.JacksonSimpleFormat; import fr.maif.json.EventEnvelopeJson; @@ -10,11 +12,9 @@ import fr.maif.kafka.JsonDeserializer; import fr.maif.kafka.JsonSerializer; import fr.maif.reactor.KafkaContainerTest; -import io.vavr.*; +import io.vavr.API; import io.vavr.collection.List; -import io.vavr.collection.Stream; import io.vavr.control.Either; -import io.vavr.control.Option; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; @@ -25,7 +25,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.reactivestreams.Publisher; import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; @@ -36,10 +35,11 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -75,17 +75,11 @@ public void eventConsumption() throws IOException, InterruptedException { String topic = createTopic("eventConsumption", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>()); - when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(emptyTxStream()); - when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); - when(eventStore.markAsPublished(any(), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); - - EventEnvelope envelope1 = eventEnvelope("value 1"); - EventEnvelope envelope2 = eventEnvelope("value 2"); - EventEnvelope envelope3 = eventEnvelope("value 3"); + EventEnvelope envelope1 = eventEnvelopeUnpublished("value 1"); + EventEnvelope envelope2 = eventEnvelopeUnpublished("value 2"); + EventEnvelope envelope3 = eventEnvelopeUnpublished("value 3"); publisher.start(eventStore, NO_STRATEGY); @@ -98,14 +92,15 @@ public void eventConsumption() throws IOException, InterruptedException { .receive() .map(ConsumerRecord::value) .map(KafkaEventPublisherTest::deserialize) - .take(3) + .bufferTimeout(50, Duration.ofSeconds(4)) .map(e -> { println(e); return e; }) - .timeout(Duration.of(60, ChronoUnit.SECONDS)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); publisher.publish(List.of( @@ -124,31 +119,18 @@ public void eventConsumption() throws IOException, InterruptedException { publisher.close(); } - private Publisher emptyTxStream() { - return Flux.empty(); - } - - private Publisher txStream(T... values) { - return Flux.fromIterable(List.of(values)); - } - - @Test @SuppressWarnings("unchecked") public void eventConsumptionWithEventFromDb() throws IOException, InterruptedException { String topic = createTopic("eventConsumptionWithEventFromDb", 5, 1); + ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); - when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(txStream( - eventEnvelope("value 1"), - eventEnvelope("value 2"), - eventEnvelope("value 3") + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>( + eventEnvelopeUnpublished("value 1"), + eventEnvelopeUnpublished("value 2"), + eventEnvelopeUnpublished("value 3") )); - when(eventStore.markAsPublished(eq(Tuple.empty()), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(1))); - when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); publisher.start(eventStore, NO_STRATEGY); @@ -161,16 +143,17 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc ) .receive() .map(ConsumerRecord::value) - .take(6) - .timeout(Duration.of(20, ChronoUnit.SECONDS)) + .bufferTimeout(50, Duration.ofSeconds(4)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); publisher.publish(List.of( - eventEnvelope("value 4"), - eventEnvelope("value 5"), - eventEnvelope("value 6") + eventEnvelopeUnpublished("value 4"), + eventEnvelopeUnpublished("value 5"), + eventEnvelopeUnpublished("value 6") )).toCompletableFuture().join(); List events = results.toCompletableFuture().join(); @@ -183,105 +166,6 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc publisher.close(); } -// -// @Test -// @SuppressWarnings("unchecked") -// public void testRestart() throws IOException, InterruptedException { -// -// AtomicInteger failed = new AtomicInteger(0); -// AtomicInteger streamCount = new AtomicInteger(0); -// String topic = createTopic("testRestart", 5, 1); -// ReactorKafkaEventPublisher publisher = createPublisher(topic); -// -// Supplier>> eventsFlux = () -> KafkaReceiver -// .create(receiverDefault() -// .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") -// .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") -// .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") -// .subscription(List.of(topic).toJavaList())) -// .receive() -// .map(ConsumerRecord::value) -// .map(KafkaEventPublisherTest::deserialize); -// -// EventEnvelope envelope1 = eventEnvelope("value 1"); -// EventEnvelope envelope2 = eventEnvelope("value 2"); -// EventEnvelope envelope3 = eventEnvelope("value 3"); -// EventEnvelope envelope4 = eventEnvelope("value 4"); -// EventEnvelope envelope5 = eventEnvelope("value 5"); -// EventEnvelope envelope6 = eventEnvelope("value 6"); -// EventEnvelope envelope7 = eventEnvelope("value 7"); -// EventEnvelope envelope8 = eventEnvelope("value 8"); -// EventEnvelope envelope9 = eventEnvelope("value 9"); -// EventStore eventStore = mock(EventStore.class); -// -// when(eventStore.loadEventsUnpublished(any(), any())) -// .thenReturn(txStream(envelope1, envelope2, envelope3)) -// .thenReturn(txStream(envelope1, envelope2, envelope3)) -// .thenReturn(emptyTxStream()); -// -// when(eventStore.markAsPublished(any(), Mockito.>>any())) -// .then(i -> { -// if (failed.incrementAndGet() > 1) { -// return CompletionStages.successful(i.getArgument(1)); -// } else { -// return CompletionStages.failed(new RuntimeException("Oups "+failed.get())); -// } -// }); -// when(eventStore.markAsPublished(Mockito.>>any())) -// .then(i -> { -// int count = streamCount.incrementAndGet(); -// Object argument = i.getArgument(0); -// System.out.println("Count "+count+" - "+argument); -// if (count == 1) { -// return CompletionStages.failed(new RuntimeException("Oups stream "+count)); -// } else { -// return CompletionStages.successful(argument); -// } -// }); -// -// publisher.start(eventStore, SKIP); -// -// Thread.sleep(200); -// -// -// CompletionStage>> results = eventsFlux.get() -// .bufferTimeout(10, Duration.ofSeconds(10)) -// .take(1) -// .timeout(Duration.of(30, ChronoUnit.SECONDS)) -// .collectList() -// .map(l -> List.ofAll(l).flatMap(identity())) -// .toFuture(); -// -// -// publisher.publish(List(envelope4, envelope5, envelope6)); -// -// List> events = results.toCompletableFuture().join(); -// -// println(events.mkString("\n")); -// -// assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6); -// -// verify(eventStore, times(3)).openTransaction(); -// verify(eventStore, times(2)).markAsPublished(any(), Mockito.>>any()); -// verify(eventStore, times(1)).markAsPublished(Mockito.>>any()); -// -// -// publisher.publish(List(envelope7, envelope8, envelope9)); -// List> resultsAfterCrash = eventsFlux.get() -// .bufferTimeout(12, Duration.ofSeconds(10)) -// .take(1) -// .timeout(Duration.of(30, ChronoUnit.SECONDS)) -// .collectList() -// .map(l -> List.ofAll(l).flatMap(identity())) -// .block(); -// println(resultsAfterCrash.mkString("\n")); -// -// assertThat(resultsAfterCrash).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6, envelope7, envelope8, envelope9); -// -// -// publisher.close(); -// } - @Test @SuppressWarnings("unchecked") public void testRestartWithMock() throws IOException, InterruptedException { @@ -408,6 +292,7 @@ public void testRestartWithMock() throws IOException, InterruptedException { publisher.close(); } + @SafeVarargs private EventEnvelope[] published(EventEnvelope... envelopes) { return List.of(envelopes).map(e -> e.copy().withPublished(true).build()).toJavaArray(EventEnvelope[]::new); } @@ -421,17 +306,6 @@ private static EventEnvelope deserialize(String event) { static AtomicLong sequence = new AtomicLong(); - private EventEnvelope eventEnvelope(String value) { - long sequenceNum = sequence.incrementAndGet(); - String entityId = "entityId"; - return EventEnvelope.builder() - .withEmissionDate(LocalDateTime.now()) - .withId(UUID.randomUUID()) - .withEntityId(entityId) - .withSequenceNum(sequenceNum) - .withEvent(new TestEvent(value, entityId)) - .build(); - } private EventEnvelope eventEnvelopeUnpublished(String value) { long sequenceNum = sequence.incrementAndGet(); String entityId = "entityId";