You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
kafka-python-ng falls on consumer.poll() method when some offset has empty data
We have 4 records in our Kafka queue. Offsets 0, 1, 3 have data, 2 data is missing (was deleted by producer after initial push I believe). I can see that with kcat (kafkacat utility)
kafka-python-ng treats that incorrectly and falls. I believe a null check is missing
kafka-python-ng 2.2.3
python 3.12
stack trace
20:53:39|ERROR|consumer.poll() error
Traceback (most recent call last):
File "/home/user/my-kafka/consumer.py", line 163, in fetch
records: Records = self.consumer.poll(self.fetch_timeout, self.batch_size)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/group.py", line 663, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/group.py", line 718, in _poll_once
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/fetcher.py", line 340, in fetched_records
self._next_partition_records = self._parse_fetched_data(completion)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/fetcher.py", line 814, in _parse_fetched_data
unpacked = list(self._unpack_message_set(tp, records))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/fetcher.py", line 469, in _unpack_message_set
value = self._deserialize(
^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/food72-kafka/lib/python3.12/site-packages/kafka/consumer/fetcher.py", line 507, in _deserialize
return f(bytes_)
^^^^^^^^^
File "/home/user/my-kafka/functions.py", line 29, in deserialize_body
return json.loads(value.decode('utf-8'))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/3.12.5/lib/python3.12/json/__init__.py", line 346, in loads
return _default_decoder.decode(s)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/3.12.5/lib/python3.12/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/user/.pyenv/versions/3.12.5/lib/python3.12/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
The text was updated successfully, but these errors were encountered:
kafka-python-ng falls on
consumer.poll()
method when some offset has empty dataWe have 4 records in our Kafka queue. Offsets 0, 1, 3 have data, 2 data is missing (was deleted by producer after initial push I believe). I can see that with kcat (kafkacat utility)
kafka-python-ng treats that incorrectly and falls. I believe a null check is missing
kafka-python-ng 2.2.3
python 3.12
stack trace
The text was updated successfully, but these errors were encountered: