diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index 2de4882c..60630d2c 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -14,13 +14,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.Exceptions; -import reactor.core.Scannable; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; import reactor.kafka.sender.KafkaSender; import reactor.kafka.sender.SenderOptions; import reactor.kafka.sender.SenderRecord; @@ -32,14 +28,8 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Objects; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -51,8 +41,6 @@ public class ReactorKafkaEventPublisher implemen private AtomicBoolean stop = new AtomicBoolean(false); private final String topic; - - private final ReactorQueue> reactorQueue; private final Sinks.Many> queue; private final Flux> eventSource; private final SenderOptions> senderOptions; @@ -75,7 +63,6 @@ public ReactorKafkaEventPublisher(SenderOptions(queueBufferSize1); this.queue = Sinks.many().replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1); this.eventSource = queue.asFlux(); this.senderOptions = senderOptions; @@ -127,7 +114,7 @@ public void start(EventStore eventStore, Concur }) .flatMap(c -> { if (c.count == 0) { - return fromCS(()-> eventStore.lastPublishedSequence()).map(l -> new CountAndMaxSeqNum(0L, l)); + return fromCS(eventStore::lastPublishedSequence).map(l -> new CountAndMaxSeqNum(0L, l)); } else { return Mono.just(c); } @@ -158,47 +145,6 @@ public void start(EventStore eventStore, Concur .subscribe(); } - - public static class ReactorQueue { - private final Queue innerQueue; - private final AtomicReference lastOnPush = new AtomicReference<>(); - private final AtomicReference> lastSubscriber = new AtomicReference<>(); - - public ReactorQueue(int capacity) { - this.innerQueue = new ArrayBlockingQueue<>(capacity); - } - - public void offer(List list) { - innerQueue.addAll(list.toJavaList()); - Runnable runnable = lastOnPush.get(); - if (runnable != null) { - runnable.run(); - } - } - - public Flux asFlux() { - return Flux.create(sink -> { - FluxSink lastSink = lastSubscriber.get(); - if (lastSink != null) { - lastSink.complete(); - } - lastSubscriber.set(sink); - AtomicLong request = new AtomicLong(); - Runnable publishToQueue = () -> { - while (!innerQueue.isEmpty() && request.get() > 0) { - sink.next(innerQueue.remove()); - } - }; - lastOnPush.set(publishToQueue); - sink.onRequest(count -> { - request.getAndAccumulate(count, Long::sum); - publishToQueue.run(); - }); - }); - } - } - - private Function>, Flux>> publishToKafka(EventStore eventStore, Option tx, Function, EventEnvelope>>, Flux, EventEnvelope>>>> groupFlowForKafka, diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java deleted file mode 100644 index 90b5aa2d..00000000 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/SinkTest.java +++ /dev/null @@ -1,127 +0,0 @@ -package fr.maif.reactor.eventsourcing; - -import fr.maif.reactor.eventsourcing.ReactorKafkaEventPublisher.ReactorQueue; -import io.vavr.collection.List; -import org.junit.jupiter.api.Test; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.publisher.Mono; -import reactor.core.publisher.Sinks; -import reactor.util.retry.Retry; - -import java.time.Duration; -import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -public class SinkTest { - - - - - @Test - public void test() { - Sinks.Many queue = Sinks.many().replay().latest(); - AtomicBoolean failed = new AtomicBoolean(false); - AtomicBoolean failedStream = new AtomicBoolean(false); - CountDownLatch countDownLatch = new CountDownLatch(1); - - Mono.defer(() -> { - if (failed.getAndSet(true)) { - return Mono.just(""); - } else { - return Mono.error(new RuntimeException("Oups")); - } - }).flatMap(__ -> - queue.asFlux() -// .log() - .concatMap(element -> { - if (failedStream.getAndSet(true)) { - System.out.println(element); - return Mono.just(element); - } else { - return Mono.error(new RuntimeException("Oups stream")); - } - }) - .ignoreElements() - ) - .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .transientErrors(true) - .maxBackoff(Duration.ofSeconds(5)) - .doBeforeRetry(ctx -> { - System.out.println("Error handling events for topic %s retrying for the %s time".formatted("test", ctx.totalRetries() + 1)); - ctx.failure().printStackTrace(); - }) - ).subscribe(); - - - Flux.range(0, 20) - .delayElements(Duration.ofSeconds(1)) - .subscribe(n -> System.out.println(queue.tryEmitNext(n.toString()))); - - try { - countDownLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } - @Test - public void test2() { - ReactorQueue queue = new ReactorQueue<>(10000); - AtomicBoolean failed = new AtomicBoolean(true); - AtomicBoolean failedStream = new AtomicBoolean(true); - CountDownLatch countDownLatch = new CountDownLatch(1); - - Mono.defer(() -> { - if (failed.getAndSet(true)) { - return Mono.just(""); - } else { - return Mono.error(new RuntimeException("Oups")); - } - }).flatMap(__ -> - queue.asFlux() -// .log() - .concatMap(element -> { - if (failedStream.getAndSet(true)) { - System.out.println(element); - return Mono.just(element); - } else { - return Mono.error(new RuntimeException("Oups stream")); - } - }) - .ignoreElements() - ) - .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1)) - .transientErrors(true) - .maxBackoff(Duration.ofSeconds(5)) - .doBeforeRetry(ctx -> { - System.out.println("Error handling events for topic %s retrying for the %s time".formatted("test", ctx.totalRetries() + 1)); - ctx.failure().printStackTrace(); - }) - ).subscribe(); - - - Flux.range(0, 20) -// .delayElements(Duration.ofSeconds(1)) - .doOnNext(n -> queue.offer(List.of(n.toString()))) - .collectList() - .block(); - Flux.range(0, 20) - .delayElements(Duration.ofSeconds(1)) - .doOnNext(n -> queue.offer(List.of(n.toString()))) - .collectList() - .block(); - - try { - countDownLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } -} diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index bef79d32..377927d3 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -97,7 +97,7 @@ public CompletionStage execute(Function count(Function>> queryFunction) { - return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L)); + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L)); } @Override @@ -122,7 +122,7 @@ public CompletionStage execute(Function count(Function>> queryFunction) { - return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L)); + return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L)); } @Override