Skip to content

Commit

Permalink
test projection
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthieuMAIF committed Jun 1, 2021
1 parent 251798b commit ce4d24e
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 10 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
target
.bsp/
.DS_Store
.bloop
.bloop
*.iml
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ public Source<EventEnvelope<E, Meta, Context>, NotUsed> loadEventsByQuery(Tuple0
@Override
public Source<EventEnvelope<E, Meta, Context>, NotUsed> loadEventsByQuery(Query query) {
return Source.from(eventStore)
.filter(e -> {
return Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true);
});
.filter(e -> Option.of(query.entityId).map(id -> id.equals(e.entityId)).getOrElse(true));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import fr.maif.eventsourcing.Projection;
import fr.maif.eventsourcing.datastore.TestProjection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class JooqKafkaTckImplementation extends DataStoreVerification<Connection
private TestEventFormat eventFormat;
private PostgreSQLContainer postgres;
private KafkaContainer kafka;
private Projection testProjection;

private final String SCHEMA = "CREATE TABLE IF NOT EXISTS test_journal (\n" +
" id UUID primary key,\n" +
Expand Down Expand Up @@ -90,6 +93,7 @@ public void tearDown() throws InterruptedException {

@BeforeClass(alwaysRun = true)
public void initClass() {

this.tableNames = new TableNames("test_journal", "test_sequence_num");
this.eventFormat = new TestEventFormat();

Expand All @@ -102,6 +106,7 @@ public void initClass() {

@BeforeMethod(alwaysRun = true)
public void init() throws SQLException {
this.testProjection = new TestProjection();
this.dataSource = new PGSimpleDataSource();
dataSource.setUrl(postgres.getJdbcUrl());
dataSource.setUser(postgres.getUsername());
Expand All @@ -128,7 +133,7 @@ public EventProcessor<String, TestState, TestCommand, TestEvent, Connection, Tup
.withEventHandler(new TestEventHandler())
.withDefaultAggregateStore()
.withCommandHandler(new TestCommandHandler<>())
.withNoProjections()
.withProjections(this.testProjection)
.build();


Expand Down Expand Up @@ -228,6 +233,11 @@ public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String
return envelopes;
}

@Override
public Integer readProjection() {
return ((TestProjection)this.testProjection).getCount();
}

private static Optional<Long> getEndOffsetIfNotReached(String topic, String kafkaServers, String groupId) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.vavr.control.Either;
import io.vavr.control.Option;

import io.vavr.control.Try;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand Down Expand Up @@ -168,11 +169,7 @@ public void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst() {
submitValidCommand(eventProcessor, "1");

restartBroker();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
sleep();
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes = deduplicateOnId(readPublishedEvents(kafkaBootstrapUrl(), topic));

cleanup(eventProcessor);
Expand All @@ -199,6 +196,55 @@ public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
}
}


@Override
@Test
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
String topic = randomKafkaTopic();
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
submitValidCommand(eventProcessor, "1");
sleep();

cleanup(eventProcessor);
assertThat(readProjection()).isEqualTo(1);
}
@Override
@Test
public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
String topic = randomKafkaTopic();
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
shutdownBroker();
submitValidCommand(eventProcessor, "1");
sleep();
restartBroker();
sleep();
cleanup(eventProcessor);
assertThat(readProjection()).isEqualTo(1);
}
@Override
@Test
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
String topic = randomKafkaTopic();
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor = eventProcessor(topic);
shutdownDatabase();
try {
submitValidCommand(eventProcessor, "1");
}catch (Throwable t){}
sleep();
cleanup(eventProcessor);
assertThat(readProjection()).isEqualTo(0);
restartDatabase();
}

private void sleep() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}


@Override
public Either<String, ProcessingSuccess<TestState, TestEvent, Tuple0, Tuple0, Tuple0>> submitValidCommand(
EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
Option<Ste> readState(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
void submitDeleteCommand(EventProcessor<String, TestState, TestCommand, TestEvent, TxCtx, Tuple0, Tuple0, Tuple0> eventProcessor, String id);
List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String kafkaBootstrapUrl, String topic);
Integer readProjection();
void shutdownBroker();
void restartBroker();
void shutdownDatabase();
Expand All @@ -38,6 +39,11 @@ public interface DataStoreVerificationRules<Ste extends State, Evt extends Event
void required_eventShouldBePublishedEventIfBrokerIsDownAtFirst();
void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable();

void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright();
void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst();
void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken();


List<EventEnvelope<Evt, Meta, Context>> readFromDataStore(EventStore<TxCtx, TestEvent, Tuple0, Tuple0> eventStore);

default void cleanup(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.maif.eventsourcing.datastore;

import fr.maif.eventsourcing.EventEnvelope;
import fr.maif.eventsourcing.Projection;
import io.vavr.Tuple;
import io.vavr.Tuple0;
import io.vavr.collection.List;
import io.vavr.concurrent.Future;

import java.sql.Connection;

public class TestProjection implements Projection<Connection, TestEvent, Tuple0, Tuple0> {
private int counter = 0;

@Override
public Future<Tuple0> storeProjection(Connection connection, List<EventEnvelope<TestEvent, Tuple0, Tuple0>> envelopes) {
return Future.of(() -> {
envelopes.forEach(envelope -> {
if (envelope.event instanceof TestEvent.SimpleEvent) {
counter++;
}
});
return Tuple.empty();
});
}

public int getCount() {
return counter;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import fr.maif.eventsourcing.Projection;
import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod;

Expand All @@ -22,6 +23,7 @@ public class InMemoryDataStoreTest extends DataStoreVerification<Tuple0> {
public InMemoryEventStore<TestEvent, Tuple0, Tuple0> eventStore;
public EventProcessor<String, TestState, TestCommand, TestEvent, Tuple0, Tuple0, Tuple0, Tuple0> eventProcessor;


@BeforeMethod(alwaysRun = true)
public void init() {
this.eventStore = Mockito.spy(InMemoryEventStore.create(actorSystem));
Expand All @@ -46,6 +48,24 @@ public List<EventEnvelope<TestEvent, Tuple0, Tuple0>> readPublishedEvents(String
}
}

@Override
public Integer readProjection() {
// Not implemented for in memory
return null;
}

@Override
public void required_eventShouldBeConsumedByProjectionWhenEverythingIsAlright(){
// Not implemented for in memory
}
@Override
public void required_eventShouldBeConsumedByProjectionEvenIfBrokerIsDownAtFirst(){
// Not implemented for in memory
}
@Override
public void required_eventShouldNotBeConsumedByProjectionEvenIfDataBaseIsBroken(){
// Not implemented for in memory
}
@Override
public void required_commandSubmissionShouldFailIfDatabaseIsNotAvailable() {
// Not implemented for in memory
Expand Down

0 comments on commit ce4d24e

Please sign in to comment.