From 45229e385a41ce7d43af8c2fb67d17f42ff70a85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Fri, 8 Dec 2023 14:44:31 +0100 Subject: [PATCH] Publishing hang --- build.gradle | 4 +- .../eventsourcing/InMemoryEventStore.java | 8 + thoth-core-reactor/build.gradle | 1 + .../eventsourcing/InMemoryEventStore.java | 144 ++++++--- .../ReactorKafkaEventPublisher.java | 216 ++++++++++---- .../src/test/java/fr/maif/Helpers.java | 16 +- .../DefaultAggregateStoreTest.java | 23 +- .../eventsourcing/EventProcessorTest.java | 54 ++-- .../KafkaEventPublisherTest.java | 278 ++++++++++++++---- .../maif/reactor/eventsourcing/SinkTest.java | 127 ++++++++ .../fr/maif/eventsourcing/EventStore.java | 2 + .../ReactivePostgresEventStore.java | 12 +- .../ReactivePostgresEventStore.java | 29 +- .../java/fr/maif/eventsourcing/SimpleDb.java | 4 + .../AbstractPostgresEventStoreTest.java | 9 + .../impl/PostgresEventStore.java | 13 +- thoth-kafka-consumer-reactor/build.gradle | 1 + 17 files changed, 726 insertions(+), 215 deletions(-) create mode 100644 thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java diff --git a/build.gradle b/build.gradle index ff74302c..034539ab 100644 --- a/build.gradle +++ b/build.gradle @@ -36,8 +36,8 @@ subprojects { jooqAsyncVersion = "2.0.0" functionalJsonVersion = "1.0.3" kafkaVersion = "3.0.1" - reactorKafkaVersion = "1.3.12" - reactorVersion = "3.4.23" + reactorKafkaVersion = "1.3.22" + reactorVersion = "3.5.7" vertxSqlVersion = "4.3.3" } diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java index b171df5c..6f1978b6 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +56,13 @@ public static InMemoryEventStore(system); } + @Override + public CompletionStage lastPublishedSequence() { + return CompletableFuture.completedStage(eventStore.stream().filter(e -> e.published).map(e -> e.sequenceNum) + .max(Comparator.comparingLong(e -> e)) + .orElse(0L)); + } + @Override public Publisher> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) { return Source.>empty().runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); diff --git a/thoth-core-reactor/build.gradle b/thoth-core-reactor/build.gradle index 780a4c6c..ce09d534 100644 --- a/thoth-core-reactor/build.gradle +++ b/thoth-core-reactor/build.gradle @@ -7,6 +7,7 @@ dependencies { implementation "io.projectreactor.kafka:reactor-kafka:$reactorKafkaVersion" implementation("io.vavr:vavr:$vavrVersion") implementation("io.vavr:vavr-jackson:$vavrVersion") + implementation('org.slf4j:slf4j-api:2.0.7') implementation("com.fasterxml.uuid:java-uuid-generator:3.1.5") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$jacksonVersion") implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index f170b986..7429f5da 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -5,9 +5,11 @@ import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventPublisher; import fr.maif.eventsourcing.EventStore; +import io.vavr.API; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Stream; import io.vavr.control.Option; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -15,96 +17,170 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Comparator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; -public class InMemoryEventStore implements EventStore { +public class InMemoryEventStore implements EventStore, E, Meta, Context> { - private java.util.List> eventStore = new ArrayList<>(); + ConcurrentHashMap> store = new ConcurrentHashMap<>(); + private final Supplier> markAsPublishedTx; + private final Supplier> markAsPublished; - private final Sinks.Many> queue; - private final Flux> realTimeEvents; + private final static Supplier> NOOP = () -> CompletableFuture.completedStage(Tuple.empty()); - private AtomicLong sequence_num = new AtomicLong(0); + public InMemoryEventStore(Supplier> markAsPublishedTx, + Supplier> markAsPublished, + EventEnvelope... events) { - private final ConcurrentHashMap offsets = new ConcurrentHashMap<>(); + this.markAsPublishedTx = markAsPublishedTx; + this.markAsPublished = markAsPublished; + Stream.of(events).forEach(e -> store.put(e.sequenceNum, e)); + } - public InMemoryEventStore() { - this.queue = Sinks.many().multicast().onBackpressureBuffer(10000); - this.realTimeEvents = queue.asFlux(); + public InMemoryEventStore(EventEnvelope... events) { + this(NOOP, NOOP, events); } - public static InMemoryEventStore create() { - return new InMemoryEventStore<>(); + public static InMemoryEventStore create(EventEnvelope... events) { + return new InMemoryEventStore<>(events); + } + + + public record Transaction(ArrayList> events, + ArrayList> toPublish) { + + public Transaction() { + this(new ArrayList<>(), new ArrayList<>()); + } + + public static Transaction newTx() { + return new Transaction<>(); + } + + public Tuple0 addAll(java.util.List> events) { + this.events.addAll(events); + return API.Tuple(); + } } @Override - public Publisher> loadEventsUnpublished(Tuple0 tx, ConcurrentReplayStrategy concurrentReplayStrategy) { - return Flux.empty(); + public Publisher> loadEventsUnpublished(Transaction tx, ConcurrentReplayStrategy concurrentReplayStrategy) { + return Flux.fromIterable(store.values().stream().sorted(Comparator.comparingLong(e -> e.sequenceNum)).toList()) + .filter(e -> Boolean.FALSE.equals(e.published)); } @Override - public CompletionStage> markAsPublished(Tuple0 tx, EventEnvelope eventEnvelope) { - return markAsPublished(eventEnvelope); + public CompletionStage lastPublishedSequence() { + AtomicLong max = new AtomicLong(0L); + store.values().forEach(k -> { + if (k.published) { + max.accumulateAndGet(k.sequenceNum, Math::max); + } + }); + return CompletableFuture.completedStage(max.get()); } @Override - public CompletionStage openTransaction() { - return CompletableFuture.completedStage(Tuple.empty()); + public CompletionStage> openTransaction() { + return CompletableFuture.completedStage(new Transaction<>()); } @Override - public CompletionStage commitOrRollback(Option of, Tuple0 tx) { - return CompletionStages.empty(); + public CompletionStage> markAsPublished(Transaction tx, EventEnvelope eventEnvelope) { + return markAsPublishedTx.get().thenCompose(any -> { + tx.toPublish().add(eventEnvelope); + return CompletableFuture.completedStage(eventEnvelope); + }); } +// @Override +// public CompletionStage>> markAsPublished(List> eventEnvelopes) { +// return markAsPublished.get().thenCompose(any -> { +// return CompletableFuture.completedStage(eventEnvelopes.map(eventEnvelope -> { +// return store.compute(eventEnvelope.sequenceNum, (k, event) -> { +// if (event == null) { +// return eventEnvelope.copy().withPublished(true).build(); +// } else { +// return event.copy().withPublished(true).build(); +// } +// }); +// })); +// } +// ); +// } + @Override public CompletionStage> markAsPublished(EventEnvelope eventEnvelope) { - return CompletableFuture.completedStage( - eventEnvelope.copy().withPublished(true).build() + return markAsPublished.get().thenCompose(any -> + CompletableFuture.completedStage(store.compute(eventEnvelope.sequenceNum, (k, event) -> { + if (event == null) { + return eventEnvelope.copy().withPublished(true).build(); + } else { + return event.copy().withPublished(true).build(); + } + })) ); } @Override - public CompletionStage nextSequence(Tuple0 tx) { - return CompletableFuture.completedStage(sequence_num.incrementAndGet()); + public CompletionStage persist(Transaction transactionContext, List> events) { + return CompletableFuture.completedStage(transactionContext.addAll(events.toJavaList())); + } + + @Override + public CompletionStage commitOrRollback(Option of, Transaction tx) { + if (of.isEmpty()) { + tx.events().forEach(event -> + store.put(event.sequenceNum, event) + ); + tx.toPublish.forEach(e -> + store.computeIfPresent(e.sequenceNum, (k, event) -> event.copy().withPublished(true).build()) + ); + } + tx.events.clear(); + tx.toPublish.clear(); + return CompletableFuture.completedStage(API.Tuple()); + } + + @Override + public CompletionStage nextSequence(InMemoryEventStore.Transaction tx) { + return CompletableFuture.completedStage(store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1); } @Override public CompletionStage publish(List> events) { - events.forEach(queue::tryEmitNext); - return CompletionStages.empty(); + events.forEach(e -> store.put(e.sequenceNum, e)); + return CompletableFuture.completedStage(API.Tuple()); } @Override public Publisher> loadEvents(String id) { - return Flux.fromIterable(eventStore); + return Flux.fromIterable(store.values()).filter(e -> + e.entityId.equals(id) + ); } @Override public Publisher> loadAllEvents() { - return Flux.fromIterable(eventStore); + return Flux.fromIterable(store.values()); } @Override - public Publisher> loadEventsByQuery(Tuple0 tx, Query query) { + public Publisher> loadEventsByQuery(InMemoryEventStore.Transaction tx, Query query) { return loadEventsByQuery(query); } @Override public Publisher> loadEventsByQuery(Query query) { - return Flux.fromIterable(eventStore) + return Flux.fromIterable(store.values()) .filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true)); } - @Override - public CompletionStage persist(Tuple0 transactionContext, List> events) { - eventStore.addAll(events.toJavaList()); - return CompletionStages.empty(); - } @Override public EventPublisher eventPublisher() { diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index dc9123a9..2de4882c 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -8,15 +8,19 @@ import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Traversable; import io.vavr.control.Option; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; +import reactor.core.Exceptions; import reactor.core.Scannable; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; @@ -28,8 +32,16 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import java.util.function.Supplier; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.WAIT; @@ -37,13 +49,15 @@ public class ReactorKafkaEventPublisher implemen private final static Logger LOGGER = LoggerFactory.getLogger(ReactorKafkaEventPublisher.class); + private AtomicBoolean stop = new AtomicBoolean(false); private final String topic; + + private final ReactorQueue> reactorQueue; private final Sinks.Many> queue; + private final Flux> eventSource; private final SenderOptions> senderOptions; - private final Flux> eventsSource; private final Duration restartInterval; private final Duration maxRestartInterval; - private final Function>>, Flux>>>> groupFlow = it -> it.buffer(1000).map(List::ofAll); private Disposable killSwitch; private KafkaSender> kafkaSender; @@ -61,102 +75,188 @@ public ReactorKafkaEventPublisher(SenderOptions e = EventEnvelope.builder().build(); - - this.queue = Sinks.many().multicast().onBackpressureBuffer(queueBufferSize1); - this.eventsSource = queue.asFlux(); + this.reactorQueue = new ReactorQueue<>(queueBufferSize1); + this.queue = Sinks.many().replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1); + this.eventSource = queue.asFlux(); this.senderOptions = senderOptions; this.kafkaSender = KafkaSender.create(senderOptions); } + record CountAndMaxSeqNum(Long count, Long lastSeqNum) { + static CountAndMaxSeqNum empty() { + return new CountAndMaxSeqNum(0L, 0L); + } + + CountAndMaxSeqNum handleSeqNum(Long lastSeqNum) { + return new CountAndMaxSeqNum(count + 1, Math.max(this.lastSeqNum, lastSeqNum)); + } + } + + + private Function, Flux>> fixedSizeGroup(int size) { + return it -> it.buffer(size).map(List::ofAll); + } + + private Function, Flux>> bufferTimeout(int size, Duration duration) { + return it -> it.bufferTimeout(size, duration, true).map(List::ofAll); + } + @Override public void start(EventStore eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) { - LOGGER.info("Starting/Restarting publishing event to kafka on topic {}", topic); + LOGGER.info("Starting event publisher for topic {}", topic); Sinks.Many> logProgressSink = Sinks.many().unicast().onBackpressureBuffer(); logProgress(logProgressSink.asFlux(), 100).subscribe(); - killSwitch = Mono.fromCompletionStage(eventStore::openTransaction) - .flatMapMany(tx -> { - LOGGER.info("Replaying not published in DB for {}", topic); - ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy; - return Flux - .from(eventStore.loadEventsUnpublished(tx, strategy)) - .transform(publishToKafka(eventStore, Option.some(tx), groupFlow)) - .doOnNext(logProgressSink::tryEmitNext) - .collectList() - .flatMap(any -> Mono.fromCompletionStage(() -> { - LOGGER.info("Replaying events not published in DB is finished for {}", topic); - return eventStore.commitOrRollback(Option.none(), tx); - })) - .doOnError(e -> { - eventStore.commitOrRollback(Option.of(e), tx); - LOGGER.error("Error replaying non published events to kafka for " + topic, e); - }) - .map(__ -> Tuple.empty()); + killSwitch = Mono.defer(() -> fromCS(eventStore::openTransaction) + .flatMap(tx -> { + LOGGER.info("Replaying events not published from DB in topic {}", topic); + ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy; + return Flux + .from(eventStore.loadEventsUnpublished(tx, strategy)) + .transform(publishToKafka(eventStore, Option.some(tx), fixedSizeGroup(1000), fixedSizeGroup(1000))) + .doOnNext(logProgressSink::tryEmitNext) + .reduce(CountAndMaxSeqNum.empty(), (c, elt) -> c.handleSeqNum(elt.sequenceNum)) + .flatMap(count -> { + LOGGER.info("Replaying events not published in DB is finished for {}, {} elements published", topic, count.count); + return fromCS(() -> eventStore.commitOrRollback(Option.none(), tx)) + .thenReturn(count); + }) + .doOnError(e -> { + eventStore.commitOrRollback(Option.of(e), tx); + LOGGER.error("Error replaying non published events to kafka for " + topic, e); + }) + .flatMap(c -> { + if (c.count == 0) { + return fromCS(()-> eventStore.lastPublishedSequence()).map(l -> new CountAndMaxSeqNum(0L, l)); + } else { + return Mono.just(c); + } + }); + })) + .flux() + .concatMap(countAndLastSeqNum -> { +// Flux.defer(() -> { + LOGGER.debug("Starting consuming in memory queue for {}. Event lower than {} are ignored", topic, countAndLastSeqNum.lastSeqNum); + //return reactorQueue.asFlux() + return eventSource + .filter(e -> e.sequenceNum > countAndLastSeqNum.lastSeqNum) + .transform(publishToKafka( + eventStore, + Option.none(), + bufferTimeout(200, Duration.ofMillis(20)), + bufferTimeout(200, Duration.ofMillis(20)) + )); }) - .retryWhen(Retry.backoff(10, restartInterval) - .transientErrors(true) - .maxBackoff(maxRestartInterval) - .doBeforeRetry(ctx -> { - LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); - }) - ) - .onErrorReturn(Tuple.empty()) - .switchIfEmpty(Mono.just(Tuple.empty())) - .concatMap(__ -> - this.eventsSource.transform(publishToKafka( - eventStore, - Option.none(), - it -> it - .bufferTimeout(50, Duration.ofMillis(20)) - .map(List::ofAll) - )) - ) .doOnError(e -> LOGGER.error("Error publishing events to kafka", e)) - .doOnComplete(() -> LOGGER.info("Closing publishing to {}", topic)) .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval) .transientErrors(true) .maxBackoff(maxRestartInterval) .doBeforeRetry(ctx -> { - LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); + LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure()); }) ) .subscribe(); } - private Function>, Flux>> publishToKafka(EventStore eventStore, Option tx, Function>>, Flux>>>> groupFlow) { + public static class ReactorQueue { + private final Queue innerQueue; + private final AtomicReference lastOnPush = new AtomicReference<>(); + private final AtomicReference> lastSubscriber = new AtomicReference<>(); + + public ReactorQueue(int capacity) { + this.innerQueue = new ArrayBlockingQueue<>(capacity); + } + + public void offer(List list) { + innerQueue.addAll(list.toJavaList()); + Runnable runnable = lastOnPush.get(); + if (runnable != null) { + runnable.run(); + } + } + + public Flux asFlux() { + return Flux.create(sink -> { + FluxSink lastSink = lastSubscriber.get(); + if (lastSink != null) { + lastSink.complete(); + } + lastSubscriber.set(sink); + AtomicLong request = new AtomicLong(); + Runnable publishToQueue = () -> { + while (!innerQueue.isEmpty() && request.get() > 0) { + sink.next(innerQueue.remove()); + } + }; + lastOnPush.set(publishToQueue); + sink.onRequest(count -> { + request.getAndAccumulate(count, Long::sum); + publishToQueue.run(); + }); + }); + } + } + + + private Function>, Flux>> publishToKafka(EventStore eventStore, + Option tx, + Function, EventEnvelope>>, Flux, EventEnvelope>>>> groupFlowForKafka, + Function>>, Flux>>>> groupFlow + ) { return it -> it .map(this::toKafkaMessage) + .transform(groupFlowForKafka) + .filter(Traversable::nonEmpty) .concatMap(events -> { LOGGER.debug("Sending event {}", events); - return kafkaSender.send(Flux.just(events)) + return kafkaSender.send(Flux.fromIterable(events)) .doOnError(e -> LOGGER.error("Error publishing to kafka ", e)); }) .transform(groupFlow) + .filter(Traversable::nonEmpty) .concatMap(m -> tx.fold( - () -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))), - txCtx -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata))) + () -> fromCS(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))), + txCtx -> fromCS(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata))) ) ) .flatMapIterable(e -> e); } + static Mono fromCS(Supplier> cs) { + return Mono.fromFuture(() -> cs.get().toCompletableFuture()); +// return Mono.create(s -> +// cs.get().whenComplete((r, e) -> { +// if (e != null) { +// s.error(e); +// } else { +// s.success(r); +// } +// }) +// ); + } + @Override public CompletionStage publish(List> events) { - LOGGER.debug("Publishing event in memory : \n{} ", events); +// LOGGER.debug("Publishing event in memory : \n{} ", events); +// return Mono.fromCallable(() -> { +// reactorQueue.offer(events); +// return Tuple.empty(); +// }).publishOn(Schedulers.boundedElastic()).toFuture(); return Flux .fromIterable(events) .map(t -> { - try { - queue.emitNext(t, Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))); + queue.tryEmitNext(t).orThrow(); return Tuple.empty(); - } catch (Exception e) { - LOGGER.error("Error publishing to topic %s".formatted(topic), e); - return Tuple.empty(); - } }) + .retryWhen(Retry.fixedDelay(50, Duration.ofMillis(2)) + .transientErrors(true) + .doBeforeRetry(ctx -> { + LOGGER.error("Error publishing events in memory queue for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure()); + }) + ) + .onErrorResume(e -> Mono.just(Tuple.empty())) .collectList() .thenReturn(Tuple.empty()) .toFuture(); @@ -164,6 +264,7 @@ public CompletionStage publish(List> eve @Override public void close() throws IOException { + stop.set(true); if (Objects.nonNull(killSwitch) && !killSwitch.isDisposed()) { try { this.killSwitch.dispose(); @@ -199,7 +300,8 @@ private Flux logProgress(Flux logProgress, int every) { } public Integer getBufferedElementCount() { - return this.queue.scan(Scannable.Attr.BUFFERED); +// return this.queue.scan(Scannable.Attr.BUFFERED); + return 0; } } diff --git a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java index 29c0d587..8f6e59fa 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java @@ -18,6 +18,8 @@ import fr.maif.json.JsonRead; import fr.maif.json.JsonSchema; import fr.maif.json.JsonWrite; +import fr.maif.reactor.eventsourcing.InMemoryEventStore; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.API; import io.vavr.Lazy; import io.vavr.Tuple; @@ -301,10 +303,10 @@ public Option applyEvent(Option state, VikingEvent event) { } } - public static class VikingCommandHandler implements CommandHandler { + public static class VikingCommandHandler implements CommandHandler> { @Override - public CompletionStage>> handleCommand(Tuple0 unit, Option state, VikingCommand vikingCommand) { + public CompletionStage>> handleCommand(Transaction unit, Option state, VikingCommand vikingCommand) { return CompletableFuture.completedStage( Match(vikingCommand).of( Case(VikingCommand.CreateVikingV1.pattern(), e -> events("C", new VikingEvent.VikingCreated(e.id, e.name, e.age))), @@ -355,7 +357,7 @@ public JsonWrite eventWrite() { } - public static class VikingSnapshot implements SnapshotStore { + public static class VikingSnapshot implements SnapshotStore> { public ConcurrentHashMap data = new ConcurrentHashMap<>(); @@ -366,12 +368,12 @@ public CompletionStage> getSnapshot(String entityId) { } @Override - public CompletionStage> getSnapshot(Tuple0 transactionContext, String entityId) { + public CompletionStage> getSnapshot(Transaction transactionContext, String entityId) { return CompletableFuture.completedStage(Option.of(data.get(entityId))); } @Override - public CompletionStage persist(Tuple0 transactionContext, String id, Option state) { + public CompletionStage persist(Transaction transactionContext, String id, Option state) { Match(state).of( Case($Some($()), s -> data.put(s.id, s)), Case($None(), () -> data.remove(id)) @@ -380,11 +382,11 @@ public CompletionStage persist(Tuple0 transactionContext, String id, Opt } } - public static class VikingProjection implements Projection { + public static class VikingProjection implements Projection, VikingEvent, Tuple0, Tuple0> { public ConcurrentHashMap data = new ConcurrentHashMap<>(); @Override - public CompletionStage storeProjection(Tuple0 unit, List> events) { + public CompletionStage storeProjection(Transaction unit, List> events) { events.forEach(event -> { int i = data.getOrDefault(event.entityId, 0) + events.size(); data.put(event.entityId, i); diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java index 80ce73f0..4d7a36ca 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/DefaultAggregateStoreTest.java @@ -7,6 +7,7 @@ import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.EventStore.Query; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; @@ -63,36 +64,36 @@ class DefaultAggregateStoreTest { @Test void testReloadEventAndBuildAggregateWithoutSnapshots() { - EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); + EventStore, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class); + var aggregateStore = new DefaultAggregateStore<>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()); Query query = Query.builder().withEntityId(entityId).build(); - when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2))); + when(eventStore.loadEventsByQuery(any(), eq(query))).thenReturn(Flux.fromIterable(List.of(eventEnvelope1, eventEnvelope2))); - Option vikings = aggregateStore.getAggregate(Tuple.empty(), entityId).toCompletableFuture().join(); + Option vikings = aggregateStore.getAggregate(Transaction.newTx(), entityId).toCompletableFuture().join(); Assertions.assertThat(vikings).isEqualTo(Some(new Viking(entityId, "Ragnar Lodbrock", 30, 2L))); - verify(eventStore, times(1)).loadEventsByQuery(Tuple(), query); + verify(eventStore, times(1)).loadEventsByQuery(any(), eq(query)); } @Test void testReloadEventAndBuildAggregateWithSnapshots() { - EventStore eventStore = mock(EventStore.class); - DefaultAggregateStore aggregateStore = spy(new DefaultAggregateStore(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { + EventStore, VikingEvent, Tuple0, Tuple0> eventStore = mock(EventStore.class); + DefaultAggregateStore> aggregateStore = spy(new DefaultAggregateStore>(eventStore, new Helpers.VikingEventHandler(), new EventProcessorTest.FakeTransactionManager()) { @Override - public CompletionStage> getSnapshot(Tuple0 transactionContext, String id) { + public CompletionStage> getSnapshot(Transaction transactionContext, String id) { return CompletionStages.successful(Option.some(new Viking(id, "Rollo", 30, 1L))); } }); Query query = Query.builder().withEntityId(entityId).withSequenceFrom(1L).build(); - when(eventStore.loadEventsByQuery(Tuple(), query)).thenReturn(Flux.fromIterable(List.of(eventEnvelope2))); + when(eventStore.loadEventsByQuery(any(), eq(query))).thenReturn(Flux.fromIterable(List.of(eventEnvelope2))); - Option vikings = aggregateStore.getAggregate(Tuple.empty(), entityId).toCompletableFuture().join(); + Option vikings = aggregateStore.getAggregate(Transaction.newTx(), entityId).toCompletableFuture().join(); Assertions.assertThat(vikings).isEqualTo(Some(new Viking(entityId, "Ragnar Lodbrock", 30, 2L))); - verify(eventStore, times(1)).loadEventsByQuery(Tuple(), query); + verify(eventStore, times(1)).loadEventsByQuery(any(), eq(query)); verify(aggregateStore, times(1)).getSnapshot(any(), eq(entityId)); } } \ No newline at end of file diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java index 38c4baaa..cc82b0f7 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/EventProcessorTest.java @@ -1,15 +1,7 @@ package fr.maif.reactor.eventsourcing; -import fr.maif.eventsourcing.AggregateStore; -import fr.maif.eventsourcing.CommandHandler; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventProcessorImpl; -import fr.maif.eventsourcing.Events; -import fr.maif.eventsourcing.ProcessingSuccess; -import fr.maif.eventsourcing.Projection; -import fr.maif.eventsourcing.TransactionManager; -import fr.maif.reactor.eventsourcing.DefaultAggregateStore; -import fr.maif.reactor.eventsourcing.InMemoryEventStore; +import fr.maif.eventsourcing.*; +import fr.maif.reactor.eventsourcing.InMemoryEventStore.Transaction; import io.vavr.API; import io.vavr.Tuple; import io.vavr.Tuple0; @@ -24,18 +16,13 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; -import static fr.maif.Helpers.Viking; -import static fr.maif.Helpers.VikingCommand; -import static fr.maif.Helpers.VikingCommand.CreateViking; -import static fr.maif.Helpers.VikingCommand.DeleteViking; -import static fr.maif.Helpers.VikingCommand.UpdateViking; -import static fr.maif.Helpers.VikingCommandHandler; -import static fr.maif.Helpers.VikingEvent; -import static fr.maif.Helpers.VikingEventHandler; -import static fr.maif.Helpers.VikingProjection; -import static fr.maif.Helpers.VikingSnapshot; +import static fr.maif.Helpers.*; +import static fr.maif.Helpers.VikingCommand.*; import static io.vavr.API.Some; +import static org.assertj.core.api.Assertions.anyOf; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; public class EventProcessorTest { @@ -46,7 +33,7 @@ public void oneCommandShouldGenerateEventAndPersistState() { //Set up VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); //Test Either> result = vikingEventProcessor.processCommand(new CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); @@ -87,7 +74,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { //Set up InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); VikingProjection projection = new VikingProjection(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessorWithProjection(inMemoryEventStore, List.of(projection)); + var vikingEventProcessor = vikingEventProcessorWithProjection(inMemoryEventStore, List.of(projection)); //Test Either> result = vikingEventProcessor.processCommand(new CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); @@ -119,7 +106,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { assertThat(eventsFromJournal).containsExactly(expectedEnvelope); - assertThat(vikingEventProcessor.getAggregateStore().getAggregate(Tuple.empty(), "1").toCompletableFuture().join()).isEqualTo(Some(expected)); + assertThat(vikingEventProcessor.getAggregateStore().getAggregate(any(), eq("1")).toCompletableFuture().join()).isEqualTo(Some(expected)); Assertions.assertThat(projection.data.get("1")).isEqualTo(1); } @@ -128,7 +115,7 @@ public void oneCommandShouldGenerateEventAndPersistProjection() { public void twoCommandShouldGenerateEventAndPersistState() { VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); vikingEventProcessor.processCommand(new VikingCommand.CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); Either> result = vikingEventProcessor.processCommand(new UpdateViking("1", "Ragnar Lodbrock", 30)).toCompletableFuture().join(); @@ -181,7 +168,7 @@ public void twoCommandShouldGenerateEventAndPersistState() { public void createAndDeleteShouldGenerateEventAndPersistState() { VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); vikingEventProcessor.processCommand(new VikingCommand.CreateViking("1", "ragnar", 30)).toCompletableFuture().join(); Either> result = vikingEventProcessor.processCommand(new DeleteViking("1")).toCompletableFuture().join(); @@ -233,7 +220,7 @@ public void createAndDeleteShouldGenerateEventAndPersistState() { public void multipleCommandsOnSameEntityOnSameBatch() { VikingSnapshot vikingSnapshot = new VikingSnapshot(); InMemoryEventStore inMemoryEventStore = InMemoryEventStore.create(); - EventProcessorImpl vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); + var vikingEventProcessor = vikingEventProcessor(inMemoryEventStore, vikingSnapshot); var result = vikingEventProcessor.batchProcessCommand(API.List( new VikingCommand.CreateViking("1", "ragnar", 30), @@ -254,7 +241,7 @@ public void multipleCommandsOnSameEntityOnSameBatch() { } - private EventProcessorImpl vikingEventProcessor(InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessor(InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { return new EventProcessorImpl<>( inMemoryEventStore, new FakeTransactionManager(), @@ -266,7 +253,7 @@ private EventProcessorImpl vikingEventProcessor(CommandHandler commandHandler, InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessor(CommandHandler> commandHandler, InMemoryEventStore inMemoryEventStore, VikingSnapshot vikingSnapshot) { return new EventProcessorImpl<>( inMemoryEventStore, new FakeTransactionManager(), @@ -277,7 +264,7 @@ private EventProcessorImpl vikingEventProcessorWithProjection(InMemoryEventStore inMemoryEventStore, List> projections) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessorWithProjection(InMemoryEventStore inMemoryEventStore, List, VikingEvent, Tuple0, Tuple0>> projections) { VikingEventHandler vikingEventHandler = new VikingEventHandler(); FakeTransactionManager fakeTransactionManager = new FakeTransactionManager(); return new EventProcessorImpl<>( @@ -291,7 +278,7 @@ private EventProcessorImpl vikingEventProcessorWithSnapshot(InMemoryEventStore inMemoryEventStore, AggregateStore aggregateStore, List> projections) { + private EventProcessorImpl, String, Tuple0, Tuple0> vikingEventProcessorWithSnapshot(InMemoryEventStore inMemoryEventStore, AggregateStore> aggregateStore, List, VikingEvent, Tuple0, Tuple0>> projections) { return new EventProcessorImpl<>( inMemoryEventStore, new FakeTransactionManager(), @@ -303,13 +290,14 @@ private EventProcessorImpl { + public static class FakeTransactionManager implements TransactionManager> { private AtomicInteger counter = new AtomicInteger(0); + @Override - public CompletionStage withTransaction(Function> callBack) { - return callBack.apply(Tuple.empty()); + public CompletionStage withTransaction(Function, CompletionStage> callBack) { + return callBack.apply(new Transaction<>()); } @Override diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index ebf7d4f9..28159e8f 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -2,10 +2,7 @@ import com.fasterxml.jackson.databind.JsonNode; import fr.maif.concurrent.CompletionStages; -import fr.maif.eventsourcing.Event; -import fr.maif.eventsourcing.EventEnvelope; -import fr.maif.eventsourcing.EventStore; -import fr.maif.eventsourcing.Type; +import fr.maif.eventsourcing.*; import fr.maif.eventsourcing.format.JacksonEventFormat; import fr.maif.eventsourcing.format.JacksonSimpleFormat; import fr.maif.json.EventEnvelopeJson; @@ -13,11 +10,11 @@ import fr.maif.kafka.JsonDeserializer; import fr.maif.kafka.JsonSerializer; import fr.maif.reactor.KafkaContainerTest; -import io.vavr.API; -import io.vavr.Tuple; -import io.vavr.Tuple0; +import io.vavr.*; import io.vavr.collection.List; +import io.vavr.collection.Stream; import io.vavr.control.Either; +import io.vavr.control.Option; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; @@ -39,18 +36,19 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.util.Map; -import java.util.Objects; -import java.util.StringJoiner; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP; +import static io.vavr.API.List; import static io.vavr.API.println; +import static java.util.function.Function.identity; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; @@ -66,6 +64,7 @@ public static void setUp() { @BeforeEach @AfterEach void cleanUpInit() { + sequence.set(0); deleteTopics(); } @@ -184,81 +183,234 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc publisher.close(); } +// +// @Test +// @SuppressWarnings("unchecked") +// public void testRestart() throws IOException, InterruptedException { +// +// AtomicInteger failed = new AtomicInteger(0); +// AtomicInteger streamCount = new AtomicInteger(0); +// String topic = createTopic("testRestart", 5, 1); +// ReactorKafkaEventPublisher publisher = createPublisher(topic); +// +// Supplier>> eventsFlux = () -> KafkaReceiver +// .create(receiverDefault() +// .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") +// .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") +// .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") +// .subscription(List.of(topic).toJavaList())) +// .receive() +// .map(ConsumerRecord::value) +// .map(KafkaEventPublisherTest::deserialize); +// +// EventEnvelope envelope1 = eventEnvelope("value 1"); +// EventEnvelope envelope2 = eventEnvelope("value 2"); +// EventEnvelope envelope3 = eventEnvelope("value 3"); +// EventEnvelope envelope4 = eventEnvelope("value 4"); +// EventEnvelope envelope5 = eventEnvelope("value 5"); +// EventEnvelope envelope6 = eventEnvelope("value 6"); +// EventEnvelope envelope7 = eventEnvelope("value 7"); +// EventEnvelope envelope8 = eventEnvelope("value 8"); +// EventEnvelope envelope9 = eventEnvelope("value 9"); +// EventStore eventStore = mock(EventStore.class); +// +// when(eventStore.loadEventsUnpublished(any(), any())) +// .thenReturn(txStream(envelope1, envelope2, envelope3)) +// .thenReturn(txStream(envelope1, envelope2, envelope3)) +// .thenReturn(emptyTxStream()); +// +// when(eventStore.markAsPublished(any(), Mockito.>>any())) +// .then(i -> { +// if (failed.incrementAndGet() > 1) { +// return CompletionStages.successful(i.getArgument(1)); +// } else { +// return CompletionStages.failed(new RuntimeException("Oups "+failed.get())); +// } +// }); +// when(eventStore.markAsPublished(Mockito.>>any())) +// .then(i -> { +// int count = streamCount.incrementAndGet(); +// Object argument = i.getArgument(0); +// System.out.println("Count "+count+" - "+argument); +// if (count == 1) { +// return CompletionStages.failed(new RuntimeException("Oups stream "+count)); +// } else { +// return CompletionStages.successful(argument); +// } +// }); +// +// publisher.start(eventStore, SKIP); +// +// Thread.sleep(200); +// +// +// CompletionStage>> results = eventsFlux.get() +// .bufferTimeout(10, Duration.ofSeconds(10)) +// .take(1) +// .timeout(Duration.of(30, ChronoUnit.SECONDS)) +// .collectList() +// .map(l -> List.ofAll(l).flatMap(identity())) +// .toFuture(); +// +// +// publisher.publish(List(envelope4, envelope5, envelope6)); +// +// List> events = results.toCompletableFuture().join(); +// +// println(events.mkString("\n")); +// +// assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6); +// +// verify(eventStore, times(3)).openTransaction(); +// verify(eventStore, times(2)).markAsPublished(any(), Mockito.>>any()); +// verify(eventStore, times(1)).markAsPublished(Mockito.>>any()); +// +// +// publisher.publish(List(envelope7, envelope8, envelope9)); +// List> resultsAfterCrash = eventsFlux.get() +// .bufferTimeout(12, Duration.ofSeconds(10)) +// .take(1) +// .timeout(Duration.of(30, ChronoUnit.SECONDS)) +// .collectList() +// .map(l -> List.ofAll(l).flatMap(identity())) +// .block(); +// println(resultsAfterCrash.mkString("\n")); +// +// assertThat(resultsAfterCrash).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6, envelope7, envelope8, envelope9); +// +// +// publisher.close(); +// } @Test @SuppressWarnings("unchecked") - public void testRestart() throws IOException, InterruptedException { - AtomicBoolean failed = new AtomicBoolean(false); + public void testRestartWithMock() throws IOException, InterruptedException { + + AtomicInteger failed = new AtomicInteger(0); AtomicInteger streamCount = new AtomicInteger(0); String topic = createTopic("testRestart", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); - EventStore eventStore = mock(EventStore.class); - when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); - when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); - EventEnvelope envelope1 = eventEnvelope("value 1"); - EventEnvelope envelope2 = eventEnvelope("value 2"); - EventEnvelope envelope3 = eventEnvelope("value 3"); - EventEnvelope envelope4 = eventEnvelope("value 4"); - EventEnvelope envelope5 = eventEnvelope("value 5"); - EventEnvelope envelope6 = eventEnvelope("value 6"); - - when(eventStore.loadEventsUnpublished(any(), any())) - .thenReturn(txStream(envelope1, envelope2, envelope3)) - .thenReturn(txStream(envelope1, envelope2, envelope3)) - .thenReturn(emptyTxStream()); - when(eventStore.markAsPublished(Mockito.>>any())).thenAnswer(in -> CompletionStages.successful(in.getArgument(0))); - when(eventStore.markAsPublished(any(), Mockito.>>any())) - .then(i -> { - if (failed.getAndSet(true)) { - return CompletionStages.successful(i.getArgument(1)); + Supplier>> eventsFlux = () -> KafkaReceiver + .create(receiverDefault() + .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") + .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .subscription(List.of(topic).toJavaList())) + .receive() + .doOnNext(e -> e.receiverOffset().acknowledge()) + .map(ConsumerRecord::value) + .map(KafkaEventPublisherTest::deserialize); + + EventEnvelope envelope1 = eventEnvelopeUnpublished("value 1"); + EventEnvelope envelope2 = eventEnvelopeUnpublished("value 2"); + EventEnvelope envelope3 = eventEnvelopeUnpublished("value 3"); + EventEnvelope envelope4 = eventEnvelopeUnpublished("value 4"); + EventEnvelope envelope5 = eventEnvelopeUnpublished("value 5"); + EventEnvelope envelope6 = eventEnvelopeUnpublished("value 6"); + EventEnvelope envelope7 = eventEnvelopeUnpublished("value 7"); + EventEnvelope envelope8 = eventEnvelopeUnpublished("value 8"); + EventEnvelope envelope9 = eventEnvelopeUnpublished("value 9"); + EventEnvelope envelope10 = eventEnvelopeUnpublished("value 10"); + EventEnvelope envelope11 = eventEnvelopeUnpublished("value 11"); + EventEnvelope envelope12 = eventEnvelopeUnpublished("value 12"); + + InMemoryEventStore eventStore = spy(new InMemoryEventStore<>( + () -> { + if (failed.incrementAndGet() > 1) { + return CompletionStages.successful(API.Tuple()); } else { - return CompletionStages.failed(new RuntimeException("Oups")); + return CompletionStages.failed(new RuntimeException("Oups "+failed.get())); } - }); - when(eventStore.markAsPublished(Mockito.>>any())) - .then(i -> { - if (streamCount.incrementAndGet() == 2) { - return CompletionStages.failed(new RuntimeException("Oups")); + }, () -> { + int count = streamCount.incrementAndGet(); + if (count == 1) { + return CompletionStages.failed(new RuntimeException("Oups stream "+count)); } else { - return CompletionStages.successful(i.getArgument(0)); + return CompletionStages.successful(API.Tuple()); } - }); + }, + envelope1, envelope2, envelope3 + )); publisher.start(eventStore, SKIP); Thread.sleep(200); - CompletionStage>> results = - KafkaReceiver.create(receiverDefault() - .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") - .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - .subscription(List.of(topic).toJavaList())) - .receive() - .map(ConsumerRecord::value) - .map(KafkaEventPublisherTest::deserialize) - .take(8) + + CompletionStage>> results = eventsFlux.get() + .bufferTimeout(50, Duration.ofSeconds(4)) + .take(1) .timeout(Duration.of(30, ChronoUnit.SECONDS)) .collectList() - .map(List::ofAll) + .map(l -> List.ofAll(l).flatMap(identity())) .toFuture(); - - publisher.publish(API.List(envelope4, envelope5, envelope6)); + List> toPublish = List(envelope4, envelope5, envelope6); + eventStore.publish(toPublish); + publisher.publish(toPublish); List> events = results.toCompletableFuture().join(); - assertThat(events).hasSize(8); - println(events.mkString("\n")); - assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5); - + assertThat(events).usingRecursiveFieldByFieldElementComparator().containsExactly( + // Event that were in store when publisher started + envelope1, envelope2, envelope3, + // First transaction failed so, events were replayed + envelope1, envelope2, envelope3, + // Inqueued event were published but transaction failed + // So events were replayed + envelope4, envelope5, envelope6); + assertThat(eventStore.store.values()).containsExactly(published(envelope1, envelope2, envelope3, envelope4, envelope5, envelope6)); +// + verify(eventStore, times(2)).openTransaction(); verify(eventStore, times(2)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(0)).markAsPublished(Mockito.>>any()); + + + + List> toPublishFailInMemory = List(envelope7, envelope8, envelope9); + eventStore.publish(toPublishFailInMemory); + publisher.publish(toPublishFailInMemory); + List> resultsAfterCrash = eventsFlux.get() + .bufferTimeout(50, Duration.ofSeconds(10)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) + .collectList() + .map(l -> List.ofAll(l).flatMap(identity())) + .block(); + + println(resultsAfterCrash.mkString("\n")); + assertThat(resultsAfterCrash).contains(envelope7, envelope8, envelope9); + verify(eventStore, times(3)).openTransaction(); + verify(eventStore, times(3)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(1)).markAsPublished(Mockito.>>any()); + + List> toPublishFailAtTheEnd = List(envelope10, envelope11, envelope12); + publisher.publish(toPublishFailAtTheEnd); + + List> resultsAfterCrashInMemory = eventsFlux.get() + .bufferTimeout(50, Duration.ofSeconds(10)) + .take(1) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) + .collectList() + .map(l -> List.ofAll(l).flatMap(identity())) + .block(); + + println(resultsAfterCrashInMemory.mkString("\n")); + assertThat(resultsAfterCrashInMemory).containsExactly(envelope10, envelope11, envelope12); + + verify(eventStore, times(3)).openTransaction(); + verify(eventStore, times(3)).markAsPublished(any(), Mockito.>>any()); + verify(eventStore, times(2)).markAsPublished(Mockito.>>any()); publisher.close(); } + private EventEnvelope[] published(EventEnvelope... envelopes) { + return List.of(envelopes).map(e -> e.copy().withPublished(true).build()).toJavaArray(EventEnvelope[]::new); + } private static EventEnvelope deserialize(String event) { return EventEnvelopeJson.deserialize(event, new TestEventSerializer(), JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), (s, o) -> { @@ -280,6 +432,18 @@ private EventEnvelope eventEnvelope(String value) { .withEvent(new TestEvent(value, entityId)) .build(); } + private EventEnvelope eventEnvelopeUnpublished(String value) { + long sequenceNum = sequence.incrementAndGet(); + String entityId = "entityId"; + return EventEnvelope.builder() + .withEmissionDate(LocalDateTime.now()) + .withId(UUID.randomUUID()) + .withEntityId(entityId) + .withSequenceNum(sequenceNum) + .withPublished(false) + .withEvent(new TestEvent(value, entityId)) + .build(); + } private ReactorKafkaEventPublisher createPublisher(String topic) { return new ReactorKafkaEventPublisher<>(producerSettings(), topic, null, Duration.of(500, ChronoUnit.MILLIS), Duration.of(30, ChronoUnit.SECONDS)); diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java new file mode 100644 index 00000000..90b5aa2d --- /dev/null +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java @@ -0,0 +1,127 @@ +package fr.maif.reactor.eventsourcing; + +import fr.maif.reactor.eventsourcing.ReactorKafkaEventPublisher.ReactorQueue; +import io.vavr.collection.List; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.util.retry.Retry; + +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public class SinkTest { + + + + + @Test + public void test() { + Sinks.Many queue = Sinks.many().replay().latest(); + AtomicBoolean failed = new AtomicBoolean(false); + AtomicBoolean failedStream = new AtomicBoolean(false); + CountDownLatch countDownLatch = new CountDownLatch(1); + + Mono.defer(() -> { + if (failed.getAndSet(true)) { + return Mono.just(""); + } else { + return Mono.error(new RuntimeException("Oups")); + } + }).flatMap(__ -> + queue.asFlux() +// .log() + .concatMap(element -> { + if (failedStream.getAndSet(true)) { + System.out.println(element); + return Mono.just(element); + } else { + return Mono.error(new RuntimeException("Oups stream")); + } + }) + .ignoreElements() + ) + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .transientErrors(true) + .maxBackoff(Duration.ofSeconds(5)) + .doBeforeRetry(ctx -> { + System.out.println("Error handling events for topic %s retrying for the %s time".formatted("test", ctx.totalRetries() + 1)); + ctx.failure().printStackTrace(); + }) + ).subscribe(); + + + Flux.range(0, 20) + .delayElements(Duration.ofSeconds(1)) + .subscribe(n -> System.out.println(queue.tryEmitNext(n.toString()))); + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } + @Test + public void test2() { + ReactorQueue queue = new ReactorQueue<>(10000); + AtomicBoolean failed = new AtomicBoolean(true); + AtomicBoolean failedStream = new AtomicBoolean(true); + CountDownLatch countDownLatch = new CountDownLatch(1); + + Mono.defer(() -> { + if (failed.getAndSet(true)) { + return Mono.just(""); + } else { + return Mono.error(new RuntimeException("Oups")); + } + }).flatMap(__ -> + queue.asFlux() +// .log() + .concatMap(element -> { + if (failedStream.getAndSet(true)) { + System.out.println(element); + return Mono.just(element); + } else { + return Mono.error(new RuntimeException("Oups stream")); + } + }) + .ignoreElements() + ) + .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) + .transientErrors(true) + .maxBackoff(Duration.ofSeconds(5)) + .doBeforeRetry(ctx -> { + System.out.println("Error handling events for topic %s retrying for the %s time".formatted("test", ctx.totalRetries() + 1)); + ctx.failure().printStackTrace(); + }) + ).subscribe(); + + + Flux.range(0, 20) +// .delayElements(Duration.ofSeconds(1)) + .doOnNext(n -> queue.offer(List.of(n.toString()))) + .collectList() + .block(); + Flux.range(0, 20) + .delayElements(Duration.ofSeconds(1)) + .doOnNext(n -> queue.offer(List.of(n.toString()))) + .collectList() + .block(); + + try { + countDownLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } +} diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index b66df349..93499169 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -15,6 +15,8 @@ public interface EventStore { CompletionStage persist(TxCtx transactionContext, List> events); + CompletionStage lastPublishedSequence(); + Publisher> loadEventsUnpublished(TxCtx tx, ConcurrentReplayStrategy concurrentReplayStrategy); Publisher> loadEventsByQuery(TxCtx tx, Query query); diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index f6dd2794..b7d53fd3 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -43,8 +43,7 @@ import java.util.concurrent.CompletionStage; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class ReactivePostgresEventStore implements EventStore, Closeable { @@ -209,6 +208,15 @@ public Publisher> loadEventsUnpublished(PgAsyncT })).map(this::rsToEnvelope).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); } + @Override + public CompletionStage lastPublishedSequence() { + return this.pgAsyncPool.queryOne(dsl -> dsl. + select(max(SEQUENCE_NUM).as("max")) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true)) + ).thenApply(mayBe -> mayBe.map(q -> q.get(0, Long.class)).getOrElse(0L)); + } + @Override public Publisher> loadEventsByQuery(PgAsyncTransaction tx, Query query) { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index 5173af1a..bef79d32 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -18,13 +18,7 @@ import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; -import org.jooq.Condition; -import org.jooq.Converter; -import org.jooq.DSLContext; -import org.jooq.Field; -import org.jooq.JSONB; -import org.jooq.Record15; -import org.jooq.SelectSeekStep1; +import org.jooq.*; import org.jooq.impl.SQLDataType; import org.reactivestreams.Publisher; import org.slf4j.LoggerFactory; @@ -41,8 +35,7 @@ import java.util.function.Function; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class ReactivePostgresEventStore implements EventStore, Closeable { @@ -102,6 +95,11 @@ public CompletionStage execute(Function count(Function>> queryFunction) { + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L)); + } + @Override public CompletionStage begin() { return pgAsyncPool.begin(); @@ -122,6 +120,11 @@ public CompletionStage execute(Function count(Function>> queryFunction) { + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L)); + } + @Override public CompletionStage begin() { return pgAsyncPool.beginMono().toFuture(); @@ -203,6 +206,14 @@ public CompletionStage persist(Tx transactionContext, List lastPublishedSequence() { + return this.simpleDb.count(dsl -> dsl. + select(max(SEQUENCE_NUM)) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true))); + } + @Override public Publisher> loadEventsUnpublished(Tx transaction, ConcurrentReplayStrategy concurrentReplayStrategy) { return Flux.from(transaction.stream(500, dsl -> { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java index 2336b3d6..11c3fb2c 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/SimpleDb.java @@ -2,12 +2,16 @@ import org.jooq.DSLContext; import org.jooq.Query; +import org.jooq.Record1; +import org.jooq.ResultQuery; import java.util.concurrent.CompletionStage; import java.util.function.Function; public interface SimpleDb { + CompletionStage count(Function>> queryFunction); + CompletionStage execute(Function queryFunction); CompletionStage begin(); } diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java index 8a331946..78f248d0 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java @@ -179,6 +179,15 @@ public void queryingByPublished() { } + @Test + public void lastPublishedSeqNum() { + initDatas(); + postgresEventStore.markAsPublished(List(event1, event2, event3)).toCompletableFuture().join(); + Long lastSeqNum = postgresEventStore.lastPublishedSequence().toCompletableFuture().join(); + assertThat(lastSeqNum).isEqualTo(4L); + } + + @Test public void loadEventsUnpublished() { initDatas(); diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 8eac89a4..2cbca43e 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -46,9 +46,7 @@ import static io.vavr.API.List; import static io.vavr.API.Seq; import static java.util.function.Function.identity; -import static org.jooq.impl.DSL.field; -import static org.jooq.impl.DSL.name; -import static org.jooq.impl.DSL.table; +import static org.jooq.impl.DSL.*; public class PostgresEventStore implements EventStore, Closeable { @@ -262,6 +260,15 @@ public CompletionStage> markAsPublished(Connecti return markAsPublished(tx, List(eventEnvelope)).thenApply(Traversable::head); } + @Override + public CompletionStage lastPublishedSequence() { + return sql.select(max(SEQUENCE_NUM).as("max")) + .from(table(this.tableNames.tableName)) + .where(PUBLISHED.eq(true)) + .fetchAsync(executor) + .thenApply(r -> r.getValues("max", Long.class).stream().findFirst().orElse(0L)); + } + @Override public Publisher> loadEventsUnpublished(Connection c, ConcurrentReplayStrategy concurrentReplayStrategy) { String tmpQuery = SELECT_CLAUSE + diff --git a/thoth-kafka-consumer-reactor/build.gradle b/thoth-kafka-consumer-reactor/build.gradle index f3e0003d..3c4a8992 100644 --- a/thoth-kafka-consumer-reactor/build.gradle +++ b/thoth-kafka-consumer-reactor/build.gradle @@ -2,6 +2,7 @@ dependencies { implementation "io.projectreactor.kafka:reactor-kafka:$reactorKafkaVersion" implementation "io.vavr:vavr:$vavrVersion" + implementation('org.slf4j:slf4j-api:2.0.7') testImplementation("com.typesafe.akka:akka-stream_$scalaVersion:$akkaVersion") testImplementation("com.typesafe.akka:akka-stream-kafka_$scalaVersion:$alpakkaKafkaVersion")