Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support batch receive for consumer. #357

Merged
merged 2 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ export interface ConsumerConfig {
batchIndexAckEnabled?: boolean;
regexSubscriptionMode?: RegexSubscriptionMode;
deadLetterPolicy?: DeadLetterPolicy;
batchReceivePolicy?: ConsumerBatchReceivePolicy;
}

export class Consumer {
receive(timeout?: number): Promise<Message>;
batchReceive(): Promise<Message []>;
acknowledge(message: Message): Promise<null>;
acknowledgeId(messageId: MessageId): Promise<null>;
negativeAcknowledge(message: Message): void;
Expand Down Expand Up @@ -181,6 +183,12 @@ export interface DeadLetterPolicy {
initialSubscriptionName?: string;
}

export interface ConsumerBatchReceivePolicy {
maxNumMessages?: number;
maxNumBytes?: number;
timeoutMs?: number;
}

export class AuthenticationTls {
constructor(params: { certificatePath: string, privateKeyPath: string });
}
Expand Down
65 changes: 40 additions & 25 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void Consumer::Init(Napi::Env env, Napi::Object exports) {
DefineClass(env, "Consumer",
{
InstanceMethod("receive", &Consumer::Receive),
InstanceMethod("batchReceive", &Consumer::BatchReceive),
InstanceMethod("acknowledge", &Consumer::Acknowledge),
InstanceMethod("acknowledgeId", &Consumer::AcknowledgeId),
InstanceMethod("negativeAcknowledge", &Consumer::NegativeAcknowledge),
Expand Down Expand Up @@ -192,36 +193,17 @@ struct ConsumerNewInstanceContext {
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>();

consumerConfig->InitConfig(deferred, config, &MessageListener);
if (deferred->IsDone()) {
return deferred->Promise();
}

const std::string &topic = consumerConfig->GetTopic();
const std::vector<std::string> &topics = consumerConfig->GetTopics();
const std::string &topicsPattern = consumerConfig->GetTopicsPattern();
if (topic.empty() && topics.size() == 0 && topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return deferred->Promise();
}
const std::string &subscription = consumerConfig->GetSubscription();
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return deferred->Promise();
}
int32_t ackTimeoutMs = consumerConfig->GetAckTimeoutMs();
if (ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return deferred->Promise();
}
int32_t nAckRedeliverTimeoutMs = consumerConfig->GetNAckRedeliverTimeoutMs();
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return deferred->Promise();
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}

auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

Expand Down Expand Up @@ -291,6 +273,39 @@ class ConsumerReceiveWorker : public Napi::AsyncWorker {
int64_t timeout;
};

Napi::Value Consumer::BatchReceive(const Napi::CallbackInfo &info) {
auto deferred = ThreadSafeDeferred::New(Env());
auto ctx = new ExtDeferredContext(deferred);
pulsar_consumer_batch_receive_async(
this->cConsumer.get(),
[](pulsar_result result, pulsar_messages_t *rawMessages, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
delete deferredContext;

if (result != pulsar_result_Ok) {
deferred->Reject(std::string("Failed to batch receive message: ") + pulsar_result_str(result));
} else {
deferred->Resolve([rawMessages](const Napi::Env env) {
int listSize = pulsar_messages_size(rawMessages);
Napi::Array jsArray = Napi::Array::New(env, listSize);
for (int i = 0; i < listSize; i++) {
pulsar_message_t *rawMessage = pulsar_messages_get(rawMessages, i);
pulsar_message_t *message = pulsar_message_create();
pulsar_message_copy(rawMessage, message);
Napi::Object obj =
Message::NewInstance({}, std::shared_ptr<pulsar_message_t>(message, pulsar_message_free));
jsArray.Set(i, obj);
}
pulsar_messages_free(rawMessages);
return jsArray;
});
}
},
ctx);
return deferred->Promise();
}

Napi::Value Consumer::Receive(const Napi::CallbackInfo &info) {
if (info[0].IsUndefined()) {
auto deferred = ThreadSafeDeferred::New(Env());
Expand Down
1 change: 1 addition & 0 deletions src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Consumer : public Napi::ObjectWrap<Consumer> {
MessageListenerCallback *listener;

Napi::Value Receive(const Napi::CallbackInfo &info);
Napi::Value BatchReceive(const Napi::CallbackInfo &info);
Napi::Value Acknowledge(const Napi::CallbackInfo &info);
Napi::Value AcknowledgeId(const Napi::CallbackInfo &info);
void NegativeAcknowledge(const Napi::CallbackInfo &info);
Expand Down
71 changes: 65 additions & 6 deletions src/ConsumerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ static const std::string CFG_DEAD_LETTER_POLICY = "deadLetterPolicy";
static const std::string CFG_DLQ_POLICY_TOPIC = "deadLetterTopic";
static const std::string CFG_DLQ_POLICY_MAX_REDELIVER_COUNT = "maxRedeliverCount";
static const std::string CFG_DLQ_POLICY_INIT_SUB_NAME = "initialSubscriptionName";
static const std::string CFG_BATCH_RECEIVE_POLICY = "batchReceivePolicy";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES = "maxNumMessages";
static const std::string CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES = "maxNumBytes";
static const std::string CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS = "timeoutMs";

static const std::map<std::string, pulsar_consumer_type> SUBSCRIPTION_TYPE = {
{"Exclusive", pulsar_ConsumerExclusive},
Expand All @@ -74,7 +78,7 @@ static const std::map<std::string, pulsar_consumer_crypto_failure_action> CONSUM

void FinalizeListenerCallback(Napi::Env env, MessageListenerCallback *cb, void *) { delete cb; }

ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener)
ConsumerConfig::ConsumerConfig()
: topic(""),
topicsPattern(""),
subscription(""),
Expand All @@ -83,7 +87,10 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
listener(nullptr) {
this->cConsumerConfig = std::shared_ptr<pulsar_consumer_configuration_t>(
pulsar_consumer_configuration_create(), pulsar_consumer_configuration_free);
}

void ConsumerConfig::InitConfig(const std::shared_ptr<ThreadSafeDeferred> deferred,
const Napi::Object &consumerConfig, pulsar_message_listener messageListener) {
if (consumerConfig.Has(CFG_TOPIC) && consumerConfig.Get(CFG_TOPIC).IsString()) {
this->topic = consumerConfig.Get(CFG_TOPIC).ToString().Utf8Value();
}
Expand All @@ -101,9 +108,21 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
this->topicsPattern = consumerConfig.Get(CFG_TOPICS_PATTERN).ToString().Utf8Value();
}

if (this->topic.empty() && this->topics.size() == 0 && this->topicsPattern.empty()) {
deferred->Reject(
std::string("Topic, topics or topicsPattern is required and must be specified as a string when "
"creating consumer"));
return;
}

if (consumerConfig.Has(CFG_SUBSCRIPTION) && consumerConfig.Get(CFG_SUBSCRIPTION).IsString()) {
this->subscription = consumerConfig.Get(CFG_SUBSCRIPTION).ToString().Utf8Value();
}
if (subscription.empty()) {
deferred->Reject(
std::string("Subscription is required and must be specified as a string when creating consumer"));
return;
}

if (consumerConfig.Has(CFG_SUBSCRIPTION_TYPE) && consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).IsString()) {
std::string subscriptionType = consumerConfig.Get(CFG_SUBSCRIPTION_TYPE).ToString().Utf8Value();
Expand Down Expand Up @@ -139,18 +158,25 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag

if (consumerConfig.Has(CFG_ACK_TIMEOUT) && consumerConfig.Get(CFG_ACK_TIMEOUT).IsNumber()) {
this->ackTimeoutMs = consumerConfig.Get(CFG_ACK_TIMEOUT).ToNumber().Int64Value();
if (this->ackTimeoutMs == 0 || this->ackTimeoutMs >= MIN_ACK_TIMEOUT_MILLIS) {
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
if (this->ackTimeoutMs != 0 && ackTimeoutMs < MIN_ACK_TIMEOUT_MILLIS) {
std::string msg("Ack timeout should be 0 or greater than or equal to " +
std::to_string(MIN_ACK_TIMEOUT_MILLIS));
deferred->Reject(msg);
return;
}
pulsar_consumer_set_unacked_messages_timeout_ms(this->cConsumerConfig.get(), this->ackTimeoutMs);
}

if (consumerConfig.Has(CFG_NACK_REDELIVER_TIMEOUT) &&
consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).IsNumber()) {
this->nAckRedeliverTimeoutMs = consumerConfig.Get(CFG_NACK_REDELIVER_TIMEOUT).ToNumber().Int64Value();
if (this->nAckRedeliverTimeoutMs >= 0) {
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
this->nAckRedeliverTimeoutMs);
if (nAckRedeliverTimeoutMs < 0) {
std::string msg("NAck timeout should be greater than or equal to zero");
deferred->Reject(msg);
return;
}
pulsar_configure_set_negative_ack_redelivery_delay_ms(this->cConsumerConfig.get(),
this->nAckRedeliverTimeoutMs);
}

if (consumerConfig.Has(CFG_RECV_QUEUE) && consumerConfig.Get(CFG_RECV_QUEUE).IsNumber()) {
Expand Down Expand Up @@ -265,6 +291,39 @@ ConsumerConfig::ConsumerConfig(const Napi::Object &consumerConfig, pulsar_messag
}
pulsar_consumer_configuration_set_dlq_policy(this->cConsumerConfig.get(), &dlq_policy);
}

if (consumerConfig.Has(CFG_BATCH_RECEIVE_POLICY) &&
consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).IsObject()) {
Napi::Object propObj = consumerConfig.Get(CFG_BATCH_RECEIVE_POLICY).ToObject();
int maxNumMessages = -1;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES) &&
shibd marked this conversation as resolved.
Show resolved Hide resolved
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).IsNumber()) {
maxNumMessages = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_MESSAGES).ToNumber().Int32Value();
}
int maxNumBytes = 10 * 1024 * 1024;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES) &&
propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).IsNumber()) {
maxNumBytes = propObj.Get(CFG_BATCH_RECEIVE_POLICY_MAX_NUM_BYTES).ToNumber().Int64Value();
}
int timeoutMs = 100;
if (propObj.Has(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS) &&
propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).IsNumber()) {
timeoutMs = propObj.Get(CFG_BATCH_RECEIVE_POLICY_TIMEOUT_MS).ToNumber().Int64Value();
}
if (maxNumMessages <= 0 && maxNumBytes <= 0 && timeoutMs <= 0) {
std::string msg("At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.");
deferred->Reject(msg);
return;
}
pulsar_consumer_batch_receive_policy_t batch_receive_policy{maxNumMessages, maxNumBytes, timeoutMs};
int result = pulsar_consumer_configuration_set_batch_receive_policy(this->cConsumerConfig.get(),
&batch_receive_policy);
if (result == -1) {
std::string msg("Set batch receive policy failed: C client returned failure");
deferred->Reject(msg);
return;
}
}
}

