Skip to content

Commit

Permalink
Publishing hang
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Dec 8, 2023
1 parent f6083a9 commit cd3a9a6
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
Expand All @@ -32,14 +28,8 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;

Expand All @@ -51,8 +41,6 @@ public class ReactorKafkaEventPublisher<E extends Event, Meta, Context> implemen

private AtomicBoolean stop = new AtomicBoolean(false);
private final String topic;

private final ReactorQueue<EventEnvelope<E, Meta, Context>> reactorQueue;
private final Sinks.Many<EventEnvelope<E, Meta, Context>> queue;
private final Flux<EventEnvelope<E, Meta, Context>> eventSource;
private final SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions;
Expand All @@ -75,7 +63,6 @@ public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, C
this.restartInterval = restartInterval == null ? Duration.of(1, ChronoUnit.SECONDS) : restartInterval;
this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1, ChronoUnit.MINUTES) : maxRestartInterval;

this.reactorQueue = new ReactorQueue<>(queueBufferSize1);
this.queue = Sinks.many().replay().limit(queueBufferSize1); // .multicast().onBackpressureBuffer(queueBufferSize1);
this.eventSource = queue.asFlux();
this.senderOptions = senderOptions;
Expand Down Expand Up @@ -127,7 +114,7 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
})
.flatMap(c -> {
if (c.count == 0) {
return fromCS(()-> eventStore.lastPublishedSequence()).map(l -> new CountAndMaxSeqNum(0L, l));
return fromCS(eventStore::lastPublishedSequence).map(l -> new CountAndMaxSeqNum(0L, l));
} else {
return Mono.just(c);
}
Expand Down Expand Up @@ -158,47 +145,6 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
.subscribe();
}


public static class ReactorQueue<E> {
private final Queue<E> innerQueue;
private final AtomicReference<Runnable> lastOnPush = new AtomicReference<>();
private final AtomicReference<FluxSink<E>> lastSubscriber = new AtomicReference<>();

public ReactorQueue(int capacity) {
this.innerQueue = new ArrayBlockingQueue<>(capacity);
}

public void offer(List<E> list) {
innerQueue.addAll(list.toJavaList());
Runnable runnable = lastOnPush.get();
if (runnable != null) {
runnable.run();
}
}

public Flux<E> asFlux() {
return Flux.create(sink -> {
FluxSink<E> lastSink = lastSubscriber.get();
if (lastSink != null) {
lastSink.complete();
}
lastSubscriber.set(sink);
AtomicLong request = new AtomicLong();
Runnable publishToQueue = () -> {
while (!innerQueue.isEmpty() && request.get() > 0) {
sink.next(innerQueue.remove());
}
};
lastOnPush.set(publishToQueue);
sink.onRequest(count -> {
request.getAndAccumulate(count, Long::sum);
publishToQueue.run();
});
});
}
}


private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore,
Option<TxCtx> tx,
Function<Flux<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, Flux<List<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>>> groupFlowForKafka,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public CompletionStage<Integer> execute(Function<DSLContext, ? extends org.jooq.

@Override
public CompletionStage<Long> count(Function<DSLContext, ? extends ResultQuery<Record1<Long>>> queryFunction) {
return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L));
return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L));
}

@Override
Expand All @@ -122,7 +122,7 @@ public CompletionStage<Integer> execute(Function<DSLContext, ? extends org.jooq.

@Override
public CompletionStage<Long> count(Function<DSLContext, ? extends ResultQuery<Record1<Long>>> queryFunction) {
return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.map(qr -> qr.get(0, Long.class)).getOrElse(0L));
return pgAsyncPool.queryOne(queryFunction).thenApply(opt -> opt.flatMap(qr -> Option.of(qr.get(0, Long.class))).getOrElse(0L));
}

@Override
Expand Down

0 comments on commit cd3a9a6

Please sign in to comment.