From 258241c374fccbaadf8ed19bbdb05cfa787c2a35 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Mon, 8 Jan 2024 10:52:03 +0800 Subject: [PATCH] feat: Support batch receive for consumer. (#357) --- index.d.ts | 8 +++ src/Consumer.cc | 65 +++++++++++++--------- src/Consumer.h | 1 + src/ConsumerConfig.cc | 71 ++++++++++++++++++++++-- src/ConsumerConfig.h | 5 +- src/ThreadSafeDeferred.cc | 4 ++ src/ThreadSafeDeferred.h | 1 + tests/consumer.test.js | 114 ++++++++++++++++++++++++++++++++++++++ 8 files changed, 237 insertions(+), 32 deletions(-) diff --git a/index.d.ts b/index.d.ts index 91f13ed6..c416ee2a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -98,10 +98,12 @@ export interface ConsumerConfig { batchIndexAckEnabled?: boolean; regexSubscriptionMode?: RegexSubscriptionMode; deadLetterPolicy?: DeadLetterPolicy; + batchReceivePolicy?: ConsumerBatchReceivePolicy; } export class Consumer { receive(timeout?: number): Promise; + batchReceive(): Promise; acknowledge(message: Message): Promise; acknowledgeId(messageId: MessageId): Promise; negativeAcknowledge(message: Message): void; @@ -181,6 +183,12 @@ export interface DeadLetterPolicy { initialSubscriptionName?: string; } +export interface ConsumerBatchReceivePolicy { + maxNumMessages?: number; + maxNumBytes?: number; + timeoutMs?: number; +} + export class AuthenticationTls { constructor(params: { certificatePath: string, privateKeyPath: string }); } diff --git a/src/Consumer.cc b/src/Consumer.cc index 328da898..e56f8ba6 100644 --- a/src/Consumer.cc +++ b/src/Consumer.cc @@ -38,6 +38,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) { DefineClass(env, "Consumer", { InstanceMethod("receive", &Consumer::Receive), + InstanceMethod("batchReceive", &Consumer::BatchReceive), InstanceMethod("acknowledge", &Consumer::Acknowledge), InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId), InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge), @@ -192,36 +193,17 @@ struct ConsumerNewInstanceContext { Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr cClient) { auto deferred = ThreadSafeDeferred::New(info.Env()); auto config = info[0].As(); - std::shared_ptr consumerConfig = std::make_shared(config, &MessageListener); + std::shared_ptr consumerConfig = std::make_shared(); + + consumerConfig->InitConfig(deferred, config, &MessageListener); + if (deferred->IsDone()) { + return deferred->Promise(); + } const std::string &topic = consumerConfig->GetTopic(); const std::vector &topics = consumerConfig->GetTopics(); const std::string &topicsPattern = consumerConfig->GetTopicsPattern(); - if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) { - deferred->Reject( - std::string("Topic, topics or topicsPattern is required and must be specified as a string when " - "creating consumer")); - return deferred->Promise(); - } const std::string &subscription = consumerConfig->GetSubscription(); - if (subscription.empty()) { - deferred->Reject( - std::string("Subscription is required and must be specified as a string when creating consumer")); - return deferred->Promise(); - } - int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs(); - if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) { - std::string msg("Ack timeout should be 0 or greater than or equal to " + - std::to_string(MIN_ACK_TIMEOUT_MILLIS)); - deferred->Reject(msg); - return deferred->Promise(); - } - int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs(); - if (nAckRedeliverTimeoutMs < 0) { - std::string msg("NAck timeout should be greater than or equal to zero"); - deferred->Reject(msg); - return deferred->Promise(); - } auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig); @@ -291,6 +273,39 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker { int64_t timeout; }; +Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) { + auto deferred = ThreadSafeDeferred::New(Env()); + auto ctx = new ExtDeferredContext(deferred); + pulsar_consumer_batch_receive_async( + this->cConsumer.get(), + [](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) { + auto deferredContext = static_cast(ctx); + auto deferred = deferredContext->deferred; + delete deferredContext; + + if (result != pulsar_result_Ok) { + deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result)); + } else { + deferred->Resolve([rawMessages](const Napi::Env env) { + int listSize = pulsar_messages_size(rawMessages); + Napi::Array jsArray = Napi::Array::New(env, listSize); + for (int i = 0; i < listSize; i++) { + pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i); + pulsar_message_t *message = pulsar_message_create(); + pulsar_message_copy(rawMessage, message); + Napi::Object obj = + Message::NewInstance({}, std::shared_ptr(message, pulsar_message_free)); + jsArray.Set(i, obj); + } + pulsar_messages_free(rawMessages); + return jsArray; + }); + } + }, + ctx); + return deferred->Promise(); +} + Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) { if (info[0].IsUndefined()) { auto deferred = ThreadSafeDeferred::New(Env()); diff --git a/src/Consumer.h b/src/Consumer.h index 1574fabb..2bc15e2b 100644 --- a/src/Consumer.h +++ b/src/Consumer.h @@ -44,6 +44,7 @@ class Consumer : public Napi::ObjectWrap { MessageListenerCallback *listener; Napi::Value Receive(const Napi::CallbackInfo &info); + Napi::Value BatchReceive(const Napi::CallbackInfo &info); Napi::Value Acknowledge(const Napi::CallbackInfo &info); Napi::Value AcknowledgeId(const Napi::CallbackInfo &info); void NegativeAcknowledge(const Napi::CallbackInfo &info); diff --git a/src/ConsumerConfig.cc b/src/ConsumerConfig.cc index 2758649d..23e7976c 100644 --- a/src/ConsumerConfig.cc +++ b/src/ConsumerConfig.cc @@ -51,6 +51,10 @@ static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy"; static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic"; static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount"; static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName"; +static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy"; +static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages"; +static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes"; +static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs"; static const std::map SUBSCRIPTION_TYPE = { {"Exclusive", pulsar_ConsumerExclusive}, @@ -74,7 +78,7 @@ static const std::map CONSUM void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; } -ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener) +ConsumerConfig::ConsumerConfig() : topic(""), topicsPattern(""), subscription(""), @@ -83,7 +87,10 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag listener(nullptr) { this->cConsumerConfig = std::shared_ptr( pulsar_consumer_configuration_create(), pulsar_consumer_configuration_free); +} +void ConsumerConfig::InitConfig(std::shared_ptr deferred, + const Napi::Object &consumerConfig, pulsar_message_listener messageListener) { if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) { this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value(); } @@ -101,9 +108,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value(); } + if (this->topic.empty() && this->topics.size() == 0 && this->topicsPattern.empty()) { + deferred->Reject( + std::string("Topic, topics or topicsPattern is required and must be specified as a string when " + "creating consumer")); + return; + } + if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) { this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value(); } + if (subscription.empty()) { + deferred->Reject( + std::string("Subscription is required and must be specified as a string when creating consumer")); + return; + } if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) { std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value(); @@ -139,18 +158,25 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) { this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value(); - if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) { - pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs); + if (this->ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) { + std::string msg("Ack timeout should be 0 or greater than or equal to " + + std::to_string(MIN_ACK_TIMEOUT_MILLIS)); + deferred->Reject(msg); + return; } + pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs); } if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) && consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) { this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value(); - if (this->nAckRedeliverTimeoutMs >= 0) { - pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(), - this->nAckRedeliverTimeoutMs); + if (nAckRedeliverTimeoutMs < 0) { + std::string msg("NAck timeout should be greater than or equal to zero"); + deferred->Reject(msg); + return; } + pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(), + this->nAckRedeliverTimeoutMs); } if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) { @@ -265,6 +291,39 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag } pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy); } + + if (consumerConfig.Has(CFG_BATCH_RECEIVE_POLICY) && + consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).IsObject()) { + Napi::Object propObj = consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).ToObject(); + int maxNumMessages = -1; + if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES) && + propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).IsNumber()) { + maxNumMessages = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).ToNumber().Int32Value(); + } + int maxNumBytes = 10 * 1024 * 1024; + if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES) && + propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).IsNumber()) { + maxNumBytes = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).ToNumber().Int64Value(); + } + int timeoutMs = 100; + if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS) && + propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).IsNumber()) { + timeoutMs = propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).ToNumber().Int64Value(); + } + if (maxNumMessages <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) { + std::string msg("At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified."); + deferred->Reject(msg); + return; + } + pulsar_consumer_batch_receive_policy_t batch_receive_policy{maxNumMessages, maxNumBytes, timeoutMs}; + int result = pulsar_consumer_configuration_set_batch_receive_policy(this->cConsumerConfig.get(), + &batch_receive_policy); + if (result == -1) { + std::string msg("Set batch receive policy failed: C client returned failure"); + deferred->Reject(msg); + return; + } + } } ConsumerConfig::~ConsumerConfig() { diff --git a/src/ConsumerConfig.h b/src/ConsumerConfig.h index db74c19e..48e0545d 100644 --- a/src/ConsumerConfig.h +++ b/src/ConsumerConfig.h @@ -21,14 +21,17 @@ #define CONSUMER_CONFIG_H #include +#include "ThreadSafeDeferred.h" #include "MessageListener.h" #define MIN_ACK_TIMEOUT_MILLIS 10000 class ConsumerConfig { public: - ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener); + ConsumerConfig(); ~ConsumerConfig(); + void InitConfig(std::shared_ptr deferred, const Napi::Object &consumerConfig, + pulsar_message_listener messageListener); std::shared_ptr GetCConsumerConfig(); std::string GetTopic(); std::vector GetTopics(); diff --git a/src/ThreadSafeDeferred.cc b/src/ThreadSafeDeferred.cc index 804aea91..25ef038c 100644 --- a/src/ThreadSafeDeferred.cc +++ b/src/ThreadSafeDeferred.cc @@ -96,3 +96,7 @@ void ThreadSafeDeferred::Reject(const std::string &errorMsg) { this->fate = EFate::REJECTED; this->tsf.Release(); } + +bool ThreadSafeDeferred::IsDone() const { + return this->fate == EFate::RESOLVED || this->fate == EFate::REJECTED; +} diff --git a/src/ThreadSafeDeferred.h b/src/ThreadSafeDeferred.h index da050f99..bee219ee 100644 --- a/src/ThreadSafeDeferred.h +++ b/src/ThreadSafeDeferred.h @@ -73,6 +73,7 @@ class ThreadSafeDeferred : public Napi::Promise::Deferred { inline void Reject() { this->Reject(""); } void Reject( const std::string &); // <- if only Reject were virtual... But we can live without polymorphism here + bool IsDone() const; static std::shared_ptr New(const Napi::Env env); }; diff --git a/tests/consumer.test.js b/tests/consumer.test.js index 9e1be707..3a7d203b 100644 --- a/tests/consumer.test.js +++ b/tests/consumer.test.js @@ -122,6 +122,37 @@ const Pulsar = require('../index'); nAckRedeliverTimeoutMs: -12, })).rejects.toThrow('NAck timeout should be greater than or equal to zero'); }); + + test('Ack timeout less 10000', async () => { + await expect(client.subscribe({ + topic: 'test-topic', + subscription: 'sub1', + subscriptionType: 'Shared', + ackTimeoutMs: 100, + })).rejects.toThrow('Ack timeout should be 0 or greater than or equal to 10000'); + }); + + test('NAck timeout less 0', async () => { + await expect(client.subscribe({ + topic: 'test-topic', + subscription: 'sub1', + subscriptionType: 'Shared', + nAckRedeliverTimeoutMs: -1, + })).rejects.toThrow('NAck timeout should be greater than or equal to zero'); + }); + + test('Batch Receive Config Error', async () => { + await expect(client.subscribe({ + topic: 'test-batch-receive-policy-error', + subscription: 'sub1', + subscriptionType: 'Shared', + batchReceivePolicy: { + maxNumMessages: -1, + maxNumBytes: -1, + timeoutMs: -1, + }, + })).rejects.toThrow('At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.'); + }); }); describe('Close', () => { @@ -315,6 +346,89 @@ const Pulsar = require('../index'); consumer.close(); dlqConsumer.close(); }); + + test('Batch Receive by maxNumberMessages', async () => { + const topicName = 'batch-receive-test-topic'; + const producer = await client.createProducer({ + topic: topicName, + }); + + const consumer = await client.subscribe({ + topic: topicName, + subscription: 'sub1', + subscriptionType: 'Shared', + batchReceivePolicy: { + maxNumMessages: 10, + maxNumBytes: -1, + timeoutMs: 500, + }, + }); + const num = 10; + const messages = []; + for (let i = 0; i < num; i += 1) { + const msg = `my-message-${i}`; + await producer.send({ data: Buffer.from(msg) }); + messages.push(msg); + } + + const receiveMessages = await consumer.batchReceive(); + expect(receiveMessages.length).toEqual(num); + const results = []; + for (let i = 0; i < receiveMessages.length; i += 1) { + const msg = receiveMessages[i]; + console.log(msg.getData().toString()); + results.push(msg.getData().toString()); + } + expect(results).toEqual(messages); + + // assert no more msgs. + expect(await consumer.batchReceive()).toEqual([]); + + await producer.close(); + await consumer.close(); + }); + + test('Batch Receive by timeOutMs', async () => { + const topicName = 'batch-receive-test-topic-timeout'; + const producer = await client.createProducer({ + topic: topicName, + }); + + const consumer = await client.subscribe({ + topic: topicName, + subscription: 'sub1', + subscriptionType: 'Shared', + batchReceivePolicy: { + maxNumMessages: 100, + maxNumBytes: -1, + timeoutMs: 500, + }, + }); + // just send 10 message waite trigger timeout. + const num = 10; + const messages = []; + for (let i = 0; i < num; i += 1) { + const msg = `my-message-${i}`; + await producer.send({ data: Buffer.from(msg) }); + messages.push(msg); + } + + const receiveMessages = await consumer.batchReceive(); + expect(receiveMessages.length).toEqual(num); + const results = []; + for (let i = 0; i < receiveMessages.length; i += 1) { + const msg = receiveMessages[i]; + console.log(msg.getData().toString()); + results.push(msg.getData().toString()); + } + expect(results).toEqual(messages); + + // assert no more msgs. + expect(await consumer.batchReceive()).toEqual([]); + + await producer.close(); + await consumer.close(); + }); }); }); })();