Skip to content

Commit

Permalink
test: tck setup
Browse files Browse the repository at this point in the history
  • Loading branch information
ptitFicus authored and Benjamin CAVY committed May 11, 2021
1 parent 5196131 commit 251798b
Show file tree
Hide file tree
Showing 25 changed files with 980 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
run: sbt publishLocal
- name: Run test
id: test
run: JAVA_OPTS="--enable-preview" sbt test
run: JAVA_OPTS="--enable-preview" sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test
- name: Publish Unit Test Results
uses: EnricoMi/publish-unit-test-result-action@v1
if: always()
Expand Down
1 change: 0 additions & 1 deletion .java-version

This file was deleted.

16 changes: 14 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ lazy val root = (project in file("."))
`demo-postgres-kafka`,
`demo-in-memory`,
`sample`,
`thoth-documentation`
`thoth-documentation`,
`thoth-tck`
)
.enablePlugins(GitVersioning, GitBranchPrompt)
.settings(
Expand All @@ -48,6 +49,16 @@ lazy val `demo-in-memory` = (project in file("./demo/demo-in-memory"))
)

lazy val `thoth-documentation` = project
.settings(
skip in publish := true
)

lazy val `thoth-tck` = project
.dependsOn(`thoth-core`)
.enablePlugins(TestNGPlugin)
.settings(
skip in publish := true
)

lazy val `demo-postgres-kafka-reactive` =
(project in file("./demo/demo-postgres-kafka-reactive"))
Expand Down Expand Up @@ -94,7 +105,8 @@ lazy val `thoth-core` = project
)

lazy val `thoth-jooq` = project
.dependsOn(`thoth-core`)
.dependsOn(`thoth-core`, `thoth-tck`)
.enablePlugins(TestNGPlugin)
.settings(
sonatypeRepository := "https://s01.oss.sonatype.org/service/local",
sonatypeCredentialHost := "s01.oss.sonatype.org",
Expand Down
4 changes: 3 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.7")

addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.6")

addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.6.9") // Apache 2.0
addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.6.9") // Apache 2.0

addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.1.1")
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package fr.maif.eventsourcing;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.Materializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
import akka.kafka.ProducerMessage;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.RestartSettings;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.*;
import fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import fr.maif.akka.AkkaExecutionContext;
import fr.maif.eventsourcing.Event;
import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.EventPublisher;
Expand All @@ -29,9 +31,8 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY;
import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.WAIT;

public class KafkaEventPublisher<E extends Event, Meta, Context> implements EventPublisher<E, Meta, Context>, Closeable {
Expand All @@ -47,6 +48,7 @@ public class KafkaEventPublisher<E extends Event, Meta, Context> implements Even
private final Duration restartInterval;
private final Duration maxRestartInterval;
private final Flow<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>, List<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>, NotUsed> groupFlow = Flow.<ProducerMessage.Results<String, EventEnvelope<E, Meta, Context>, EventEnvelope<E, Meta, Context>>>create().grouped(1000).map(List::ofAll);
private UniqueKillSwitch killSwitch;

public KafkaEventPublisher(ActorSystem system, ProducerSettings<String, EventEnvelope<E, Meta, Context>> producerSettings, String topic) {
this(system, producerSettings, topic, null);
Expand Down Expand Up @@ -79,8 +81,7 @@ public KafkaEventPublisher(ActorSystem system, ProducerSettings<String, EventEnv
}

public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, ConcurrentReplayStrategy concurrentReplayStrategy) {

RestartSource
killSwitch = RestartSource
.onFailuresWithBackoff(
restartInterval,
maxRestartInterval,
Expand Down Expand Up @@ -128,8 +129,9 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
});
}
)
.viaMat(KillSwitches.single(), Keep.right())
.toMat(Sink.ignore(), Keep.both())
.run(materializer);
.run(materializer).first();
}


Expand Down Expand Up @@ -163,6 +165,9 @@ public Future<Tuple0> publish(List<EventEnvelope<E, Meta, Context>> events) {

@Override
public void close() throws IOException {
if(Objects.nonNull(killSwitch)) {
this.killSwitch.shutdown();
}
this.kafkaProducer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void eventConsumptionWithEventFromDb() throws IOException {
assertThat(events).hasSize(6);

verify(eventStore, times(1)).markAsPublished(any(), Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any());
verify(eventStore, times(1)).markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any());
verify(eventStore, atLeastOnce()).markAsPublished(Mockito.<List<EventEnvelope<TestEvent, Void, Void>>>any());

publisher.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@
import org.jooq.Condition;
import org.jooq.Converter;
import org.jooq.Field;
import org.jooq.JSON;
import org.jooq.JSONB;
import org.jooq.Record15;
import org.jooq.SelectForUpdateWaitStep;
import org.jooq.SelectSeekStep1;
import org.jooq.impl.SQLDataType;
import org.slf4j.LoggerFactory;
Expand All @@ -40,7 +38,6 @@
import java.util.Objects;
import java.util.UUID;

import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP;
import static java.util.function.Function.identity;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;
Expand Down
9 changes: 8 additions & 1 deletion thoth-jooq/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ libraryDependencies ++= Seq(
"org.junit.jupiter" % "junit-jupiter-engine" % "5.4.2" % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.4.2" % Test,
"net.aichler" % "jupiter-interface" % "0.9.1" % Test,
"org.mockito" % "mockito-core" % "2.22.0" % Test
"org.mockito" % "mockito-core" % "2.22.0" % Test,
"org.testng" % "testng" % "6.3" % Test,
"org.testcontainers" % "postgresql" % "1.15.0" % Test,
"org.testcontainers" % "kafka" % "1.15.0" % Test,
"org.slf4j" % "slf4j-api" % "1.7.30" % Test,
"org.slf4j" % "slf4j-simple" % "1.7.30" % Test
)

testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath)

javacOptions in Compile ++= Seq("-source", "8", "-target", "8", "-Xlint:unchecked", "-Xlint:deprecation")

// Skip the javadoc for the moment
Expand Down
Loading

0 comments on commit 251798b

Please sign in to comment.