From 251798b82b1a55ce414a5dc77a2b284363e9fc48 Mon Sep 17 00:00:00 2001
From: Benjamin Cavy
Date: Fri, 27 Nov 2020 09:25:09 +0100
Subject: [PATCH] test: tck setup

---
 .github/workflows/test.yml                    |   2 +-
 .java-version                                 |   1 -
 build.sbt                                     |  16 +-
 project/plugins.sbt                           |   4 +-
 .../fr/maif/eventsourcing/EventStore.java     |   1 -
 .../impl/KafkaEventPublisher.java             |  17 +-
 .../impl/KafkaEventPublisherTest.java         |   2 +-
 .../ReactivePostgresEventStore.java           |   3 -
 thoth-jooq/build.sbt                          |   9 +-
 .../impl/JooqKafkaTckImplementation.java      | 281 ++++++++++++++++++
 .../impl/PostgresEventStoreTest.java          |   1 +
 thoth-jooq/src/test/resources/testng.xml      |  11 +
 .../consumer/ResilientKafkaConsumerTest.java  |   2 +-
 thoth-tck/build.sbt                           |  16 +
 .../datastore/DataStoreVerification.java      | 261 ++++++++++++++++
 .../datastore/DataStoreVerificationRules.java |  48 +++
 .../datastore/DoTestListener.java             |  21 ++
 .../eventsourcing/datastore/TestCommand.java  |  55 ++++
 .../datastore/TestCommandHandler.java         |  27 ++
 .../eventsourcing/datastore/TestEvent.java    |  48 +++
 .../datastore/TestEventFormat.java            |  28 ++
 .../datastore/TestEventHandler.java           |  20 ++
 .../eventsourcing/datastore/TestState.java    |  14 +
 .../datastore/InMemoryDataStoreTest.java      |  99 ++++++
 thoth-tck/src/test/resources/testng.xml       |  11 +
 25 files changed, 980 insertions(+), 18 deletions(-)
 delete mode 100644 .java-version
 create mode 100644 thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java
 create mode 100644 thoth-jooq/src/test/resources/testng.xml
 create mode 100644 thoth-tck/build.sbt
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DoTestListener.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommand.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommandHandler.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventFormat.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventHandler.java
 create mode 100644 thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java
 create mode 100644 thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
 create mode 100644 thoth-tck/src/test/resources/testng.xml

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 52f43076..01185850 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -17,7 +17,7 @@ jobs:
         run: sbt publishLocal
       - name: Run test
         id: test
-        run: JAVA_OPTS="--enable-preview" sbt test
+        run: export JAVA_OPTS="--enable-preview" && sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test
      - name: Publish Unit Test Results
        uses: EnricoMi/publish-unit-test-result-action@v1
        if: always()
diff --git a/.java-version b/.java-version
deleted file mode 100644
index fe6b09a7..00000000
--- a/.java-version
+++ /dev/null
@@ -1 +0,0 @@
-15.0
diff --git a/build.sbt b/build.sbt
index 0323fa50..a0e5f4e0 100644
--- a/build.sbt
+++ b/build.sbt
@@ -22,7 +22,8 @@ lazy val root = (project in file("."))
     `demo-postgres-kafka`,
     `demo-in-memory`,
     `sample`,
-    `thoth-documentation`
+
`thoth-documentation`, + `thoth-tck` ) .enablePlugins(GitVersioning, GitBranchPrompt) .settings( @@ -48,6 +49,16 @@ lazy val `demo-in-memory` = (project in file("./demo/demo-in-memory")) ) lazy val `thoth-documentation` = project + .settings( + skip in publish := true + ) + +lazy val `thoth-tck` = project + .dependsOn(`thoth-core`) + .enablePlugins(TestNGPlugin) + .settings( + skip in publish := true + ) lazy val `demo-postgres-kafka-reactive` = (project in file("./demo/demo-postgres-kafka-reactive")) @@ -94,7 +105,8 @@ lazy val `thoth-core` = project ) lazy val `thoth-jooq` = project - .dependsOn(`thoth-core`) + .dependsOn(`thoth-core`, `thoth-tck`) + .enablePlugins(TestNGPlugin) .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", diff --git a/project/plugins.sbt b/project/plugins.sbt index 93342f7b..29d477ca 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -15,4 +15,6 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.7") addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.6") -addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.6.9") // Apache 2.0 \ No newline at end of file +addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.6.9") // Apache 2.0 + +addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.1.1") \ No newline at end of file diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index 7408e275..5a9d540d 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -1,6 +1,5 @@ package fr.maif.eventsourcing; -import akka.Done; import akka.NotUsed; import akka.actor.ActorSystem; import akka.stream.Materializer; diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/KafkaEventPublisher.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/KafkaEventPublisher.java index 0fbcb1c0..c0cd5543 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/KafkaEventPublisher.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/KafkaEventPublisher.java @@ -6,13 +6,15 @@ import akka.kafka.ProducerMessage; import akka.kafka.ProducerSettings; import akka.kafka.javadsl.Producer; +import akka.stream.KillSwitches; import akka.stream.Materializer; import akka.stream.OverflowStrategy; +import akka.stream.RestartSettings; +import akka.stream.UniqueKillSwitch; import akka.stream.javadsl.*; import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy; import io.vavr.Tuple; import io.vavr.Tuple0; -import fr.maif.akka.AkkaExecutionContext; import fr.maif.eventsourcing.Event; import fr.maif.eventsourcing.EventEnvelope; import fr.maif.eventsourcing.EventPublisher; @@ -29,9 +31,8 @@ import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.Objects; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; -import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.WAIT; public class KafkaEventPublisher implements EventPublisher, Closeable { @@ -47,6 +48,7 @@ public class KafkaEventPublisher implements Even private final Duration restartInterval; private final Duration maxRestartInterval; private final Flow, EventEnvelope>, List, EventEnvelope>>, NotUsed> groupFlow = Flow., EventEnvelope>>create().grouped(1000).map(List::ofAll); + private 
UniqueKillSwitch killSwitch; public KafkaEventPublisher(ActorSystem system, ProducerSettings> producerSettings, String topic) { this(system, producerSettings, topic, null); @@ -79,8 +81,7 @@ public KafkaEventPublisher(ActorSystem system, ProducerSettings void start(EventStore eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) { - - RestartSource + killSwitch = RestartSource .onFailuresWithBackoff( restartInterval, maxRestartInterval, @@ -128,8 +129,9 @@ public void start(EventStore eventStore, Concur }); } ) + .viaMat(KillSwitches.single(), Keep.right()) .toMat(Sink.ignore(), Keep.both()) - .run(materializer); + .run(materializer).first(); } @@ -163,6 +165,9 @@ public Future publish(List> events) { @Override public void close() throws IOException { + if(Objects.nonNull(killSwitch)) { + this.killSwitch.shutdown(); + } this.kafkaProducer.close(); } diff --git a/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java b/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java index 43fca55d..f6314a9a 100644 --- a/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java +++ b/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java @@ -196,7 +196,7 @@ public void eventConsumptionWithEventFromDb() throws IOException { assertThat(events).hasSize(6); verify(eventStore, times(1)).markAsPublished(any(), Mockito.>>any()); - verify(eventStore, times(1)).markAsPublished(Mockito.>>any()); + verify(eventStore, atLeastOnce()).markAsPublished(Mockito.>>any()); publisher.close(); } diff --git a/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index 2ad8033b..a5366df6 100644 --- a/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -26,10 +26,8 @@ import org.jooq.Condition; import org.jooq.Converter; import org.jooq.Field; -import org.jooq.JSON; import org.jooq.JSONB; import org.jooq.Record15; -import org.jooq.SelectForUpdateWaitStep; import org.jooq.SelectSeekStep1; import org.jooq.impl.SQLDataType; import org.slf4j.LoggerFactory; @@ -40,7 +38,6 @@ import java.util.Objects; import java.util.UUID; -import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP; import static java.util.function.Function.identity; import static org.jooq.impl.DSL.field; import static org.jooq.impl.DSL.table; diff --git a/thoth-jooq/build.sbt b/thoth-jooq/build.sbt index 986a90e6..5ddb6259 100644 --- a/thoth-jooq/build.sbt +++ b/thoth-jooq/build.sbt @@ -22,9 +22,16 @@ libraryDependencies ++= Seq( "org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test, "org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test, "net.aichler" % "jupiter-interface" % "0.9.1" % Test, - "org.mockito" % "mockito-core" % "2.22.0" % Test + "org.mockito" % "mockito-core" % "2.22.0" % Test, + "org.testng" % "testng" % "6.3" % Test, + "org.testcontainers" % "postgresql" % "1.15.0" % Test, + "org.testcontainers" % "kafka" % "1.15.0" % Test, + "org.slf4j" % "slf4j-api" % "1.7.30" % Test, + "org.slf4j" % "slf4j-simple" % "1.7.30" % Test ) +testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) + javacOptions in Compile ++= Seq("-source", "8", "-target", "8", "-Xlint:unchecked", "-Xlint:deprecation") // Skip the javadoc for the moment diff --git 
a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java new file mode 100644 index 00000000..e9061be9 --- /dev/null +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java @@ -0,0 +1,281 @@ +package fr.maif.eventsourcing.impl; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +import akka.actor.ActorSystem; +import akka.kafka.ProducerSettings; +import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.EventProcessor; +import fr.maif.eventsourcing.PostgresKafkaEventProcessor; +import fr.maif.eventsourcing.datastore.DataStoreVerification; +import fr.maif.eventsourcing.datastore.TestCommand; +import fr.maif.eventsourcing.datastore.TestCommandHandler; +import fr.maif.eventsourcing.datastore.TestEvent; +import fr.maif.eventsourcing.datastore.TestEventFormat; +import fr.maif.eventsourcing.datastore.TestEventHandler; +import fr.maif.eventsourcing.datastore.TestState; +import fr.maif.eventsourcing.format.JacksonEventFormat; +import fr.maif.eventsourcing.format.JacksonSimpleFormat; +import fr.maif.json.EventEnvelopeJson; +import fr.maif.kafka.JsonSerializer; +import fr.maif.kafka.KafkaSettings; +import io.vavr.Tuple0; + +public class JooqKafkaTckImplementation extends DataStoreVerification { + private PGSimpleDataSource dataSource; + private TableNames tableNames; + private TestEventFormat eventFormat; + private PostgreSQLContainer postgres; + private KafkaContainer kafka; + + private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" + + " id UUID primary key,\n" + + " entity_id varchar(100) not null,\n" + + " sequence_num bigint not null,\n" + + " event_type varchar(100) not null,\n" + + " version int not null,\n" + + " transaction_id varchar(100) not null,\n" + + " event jsonb not null,\n" + + " metadata jsonb,\n" + + " context jsonb,\n" + + " total_message_in_transaction int default 1,\n" + + " num_message_in_transaction int default 1,\n" + + " emission_date timestamp not null default now(),\n" + + " user_id varchar(100),\n" + + " system_id varchar(100),\n" + + " published boolean default false,\n" + + " UNIQUE (entity_id, sequence_num)\n" + + " );\n" + + " \n" + + " CREATE 
SEQUENCE if not exists test_sequence_num;"; + private final String TRUNCATE_QUERY = "TRUNCATE TABLE test_journal"; + + @AfterClass(alwaysRun = true) + public void tearDown() throws InterruptedException { + Thread.sleep(10000); + postgres.stop(); + kafka.stop(); + } + + @BeforeClass(alwaysRun = true) + public void initClass() { + this.tableNames = new TableNames("test_journal", "test_sequence_num"); + this.eventFormat = new TestEventFormat(); + + postgres = new PostgreSQLContainer(); + postgres.start(); + kafka = new KafkaContainer(); + kafka.start(); + } + + + @BeforeMethod(alwaysRun = true) + public void init() throws SQLException { + this.dataSource = new PGSimpleDataSource(); + dataSource.setUrl(postgres.getJdbcUrl()); + dataSource.setUser(postgres.getUsername()); + dataSource.setPassword(postgres.getPassword()); + // Override default setting, which wait indefinitely if database is down + dataSource.setLoginTimeout(5); + + dataSource.getConnection().prepareStatement(SCHEMA).execute(); + dataSource.getConnection().prepareStatement(TRUNCATE_QUERY).execute(); + } + + @Override + public EventProcessor eventProcessor(String topic) { + + final PostgresKafkaEventProcessor eventProcessor = PostgresKafkaEventProcessor + .withActorSystem(ActorSystem.create()) + .withDataSource(dataSource) + .withTables(tableNames) + .withTransactionManager(Executors.newFixedThreadPool(4)) + .withEventFormater(eventFormat) + .withNoMetaFormater() + .withNoContextFormater() + .withKafkaSettings(topic, producerSettings(settings(), new TestEventFormat())) + .withEventHandler(new TestEventHandler()) + .withDefaultAggregateStore() + .withCommandHandler(new TestCommandHandler<>()) + .withNoProjections() + .build(); + + + return eventProcessor; + } + + @Override + public String kafkaBootstrapUrl() { + return kafka.getBootstrapServers(); + } + + @Override + public void shutdownBroker() { + pauseContainer(kafka); + } + + @Override + public void restartBroker() { + unPauseContainer(kafka); + } + + @Override + public void shutdownDatabase() { + pauseContainer(postgres); + } + + @Override + public void restartDatabase() { + unPauseContainer(postgres); + } + + private void pauseContainer(GenericContainer container) { + container.getDockerClient().pauseContainerCmd(container.getContainerId()).exec(); + } + + private void unPauseContainer(GenericContainer container) { + container.getDockerClient().unpauseContainerCmd(container.getContainerId()).exec(); + } + + private KafkaSettings settings() { + return KafkaSettings.newBuilder(kafka.getBootstrapServers()).build(); + } + + private ProducerSettings> producerSettings( + KafkaSettings kafkaSettings, + JacksonEventFormat eventFormat) { + return kafkaSettings.producerSettings(actorSystem, JsonSerializer.of( + eventFormat, + JacksonSimpleFormat.empty(), + JacksonSimpleFormat.empty() + ) + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootstrapUrl, String topic) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String groupId = "reader-" + UUID.randomUUID(); + Optional maybeLastOffset = getEndOffsetIfNotReached(topic, kafkaBootstrapUrl, groupId); + if(!maybeLastOffset.isPresent()) { + return Collections.emptyList(); + } + long lastOffset = maybeLastOffset.get(); + + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaBootstrapUrl); + props.put("group.id", groupId); + props.put("key.deserializer", + "org.apache.kafka.common.serialization.StringDeserializer"); + 
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put("value.deserializer",
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+
+        consumer.subscribe(Collections.singletonList(topic));
+
+        boolean running = true;
+        List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes = new ArrayList<>();
+        while (running) {
+            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
+            for (ConsumerRecord<String, String> record : records) {
+                final long offset = record.offset();
+                if (offset >= lastOffset) {
+                    running = false;
+                }
+                envelopes.add(parseEnvelope(record.value()));
+            }
+            consumer.commitSync();
+        }
+        consumer.close();
+        return envelopes;
+    }
+
+    private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
+        PartitionInfo partitionInfo = consumer.partitionsFor(topic).get(0);
+        TopicPartition topicPartition = new TopicPartition(topic, partitionInfo.partition());
+        consumer.assign(Collections.singletonList(topicPartition));
+
+        long position = consumer.position(topicPartition);
+        consumer.seekToEnd(Collections.singletonList(topicPartition));
+        final long endOffset = consumer.position(topicPartition);
+
+        Optional<Long> result = Optional.empty();
+        if (endOffset > 0 && endOffset > position) {
+            result = Optional.of(endOffset - 1);
+        }
+
+        consumer.close();
+        return result;
+    }
+
+    public EventEnvelope<TestEvent, Tuple0, Tuple0> parseEnvelope(String value) {
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            ObjectNode node = (ObjectNode) mapper.readTree(value);
+            CompletableFuture<EventEnvelope<TestEvent, Tuple0, Tuple0>> future = new CompletableFuture<>();
+            EventEnvelopeJson.deserialize(
+                    node,
+                    eventFormat,
+                    JacksonSimpleFormat.empty(),
+                    JacksonSimpleFormat.empty(),
+                    (event, err) -> {
+                        future.completeExceptionally(new RuntimeException(err.toString()));
+                    },
+                    future::complete
+            );
+            return future.get();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java
index cd49dfb4..3d5f221c 100644
--- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java
+++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java
@@ -22,6 +22,7 @@
 import org.jooq.SQLDialect;
 import org.jooq.Table;
 import org.jooq.impl.DSL;
+import org.junit.Ignore;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
diff --git a/thoth-jooq/src/test/resources/testng.xml b/thoth-jooq/src/test/resources/testng.xml
new file mode 100644
index 00000000..e6c00f3d
--- /dev/null
+++ b/thoth-jooq/src/test/resources/testng.xml
@@ -0,0 +1,11 @@
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="thoth-jooq-tck" verbose="1">
+    <listeners>
+        <listener class-name="fr.maif.eventsourcing.datastore.DoTestListener"/>
+    </listeners>
+    <test name="datastore">
+        <classes>
+            <class name="fr.maif.eventsourcing.impl.JooqKafkaTckImplementation"/>
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
diff --git a/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java b/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java
index 748d81ea..06b4fcd4 100644
--- a/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java
+++ b/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java
@@ -238,8 +238,8 @@ void consumerLifecycle() {
         assertThat(actual).isEqualTo(" event-1 event-2 event-3");
         CompletionStage<Done> stop = resilientKafkaConsumer.stop();
         assertThat(resilientKafkaConsumer.status()).isIn(Status.Stopping, Status.Stopped);
-        assertThat(isStopping.get()).isTrue();
         stop.toCompletableFuture().join();
+        assertThat(isStopping.get()).isTrue();
         assertThat(resilientKafkaConsumer.status()).isIn(Status.Stopped);
         assertThat(isStopped.get()).isTrue();
     }
diff --git a/thoth-tck/build.sbt b/thoth-tck/build.sbt
new file mode 100644
index 00000000..8a1857ab
--- /dev/null
+++ b/thoth-tck/build.sbt
@@ -0,0 +1,16 @@
+import Dependencies._
+
+organization := "fr.maif"
+
+name := "thoth-tck"
+
+scalaVersion := "2.12.12"
+
+libraryDependencies ++= Seq(
+  "org.assertj" % "assertj-core" % "3.10.0",
+  "org.testng" % "testng" % "6.3",
+  "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test,
+  "org.mockito" % "mockito-core" % "3.6.28" % Test
+)
+
+testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath)
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
new file mode 100644
index 00000000..473934fb
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
@@ -0,0 +1,261 @@
+package fr.maif.eventsourcing.datastore;
+
+import akka.actor.ActorSystem;
+import akka.stream.javadsl.Sink;
+import fr.maif.eventsourcing.EventEnvelope;
+import fr.maif.eventsourcing.EventProcessor;
+import fr.maif.eventsourcing.EventStore;
+import fr.maif.eventsourcing.ProcessingSuccess;
+import fr.maif.eventsourcing.format.JacksonSimpleFormat;
+import fr.maif.json.EventEnvelopeJson;
+import io.vavr.Tuple0;
+import io.vavr.control.Either;
+import io.vavr.control.Option;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+public abstract class DataStoreVerification implements DataStoreVerificationRules {
+    public ActorSystem actorSystem = ActorSystem.create();
+    public abstract EventProcessor eventProcessor(String topic);
+    public abstract String kafkaBootstrapUrl();
+
+    @Override
+    @Test
+    public void required_submitValidSingleEventCommandMustWriteEventInDataStore() {
+        String topic = randomKafkaTopic();
+        final EventProcessor eventProcessor = eventProcessor(topic);
+        submitValidCommand(eventProcessor, "1");
+
+        List<EventEnvelope> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
+
+        cleanup(eventProcessor);
+        assertThat(envelopes).hasSize(1);
+    }
+
+    @Override
+    @Test
+    public void required_submitInvalidCommandMustNotWriteEventsInDataStore() {
+        String topic = randomKafkaTopic();
+        final EventProcessor eventProcessor = eventProcessor(topic);
+        submitInvalidCommand(eventProcessor, "1");
+
+        List<EventEnvelope> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
+
+        cleanup(eventProcessor);
+        assertThat(envelopes).isEmpty();
+    }
+
+    @Override
+    @Test
+    public void required_submitMultiEventCommandMustWriteAllEventsInDataStore() {
+        String topic = randomKafkaTopic();
+        final EventProcessor eventProcessor = eventProcessor(topic);
+        submitMultiEventsCommand(eventProcessor, "1");
+
+        List<EventEnvelope> envelopes = deduplicateOnId(readFromDataStore(eventProcessor.eventStore()));
+
+        cleanup(eventProcessor);
+
+        assertThat(envelopes.size()).isGreaterThan(1);
+        List<UUID> ids = envelopes.stream().map(envelope -> envelope.id).collect(Collectors.toList());
+
+        assertThat(ids).doesNotHaveDuplicates();
+    }
+
+    @Override
+    @Test
+    public void required_aggregateOfSingleEventStateShouldBeCorrect() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+
+        submitValidCommand(eventProcessor, "1");
+        Option<TestState> state = readState(eventProcessor, "1");
+
+        cleanup(eventProcessor);
+
+        assertThat(state.isDefined()).isTrue();
+        assertThat(state.get().count).isEqualTo(1);
+    }
+
+    @Override
+    @Test
+    public void required_aggregateOfDeleteEventStateShouldBeEmpty() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+
+        submitValidCommand(eventProcessor, "1");
+        submitDeleteCommand(eventProcessor, "1");
+        Option<TestState> state = readState(eventProcessor, "1");
+
+        cleanup(eventProcessor);
+
+        assertThat(state.isEmpty()).isTrue();
+    }
+
+    @Override
+    @Test
+    public void required_aggregateOfMultipleEventStateShouldBeCorrect() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+
+        submitMultiEventsCommand(eventProcessor, "1");
+        Option<TestState> state = readState(eventProcessor, "1");
+
+        cleanup(eventProcessor);
+
+        assertThat(state.isDefined()).isTrue();
+        assertThat(state.get().count).isEqualTo(2);
+    }
+
+    @Override
+    @Test
+    public void required_singleEventShouldBePublished() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+
+        submitValidCommand(eventProcessor, "1");
+        List<EventEnvelope> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));
+
+        cleanup(eventProcessor);
+
+        assertThat(envelopes).hasSize(1);
+    }
+
+    @Override
+    @Test
+    public void required_multipleEventsShouldBePublished() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        submitMultiEventsCommand(eventProcessor, "1");
+        List<EventEnvelope> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));
+
+        cleanup(eventProcessor);
+
+        assertThat(envelopes).hasSize(2);
+    }
+
+    @Override
+    @Test
+    public void required_eventShouldBePublishedEvenIfBrokerIsDownAtFirst() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        shutdownBroker();
+        submitValidCommand(eventProcessor, "1");
+
+        restartBroker();
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        List<EventEnvelope> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));
+
+        cleanup(eventProcessor);
+
+        assertThat(envelopes).hasSize(1);
+    }
+
+    @Override
+    @Test
+    public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        shutdownDatabase();
+        try {
+            Either<String, ProcessingSuccess> result = submitValidCommand(eventProcessor, "1");
+
+            cleanup(eventProcessor);
+
+            assertThat(result.isLeft()).isTrue();
+        } catch (Throwable ex) {
+            // the implementation may either return the error as an Either.Left or throw an exception
+        } finally {
+            restartDatabase();
+        }
+    }
+
+    @Override
+    public Either<String, ProcessingSuccess> submitValidCommand(
+            EventProcessor eventProcessor,
+            String id) {
+        return eventProcessor.processCommand(new TestCommand.SimpleCommand(id)).get();
+    }
+
+    @Override
+    public void submitInvalidCommand(
+            EventProcessor eventProcessor,
+            String id
+    ) {
+        eventProcessor.processCommand(new TestCommand.InvalidCommand(id)).get();
+
+    }
+
+    @Override
+    public void submitMultiEventsCommand(
+            EventProcessor eventProcessor,
+            String id
+    ) {
+        eventProcessor.processCommand(new TestCommand.MultiEventCommand(id)).get();
+    }
+
+    @Override
+    public void submitDeleteCommand(EventProcessor eventProcessor, String id) {
+        eventProcessor.processCommand(new TestCommand.DeleteCommand(id)).get();
+    }
+
+    @Override
+    public Option<TestState> readState(EventProcessor eventProcessor, String id) {
+        return eventProcessor.getAggregate(id).get();
+    }
+
+    @Override
+    public List<EventEnvelope> readFromDataStore(EventStore eventStore) {
+        try {
+            return eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String randomKafkaTopic() {
+        return "test-topic" + UUID.randomUUID();
+    }
+
+    private List<EventEnvelope> deduplicateOnId(List<EventEnvelope> envelopes) {
+        List<EventEnvelope> result = new ArrayList<>();
+        for (EventEnvelope env : envelopes) {
+            if (result.stream().noneMatch(env2 -> env2.id.equals(env.id))) {
+                result.add(env);
+            }
+        }
+
+        return result;
+    }
+}
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
new file mode 100644
index 00000000..a6ee48bc
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
@@ -0,0 +1,48 @@
+package fr.maif.eventsourcing.datastore;
+
+import fr.maif.eventsourcing.Event;
+import fr.maif.eventsourcing.EventEnvelope;
+import fr.maif.eventsourcing.EventProcessor;
+import fr.maif.eventsourcing.EventStore;
+import fr.maif.eventsourcing.ProcessingSuccess;
+import fr.maif.eventsourcing.State;
+import io.vavr.Tuple0;
+import io.vavr.control.Either;
+import io.vavr.control.Option;
+
+import java.util.List;
+
+public interface DataStoreVerificationRules {
+    Either<String, ProcessingSuccess> submitValidCommand(EventProcessor eventProcessor, String id);
+    void submitInvalidCommand(EventProcessor eventProcessor, String id);
+    void submitMultiEventsCommand(EventProcessor eventProcessor, String id);
+    Option<TestState> readState(EventProcessor eventProcessor, String id);
+    void submitDeleteCommand(EventProcessor eventProcessor, String id);
+    List<EventEnvelope> readPublishedEvents(String kafkaBootstrapUrl, String topic);
+    void shutdownBroker();
+    void restartBroker();
+    void shutdownDatabase();
+    void restartDatabase();
+
+    void required_submitValidSingleEventCommandMustWriteEventInDataStore();
+    void required_submitInvalidCommandMustNotWriteEventsInDataStore();
+    void required_submitMultiEventCommandMustWriteAllEventsInDataStore();
+
+    void required_aggregateOfSingleEventStateShouldBeCorrect();
+    void required_aggregateOfMultipleEventStateShouldBeCorrect();
+    void required_aggregateOfDeleteEventStateShouldBeEmpty();
+
+    void required_singleEventShouldBePublished();
+    void required_multipleEventsShouldBePublished();
+
+    void required_eventShouldBePublishedEvenIfBrokerIsDownAtFirst();
+    void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable();
+
+    List<EventEnvelope> readFromDataStore(EventStore eventStore);
+
+    default void cleanup(
+            EventProcessor eventProcessor
+    ) {
+        // Default implementation is NOOP
+    }
+}
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DoTestListener.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DoTestListener.java
new file mode 100644
index 00000000..1bd68a91
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DoTestListener.java
@@ -0,0 +1,21 @@
+package fr.maif.eventsourcing.datastore;
+
+import org.testng.ITestResult;
+import org.testng.TestListenerAdapter;
+
+public class DoTestListener extends TestListenerAdapter {
+    @Override
+    public void onTestFailure(ITestResult tr) {
+        System.out.println(tr.getName() + " FAILED");
+    }
+
+    @Override
+    public void onTestSkipped(ITestResult tr) {
+        System.out.println(tr.getName() + " SKIPPED");
+    }
+
+    @Override
+    public void onTestSuccess(ITestResult tr) {
+        System.out.println(tr.getName() + " SUCCESSFUL");
+    }
+}
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommand.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommand.java
new file mode 100644
index 00000000..bfaa3a2f
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommand.java
@@ -0,0 +1,55 @@
+package fr.maif.eventsourcing.datastore;
+
+import fr.maif.eventsourcing.Command;
+import io.vavr.API;
+import io.vavr.Lazy;
+import io.vavr.Tuple0;
+
+public abstract class TestCommand implements Command {
+    public final String id;
+
+    public static API.Match.Pattern0<SimpleCommand> $SimpleCommand() {
+        return API.Match.Pattern0.of(SimpleCommand.class);
+    }
+
+    public static API.Match.Pattern0<MultiEventCommand> $MultiEventCommand() {
+        return API.Match.Pattern0.of(MultiEventCommand.class);
+    }
+
+    public static API.Match.Pattern0<DeleteCommand> $DeleteCommand() {
+        return API.Match.Pattern0.of(DeleteCommand.class);
+    }
+
+    public TestCommand(String id) {
+        this.id = id;
+    }
+
+    public static class SimpleCommand extends TestCommand {
+        public SimpleCommand(String id) {
+            super(id);
+        }
+    }
+
+    public static class MultiEventCommand extends TestCommand {
+        public MultiEventCommand(String id) {
+            super(id);
+        }
+    }
+
+    public static class InvalidCommand extends TestCommand {
+        public InvalidCommand(String id) {
+            super(id);
+        }
+    }
+
+    public static class DeleteCommand extends TestCommand {
+        public DeleteCommand(String id) {
+            super(id);
+        }
+    }
+
+    @Override
+    public Lazy<String> entityId() {
+        return Lazy.of(() -> id);
+    }
+}
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommandHandler.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommandHandler.java
new file mode 100644
index 00000000..810ac3fc
--- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestCommandHandler.java @@ -0,0 +1,27 @@ +package fr.maif.eventsourcing.datastore; + +import fr.maif.eventsourcing.CommandHandler; +import fr.maif.eventsourcing.Events; +import io.vavr.Tuple0; +import io.vavr.concurrent.Future; +import io.vavr.control.Either; +import io.vavr.control.Option; + +import java.util.function.Function; + +import static io.vavr.API.Case; +import static io.vavr.API.Match; + +public class TestCommandHandler implements CommandHandler { + @Override + public Future>> handleCommand( + TxCtx useless, + Option previousState, + TestCommand command) { + return Future.of(() -> Match(command).option( + Case(TestCommand.$SimpleCommand(), cmd -> events(new TestEvent.SimpleEvent(cmd.id))), + Case(TestCommand.$MultiEventCommand(), cmd -> events(new TestEvent.SimpleEvent(cmd.id), new TestEvent.SimpleEvent(cmd.id))), + Case(TestCommand.$DeleteCommand(), cmd -> events(new TestEvent.DeleteEvent(cmd.id))) + ).toEither(() -> "Unknown command").flatMap(Function.identity())); + } +} diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java new file mode 100644 index 00000000..a872c293 --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEvent.java @@ -0,0 +1,48 @@ +package fr.maif.eventsourcing.datastore; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import fr.maif.eventsourcing.Event; +import fr.maif.eventsourcing.Type; + +public abstract class TestEvent implements Event { + public final String id; + + public static Type SimpleEventV1 = Type.create(SimpleEvent.class, 1L); + public static Type DeleteEventV1 = Type.create(DeleteEvent.class, 1L); + + @Override + public String entityId() { + return id; + } + + public TestEvent(String id) { + this.id = id; + } + + public static class SimpleEvent extends TestEvent { + + @JsonCreator + public SimpleEvent(@JsonProperty("id") String id) { + super(id); + } + + @Override + public Type type() { + return SimpleEventV1; + } + } + + public static class DeleteEvent extends TestEvent { + + @JsonCreator + public DeleteEvent(@JsonProperty("id") String id) { + super(id); + } + + @Override + public Type type() { + return DeleteEventV1; + } + } +} diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventFormat.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventFormat.java new file mode 100644 index 00000000..8182a191 --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventFormat.java @@ -0,0 +1,28 @@ +package fr.maif.eventsourcing.datastore; + +import com.fasterxml.jackson.databind.JsonNode; +import fr.maif.eventsourcing.format.JacksonEventFormat; +import fr.maif.json.Json; +import fr.maif.json.JsonWrite; +import io.vavr.API; +import io.vavr.Tuple; +import io.vavr.control.Either; + +import static io.vavr.API.Case; + +public class TestEventFormat implements JacksonEventFormat { + @Override + public Either read(String type, Long version, JsonNode json) { + return API.Match(Tuple.of(type, version)).option( + Case(TestEvent.SimpleEventV1.pattern2(), () -> Json.fromJson(json, TestEvent.SimpleEvent.class)), + Case(TestEvent.DeleteEventV1.pattern2(), () -> Json.fromJson(json, TestEvent.DeleteEvent.class)) + ) + .toEither(() -> "Unknown event type " + type + "(v" + version + ")") + .flatMap(jsResult -> 
jsResult.toEither().mapLeft(errs -> errs.mkString(","))); + } + + @Override + public JsonNode write(TestEvent json) { + return Json.toJson(json, JsonWrite.auto()); + } +} diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventHandler.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventHandler.java new file mode 100644 index 00000000..325bbc1d --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestEventHandler.java @@ -0,0 +1,20 @@ +package fr.maif.eventsourcing.datastore; + +import fr.maif.eventsourcing.EventHandler; +import io.vavr.control.Option; + +import static io.vavr.API.Case; +import static io.vavr.API.Match; + +public class TestEventHandler implements EventHandler { + @Override + public Option applyEvent(Option previousState, TestEvent event) { + return Match(event).of( + Case(TestEvent.SimpleEventV1.pattern(), evt -> { + Integer previousCount = previousState.map(p -> p.count).getOrElse(0); + return Option.some(new TestState(event.id, previousCount + 1)); + }), + Case(TestEvent.DeleteEventV1.pattern(), evt -> Option.none()) + ); + } +} diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java new file mode 100644 index 00000000..52204104 --- /dev/null +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java @@ -0,0 +1,14 @@ +package fr.maif.eventsourcing.datastore; + +import fr.maif.eventsourcing.AbstractState; + +public class TestState extends AbstractState { + public final String id; + public final int count; + + + public TestState(String id, int count) { + this.id = id; + this.count = count; + } +} diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java new file mode 100644 index 00000000..d648ae70 --- /dev/null +++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java @@ -0,0 +1,99 @@ +package fr.maif.eventsourcing.datastore; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; + +import org.mockito.Mockito; +import org.testng.annotations.BeforeMethod; + +import akka.actor.ActorSystem; +import akka.stream.javadsl.Sink; +import fr.maif.eventsourcing.EventEnvelope; +import fr.maif.eventsourcing.EventProcessor; +import fr.maif.eventsourcing.EventStore; +import fr.maif.eventsourcing.TransactionManager; +import fr.maif.eventsourcing.impl.InMemoryEventStore; +import io.vavr.Tuple; +import io.vavr.Tuple0; +import io.vavr.concurrent.Future; + +public class InMemoryDataStoreTest extends DataStoreVerification { + public InMemoryEventStore eventStore; + public EventProcessor eventProcessor; + + @BeforeMethod(alwaysRun = true) + public void init() { + this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem)); + this.eventProcessor = new EventProcessor<>( + actorSystem, + eventStore, + noOpTransactionManager(), + new TestCommandHandler(), + new TestEventHandler(), + io.vavr.collection.List.empty() + ); + } + + @Override + public List> readPublishedEvents(String kafkaBootStrapUrl, String topic) { + try { + return this.eventStore.loadAllEvents().runWith(Sink.seq(), actorSystem).toCompletableFuture().get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public void 
required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
+        // Not implemented for in memory
+    }
+
+
+    @Override
+    public void required_eventShouldBePublishedEvenIfBrokerIsDownAtFirst() {
+        // Not implemented for in memory
+    }
+
+    @Override
+    public void shutdownBroker() {
+        throw new RuntimeException("Not implemented for in memory");
+    }
+
+    @Override
+    public void restartBroker() {
+        Mockito.reset(eventStore);
+        throw new RuntimeException("Not implemented for in memory");
+    }
+
+    @Override
+    public void shutdownDatabase() {
+        throw new RuntimeException("Not implemented for in memory");
+    }
+
+    @Override
+    public void restartDatabase() {
+        throw new RuntimeException("Not implemented for in memory");
+    }
+
+    @Override
+    public EventProcessor eventProcessor(String topic) {
+        return this.eventProcessor;
+    }
+
+    @Override
+    public String kafkaBootstrapUrl() {
+        return null;
+    }
+
+    private TransactionManager<Tuple0> noOpTransactionManager() {
+        return new TransactionManager<Tuple0>() {
+            @Override
+            public <T> Future<T> withTransaction(Function<Tuple0, Future<T>> function) {
+                return function.apply(Tuple.empty());
+            }
+        };
+    }
+}
diff --git a/thoth-tck/src/test/resources/testng.xml b/thoth-tck/src/test/resources/testng.xml
new file mode 100644
index 00000000..9f0d0fca
--- /dev/null
+++ b/thoth-tck/src/test/resources/testng.xml
@@ -0,0 +1,11 @@
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd">
+<suite name="thoth-tck" verbose="1">
+    <listeners>
+        <listener class-name="fr.maif.eventsourcing.datastore.DoTestListener"/>
+    </listeners>
+    <test name="datastore">
+        <classes>
+            <class name="fr.maif.eventsourcing.datastore.InMemoryDataStoreTest"/>
+        </classes>
+    </test>
+</suite>
\ No newline at end of file
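
Note for implementors: a new datastore is plugged into the TCK the way JooqKafkaTckImplementation does it above — extend DataStoreVerification, provide the environment-specific hooks, then register the subclass in the module's testng.xml. A minimal skeleton could look like the following sketch (the class name and the TODO bodies are hypothetical placeholders, not part of this patch):

    package fr.maif.eventsourcing.datastore;

    import java.util.List;

    import fr.maif.eventsourcing.EventEnvelope;
    import fr.maif.eventsourcing.EventProcessor;

    // Sketch only: every TODO below must be backed by the real store under test.
    public class MyDataStoreTckImplementation extends DataStoreVerification {

        @Override
        public EventProcessor eventProcessor(String topic) {
            // Build an EventProcessor on top of the store under test, reusing the
            // TestCommandHandler / TestEventHandler / TestEventFormat fixtures.
            throw new UnsupportedOperationException("TODO");
        }

        @Override
        public String kafkaBootstrapUrl() {
            // Bootstrap servers of the broker the events are published to.
            throw new UnsupportedOperationException("TODO");
        }

        @Override
        public List readPublishedEvents(String kafkaBootstrapUrl, String topic) {
            // Consume the topic from the beginning and deserialize the envelopes.
            throw new UnsupportedOperationException("TODO");
        }

        // These four hooks drive the resilience tests; with testcontainers they
        // can simply pause/unpause the containers, as done for thoth-jooq above.
        @Override
        public void shutdownBroker() { throw new UnsupportedOperationException("TODO"); }

        @Override
        public void restartBroker() { throw new UnsupportedOperationException("TODO"); }

        @Override
        public void shutdownDatabase() { throw new UnsupportedOperationException("TODO"); }

        @Override
        public void restartDatabase() { throw new UnsupportedOperationException("TODO"); }
    }

Declaring this class in the module's testng.xml and enabling the TestNGPlugin in its build.sbt, as the thoth-jooq changes above do, is all the build wiring that is needed.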