diff --git a/build.gradle b/build.gradle index 034539ab..e1beb569 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ subprojects { reactorKafkaVersion = "1.3.22" reactorVersion = "3.5.7" vertxSqlVersion = "4.3.3" + testContainerVersion = "1.19.8" } test { diff --git a/demo/demo-in-memory/src/main/java/com/example/demo/Account.java b/demo/demo-in-memory/src/main/java/com/example/demo/Account.java index 40937d1f..b2564194 100644 --- a/demo/demo-in-memory/src/main/java/com/example/demo/Account.java +++ b/demo/demo-in-memory/src/main/java/com/example/demo/Account.java @@ -14,6 +14,11 @@ public Long sequenceNum() { return sequenceNum; } + @Override + public String entityId() { + return id; + } + @Override public Account withSequenceNum(Long sequenceNum) { this.sequenceNum = sequenceNum; diff --git a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java index d8ed6d12..ae9162d3 100644 --- a/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java +++ b/demo/demo-postgres-kafka-reactive/src/main/java/com/example/demo/Account.java @@ -13,6 +13,11 @@ public String getId() { return id; } + @Override + public String entityId() { + return id; + } + public BigDecimal getBalance() { return balance; } diff --git a/demo/demo-postgres-kafka/src/main/java/com/example/demo/Account.java b/demo/demo-postgres-kafka/src/main/java/com/example/demo/Account.java index 40937d1f..b2564194 100644 --- a/demo/demo-postgres-kafka/src/main/java/com/example/demo/Account.java +++ b/demo/demo-postgres-kafka/src/main/java/com/example/demo/Account.java @@ -14,6 +14,11 @@ public Long sequenceNum() { return sequenceNum; } + @Override + public String entityId() { + return id; + } + @Override public Account withSequenceNum(Long sequenceNum) { this.sequenceNum = sequenceNum; diff --git a/sample/src/main/java/fr/maif/thoth/sample/state/Account.java b/sample/src/main/java/fr/maif/thoth/sample/state/Account.java index 1b6b526b..5f8ead87 100644 --- a/sample/src/main/java/fr/maif/thoth/sample/state/Account.java +++ b/sample/src/main/java/fr/maif/thoth/sample/state/Account.java @@ -9,6 +9,11 @@ public class Account extends AbstractState { public BigDecimal balance; public long sequenceNum; + @Override + public String entityId() { + return id; + } + public static class AccountBuilder{ String id; BigDecimal balance; diff --git a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java index e363e307..9828f1f8 100644 --- a/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-akka/src/main/java/fr/maif/akka/eventsourcing/InMemoryEventStore.java @@ -96,6 +96,11 @@ public CompletionStage nextSequence(Tuple0 tx) { return CompletionStages.completedStage(sequence_num.incrementAndGet()); } + @Override + public CompletionStage> nextSequences(Tuple0 tx, Integer count) { + return CompletionStages.completedStage(List.range(0, count).map(any -> sequence_num.incrementAndGet())); + } + @Override public CompletionStage publish(List> events) { events.forEach(queue::offer); diff --git a/thoth-core-akka/src/test/java/fr/maif/Helpers.java b/thoth-core-akka/src/test/java/fr/maif/Helpers.java index aefbc268..01e1dd47 100644 --- a/thoth-core-akka/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-akka/src/test/java/fr/maif/Helpers.java @@ -354,6 +354,11 @@ public Viking(String id, String name, Long sequenceNum) { this.sequenceNum = sequenceNum; } + @Override + public String entityId() { + return id; + } + @Override public Long sequenceNum() { return sequenceNum; diff --git a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java index fa4f6b27..c5419d10 100644 --- a/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java +++ b/thoth-core-reactor/src/main/java/fr/maif/reactor/eventsourcing/InMemoryEventStore.java @@ -140,6 +140,11 @@ public CompletionStage nextSequence(InMemoryEventStore.Transaction> nextSequences(InMemoryEventStore.Transaction tx, Integer count) { + return CompletionStages.traverse(List.range(0, count), c -> nextSequence(tx)); + } + @Override public CompletionStage publish(List> events) { events.forEach(e -> store.put(e.sequenceNum, e)); diff --git a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java index 5a6b8120..201e4a1b 100644 --- a/thoth-core-reactor/src/test/java/fr/maif/Helpers.java +++ b/thoth-core-reactor/src/test/java/fr/maif/Helpers.java @@ -415,6 +415,11 @@ public Viking(String id, String name, Integer age, Long sequenceNum) { this.sequenceNum = sequenceNum; } + @Override + public String entityId() { + return id; + } + @Override public Long sequenceNum() { return sequenceNum; diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AbstractState.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AbstractState.java index 94e1d22a..d3d70257 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/AbstractState.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AbstractState.java @@ -1,6 +1,6 @@ package fr.maif.eventsourcing; -public class AbstractState implements State { +public abstract class AbstractState implements State { protected long sequenceNum; @Override diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java index cfcf7045..76630c48 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/AggregateStore.java @@ -3,7 +3,9 @@ import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple; import io.vavr.Tuple0; +import io.vavr.collection.HashMap; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import java.util.concurrent.CompletionStage; @@ -12,6 +14,11 @@ public interface AggregateStore, Id, TxCtx> { CompletionStage> getAggregate(Id entityId); + default CompletionStage>> getAggregates(TxCtx ctx, List entityIds) { + return CompletionStages.traverse(entityIds, id -> getAggregate(ctx, id).thenApply(agg -> Tuple.of(id, agg))) + .thenApply(HashMap::ofEntries); + } + CompletionStage> getAggregate(TxCtx ctx, Id entityId); default CompletionStage storeSnapshot(TxCtx transactionContext, Id id, Option state) { @@ -22,6 +29,10 @@ default CompletionStage> getSnapshot(TxCtx transactionContext, Id id) return CompletionStages.completedStage(Option.none()); } + default CompletionStage> getSnapshots(TxCtx transactionContext, List ids) { + return CompletionStages.completedStage(List.empty()); + } + default CompletionStage> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler eventHandler, Option state, Id id, List events, Option lastSequenceNum) { Option newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id))); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java index 7611d075..28cd1b08 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventProcessorImpl.java @@ -9,6 +9,7 @@ import io.vavr.Tuple3; import io.vavr.Value; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.collection.Seq; import io.vavr.control.Either; import io.vavr.control.Option; @@ -20,12 +21,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.Supplier; import static io.vavr.API.List; import static io.vavr.API.None; import static io.vavr.API.Tuple; import static fr.maif.concurrent.CompletionStages.traverse; +import static java.util.function.Function.identity; public class EventProcessorImpl, C extends Command, E extends Event, TxCtx, Message, Meta, Context> implements EventProcessor { @@ -68,90 +71,92 @@ public CompletionStage>>>> batchProcessCommand(TxCtx ctx, List commands) { // Collect all states from db - return traverseCommands(commands, (c, events) -> - this.getCurrentState(ctx, c, events).thenCompose(mayBeState -> - //handle command with state to get events - handleCommand(ctx, mayBeState, c) - // Return command + state + (error or events) - .thenApply(r -> Tuple(c, mayBeState, r)) - ) - ) - .thenCompose(commandsAndResults -> { - // Extract errors from command handling - List>> errors = commandsAndResults - .map(Tuple3::_3) - .filter(Either::isLeft) - .map(e -> Either.left(e.swap().get())); - - // Extract success and generate envelopes for each result - CompletionStage> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> { - C command = t._1; - Option mayBeState = t._2; - List events = t._3.get().events.toList(); - return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> { - Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); - return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum); - }); - }); - - return success.thenApply(s -> Tuple(s.toList(), errors)); - }) - .thenCompose(successAndErrors -> { - - List>> errors = successAndErrors._2; - List success = successAndErrors._1; - - // Get all envelopes - List> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes); - - CompletionStage>> stored = eventStore - // Persist all envelopes - .persist(ctx, envelopes) - .thenCompose(__ -> - // Persist states - traverse(success, s -> { - LOGGER.debug("Storing state {} to DB", s); - List sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum); - return aggregateStore - .buildAggregateAndStoreSnapshot( - ctx, - eventHandler, - s.getState(), - s.getCommand().entityId().get(), - s.getEvents(), - sequences.max() - ) - .thenApply(mayBeNextState -> - new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage()) - ); - }) - ) - .thenCompose(mayBeNextState -> - // Apply events to projections - traverse(projections, p -> { - LOGGER.debug("Applying envelopes {} to projection", envelopes); - return p.storeProjection(ctx, envelopes); - }) - .thenApply(__ -> mayBeNextState) - ); - return stored.thenApply(results -> - errors.appendAll(results.map(Either::right)) - ); - }) - .thenApply(results -> { - Supplier> postTransactionProcess = () -> { - List> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents); - LOGGER.debug("Publishing events {} to kafka", envelopes); - return eventStore.publish(envelopes) - .thenApply(__ -> Tuple.empty()) - .exceptionally(e -> Tuple.empty()); - }; - var inTransactionResult = new InTransactionResult<>( - results, - postTransactionProcess - ); - return inTransactionResult; - }); + return aggregateStore.getAggregates(ctx, commands.filter(c -> c.hasId()).map(c -> c.entityId().get())) + .thenCompose(states -> + traverseCommands(commands, (c, events) -> { + //handle command with state to get events + Option mayBeState = this.getCurrentState(ctx, states, c, events); + return handleCommand(ctx, mayBeState, c) + // Return command + state + (error or events) + .thenApply(r -> Tuple(c, mayBeState, r)); + }) + .thenCompose(commandsAndResults -> { + // Extract errors from command handling + List>> errors = commandsAndResults + .map(Tuple3::_3) + .filter(Either::isLeft) + .map(e -> Either.left(e.swap().get())); + + // Extract success and generate envelopes for each result + CompletionStage> success = traverse(commandsAndResults.filter(t -> t._3.isRight()), t -> { + C command = t._1; + Option mayBeState = t._2; + List events = t._3.get().events.toList(); + return buildEnvelopes(ctx, command, events).thenApply(eventEnvelopes -> { + Option mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum); + return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, t._3.get().message, mayBeLastSeqNum); + }); + }); + + return success.thenApply(s -> Tuple(s.toList(), errors)); + }) + .thenCompose(successAndErrors -> { + + List>> errors = successAndErrors._2; + List success = successAndErrors._1; + + // Get all envelopes + List> envelopes = success.flatMap(CommandStateAndEvent::getEventEnvelopes); + + CompletionStage>> stored = eventStore + // Persist all envelopes + .persist(ctx, envelopes) + .thenCompose(__ -> + // Persist states + traverse(success, s -> { + LOGGER.debug("Storing state {} to DB", s); + List sequences = envelopes.filter(env -> env.entityId.equals(s.command.entityId().get())).map(env -> env.sequenceNum); + return aggregateStore + .buildAggregateAndStoreSnapshot( + ctx, + eventHandler, + s.getState(), + s.getCommand().entityId().get(), + s.getEvents(), + sequences.max() + ) + .thenApply(mayBeNextState -> + new ProcessingSuccess<>(s.state, mayBeNextState, s.getEventEnvelopes(), s.getMessage()) + ); + }) + ) + .thenCompose(mayBeNextState -> + // Apply events to projections + traverse(projections, p -> { + LOGGER.debug("Applying envelopes {} to projection", envelopes); + return p.storeProjection(ctx, envelopes); + }) + .thenApply(__ -> mayBeNextState) + ); + return stored.thenApply(results -> + errors.appendAll(results.map(Either::right)) + ); + }) + .thenApply(results -> { + Supplier> postTransactionProcess = () -> { + List> envelopes = results.flatMap(Value::toList).flatMap(ProcessingSuccess::getEvents); + LOGGER.debug("Publishing events {} to kafka", envelopes); + return eventStore.publish(envelopes) + .thenApply(__ -> Tuple.empty()) + .exceptionally(e -> Tuple.empty()); + }; + var inTransactionResult = new InTransactionResult<>( + results, + postTransactionProcess + ); + return inTransactionResult; + }) + ); } public CompletionStage, Either>>>> traverseCommands(List elements, BiFunction, CompletionStage, Either>>>> handler) { @@ -160,9 +165,9 @@ public CompletionStage, Either fResult.thenCompose(listResult -> handler.apply(elt, listResult._2.flatMap(e -> e.events)) .thenApply(r -> - Tuple( - listResult._1.append(r), - listResult._2.append(r._3.getOrElse(Events.empty()))) + Tuple( + listResult._1.append(r), + listResult._2.append(r._3.getOrElse(Events.empty()))) )) ).thenApply(t -> t._1); } @@ -170,53 +175,49 @@ public CompletionStage, Either>> buildEnvelopes(TxCtx tx, C command, List events) { String transactionId = transactionManager.transactionId(); int nbMessages = events.length(); - return traverse(events.zipWithIndex(), - t -> buildEnvelope(tx, command, t._1, t._2, nbMessages, transactionId) - ).thenApply(Value::toList); + return eventStore.nextSequences(tx, events.size()).thenApply(s -> + events.zip(s).zipWithIndex().map(t -> + buildEnvelope(tx, command, t._1._1, t._1._2, t._2, nbMessages, transactionId) + ) + ); } private CompletionStage>> handleCommand(TxCtx txCtx, Option state, C command) { return commandHandler.handleCommand(txCtx, state, command); } - private CompletionStage> getCurrentState(TxCtx ctx, C command, List previousEvent) { + private Option getCurrentState(TxCtx ctx, Map> states, C command, List previousEvent) { if (command.hasId()) { String entityId = command.entityId().get(); - return aggregateStore.getAggregate(ctx, entityId) - .thenApply(state -> - eventHandler.deriveState(state, previousEvent.filter(e -> e.entityId().equals(entityId))) - ); + return eventHandler.deriveState(states.get(entityId).flatMap(identity()), previousEvent.filter(e -> e.entityId().equals(entityId))); } else { - return CompletionStages.successful(None()); + return None(); } } - private CompletionStage> buildEnvelope(TxCtx tx, Command command, E event, Integer numMessage, Integer nbMessages, String transactionId) { + private EventEnvelope buildEnvelope(TxCtx tx, Command command, E event, Long nextSequence, Integer numMessage, Integer nbMessages, String transactionId) { LOGGER.debug("Writing event {} to envelope", event); - UUID id = UUIDgen.generate(); - return eventStore.nextSequence(tx).thenApply(nextSequence -> { - EventEnvelope.Builder builder = EventEnvelope.builder() - .withId(id) - .withEmissionDate(LocalDateTime.now()) - .withEntityId(event.entityId()) - .withSequenceNum(nextSequence) - .withEventType(event.type().name()) - .withVersion(event.type().version()) - .withTotalMessageInTransaction(nbMessages) - .withNumMessageInTransaction(numMessage + 1) - .withTransactionId(transactionId) - .withEvent(event); - - command.context().forEach(builder::withContext); - command.userId().forEach(builder::withUserId); - command.systemId().forEach(builder::withSystemId); - command.metadata().forEach(builder::withMetadata); - - return builder.build(); - }); + EventEnvelope.Builder builder = EventEnvelope.builder() + .withId(id) + .withEmissionDate(LocalDateTime.now()) + .withEntityId(event.entityId()) + .withSequenceNum(nextSequence) + .withEventType(event.type().name()) + .withVersion(event.type().version()) + .withTotalMessageInTransaction(nbMessages) + .withNumMessageInTransaction(numMessage + 1) + .withTransactionId(transactionId) + .withEvent(event); + + command.context().forEach(builder::withContext); + command.userId().forEach(builder::withUserId); + command.systemId().forEach(builder::withSystemId); + command.metadata().forEach(builder::withMetadata); + + return builder.build(); } @Override diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java index 93499169..adbfe41d 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/EventStore.java @@ -2,6 +2,7 @@ import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple0; +import io.vavr.Tuple2; import io.vavr.Value; import io.vavr.collection.List; import io.vavr.control.Option; @@ -33,6 +34,8 @@ default Publisher> loadAllEvents() { CompletionStage nextSequence(TxCtx tx); + CompletionStage> nextSequences(TxCtx tx, Integer count); + CompletionStage publish(List> events); CompletionStage> markAsPublished(TxCtx tx, EventEnvelope eventEnvelope); @@ -76,6 +79,7 @@ class Query { public final Long sequenceFrom; public final Long sequenceTo; public final Boolean published; + public final List> idsAndSequences; private Query(Query.Builder builder) { this.dateFrom = builder.dateFrom; @@ -87,6 +91,7 @@ private Query(Query.Builder builder) { this.published = builder.published; this.sequenceFrom = builder.sequenceFrom; this.sequenceTo = builder.sequenceTo; + this.idsAndSequences = Objects.requireNonNullElse(builder.idsAndSequences, List.empty()); } public static Builder builder() { @@ -125,6 +130,10 @@ public Option sequenceTo() { return Option.of(sequenceTo); } + public List> idsAndSequences() { + return idsAndSequences; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -156,6 +165,7 @@ public static class Builder { Boolean published; Long sequenceFrom; Long sequenceTo; + List> idsAndSequences; public Builder withDateFrom(LocalDateTime dateFrom) { this.dateFrom = dateFrom; @@ -201,6 +211,10 @@ public Builder withSequenceTo(Long sequenceTo) { this.sequenceTo = sequenceTo; return this; } + public Builder withIdsAndSequences(List> idsAndSequences) { + this.idsAndSequences = idsAndSequences; + return this; + } public Query build() { return new Query(this); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/State.java b/thoth-core/src/main/java/fr/maif/eventsourcing/State.java index 41d8bc66..f3b3be45 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/State.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/State.java @@ -2,6 +2,8 @@ public interface State { + String entityId(); + Long sequenceNum(); S withSequenceNum(Long sequenceNum); diff --git a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java index 3f019f1a..3ec0020c 100644 --- a/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java +++ b/thoth-core/src/main/java/fr/maif/eventsourcing/impl/AbstractDefaultAggregateStore.java @@ -7,12 +7,20 @@ import fr.maif.eventsourcing.EventStore; import fr.maif.eventsourcing.State; import fr.maif.eventsourcing.TransactionManager; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.collection.Traversable; import io.vavr.control.Option; import org.reactivestreams.Publisher; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; +import static java.util.function.Function.identity; + public abstract class AbstractDefaultAggregateStore, E extends Event, Meta, Context, TxCtx> implements AggregateStore { private final EventStore eventStore; @@ -31,6 +39,29 @@ public CompletionStage> getAggregate(String entityId) { } @Override + public CompletionStage>> getAggregates(TxCtx ctx, List entityIds) { + return this.getSnapshots(ctx, entityIds) + .thenCompose(snapshots -> { + Map indexed = snapshots.groupBy(State::entityId).mapValues(Traversable::head); + List> idsAndSeqNums = entityIds.map(id -> Tuple.of(id, indexed.get(id).map(s -> s.sequenceNum()).getOrElse(0L))); + Map> empty = HashMap.ofEntries(entityIds.map(id -> Tuple.of(id, indexed.get(id)))); + EventStore.Query query = EventStore.Query.builder().withIdsAndSequences(idsAndSeqNums).build(); + Publisher> events = this.eventStore.loadEventsByQuery(ctx, query); + return fold(events, + empty, + (Map> states, EventEnvelope event) -> { + Option mayBeCurrentState = states.get(event.entityId).flatMap(identity()); + return states.put( + event.entityId, + this.eventEventHandler + .applyEvent(mayBeCurrentState, event.event) + .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)) + ); + } + ); + }); + } + public CompletionStage> getAggregate(TxCtx ctx, String entityId) { return this.getSnapshot(ctx, entityId) @@ -43,7 +74,8 @@ public CompletionStage> getAggregate(TxCtx ctx, String entityId) { s -> EventStore.Query.builder().withSequenceFrom(s.sequenceNum()).withEntityId(entityId).build() ); - return fold(this.eventStore.loadEventsByQuery(ctx, query), + Publisher> events = this.eventStore.loadEventsByQuery(ctx, query); + return fold(events, mayBeSnapshot, (Option mayBeState, EventEnvelope event) -> this.eventEventHandler.applyEvent(mayBeState, event.event) diff --git a/thoth-core/src/test/java/fr/maif/Helpers.java b/thoth-core/src/test/java/fr/maif/Helpers.java index 5b6eb76c..ff875d47 100644 --- a/thoth-core/src/test/java/fr/maif/Helpers.java +++ b/thoth-core/src/test/java/fr/maif/Helpers.java @@ -345,6 +345,11 @@ public Long sequenceNum() { return sequenceNum; } + @Override + public String entityId() { + return id; + } + @Override public Viking withSequenceNum(Long sequenceNum) { this.sequenceNum = sequenceNum; diff --git a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index b7d53fd3..46a20a02 100644 --- a/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-akka/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -21,6 +21,7 @@ import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.collection.Seq; +import io.vavr.collection.Traversable; import io.vavr.concurrent.Future; import io.vavr.control.Either; import io.vavr.control.Option; @@ -228,7 +229,10 @@ public Publisher> loadEventsByQuery(PgAsyncTrans query.userId().map(USER_ID::eq), query.published().map(PUBLISHED::eq), query.sequenceTo().map(SEQUENCE_NUM::le), - query.sequenceFrom().map(SEQUENCE_NUM::ge) + query.sequenceFrom().map(SEQUENCE_NUM::ge), + Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> + l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) + ) ).flatMap(identity()); return Source.fromPublisher(tx.stream(500, dsl -> { @@ -286,6 +290,13 @@ public CompletionStage nextSequence(PgAsyncTransaction tx) { ).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)).getOrNull()); } + @Override + public CompletionStage> nextSequences(PgAsyncTransaction tx, Integer count) { + return tx.query(dsl -> + dsl.resultQuery("select nextval('" + this.tableNames.sequenceNumName + "') from generate_series(1, {0})", count) + ).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class))); + } + @Override public CompletionStage publish(List> events) { LOGGER.debug("Publishing event {}", events); diff --git a/thoth-jooq-reactor/build.gradle b/thoth-jooq-reactor/build.gradle index e48cd4d9..464a1f27 100644 --- a/thoth-jooq-reactor/build.gradle +++ b/thoth-jooq-reactor/build.gradle @@ -21,6 +21,10 @@ dependencies { testImplementation("org.mockito:mockito-all:1.10.19") testImplementation("org.junit.jupiter:junit-jupiter:5.9.3") testRuntimeOnly 'org.junit.platform:junit-platform-launcher' + testImplementation "org.testcontainers:junit-jupiter:$testContainerVersion" + testImplementation "org.testcontainers:testcontainers:$testContainerVersion" + testImplementation "org.testcontainers:postgresql:$testContainerVersion" + testImplementation 'org.mockito:mockito-core:5.12.0' } test { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java index a6dc5f41..8bdcff63 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/DefaultReactorAggregateStore.java @@ -1,8 +1,18 @@ package fr.maif.eventsourcing; +import io.vavr.Tuple; +import io.vavr.Tuple2; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.collection.Traversable; import io.vavr.control.Option; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import static java.util.function.Function.identity; + public class DefaultReactorAggregateStore, E extends Event, Meta, Context, TxCtx> implements ReactorAggregateStore { @@ -21,6 +31,30 @@ public Mono> getAggregate(String entityId) { return transactionManager.withTransaction(ctx -> getAggregate(ctx, entityId)); } + @Override + public Mono>> getAggregates(TxCtx txCtx, List entityIds) { + return this.getSnapshots(txCtx, entityIds) + .flatMap(snapshots -> { + Map indexed = snapshots.groupBy(State::entityId).mapValues(Traversable::head); + List> idsAndSeqNums = entityIds.map(id -> Tuple.of(id, indexed.get(id).map(s -> s.sequenceNum()).getOrElse(0L))); + Map> empty = HashMap.ofEntries(entityIds.map(id -> Tuple.of(id, indexed.get(id)))); + EventStore.Query query = EventStore.Query.builder().withIdsAndSequences(idsAndSeqNums).build(); + Flux> events = this.eventStore.loadEventsByQuery(txCtx, query); + return events.reduce( + empty, + (Map> states, EventEnvelope event) -> { + Option mayBeCurrentState = states.get(event.entityId).flatMap(identity()); + return states.put( + event.entityId, + this.eventEventHandler + .applyEvent(mayBeCurrentState, event.event) + .map((S state) -> (S) state.withSequenceNum(event.sequenceNum)) + ); + } + ); + }); + } + @Override public Mono> getAggregate(TxCtx txCtx, String entityId) { diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java index 377927d3..ddd4c6c7 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactivePostgresEventStore.java @@ -15,6 +15,7 @@ import io.vavr.Tuple0; import io.vavr.collection.List; import io.vavr.collection.Seq; +import io.vavr.collection.Traversable; import io.vavr.control.Either; import io.vavr.control.Option; import io.vavr.control.Try; @@ -260,7 +261,10 @@ public Publisher> loadEventsByQuery(Tx tx, Query query.userId().map(USER_ID::eq), query.published().map(PUBLISHED::eq), query.sequenceTo().map(SEQUENCE_NUM::le), - query.sequenceFrom().map(SEQUENCE_NUM::ge) + query.sequenceFrom().map(SEQUENCE_NUM::ge), + Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> + l.map(t -> SEQUENCE_NUM.gt(t._2).and(ENTITY_ID.eq(t._1))).reduce(Condition::or) + ) ).flatMap(identity()); return Flux.from(tx.stream(500, dsl -> { @@ -314,6 +318,13 @@ public CompletionStage nextSequence(Tx tx) { ).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class)).getOrNull()); } + @Override + public CompletionStage> nextSequences(Tx tx, Integer count) { + return tx.query(dsl -> + dsl.resultQuery("select nextval('" + this.tableNames.sequenceNumName + "') from generate_series(1, {0})", count) + ).thenApply(mayBeResult -> mayBeResult.map(r -> r.get(0, Long.class))); + } + @Override public CompletionStage publish(List> events) { LOGGER.debug("Publishing event {}", events); diff --git a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorAggregateStore.java b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorAggregateStore.java index 807f15e0..0b953d4e 100644 --- a/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorAggregateStore.java +++ b/thoth-jooq-reactor/src/main/java/fr/maif/eventsourcing/ReactorAggregateStore.java @@ -1,8 +1,10 @@ package fr.maif.eventsourcing; +import fr.maif.concurrent.CompletionStages; import io.vavr.Tuple; import io.vavr.Tuple0; import io.vavr.collection.List; +import io.vavr.collection.Map; import io.vavr.control.Option; import reactor.core.publisher.Mono; @@ -14,6 +16,8 @@ public interface ReactorAggregateStore, Id, TxCtx> { Mono> getAggregate(TxCtx ctx, Id entityId); + Mono>> getAggregates(TxCtx ctx, List entityIds); + default Mono storeSnapshot(TxCtx transactionContext, Id id, Option state) { return Mono.just(Tuple.empty()); } @@ -22,6 +26,10 @@ default Mono> getSnapshot(TxCtx transactionContext, Id id) { return Mono.just(Option.none()); } + default Mono> getSnapshots(TxCtx transactionContext, List ids) { + return Mono.just(List.empty()); + } + default Mono> buildAggregateAndStoreSnapshot(TxCtx ctx, EventHandler eventHandler, Option state, Id id, List events, Option lastSequenceNum) { Option newState = eventHandler.deriveState(state, events.filter(event -> event.entityId().equals(id))); @@ -45,6 +53,11 @@ public CompletionStage> getAggregate(Id entityId) { public CompletionStage> getAggregate(TxCtx txCtx, Id entityId) { return _this.getAggregate(txCtx, entityId).toFuture(); } + + @Override + public CompletionStage>> getAggregates(TxCtx txCtx, List entityIds) { + return _this.getAggregates(txCtx, entityIds).toFuture(); + } }; } @@ -61,6 +74,11 @@ public Mono> getAggregate(Id entityId) { public Mono> getAggregate(TxCtx txCtx, Id entityId) { return Mono.fromCompletionStage(() -> aggregateStore.getAggregate(txCtx, entityId)); } + + @Override + public Mono>> getAggregates(TxCtx txCtx, List entityIds) { + return Mono.fromCompletionStage(() -> aggregateStore.getAggregates(txCtx, entityIds)); + } }; } diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java index bbd8fd40..c9653388 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/AbstractPostgresEventStoreTest.java @@ -16,10 +16,13 @@ import org.jooq.SQLDialect; import org.jooq.Table; import org.jooq.impl.DSL; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.postgresql.ds.PGSimpleDataSource; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,7 +31,10 @@ import java.sql.Statement; import java.time.Duration; import java.time.LocalDateTime; -import java.util.*; +import java.util.Date; +import java.util.Objects; +import java.util.StringJoiner; +import java.util.UUID; import java.util.concurrent.CompletionStage; import java.util.function.Function; @@ -39,13 +45,53 @@ import static org.jooq.impl.DSL.table; import static org.mockito.Mockito.mock; + public abstract class AbstractPostgresEventStoreTest { - protected Integer port = 5557; - protected String host = "localhost"; - protected String database = "eventsourcing"; - protected String user = "eventsourcing"; - protected String password = "eventsourcing"; + private static final PostgreSQLContainer postgreSQLContainer = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14")) + .withUsername("eventsourcing") + .withPassword("eventsourcing") + .withDatabaseName("eventsourcing"); + + static { + if(!isCi()) { + postgreSQLContainer.start(); + } + } + + @AfterAll + public static void stopDb() { + if(!isCi()) { + postgreSQLContainer.stop(); + } + } + + protected static boolean isCi() { + return "true".equals(System.getenv("CI")); + } + + protected static Integer port() { + if (isCi()) { + return 5557; + } else { + return postgreSQLContainer.getFirstMappedPort(); + } + } + + protected static String host() { + return "localhost"; + } + protected static String database() { + return "eventsourcing"; + } + protected static String user() { + return "eventsourcing"; + } + protected static String password() { + return "eventsourcing"; + } + + private ReactivePostgresEventStore postgresEventStore; private PgAsyncPool pgAsyncPool; @@ -125,6 +171,12 @@ public void nextSequence() { } + @Test + public void nextSequences() { + List seq = inTransaction(ctx -> postgresEventStore.nextSequences(ctx, 5)).toCompletableFuture().join(); + assertThat(seq).hasSize(5); + } + protected CompletionStage inTransaction(Function> action) { return pgAsyncPool.inTransaction(action); } @@ -169,6 +221,18 @@ public void queryingByDate() { assertThat(events).containsExactlyInAnyOrder(event1, event2, event3); } + @Test + public void queryingBySeqAndEntityId() { + initDatas(); + List> events = getFromQuery(EventStore.Query.builder() + .withIdsAndSequences(List.of( + Tuple("bjorn@gmail.com", 0L), + Tuple("ragnard@gmail.com", 5L) + )) + .build()); + assertThat(events).containsExactlyInAnyOrder(event1, event2, event3, event6); + } + @Test public void queryingByPublished() { @@ -299,9 +363,9 @@ public void setUp() { this.pgAsyncPool = init(); PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); - pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database); - pgSimpleDataSource.setUser(user); - pgSimpleDataSource.setPassword(password); + pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database()); + pgSimpleDataSource.setUser(user()); + pgSimpleDataSource.setPassword(password()); this.dslContext = DSL.using(pgSimpleDataSource, SQLDialect.POSTGRES); Try.of(() -> { this.dslContext.deleteFrom(vikings_journal).execute(); diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java new file mode 100644 index 00000000..a0073c91 --- /dev/null +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/DefaultReactorAggregateStoreTest.java @@ -0,0 +1,119 @@ +package fr.maif.eventsourcing; + +import io.vavr.Tuple; +import io.vavr.Tuple0; +import io.vavr.collection.HashMap; +import io.vavr.collection.List; +import io.vavr.collection.Map; +import io.vavr.control.Option; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.function.Function; + +import static fr.maif.eventsourcing.AbstractPostgresEventStoreTest.eventEnvelope; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DefaultReactorAggregateStoreTest { + + private ReactorEventStore eventStore; + + record CountState(String id, Long sequenceNum, Integer count) implements State { + @Override + public String entityId() { + return id; + } + + @Override + public Long sequenceNum() { + return sequenceNum; + } + + @Override + public CountState withSequenceNum(Long sequenceNum) { + return new CountState(id, sequenceNum, count); + } + } + + sealed interface CountEvent extends Event { + record AddedEvent(String id, Integer add) implements CountEvent { + @Override + public Type type() { + return Type.create(AddedEvent.class, 1L); + } + + @Override + public String entityId() { + return id; + } + } + } + + EventHandler eventHandler = (EventHandler) (state, events) -> { + if( events instanceof CountEvent.AddedEvent addedEvent) { + return state.map(s -> new CountState(s.id, s.sequenceNum, s.count + addedEvent.add)).orElse( + Option.of(new CountState(addedEvent.id(), 0L, addedEvent.add))); + } else { + return state; + } + }; + + DefaultReactorAggregateStore defaultReactorAggregateStore; + + @BeforeEach + public void setUp() { + eventStore = mock(ReactorEventStore.class); + defaultReactorAggregateStore = new DefaultReactorAggregateStore<>( + eventStore, eventHandler, new ReactorTransactionManager() { + @Override + public Mono withTransaction(Function> callBack) { + return callBack.apply(Tuple.empty()); + } + }) { + @Override + public Mono> getSnapshots(Tuple0 transactionContext, List strings) { + return Mono.just(strings.flatMap(id -> { + if (id.equals("5")) { + return Option.of(new CountState(id, 0L, 50)); + } else { + return Option.none(); + } + })); + } + }; + } + + @Test + void loadEventToAggregates() { + List idsInt = List.range(1, 10); + List ids = idsInt.map(String::valueOf); + + when(eventStore.loadEventsByQuery(any(), any())).thenReturn(Flux.fromIterable(idsInt.flatMap(id -> + List.range(0, id).map(add -> + new CountEvent.AddedEvent(String.valueOf(id), add) + ) + )).map(evt -> eventEnvelope(0L, evt, LocalDateTime.now()))); + + Map> result = defaultReactorAggregateStore.getAggregates(Tuple.empty(), ids).block(); + assertThat(result).isEqualTo(HashMap.ofEntries(List.of( + new CountState("1", 0L, 0), + new CountState("2", 0L, 1), + new CountState("3", 0L, 3), + new CountState("4", 0L, 6), + new CountState("5", 0L, 60), + new CountState("6", 0L, 15), + new CountState("7", 0L, 21), + new CountState("8", 0L, 28), + new CountState("9", 0L, 36) + ).map(s -> Tuple.of(s.id, Option.of(s))))); + + } + + +} \ No newline at end of file diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java index c2e9a89b..a9d60735 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/JdbcPostgresEventStoreTest.java @@ -16,9 +16,9 @@ protected PgAsyncPool init() { jooqConfig.setSQLDialect(SQLDialect.POSTGRES); PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource(); - pgSimpleDataSource.setUrl("jdbc:postgresql://"+host+":"+port+"/"+database); - pgSimpleDataSource.setUser(user); - pgSimpleDataSource.setPassword(password); + pgSimpleDataSource.setUrl("jdbc:postgresql://"+host()+":"+port()+"/"+database()); + pgSimpleDataSource.setUser(user()); + pgSimpleDataSource.setPassword(password()); return new JdbcPgAsyncPool(SQLDialect.POSTGRES, pgSimpleDataSource, Executors.newFixedThreadPool(3)); } diff --git a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java index 6bb674ae..4096ea41 100644 --- a/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java +++ b/thoth-jooq-reactor/src/test/java/fr/maif/eventsourcing/ReactivePostgresEventStoreTest.java @@ -21,11 +21,11 @@ protected PgAsyncPool init() { PoolOptions poolOptions = new PoolOptions().setMaxSize(30); PgConnectOptions options = new PgConnectOptions() - .setPort(port) + .setPort(port()) .setHost("localhost") - .setDatabase(database) - .setUser(user) - .setPassword(password); + .setDatabase(database()) + .setUser(user()) + .setPassword(password()); PgPool client = PgPool.pool(Vertx.vertx(), options, poolOptions); return new ReactivePgAsyncPool(client, jooqConfig); } diff --git a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java index 5df1d701..b5cbddd0 100644 --- a/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java +++ b/thoth-jooq/src/main/java/fr/maif/eventsourcing/impl/PostgresEventStore.java @@ -33,6 +33,7 @@ import javax.sql.DataSource; import java.io.Closeable; import java.io.IOException; +import java.math.BigInteger; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -218,6 +219,14 @@ public CompletionStage nextSequence(Connection tx) { ), executor); } + @Override + public CompletionStage> nextSequences(Connection tx, Integer count) { + return CompletionStages.fromTry(() -> Try.of(() -> { + DSLContext ctx = using(tx); + return List.ofAll(ctx.fetchValues(sequence(name(this.tableNames.sequenceNumName)).nextvals(count))).map(BigInteger::longValue); + }), executor); + } + @Override public CompletionStage publish(List> events) { return this.eventPublisher.publish(events); @@ -308,7 +317,10 @@ public Publisher> loadEventsByQueryWithOptions(C query.userId().map(d -> field(" user_id").eq(d)), query.published().map(d -> field(" published").eq(d)), query.sequenceTo().map(d -> field(" sequence_num").lessOrEqual(d)), - query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d)) + query.sequenceFrom().map(d -> field(" sequence_num").greaterOrEqual(d)), + Option.of(query.idsAndSequences()).filter(Traversable::nonEmpty).map(l -> + l.map(t -> field(" sequence_num").greaterThan(t._2).and(field(" entity_id").eq(t._1))).reduce(Condition::or) + ) ).flatMap(identity()); var tmpJooqQuery = DSL.using(tx) diff --git a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java index 52204104..22257fa7 100644 --- a/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java +++ b/thoth-tck/src/main/java/fr/maif/eventsourcing/datastore/TestState.java @@ -6,9 +6,15 @@ public class TestState extends AbstractState { public final String id; public final int count; - public TestState(String id, int count) { this.id = id; this.count = count; } + + @Override + public String entityId() { + return id; + } + + }