-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add task batching doc #150
Changes from 1 commit
0b5959e
fbedebd
98cd998
777ee8f
fb2f1c2
c4fdfcf
332262a
675d58b
4753029
7b0112a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
= Task Batching | ||
:base_version: 4.0.0 | ||
:modules: processor | ||
|
||
== Introduction | ||
Task Batching is a feature to batch several tasks of type `T` to `List<T>` and process them at once. | ||
|
||
Decaton provides a *time-based* way and *size-based* way to batch multiple tasks. | ||
The *time-based* means that tasks in past linger time are processed on every linger time. | ||
The *size-based* means that tasks in past before reaching capacity are processed every time tasks’size reaches capacity. | ||
|
||
== When to use (Example) | ||
When downstream-DB supports batching I/O (which often very efficient) | ||
|
||
== Usage | ||
To use `Task Batching`, you only need to instantiate `BatchingProcessor`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Practically, developers will create their own class that inherits BatchingProcessor rather direct instantiation I guess. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The description and examples have been updated to use |
||
|
||
=== BatchingProcessor | ||
`BatchingProcessor` is an abstract `DecatonProcessor` to batch several tasks of type `T` to `List<T>`. To instantiate this class, pass two arguments to constructor and implement one abstract method: | ||
|
||
|=== | ||
|parameter |Description | ||
|
||
|`lingerMillis` | ||
|Time limit for this processor. On every lingerMillis milliseconds, tasks in past lingerMillis milliseconds are pushed to `BatchingProcessor#processBatchingTasks(List)`. | ||
|
||
|`capacity` | ||
|Capacity size limit for this processor. Every time tasks’size reaches capacity, tasks in past before reaching capacity are pushed to `BatchingProcessor#processBatchingTasks(List)`. | ||
|=== | ||
|
||
|=== | ||
|abstract method |Description | ||
|
||
|`void processBatchingTasks(List<BatchingTask<T>> batchingTasks)` | ||
|When the size or time reach each limit, this method is called with stored `batchingTasks`. + | ||
After complete processing batch of tasks, *MUST* call `BatchingTask#completion` 's `DeferredCompletion#complete()` or `BatchingTask#context` 's `ProcessingContext#retry()` method for each `BatchingTask`. + | ||
The above methods is not called automatically even when an error occurs in this method, so design it so that they are called finally by yourself. Otherwise, consumption will stick. + | ||
`BatchingProcessor` realizes its function by using `ProcessingContext#deferCompletion()`. Reading `ProcessingContext#deferCompletion()` 's description will help you. + | ||
This method runs in different thread from the `process(ProcessingContext<T>, T)` thread. | ||
|=== | ||
|
||
Before getting started with the example, let's create a `HelloTask` domain object which can be used to simulate the scenario we described: | ||
|
||
[source,java] | ||
.HelloTask.java | ||
---- | ||
public class HelloTask { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this example more looks like "real", by naming this class as "InsertHelloTask" or something? :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, maybe I don't understand this comment properly. I created |
||
private String name; | ||
private int age; | ||
private String text; | ||
|
||
// ... (Getter and Setter) | ||
} | ||
---- | ||
Instantiate `BatchingProcessor`, as shown in the following example `BathingProcessor`: | ||
|
||
[source,java] | ||
.TaskBatchingMain.java | ||
---- | ||
public class TaskBatchingMain { | ||
public static void main(String[] args) { | ||
// ... (Set up other options like default DecatonProcessor) | ||
long lingerMillis = 1000; | ||
int capacity = 100; | ||
ProcessorSubscription subscription = | ||
SubscriptionBuilder.newBuilder("my-decaton-processor") | ||
.processorsBuilder( | ||
ProcessorsBuilder | ||
.consuming("my-decaton-topic", extractor) | ||
.thenProcess(TaskBatchingMain.createBatchingProcessor(lingerMillis, capacity), | ||
ProcessorScope.THREAD) | ||
) | ||
} | ||
|
||
private static BatchingProcessor<HelloTask> createBatchingProcessor(long lingerMillis, int capacity) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above comment, I guess the practical usage would be implementing a class that inherits BatchingProcessor. Then how about fixing this example as like so? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did the same. #150 (comment) |
||
return new BatchingProcessor<HelloTask>(lingerMillis, capacity) { // <1> | ||
@Override | ||
protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks) { // <2> | ||
List<HelloTask> helloTasks = | ||
batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); | ||
// ... (Process helloTasks) | ||
batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> | ||
} | ||
}; | ||
} | ||
} | ||
---- | ||
<1> Pass `lingerMillis` and `capacity` to the constructor. | ||
<2> Implement `processBatchingTasks(List)` | ||
<3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. | ||
|
||
== Implementation | ||
In this section, we will briefly explain how is Task Batching implemented. | ||
All the magic happened in `BatchingProcessor` when a task comes to this processor the following things happen: | ||
|
||
1. The task will be put into an in-memory window. | ||
2. When the size or time reach each limit, `processBatchingTasks(List)` is called with stored `batchingTasks`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what we should include in "Implementation" section is something that developers should notice which comes from its implementation detail (rather than its "API"), rather than just a "how this feature is implemented". e.g. In rate-limiting doc, it's mentioned that it adopts token-bucket algorithm which allows some "bursting" against configured rate-limit, so developers can notice that it's not suitable when they have to limit the rate strictly. (https://github.com/line/decaton/blob/master/docs/rate-limiting.adoc#implementation) Then, we don't need to have Implementation section for BatchingProcessor I think? Or do you have some ideas you want to mention in this section? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the detailed explanation. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add
Caution:
somewhere to mention below?:BatchingProcessor
's scheduled executor threaddecaton.partition.concurrency
config. i.e.:Some users pointed out that this behavior might be confusing, so it's worth to mention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! I've added it.
7b0112a