diff --git a/thoth-core-reactor/build.gradle b/thoth-core-reactor/build.gradle index 4b053a98..6059008e 100644 --- a/thoth-core-reactor/build.gradle +++ b/thoth-core-reactor/build.gradle @@ -12,9 +12,6 @@ dependencies { implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$jacksonVersion") implementation("fr.maif:functional-json:$functionalJsonVersion") - testImplementation("com.typesafe.akka:akka-testkit_$scalaVersion:$akkaVersion") - testImplementation("com.typesafe.akka:akka-stream-testkit_$scalaVersion:$akkaVersion") - testImplementation("com.typesafe.akka:akka-stream-kafka-testkit_$scalaVersion:$alpakkaKafkaVersion") testImplementation("org.assertj:assertj-core:3.10.0") testImplementation("com.h2database:h2:1.4.197") testImplementation("org.mockito:mockito-core:2.22.0") @@ -24,7 +21,8 @@ dependencies { testImplementation("org.junit.vintage:junit-vintage-engine:5.4.2") testImplementation("net.aichler:jupiter-interface:0.9.1") testImplementation("org.scalatest:scalatest_$scalaVersion:3.0.8") - testImplementation("org.testcontainers:kafka:1.15.3") + testImplementation("org.testcontainers:kafka:1.18.0") + testImplementation "org.testcontainers:junit-jupiter:1.18.0" } test { diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java index dba145e0..efb4500f 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/ReactorKafkaEventPublisher.java @@ -1,10 +1,10 @@ package fr.maif.reactor.eventsourcing; -import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventPublisher; import fr.maif.eventsourcing.EventStore; +import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; @@ -80,20 +80,28 @@ public void start(EventStore eventStore, Concur .concatMap(tx -> { LOGGER.info("Replaying not published in DB for {}", topic); ConcurrentReplayStrategy strategy = Objects.isNull(concurrentReplayStrategy) ? WAIT : concurrentReplayStrategy; - return Flux.from(eventStore.loadEventsUnpublished(tx, strategy)) + return Flux + .from(eventStore.loadEventsUnpublished(tx, strategy)) .transform(publishToKafka(eventStore, Option.some(tx), groupFlow)) .doOnNext(logProgressSink::tryEmitNext) - .then(Mono.fromCompletionStage(() -> { + .concatMap(any -> Mono.fromCompletionStage(() -> { LOGGER.info("Replaying events not published in DB is finished for {}", topic); return eventStore.commitOrRollback(Option.none(), tx); })) .doOnError(e -> { eventStore.commitOrRollback(Option.of(e), tx); LOGGER.error("Error replaying non published events to kafka for " + topic, e); - - }); + }) + .retryWhen(Retry.backoff(Long.MAX_VALUE, restartInterval) + .transientErrors(true) + .maxBackoff(maxRestartInterval) + .doBeforeRetry(ctx -> { + LOGGER.error("Error republishing events for topic %s retrying for the %s time".formatted(topic, ctx.totalRetries()), ctx.failure()); + }) + ) + .switchIfEmpty(Flux.just(Tuple.empty())); }) - .thenMany( + .concatMap(__ -> this.eventsSource.transform(publishToKafka( eventStore, Option.none(), @@ -116,14 +124,13 @@ public void start(EventStore eventStore, Concur private Function>, Flux>> publishToKafka(EventStore eventStore, Option tx, Function>>, Flux>>>> groupFlow) { - Function, EventEnvelope>>, Flux>>> publishToKafkaFlow = it -> - kafkaSender.send(it) - .doOnError(e -> { - LOGGER.error("Error publishing to kafka ", e); - }); return it -> it .map(this::toKafkaMessage) - .transform(publishToKafkaFlow) + .concatMap(events -> { + LOGGER.debug("Sending event {}", events); + return kafkaSender.send(Flux.just(events)) + .doOnError(e -> LOGGER.error("Error publishing to kafka ", e)); + }) .transform(groupFlow) .concatMap(m -> tx.fold( @@ -142,6 +149,7 @@ public CompletionStage publish(List> eve .concatMap(t -> Mono.defer(() -> { Sinks.EmitResult emitResult = queue.tryEmitNext(t); + LOGGER.debug("Event publisher {}, {} buffered elements ( capacity = {} ), emitResult = {}, event = {}", topic, queue.scan(Scannable.Attr.BUFFERED), queue.scan(Scannable.Attr.CAPACITY), emitResult, t); if (emitResult.isFailure()) { return Mono.error(new RuntimeException("Error publishing to queue for %s : %s".formatted(topic, emitResult))); } else { diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/KafkaContainerTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/KafkaContainerTest.java new file mode 100644 index 00000000..f7e05fa9 --- /dev/null +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/KafkaContainerTest.java @@ -0,0 +1,102 @@ +package fr.maif.reactor; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.publisher.Mono; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +import static io.vavr.API.println; + +@Testcontainers +public interface KafkaContainerTest { + + AtomicInteger counter = new AtomicInteger(0); + + KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")); + + static void startContainer() { + kafkaContainer.start(); + } + + default String bootstrapServers() { + return kafkaContainer.getBootstrapServers(); + } + + default Admin adminClient() { + return Admin.create(Map.of( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers() + )); + } + + default String createTopic() { + return createTopic("topic-"+counter.incrementAndGet(), 3, 1); + } + + default String createTopic(String name, int partitions, int replication) { + try { + CreateTopicsResult createTopicsResult = adminClient().createTopics(java.util.List.of(new NewTopic(name, partitions, (short) replication))); + createTopicsResult.all().get(10, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException("Unable to create topic with name " + name, e); + } + return name; + } + + default void deleteTopics() { + try { + Set topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS); + if (!topics.isEmpty()) { + println("Deleting " + String.join(",", topics)); + adminClient().deleteTopics(topics).all().get(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + + default ReceiverOptions receiverDefault() { + return ReceiverOptions.create(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers() + )) + .withKeyDeserializer(new StringDeserializer()) + .withValueDeserializer(new StringDeserializer()); + } + + default SenderOptions senderDefault() { + return SenderOptions.create(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers() + )) + .withKeySerializer(new StringSerializer()) + .withValueSerializer(new StringSerializer()); + } + + default void produceString(String topic, String event) { + KafkaSender.create(senderDefault()).send(Mono.just( + SenderRecord.create(new ProducerRecord<>( + topic, event + ), null) + )).collectList().block(); + } + +} diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java index 96944eb0..bc272a27 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/eventsourcing/KafkaEventPublisherTest.java @@ -1,15 +1,5 @@ package fr.maif.reactor.eventsourcing; -import akka.actor.ActorSystem; -import akka.kafka.ConsumerSettings; -import akka.kafka.Subscriptions; -import akka.kafka.javadsl.Consumer; -import akka.kafka.testkit.javadsl.BaseKafkaTest; -import akka.stream.Materializer; -import akka.stream.javadsl.AsPublisher; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.testkit.javadsl.TestKit; import com.fasterxml.jackson.databind.JsonNode; import fr.maif.concurrent.CompletionStages; import fr.maif.eventsourcing.Event; @@ -22,28 +12,40 @@ import fr.maif.json.Json; import fr.maif.kafka.JsonDeserializer; import fr.maif.kafka.JsonSerializer; +import fr.maif.reactor.KafkaContainerTest; +import io.vavr.API; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.control.Either; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.reactivestreams.Publisher; +import org.testcontainers.junit.jupiter.Testcontainers; +import reactor.core.publisher.Flux; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; import reactor.kafka.sender.SenderOptions; import java.io.IOException; import java.time.Duration; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; -import java.util.*; +import java.util.Map; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY; @@ -52,46 +54,26 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.*; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public class KafkaEventPublisherTest extends BaseKafkaTest { +@Testcontainers +public class KafkaEventPublisherTest implements KafkaContainerTest { - private static final ActorSystem sys = ActorSystem.create("KafkaEventPublisherTest"); - private static final Materializer mat = Materializer.createMaterializer(sys); - KafkaEventPublisherTest() { - super(sys, mat, "localhost:29097"); + @BeforeAll + public static void setUp() { + KafkaContainerTest.startContainer(); } @BeforeEach - void cleanUpInit() { - setUpAdminClient(); - try { - Set topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS); - if (!topics.isEmpty()) { - println("Deleting "+ String.join(",", topics)); - adminClient().deleteTopics(topics).all().get(); - } - } catch (Exception e) {} - } - @AfterEach - void cleanUpAfter() throws ExecutionException, InterruptedException { - Set topics = adminClient().listTopics().names().get(); - println("Deleting "+ String.join(",", topics)); - adminClient().deleteTopics(topics).all().get(); - cleanUpAdminClient(); - } - - @AfterAll - static void afterClass() { - TestKit.shutdownActorSystem(sys); + void cleanUpInit() { + deleteTopics(); } @Test @SuppressWarnings("unchecked") public void eventConsumption() throws IOException, InterruptedException { - String topic = createTopic(1, 5, 1); + String topic = createTopic("eventConsumption", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); EventStore eventStore = mock(EventStore.class); @@ -100,6 +82,7 @@ public void eventConsumption() throws IOException, InterruptedException { when(eventStore.commitOrRollback(any(), any())).thenReturn(CompletionStages.empty()); when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(emptyTxStream()); when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); + when(eventStore.markAsPublished(any(), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); EventEnvelope envelope1 = eventEnvelope("value 1"); EventEnvelope envelope2 = eventEnvelope("value 2"); @@ -109,17 +92,25 @@ public void eventConsumption() throws IOException, InterruptedException { Thread.sleep(200); - CompletionStage>> results = Consumer.plainSource(consumerDefaults().withGroupId("test1"), Subscriptions.topics(topic)) + CompletionStage>> results = KafkaReceiver.create(receiverDefault() + .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "eventConsumption") + .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .subscription(List.of(topic).toJavaList())) + .receive() .map(ConsumerRecord::value) .map(KafkaEventPublisherTest::deserialize) + .doOnNext(elt -> System.out.println("next : "+elt)) .take(3) + .doOnNext(elt -> System.out.println("Group : " + elt)) + .doOnError(e -> e.printStackTrace()) .map(e -> { println(e); return e; }) - .idleTimeout(Duration.of(30, ChronoUnit.SECONDS)) - .runWith(Sink.seq(), mat) - .thenApply(List::ofAll); + .timeout(Duration.of(60, ChronoUnit.SECONDS)) + .collectList() + .map(List::ofAll) + .toFuture(); publisher.publish(List.of( envelope1, @@ -138,18 +129,19 @@ public void eventConsumption() throws IOException, InterruptedException { } private Publisher emptyTxStream() { - return Source.empty().runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), sys); + return Flux.empty(); } private Publisher txStream(T... values) { - return Source.from(List.of(values)).runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), sys); + return Flux.fromIterable(List.of(values)); } @Test @SuppressWarnings("unchecked") - public void eventConsumptionWithEventFromDb() throws IOException { - String topic = createTopic(2, 5, 1); + public void eventConsumptionWithEventFromDb() throws IOException, InterruptedException { + + String topic = createTopic("eventConsumptionWithEventFromDb", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); EventStore eventStore = mock(EventStore.class); when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); @@ -159,19 +151,27 @@ public void eventConsumptionWithEventFromDb() throws IOException { eventEnvelope("value 2"), eventEnvelope("value 3") )); - when(eventStore.markAsPublished(eq(Tuple.empty()), Mockito.>>any())) - .then(i -> CompletionStages.successful(i.getArgument(1))); - when(eventStore.markAsPublished(Mockito.>>any())) - .then(i -> CompletionStages.successful(i.getArgument(0))); + when(eventStore.markAsPublished(eq(Tuple.empty()), Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(1))); + when(eventStore.markAsPublished(Mockito.>>any())).then(i -> CompletionStages.successful(i.getArgument(0))); publisher.start(eventStore, NO_STRATEGY); - CompletionStage> results = Consumer.plainSource(consumerDefaults().withGroupId("test2"), Subscriptions.topics(topic)) + Thread.sleep(200); + + CompletionStage> results = KafkaReceiver.create(receiverDefault() + .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "eventConsumptionWithEventFromDb") + .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .subscription(List.of(topic).toJavaList()) + ) + .receive() .map(ConsumerRecord::value) + .doOnNext(elt -> System.out.println(elt)) + .doOnError(e -> e.printStackTrace()) .take(6) - .idleTimeout(Duration.of(5, ChronoUnit.SECONDS)) - .runWith(Sink.seq(), mat) - .thenApply(List::ofAll); + .timeout(Duration.of(20, ChronoUnit.SECONDS)) + .collectList() + .map(List::ofAll) + .toFuture(); publisher.publish(List.of( eventEnvelope("value 4"), @@ -190,13 +190,12 @@ public void eventConsumptionWithEventFromDb() throws IOException { } - @Test - @Disabled @SuppressWarnings("unchecked") - public void testRestart() throws IOException { + public void testRestart() throws IOException, InterruptedException { AtomicBoolean failed = new AtomicBoolean(false); - String topic = createTopic(3, 5, 1); + AtomicInteger streamCount = new AtomicInteger(0); + String topic = createTopic("testRestart", 5, 1); ReactorKafkaEventPublisher publisher = createPublisher(topic); EventStore eventStore = mock(EventStore.class); when(eventStore.openTransaction()).thenReturn(CompletionStages.successful(Tuple.empty())); @@ -205,35 +204,60 @@ public void testRestart() throws IOException { EventEnvelope envelope1 = eventEnvelope("value 1"); EventEnvelope envelope2 = eventEnvelope("value 2"); EventEnvelope envelope3 = eventEnvelope("value 3"); - - when(eventStore.loadEventsUnpublished(any(), any())).thenReturn(txStream(envelope1, envelope2, envelope3)); + EventEnvelope envelope4 = eventEnvelope("value 4"); + EventEnvelope envelope5 = eventEnvelope("value 5"); + EventEnvelope envelope6 = eventEnvelope("value 6"); + + when(eventStore.loadEventsUnpublished(any(), any())) + .thenReturn(txStream(envelope1, envelope2, envelope3)) + .thenReturn(txStream(envelope1, envelope2, envelope3)) + .thenReturn(emptyTxStream()); when(eventStore.markAsPublished(Mockito.>>any())).thenAnswer(in -> CompletionStages.successful(in.getArgument(0))); when(eventStore.markAsPublished(any(), Mockito.>>any())) .then(i -> { if (failed.getAndSet(true)) { return CompletionStages.successful(i.getArgument(1)); } else { - throw new RuntimeException("Oups"); + return CompletionStages.failed(new RuntimeException("Oups")); + } + }); + when(eventStore.markAsPublished(Mockito.>>any())) + .then(i -> { + if (streamCount.incrementAndGet() == 2) { + return CompletionStages.failed(new RuntimeException("Oups")); + } else { + return CompletionStages.successful(i.getArgument(0)); } }); publisher.start(eventStore, SKIP); - CompletionStage>> results = Consumer.plainSource(consumerDefaults().withGroupId("test3"), Subscriptions.topics(topic)) + Thread.sleep(200); + + CompletionStage>> results = + KafkaReceiver.create(receiverDefault() + .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, "testRestart") + .consumerProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + .subscription(List.of(topic).toJavaList())) + .receive() .map(ConsumerRecord::value) .map(KafkaEventPublisherTest::deserialize) - .take(6) - .idleTimeout(Duration.of(10, ChronoUnit.SECONDS)) - .runWith(Sink.seq(), mat) - .thenApply(List::ofAll); + .take(8) + .timeout(Duration.of(30, ChronoUnit.SECONDS)) + .collectList() + .map(List::ofAll) + .toFuture(); + + + publisher.publish(API.List(envelope4, envelope5, envelope6)); List> events = results.toCompletableFuture().join(); - assertThat(events).hasSize(6); + assertThat(events).hasSize(8); println(events.mkString("\n")); - assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3); + assertThat(events).containsExactly(envelope1, envelope2, envelope3, envelope1, envelope2, envelope3, envelope4, envelope5); verify(eventStore, times(2)).markAsPublished(any(), Mockito.>>any()); @@ -244,7 +268,8 @@ public void testRestart() throws IOException { private static EventEnvelope deserialize(String event) { return EventEnvelopeJson.deserialize(event, new TestEventSerializer(), JacksonSimpleFormat.empty(), JacksonSimpleFormat.empty(), (s, o) -> { println("Error " + s + " - " + o); - }, e -> {}); + }, e -> { + }); } static AtomicLong sequence = new AtomicLong(); @@ -267,7 +292,7 @@ private ReactorKafkaEventPublisher createPublisher(String private SenderOptions> producerSettings() { return SenderOptions.>create( - Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) + Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()) ) .withKeySerializer(new StringSerializer()) .withValueSerializer(new JsonSerializer( @@ -277,20 +302,26 @@ private SenderOptions> producerSett )); } - private ConsumerSettings> consumerSettings() { - return ConsumerSettings.create( - sys, - new StringDeserializer(), - new JsonDeserializer( - new TestEventSerializer(), - JacksonSimpleFormat.empty(), - JacksonSimpleFormat.empty(), - (s, o) -> {}, - e -> {} - ) - ).withBootstrapServers(bootstrapServers()); + private ReceiverOptions> receiverOptions() { + return ReceiverOptions.>create(Map.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers() + )) + .withKeyDeserializer(new StringDeserializer()) + .withValueDeserializer( + new JsonDeserializer( + new TestEventSerializer(), + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty(), + (s, o) -> { + }, + e -> { + } + ) + ); } + + public static class TestEventSerializer implements JacksonEventFormat { @Override public Either read(String type, Long version, JsonNode json) { diff --git a/thoth-core-reactor/src/test/java/fr/maif/reactor/projections/EventuallyConsistentProjectionTest.java b/thoth-core-reactor/src/test/java/fr/maif/reactor/projections/EventuallyConsistentProjectionTest.java index be00f2e4..2261a02b 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/reactor/projections/EventuallyConsistentProjectionTest.java +++ b/thoth-core-reactor/src/test/java/fr/maif/reactor/projections/EventuallyConsistentProjectionTest.java @@ -1,9 +1,5 @@ package fr.maif.reactor.projections; -import akka.actor.ActorSystem; -import akka.kafka.testkit.javadsl.TestcontainersKafkaTest; -import akka.stream.Materializer; -import akka.testkit.javadsl.TestKit; import com.fasterxml.jackson.databind.JsonNode; import fr.maif.Helpers; import fr.maif.Helpers.VikingEvent; @@ -11,13 +7,12 @@ import fr.maif.eventsourcing.EventEnvelope; import fr.maif.json.Json; import fr.maif.json.JsonFormat; +import fr.maif.reactor.KafkaContainerTest; import fr.maif.reactor.projections.EventuallyConsistentProjection.Config; import io.vavr.API; import io.vavr.Tuple0; import io.vavr.control.Option; -import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; import java.time.LocalDateTime; import java.util.UUID; @@ -27,15 +22,9 @@ import static io.vavr.API.println; import static org.assertj.core.api.Assertions.assertThat; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -class EventuallyConsistentProjectionTest extends TestcontainersKafkaTest { +class EventuallyConsistentProjectionTest implements KafkaContainerTest { private static JsonFormat> vikingEventJsonFormat = new Helpers.VikingEventJsonFormat(); - private static final ActorSystem system = ActorSystem.create("test"); - - public EventuallyConsistentProjectionTest() { - super(system, Materializer.createMaterializer(system)); - } @Test void consumer() throws Exception { @@ -60,9 +49,9 @@ void consumer() throws Exception { ); Thread.sleep(3000); - resultOf(produceString(topic, stringEvent(new VikingEvent.VikingCreated("1", "Lodbrock")))); - resultOf(produceString(topic, stringEvent(new VikingEvent.VikingCreated("2", "Lagerta")))); - resultOf(produceString(topic, stringEvent(new VikingEvent.VikingUpdated("1", "Lodbrok")))); + produceString(topic, stringEvent(new VikingEvent.VikingCreated("1", "Lodbrock"))); + produceString(topic, stringEvent(new VikingEvent.VikingCreated("2", "Lagerta"))); + produceString(topic, stringEvent(new VikingEvent.VikingUpdated("1", "Lodbrok"))); Thread.sleep(1000); @@ -92,11 +81,4 @@ private EventEnvelope eventEnvelope(VikingEvent eve .withEmissionDate(LocalDateTime.now()) .build(); } - - - @AfterAll - void shutdownActorSystem() { - TestKit.shutdownActorSystem(system); - } - } \ No newline at end of file