[Question] Reacting to deletion of a topic being read through a consumer #1907

Open
shubhanshu02 opened this issue Jan 22, 2025 · 0 comments

Comments

@shubhanshu02

I ran into a case where the topic I was reading from was deleted and then recreated from scratch while my consumer was running. My consumer, using confluent_kafka, looks roughly like this:

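from confluent_kafka import Consumer, TopicPartition

consumer = Consumer(config)
# Manually assign a single partition, starting at a known offset.
consumer.assign([TopicPartition(topic, partition, offset)])
while True:
    # Fetch up to 1000 messages, waiting at most 0.1 s.
    msgs = consumer.consume(1000, 0.1)
    for msg in msgs:
        ...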

What happens in this case:

  1. The topic has a single partition.
  2. After the topic is deleted, it is recreated within a few seconds and ingestion starts again.
  3. The consumer does not crash on consume(); it keeps returning an empty list of messages.

Currently, the only indication I get is the following line on stderr when the topic is deleted. Is there a way to detect this event from the consumer side, so that the program can react to it?

%5|1737553425.511|PARTCNT|rdkafka#consumer-3| [thrd:main]: Topic <topic> partition count changed from 1 to 0
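
One possible workaround, shown here only as a minimal sketch and not as a confirmed or recommended answer, is to poll cluster metadata from the consumer with Consumer.list_topics() and stop consuming once the assigned partition is no longer listed. The helper names partition_missing and consume_until_topic_vanishes, the handle callback, and the check_every interval are hypothetical. The sketch assumes the returned metadata exposes a topics dict whose entries have error and partitions attributes, as ClusterMetadata and TopicMetadata do in confluent_kafka.

```python
import time


def partition_missing(metadata, topic, partition):
    """Return True if topic/partition is absent from the cluster metadata.

    `metadata` is expected to look like the ClusterMetadata returned by
    Consumer.list_topics(): a `.topics` dict mapping topic names to objects
    with an `.error` attribute and a `.partitions` dict keyed by partition id.
    """
    topic_md = metadata.topics.get(topic)
    if topic_md is None or topic_md.error is not None:
        return True
    return partition not in topic_md.partitions


def consume_until_topic_vanishes(consumer, topic, partition, handle, check_every=5.0):
    """Consume in a loop and return once the assigned partition disappears.

    `handle` is a hypothetical callback invoked for every message that
    carries no error.
    """
    next_check = time.monotonic()
    while True:
        if time.monotonic() >= next_check:
            # Ask for metadata of all topics rather than one named topic, so
            # that a missing topic is not auto-created by the request.
            metadata = consumer.list_topics(timeout=5.0)
            if partition_missing(metadata, topic, partition):
                return
            next_check = time.monotonic() + check_every
        for msg in consumer.consume(1000, 0.1):
            if msg.error() is None:
                handle(msg)
```

A caller could invoke consume_until_topic_vanishes(consumer, topic, partition, process) and, when it returns, re-assign the partition from the beginning or exit. A limitation of this approach is that a delete and recreate that both happen between two metadata checks would go unnoticed, so check_every would need to be shorter than the time the topic stays deleted.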