Skip to content

Commit

Permalink
Publishing hang
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Dec 8, 2023
1 parent cd3a9a6 commit 73976b9
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
public class InMemoryEventStore<E extends Event, Meta, Context> implements EventStore<InMemoryEventStore.Transaction<E, Meta, Context>, E, Meta, Context> {

ConcurrentHashMap<Long, EventEnvelope<E, Meta, Context>> store = new ConcurrentHashMap<>();

AtomicLong sequenceNums = new AtomicLong(0);
private final Supplier<CompletionStage<Tuple0>> markAsPublishedTx;
private final Supplier<CompletionStage<Tuple0>> markAsPublished;

Expand All @@ -45,6 +47,7 @@ public InMemoryEventStore(EventEnvelope<E, Meta, Context>... events) {
this(NOOP, NOOP, events);
}

@SafeVarargs
public static <E extends Event, Meta, Context> InMemoryEventStore<E, Meta, Context> create(EventEnvelope<E, Meta, Context>... events) {
return new InMemoryEventStore<>(events);
}
Expand Down Expand Up @@ -97,22 +100,6 @@ public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(Transact
});
}

// @Override
// public CompletionStage<List<EventEnvelope<E, Meta, Context>>> markAsPublished(List<EventEnvelope<E, Meta, Context>> eventEnvelopes) {
// return markAsPublished.get().thenCompose(any -> {
// return CompletableFuture.completedStage(eventEnvelopes.map(eventEnvelope -> {
// return store.compute(eventEnvelope.sequenceNum, (k, event) -> {
// if (event == null) {
// return eventEnvelope.copy().withPublished(true).build();
// } else {
// return event.copy().withPublished(true).build();
// }
// });
// }));
// }
// );
// }