ConsumerConfig::~ConsumerConfig() {
Expand Down
5 changes: 4 additions & 1 deletion src/ConsumerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@
#define CONSUMER_CONFIG_H

#include <pulsar/c/consumer_configuration.h>
#include "ThreadSafeDeferred.h"
#include "MessageListener.h"

#define MIN_ACK_TIMEOUT_MILLIS 10000

class ConsumerConfig {
public:
ConsumerConfig(const Napi::Object &consumerConfig, pulsar_message_listener messageListener);
ConsumerConfig();
~ConsumerConfig();
void InitConfig(const std::shared_ptr<ThreadSafeDeferred> deferred, const Napi::Object &consumerConfig,
shibd marked this conversation as resolved.
Show resolved Hide resolved
pulsar_message_listener messageListener);
std::shared_ptr<pulsar_consumer_configuration_t> GetCConsumerConfig();
std::string GetTopic();
std::vector<std::string> GetTopics();
Expand Down
4 changes: 4 additions & 0 deletions src/ThreadSafeDeferred.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,7 @@ void ThreadSafeDeferred::Reject(const std::string &errorMsg) {
this->fate = EFate::REJECTED;
this->tsf.Release();
}

bool ThreadSafeDeferred::IsDone() const {
return this->fate == EFate::RESOLVED || this->fate == EFate::REJECTED;
}
1 change: 1 addition & 0 deletions src/ThreadSafeDeferred.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ThreadSafeDeferred : public Napi::Promise::Deferred {
inline void Reject() { this->Reject(""); }
void Reject(
const std::string &); // <- if only Reject were virtual... But we can live without polymorphism here
bool IsDone() const;

static std::shared_ptr<ThreadSafeDeferred> New(const Napi::Env env);
};
Expand Down
72 changes: 72 additions & 0 deletions tests/consumer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,37 @@ const Pulsar = require('../index');
nAckRedeliverTimeoutMs: -12,
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});

