More heap usage safety back pressure (user controllable? back pressure based on unprocessed messages?) #217

Open
bitterfox opened this issue Oct 7, 2023 · 1 comment

Comments

@bitterfox

We are developing a Kafka consumer with Decaton that works as follows:

  • Consume messages from Kafka
  • Write the messages to a file, building a large file that contains 10K or more messages
    • We would like to create 100MB+ files. If a single message is 300B and the compression ratio is 50%, each file will contain about 700K messages
    • Thus we set decaton.max.pending.records to a huge value such as 100K or 1M
  • We want to commit offsets only for messages that have been persisted in the file
    • We use external storage, and we assume the data is persisted once the file descriptor is closed successfully
    • So we don't want to commit offsets until the file is closed
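
For reference, the sizing above can be checked with a few lines of Java. This is only a rough sketch: the class and method names are made up for illustration, 100MB is taken as 100,000,000 bytes, and the payload figure counts message bytes only, ignoring any per-record object overhead on the heap.

```java
// Rough sizing check for the figures quoted in this issue.
// FileSizing and its methods are illustrative names, not part of Decaton.
final class FileSizing {
    /**
     * Number of messages needed to fill a compressed file.
     * compressionRatio is compressed size / original size (0.5 = half the size).
     */
    static long messagesPerFile(long targetFileBytes, long messageBytes, double compressionRatio) {
        double compressedMessageBytes = messageBytes * compressionRatio;
        return (long) Math.ceil(targetFileBytes / compressedMessageBytes);
    }

    /** Raw payload bytes held in memory if this many records are pending at once. */
    static long pendingPayloadBytes(long pendingRecords, long messageBytes) {
        return pendingRecords * messageBytes;
    }

    public static void main(String[] args) {
        // 100MB file, 300B messages, 50% compression ratio
        System.out.println(messagesPerFile(100_000_000L, 300L, 0.5)); // prints 666667
        // 1M pending records of 300B each
        System.out.println(pendingPayloadBytes(1_000_000L, 300L)); // prints 300000000
    }
}
```

So roughly 670K messages fill one file, and 1M pending records of 300B each amount to about 300MB of payload alone if they are all buffered before the processor catches up.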

Because we set decaton.max.pending.records to a huge number, Decaton can consume a huge number of messages and push them down to the processor.
A huge value for decaton.max.pending.records works fine for our application as long as the consumer starts with nothing pending and messages arrive more slowly than the processor can process them.

Now assume that a partition has 100K or 1M pending records and the consumer node fails.
The partition is then rebalanced, and Decaton consumes messages up to decaton.max.pending.records (100K, 1M) faster than the processor can process them.
So more than 100K messages may occupy the heap until the processor has processed all of them.
In this scenario, our application actually hit an OOME.
A similar scenario can occur when restarting the application, or when resuming it after it has been stopped for several hours.

So I'm wondering whether Decaton can be improved for this kind of use case with a better back pressure strategy for heap usage safety.
I propose several possible options:

  • Let the processor control back pressure
  • Separate decaton.max.pending.records into 2 configurations, such as the max number of uncommitted messages and the max number of messages not yet processed, and have Decaton apply back pressure based on these 2 numbers
  • Add a configuration for Decaton's consuming speed (rps) and have Decaton respect it
  • Make the executor in ProcessUnit configurable so that we can control blocking of polling
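
To illustrate the second option, here is a minimal sketch of what back pressure based on two separate limits might look like. It is plain Java and purely hypothetical: TwoLimitBackPressure, its methods, and the example limits are made up for this issue and are not Decaton's API or implementation. It also assumes a message stops occupying heap once the processor has handled it (for us, once it is written to the file).

```java
// Hypothetical sketch of back pressure driven by two separate limits.
// None of these names exist in Decaton; they are made up for illustration.
final class TwoLimitBackPressure {
    private final long maxUnprocessed; // fetched but not yet handled by the processor
    private final long maxUncommitted; // fetched but offset not yet committed
    private long unprocessed;
    private long uncommitted;

    TwoLimitBackPressure(long maxUnprocessed, long maxUncommitted) {
        this.maxUnprocessed = maxUnprocessed;
        this.maxUncommitted = maxUncommitted;
    }

    /** Called when n records are fetched and handed to the processor queue. */
    synchronized void onFetched(long n) {
        unprocessed += n;
        uncommitted += n;
    }

    /** Called when the processor has finished handling n records. */
    synchronized void onProcessed(long n) {
        unprocessed -= n;
    }

    /** Called when offsets covering n records have been committed. */
    synchronized void onCommitted(long n) {
        uncommitted -= n;
    }

    /** Fetching should pause as soon as either limit is reached. */
    synchronized boolean shouldPauseFetching() {
        return unprocessed >= maxUnprocessed || uncommitted >= maxUncommitted;
    }

    public static void main(String[] args) {
        // Small in-memory limit, large uncommitted limit (example values only).
        TwoLimitBackPressure gate = new TwoLimitBackPressure(10_000, 1_000_000);
        gate.onFetched(10_000);
        System.out.println(gate.shouldPauseFetching()); // true: unprocessed limit reached
        gate.onProcessed(10_000);
        System.out.println(gate.shouldPauseFetching()); // false: only uncommitted remain
    }
}
```

With a small unprocessed limit and a large uncommitted limit, heap usage would be bounded by the unprocessed count, while the consumer could still accumulate enough uncommitted messages to fill a large file before committing.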
@ocadaruma
Contributor

Hi, thanks for reporting the issue.

Though your use case doesn't sound very common, it's definitely hard to achieve with Decaton's current features.

Separate decaton.max.pending.records to 2 configurations

This approach sounds like the simplest way.

I'm going to see if we can support this.
