From 610d98507fcc9da9655e3f2687d55c990672eef3 Mon Sep 17 00:00:00 2001 From: Thomas Canava Date: Mon, 23 Sep 2024 00:51:53 +0200 Subject: [PATCH 1/3] Add multi request to KafkaRequestReply --- .../kafka/reply/KafkaRequestReply.java | 17 +++++ .../kafka/reply/KafkaRequestReplyImpl.java | 62 +++++++++------ .../kafka/reply/KafkaRequestReplyTest.java | 75 +++++++++++++++++++ 3 files changed, 132 insertions(+), 22 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 23af6eb92e..6c7fba3ca5 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Set; +import io.smallrye.mutiny.Multi; import org.apache.kafka.common.TopicPartition; import org.eclipse.microprofile.reactive.messaging.Message; @@ -120,6 +121,22 @@ public interface KafkaRequestReply extends EmitterType { */ Uni> request(Message request); + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi requestMulti(Req request); + + /** + * Sends a request and receives responses. + * + * @param request the request object to be sent + * @return a Multi object representing the results of the send and receive operation + */ + Multi> requestMulti(Message request); + /** * Blocks until the consumer has been assigned all partitions for consumption. * If a {@code reply.partition} is provided, waits only for the assignment of that particular partition. diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 240fef87f8..64b75579c7 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -31,6 +31,7 @@ import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; import io.smallrye.mutiny.subscription.UniEmitter; import io.smallrye.reactive.messaging.ClientCustomizer; @@ -185,12 +186,22 @@ private void grace(Duration duration) { @Override public Uni request(Req request) { - return request(ContextAwareMessage.of(request)) - .map(Message::getPayload); + return requestMulti(request).toUni(); } @Override public Uni> request(Message request) { + return requestMulti(request).toUni(); + } + + @Override + public Multi requestMulti(Req request) { + return requestMulti(ContextAwareMessage.of(request)) + .map(Message::getPayload); + } + + @Override + public Multi> requestMulti(Message request) { var builder = request.getMetadata(OutgoingKafkaRecordMetadata.class) .map(metadata -> OutgoingKafkaRecordMetadata.from(metadata)) .orElseGet(OutgoingKafkaRecordMetadata::builder); @@ -204,16 +215,26 @@ public Uni> request(Message request) { OutgoingMessageMetadata outMetadata = new OutgoingMessageMetadata<>(); return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata)) .invoke(() -> subscription.get().request(1)) - .chain(unused -> Uni.createFrom().> emitter(emitter -> pendingReplies.put(correlationId, - new PendingReplyImpl<>(outMetadata.getResult(), replyTopic, replyPartition, - (UniEmitter>) emitter))) - .ifNoItem().after(replyTimeout).fail()) - .onItemOrFailure().invoke(() -> pendingReplies.remove(correlationId)) - .plug(uni -> replyFailureHandler != null ? uni.onItem().transformToUni(f -> { - Throwable failure = replyFailureHandler.handleReply((KafkaRecord) f); - return failure != null ? Uni.createFrom().failure(failure) : Uni.createFrom().item(f); - }) : uni) - .plug(uni -> replyConverter != null ? uni.map(f -> replyConverter.apply(f)) : uni); + .onItem() + .transformToMulti(unused -> Multi.createFrom().>emitter(emitter -> { + pendingReplies.put(correlationId, + new PendingReplyImpl<>(outMetadata.getResult(), + replyTopic, + replyPartition, + (MultiEmitter>) emitter)); + }) + .ifNoItem().after(replyTimeout).fail() + ).onTermination().invoke(() -> pendingReplies.remove(correlationId)) + .onItem().transformToUniAndMerge(m -> { + if (replyFailureHandler != null) { + Throwable failure = replyFailureHandler.handleReply((KafkaRecord) m); + if (failure != null) { + return Uni.createFrom().failure(failure); + } + } + return Uni.createFrom().item(m); + }) + .plug(multi -> replyConverter != null ? multi.map(f -> replyConverter.apply(f)) : multi); } @Override @@ -262,13 +283,10 @@ public void onItem(KafkaRecord record) { // If reply topic header is NOT null, it is considered a request not a reply if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) { CorrelationId correlationId = correlationIdHandler.parse(header.value()); - PendingReplyImpl reply = pendingReplies.remove(correlationId); - if (reply != null) { - reply.getEmitter().complete(record); - return; - } else { - log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString()); - } + pendingReplies.computeIfPresent(correlationId, (id, reply) -> { + reply.getEmitter().emit(record); + return reply; + }); } // request more subscription.get().request(1); @@ -289,10 +307,10 @@ public static class PendingReplyImpl implements PendingReply { private final RecordMetadata metadata; private final String replyTopic; private final int replyPartition; - private final UniEmitter> emitter; + private final MultiEmitter> emitter; public PendingReplyImpl(RecordMetadata metadata, String replyTopic, int replyPartition, - UniEmitter> emitter) { + MultiEmitter> emitter) { this.replyTopic = replyTopic; this.replyPartition = replyPartition; this.metadata = metadata; @@ -314,7 +332,7 @@ public RecordMetadata recordMetadata() { return metadata; } - public UniEmitter> getEmitter() { + public MultiEmitter> getEmitter() { return emitter; } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java index 209b033b65..8c4c9798e3 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java @@ -5,11 +5,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.time.Duration; +import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import io.smallrye.mutiny.Multi; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -139,6 +142,58 @@ void testReplyMessage() { .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); } + @Test + void testReplyMessageMulti() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + List expected = new ArrayList<>(); + int sent = 5; + for (int i = 0; i < sent; i++) { + app.requestReply().requestMulti(i) + .subscribe() + .with(replies::add); + for (int j = 0; j < ReplyServerMultipleReplies.REPLIES; j++) { + expected.add(i + ": " + j); + } + } + await().untilAsserted(() -> assertThat(replies).hasSize(ReplyServerMultipleReplies.REPLIES * sent)); + assertThat(replies) + .containsAll(expected); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsAll(expected); + } + + @Test + void testReplyMessageMultiLimit() { + addBeans(ReplyServerMultipleReplies.class); + topic = companion.topics().createAndWait(topic, 3); + String replyTopic = topic + "-replies"; + companion.topics().createAndWait(replyTopic, 3); + + List replies = new CopyOnWriteArrayList<>(); + + RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); + app.requestReply().requestMulti(0) + .capDemandsTo(5) + .subscribe() + .with(replies::add); + await().untilAsserted(() -> assertThat(replies).hasSize(5)); + assertThat(replies) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + + assertThat(companion.consumeStrings().fromTopics(replyTopic, 5).awaitCompletion()) + .extracting(ConsumerRecord::value) + .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + } + @Test void testReplyWithReplyTopic() { addBeans(ReplyServer.class); @@ -610,6 +665,26 @@ String process(Integer payload) { } } + @ApplicationScoped + public static class ReplyServerMultipleReplies { + + public static final int REPLIES = 10; + + @Incoming("req") + @Outgoing("rep") + Multi process(Integer payload) { + if (payload == null) { + return null; + } + return Multi.createFrom().emitter(multiEmitter -> { + for (int i = 0; i < REPLIES; i++) { + multiEmitter.emit(payload + ": " + i); + } + multiEmitter.complete(); + }); + } + } + @ApplicationScoped public static class ReplyServerSlow { From f822df91b5edcab07ed9298d02b035736e685469 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Tue, 24 Sep 2024 19:18:08 +0200 Subject: [PATCH 2/3] Add multi request to KafkaRequestReply 2 --- .../kafka/reply/KafkaRequestReply.java | 2 +- .../kafka/reply/KafkaRequestReplyImpl.java | 43 +++++++++++++------ .../messaging/kafka/reply/PendingReply.java | 10 +++++ .../kafka/reply/KafkaRequestReplyTest.java | 28 ++++++++++-- 4 files changed, 66 insertions(+), 17 deletions(-) diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java index 6c7fba3ca5..236b2f33b0 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReply.java @@ -5,11 +5,11 @@ import java.util.Map; import java.util.Set; -import io.smallrye.mutiny.Multi; import org.apache.kafka.common.TopicPartition; import org.eclipse.microprofile.reactive.messaging.Message; import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.EmitterType; import io.smallrye.reactive.messaging.kafka.KafkaConsumer; diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java index 64b75579c7..faa261d67b 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyImpl.java @@ -33,7 +33,6 @@ import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.subscription.MultiEmitter; import io.smallrye.mutiny.subscription.MultiSubscriber; -import io.smallrye.mutiny.subscription.UniEmitter; import io.smallrye.reactive.messaging.ClientCustomizer; import io.smallrye.reactive.messaging.EmitterConfiguration; import io.smallrye.reactive.messaging.OutgoingMessageMetadata; @@ -173,6 +172,12 @@ public void complete() { pendingReplies.size(), pendingReplies.keySet()); } } + for (CorrelationId correlationId : pendingReplies.keySet()) { + PendingReplyImpl reply = pendingReplies.remove(correlationId); + if (reply != null) { + reply.complete(); + } + } replySource.closeQuietly(); } @@ -216,15 +221,15 @@ public Multi> requestMulti(Message request) { return sendMessage(request.addMetadata(builder.build()).addMetadata(outMetadata)) .invoke(() -> subscription.get().request(1)) .onItem() - .transformToMulti(unused -> Multi.createFrom().>emitter(emitter -> { - pendingReplies.put(correlationId, - new PendingReplyImpl<>(outMetadata.getResult(), - replyTopic, - replyPartition, - (MultiEmitter>) emitter)); - }) - .ifNoItem().after(replyTimeout).fail() - ).onTermination().invoke(() -> pendingReplies.remove(correlationId)) + .transformToMulti(unused -> Multi.createFrom().> emitter(emitter -> { + pendingReplies.put(correlationId, + new PendingReplyImpl<>(outMetadata.getResult(), + replyTopic, + replyPartition, + (MultiEmitter>) emitter)); + }) + .ifNoItem().after(replyTimeout).fail()) + .onTermination().invoke(() -> pendingReplies.remove(correlationId)) .onItem().transformToUniAndMerge(m -> { if (replyFailureHandler != null) { Throwable failure = replyFailureHandler.handleReply((KafkaRecord) m); @@ -283,10 +288,12 @@ public void onItem(KafkaRecord record) { // If reply topic header is NOT null, it is considered a request not a reply if (header != null && record.getHeaders().lastHeader(replyTopicHeader) == null) { CorrelationId correlationId = correlationIdHandler.parse(header.value()); - pendingReplies.computeIfPresent(correlationId, (id, reply) -> { + PendingReplyImpl reply = pendingReplies.get(correlationId); + if (reply != null) { reply.getEmitter().emit(record); - return reply; - }); + } else { + log.requestReplyRecordIgnored(channel, record.getTopic(), correlationId.toString()); + } } // request more subscription.get().request(1); @@ -332,6 +339,16 @@ public RecordMetadata recordMetadata() { return metadata; } + @Override + public void complete() { + emitter.complete(); + } + + @Override + public boolean isCancelled() { + return emitter.isCancelled(); + } + public MultiEmitter> getEmitter() { return emitter; } diff --git a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java index ec5adbc319..f00a7ac0c2 100644 --- a/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java +++ b/smallrye-reactive-messaging-kafka/src/main/java/io/smallrye/reactive/messaging/kafka/reply/PendingReply.java @@ -21,4 +21,14 @@ public interface PendingReply { * @return the recordMetadata of the request */ RecordMetadata recordMetadata(); + + /** + * Complete the pending reply. + */ + void complete(); + + /** + * @return whether the pending reply was terminated (with a completion or failure). + */ + boolean isCancelled(); } diff --git a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java index 8c4c9798e3..b5eaf7b0f7 100644 --- a/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java +++ b/smallrye-reactive-messaging-kafka/src/test/java/io/smallrye/reactive/messaging/kafka/reply/KafkaRequestReplyTest.java @@ -5,14 +5,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.time.Duration; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; -import io.smallrye.mutiny.Multi; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -35,6 +33,7 @@ import org.junit.jupiter.api.Test; import io.smallrye.common.annotation.Identifier; +import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.TimeoutException; import io.smallrye.mutiny.helpers.test.UniAssertSubscriber; import io.smallrye.reactive.messaging.annotations.Blocking; @@ -91,6 +90,7 @@ void testReply() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -114,6 +114,7 @@ void testReplyWithConverter() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value) .containsExactly("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -140,6 +141,7 @@ void testReplyMessage() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value) .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -169,6 +171,16 @@ void testReplyMessageMulti() { assertThat(companion.consumeStrings().fromTopics(replyTopic, ReplyServerMultipleReplies.REPLIES * sent).awaitCompletion()) .extracting(ConsumerRecord::value) .containsAll(expected); + + Map pendingReplies = app.requestReply().getPendingReplies(); + assertThat(pendingReplies).allSatisfy((k, v) -> assertThat(v.isCancelled()).isFalse()); + for (PendingReply pending : pendingReplies.values()) { + pending.complete(); + } + assertThat(pendingReplies).allSatisfy((k, v) -> assertThat(v.isCancelled()).isTrue()); + assertThat(app.requestReply().getPendingReplies()) + .allSatisfy((k, v) -> assertThat(v.isCancelled()).isTrue()); + await().untilAsserted(() -> assertThat(app.requestReply().getPendingReplies()).isEmpty()); } @Test @@ -182,7 +194,7 @@ void testReplyMessageMultiLimit() { RequestReplyProducer app = runApplication(config(), RequestReplyProducer.class); app.requestReply().requestMulti(0) - .capDemandsTo(5) + .select().first(5) .subscribe() .with(replies::add); await().untilAsserted(() -> assertThat(replies).hasSize(5)); @@ -192,6 +204,7 @@ void testReplyMessageMultiLimit() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 5).awaitCompletion()) .extracting(ConsumerRecord::value) .containsExactlyInAnyOrder("0: 0", "0: 1", "0: 2", "0: 3", "0: 4"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -241,6 +254,7 @@ void testReplyWithSameTopic() { await().untilAsserted(() -> assertThat(replies).hasSize(10)); assertThat(replies).extracting(ConsumerRecord::value) .containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -268,6 +282,7 @@ void testReplyWithReplyPartition() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -300,6 +315,7 @@ void testReplyWithConsumerConfig() { assertThat(companion.consumerGroups().list()).extracting(ConsumerGroupListing::groupId) .contains(replyTopicConsumer); await().untilAsserted(() -> assertThat(companion.consumerGroups().offsets(replyTopicConsumer)).isNotEmpty()); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -332,6 +348,7 @@ void testReplyWithCustomHeadersReplyServerMessage() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -362,6 +379,7 @@ void testReplyWithCustomHeaders() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .allSatisfy(record -> assertThat(record.partition()).isEqualTo(2)) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -476,6 +494,7 @@ void testReplyMessageBytesCorrelationId() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -508,6 +527,7 @@ void testReplyFailureHandler() { .extracting(Throwable::getMessage) .allSatisfy(message -> assertThat(message).containsAnyOf("0", "3", "6", "9") .contains("Cannot reply to")); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -532,6 +552,7 @@ void testReplyOffsetResetEarliest() { assertThat(companion.consumeStrings().fromTopics(replyTopic, 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test @@ -567,6 +588,7 @@ void testReplyAssignAndSeekOffset() { assertThat(companion.consumeStrings().fromOffsets(Map.of(tp(replyTopic, 2), 10L), 10).awaitCompletion()) .extracting(ConsumerRecord::value).containsExactlyInAnyOrder("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"); + assertThat(app.requestReply().getPendingReplies()).isEmpty(); } @Test From 74caf72d9a845b1aadd19087070612dd4be1b816 Mon Sep 17 00:00:00 2001 From: Thomas Canava Date: Wed, 25 Sep 2024 00:00:41 +0200 Subject: [PATCH 3/3] Init doc for multi request --- .../src/main/docs/kafka/request-reply.md | 15 +++++++++++++ .../KafkaRequestReplyMultiEmitter.java | 21 +++++++++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java diff --git a/documentation/src/main/docs/kafka/request-reply.md b/documentation/src/main/docs/kafka/request-reply.md index 22ba07c8ae..419c30b13a 100644 --- a/documentation/src/main/docs/kafka/request-reply.md +++ b/documentation/src/main/docs/kafka/request-reply.md @@ -65,6 +65,21 @@ Like the core Emitter's `send` methods, `request` method also can receive a `Mes The ingested reply type of the `KafkaRequestReply` is discovered at runtime, in order to configure a `MessageConveter` to be applied on the incoming message before returning the `Uni` result. +## Requesting multiple replies + +You can use the `requestMulti` method to expect any number of replies represented by the `Multi` return type. + +For example this can be used to aggregate multiple replies to a single request. + +``` java +{{ insert('kafka/outbound/KafkaRequestReplyMultiEmitter.java') }} +``` +Like the other `request` you can also request `Message` types. + +!!! note + The channel attribute `reply.timeout` will be applied between each message, if reached the returned `Multi` will + fail. + ## Scaling Request/Reply If multiple requestor instances are configured on the same outgoing topic, and the same reply topic, diff --git a/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java b/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java new file mode 100644 index 0000000000..8d8897ba1d --- /dev/null +++ b/documentation/src/main/java/kafka/outbound/KafkaRequestReplyMultiEmitter.java @@ -0,0 +1,21 @@ +package org.acme; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Channel; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply; + +@ApplicationScoped +public class KafkaRequestReplyMultiEmitter { + + @Inject + @Channel("my-request") + KafkaRequestReply quoteRequest; + + public Multi requestQuote(String request) { + return quoteRequest.requestMulti(request).select().first(5); + } +}