@Override
public CompletionStage<EventEnvelope<E, Meta, Context>> markAsPublished(EventEnvelope<E, Meta, Context> eventEnvelope) {
return markAsPublished.get().thenCompose(any ->
Expand Down Expand Up @@ -148,7 +135,9 @@ public CompletionStage<Tuple0> commitOrRollback(Option<Throwable> of, Transactio

@Override
public CompletionStage<Long> nextSequence(InMemoryEventStore.Transaction<E, Meta, Context> tx) {
return CompletableFuture.completedStage(store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1);
long value = store.values().stream().map(e -> e.sequenceNum).max(Comparator.comparingLong(e -> e)).orElse(0L) + 1;
sequenceNums.incrementAndGet();
return CompletableFuture.completedStage(sequenceNums.accumulateAndGet(value, Math::max));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,31 +172,17 @@ private <TxCtx> Function<Flux<EventEnvelope<E, Meta, Context>>, Flux<EventEnvelo

static <T> Mono<T> fromCS(Supplier<CompletionStage<T>> cs) {
return Mono.fromFuture(() -> cs.get().toCompletableFuture());
// return Mono.create(s ->
// cs.get().whenComplete((r, e) -> {
// if (e != null) {
// s.error(e);
// } else {
// s.success(r);
// }
// })
// );
}

@Override
public CompletionStage<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {
// LOGGER.debug("Publishing event in memory : \n{} ", events);
// return Mono.fromCallable(() -> {
// reactorQueue.offer(events);
// return Tuple.empty();
// }).publishOn(Schedulers.boundedElastic()).toFuture();
return Flux
.fromIterable(events)
.map(t -> {
queue.tryEmitNext(t).orThrow();
return Tuple.empty();
})
.retryWhen(Retry.fixedDelay(50, Duration.ofMillis(2))
.retryWhen(Retry.fixedDelay(50, Duration.ofMillis(1))
.transientErrors(true)
.doBeforeRetry(ctx -> {
LOGGER.error("Error publishing events in memory queue for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries() + 1), ctx.failure());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

import com.fasterxml.jackson.databind.JsonNode;
import fr.maif.concurrent.CompletionStages;
import fr.maif.eventsourcing.*;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.Type;
import fr.maif.eventsourcing.format.JacksonEventFormat;
import fr.maif.eventsourcing.format.JacksonSimpleFormat;
import fr.maif.json.EventEnvelopeJson;
import fr.maif.json.Json;
import fr.maif.kafka.JsonDeserializer;
import fr.maif.kafka.JsonSerializer;
import fr.maif.reactor.KafkaContainerTest;
import io.vavr.*;
import io.vavr.API;
import io.vavr.collection.List;
import io.vavr.collection.Stream;
import io.vavr.control.Either;
import io.vavr.control.Option;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -25,7 +25,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testcontainers.junit.jupiter.Testcontainers;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
Expand All @@ -36,10 +35,11 @@
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.Map;
import java.util.Objects;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
Expand Down Expand Up @@ -75,17 +75,11 @@ public void eventConsumption() throws IOException, InterruptedException {
String topic = createTopic("eventConsumption", 5, 1);

ReactorKafkaEventPublisher<TestEvent, Void, Void> publisher = createPublisher(topic);
EventStore<Tuple0, TestEvent, Void, Void> eventStore = mock(EventStore.class);
InMemoryEventStore<TestEvent, Void, Void> eventStore = spy(new InMemoryEventStore<>());

when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty()));
when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty());
when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(emptyTxStream());
when(eventStore.markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any())).then(i -> CompletionStages.successful(i.getArgument(0)));
when(eventStore.markAsPublished(any(), Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any())).then(i -> CompletionStages.successful(i.getArgument(0)));

EventEnvelope<TestEvent, Void, Void> envelope1 = eventEnvelope("value 1");
EventEnvelope<TestEvent, Void, Void> envelope2 = eventEnvelope("value 2");
EventEnvelope<TestEvent, Void, Void> envelope3 = eventEnvelope("value 3");
EventEnvelope<TestEvent, Void, Void> envelope1 = eventEnvelopeUnpublished("value 1");
EventEnvelope<TestEvent, Void, Void> envelope2 = eventEnvelopeUnpublished("value 2");
EventEnvelope<TestEvent, Void, Void> envelope3 = eventEnvelopeUnpublished("value 3");

publisher.start(eventStore, NO_STRATEGY);

Expand All @@ -98,14 +92,15 @@ public void eventConsumption() throws IOException, InterruptedException {
.receive()
.map(ConsumerRecord::value)
.map(KafkaEventPublisherTest::deserialize)
.take(3)
.bufferTimeout(50, Duration.ofSeconds(4))
.map(e -> {
println(e);
return e;
})
.timeout(Duration.of(60, ChronoUnit.SECONDS))
.take(1)
.timeout(Duration.of(30, ChronoUnit.SECONDS))
.collectList()
.map(List::ofAll)
.map(l -> List.ofAll(l).flatMap(identity()))
.toFuture();

publisher.publish(List.of(
Expand All @@ -124,31 +119,18 @@ public void eventConsumption() throws IOException, InterruptedException {
publisher.close();
}

private <T> Publisher<T> emptyTxStream() {
return Flux.<T>empty();
}

private <T> Publisher<T> txStream(T... values) {
return Flux.<T>fromIterable(List.of(values));
}


@Test
@SuppressWarnings("unchecked")
public void eventConsumptionWithEventFromDb() throws IOException, InterruptedException {

String topic = createTopic("eventConsumptionWithEventFromDb", 5, 1);

ReactorKafkaEventPublisher<TestEvent, Void, Void> publisher = createPublisher(topic);
EventStore<Tuple0, TestEvent, Void, Void> eventStore = mock(EventStore.class);
when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty()));
when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty());
when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(txStream(
eventEnvelope("value 1"),
eventEnvelope("value 2"),
eventEnvelope("value 3")
InMemoryEventStore<TestEvent, Void, Void> eventStore = spy(new InMemoryEventStore<>(
eventEnvelopeUnpublished("value 1"),
eventEnvelopeUnpublished("value 2"),
eventEnvelopeUnpublished("value 3")
));
when(eventStore.markAsPublished(eq(Tuple.empty()), Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any())).then(i -> CompletionStages.successful(i.getArgument(1)));
when(eventStore.markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any())).then(i -> CompletionStages.successful(i.getArgument(0)));

publisher.start(eventStore, NO_STRATEGY);

Expand All @@ -161,16 +143,17 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc
)
.receive()
.map(ConsumerRecord::value)
.take(6)
.timeout(Duration.of(20, ChronoUnit.SECONDS))
.bufferTimeout(50, Duration.ofSeconds(4))
.take(1)
.timeout(Duration.of(30, ChronoUnit.SECONDS))
.collectList()
.map(List::ofAll)
.map(l -> List.ofAll(l).flatMap(identity()))
.toFuture();

publisher.publish(List.of(
eventEnvelope("value 4"),
eventEnvelope("value 5"),
eventEnvelope("value 6")
eventEnvelopeUnpublished("value 4"),
eventEnvelopeUnpublished("value 5"),
eventEnvelopeUnpublished("value 6")
)).toCompletableFuture().join();

List<String> events = results.toCompletableFuture().join();
Expand All @@ -183,105 +166,6 @@ public void eventConsumptionWithEventFromDb() throws IOException, InterruptedExc
publisher.close();
}

//
// @Test
// @SuppressWarnings("unchecked")
// public void testRestart() throws IOException, InterruptedException {
//
// AtomicInteger failed = new AtomicInteger(0);
// AtomicInteger streamCount = new AtomicInteger(0);
// String topic = createTopic("testRestart", 5, 1);
// ReactorKafkaEventPublisher<TestEvent, Void, Void> publisher = createPublisher(topic);
//
// Supplier<Flux<EventEnvelope<TestEvent, Void, Void>>> eventsFlux = () -> KafkaReceiver
// .create(receiverDefault()
// .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart")
// .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
// .consumerProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
// .subscription(List.of(topic).toJavaList()))
// .receive()
// .map(ConsumerRecord::value)
// .map(KafkaEventPublisherTest::deserialize);
//
// EventEnvelope<TestEvent, Void, Void> envelope1 = eventEnvelope("value 1");
// EventEnvelope<TestEvent, Void, Void> envelope2 = eventEnvelope("value 2");
// EventEnvelope<TestEvent, Void, Void> envelope3 = eventEnvelope("value 3");
// EventEnvelope<TestEvent, Void, Void> envelope4 = eventEnvelope("value 4");
// EventEnvelope<TestEvent, Void, Void> envelope5 = eventEnvelope("value 5");
// EventEnvelope<TestEvent, Void, Void> envelope6 = eventEnvelope("value 6");
// EventEnvelope<TestEvent, Void, Void> envelope7 = eventEnvelope("value 7");
// EventEnvelope<TestEvent, Void, Void> envelope8 = eventEnvelope("value 8");
// EventEnvelope<TestEvent, Void, Void> envelope9 = eventEnvelope("value 9");
// EventStore<Tuple0, TestEvent, Void, Void> eventStore = mock(EventStore.class);
//
// when(eventStore.loadEventsUnpublished(any(), any()))
// .thenReturn(txStream(envelope1, envelope2, envelope3))
// .thenReturn(txStream(envelope1, envelope2, envelope3))
// .thenReturn(emptyTxStream());
//
// when(eventStore.markAsPublished(any(), Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any()))
// .then(i -> {
// if (failed.incrementAndGet() > 1) {
// return CompletionStages.successful(i.getArgument(1));
// } else {
// return CompletionStages.failed(new RuntimeException("Oups "+failed.get()));
// }
// });
// when(eventStore.markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any()))
// .then(i -> {
// int count = streamCount.incrementAndGet();
// Object argument = i.getArgument(0);
// System.out.println("Count "+count+" - "+argument);
// if (count == 1) {
// return CompletionStages.failed(new RuntimeException("Oups stream "+count));
// } else {
// return CompletionStages.successful(argument);
// }
// });
//
// publisher.start(eventStore, SKIP);
//
// Thread.sleep(200);
//
//
// CompletionStage<List<EventEnvelope<TestEvent, Void, Void>>> results = eventsFlux.get()
// .bufferTimeout(10, Duration.ofSeconds(10))
// .take(1)
// .timeout(Duration.of(30, ChronoUnit.SECONDS))
// .collectList()
// .map(l -> List.ofAll(l).flatMap(identity()))
// .toFuture();
//
//
// publisher.publish(List(envelope4, envelope5, envelope6));
//
// List<EventEnvelope<TestEvent, Void, Void>> events = results.toCompletableFuture().join();
//
// println(events.mkString("\n"));
//
// assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6);
//
// verify(eventStore, times(3)).openTransaction();
// verify(eventStore, times(2)).markAsPublished(any(), Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any());
// verify(eventStore, times(1)).markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any());
//
//
// publisher.publish(List(envelope7, envelope8, envelope9));
// List<EventEnvelope<TestEvent, Void, Void>> resultsAfterCrash = eventsFlux.get()
// .bufferTimeout(12, Duration.ofSeconds(10))
// .take(1)
// .timeout(Duration.of(30, ChronoUnit.SECONDS))
// .collectList()
// .map(l -> List.ofAll(l).flatMap(identity()))
// .block();
// println(resultsAfterCrash.mkString("\n"));
//
// assertThat(resultsAfterCrash).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5, envelope6, envelope7, envelope8, envelope9);
//
//
// publisher.close();
// }

@Test
@SuppressWarnings("unchecked")
public void testRestartWithMock() throws IOException, InterruptedException {
Expand Down Expand Up @@ -408,6 +292,7 @@ public void testRestartWithMock() throws IOException, InterruptedException {
publisher.close();
}

@SafeVarargs
private EventEnvelope<TestEvent, Void, Void>[] published(EventEnvelope<TestEvent, Void, Void>... envelopes) {
return List.of(envelopes).map(e -> e.copy().withPublished(true).build()).toJavaArray(EventEnvelope[]::new);
}
Expand All @@ -421,17 +306,6 @@ private static EventEnvelope<TestEvent, Void, Void> deserialize(String event) {

static AtomicLong sequence = new AtomicLong();

private EventEnvelope<TestEvent, Void, Void> eventEnvelope(String value) {
long sequenceNum = sequence.incrementAndGet();
String entityId = "entityId";
return EventEnvelope.<TestEvent, Void, Void>builder()
.withEmissionDate(LocalDateTime.now())
.withId(UUID.randomUUID())
.withEntityId(entityId)
.withSequenceNum(sequenceNum)
.withEvent(new TestEvent(value, entityId))
.build();
}
private EventEnvelope<TestEvent, Void, Void> eventEnvelopeUnpublished(String value) {
long sequenceNum = sequence.incrementAndGet();
String entityId = "entityId";
Expand Down

0 comments on commit 73976b9

Please sign in to comment.