Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose TopicPartition and offset from ProcessingContext #216

Open
bitterfox opened this issue Oct 7, 2023 · 2 comments
Open

Expose TopicPartition and offset from ProcessingContext #216

bitterfox opened this issue Oct 7, 2023 · 2 comments

Comments

@bitterfox
Copy link

We develop Kafka consume using decaton such as

  • Consume messages from Kafka
  • Write messages into file and make a large file containing 10K and more messages
    • We would like create 100MB+ files, if single message size is 300B and 50% compression ratio, the number of messages contained will be 700K
    • Thus we specify huge decaton.max.pending.records like 100K, 1M
  • We'd like to commit offset for all messages are persisted in the file
    • We use external storage and we assume data is persisted when the file descriptor is closed successfully
    • So we don't like to commit offset until we close it

We used deferred completion and tried to keep all deferred completions, however, we figured out we need huge heap, otherwise OOME.
Finally, we solved this by keeping the deferred completion for the smallest offset for each topic partition per file.

This worked well, however, we need ugly and fragile code to take topic partition and offset for current message from processing context like

        if (context is ProcessingContextImpl) {
            try {
                val clazz = ProcessingContextImpl::class.java
                val requestField = clazz.getDeclaredField("request")
                requestField.isAccessible = true
                val request = requestField.get(context) as TaskRequest
                val topicPartition = request.topicPartition()
                val offset = request.recordOffset()
                return Pair(topicPartition, offset)
            } catch (e: ReflectiveOperationException) {
                logger.debug("Fallback to take topicPartitionAndOffset from logging context", e)
            }
        }

        context.loggingContext().use {
            val topic = MDC.get(LoggingContext.TOPIC_KEY)
            if (topic == null) {
                logger.warn(
                    "Cannot find topicPartitionAndOffset from MDC, " +
                        "configure decaton.logging.mdc.enabled=true",
                )
                return Pair(TopicPartition("dummy", 0), 0)
            }
            val partition = MDC.get(LoggingContext.PARTITION_KEY).toInt()
            val offset = MDC.get(LoggingContext.OFFSET_KEY).toLong()

            return Pair(TopicPartition(topic, partition), offset)
        }

I'm wondering if ProcessingContext can provide such info officially, and it would be useful for some usecase for advanced users like us.

If you don't like to expose interface of Kafka, I'd like to have an object having methods to

  • check the task A is coming from the same group (topic partition for kafka) of another task B (A.topic-partition == B.topic-parition?)
  • check the task A is prior than another task B in the same group (A.offset < B.offset?)
@ocadaruma
Copy link
Contributor

Thanks for reporting the issue.

I have a question about the situation.

Finally, we solved this by keeping the deferred completion for the smallest offset for each topic partition per file.

Does it mean, just holding DeferredCompletion instances would cause OOME? (i.e. Even we don't hold task data itself)

I wonder if solving #217 is enough for your problem or we still need workaround to track smallest DeferredCompletion even we address #217.

@bitterfox
Copy link
Author

Right, DeferredCompletion and related (CompletableFuture and TaskRequest) objects consume memory.
Also, the list contains DeferredCompletion consumes memory.

So we still need workaround for DeferredCompletion even if #217 is solved.
#217 is an improvement when consuming many messages faster than processor in case of huge decaton.max.pending.records.

I'll share the heap analysis image with you in a different channel.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants