Skip to content

Commit

Permalink
More control on publisher queue (#60)
Browse files Browse the repository at this point in the history
* More control on event publishing
  • Loading branch information
larousso authored Nov 17, 2023
1 parent eea8de8 commit b176290
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enable auto-env through the sdkman_auto_env config
# Add key=value pairs of SDKs to use below
java=17.0.7-tem
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
import akka.stream.javadsl.*;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.control.Option;
import org.reactivestreams.Publisher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -121,4 +123,20 @@ public CompletionStage<Tuple0> persist(Tuple0 transactionContext, List<EventEnve
eventStore.addAll(events.toJavaList());
return CompletableFuture.supplyAsync(Tuple::empty);
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
var _this = this;
return new EventPublisher<E, Meta, Context>() {
@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
return _this.publish(events);
}

@Override
public void close() throws IOException {

}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
import fr.maif.eventsourcing.EventStore;
import io.vavr.Tuple;
import io.vavr.Tuple0;
Expand All @@ -12,6 +13,7 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -103,4 +105,20 @@ public CompletionStage<Tuple0> persist(Tuple0 transactionContext, List<EventEnve
eventStore.addAll(events.toJavaList());
return CompletionStages.empty();
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
var _this = this;
return new EventPublisher<E, Meta, Context>() {
@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
return _this.publish(events);
}

@Override
public void close() throws IOException {

}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -57,8 +58,8 @@ public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, C
public ReactorKafkaEventPublisher(SenderOptions<String, EventEnvelope<E, Meta, Context>> senderOptions, String topic, Integer queueBufferSize, Duration restartInterval, Duration maxRestartInterval) {
this.topic = topic;
int queueBufferSize1 = queueBufferSize == null ? 10000 : queueBufferSize;
this.restartInterval = restartInterval == null ? Duration.of(10, ChronoUnit.SECONDS) : restartInterval;
this.maxRestartInterval = maxRestartInterval == null ? Duration.of(30, ChronoUnit.MINUTES) : maxRestartInterval;
this.restartInterval = restartInterval == null ? Duration.of(1, ChronoUnit.SECONDS) : restartInterval;
this.maxRestartInterval = maxRestartInterval == null ? Duration.of(1, ChronoUnit.MINUTES) : maxRestartInterval;

EventEnvelope<E, Meta, Context> e = EventEnvelope.<E, Meta, Context>builder().build();

Expand Down Expand Up @@ -105,18 +106,26 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
.doOnComplete(() -> LOGGER.info("Closing publishing to {}", topic))
.retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval)
.transientErrors(true)
.maxBackoff(maxRestartInterval))
.maxBackoff(maxRestartInterval)
.doBeforeRetry(ctx -> {
LOGGER.error("Error handling events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure());
})
)
.subscribe();
}


private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelope<E, Meta, Context>>> publishToKafka(EventStore<TxCtx, E, Meta, Context> eventStore, Option<TxCtx> tx, Function<Flux<SenderResult<EventEnvelope<E, Meta, Context>>>, Flux<List<SenderResult<EventEnvelope<E, Meta, Context>>>>> groupFlow) {
Function<Flux<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, Flux<SenderResult<EventEnvelope<E, Meta, Context>>>> publishToKafkaFlow = it -> kafkaSender.send(it);
Function<Flux<SenderRecord<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, Flux<SenderResult<EventEnvelope<E, Meta, Context>>>> publishToKafkaFlow = it ->
kafkaSender.send(it)
.doOnError(e -> {
LOGGER.error("Error publishing to kafka ", e);
});
return it -> it
.map(this::toKafkaMessage)
.transform(publishToKafkaFlow)
.transform(groupFlow)
.flatMap(m ->
.concatMap(m ->
tx.fold(
() -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(m.map(SenderResult::correlationMetadata))),
txCtx -> Mono.fromCompletionStage(() -> eventStore.markAsPublished(txCtx, m.map(SenderResult::correlationMetadata)))
Expand All @@ -130,7 +139,23 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
LOGGER.debug("Publishing event in memory : \n{} ", events);
return Flux
.fromIterable(events)
.map(queue::tryEmitNext)
.concatMap(t ->
Mono.defer(() -> {
Sinks.EmitResult emitResult = queue.tryEmitNext(t);
if (emitResult.isFailure()) {
return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult)));
} else {
return Mono.just("");
}
})
.retryWhen(Retry
.backoff(5, Duration.ofMillis(500))
.doBeforeRetry(ctx -> {
LOGGER.error("Error publishing to queue %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure());
})
)
.onErrorReturn("")
)
.collectList()
.thenReturn(Tuple.empty())
.toFuture();
Expand All @@ -139,7 +164,11 @@ public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> eve
@Override
public void close() throws IOException {
if (Objects.nonNull(killSwitch)) {
this.killSwitch.dispose();
try {
this.killSwitch.dispose();
} catch (UnsupportedOperationException e) {
LOGGER.error("Error closing Publisher", e);
}
}
this.kafkaSender.close();
}
Expand Down Expand Up @@ -168,4 +197,8 @@ private <Any> Flux<Integer> logProgress(Flux<Any> logProgress, int every) {
});
}

public Integer getBufferedElementCount() {
return this.queue.scan(Scannable.Attr.BUFFERED);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.*;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import reactor.kafka.sender.SenderOptions;
Expand All @@ -44,30 +39,18 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY;
import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP;
import static io.vavr.API.println;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaEventPublisherTest extends BaseKafkaTest {
Expand All @@ -80,7 +63,7 @@ public class KafkaEventPublisherTest extends BaseKafkaTest {
}

@BeforeEach
void cleanUpInit() throws ExecutionException, InterruptedException, TimeoutException {
void cleanUpInit() {
setUpAdminClient();
try {
Set<String> topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ default CompletionStage<List<EventEnvelope<E, Meta, Context>>> markAsPublished(L

CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, TxCtx tx);

    /**
     * The publisher this event store pushes persisted events through
     * (presumably a broker-backed or in-memory implementation — the visible
     * implementations return their configured publisher instance).
     * Exposed so callers can inspect or close the publisher directly.
     */
    EventPublisher<E, Meta, Context> eventPublisher();

/**
* Strategy to choose when replaying journal in case of crash when there is two or more nodes that want to replay concurrently.
* <ul>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,4 +390,8 @@ public Class<JsonNode> toType() {
}
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
return eventPublisher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -411,4 +411,8 @@ public Class<JsonNode> toType() {
}
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
return eventPublisher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -372,4 +372,8 @@ private Option<JsonNode> readValue(String value) {
.flatMap(str -> Try.of(() -> objectMapper.readTree(str)).toOption());
}

@Override
public EventPublisher<E, Meta, Context> eventPublisher() {
return eventPublisher;
}
}

0 comments on commit b176290

Please sign in to comment.