diff --git a/.gitignore b/.gitignore
index 212f41bf..fd36b7c5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@
 target
 .bsp/
 .DS_Store
-.bloop
\ No newline at end of file
+.bloop
+*.iml
\ No newline at end of file
diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java
index da043799..39ba2202 100644
--- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java
+++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/InMemoryEventStore.java
@@ -118,9 +118,7 @@ public Source, NotUsed> loadEventsByQuery(Tuple0
     @Override
     public Source, NotUsed> loadEventsByQuery(Query query) {
         return Source.from(eventStore)
-                .filter(e -> {
-                    return Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true);
-                });
+                .filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true));
     }
 
     @Override
diff --git a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java
index e9061be9..7b46d49f 100644
--- a/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java
+++ b/thoth-jooq/src/test/java/fr/maif/eventsourcing/impl/JooqKafkaTckImplementation.java
@@ -15,6 +15,8 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 
+import fr.maif.eventsourcing.Projection;
+import fr.maif.eventsourcing.datastore.TestProjection;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -58,6 +60,7 @@ public class JooqKafkaTckImplementation extends DataStoreVerification
                 ())
-                .withNoProjections()
+                .withProjections(this.testProjection)
                 .build();
@@ -228,6 +233,11 @@ public List> readPublishedEvents(String
         return envelopes;
     }
 
+    @Override
+    public Integer readProjection() {
+        return ((TestProjection)this.testProjection).getCount();
+    }
+
     private static Optional getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
         Properties properties = new Properties();
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
index 473934fb..f3075250 100644
--- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerification.java
@@ -12,6 +12,7 @@
 import io.vavr.control.Either;
 import io.vavr.control.Option;
+import io.vavr.control.Try;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -168,11 +169,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
         submitValidCommand(eventProcessor, "1");
         restartBroker();
 
-        try {
-            Thread.sleep(10000);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
+        sleep();
 
         List> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));
         cleanup(eventProcessor);
@@ -199,6 +196,55 @@ public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
         }
     }
+
+    @Override
+    @Test
+    public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        submitValidCommand(eventProcessor, "1");
+        sleep();
+
+        cleanup(eventProcessor);
+        assertThat(readProjection()).isEqualTo(1);
+    }
+    @Override
+    @Test
+    public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        shutdownBroker();
+        submitValidCommand(eventProcessor, "1");
+        sleep();
+        restartBroker();
+        sleep();
+        cleanup(eventProcessor);
+        assertThat(readProjection()).isEqualTo(1);
+    }
+    @Override
+    @Test
+    public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
+        String topic = randomKafkaTopic();
+        EventProcessor eventProcessor = eventProcessor(topic);
+        shutdownDatabase();
+        try {
+            submitValidCommand(eventProcessor, "1");
+        }catch (Throwable t){}
+        sleep();
+        cleanup(eventProcessor);
+        assertThat(readProjection()).isEqualTo(0);
+        restartDatabase();
+    }
+
+    private void sleep() {
+        try {
+            Thread.sleep(10000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
 
     @Override
     public Either> submitValidCommand(
             EventProcessor eventProcessor,
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
index a6ee48bc..937fc183 100644
--- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/DataStoreVerificationRules.java
@@ -19,6 +19,7 @@ public interface DataStoreVerificationRules
 readState(EventProcessor eventProcessor, String id);
     void submitDeleteCommand(EventProcessor eventProcessor, String id);
     List> readPublishedEvents(String kafkaBootstrapUrl, String topic);
+    Integer readProjection();
     void shutdownBroker();
     void restartBroker();
     void shutdownDatabase();
@@ -38,6 +39,11 @@ public interface DataStoreVerificationRules
+    void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright();
+    void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst();
+    void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken();
+
 > readFromDataStore(EventStore eventStore);
 
     default void cleanup(
diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java
new file mode 100644
index 00000000..fd667c44
--- /dev/null
+++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestProjection.java
@@ -0,0 +1,32 @@
+package fr.maif.eventsourcing.datastore;
+
+import fr.maif.eventsourcing.EventEnvelope;
+import fr.maif.eventsourcing.Projection;
+import io.vavr.Tuple;
+import io.vavr.Tuple0;
+import io.vavr.collection.List;
+import io.vavr.concurrent.Future;
+
+import java.sql.Connection;
+
+public class TestProjection implements Projection {
+    private int counter = 0;
+
+    @Override
+    public Future storeProjection(Connection connection, List> envelopes) {
+        return Future.of(() -> {
+            envelopes.forEach(envelope -> {
+                if (envelope.event instanceof TestEvent.SimpleEvent) {
+                    counter++;
+                }
+            });
+            return Tuple.empty();
+        });
+    }
+
+    public int getCount() {
+        return counter;
+    }
+
+
+}
diff --git a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
index d648ae70..097ad5aa 100644
--- a/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
+++ b/thoth-tck/src/test/java/fr/maif/eventsourcing/datastore/InMemoryDataStoreTest.java
@@ -4,6 +4,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
+import fr.maif.eventsourcing.Projection;
 import org.mockito.Mockito;
 
 import org.testng.annotations.BeforeMethod;
@@ -22,6 +23,7 @@ public class InMemoryDataStoreTest extends DataStoreVerification {
     public InMemoryEventStore eventStore;
     public EventProcessor eventProcessor;
 
+    @BeforeMethod(alwaysRun = true)
     public void init() {
         this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem));
@@ -46,6 +48,24 @@ public List> readPublishedEvents(String
         }
     }
 
+    @Override
+    public Integer readProjection() {
+        // Not implemented for in memory
+        return null;
+    }
+
+    @Override
+    public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
+        // Not implemented for in memory
+    }
+    @Override
+    public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
+        // Not implemented for in memory
+    }
+    @Override
+    public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
+        // Not implemented for in memory
+    }
     @Override
     public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
         // Not implemented for in memory