Skip to content

Commit

Permalink
More control on event publishing
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Nov 29, 2023
1 parent 9e1ff0f commit 400611b
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,15 @@ public <TxCtx> void start(EventStore<TxCtx, E, Meta, Context> eventStore, Concur
.from(eventStore.loadEventsUnpublished(tx, strategy))
.transform(publishToKafka(eventStore, Option.some(tx), groupFlow))
.doOnNext(logProgressSink::tryEmitNext)
.concatMap(any -> Mono.fromCompletionStage(() -> {
.collectList()
.flatMap(any -> Mono.fromCompletionStage(() -> {
LOGGER.info("Replaying events not published in DB is finished for {}", topic);
return eventStore.commitOrRollback(Option.none(), tx);
}))
.doOnError(e -> {
eventStore.commitOrRollback(Option.of(e), tx);
LOGGER.error("Error replaying non published events to kafka for " + topic, e);
})
.collectList()
.map(__ -> Tuple.empty());
})
.retryWhen(Retry.backoff(10, restartInterval)
Expand Down

0 comments on commit 400611b

Please sign in to comment.