From 63a99070f315eb23f1c389207086379a1947977b Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 29 Aug 2024 19:58:12 +0800 Subject: [PATCH] Enable batching by default Keep the same behavior with other clients. --- pulsar/__init__.py | 4 ++-- tests/pulsar_test.py | 11 +++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index fad33cd..c7369da 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -617,7 +617,7 @@ def create_producer(self, topic, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, - batching_enabled=False, + batching_enabled=True, batching_max_messages=1000, batching_max_allowed_size_in_bytes=128*1024, batching_max_publish_delay_ms=10, @@ -670,7 +670,7 @@ def create_producer(self, topic, SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that release in order to be able to receive messages compressed with SNAPPY. - batching_enabled: bool, default=False + batching_enabled: bool, default=True When automatic batching is enabled, multiple calls to `send` can result in a single batch to be sent to the broker, leading to better throughput, especially when publishing small messages. All messages in a batch will be published as a single batched message. The consumer will be delivered diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index bb62d99..c76ef40 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -146,18 +146,21 @@ def test_producer_send_async(self): sent_messages = [] + event = threading.Event() def send_callback(producer, msg): sent_messages.append(msg) + if len(sent_messages) >= 3: + event.set() producer.send_async(b"hello", send_callback) producer.send_async(b"hello", send_callback) producer.send_async(b"hello", send_callback) - i = 0 - while len(sent_messages) < 3 and i < 100: - time.sleep(0.1) - i += 1 + event.wait(3000) self.assertEqual(len(sent_messages), 3) + for i in range(0, len(sent_messages)): + msg_id = sent_messages[i] + self.assertEqual(msg_id.batch_index(), i) client.close() def test_producer_send(self):