From 88ee254734e2c6d7056d1507b340e4346bb361fa Mon Sep 17 00:00:00 2001 From: thoth-github-actions Date: Thu, 8 Jul 2021 08:13:33 +0000 Subject: [PATCH 01/10] Setting version to 1.1.2 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index a9bd6a76..1990c4dd 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.2-SNAPSHOT" +version in ThisBuild := "1.1.2" From 837e56a211848d90c504e64c0bee5b2824fb6f3f Mon Sep 17 00:00:00 2001 From: thoth-github-actions Date: Thu, 8 Jul 2021 08:13:37 +0000 Subject: [PATCH 02/10] Setting version to 1.1.3-SNAPSHOT --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 1990c4dd..76dcbae2 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.2" +version in ThisBuild := "1.1.3-SNAPSHOT" From d470a33c9bd3788760dd3b0274aef1d689a32f01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Thu, 15 Jul 2021 15:53:40 +0200 Subject: [PATCH 03/10] Try to fix, build and publish --- build.sbt | 31 +++++++++++++++++++------------ project/Publish.scala | 11 ----------- thoth-tck/build.sbt | 10 ++++------ 3 files changed, 23 insertions(+), 29 deletions(-) delete mode 100644 project/Publish.scala diff --git a/build.sbt b/build.sbt index a0e5f4e0..4cfbab5a 100644 --- a/build.sbt +++ b/build.sbt @@ -5,8 +5,11 @@ organization := "fr.maif" resolvers ++= Seq(Resolver.jcenterRepo) -scalaVersion := "2.12.13" -crossScalaVersions := List("2.13.5", "2.12.13") +val mainScalaVersion = "2.12.13" +val scalaVersions = List("2.13.5", mainScalaVersion) + +scalaVersion := mainScalaVersion +crossScalaVersions := scalaVersions usePgpKeyHex("5B6BE1966878E3AE16B85BC975B8BA741462DEA9") sonatypeRepository := "https://s01.oss.sonatype.org/service/local" @@ -57,7 +60,11 @@ lazy val `thoth-tck` = project .dependsOn(`thoth-core`) .enablePlugins(TestNGPlugin) .settings( - skip in publish := true + sonatypeRepository := "https://s01.oss.sonatype.org/service/local", + sonatypeCredentialHost := "s01.oss.sonatype.org", + scalaVersion := mainScalaVersion, + crossScalaVersions := scalaVersions, + crossPaths := true ) lazy val `demo-postgres-kafka-reactive` = @@ -71,7 +78,7 @@ lazy val `commons-events` = project .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", - scalaVersion := "2.12.13", + scalaVersion := mainScalaVersion, crossPaths := false ) @@ -79,8 +86,8 @@ lazy val `thoth-kafka-goodies` = project .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", - scalaVersion := "2.12.13", - crossScalaVersions := List("2.13.5", "2.12.13"), + scalaVersion := mainScalaVersion, + crossScalaVersions := scalaVersions, crossPaths := true ) @@ -89,8 +96,8 @@ lazy val `thoth-jooq-async` = project .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", - scalaVersion := "2.12.13", - crossScalaVersions := List("2.13.5", "2.12.13"), + scalaVersion := mainScalaVersion, + crossScalaVersions := scalaVersions, crossPaths := true ) @@ -99,8 +106,8 @@ lazy val `thoth-core` = project .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", - scalaVersion := "2.12.13", - crossScalaVersions := List("2.13.5", "2.12.13"), + scalaVersion := mainScalaVersion, + crossScalaVersions := scalaVersions, crossPaths := true ) @@ -110,8 +117,8 @@ lazy val `thoth-jooq` = project .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", sonatypeCredentialHost := "s01.oss.sonatype.org", - scalaVersion := "2.12.13", - crossScalaVersions := List("2.13.5", "2.12.13"), + scalaVersion := mainScalaVersion, + crossScalaVersions := scalaVersions, crossPaths := true ) diff --git a/project/Publish.scala b/project/Publish.scala deleted file mode 100644 index 3459f2a1..00000000 --- a/project/Publish.scala +++ /dev/null @@ -1,11 +0,0 @@ -import sbt._, Keys._ - -object NoPublish extends AutoPlugin { - override def requires = plugins.JvmPlugin - - override def projectSettings = Seq( - publishArtifact := false, - publish := {}, - publishLocal := {} - ) -} \ No newline at end of file diff --git a/thoth-tck/build.sbt b/thoth-tck/build.sbt index 8a1857ab..b4bc9134 100644 --- a/thoth-tck/build.sbt +++ b/thoth-tck/build.sbt @@ -4,13 +4,11 @@ organization := "fr.maif" name := "thoth-tck" -scalaVersion := "2.12.12" - libraryDependencies ++= Seq( - "org.assertj" % "assertj-core" % "3.10.0", - "org.testng" % "testng" % "6.3", - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, - "org.mockito" % "mockito-core" % "3.6.28" % Test + "org.assertj" % "assertj-core" % "3.10.0", + "org.testng" % "testng" % "6.3", + "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, + "org.mockito" % "mockito-core" % "3.6.28" % Test ) testNGSuites := Seq(((resourceDirectory in Test).value / "testng.xml").absolutePath) From f232c6a5fd15db8d27a23db5da9d1ee4b243dfdf Mon Sep 17 00:00:00 2001 From: thoth-github-actions Date: Thu, 15 Jul 2021 14:00:00 +0000 Subject: [PATCH 04/10] Setting version to 1.1.3 --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index 76dcbae2..a1014cee 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.3-SNAPSHOT" +version in ThisBuild := "1.1.3" From 844c4be3d724720edc8df9009d7ac71c2a408767 Mon Sep 17 00:00:00 2001 From: thoth-github-actions Date: Thu, 15 Jul 2021 14:00:02 +0000 Subject: [PATCH 05/10] Setting version to 1.1.4-SNAPSHOT --- version.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.sbt b/version.sbt index a1014cee..918f99a8 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "1.1.3" +version in ThisBuild := "1.1.4-SNAPSHOT" From fc193b5d01ce61cdda74814b53fb938b0c93d7e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20Del=C3=A8gue?= Date: Thu, 15 Jul 2021 16:06:57 +0200 Subject: [PATCH 06/10] Depends on tck only for tests --- build.sbt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 4cfbab5a..a33e9d5b 100644 --- a/build.sbt +++ b/build.sbt @@ -112,7 +112,7 @@ lazy val `thoth-core` = project ) lazy val `thoth-jooq` = project - .dependsOn(`thoth-core`, `thoth-tck`) + .dependsOn(`thoth-core`, `thoth-tck` % "compile->test") .enablePlugins(TestNGPlugin) .settings( sonatypeRepository := "https://s01.oss.sonatype.org/service/local", @@ -148,7 +148,7 @@ releaseProcess := Seq[ReleaseStep]( pushChanges ) -lazy val githubRepo = "maif/thoth" +lazy val githubRepo = "maif/thoth" inThisBuild( List( From 48f794c334ca1cf496edb36c4d8da62c1ff29051 Mon Sep 17 00:00:00 2001 From: Gregnarok Date: Thu, 15 Jul 2021 23:16:01 +0200 Subject: [PATCH 07/10] feat: remove lombok (#36) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Grégory BERTHELOT --- .../main/java/com/example/demo/Account.java | 25 +- .../main/java/com/example/demo/BankEvent.java | 112 ++++++++- .../fr/maif/thoth/sample/state/Account.java | 51 ++++- thoth-core/build.sbt | 1 - .../EventuallyConsistentProjection.java | 93 +++++++- .../main/java/fr/maif/projections/Status.java | 32 ++- .../impl/KafkaEventPublisherTest.java | 5 +- .../EventuallyConsistentProjectionTest.java | 4 +- thoth-jooq-async/build.sbt | 1 - ...ivePostgresKafkaEventProcessorBuilder.java | 157 +++++++++++-- .../AbstractPostgresEventStoreTest.java | 8 +- .../PostgresKafkaEventProcessorBuilder.java | 160 +++++++++++-- .../impl/PostgresEventStoreTest.java | 11 +- thoth-kafka-goodies/build.sbt | 1 - .../consumer/ResilientKafkaConsumer.java | 216 +++++++++++++++--- .../consumer/ResilientKafkaConsumerTest.java | 13 +- 16 files changed, 761 insertions(+), 129 deletions(-) diff --git a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java index 85a3e1a7..d8ed6d12 100644 --- a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java +++ b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java @@ -1,18 +1,35 @@ package com.example.demo; import fr.maif.eventsourcing.State; -import lombok.Getter; -import lombok.ToString; import java.math.BigDecimal; -@Getter -@ToString public class Account implements State { public String id; public BigDecimal balance; public long sequenceNum; + public String getId() { + return id; + } + + public BigDecimal getBalance() { + return balance; + } + + public long getSequenceNum() { + return sequenceNum; + } + + @Override + public String toString() { + return "Account{" + + "id='" + id + '\'' + + ", balance=" + balance + + ", sequenceNum=" + sequenceNum + + '}'; + } + @Override public Long sequenceNum() { return sequenceNum; diff --git a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/BankEvent.java b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/BankEvent.java index f3d80029..322bc389 100644 --- a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/BankEvent.java +++ b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/BankEvent.java @@ -6,8 +6,6 @@ import fr.maif.json.JsonFormat; import fr.maif.json.JsonRead; import io.vavr.API.Match.Pattern0; -import lombok.Builder; -import lombok.Value; import java.math.BigDecimal; @@ -55,13 +53,39 @@ public interface BankEvent extends Event { ) ); - @Builder - @Value class MoneyWithdrawn implements BankEvent { - public final String accountId; public final BigDecimal amount; + static class MoneyWithdrawnBuilder{ + String accountId; + BigDecimal amount; + + MoneyWithdrawnBuilder accountId(String accountId){ + this.accountId = accountId; + return this; + } + + MoneyWithdrawnBuilder amount(BigDecimal amount){ + this.amount = amount; + return this; + } + + MoneyWithdrawn build(){ + return new MoneyWithdrawn(accountId,amount); + } + + } + + public MoneyWithdrawn(String accountId, BigDecimal amount) { + this.accountId = accountId; + this.amount = amount; + } + + public static MoneyWithdrawnBuilder builder(){ + return new MoneyWithdrawnBuilder(); + } + @Override public Type type() { return MoneyWithdrawnV1; @@ -84,11 +108,30 @@ public String entityId() { ); } - @Builder - @Value class AccountOpened implements BankEvent { public final String accountId; + static class AccountOpenedBuilder{ + String accountId; + + AccountOpenedBuilder accountId(String accountId){ + this.accountId = accountId; + return this; + } + + AccountOpened build(){ + return new AccountOpened(accountId); + } + } + + public AccountOpened(String accountId) { + this.accountId = accountId; + } + + public static AccountOpenedBuilder builder(){ + return new AccountOpenedBuilder(); + } + @Override public Type type() { return AccountOpenedV1; @@ -109,12 +152,40 @@ public String entityId() { ); } - @Builder - @Value + class MoneyDeposited implements BankEvent { public final String accountId; public final BigDecimal amount; + public MoneyDeposited(String accountId, BigDecimal amount) { + this.accountId = accountId; + this.amount = amount; + } + + static class MoneyDepositedBuilder{ + String accountId; + BigDecimal amount; + + MoneyDepositedBuilder accountId(String accountId){ + this.accountId = accountId; + return this; + } + + MoneyDepositedBuilder amount(BigDecimal amount){ + this.amount = amount; + return this; + } + + MoneyDeposited build(){ + return new MoneyDeposited(accountId,amount); + } + + } + + public static MoneyDepositedBuilder builder(){ + return new MoneyDepositedBuilder(); + } + @Override public Type type() { return MoneyDepositedV1; @@ -137,11 +208,30 @@ public String entityId() { ); } - @Builder - @Value class AccountClosed implements BankEvent { public final String accountId; + static class AccountClosedBuilder{ + String accountId; + + AccountClosedBuilder accountId(String accountId){ + this.accountId = accountId; + return this; + } + + AccountClosed build(){ + return new AccountClosed(accountId); + } + } + + public AccountClosed(String accountId) { + this.accountId = accountId; + } + + public static AccountClosedBuilder builder(){ + return new AccountClosedBuilder(); + } + @Override public Type type() { return AccountClosedV1; diff --git a/sample/src/main/java/fr/maif/thoth/sample/state/Account.java b/sample/src/main/java/fr/maif/thoth/sample/state/Account.java index eb4385e9..1b6b526b 100644 --- a/sample/src/main/java/fr/maif/thoth/sample/state/Account.java +++ b/sample/src/main/java/fr/maif/thoth/sample/state/Account.java @@ -3,14 +3,61 @@ import java.math.BigDecimal; import fr.maif.eventsourcing.AbstractState; -import lombok.Builder; -@Builder(toBuilder = true) public class Account extends AbstractState { public String id; public BigDecimal balance; public long sequenceNum; + public static class AccountBuilder{ + String id; + BigDecimal balance; + long sequenceNum; + + public AccountBuilder() { + } + + public AccountBuilder(String id, BigDecimal balance, long sequenceNum) { + this.id = id; + this.balance = balance; + this.sequenceNum = sequenceNum; + } + + public AccountBuilder id(String id){ + this.id = id; + return this; + } + + public AccountBuilder balance(BigDecimal balance){ + this.balance = balance; + return this; + } + + public AccountBuilder withSequenceNum(long sequenceNum){ + this.sequenceNum = sequenceNum; + return this; + } + + public Account build(){ + return new Account(this.id,this.balance,this.sequenceNum); + } + + } + + public Account(String id, BigDecimal balance, long sequenceNum) { + this.id = id; + this.balance = balance; + this.sequenceNum = sequenceNum; + } + + public AccountBuilder toBuilder(){ + return new AccountBuilder(this.id, this.balance, this.sequenceNum); + } + + public static AccountBuilder builder(){ + return new AccountBuilder(); + } + @Override public Long sequenceNum() { return sequenceNum; diff --git a/thoth-core/build.sbt b/thoth-core/build.sbt index 6a8dbd9f..55e1137f 100644 --- a/thoth-core/build.sbt +++ b/thoth-core/build.sbt @@ -13,7 +13,6 @@ libraryDependencies ++= Seq( "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % jacksonVersion, "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % jacksonVersion, "fr.maif" % "functional-json" % functionalJsonVersion, - "org.projectlombok" % "lombok" % "1.18.18", "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-stream-kafka-testkit" % alpakkaKafkaVersion % Test, diff --git a/thoth-core/src/main/java/fr/maif/projections/EventuallyConsistentProjection.java b/thoth-core/src/main/java/fr/maif/projections/EventuallyConsistentProjection.java index c7a296f6..75a77b24 100644 --- a/thoth-core/src/main/java/fr/maif/projections/EventuallyConsistentProjection.java +++ b/thoth-core/src/main/java/fr/maif/projections/EventuallyConsistentProjection.java @@ -15,21 +15,14 @@ import fr.maif.kafka.consumer.ResilientKafkaConsumer; import io.vavr.Tuple0; import io.vavr.concurrent.Future; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.function.Function; -@Slf4j public abstract class EventuallyConsistentProjection extends ResilientKafkaConsumer> { - @Builder(toBuilder = true) - @AllArgsConstructor(access = AccessLevel.PRIVATE) public static class Config { public final String topic; public final String groupId; @@ -40,6 +33,76 @@ public static class Config { public final Double randomFactor; public final Integer commitSize; + public static class ConfigBuilder { + String topic; + String groupId; + String bootstrapServers; + Function>, ConsumerSettings>> completeConfig; + Duration minBackoff; + Duration maxBackoff; + Double randomFactor; + Integer commitSize; + + public ConfigBuilder topic(String topic) { + this.topic = topic; + return this; + } + + public ConfigBuilder groupId(String groupId) { + this.groupId = groupId; + return this; + } + + public ConfigBuilder bootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + return this; + } + + public ConfigBuilder completeConfig( + Function>, ConsumerSettings>> completeConfig) { + this.completeConfig = completeConfig; + return this; + } + + public ConfigBuilder minBackoff(Duration minBackoff) { + this.minBackoff = minBackoff; + return this; + } + + public ConfigBuilder maxBackoff(Duration maxBackoff) { + this.maxBackoff = maxBackoff; + return this; + } + + public ConfigBuilder randomFactor(Double randomFactor) { + this.randomFactor = randomFactor; + return this; + } + + public ConfigBuilder commitSize(Integer commitSize) { + this.commitSize = commitSize; + return this; + } + + public Config build(){ + return new Config<>(this.topic, this.groupId, this.bootstrapServers, this.completeConfig, + this.minBackoff, this.maxBackoff, this.randomFactor, this.commitSize); + } + } + + private Config(String topic, String groupId, String bootstrapServers, + Function>, ConsumerSettings>> completeConfig, + Duration minBackoff, Duration maxBackoff, Double randomFactor, Integer commitSize) { + this.topic = topic; + this.groupId = groupId; + this.bootstrapServers = bootstrapServers; + this.completeConfig = completeConfig; + this.minBackoff = minBackoff; + this.maxBackoff = maxBackoff; + this.randomFactor = randomFactor; + this.commitSize = commitSize; + } + public static Config create(String topic, String groupId, String bootstrapServers) { return Config.builder() .bootstrapServers(bootstrapServers) @@ -47,6 +110,22 @@ public static Config create(S .topic(topic) .build(); } + + public static ConfigBuilder builder(){ + return new ConfigBuilder<>(); + } + + public ConfigBuilder toBuilder() { + return Config.builder() + .topic(this.topic) + .groupId(this.groupId) + .bootstrapServers(this.bootstrapServers) + .completeConfig(this.completeConfig) + .minBackoff(this.minBackoff) + .maxBackoff(this.maxBackoff) + .randomFactor(this.randomFactor) + .commitSize(this.commitSize); + } } public EventuallyConsistentProjection(ActorSystem actorSystem, diff --git a/thoth-core/src/main/java/fr/maif/projections/Status.java b/thoth-core/src/main/java/fr/maif/projections/Status.java index 7c0bee42..f94a53c7 100644 --- a/thoth-core/src/main/java/fr/maif/projections/Status.java +++ b/thoth-core/src/main/java/fr/maif/projections/Status.java @@ -1,7 +1,5 @@ package fr.maif.projections; -import lombok.ToString; - import static io.vavr.API.$; import static io.vavr.API.Case; import static io.vavr.API.Match; @@ -30,44 +28,64 @@ static Status fromString(String status) { String name(); - @ToString class Started implements Status { @Override public String name() { return "Started"; } + + @Override + public String toString() { + return "Started{}"; + } } - @ToString class Starting implements Status { @Override public String name() { return "Starting"; } + + @Override + public String toString() { + return "Starting{}"; + } } - @ToString class Failed implements Status { @Override public String name() { return "Failed"; } + + @Override + public String toString() { + return "Failed{}"; + } } - @ToString class Stopped implements Status { @Override public String name() { return "Stopped"; } + + @Override + public String toString() { + return "Stopped{}"; + } } - @ToString class Stopping implements Status { @Override public String name() { return "Stopping"; } + + @Override + public String toString() { + return "Stopping{}"; + } } } diff --git a/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java b/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java index f6314a9a..acaa7a9d 100644 --- a/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java +++ b/thoth-core/src/test/java/fr/maif/eventsourcing/impl/KafkaEventPublisherTest.java @@ -31,7 +31,6 @@ import io.vavr.collection.List; import io.vavr.concurrent.Future; import io.vavr.control.Either; -import lombok.SneakyThrows; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -55,6 +54,7 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -76,8 +76,7 @@ public class KafkaEventPublisherTest extends BaseKafkaTest { } @BeforeEach - @SneakyThrows - void cleanUpInit() { + void cleanUpInit() throws ExecutionException, InterruptedException, TimeoutException { setUpAdminClient(); Set topics = adminClient().listTopics().names().get(5, TimeUnit.SECONDS); if (!topics.isEmpty()) { diff --git a/thoth-core/src/test/java/fr/maif/projections/EventuallyConsistentProjectionTest.java b/thoth-core/src/test/java/fr/maif/projections/EventuallyConsistentProjectionTest.java index 3c005467..f182fb01 100644 --- a/thoth-core/src/test/java/fr/maif/projections/EventuallyConsistentProjectionTest.java +++ b/thoth-core/src/test/java/fr/maif/projections/EventuallyConsistentProjectionTest.java @@ -15,7 +15,6 @@ import io.vavr.Tuple0; import io.vavr.concurrent.Future; import io.vavr.control.Option; -import lombok.SneakyThrows; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -42,8 +41,7 @@ public EventuallyConsistentProjectionTest() { } @Test - @SneakyThrows - void consumer() { + void consumer() throws Exception { String topic = createTopic(); String groupId = "test-group-id"; diff --git a/thoth-jooq-async/build.sbt b/thoth-jooq-async/build.sbt index ce284c3d..80166bb3 100644 --- a/thoth-jooq-async/build.sbt +++ b/thoth-jooq-async/build.sbt @@ -11,7 +11,6 @@ libraryDependencies ++= Seq( "fr.maif" %% "jooq-async-api" % jooqAsyncVersion, "fr.maif" %% "jooq-async-jdbc" % jooqAsyncVersion % Test, "fr.maif" %% "jooq-async-reactive" % jooqAsyncVersion % Test, - "org.projectlombok" % "lombok" % "1.18.18", "org.assertj" % "assertj-core" % "3.10.0" % Test, "org.postgresql" % "postgresql" % "42.2.5" % Test, "org.junit.platform" % "junit-platform-launcher" % "1.4.2" % Test, diff --git a/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java b/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java index 7c5ee52f..a4e1b911 100644 --- a/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq-async/src/main/java/fr/maif/eventsourcing/ReactivePostgresKafkaEventProcessorBuilder.java @@ -12,38 +12,49 @@ import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.control.Option; -import lombok.AllArgsConstructor; import java.util.function.Function; public class ReactivePostgresKafkaEventProcessorBuilder { - @AllArgsConstructor public static class BuilderWithSystem { public final ActorSystem system; + public BuilderWithSystem(ActorSystem system) { + this.system = system; + } + public BuilderWithPool withPgAsyncPool(PgAsyncPool pgAsyncPool) { return new BuilderWithPool(system, pgAsyncPool); } } - @AllArgsConstructor public static class BuilderWithPool { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; + public BuilderWithPool(ActorSystem system, PgAsyncPool pgAsyncPool) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + } + public BuilderWithTables withTables(TableNames tableNames) { return new BuilderWithTables(system, pgAsyncPool, tableNames); } } - @AllArgsConstructor public static class BuilderWithTables { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; public final TableNames tableNames; + public BuilderWithTables(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + } + public BuilderWithTx withExistingTransactionManager(TransactionManager transactionManager) { return new BuilderWithTx(system, pgAsyncPool, tableNames, transactionManager); } @@ -53,19 +64,25 @@ public BuilderWithTx withTransactionManager() { } } - @AllArgsConstructor public static class BuilderWithTx { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; public final TableNames tableNames; public final TransactionManager transactionManager; + public BuilderWithTx(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + } + public BuilderWithEventFormat withEventFormater(JacksonEventFormat eventFormat) { return new BuilderWithEventFormat<>(system, pgAsyncPool, tableNames, transactionManager, eventFormat); } } - @AllArgsConstructor public static class BuilderWithEventFormat { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -73,6 +90,15 @@ public static class BuilderWithEventFormat { public final TransactionManager transactionManager; public final JacksonEventFormat eventFormat; + public BuilderWithEventFormat(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + } + public BuilderWithMetaFormat withMetaFormater(JacksonSimpleFormat metaFormat) { return new BuilderWithMetaFormat(system, pgAsyncPool, tableNames, transactionManager, eventFormat, metaFormat); } @@ -82,7 +108,6 @@ public BuilderWithMetaFormat withNoMetaFormater() { } } - @AllArgsConstructor public static class BuilderWithMetaFormat { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -91,6 +116,17 @@ public static class BuilderWithMetaFormat { public final JacksonEventFormat eventFormat; public final JacksonSimpleFormat metaFormat; + public BuilderWithMetaFormat(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + } + public BuilderWithContextFormat withContextFormater(JacksonSimpleFormat contextFormat) { return new BuilderWithContextFormat(system, pgAsyncPool, tableNames, transactionManager, eventFormat, metaFormat, contextFormat); } @@ -100,7 +136,6 @@ public BuilderWithContextFormat withNoContextFormater() { } } - @AllArgsConstructor public static class BuilderWithContextFormat { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -110,6 +145,18 @@ public static class BuilderWithContextFormat { public final JacksonSimpleFormat metaFormat; public final JacksonSimpleFormat contextFormat; + public BuilderWithContextFormat(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + } + public BuilderWithKafkaSettings withKafkaSettings(String topic, ProducerSettings> producerSettings, Integer bufferSize) { return new BuilderWithKafkaSettings<>( system, @@ -255,7 +302,6 @@ public > BuilderWithEventHandler withEve } } - @AllArgsConstructor public static class BuilderWithEventHandler, E extends Event, Meta, Context> { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -269,7 +315,25 @@ public static class BuilderWithEventHandler, E extends Event, public final ReactivePostgresEventStore eventStore; public final EventHandler eventHandler; - public BuilderWithAggregateStore withAggregateStore(Function, AggregateStore> builder) { + public BuilderWithEventHandler(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + ReactivePostgresEventStore eventStore, EventHandler eventHandler) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.eventHandler = eventHandler; + } + + public BuilderWithAggregateStore withAggregateStore(Function, AggregateStore> builder) { return new BuilderWithAggregateStore<>( system, pgAsyncPool, @@ -319,7 +383,6 @@ public BuilderWithAggregateStore withDefaultAggregateStore( } - @AllArgsConstructor public static class BuilderWithAggregateStore, E extends Event, Meta, Context> { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -334,6 +397,26 @@ public static class BuilderWithAggregateStore, E extends Even public final EventHandler eventHandler; public final AggregateStore aggregateStore; + public BuilderWithAggregateStore(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + ReactivePostgresEventStore eventStore, EventHandler eventHandler, + AggregateStore aggregateStore) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.eventHandler = eventHandler; + this.aggregateStore = aggregateStore; + } + public , Message> BuilderWithCommandHandler withCommandHandler(CommandHandler commandHandler) { return new BuilderWithCommandHandler<>( system, @@ -371,10 +454,6 @@ public , Message> BuilderWithCommandHand } } - - - - @AllArgsConstructor public static class BuilderWithCommandHandler, C extends Command, E extends Event, Message, Meta, Context> { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -390,6 +469,28 @@ public static class BuilderWithCommandHandler, C exten public final EventHandler eventHandler; public final CommandHandler commandHandler; + public BuilderWithCommandHandler(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + ReactivePostgresEventStore eventStore, + AggregateStore aggregateStore, EventHandler eventHandler, + CommandHandler commandHandler) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.aggregateStore = aggregateStore; + this.eventHandler = eventHandler; + this.commandHandler = commandHandler; + } + public BuilderWithProjections withProjections(List> projections) { return new BuilderWithProjections<>( system, @@ -437,8 +538,6 @@ public BuilderWithProjections withNoProj } } - - @AllArgsConstructor public static class BuilderWithProjections, C extends Command, E extends Event, Message, Meta, Context> { public final ActorSystem system; public final PgAsyncPool pgAsyncPool; @@ -455,6 +554,30 @@ public static class BuilderWithProjections, C extends public final CommandHandler commandHandler; public final List> projections; + public BuilderWithProjections(ActorSystem system, PgAsyncPool pgAsyncPool, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + ReactivePostgresEventStore eventStore, + AggregateStore aggregateStore, EventHandler eventHandler, + CommandHandler commandHandler, + List> projections) { + this.system = system; + this.pgAsyncPool = pgAsyncPool; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.aggregateStore = aggregateStore; + this.eventHandler = eventHandler; + this.commandHandler = commandHandler; + this.projections = projections; + } + public ReactivePostgresKafkaEventProcessor build() { return new ReactivePostgresKafkaEventProcessor( new ReactivePostgresKafkaEventProcessor.PostgresKafkaEventProcessorConfig( diff --git a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java index 272160fd..3368a6d6 100644 --- a/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java +++ b/thoth-jooq-async/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java @@ -16,7 +16,6 @@ import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; -import lombok.SneakyThrows; import org.jooq.DSLContext; import org.jooq.Record; import org.jooq.SQLDialect; @@ -182,7 +181,6 @@ public void queryingByPublished() { @Test - @SneakyThrows public void loadEventsUnpublished() { initDatas(); List> events = List.ofAll(transactionSource() @@ -197,8 +195,7 @@ public void loadEventsUnpublished() { } @Test - @SneakyThrows - public void loadEventsUnpublishedSkip() { + public void loadEventsUnpublishedSkip() throws InterruptedException { initDatas(); CompletionStage>> first = transactionSource().flatMapConcat(t -> postgresEventStore.loadEventsUnpublished(t, SKIP) @@ -221,8 +218,7 @@ public void loadEventsUnpublishedSkip() { } @Test - @SneakyThrows - public void loadEventsUnpublishedWait() { + public void loadEventsUnpublishedWait() throws InterruptedException { initDatas(); CompletionStage>> first = transactionSource().flatMapConcat(t -> postgresEventStore.loadEventsUnpublished(t, WAIT) diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java index 3e6daccd..094cb9d9 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/PostgresKafkaEventProcessorBuilder.java @@ -13,45 +13,55 @@ import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.control.Option; -import lombok.AllArgsConstructor; import javax.sql.DataSource; import java.sql.Connection; import java.util.concurrent.ExecutorService; import java.util.function.Function; -import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.NO_STRATEGY; -import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.SKIP; import static fr.maif.eventsourcing.EventStore.ConcurrentReplayStrategy.WAIT; public class PostgresKafkaEventProcessorBuilder { - @AllArgsConstructor + public static class BuilderWithSystem { public final ActorSystem system; + public BuilderWithSystem(ActorSystem system) { + this.system = system; + } + public BuilderWithPool withDataSource(DataSource dataSource) { return new BuilderWithPool(system, dataSource); } } - @AllArgsConstructor public static class BuilderWithPool { public final ActorSystem system; public final DataSource dataSource; + public BuilderWithPool(ActorSystem system, DataSource dataSource) { + this.system = system; + this.dataSource = dataSource; + } + public BuilderWithTables withTables(TableNames tableNames) { return new BuilderWithTables(system, dataSource, tableNames); } } - @AllArgsConstructor public static class BuilderWithTables { public final ActorSystem system; public final DataSource dataSource; public final TableNames tableNames; + public BuilderWithTables(ActorSystem system, DataSource dataSource, TableNames tableNames) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + } + public BuilderWithTx withTransactionManager(TransactionManager transactionManager, ExecutorService executor) { return new BuilderWithTx(system, dataSource, tableNames, transactionManager, executor); } @@ -61,7 +71,6 @@ public BuilderWithTx withTransactionManager(ExecutorService executor) { } } - @AllArgsConstructor public static class BuilderWithTx { public final ActorSystem system; public final DataSource dataSource; @@ -69,12 +78,20 @@ public static class BuilderWithTx { public final TransactionManager transactionManager; public final ExecutorService executor; + public BuilderWithTx(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, ExecutorService executor) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.executor = executor; + } + public BuilderWithEventFormat withEventFormater(JacksonEventFormat eventFormat) { return new BuilderWithEventFormat<>(system, dataSource, tableNames, transactionManager, eventFormat, executor); } } - @AllArgsConstructor public static class BuilderWithEventFormat { public final ActorSystem system; public final DataSource dataSource; @@ -83,6 +100,16 @@ public static class BuilderWithEventFormat { public final JacksonEventFormat eventFormat; public final ExecutorService executor; + public BuilderWithEventFormat(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, ExecutorService executor) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.executor = executor; + } + public BuilderWithMetaFormat withMetaFormater(JacksonSimpleFormat metaFormat) { return new BuilderWithMetaFormat(system, dataSource, tableNames, transactionManager, eventFormat, metaFormat, executor); } @@ -92,7 +119,6 @@ public BuilderWithMetaFormat withNoMetaFormater() { } } - @AllArgsConstructor public static class BuilderWithMetaFormat { public final ActorSystem system; public final DataSource dataSource; @@ -102,6 +128,18 @@ public static class BuilderWithMetaFormat { public final JacksonSimpleFormat metaFormat; public final ExecutorService executor; + public BuilderWithMetaFormat(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, ExecutorService executor) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.executor = executor; + } + public BuilderWithContextFormat withContextFormater(JacksonSimpleFormat contextFormat) { return new BuilderWithContextFormat(system, dataSource, tableNames, transactionManager, eventFormat, metaFormat, contextFormat, executor); } @@ -111,7 +149,6 @@ public BuilderWithContextFormat withNoContextFormater() { } } - @AllArgsConstructor public static class BuilderWithContextFormat { public final ActorSystem system; public final DataSource dataSource; @@ -122,6 +159,19 @@ public static class BuilderWithContextFormat { public final JacksonSimpleFormat contextFormat; public final ExecutorService executor; + public BuilderWithContextFormat(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, ExecutorService executor) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.executor = executor; + } + public BuilderWithKafkaSettings withKafkaSettings(String topic, ProducerSettings> producerSettings, Integer bufferSize) { return new BuilderWithKafkaSettings<>( system, @@ -273,7 +323,6 @@ public > BuilderWithEventHandler withEve } } - @AllArgsConstructor public static class BuilderWithEventHandler, E extends Event, Meta, Context> { public final ActorSystem system; public final DataSource dataSource; @@ -285,9 +334,26 @@ public static class BuilderWithEventHandler, E extends Event, public final KafkaEventPublisher eventPublisher; public final ConcurrentReplayStrategy concurrentReplayStrategy; public final PostgresEventStore eventStore; - public final EventHandler eventHandler; + public BuilderWithEventHandler(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + PostgresEventStore eventStore, EventHandler eventHandler) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.eventHandler = eventHandler; + } + public BuilderWithAggregateStore withAggregateStore(Function, ? extends AggregateStore> builder) { return new BuilderWithAggregateStore<>( system, @@ -337,8 +403,6 @@ public BuilderWithAggregateStore withDefaultAggregateStore( } } - - @AllArgsConstructor public static class BuilderWithAggregateStore, E extends Event, Meta, Context> { public final ActorSystem system; public final DataSource dataSource; @@ -353,6 +417,26 @@ public static class BuilderWithAggregateStore, E extends Even public final EventHandler eventHandler; public final AggregateStore aggregateStore; + public BuilderWithAggregateStore(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + PostgresEventStore eventStore, EventHandler eventHandler, + AggregateStore aggregateStore) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.eventHandler = eventHandler; + this.aggregateStore = aggregateStore; + } + public , Message> BuilderWithCommandHandler withCommandHandler(CommandHandler commandHandler) { return new BuilderWithCommandHandler<>( system, @@ -391,7 +475,6 @@ public , Message> BuilderWithCommandHand } - @AllArgsConstructor public static class BuilderWithCommandHandler, C extends Command, E extends Event, Message, Meta, Context> { public final ActorSystem system; public final DataSource dataSource; @@ -407,6 +490,28 @@ public static class BuilderWithCommandHandler, C exten public final EventHandler eventHandler; public final CommandHandler commandHandler; + public BuilderWithCommandHandler(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + PostgresEventStore eventStore, + AggregateStore aggregateStore, EventHandler eventHandler, + CommandHandler commandHandler) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.aggregateStore = aggregateStore; + this.eventHandler = eventHandler; + this.commandHandler = commandHandler; + } + public BuilderWithProjections withProjections(List> projections) { return new BuilderWithProjections<>( system, @@ -455,7 +560,6 @@ public BuilderWithProjections withNoProj } - @AllArgsConstructor public static class BuilderWithProjections, C extends Command, E extends Event, Message, Meta, Context> { public final ActorSystem system; public final DataSource dataSource; @@ -472,6 +576,30 @@ public static class BuilderWithProjections, C extends public final CommandHandler commandHandler; public final List> projections; + public BuilderWithProjections(ActorSystem system, DataSource dataSource, TableNames tableNames, + TransactionManager transactionManager, JacksonEventFormat eventFormat, + JacksonSimpleFormat metaFormat, JacksonSimpleFormat contextFormat, + KafkaEventPublisher eventPublisher, ConcurrentReplayStrategy concurrentReplayStrategy, + PostgresEventStore eventStore, + AggregateStore aggregateStore, EventHandler eventHandler, + CommandHandler commandHandler, + List> projections) { + this.system = system; + this.dataSource = dataSource; + this.tableNames = tableNames; + this.transactionManager = transactionManager; + this.eventFormat = eventFormat; + this.metaFormat = metaFormat; + this.contextFormat = contextFormat; + this.eventPublisher = eventPublisher; + this.concurrentReplayStrategy = concurrentReplayStrategy; + this.eventStore = eventStore; + this.aggregateStore = aggregateStore; + this.eventHandler = eventHandler; + this.commandHandler = commandHandler; + this.projections = projections; + } + public PostgresKafkaEventProcessor build() { return new PostgresKafkaEventProcessor( new PostgresKafkaEventProcessor.PostgresKafkaEventProcessorConfig( diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java index 3d5f221c..31e5e403 100644 --- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java +++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/PostgresEventStoreTest.java @@ -16,7 +16,6 @@ import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; -import lombok.SneakyThrows; import org.jooq.DSLContext; import org.jooq.Record; import org.jooq.SQLDialect; @@ -183,7 +182,6 @@ private Source transactionSource() { @Test - @SneakyThrows public void loadEventsUnpublished() { initDatas(); List> events = List.ofAll(transactionSource() @@ -198,8 +196,7 @@ public void loadEventsUnpublished() { } @Test - @SneakyThrows - public void loadEventsUnpublishedSkip() { + public void loadEventsUnpublishedSkip() throws InterruptedException { initDatas(); Duration initialDelay = Duration.ofMillis(100); System.out.println("Running first query"); @@ -227,8 +224,7 @@ public void loadEventsUnpublishedSkip() { } @Test - @SneakyThrows - public void loadEventsUnpublishedWait() { + public void loadEventsUnpublishedWait() throws InterruptedException { initDatas(); CompletionStage>> first = transactionSource().flatMapConcat(t -> postgresEventStore.loadEventsUnpublished(t, WAIT) @@ -252,8 +248,7 @@ public void loadEventsUnpublishedWait() { } @Test - @SneakyThrows - public void markEventsAsPublished() { + public void markEventsAsPublished() throws SQLException { initDatas(); List> events; try (Connection connection = dataSource.getConnection()) { diff --git a/thoth-kafka-goodies/build.sbt b/thoth-kafka-goodies/build.sbt index 17098228..c8a7f86b 100644 --- a/thoth-kafka-goodies/build.sbt +++ b/thoth-kafka-goodies/build.sbt @@ -7,7 +7,6 @@ name := "thoth-kafka-goodies" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % akkaVersion, "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion, - "org.projectlombok" % "lombok" % "1.18.18", "com.typesafe.akka" %% "akka-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, "com.typesafe.akka" %% "akka-stream-kafka-testkit" % alpakkaKafkaVersion % Test, diff --git a/thoth-kafka-goodies/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java b/thoth-kafka-goodies/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java index 994ef9e1..6be6c203 100644 --- a/thoth-kafka-goodies/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java +++ b/thoth-kafka-goodies/src/main/java/fr/maif/kafka/consumer/ResilientKafkaConsumer.java @@ -16,13 +16,9 @@ import akka.stream.javadsl.RestartSource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.With; -import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.List; @@ -40,33 +36,192 @@ import static akka.Done.done; import static java.util.function.Function.identity; -@Slf4j public abstract class ResilientKafkaConsumer { - @Builder(toBuilder = true) - @AllArgsConstructor(access = AccessLevel.PRIVATE) - @With + private static final Logger LOGGER = LoggerFactory.getLogger(ResilientKafkaConsumer.class); + public static class Config { public final AutoSubscription subscription; public final String groupId; public final ConsumerSettings consumerSettings; - public final Duration minBackoff; - public final Duration maxBackoff; - public final Double randomFactor; - public final Integer commitSize; - public final BiFunction> onStarted; - public final Supplier> onStarting; - public final Supplier> onStopped; - public final Function> onStopping; - public final Function> onFailed; + public Duration minBackoff; + public Duration maxBackoff; + public Double randomFactor; + public Integer commitSize; + public BiFunction> onStarted; + public Supplier> onStarting; + public Supplier> onStopped; + public Function> onStopping; + public Function> onFailed; + + private Config(AutoSubscription subscription, String groupId, ConsumerSettings consumerSettings, Duration minBackoff, + Duration maxBackoff, Double randomFactor, Integer commitSize, + BiFunction> onStarted, + Supplier> onStarting, Supplier> onStopped, + Function> onStopping, + Function> onFailed) { + this.subscription = subscription; + this.groupId = groupId; + this.consumerSettings = consumerSettings; + this.minBackoff = minBackoff; + this.maxBackoff = maxBackoff; + this.randomFactor = randomFactor; + this.commitSize = commitSize; + this.onStarted = onStarted; + this.onStarting = onStarting; + this.onStopped = onStopped; + this.onStopping = onStopping; + this.onFailed = onFailed; + } + + public static class ConfigBuilder { + AutoSubscription subscription; + String groupId; + ConsumerSettings consumerSettings; + Duration minBackoff; + Duration maxBackoff; + Double randomFactor; + Integer commitSize; + BiFunction> onStarted; + Supplier> onStarting; + Supplier> onStopped; + Function> onStopping; + Function> onFailed; + + public ConfigBuilder subscription(AutoSubscription subscription) { + this.subscription = subscription; + return this; + } + + public ConfigBuilder groupId(String groupId) { + this.groupId = groupId; + return this; + } + + public ConfigBuilder consumerSettings(ConsumerSettings consumerSettings) { + this.consumerSettings = consumerSettings; + return this; + } + + public ConfigBuilder minBackoff(Duration minBackoff) { + this.minBackoff = minBackoff; + return this; + } + + public ConfigBuilder maxBackoff(Duration maxBackoff) { + this.maxBackoff = maxBackoff; + return this; + } + + public ConfigBuilder randomFactor(Double randomFactor) { + this.randomFactor = randomFactor; + return this; + } + + public ConfigBuilder commitSize(Integer commitSize) { + this.commitSize = commitSize; + return this; + } + + public ConfigBuilder onStarted( + BiFunction> onStarted) { + this.onStarted = onStarted; + return this; + } + + public ConfigBuilder onStarting(Supplier> onStarting) { + this.onStarting = onStarting; + return this; + } + + public ConfigBuilder onStopped(Supplier> onStopped) { + this.onStopped = onStopped; + return this; + } + + public ConfigBuilder onStopping( + Function> onStopping) { + this.onStopping = onStopping; + return this; + } + + public ConfigBuilder onFailed(Function> onFailed) { + this.onFailed = onFailed; + return this; + } + + public Config build(){ + return new Config<>(this.subscription, this.groupId, this.consumerSettings, this.minBackoff, + this.maxBackoff, this.randomFactor, this.commitSize, this.onStarted, + this.onStarting, this.onStopped, this.onStopping, this.onFailed); + } + } public static Config create(AutoSubscription subscription, String groupId, ConsumerSettings consumerSettings) { - return Config.builder() + return Config.builder() .consumerSettings(consumerSettings) .groupId(groupId) .subscription(subscription) .build(); } + + public static ConfigBuilder builder(){ + return new ConfigBuilder<>(); + } + + public ConfigBuilder toBuilder(){ + return Config.builder() + .subscription(this.subscription) + .groupId(this.groupId) + .consumerSettings(this.consumerSettings) + .minBackoff(this.minBackoff) + .maxBackoff(this.maxBackoff) + .randomFactor(this.randomFactor) + .commitSize(this.commitSize) + .onStarted(this.onStarted) + .onStarting(this.onStarting) + .onStopped(this.onStopped) + .onStopping(this.onStopping) + .onFailed(this.onFailed); + } + + public Config withMinBackoff(Duration minBackoff) { + return this.toBuilder().minBackoff(minBackoff).build(); + } + + public Config withMaxBackoff(Duration maxBackoff) { + return this.toBuilder().maxBackoff(maxBackoff).build(); + } + + public Config withRandomFactor(Double randomFactor) { + return this.toBuilder().randomFactor(randomFactor).build(); + } + + public Config withCommitSize(Integer commitSize) { + return this.toBuilder().commitSize(commitSize).build(); + } + + public Config withOnStarted( + BiFunction> onStarted) { + return this.toBuilder().onStarted(onStarted).build(); + } + + public Config withOnStarting(Supplier> onStarting) { + return this.toBuilder().onStarting(onStarting).build(); + } + + public Config withOnStopped(Supplier> onStopped) { + return this.toBuilder().onStopped(onStopped).build(); + } + + public Config withOnStopping( + Function> onStopping) { + return this.toBuilder().onStopping(onStopping).build(); + } + + public Config withOnFailed(Function> onFailed) { + return this.toBuilder().onFailed(onFailed).build(); + } } protected final ActorSystem actorSystem; protected final Materializer materializer; @@ -228,11 +383,6 @@ protected static T defaultIfNull(T v, T defaultValue) { protected abstract String name(); protected abstract Flow, ConsumerMessage.CommittableOffset, NotUsed> messageHandling(); - - protected Logger logger() { - return log; - } - private Status updateStatus(Status status) { innerStatus.set(status); return status; @@ -245,13 +395,13 @@ public Status status() { public Status start() { Status currentStatus = status(); if (Status.Starting.equals(currentStatus) || Status.Started.equals(currentStatus)) { - logger().info("{} already started", name()); + LOGGER.info("{} already started", name()); return currentStatus; } updateStatus(Status.Starting); CommitterSettings committerSettings = CommitterSettings.create(actorSystem); - logger().info("Starting {} on topic '{}' with group id '{}'", name(), subscription, groupId); + LOGGER.info("Starting {} on topic '{}' with group id '{}'", name(), subscription, groupId); AtomicInteger restartCount = new AtomicInteger(0); RestartSource.onFailuresWithBackoff( minBackoff, @@ -261,9 +411,9 @@ public Status start() { this.onStarting.get(); int count = restartCount.incrementAndGet(); if (count > 1) { - logger().info("Stream for {} is restarting for the {} time", name(), count); + LOGGER.info("Stream for {} is restarting for the {} time", name(), count); } else { - logger().info("Stream for {} is starting", name()); + LOGGER.info("Stream for {} is starting", name()); } return Consumer .committablePartitionedSource(consumerSettings, subscription) @@ -275,7 +425,7 @@ public Status start() { .mapMaterializedValue(control -> { updateStatus(Status.Started); this.onStarted.apply(control, count); - logger().info("Stream for {} has started", name()); + LOGGER.info("Stream for {} has started", name()); controlRef.set(control); return control; }) @@ -302,7 +452,7 @@ public CompletionStage stop() { protected CompletionStage handleTerminaison(CompletionStage done) { return done .thenApply(any -> { - logger().info("Stopping {}", name()); + LOGGER.info("Stopping {}", name()); updateStatus(Status.Stopped); return stopConsumingKafka() .exceptionally(__ -> done()) @@ -311,7 +461,7 @@ protected CompletionStage handleTerminaison(CompletionStage done) .exceptionally(__ -> Status.Stopped); }) .exceptionally(e -> { - logger().error("Error during " + name(), e); + LOGGER.error("Error during " + name(), e); updateStatus(Status.Failed); return stopConsumingKafka() .exceptionally(__ -> done()) @@ -329,9 +479,9 @@ protected CompletionStage stopConsumingKafka() { return control.shutdown() .whenComplete((d, e) -> { if (e != null) { - logger().error("Error shutting down kafka consumer for {}", name()); + LOGGER.error("Error shutting down kafka consumer for {}", name()); } else { - logger().info("Kafka consumer for {} is shutdown", name()); + LOGGER.info("Kafka consumer for {} is shutdown", name()); } }) .thenCompose(___ -> control.isShutdown()) diff --git a/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java b/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java index 06b4fcd4..9318bbcf 100644 --- a/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java +++ b/thoth-kafka-goodies/src/test/java/fr/maif/kafka/consumer/ResilientKafkaConsumerTest.java @@ -11,7 +11,6 @@ import akka.stream.javadsl.Flow; import akka.stream.javadsl.FlowWithContext; import akka.testkit.javadsl.TestKit; -import lombok.SneakyThrows; import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; @@ -36,8 +35,7 @@ public ResilientKafkaConsumerTest() { @Test - @SneakyThrows - void consumer() { + void consumer() throws Exception { String topic = createTopic(); String groupId = "test-group-id-1"; @@ -76,8 +74,7 @@ void consumer() { } @Test - @SneakyThrows - void contexteAkkastreamApi() { + void contexteAkkastreamApi() throws Exception { String topic = createTopic(); String groupId = "test-group-id-3"; @@ -118,8 +115,7 @@ void contexteAkkastreamApi() { @Test - @SneakyThrows - void crash() { + void crash() throws Exception { String topic = createTopic(); String groupId = "test-group-id-3"; @@ -170,8 +166,7 @@ void crash() { } @Test - @SneakyThrows - void consumerLifecycle() { + void consumerLifecycle() throws Exception { String topic = createTopic(); String groupId = "test-group-id-4"; From 8688dc292e0d4ad18c1f2285b1de31fd22b2fdd1 Mon Sep 17 00:00:00 2001 From: Benjamin CAVY Date: Fri, 16 Jul 2021 11:36:43 +0200 Subject: [PATCH 08/10] doc: use java 16 for demos --- .github/workflows/release.yml | 4 ++-- .github/workflows/test.yml | 6 +++--- build.sbt | 2 +- demo/demo-in-memory/build.sbt | 2 +- demo/demo-postgres-kafka-reactive/buid.sbt | 2 +- demo/demo-postgres-kafka/buid.sbt | 2 +- project/Dependencies.scala | 2 +- sample/build.sbt | 7 +++---- .../java/fr/maif/thoth/sample/commands/BankCommand.java | 4 +--- .../main/java/fr/maif/thoth/sample/events/BankEvent.java | 2 +- 10 files changed, 15 insertions(+), 18 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a743763e..e678fc5b 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -16,10 +16,10 @@ jobs: - uses: actions/checkout@v2 with: token: '${{ secrets.PERSONAL_ACCESS_TOKEN }}' - - name: Set up JDK 15 + - name: Set up JDK 16 uses: actions/setup-java@v1 with: - java-version: 15 + java-version: 16 - name: release sbt run: | git config --local user.email "thoth-github-actions@users.noreply.github.com" diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 01185850..35f2fa67 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,10 +6,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2.2.0 - - name: Set up JDK 15 + - name: Set up JDK 16 uses: actions/setup-java@v1 with: - java-version: 15 + java-version: 16 - name: Start docker run: docker-compose -f docker-compose.test.yml up -d - name: Build @@ -17,7 +17,7 @@ jobs: run: sbt publishLocal - name: Run test id: test - run: JAVA_OPTS="--enable-preview" sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test + run: JAVA_OPTS="sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test - name: Publish Unit Test Results uses: EnricoMi/publish-unit-test-result-action@v1 if: always() diff --git a/build.sbt b/build.sbt index a33e9d5b..efcc48dc 100644 --- a/build.sbt +++ b/build.sbt @@ -124,7 +124,7 @@ lazy val `thoth-jooq` = project javacOptions in Compile ++= Seq( "-source", - "15", + "16", "-target", "8", "-Xlint:unchecked", diff --git a/demo/demo-in-memory/build.sbt b/demo/demo-in-memory/build.sbt index 2b48cf6b..cba2d160 100644 --- a/demo/demo-in-memory/build.sbt +++ b/demo/demo-in-memory/build.sbt @@ -6,4 +6,4 @@ name := "demo-in-memory" scalaVersion := "2.12.12" -javacOptions in Compile ++= Seq("-source", "15", "-target", "15", "-Xlint:unchecked", "-Xlint:deprecation", "--enable-preview") \ No newline at end of file +javacOptions in Compile ++= Seq("-source", "16", "-target", "16", "-Xlint:unchecked", "-Xlint:deprecation") \ No newline at end of file diff --git a/demo/demo-postgres-kafka-reactive/buid.sbt b/demo/demo-postgres-kafka-reactive/buid.sbt index 98dcb718..14c96324 100644 --- a/demo/demo-postgres-kafka-reactive/buid.sbt +++ b/demo/demo-postgres-kafka-reactive/buid.sbt @@ -10,4 +10,4 @@ libraryDependencies ++= Seq( "fr.maif" % "functional-json" % functionalJsonVersion ) -javacOptions in Compile ++= Seq("-source", "15", "-target", "15", "-Xlint:unchecked", "-Xlint:deprecation") +javacOptions in Compile ++= Seq("-source", "16", "-target", "16", "-Xlint:unchecked", "-Xlint:deprecation") diff --git a/demo/demo-postgres-kafka/buid.sbt b/demo/demo-postgres-kafka/buid.sbt index da698c83..a877e0c3 100644 --- a/demo/demo-postgres-kafka/buid.sbt +++ b/demo/demo-postgres-kafka/buid.sbt @@ -10,4 +10,4 @@ libraryDependencies ++= Seq( "fr.maif" % "functional-json" % functionalJsonVersion ) -javacOptions in Compile ++= Seq("-source", "15", "-target", "15", "-Xlint:unchecked", "-Xlint:deprecation") \ No newline at end of file +javacOptions in Compile ++= Seq("-source", "16", "-target", "16", "-Xlint:unchecked", "-Xlint:deprecation") \ No newline at end of file diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 37edf99d..f9c10f4e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,5 +5,5 @@ object Dependencies { val vavrVersion = "0.10.3" val jooqVersion = "3.14.3" val jooqAsyncVersion = "1.1.0" - val functionalJsonVersion = "1.0.2" + val functionalJsonVersion = "1.0.3" } diff --git a/sample/build.sbt b/sample/build.sbt index eb57ef02..533d9b72 100644 --- a/sample/build.sbt +++ b/sample/build.sbt @@ -25,10 +25,9 @@ libraryDependencies ++= Seq( javacOptions in Compile ++= Seq( "-source", - "15", + "16", "-target", - "15", + "16", "-Xlint:unchecked", - "-Xlint:deprecation", - "--enable-preview" + "-Xlint:deprecation" ) diff --git a/sample/src/main/java/fr/maif/thoth/sample/commands/BankCommand.java b/sample/src/main/java/fr/maif/thoth/sample/commands/BankCommand.java index 2b4f267d..63380d18 100644 --- a/sample/src/main/java/fr/maif/thoth/sample/commands/BankCommand.java +++ b/sample/src/main/java/fr/maif/thoth/sample/commands/BankCommand.java @@ -3,11 +3,9 @@ import java.math.BigDecimal; import fr.maif.eventsourcing.SimpleCommand; -import fr.maif.eventsourcing.Type; -import io.vavr.API.Match.Pattern0; import io.vavr.Lazy; -public sealed interface BankCommand extends SimpleCommand { +public interface BankCommand extends SimpleCommand { final class Withdraw implements BankCommand { public String account; diff --git a/sample/src/main/java/fr/maif/thoth/sample/events/BankEvent.java b/sample/src/main/java/fr/maif/thoth/sample/events/BankEvent.java index efd63089..aafe1d5a 100644 --- a/sample/src/main/java/fr/maif/thoth/sample/events/BankEvent.java +++ b/sample/src/main/java/fr/maif/thoth/sample/events/BankEvent.java @@ -9,7 +9,7 @@ import fr.maif.eventsourcing.Type; import io.vavr.API.Match.Pattern0; -public abstract sealed class BankEvent implements Event { +public abstract class BankEvent implements Event { public static Type MoneyWithdrawnV1 = Type.create(MoneyWithdrawn.class, 1L); public static Type AccountOpenedV1 = Type.create(AccountOpened.class, 1L); public static Type MoneyDepositedV1 = Type.create(MoneyDeposited.class, 1L); From c535a7decc20f9bdf710d3e400340061c47bd50e Mon Sep 17 00:00:00 2001 From: Benjamin CAVY Date: Fri, 16 Jul 2021 11:40:20 +0200 Subject: [PATCH 09/10] build: fix build --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 35f2fa67..631e920e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: run: sbt publishLocal - name: Run test id: test - run: JAVA_OPTS="sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test + run: sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test - name: Publish Unit Test Results uses: EnricoMi/publish-unit-test-result-action@v1 if: always() From 7265e57f1b11cdc53f01e26c512919d386826a96 Mon Sep 17 00:00:00 2001 From: Benjamin Cavy Date: Mon, 26 Jul 2021 14:49:22 +0200 Subject: [PATCH 10/10] feat: add sample application test to global tests (#38) --- .github/workflows/test.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 631e920e..4b3563c8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,6 +18,10 @@ jobs: - name: Run test id: test run: sbt "project thoth-core" test && sbt "project thoth-jooq" test && sbt "project thoth-jooq-async" test && sbt "project thoth-kafka-goodies" test && sbt "project thoth-tck" test + - name: Stop docker + run: docker-compose -f docker-compose.test.yml down + - name: Run sample application test + run: sbt "project sample" test - name: Publish Unit Test Results uses: EnricoMi/publish-unit-test-result-action@v1 if: always()