test('Ack timeout less 10000', async () => {
await expect(client.subscribe({
topic: 'test-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
ackTimeoutMs: 100,
})).rejects.toThrow('Ack timeout should be 0 or greater than or equal to 10000');
});

test('NAck timeout less 0', async () => {
await expect(client.subscribe({
topic: 'test-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
nAckRedeliverTimeoutMs: -1,
})).rejects.toThrow('NAck timeout should be greater than or equal to zero');
});

test('Batch Receive Config Error', async () => {
await expect(client.subscribe({
topic: 'test-batch-receive-policy-error',
subscription: 'sub1',
subscriptionType: 'Shared',
batchReceivePolicy: {
maxNumMessages: -1,
maxNumBytes: -1,
timeoutMs: -1,
},
})).rejects.toThrow('At least one of maxNumMessages, maxNumBytes and timeoutMs must be specified.');
});
});

describe('Close', () => {
Expand Down Expand Up @@ -315,6 +346,47 @@ const Pulsar = require('../index');
consumer.close();
dlqConsumer.close();
});

test('Batch Receive', async () => {
const topicName = 'batch-receive-test-topic';
const producer = await client.createProducer({
topic: topicName,
});

const consumer = await client.subscribe({
topic: topicName,
subscription: 'sub1',
subscriptionType: 'Shared',
batchReceivePolicy: {
maxNumMessages: 10,
maxNumBytes: -1,
timeoutMs: 500,
shibd marked this conversation as resolved.
Show resolved Hide resolved
},
});
const num = 10;
const messages = [];
for (let i = 0; i < num; i += 1) {
const msg = `my-message-${i}`;
await producer.send({ data: Buffer.from(msg) });
messages.push(msg);
}

const receiveMessages = await consumer.batchReceive();
expect(receiveMessages.length).toEqual(num);
const results = [];
for (let i = 0; i < receiveMessages.length; i += 1) {
const msg = receiveMessages[i];
console.log(msg.getData().toString());
results.push(msg.getData().toString());
}
expect(results).toEqual(messages);

// assert no more msgs.
expect(await consumer.batchReceive()).toEqual([]);

await producer.close();
await consumer.close();
});
});
});
})();
Loading