TaskMetadata as header #238
Conversation
 * This class will live until the task process has been completed.
 * To lessen heap pressure, rawRequestBytes should be purged by calling this once the task is extracted.
 */
public void purgeRawRequestBytes() {
I found that purging rawRequestBytes doesn't make much sense, because ProcessingContextImpl.task.taskDataBytes holds the bytes before extraction anyway, and ProcessingContextImpl lives until the task is completed.
The exception is when the task is serialized as DecatonTaskRequest. In that case, task.taskDataBytes and ConsumerRecord#value differ, so purging the latter might be effective; however, since DecatonTaskRequest is already deprecated, I don't think we need to take care of this case.
"ProcessingContextImpl lives until the task is completed."

Not really if users implement it in this way?

Completion comp = context.deferCompletion();
executor.execute(() -> { ...; comp.complete(); });

In this case the reference to the ProcessingContext itself gets cut, so it becomes GC-able?
Hmm, that sounds right.
Let me think about this again, then.
I reverted the code to still do the purging after extraction: 644474b
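A minimal, self-contained sketch of the purge-after-extraction idea being discussed. The holder class and everything except the `purgeRawRequestBytes()` name are hypothetical, not Decaton's actual code:

```java
// Illustrative sketch: drop the reference to the raw bytes once the task
// has been extracted, so they become GC-able even while the surrounding
// context object is still alive. Only purgeRawRequestBytes() mirrors the
// real method name; the rest is hypothetical.
public class RawRequestHolder {
    private byte[] rawRequestBytes;

    public RawRequestHolder(byte[] rawRequestBytes) {
        this.rawRequestBytes = rawRequestBytes;
    }

    public byte[] rawRequestBytes() {
        return rawRequestBytes;
    }

    // Called once the task is extracted; the array becomes unreachable
    // from this holder and can be collected.
    public void purgeRawRequestBytes() {
        rawRequestBytes = null;
    }

    public static void main(String[] args) {
        RawRequestHolder holder = new RawRequestHolder(new byte[1024]);
        // ... task extraction happens here ...
        holder.purgeRawRequestBytes();
        System.out.println(holder.rawRequestBytes() == null); // true
    }
}
```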
Let me send it back once with the following comments.
 * under the License.
 */

package com.linecorp.decaton.common;
Hm, is this the only class to be placed in a common module? Since the processor module has a dependency on the producer module, I think we've been placing these kinds of classes in the producer module; maybe that's sufficient?
Sounds good
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class TaskMetadataUtil {
    private static final String METADATA_HEADER_KEY = "dt_meta";
I was actually expecting to decompose the protobuf struct into individual header fields, because why not?
Have you considered the pros/cons of putting a single serialized value into one header vs. decomposing primitive fields into multiple headers?
Considering the pros/cons below, I decided to choose the single protobuf value.

Decomposed primitive fields
- Pros
  - We can access each field without decoding the entire metadata (not sure about the concrete use case though)
- Cons
  - Since a header is essentially just a Map<String, byte[]>, we need to decide how to encode each primitive as a byte array (endianness, charset encoding, varint or fixed length, ...)

Encoded protobuf in a header
- Pros
  - The "Cons" above doesn't exist: the byte-array representation is defined by the protobuf spec
- Cons
  - The "Pros" above is not possible
"not sure about the concrete use case though"

Only debug cases, I think.
OK, I think your point is fair. Let's go with the current way then.
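The encoding-decision concern behind the "decomposed fields" cons can be made concrete with a stdlib-only sketch. The header keys below are hypothetical, not actual Decaton keys, and Kafka itself isn't needed to show the point:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Headers are essentially Map<String, byte[]>, so with decomposed fields
// every primitive needs an agreed byte encoding that all readers share.
public class HeaderEncodingSketch {
    public static void main(String[] args) {
        Map<String, byte[]> headers = new HashMap<>();

        // Decision 1: fixed-length big-endian for a long timestamp.
        long timestampMillis = 1600000000000L;
        headers.put("dt_timestamp",
                    ByteBuffer.allocate(Long.BYTES).putLong(timestampMillis).array());

        // Decision 2: UTF-8 for a string field.
        headers.put("dt_source", "my-app".getBytes(StandardCharsets.UTF_8));

        // Every reader must apply the exact same decisions to decode correctly;
        // with a single protobuf value, the protobuf spec fixes all of this.
        long decodedTs = ByteBuffer.wrap(headers.get("dt_timestamp")).getLong();
        String decodedSource = new String(headers.get("dt_source"), StandardCharsets.UTF_8);
        System.out.println(decodedTs == timestampMillis && "my-app".equals(decodedSource)); // true
    }
}
```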
 * <p>
 * <b>CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER</b>
 * <p>
 * Please read <a href="https://github.com/line/decaton/releases/tag/v8.0.1">Decaton 9.0.0 Release Note</a> carefully.
TODO: update link?
 * If the method throws an exception, the task will be discarded and processor continues to process subsequent tasks.
 */
DecatonTask<T> extract(byte[] bytes);
DecatonTask<T> extract(ConsumerRecord<byte[], byte[]> record);
Given our current situation, would it be a good idea to expose more Kafka-specific types in our public interfaces?
Yeah, that's what I wondered.
Giving full access to ConsumerRecord might be useful for advanced users, but at the same time it makes Decaton couple tightly with Kafka.
I propose the strategy below. WDYT?
- Create a class to hold the necessary information (e.g. ConsumedRecord? name TBD), which will include key: byte[], value: byte[], headers: Headers
- Change the TaskExtractor#extract signature to extract(ConsumedRecord record)
  - Why not extract(key: byte[], value: byte[], headers: Headers)? To prevent an extract() signature change in the future when we want to provide more fields, which would require all users to rewrite their extractor implementations
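A self-contained sketch of the proposed wrapper under the stated assumptions (the name ConsumedRecord is TBD; HeadersView stands in for Kafka's org.apache.kafka.common.header.Headers so the sketch has no Kafka dependency):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed ConsumedRecord wrapper (name TBD).
public class ConsumedRecordSketch {
    // Stand-in for Kafka's Headers type, just for this sketch.
    interface HeadersView {
        byte[] lastHeader(String key);
    }

    static final class ConsumedRecord {
        final byte[] key;
        final byte[] value;
        final HeadersView headers;

        ConsumedRecord(byte[] key, byte[] value, HeadersView headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // Taking the wrapper instead of (key, value, headers) means a future
    // field addition only touches ConsumedRecord, not every extractor impl.
    interface TaskExtractor<T> {
        T extract(ConsumedRecord record);
    }

    public static void main(String[] args) {
        Map<String, byte[]> backing = new HashMap<>();
        backing.put("dt_meta", new byte[] {1, 2, 3});

        TaskExtractor<String> extractor = record -> new String(record.value);
        ConsumedRecord rec = new ConsumedRecord(null, "hello".getBytes(), backing::get);
        System.out.println(extractor.extract(rec)); // hello
    }
}
```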
}

public CompletableFuture<PutTaskResult> sendRequest(byte[] key, DecatonTaskRequest request,
So is this class now essentially a slightly different KafkaProducer implementation, which at least returns a CompletableFuture from sendRequest? Is it worth keeping the class itself, then?
It also fills in preset configs (e.g. acks=all), so I think it's worth keeping.
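The value of such a thin wrapper can be sketched with plain java.util.Properties. acks=all is the preset mentioned above; the merge helper itself is illustrative, not Decaton's actual code:

```java
import java.util.Properties;

// Illustrative sketch: a wrapper that presets safe producer configs
// while letting user-supplied values win. Only acks=all comes from the
// discussion above; the helper name is hypothetical.
public class PresetConfigSketch {
    static Properties withPresets(Properties userProps) {
        Properties merged = new Properties();
        merged.putAll(userProps);
        // Apply the preset only when the user hasn't set it explicitly.
        merged.putIfAbsent("acks", "all");
        return merged;
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("bootstrap.servers", "localhost:9092");
        Properties props = withPresets(user);
        System.out.println(props.getProperty("acks")); // all
    }
}
```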
okay
Force-pushed from 7984c60 to 644474b.
Just a few minor points.
@@ -0,0 +1,58 @@
/*
 * Copyright 2020 LINE Corporation
When?
My bad. Gonna fix
 * If the method throws an exception, the task will be discarded and processor continues to process subsequent tasks.
 */
DecatonTask<T> extract(byte[] bytes);
DecatonTask<T> extract(ConsumedRecord record);
So we're effectively exposing kafka specific API (Headers) to users through this?
Yes.
In fact, we already expose Headers through ProcessingContext#headers.
We might extend Decaton to support other types of MQ in the future, but then we would need to change ProcessingContext#headers too, so I think we're OK to expose headers for now.
Of course, we should still be cautious to avoid coupling Decaton to a specific MQ as much as possible.
LGTM, cool 👍
Rework of #80 since it's too outdated.

Motivation
Decaton serializes tasks in the DecatonTaskRequest protobuf format because, when Decaton started, Kafka didn't have record headers yet.

Summary of changes
- Deprecate DecatonTaskRequest
  - Moved to the .internal package and marked as deprecated
- DecatonClient
- TaskExtractor#extract signature change
  - TaskExtractor#extract now accepts ConsumerRecord<byte[], byte[]> instead of just byte[]
- ProcessorProperties, RetryQueueingProcessor
  - The decaton.task.metadata.as.header config is introduced, to control whether retry tasks are produced with header metadata or in the deprecated DecatonTaskRequest format
- Trivial

Breaking changes
- DecatonTaskRequest package change to com.linecorp.decaton.protocol.internal
- Tasks will no longer be produced as DecatonTaskRequest unless decaton.task.metadata.as.header is set to false

IMPORTANT NOTE
- When you upgrade from 8.0.1 or earlier, set decaton.task.metadata.as.header = false first
- You can switch to decaton.task.metadata.as.header = true on your own timing later

TODOs
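The two-phase upgrade in the IMPORTANT NOTE could look like the fragment below, assuming the property is supplied through a key=value style config source (the exact loading mechanism depends on how you build ProcessorProperties; only one value is active at a time):

```properties
# Phase 1: upgrade from <= 8.0.1 while keeping the old task format,
# so not-yet-upgraded instances can still read retry tasks.
decaton.task.metadata.as.header=false

# Phase 2 (later, on your own timing, once every instance runs the new
# version): switch to header-based metadata.
decaton.task.metadata.as.header=true
```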