Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fixed an issue where clients were unintentionally disconnected #348

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions src/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
}

try {
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get());
} catch (const std::exception &e) {
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
}
Expand All @@ -214,7 +213,7 @@ Napi::Value Client::GetPartitionsForTopic(const Napi::CallbackInfo &info) {
auto ctx = new ExtDeferredContext(deferred);

pulsar_client_get_topic_partitions_async(
this->cClient.get(), topic.c_str(),
this->cClient, topic.c_str(),
[](pulsar_result result, pulsar_string_list_t *topicList, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
Expand Down Expand Up @@ -275,7 +274,7 @@ Napi::Value Client::Close(const Napi::CallbackInfo &info) {
auto ctx = new ExtDeferredContext(deferred);

pulsar_client_close_async(
this->cClient.get(),
this->cClient,
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
Expand Down
2 changes: 1 addition & 1 deletion src/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Client : public Napi::ObjectWrap<Client> {
private:
static LogCallback *logCallback;
static Napi::FunctionReference constructor;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;

Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Expand Down
13 changes: 6 additions & 7 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) {
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}

struct ConsumerNewInstanceContext {
ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ConsumerConfig> consumerConfig)
: deferred(deferred), cClient(cClient), consumerConfig(consumerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ConsumerConfig> consumerConfig;

static void subscribeCallback(pulsar_result result, pulsar_consumer_t *rawConsumer, void *ctx) {
Expand Down Expand Up @@ -189,7 +188,7 @@ struct ConsumerNewInstanceContext {
}
};

Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
Expand Down Expand Up @@ -226,20 +225,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

if (!topicsPattern.empty()) {
pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
pulsar_client_subscribe_pattern_async(cClient, topicsPattern.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
} else if (topics.size() > 0) {
const char **cTopics = new const char *[topics.size()];
for (size_t i = 0; i < topics.size(); i++) {
cTopics[i] = topics[i].c_str();
}
pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
pulsar_client_subscribe_multi_topics_async(cClient, cTopics, topics.size(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
delete[] cTopics;
} else {
pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
pulsar_client_subscribe_async(cClient, topic.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class Consumer : public Napi::ObjectWrap<Consumer> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Consumer(const Napi::CallbackInfo &info);
~Consumer();
Expand Down
9 changes: 4 additions & 5 deletions src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ void Producer::Init(Napi::Env env, Napi::Object exports) {
void Producer::SetCProducer(std::shared_ptr<pulsar_producer_t> cProducer) { this->cProducer = cProducer; }

struct ProducerNewInstanceContext {
ProducerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ProducerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ProducerConfig> producerConfig)
: deferred(deferred), cClient(cClient), producerConfig(producerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ProducerConfig> producerConfig;
};

Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
auto producerConfig = std::make_shared<ProducerConfig>(config);
Expand All @@ -68,7 +67,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto ctx = new ProducerNewInstanceContext(deferred, cClient, producerConfig);

pulsar_client_create_producer_async(
cClient.get(), topic.c_str(), producerConfig->GetCProducerConfig().get(),
cClient, topic.c_str(), producerConfig->GetCProducerConfig().get(),
[](pulsar_result result, pulsar_producer_t *rawProducer, void *ctx) {
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
Expand Down
2 changes: 1 addition & 1 deletion src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class Producer : public Napi::ObjectWrap<Producer> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Producer(const Napi::CallbackInfo &info);
~Producer();
Expand Down
9 changes: 4 additions & 5 deletions src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,11 @@ void Reader::SetListenerCallback(ReaderListenerCallback *listener) {
Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), listener(nullptr) {}

struct ReaderNewInstanceContext {
ReaderNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ReaderNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ReaderConfig> readerConfig)
: deferred(deferred), cClient(cClient), readerConfig(readerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ReaderConfig> readerConfig;

static void createReaderCallback(pulsar_result result, pulsar_reader_t *rawReader, void *ctx) {
Expand All @@ -143,7 +142,7 @@ struct ReaderNewInstanceContext {
}
};

Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();

Expand All @@ -162,7 +161,7 @@ Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<

auto ctx = new ReaderNewInstanceContext(deferred, cClient, readerConfig);

pulsar_client_create_reader_async(cClient.get(), topic.c_str(), readerConfig->GetCStartMessageId().get(),
pulsar_client_create_reader_async(cClient, topic.c_str(), readerConfig->GetCStartMessageId().get(),
readerConfig->GetCReaderConfig().get(),
&ReaderNewInstanceContext::createReaderCallback, ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class Reader : public Napi::ObjectWrap<Reader> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Reader(const Napi::CallbackInfo &info);
~Reader();
Expand Down