From 73ad1472f09ae601a68109ed14b36509a57ba5b7 Mon Sep 17 00:00:00 2001 From: Delegue Alexandre Date: Fri, 8 Dec 2023 17:12:29 +0100 Subject: [PATCH] Publishing hang (#62) * Publishing hang in reactor publisher: the queue stopped and never resumed --- build.gradle | 4 +- .../eventsourcing/InMemoryEventStore.java | 8 + thoth-core-reactor/build.gradle | 1 + .../eventsourcing/InMemoryEventStore.java | 133 ++++++++--- .../ReactorKafkaEventPublisher.java | 152 +++++++----- .../src/test/java/fr/maif/Helpers.java | 16 +- .../DefaultAggregateStoreTest.java | 23 +- .../eventsourcing/EventProcessorTest.java | 54 ++--- .../KafkaEventPublisherTest.java | 218 ++++++++++-------- .../fr/maif/eventsourcing/EventStore.java | 2 + thoth-jooq-akka/build.gradle | 11 +- .../ReactivePostgresEventStore.java | 12 +- thoth-jooq-reactor/build.gradle | 12 +- .../ReactivePostgresEventStore.java | 29 ++- .../java/fr/maif/eventsourcing/SimpleDb.java | 4 + .../AbstractPostgresEventStoreTest.java | 11 +- .../impl/PostgresEventStore.java | 13 +- thoth-kafka-consumer-reactor/build.gradle | 1 + 18 files changed, 436 insertions(+), 268 deletions(-) diff --git a/build.gradle b/build.gradle index ff74302c..034539ab 100644 --- a/build.gradle +++ b/build.gradle @@ -36,8 +36,8 @@ subprojects { jooqAsyncVersion = "2.0.0" functionalJsonVersion = "1.0.3" kafkaVersion = "3.0.1" - reactorKafkaVersion = "1.3.12" - reactorVersion = "3.4.23" + reactorKafkaVersion = "1.3.22" + reactorVersion = "3.5.7" vertxSqlVersion = "4.3.3" } diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java index b171df5c..6f1978b6 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +56,13 @@ public static InMemoryEventStore(system); } + @Override + public CompletionStage lastPublishedSequence() { + return CompletableFuture.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum) + .max(Comparator.comparingLong(e -> e)) + .orElse(0L)); + } + @Override public Publisher> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) { return Source.>empty().runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); } diff --git a/thoth-core-reactor/build.gradle b/thoth-core-reactor/build.gradle index 780a4c6c..ce09d534 100644 --- a/thoth-core-reactor/build.gradle +++ b/thoth-core-reactor/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation "io.projectreactor.kafka:reactor-kafka:$reactorKafkaVersion" implementation("io.vavr:vavr:$vavrVersion") implementation("io.vavr:vavr-jackson:$vavrVersion") + implementation('org.slf4j:slf4j-api:2.0.7') implementation("com.fasterxml.uuid:java-uuid-generator:3.1.5") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index f170b986..272df479 100644 ---
a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -5,9 +5,11 @@ import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventPublisher; import fr.maif.eventsourcing.EventStore; +import io.vavr.API; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Stream; import io.vavr.control.Option; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -15,96 +17,159 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; -public class InMemoryEventStore implements EventStore { +public class InMemoryEventStore implements EventStore, E, Meta, Context> { - private java.util.List> eventStore = new ArrayList<>(); + ConcurrentHashMap> store = new ConcurrentHashMap<>(); - private final Sinks.Many> queue; - private final Flux> realTimeEvents; + AtomicLong sequenceNums = new AtomicLong(0); + private final Supplier> markAsPublishedTx; + private final Supplier> markAsPublished; - private AtomicLong sequence_num = new AtomicLong(0); + private final static Supplier> NOOP = () -> CompletableFuture.completedStage(Tuple.empty()); - private final ConcurrentHashMap offsets = new ConcurrentHashMap<>(); + public InMemoryEventStore(Supplier> markAsPublishedTx, + Supplier> markAsPublished, + EventEnvelope... events) { - public InMemoryEventStore() { - this.queue = Sinks.many().multicast().onBackpressureBuffer(10000); - this.realTimeEvents = queue.asFlux(); + this.markAsPublishedTx = markAsPublishedTx; + this.markAsPublished = markAsPublished; + Stream.of(events).forEach(e -> store.put(e.sequenceNum, e)); } - public static InMemoryEventStore create() { - return new InMemoryEventStore<>(); + public InMemoryEventStore(EventEnvelope... events) { + this(NOOP, NOOP, events); + } + + @SafeVarargs + public static InMemoryEventStore create(EventEnvelope... 
events) { + return new InMemoryEventStore<>(events); + } + + + public record Transaction(ArrayList> events, + ArrayList> toPublish) { + + public Transaction() { + this(new ArrayList<>(), new ArrayList<>()); + } + + public static Transaction newTx() { + return new Transaction<>(); + } + + public Tuple0 addAll(java.util.List> events) { + this.events.addAll(events); + return API.Tuple(); + } } @Override - public Publisher> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) { - return Flux.empty(); + public Publisher> loadEventsUnpublished(Transaction tx, ConcurrentReplayStrategy concurrentReplayStrategy) { + return Flux.fromIterable(store.values().stream().sorted(Comparator.comparingLong(e -> e.sequenceNum)).toList()) + .filter(e -> Boolean.FALSE.equals(e.published)); } @Override - public CompletionStage> markAsPublished(Tuple0 tx, EventEnvelope eventEnvelope) { - return markAsPublished(eventEnvelope); + public CompletionStage lastPublishedSequence() { + AtomicLong max = new AtomicLong(0L); + store.values().forEach(k -> { + if (k.published) { + max.accumulateAndGet(k.sequenceNum, Math::max); + } + }); + return CompletableFuture.completedStage(max.get()); } @Override - public CompletionStage openTransaction() { - return CompletableFuture.completedStage(Tuple.empty()); + public CompletionStage> openTransaction() { + return CompletableFuture.completedStage(new Transaction<>()); } @Override - public CompletionStage commitOrRollback(Option of, Tuple0 tx) { - return CompletionStages.empty(); + public CompletionStage> markAsPublished(Transaction tx, EventEnvelope eventEnvelope) { + return markAsPublishedTx.get().thenCompose(any -> { + tx.toPublish().add(eventEnvelope); + return CompletableFuture.completedStage(eventEnvelope); + }); } @Override public CompletionStage> markAsPublished(EventEnvelope eventEnvelope) { - return CompletableFuture.completedStage( - eventEnvelope.copy().withPublished(true).build() + return markAsPublished.get().thenCompose(any -> + CompletableFuture.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> { + if (event == null) { + return eventEnvelope.copy().withPublished(true).build(); + } else { + return event.copy().withPublished(true).build(); + } + })) ); } @Override - public CompletionStage nextSequence(Tuple0 tx) { - return CompletableFuture.completedStage(sequence_num.incrementAndGet()); + public CompletionStage persist(Transaction transactionContext, List> events) { + return CompletableFuture.completedStage(transactionContext.addAll(events.toJavaList())); + } + + @Override + public CompletionStage commitOrRollback(Option of, Transaction tx) { + if (of.isEmpty()) { + tx.events().forEach(event -> + store.put(event.sequenceNum, event) + ); + tx.toPublish.forEach(e -> + store.computeIfPresent(e.sequenceNum, (k, event) -> event.copy().withPublished(true).build()) + ); + } + tx.events.clear(); + tx.toPublish.clear(); + return CompletableFuture.completedStage(API.Tuple()); + } + + @Override + public CompletionStage nextSequence(InMemoryEventStore.Transaction tx) { + long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1; + sequenceNums.incrementAndGet(); + return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max)); } @Override public CompletionStage publish(List> events) { - events.forEach(queue::tryEmitNext); - return CompletionStages.empty(); + events.forEach(e -> store.put(e.sequenceNum, e)); + return 
CompletableFuture.completedStage(API.Tuple()); } @Override public Publisher> loadEvents(String id) { - return Flux.fromIterable(eventStore); + return Flux.fromIterable(store.values()).filter(e -> + e.entityId.equals(id) + ); } @Override public Publisher> loadAllEvents() { - return Flux.fromIterable(eventStore); + return Flux.fromIterable(store.values()); } @Override - public Publisher> loadEventsByQuery(Tuple0 tx, Query query) { + public Publisher> loadEventsByQuery(InMemoryEventStore.Transaction tx, Query query) { return loadEventsByQuery(query); } @Override public Publisher> loadEventsByQuery(Query query) { - return Flux.fromIterable(eventStore) + return Flux.fromIterable(store.values()) .filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true)); } - @Override - public CompletionStage persist(Tuple0 transactionContext, List> events) { - eventStore.addAll(events.toJavaList()); - return CompletionStages.empty(); - } @Override public EventPublisher eventPublisher() { diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index dc9123a9..4cca9dfb 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -8,12 +8,12 @@ import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Traversable; import io.vavr.control.Option; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.Scannable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -29,7 +29,9 @@ import java.time.temporal.ChronoUnit; import java.util.Objects; import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.Supplier; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.WAIT; @@ -37,13 +39,13 @@ public class ReactorKafkaEventPublisher implemen private final static Logger LOGGER = LoggerFactory.getLogger(ReactorKafkaEventPublisher.class); + private AtomicBoolean stop = new AtomicBoolean(false); private final String topic; private final Sinks.Many> queue; + private final Flux> eventSource; private final SenderOptions> senderOptions; - private final Flux> eventsSource; private final Duration restartInterval; private final Duration maxRestartInterval; - private final Function>>, Flux>>>> groupFlow = it -> it.buffer(1000).map(List::ofAll); private Disposable killSwitch; private KafkaSender> kafkaSender; @@ -61,102 +63,132 @@ public ReactorKafkaEventPublisher(SenderOptions e = EventEnvelope.builder().build(); - - this.queue = Sinks.many().multicast().onBackpressureBuffer(queueBufferSize1); - this.eventsSource = queue.asFlux(); + this.queue = Sinks.many().replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1); + this.eventSource = queue.asFlux(); this.senderOptions = senderOptions; this.kafkaSender = KafkaSender.create(senderOptions); } + record CountAndMaxSeqNum(Long count, Long lastSeqNum) { + static CountAndMaxSeqNum empty() { + return new CountAndMaxSeqNum(0L, 0L); + } + + CountAndMaxSeqNum handleSeqNum(Long lastSeqNum) { + return new 
CountAndMaxSeqNum(count + 1, Math.max(this.lastSeqNum, lastSeqNum)); + } + } + + + private Function, Flux>> fixedSizeGroup(int size) { + return it -> it.buffer(size).map(List::ofAll); + } + + private Function, Flux>> bufferTimeout(int size, Duration duration) { + return it -> it.bufferTimeout(size, duration, true).map(List::ofAll); + } + @Override public void start(EventStore eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) { - LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", topic); + LOGGER.info("Starting event publisher for topic {}", topic); Sinks.Many> logProgressSink = Sinks.many().unicast().onBackpressureBuffer(); logProgress(logProgressSink.asFlux(), 100).subscribe(); - killSwitch = Mono.fromCompletionStage(eventStore::openTransaction) - .flatMapMany(tx -> { - LOGGER.info("Replaying not published in DB for {}", topic); - ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy; - return Flux - .from(eventStore.loadEventsUnpublished(tx, strategy)) - .transform(publishToKafka(eventStore, Option.some(tx), groupFlow)) - .doOnNext(logProgressSink::tryEmitNext) - .collectList() - .flatMap(any -> Mono.fromCompletionStage(() -> { - LOGGER.info("Replaying events not published in DB is finished for {}", topic); - return eventStore.commitOrRollback(Option.none(), tx); - })) - .doOnError(e -> { - eventStore.commitOrRollback(Option.of(e), tx); - LOGGER.error("Error replaying non published events to kafka for " + topic, e); - }) - .map(__ -> Tuple.empty()); + killSwitch = Mono.defer(() -> fromCS(eventStore::openTransaction) + .flatMap(tx -> { + LOGGER.info("Replaying events not published from DB in topic {}", topic); + ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy; + return Flux + .from(eventStore.loadEventsUnpublished(tx, strategy)) + .transform(publishToKafka(eventStore, Option.some(tx), fixedSizeGroup(1000), fixedSizeGroup(1000))) + .doOnNext(logProgressSink::tryEmitNext) + .reduce(CountAndMaxSeqNum.empty(), (c, elt) -> c.handleSeqNum(elt.sequenceNum)) + .flatMap(count -> { + LOGGER.info("Replaying events not published in DB is finished for {}, {} elements published", topic, count.count); + return fromCS(() -> eventStore.commitOrRollback(Option.none(), tx)) + .thenReturn(count); + }) + .doOnError(e -> { + eventStore.commitOrRollback(Option.of(e), tx); + LOGGER.error("Error replaying non published events to kafka for " + topic, e); + }) + .flatMap(c -> { + if (c.count == 0) { + return fromCS(eventStore::lastPublishedSequence).map(l -> new CountAndMaxSeqNum(0L, l)); + } else { + return Mono.just(c); + } + }); + })) + .flux() + .concatMap(countAndLastSeqNum -> { +// Flux.defer(() -> { + LOGGER.debug("Starting consuming in memory queue for {}. 
Events lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum); + //return reactorQueue.asFlux() + return eventSource + .filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum) + .transform(publishToKafka( + eventStore, + Option.none(), + bufferTimeout(200, Duration.ofMillis(20)), + bufferTimeout(200, Duration.ofMillis(20)) + )); }) - .retryWhen(Retry.backoff(10, restartInterval) - .transientErrors(true) - .maxBackoff(maxRestartInterval) - .doBeforeRetry(ctx -> { - LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); - }) - ) - .onErrorReturn(Tuple.empty()) - .switchIfEmpty(Mono.just(Tuple.empty())) - .concatMap(__ -> - this.eventsSource.transform(publishToKafka( - eventStore, - Option.none(), - it -> it - .bufferTimeout(50, Duration.ofMillis(20)) - .map(List::ofAll) - )) - ) .doOnError(e -> LOGGER.error("Error publishing events to kafka", e)) - .doOnComplete(() -> LOGGER.info("Closing publishing to {}", topic)) .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval) .transientErrors(true) .maxBackoff(maxRestartInterval) .doBeforeRetry(ctx -> { - LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); + LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure()); }) ) .subscribe(); } - - private Function>, Flux>> publishToKafka(EventStore eventStore, Option tx, Function>>, Flux>>>> groupFlow) { + private Function>, Flux>> publishToKafka(EventStore eventStore, + Option tx, + Function, EventEnvelope>>, Flux, EventEnvelope>>>> groupFlowForKafka, + Function>>, Flux>>>> groupFlow + ) { return it -> it .map(this::toKafkaMessage) + .transform(groupFlowForKafka) + .filter(Traversable::nonEmpty) .concatMap(events -> { LOGGER.debug("Sending event {}", events); - return kafkaSender.send(Flux.just(events)) + return kafkaSender.send(Flux.fromIterable(events)) .doOnError(e -> LOGGER.error("Error publishing to kafka ", e)); }) .transform(groupFlow) + .filter(Traversable::nonEmpty) .concatMap(m -> tx.fold( - () -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))), - txCtx -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata))) + () -> fromCS(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))), + txCtx -> fromCS(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata))) ) ) .flatMapIterable(e -> e); } + static Mono fromCS(Supplier> cs) { + return Mono.fromFuture(() -> cs.get().toCompletableFuture()); + } + @Override public CompletionStage publish(List> events) { - LOGGER.debug("Publishing event in memory : \n{} ", events); return Flux .fromIterable(events) .map(t -> { - try { - queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))); + queue.tryEmitNext(t).orThrow(); return Tuple.empty(); - } catch (Exception e) { - LOGGER.error("Error publishing to topic %s".formatted(topic), e); - return Tuple.empty(); - } }) + .retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1)) + .transientErrors(true) + .doBeforeRetry(ctx -> { + LOGGER.error("Error publishing events in memory queue for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure()); + }) + ) + .onErrorResume(e -> Mono.just(Tuple.empty())) .collectList() .thenReturn(Tuple.empty()) .toFuture(); @@ -164,6 +196,7 @@
public CompletionStage publish(List> eve @Override public void close() throws IOException { + stop.set(true); if (Objects.nonNull(killSwitch) && !killSwitch.isDisposed()) { try { this.killSwitch.dispose(); @@ -199,7 +232,8 @@ private Flux logProgress(Flux logProgress, int every) { } public Integer getBufferedElementCount() { - return this.queue.scan(Scannable.Attr.BUFFERED); +// return this.queue.scan(Scannable.Attr.BUFFERED); + return 0; } } diff --git a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java index 29c0d587..8f6e59fa 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java @@ -18,6 +18,8 @@ import fr.maif.json.JsonRead; import fr.maif.json.JsonSchema; import fr.maif.json.JsonWrite; +import fr.maif.reactor.eventsourcing.InMemoryEventStore; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.API; import io.vavr.Lazy; import io.vavr.Tuple; @@ -301,10 +303,10 @@ public Option applyEvent(Option state, VikingEvent event) { } } - public static class VikingCommandHandler implements CommandHandler { + public static class VikingCommandHandler implements CommandHandler> { @Override - public CompletionStage>> handleCommand(Tuple0 unit, Option state, VikingCommand vikingCommand) { + public CompletionStage>> handleCommand(Transaction unit, Option state, VikingCommand vikingCommand) { return CompletableFuture.completedStage( Match(vikingCommand).of( Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name, e.age))), @@ -355,7 +357,7 @@ public JsonWrite eventWrite() { } - public static class VikingSnapshot implements SnapshotStore { + public static class VikingSnapshot implements SnapshotStore> { public ConcurrentHashMap data = new ConcurrentHashMap<>(); @@ -366,12 +368,12 @@ public CompletionStage> getSnapshot(String entityId) { } @Override - public CompletionStage> getSnapshot(Tuple0 transactionContext, String entityId) { + public CompletionStage> getSnapshot(Transaction transactionContext, String entityId) { return CompletableFuture.completedStage(Option.of(data.get(entityId))); } @Override - public CompletionStage persist(Tuple0 transactionContext, String id, Option state) { + public CompletionStage persist(Transaction transactionContext, String id, Option state) { Match(state).of( Case($Some($()), s -> data.put(s.id, s)), Case($None(), () -> data.remove(id)) @@ -380,11 +382,11 @@ public CompletionStage persist(Tuple0 transactionContext, String id, Opt } } - public static class VikingProjection implements Projection { + public static class VikingProjection implements Projection, VikingEvent, Tuple0, Tuple0> { public ConcurrentHashMap data = new ConcurrentHashMap<>(); @Override - public CompletionStage storeProjection(Tuple0 unit, List> events) { + public CompletionStage storeProjection(Transaction unit, List> events) { events.forEach(event -> { int i = data.getOrDefault(event.entityId, 0) + events.size(); data.put(event.entityId, i); diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java index 80ce73f0..4d7a36ca 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java @@ -7,6 +7,7 @@ import 
fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.EventStore.Query; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; @@ -63,36 +64,36 @@ class DefaultAggregateStoreTest { @Test void testReloadEventAndBuildAggregateWithoutSnapshots() { - EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); + EventStore, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class); + var aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); Query query = Query.builder().withEntityId(entityId).build(); - when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2))); + when(eventStore.loadEventsByQuery(any(), eq(query))).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2))); - Option vikings = aggregateStore.getAggregate(Tuple.empty(), entityId).toCompletableFuture().join(); + Option vikings = aggregateStore.getAggregate(Transaction.newTx(), entityId).toCompletableFuture().join(); Assertions.assertThat(vikings).isEqualTo(Some(new Viking(entityId, "Ragnar Lodbrock", 30, 2L))); - verify(eventStore, times(1)).loadEventsByQuery(Tuple(), query); + verify(eventStore, times(1)).loadEventsByQuery(any(), eq(query)); } @Test void testReloadEventAndBuildAggregateWithSnapshots() { - EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { + EventStore, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class); + DefaultAggregateStore> aggregateStore = spy(new DefaultAggregateStore>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { @Override - public CompletionStage> getSnapshot(Tuple0 transactionContext, String id) { + public CompletionStage> getSnapshot(Transaction transactionContext, String id) { return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 30, 1L))); } }); Query query = Query.builder().withEntityId(entityId).withSequenceFrom(1L).build(); - when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope2))); + when(eventStore.loadEventsByQuery(any(), eq(query))).thenReturn(Flux.fromIterable(List.of(eventEnvelope2))); - Option vikings = aggregateStore.getAggregate(Tuple.empty(), entityId).toCompletableFuture().join(); + Option vikings = aggregateStore.getAggregate(Transaction.newTx(), entityId).toCompletableFuture().join(); Assertions.assertThat(vikings).isEqualTo(Some(new Viking(entityId, "Ragnar Lodbrock", 30, 2L))); - verify(eventStore, times(1)).loadEventsByQuery(Tuple(), query); + verify(eventStore, times(1)).loadEventsByQuery(any(), eq(query)); verify(aggregateStore, times(1)).getSnapshot(any(), eq(entityId)); } } \ No newline at end of file diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java index 38c4baaa..a2684514 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java +++ 
b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java @@ -1,15 +1,7 @@ package fr.maif.reactor.eventsourcing; -import fr.maif.eventsourcing.AggregateStore; -import fr.maif.eventsourcing.CommandHandler; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventProcessorImpl; -import fr.maif.eventsourcing.Events; -import fr.maif.eventsourcing.ProcessingSuccess; -import fr.maif.eventsourcing.Projection; -import fr.maif.eventsourcing.TransactionManager; -import fr.maif.reactor.eventsourcing.DefaultAggregateStore; -import fr.maif.reactor.eventsourcing.InMemoryEventStore; +import fr.maif.eventsourcing.*; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.API; import io.vavr.Tuple; import io.vavr.Tuple0; @@ -24,18 +16,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import static fr.maif.Helpers.Viking; -import static fr.maif.Helpers.VikingCommand; -import static fr.maif.Helpers.VikingCommand.CreateViking; -import static fr.maif.Helpers.VikingCommand.DeleteViking; -import static fr.maif.Helpers.VikingCommand.UpdateViking; -import static fr.maif.Helpers.VikingCommandHandler; -import static fr.maif.Helpers.VikingEvent; -import static fr.maif.Helpers.VikingEventHandler; -import static fr.maif.Helpers.VikingProjection; -import static fr.maif.Helpers.VikingSnapshot; +import static fr.maif.Helpers.*; +import static fr.maif.Helpers.VikingCommand.*; import static io.vavr.API.Some; +import static org.assertj.core.api.Assertions.anyOf; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; public class EventProcessorTest { @@ -46,7 +33,7 @@ public void oneCommandShouldGenerateEventAndPersistState() { //Set up VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); //Test Either> result = vikingEventProcessor.processCommand(new CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); @@ -87,7 +74,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { //Set up InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); VikingProjection projection = new VikingProjection(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessorWithProjection(inMemoryEventStore, List.of(projection)); + var vikingEventProcessor = vikingEventProcessorWithProjection(inMemoryEventStore, List.of(projection)); //Test Either> result = vikingEventProcessor.processCommand(new CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); @@ -119,7 +106,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { assertThat(eventsFromJournal).containsExactly(expectedEnvelope); - assertThat(vikingEventProcessor.getAggregateStore().getAggregate(Tuple.empty(), "1").toCompletableFuture().join()).isEqualTo(Some(expected)); + assertThat(vikingEventProcessor.getAggregateStore().getAggregate(Transaction.newTx(), "1").toCompletableFuture().join()).isEqualTo(Some(expected)); Assertions.assertThat(projection.data.get("1")).isEqualTo(1); } @@ -128,7 +115,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { public void twoCommandShouldGenerateEventAndPersistState() { VikingSnapshot vikingSnapshot = 
new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); vikingEventProcessor.processCommand(new VikingCommand.CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); Either> result = vikingEventProcessor.processCommand(new UpdateViking("1", "Ragnar Lodbrock", 30)).toCompletableFuture().join(); @@ -181,7 +168,7 @@ public void twoCommandShouldGenerateEventAndPersistState() { public void createAndDeleteShouldGenerateEventAndPersistState() { VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); vikingEventProcessor.processCommand(new VikingCommand.CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); Either> result = vikingEventProcessor.processCommand(new DeleteViking("1")).toCompletableFuture().join(); @@ -233,7 +220,7 @@ public void createAndDeleteShouldGenerateEventAndPersistState() { public void multipleCommandsOnSameEntityOnSameBatch() { VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); var result = vikingEventProcessor.batchProcessCommand(API.List( new VikingCommand.CreateViking("1", "ragnar", 30), @@ -254,7 +241,7 @@ public void multipleCommandsOnSameEntityOnSameBatch() { } - private EventProcessorImpl vikingEventProcessor(InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessor(InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { return new EventProcessorImpl<>( inMemoryEventStore, new FakeTransactionManager(), @@ -266,7 +253,7 @@ private EventProcessorImpl vikingEventProcessor(CommandHandler commandHandler, InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessor(CommandHandler> commandHandler, InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { return new EventProcessorImpl<>( inMemoryEventStore, new FakeTransactionManager(), @@ -277,7 +264,7 @@ private EventProcessorImpl vikingEventProcessorWithProjection(InMemoryEventStore inMemoryEventStore, List> projections) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessorWithProjection(InMemoryEventStore inMemoryEventStore, List, VikingEvent, Tuple0, Tuple0>> projections) { VikingEventHandler vikingEventHandler = new VikingEventHandler(); FakeTransactionManager fakeTransactionManager = new FakeTransactionManager(); return new EventProcessorImpl<>( @@ -291,7 +278,7 @@ private EventProcessorImpl vikingEventProcessorWithSnapshot(InMemoryEventStore inMemoryEventStore, AggregateStore aggregateStore, List> projections) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessorWithSnapshot(InMemoryEventStore inMemoryEventStore, AggregateStore> aggregateStore, List, VikingEvent, Tuple0, Tuple0>> projections) { return new EventProcessorImpl<>( 
inMemoryEventStore, new FakeTransactionManager(), @@ -303,13 +290,14 @@ private EventProcessorImpl { + public static class FakeTransactionManager implements TransactionManager> { private AtomicInteger counter = new AtomicInteger(0); + @Override - public CompletionStage withTransaction(Function> callBack) { - return callBack.apply(Tuple.empty()); + public CompletionStage withTransaction(Function, CompletionStage> callBack) { + return callBack.apply(new Transaction<>()); } @Override diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index ebf7d4f9..0ab5ad1e 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -4,7 +4,6 @@ import fr.maif.concurrent.CompletionStages; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.Type; import fr.maif.eventsourcing.format.JacksonEventFormat; import fr.maif.eventsourcing.format.JacksonSimpleFormat; @@ -14,8 +13,6 @@ import fr.maif.kafka.JsonSerializer; import fr.maif.reactor.KafkaContainerTest; import io.vavr.API; -import io.vavr.Tuple; -import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -28,7 +25,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.reactivestreams.Publisher; import org.testcontainers.junit.jupiter.Testcontainers; import reactor.core.publisher.Flux; import reactor.kafka.receiver.KafkaReceiver; @@ -44,13 +40,15 @@ import java.util.StringJoiner; import java.util.UUID; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP; +import static io.vavr.API.List; import static io.vavr.API.println; +import static java.util.function.Function.identity; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; @@ -66,6 +64,7 @@ public static void setUp() { @BeforeEach @AfterEach void cleanUpInit() { + sequence.set(0); deleteTopics(); } @@ -76,17 +75,11 @@ public void eventConsumption() throws IOException, InterruptedException { String topic = createTopic("eventConsumption", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>()); - when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(emptyTxStream()); - when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); - when(eventStore.markAsPublished(any(), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); - - EventEnvelope envelope1 = eventEnvelope("value 1"); - EventEnvelope envelope2 = 
eventEnvelope("value 2"); - EventEnvelope envelope3 = eventEnvelope("value 3"); + EventEnvelope envelope1 = eventEnvelopeUnpublished("value 1"); + EventEnvelope envelope2 = eventEnvelopeUnpublished("value 2"); + EventEnvelope envelope3 = eventEnvelopeUnpublished("value 3"); publisher.start(eventStore, NO_STRATEGY); @@ -99,14 +92,15 @@ public void eventConsumption() throws IOException, InterruptedException { .receive() .map(ConsumerRecord::value) .map(KafkaEventPublisherTest::deserialize) - .take(3) + .bufferTimeout(50, Duration.ofSeconds(4)) .map(e -> { println(e); return e; }) - .timeout(Duration.of(60, ChronoUnit.SECONDS)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); publisher.publish(List.of( @@ -125,31 +119,18 @@ public void eventConsumption() throws IOException, InterruptedException { publisher.close(); } - private Publisher emptyTxStream() { - return Flux.empty(); - } - - private Publisher txStream(T... values) { - return Flux.fromIterable(List.of(values)); - } - - @Test @SuppressWarnings("unchecked") public void eventConsumptionWithEventFromDb() throws IOException, InterruptedException { String topic = createTopic("eventConsumptionWithEventFromDb", 5, 1); + ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); - when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(txStream( - eventEnvelope("value 1"), - eventEnvelope("value 2"), - eventEnvelope("value 3") + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>( + eventEnvelopeUnpublished("value 1"), + eventEnvelopeUnpublished("value 2"), + eventEnvelopeUnpublished("value 3") )); - when(eventStore.markAsPublished(eq(Tuple.empty()), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(1))); - when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); publisher.start(eventStore, NO_STRATEGY); @@ -162,16 +143,17 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc ) .receive() .map(ConsumerRecord::value) - .take(6) - .timeout(Duration.of(20, ChronoUnit.SECONDS)) + .bufferTimeout(50, Duration.ofSeconds(4)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); publisher.publish(List.of( - eventEnvelope("value 4"), - eventEnvelope("value 5"), - eventEnvelope("value 6") + eventEnvelopeUnpublished("value 4"), + eventEnvelopeUnpublished("value 5"), + eventEnvelopeUnpublished("value 6") )).toCompletableFuture().join(); List events = results.toCompletableFuture().join(); @@ -184,81 +166,136 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc publisher.close(); } - @Test @SuppressWarnings("unchecked") - public void testRestart() throws IOException, InterruptedException { - AtomicBoolean failed = new AtomicBoolean(false); + public void testRestartWithMock() throws IOException, InterruptedException { + + AtomicInteger failed = new AtomicInteger(0); AtomicInteger streamCount = new AtomicInteger(0); String topic = createTopic("testRestart", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); - 
when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - - EventEnvelope envelope1 = eventEnvelope("value 1"); - EventEnvelope envelope2 = eventEnvelope("value 2"); - EventEnvelope envelope3 = eventEnvelope("value 3"); - EventEnvelope envelope4 = eventEnvelope("value 4"); - EventEnvelope envelope5 = eventEnvelope("value 5"); - EventEnvelope envelope6 = eventEnvelope("value 6"); - - when(eventStore.loadEventsUnpublished(any(), any())) - .thenReturn(txStream(envelope1, envelope2, envelope3)) - .thenReturn(txStream(envelope1, envelope2, envelope3)) - .thenReturn(emptyTxStream()); - when(eventStore.markAsPublished(Mockito.>>any())).thenAnswer(in -> CompletionStages.successful(in.getArgument(0))); - when(eventStore.markAsPublished(any(), Mockito.>>any())) - .then(i -> { - if (failed.getAndSet(true)) { - return CompletionStages.successful(i.getArgument(1)); + + Supplier>> eventsFlux = () -> KafkaReceiver + .create(receiverDefault() + .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") + .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .subscription(List.of(topic).toJavaList())) + .receive() + .doOnNext(e -> e.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .map(KafkaEventPublisherTest::deserialize); + + EventEnvelope envelope1 = eventEnvelopeUnpublished("value 1"); + EventEnvelope envelope2 = eventEnvelopeUnpublished("value 2"); + EventEnvelope envelope3 = eventEnvelopeUnpublished("value 3"); + EventEnvelope envelope4 = eventEnvelopeUnpublished("value 4"); + EventEnvelope envelope5 = eventEnvelopeUnpublished("value 5"); + EventEnvelope envelope6 = eventEnvelopeUnpublished("value 6"); + EventEnvelope envelope7 = eventEnvelopeUnpublished("value 7"); + EventEnvelope envelope8 = eventEnvelopeUnpublished("value 8"); + EventEnvelope envelope9 = eventEnvelopeUnpublished("value 9"); + EventEnvelope envelope10 = eventEnvelopeUnpublished("value 10"); + EventEnvelope envelope11 = eventEnvelopeUnpublished("value 11"); + EventEnvelope envelope12 = eventEnvelopeUnpublished("value 12"); + + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>( + () -> { + if (failed.incrementAndGet() > 1) { + return CompletionStages.successful(API.Tuple()); } else { - return CompletionStages.failed(new RuntimeException("Oups")); + return CompletionStages.failed(new RuntimeException("Oups "+failed.get())); } - }); - when(eventStore.markAsPublished(Mockito.>>any())) - .then(i -> { - if (streamCount.incrementAndGet() == 2) { - return CompletionStages.failed(new RuntimeException("Oups")); + }, () -> { + int count = streamCount.incrementAndGet(); + if (count == 1) { + return CompletionStages.failed(new RuntimeException("Oups stream "+count)); } else { - return CompletionStages.successful(i.getArgument(0)); + return CompletionStages.successful(API.Tuple()); } - }); + }, + envelope1, envelope2, envelope3 + )); publisher.start(eventStore, SKIP); Thread.sleep(200); - CompletionStage>> results = - KafkaReceiver.create(receiverDefault() - .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") - .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - .subscription(List.of(topic).toJavaList())) - .receive() - .map(ConsumerRecord::value) - .map(KafkaEventPublisherTest::deserialize) - .take(8) + + CompletionStage>> results = eventsFlux.get() + .bufferTimeout(50, 
Duration.ofSeconds(4)) + .take(1) .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); - - publisher.publish(API.List(envelope4, envelope5, envelope6)); + List> toPublish = List(envelope4, envelope5, envelope6); + eventStore.publish(toPublish); + publisher.publish(toPublish); List> events = results.toCompletableFuture().join(); - assertThat(events).hasSize(8); - println(events.mkString("\n")); - assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5); - + assertThat(events).usingRecursiveFieldByFieldElementComparator().containsExactly( + // Events that were in the store when the publisher started + envelope1, envelope2, envelope3, + // First transaction failed, so events were replayed + envelope1, envelope2, envelope3, + // Enqueued events were published but the transaction failed, + // so events were replayed + envelope4, envelope5, envelope6); + assertThat(eventStore.store.values()).containsExactly(published(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6)); +// + verify(eventStore, times(2)).openTransaction(); verify(eventStore, times(2)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(0)).markAsPublished(Mockito.>>any()); + + + + List> toPublishFailInMemory = List(envelope7, envelope8, envelope9); + eventStore.publish(toPublishFailInMemory); + publisher.publish(toPublishFailInMemory); + List> resultsAfterCrash = eventsFlux.get() + .bufferTimeout(50, Duration.ofSeconds(10)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) + .collectList() + .map(l -> List.ofAll(l).flatMap(identity())) + .block(); + + println(resultsAfterCrash.mkString("\n")); + assertThat(resultsAfterCrash).contains(envelope7, envelope8, envelope9); + verify(eventStore, times(3)).openTransaction(); + verify(eventStore, times(3)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(1)).markAsPublished(Mockito.>>any()); + + List> toPublishFailAtTheEnd = List(envelope10, envelope11, envelope12); + publisher.publish(toPublishFailAtTheEnd); + + List> resultsAfterCrashInMemory = eventsFlux.get() + .bufferTimeout(50, Duration.ofSeconds(10)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) + .collectList() + .map(l -> List.ofAll(l).flatMap(identity())) + .block(); + + println(resultsAfterCrashInMemory.mkString("\n")); + assertThat(resultsAfterCrashInMemory).containsExactly(envelope10, envelope11, envelope12); + + verify(eventStore, times(3)).openTransaction(); + verify(eventStore, times(3)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(2)).markAsPublished(Mockito.>>any()); publisher.close(); } + @SafeVarargs + private EventEnvelope[] published(EventEnvelope...
envelopes) { + return List.of(envelopes).map(e -> e.copy().withPublished(true).build()).toJavaArray(EventEnvelope[]::new); + } private static EventEnvelope deserialize(String event) { return EventEnvelopeJson.deserialize(event, new TestEventSerializer(), JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), (s, o) -> { @@ -269,7 +306,7 @@ private static EventEnvelope deserialize(String event) { static AtomicLong sequence = new AtomicLong(); - private EventEnvelope eventEnvelope(String value) { + private EventEnvelope eventEnvelopeUnpublished(String value) { long sequenceNum = sequence.incrementAndGet(); String entityId = "entityId"; return EventEnvelope.builder() @@ -277,6 +314,7 @@ private EventEnvelope eventEnvelope(String value) { .withId(UUID.randomUUID()) .withEntityId(entityId) .withSequenceNum(sequenceNum) + .withPublished(false) .withEvent(new TestEvent(value, entityId)) .build(); } diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index b66df349..93499169 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -15,6 +15,8 @@ public interface EventStore { CompletionStage persist(TxCtx transactionContext, List> events); + CompletionStage lastPublishedSequence(); + Publisher> loadEventsUnpublished(TxCtx tx, ConcurrentReplayStrategy concurrentReplayStrategy); Publisher> loadEventsByQuery(TxCtx tx, Query query); diff --git a/thoth-jooq-akka/build.gradle b/thoth-jooq-akka/build.gradle index 0bb016cb..ee74971b 100644 --- a/thoth-jooq-akka/build.gradle +++ b/thoth-jooq-akka/build.gradle @@ -16,17 +16,12 @@ dependencies { testImplementation("fr.maif:jooq-async-reactive:$jooqAsyncVersion") testImplementation("org.assertj:assertj-core:3.10.0") testImplementation("org.postgresql:postgresql:42.2.5") - testImplementation("org.junit.platform:junit-platform-launcher:1.4.2") - testImplementation("org.junit.platform:junit-platform-commons:1.4.2") - testImplementation("org.junit.jupiter:junit-jupiter-engine:5.4.2") - testImplementation("org.junit.vintage:junit-vintage-engine:5.4.2") testImplementation("net.aichler:jupiter-interface:0.9.1") testImplementation("org.mockito:mockito-all:1.10.19") + testImplementation("org.junit.jupiter:junit-jupiter:5.9.3") + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } test { - useTestNG { - useDefaultListeners = true // Tells TestNG to execute its default reporting structure - suites 'src/test/resources/testng.xml' //location of our suite.xml - } + useJUnitPlatform() } diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index f6dd2794..b7d53fd3 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -43,8 +43,7 @@ import java.util.concurrent.CompletionStage; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class ReactivePostgresEventStore implements EventStore, Closeable { @@ -209,6 +208,15 @@ public Publisher> loadEventsUnpublished(PgAsyncT })).map(this::rsToEnvelope).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); } + @Override + public CompletionStage 
lastPublishedSequence() { + return this.pgAsyncPool.queryOne(dsl -> dsl. + select(max(SEQUENCE_NUM).as("max")) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true)) + ).thenApply(mayBe -> mayBe.map(q -> q.get(0, Long.class)).getOrElse(0L)); + } + @Override public Publisher> loadEventsByQuery(PgAsyncTransaction tx, Query query) { diff --git a/thoth-jooq-reactor/build.gradle b/thoth-jooq-reactor/build.gradle index 03604510..e48cd4d9 100644 --- a/thoth-jooq-reactor/build.gradle +++ b/thoth-jooq-reactor/build.gradle @@ -18,17 +18,11 @@ dependencies { testImplementation("fr.maif:jooq-async-reactive:$jooqAsyncVersion") testImplementation("org.assertj:assertj-core:3.10.0") testImplementation("org.postgresql:postgresql:42.2.5") - testImplementation("org.junit.platform:junit-platform-launcher:1.4.2") - testImplementation("org.junit.platform:junit-platform-commons:1.4.2") - testImplementation("org.junit.jupiter:junit-jupiter-engine:5.4.2") - testImplementation("org.junit.vintage:junit-vintage-engine:5.4.2") - testImplementation("net.aichler:jupiter-interface:0.9.1") testImplementation("org.mockito:mockito-all:1.10.19") + testImplementation("org.junit.jupiter:junit-jupiter:5.9.3") + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } test { - useTestNG { - useDefaultListeners = true // Tells TestNG to execute its default reporting structure - suites 'src/test/resources/testng.xml' //location of our suite.xml - } + useJUnitPlatform() } diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index 5173af1a..377927d3 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -18,13 +18,7 @@ import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; -import org.jooq.Condition; -import org.jooq.Converter; -import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.JSONB; -import org.jooq.Record15; -import org.jooq.SelectSeekStep1; +import org.jooq.*; import org.jooq.impl.SQLDataType; import org.reactivestreams.Publisher; import org.slf4j.LoggerFactory; @@ -41,8 +35,7 @@ import java.util.function.Function; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class ReactivePostgresEventStore implements EventStore, Closeable { @@ -102,6 +95,11 @@ public CompletionStage execute(Function count(Function>> queryFunction) { + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L)); + } + @Override public CompletionStage begin() { return pgAsyncPool.begin(); @@ -122,6 +120,11 @@ public CompletionStage execute(Function count(Function>> queryFunction) { + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L)); + } + @Override public CompletionStage begin() { return pgAsyncPool.beginMono().toFuture(); @@ -203,6 +206,14 @@ public CompletionStage persist(Tx transactionContext, List lastPublishedSequence() { + return this.simpleDb.count(dsl -> dsl. 
+ select(max(SEQUENCE_NUM)) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true))); + } + @Override public Publisher> loadEventsUnpublished(Tx transaction, ConcurrentReplayStrategy concurrentReplayStrategy) { return Flux.from(transaction.stream(500, dsl -> { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java index 2336b3d6..11c3fb2c 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java @@ -2,12 +2,16 @@ import org.jooq.DSLContext; import org.jooq.Query; +import org.jooq.Record1; +import org.jooq.ResultQuery; import java.util.concurrent.CompletionStage; import java.util.function.Function; public interface SimpleDb { + CompletionStage count(Function>> queryFunction); + CompletionStage execute(Function queryFunction); CompletionStage begin(); } diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java index 8a331946..bbd8fd40 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java @@ -179,6 +179,15 @@ public void queryingByPublished() { } + @Test + public void lastPublishedSeqNum() { + initDatas(); + postgresEventStore.markAsPublished(List(event1, event2, event3)).toCompletableFuture().join(); + Long lastSeqNum = postgresEventStore.lastPublishedSequence().toCompletableFuture().join(); + assertThat(lastSeqNum).isEqualTo(3L); + } + + @Test public void loadEventsUnpublished() { initDatas(); @@ -225,7 +234,7 @@ public void loadEventsUnpublishedWait() throws InterruptedException { initDatas(); CompletionStage>> first = transactionSource().flatMapMany(t -> Flux.from(postgresEventStore.loadEventsUnpublished(t, WAIT)) - .concatMap(elt -> Flux.interval(Duration.ofMillis(100), Duration.ofMillis(100)).map(__ -> elt).take(1)) + .concatMap(elt -> Mono.delay(Duration.ofMillis(120)).map(__ -> elt)) .concatMap(e -> Mono.fromCompletionStage(() -> postgresEventStore.markAsPublished(t, e).toCompletableFuture()).map(__ -> e)) .doFinally(it -> t.commit()) ).collectList().toFuture(); diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 8eac89a4..2cbca43e 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -46,9 +46,7 @@ import static io.vavr.API.List; import static io.vavr.API.Seq; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class PostgresEventStore implements EventStore, Closeable { @@ -262,6 +260,15 @@ public CompletionStage> markAsPublished(Connecti return markAsPublished(tx, List(eventEnvelope)).thenApply(Traversable::head); } + @Override + public CompletionStage lastPublishedSequence() { + return sql.select(max(SEQUENCE_NUM).as("max")) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true)) + .fetchAsync(executor) + .thenApply(r -> r.getValues("max", 
Long.class).stream().findFirst().orElse(0L)); + } + @Override public Publisher> loadEventsUnpublished(Connection c, ConcurrentReplayStrategy concurrentReplayStrategy) { String tmpQuery = SELECT_CLAUSE + diff --git a/thoth-kafka-consumer-reactor/build.gradle b/thoth-kafka-consumer-reactor/build.gradle index f3e0003d..3c4a8992 100644 --- a/thoth-kafka-consumer-reactor/build.gradle +++ b/thoth-kafka-consumer-reactor/build.gradle @@ -2,6 +2,7 @@ dependencies { implementation "io.projectreactor.kafka:reactor-kafka:$reactorKafkaVersion" implementation "io.vavr:vavr:$vavrVersion" + implementation('org.slf4j:slf4j-api:2.0.7') testImplementation("com.typesafe.akka:akka-stream_$scalaVersion:$akkaVersion") testImplementation("com.typesafe.akka:akka-stream-kafka_$scalaVersion:$alpakkaKafkaVersion")
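
The heart of the fix is the queue change in ReactorKafkaEventPublisher: Sinks.many().multicast().onBackpressureBuffer(n) is replaced by Sinks.many().replay().limit(n), and the live stream now drops every event whose sequence number is at or below the last sequence already handled by the DB replay phase (hence the new lastPublishedSequence() on EventStore). A minimal standalone sketch of those two behaviors, with class and variable names that are illustrative assumptions rather than code from this repository:

import java.time.Duration;
import reactor.core.publisher.Sinks;

public class ReplaySinkSketch {
    public static void main(String[] args) {
        // Assumed to have been obtained from the replay step, e.g. through
        // eventStore.lastPublishedSequence().
        long lastPublishedSequence = 3L;

        // replay().limit(n) retains up to n past events, so events emitted
        // before the live pipeline subscribes are still delivered to it
        // instead of being lost while the DB replay is running.
        Sinks.Many<Long> queue = Sinks.many().replay().limit(1000);
        for (long seq = 1; seq <= 6; seq++) {
            queue.tryEmitNext(seq).orThrow();
        }

        // The live pipeline subscribes only after the replay has finished,
        // and filters out what the replay already published; prints 4, 5, 6.
        queue.asFlux()
             .filter(seq -> seq > lastPublishedSequence)
             .take(3)
             .doOnNext(seq -> System.out.println("would send seq " + seq))
             .blockLast(Duration.ofSeconds(5));
    }
}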
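The grouping also changed: fixedSizeGroup(1000) is used while replaying from the DB and bufferTimeout(200, Duration.ofMillis(20), true) for the live stream. The three-argument bufferTimeout is the fair-backpressure variant added in recent reactor-core releases, which is presumably why the patch bumps reactorVersion to 3.5.7. A small sketch of its batching behavior, with assumed values that are not from the repository:

import java.time.Duration;
import reactor.core.publisher.Flux;

public class BufferTimeoutSketch {
    public static void main(String[] args) {
        // Emits a batch once 5 elements are collected or 100 ms have elapsed,
        // whichever comes first; the third argument selects the
        // fair-backpressure implementation (reactor-core >= 3.5).
        Flux.range(1, 12)
            .delayElements(Duration.ofMillis(30))
            .bufferTimeout(5, Duration.ofMillis(100), true)
            .doOnNext(batch -> System.out.println("batch: " + batch))
            .blockLast(Duration.ofSeconds(10));
    }
}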
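Finally, the whole chain, DB replay included, now restarts from a single Mono.defer under one unbounded exponential-backoff retry (Retry.backoff(Long.MAX_VALUE, restartInterval) with transientErrors(true)), instead of the earlier split where the replay stage had its own bounded retry. A standalone sketch of that restart policy; the flaky source below is an assumed stand-in for a temporarily unavailable broker:

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class RestartPolicySketch {
    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();

        // Fails twice, then produces events.
        Flux<String> flaky = Flux.defer(() -> attempts.incrementAndGet() < 3
                ? Flux.error(new IllegalStateException("broker unavailable, attempt " + attempts.get()))
                : Flux.just("event-1", "event-2"));

        flaky.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
                    .transientErrors(true) // reset retry count and backoff once events flow again
                    .maxBackoff(Duration.ofSeconds(1))
                    .doBeforeRetry(ctx -> System.err.println(
                            "retrying for the " + (ctx.totalRetries() + 1) + " time")))
             .doOnNext(System.out::println)
             .blockLast(Duration.ofSeconds(10));
    }
}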