From 996549ec5f945431c26a5b5cc4a6ac331b6b998f Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 9 Apr 2024 16:30:54 +0800 Subject: [PATCH] fix: ReaderListenerProxy will make a segfault --- src/Reader.cc | 24 +++++++++++++----------- tests/reader.test.js | 41 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/src/Reader.cc b/src/Reader.cc index bd6c7f0c..4fe7c63c 100644 --- a/src/Reader.cc +++ b/src/Reader.cc @@ -60,17 +60,19 @@ struct ReaderListenerProxyData { void ReaderListenerProxy(Napi::Env env, Napi::Function jsCallback, ReaderListenerProxyData *data) { Napi::Object msg = Message::NewInstance({}, data->cMessage); Reader *reader = data->reader; - - Napi::Value ret = jsCallback.Call({msg, reader->Value()}); - if (ret.IsPromise()) { - Napi::Promise promise = ret.As(); - Napi::Value thenValue = promise.Get("then"); - if (thenValue.IsFunction()) { - Napi::Function then = thenValue.As(); - Napi::Function callback = - Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); }); - then.Call(promise, {callback}); - return; + // `reader` might be null in certain cases, segmentation fault might happend without this null check. + if (reader) { + Napi::Value ret = jsCallback.Call({msg, reader->Value()}); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As(); + Napi::Value thenValue = promise.Get("then"); + if (thenValue.IsFunction()) { + Napi::Function then = thenValue.As(); + Napi::Function callback = + Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { data->callback(); }); + then.Call(promise, {callback}); + return; + } } } data->callback(); diff --git a/tests/reader.test.js b/tests/reader.test.js index 673ae603..56d1b48e 100644 --- a/tests/reader.test.js +++ b/tests/reader.test.js @@ -130,5 +130,46 @@ const baseUrl = 'http://localhost:8080'; await reader.close(); await client.close(); }); + + test('Reader should not throw segmentation fault when create and close', async () => { + const NUM_ITS = 1000; + const its = Array.from({ length: NUM_ITS }, (_, i) => i); + + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + }); + + const producer = await client.createProducer({ + topic: 'persistent://public/default/my-topic', + sendTimeoutMs: 30000, + batchingEnabled: true, + }); + + // Send messages + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log(`Sent message: ${msg}`); + } + await producer.flush(); + + await Promise.all( + its.map(async () => { + const reader = await client.createReader({ + topic: 'persistent://public/default/my-topic', + startMessageId: Pulsar.MessageId.earliest(), + listener: (message) => { + console.log(message.getData().toString()); + }, + }); + await reader.close(); + }), + ); + await producer.close(); + await client.close(); + expect(true).toBe(true); + }); }); })();