Skip to content

Commit

Permalink
convert cClient from shared_ptr to normal pointer
Browse files Browse the repository at this point in the history
  • Loading branch information
kontotto committed Oct 31, 2023
1 parent 325853d commit a01b80d
Show file tree
Hide file tree
Showing 8 changed files with 21 additions and 25 deletions.
7 changes: 3 additions & 4 deletions src/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ Client::Client(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Client>(info)
}

try {
this->cClient = std::shared_ptr<pulsar_client_t>(
pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get()), pulsar_client_free);
this->cClient = pulsar_client_create(serviceUrl.Utf8Value().c_str(), cClientConfig.get());
} catch (const std::exception &e) {
Napi::Error::New(env, e.what()).ThrowAsJavaScriptException();
}
Expand All @@ -214,7 +213,7 @@ Napi::Value Client::GetPartitionsForTopic(const Napi::CallbackInfo &info) {
auto ctx = new ExtDeferredContext(deferred);

pulsar_client_get_topic_partitions_async(
this->cClient.get(), topic.c_str(),
this->cClient, topic.c_str(),
[](pulsar_result result, pulsar_string_list_t *topicList, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
Expand Down Expand Up @@ -275,7 +274,7 @@ Napi::Value Client::Close(const Napi::CallbackInfo &info) {
auto ctx = new ExtDeferredContext(deferred);

pulsar_client_close_async(
this->cClient.get(),
this->cClient,
[](pulsar_result result, void *ctx) {
auto deferredContext = static_cast<ExtDeferredContext *>(ctx);
auto deferred = deferredContext->deferred;
Expand Down
2 changes: 1 addition & 1 deletion src/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class Client : public Napi::ObjectWrap<Client> {
private:
static LogCallback *logCallback;
static Napi::FunctionReference constructor;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<pulsar_client_configuration_t> cClientConfig;

Napi::Value CreateProducer(const Napi::CallbackInfo &info);
Expand Down
13 changes: 6 additions & 7 deletions src/Consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,11 @@ void Consumer::SetListenerCallback(MessageListenerCallback *listener) {
Consumer::Consumer(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Consumer>(info), listener(nullptr) {}

struct ConsumerNewInstanceContext {
ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ConsumerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ConsumerConfig> consumerConfig)
: deferred(deferred), cClient(cClient), consumerConfig(consumerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ConsumerConfig> consumerConfig;

static void subscribeCallback(pulsar_result result, pulsar_consumer_t *rawConsumer, void *ctx) {
Expand Down Expand Up @@ -189,7 +188,7 @@ struct ConsumerNewInstanceContext {
}
};

Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
std::shared_ptr<ConsumerConfig> consumerConfig = std::make_shared<ConsumerConfig>(config, &MessageListener);
Expand Down Expand Up @@ -226,20 +225,20 @@ Napi::Value Consumer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto ctx = new ConsumerNewInstanceContext(deferred, cClient, consumerConfig);

if (!topicsPattern.empty()) {
pulsar_client_subscribe_pattern_async(cClient.get(), topicsPattern.c_str(), subscription.c_str(),
pulsar_client_subscribe_pattern_async(cClient, topicsPattern.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
} else if (topics.size() > 0) {
const char **cTopics = new const char *[topics.size()];
for (size_t i = 0; i < topics.size(); i++) {
cTopics[i] = topics[i].c_str();
}
pulsar_client_subscribe_multi_topics_async(cClient.get(), cTopics, topics.size(), subscription.c_str(),
pulsar_client_subscribe_multi_topics_async(cClient, cTopics, topics.size(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
delete[] cTopics;
} else {
pulsar_client_subscribe_async(cClient.get(), topic.c_str(), subscription.c_str(),
pulsar_client_subscribe_async(cClient, topic.c_str(), subscription.c_str(),
consumerConfig->GetCConsumerConfig().get(),
&ConsumerNewInstanceContext::subscribeCallback, ctx);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
class Consumer : public Napi::ObjectWrap<Consumer> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Consumer(const Napi::CallbackInfo &info);
~Consumer();
Expand Down
9 changes: 4 additions & 5 deletions src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,15 @@ void Producer::Init(Napi::Env env, Napi::Object exports) {
void Producer::SetCProducer(std::shared_ptr<pulsar_producer_t> cProducer) { this->cProducer = cProducer; }

struct ProducerNewInstanceContext {
ProducerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ProducerNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ProducerConfig> producerConfig)
: deferred(deferred), cClient(cClient), producerConfig(producerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ProducerConfig> producerConfig;
};

Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
auto config = info[0].As<Napi::Object>();
auto producerConfig = std::make_shared<ProducerConfig>(config);
Expand All @@ -68,7 +67,7 @@ Napi::Value Producer::NewInstance(const Napi::CallbackInfo &info, std::shared_pt
auto ctx = new ProducerNewInstanceContext(deferred, cClient, producerConfig);

pulsar_client_create_producer_async(
cClient.get(), topic.c_str(), producerConfig->GetCProducerConfig().get(),
cClient, topic.c_str(), producerConfig->GetCProducerConfig().get(),
[](pulsar_result result, pulsar_producer_t *rawProducer, void *ctx) {
auto instanceContext = static_cast<ProducerNewInstanceContext *>(ctx);
auto deferred = instanceContext->deferred;
Expand Down
2 changes: 1 addition & 1 deletion src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class Producer : public Napi::ObjectWrap<Producer> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Producer(const Napi::CallbackInfo &info);
~Producer();
Expand Down
9 changes: 4 additions & 5 deletions src/Reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,11 @@ void Reader::SetListenerCallback(ReaderListenerCallback *listener) {
Reader::Reader(const Napi::CallbackInfo &info) : Napi::ObjectWrap<Reader>(info), listener(nullptr) {}

struct ReaderNewInstanceContext {
ReaderNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred,
std::shared_ptr<pulsar_client_t> cClient,
ReaderNewInstanceContext(std::shared_ptr<ThreadSafeDeferred> deferred, pulsar_client_t *cClient,
std::shared_ptr<ReaderConfig> readerConfig)
: deferred(deferred), cClient(cClient), readerConfig(readerConfig){};
std::shared_ptr<ThreadSafeDeferred> deferred;
std::shared_ptr<pulsar_client_t> cClient;
pulsar_client_t *cClient;
std::shared_ptr<ReaderConfig> readerConfig;

static void createReaderCallback(pulsar_result result, pulsar_reader_t *rawReader, void *ctx) {
Expand All @@ -143,7 +142,7 @@ struct ReaderNewInstanceContext {
}
};

Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient) {
Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient) {
auto deferred = ThreadSafeDeferred::New(info.Env());
Napi::Object config = info[0].As<Napi::Object>();

Expand All @@ -162,7 +161,7 @@ Napi::Value Reader::NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<

auto ctx = new ReaderNewInstanceContext(deferred, cClient, readerConfig);

pulsar_client_create_reader_async(cClient.get(), topic.c_str(), readerConfig->GetCStartMessageId().get(),
pulsar_client_create_reader_async(cClient, topic.c_str(), readerConfig->GetCStartMessageId().get(),
readerConfig->GetCReaderConfig().get(),
&ReaderNewInstanceContext::createReaderCallback, ctx);

Expand Down
2 changes: 1 addition & 1 deletion src/Reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
class Reader : public Napi::ObjectWrap<Reader> {
public:
static void Init(Napi::Env env, Napi::Object exports);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, std::shared_ptr<pulsar_client_t> cClient);
static Napi::Value NewInstance(const Napi::CallbackInfo &info, pulsar_client_t *cClient);
static Napi::FunctionReference constructor;
Reader(const Napi::CallbackInfo &info);
~Reader();
Expand Down

0 comments on commit a01b80d

Please sign in to comment.