Add task batching doc #150

Merged: 10 commits, Jul 30, 2024
2 changes: 1 addition & 1 deletion docs/example/build.gradle
@@ -7,7 +7,7 @@ plugins {

ext {
DECATON_VERSION = getProperty("version") + (getProperty("snapshot").toBoolean() ? "-SNAPSHOT" : "")
PROTOBUF_VERSION = "3.3.0"
PROTOBUF_VERSION = "3.22.3"
Contributor Author:
The example build failed, so I upgraded it.

CENTRALDOGMA_VERSION = "0.52.5"
KAFKA_VERSION = "2.4.0"
}
78 changes: 78 additions & 0 deletions docs/example/src/main/java/example/TaskBatchingMain.java
@@ -0,0 +1,78 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package example;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.processors.BatchingProcessor;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorScope;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
import com.linecorp.decaton.processor.runtime.SubscriptionBuilder;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.protocol.Sample.HelloTask;

import example.processors.InsertHelloTaskBatchingProcessor;

public class TaskBatchingMain {
    public static void main(String[] args) throws Exception {
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor");
        String bootstrapServers = System.getProperty("bootstrap.servers", "localhost:9092");
        consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor");

        TaskExtractor<HelloTask> extractor = bytes -> {
            TaskMetadata metadata = TaskMetadata.builder().build();
            HelloTask data;
            try {
                data = new ObjectMapper().readValue(bytes, HelloTask.class);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            return new DecatonTask<>(metadata, data, bytes);
        };
        long lingerMillis = 1000;
        int capacity = 100;
        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor")
                                   .processorsBuilder(
                                           ProcessorsBuilder
                                                   .consuming("my-decaton-topic", extractor)
                                                   .thenProcess(() -> createBatchingProcessor(lingerMillis,
                                                                                              capacity),
                                                                ProcessorScope.THREAD)
                                   )
                                   .consumerConfig(consumerConfig)
                                   .buildAndStart();

        Thread.sleep(10000);
        subscription.close();
    }

    private static BatchingProcessor<HelloTask> createBatchingProcessor(long lingerMillis, int capacity) {
        return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity);
    }
}
39 changes: 39 additions & 0 deletions docs/example/src/main/java/example/processors/InsertHelloTaskBatchingProcessor.java
@@ -0,0 +1,39 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package example.processors;

import java.util.List;
import java.util.stream.Collectors;

import com.linecorp.decaton.processor.processors.BatchingProcessor;
import com.linecorp.decaton.protocol.Sample.HelloTask;

public class InsertHelloTaskBatchingProcessor extends BatchingProcessor<HelloTask> {
    public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) {
        super(lingerMillis, capacity);
    }

    @Override
    protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks) {
        List<HelloTask> helloTasks =
                batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList());
        helloTasks.forEach(
                task -> System.out.println("Processing task: " + task) // (In a real application, insert the tasks into the DB here.)
        );
        batchingTasks.forEach(batchingTask -> batchingTask.completion().complete());
    }
}
114 changes: 114 additions & 0 deletions docs/task-batching.adoc
Contributor:
Could you add a Caution: somewhere to mention the following?

  • batch-flush is done in BatchingProcessor's scheduled executor thread
  • which means the parallelism of flushing has to be controlled by ProcessorScope, not only by the decaton.partition.concurrency config, i.e.:
    • parallelize flushing per partition: ProcessorScope.PARTITION
    • parallelize flushing per processor thread: ProcessorScope.THREAD

Some users pointed out that this behavior might be confusing, so it's worth mentioning.

Contributor Author:
Thanks! I've added it.
7b0112a

@@ -0,0 +1,114 @@
= Task Batching
:base_version: 8.0.0
:modules: processor

== Introduction
Task Batching is a feature to batch several tasks of type `T` into a `List<T>` and process them at once.

Decaton provides both a *time-based* and a *size-based* way to batch multiple tasks.
*Time-based* means that the tasks buffered during the last linger period are flushed once every linger period.
*Size-based* means that the buffered tasks are flushed as soon as their number reaches the configured capacity.
For example, with a linger time of 1000 ms and a capacity of 100, a batch is flushed as soon as 100 tasks have been buffered, and whatever is still buffered is flushed every 1000 ms.

== When to use (Example)
When the downstream DB supports batching I/O, which is often much more efficient than issuing one request per task.
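
For instance, if each flushed batch is written to an RDBMS, JDBC's batch API turns one round trip per task into one round trip per batch. The following is a minimal, hypothetical sketch and not part of Decaton's API: the `hello_task` table, its columns, and the `DataSource` are assumptions, and `HelloTask` is the domain object introduced in the Usage section below.

[source,java]
----
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import javax.sql.DataSource;

public class HelloTaskDao {
    private final DataSource dataSource;

    public HelloTaskDao(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Insert one flushed batch with a single JDBC batch statement.
    public void insertAll(List<HelloTask> helloTasks) throws SQLException {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                     "INSERT INTO hello_task (name, age, text) VALUES (?, ?, ?)")) {
            for (HelloTask task : helloTasks) {
                stmt.setString(1, task.getName());
                stmt.setInt(2, task.getAge());
                stmt.setString(3, task.getText());
                stmt.addBatch();
            }
            stmt.executeBatch(); // one round trip for the whole batch
        }
    }
}
----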

== Usage
To use `Task Batching`, you need to create a class that extends `BatchingProcessor`.

=== BatchingProcessor
`BatchingProcessor` is an abstract `DecatonProcessor` that batches several tasks of type `T` into a `List<T>`. To implement it, pass two arguments to the constructor (via `super`) and implement one abstract method:

|===
|Parameter |Description

|`lingerMillis`
|Time limit for this processor. Every `lingerMillis` milliseconds, the tasks buffered during the last `lingerMillis` milliseconds are pushed to `BatchingProcessor#processBatchingTasks(List)`.

|`capacity`
|Size limit for this processor. Whenever the number of buffered tasks reaches `capacity`, they are pushed to `BatchingProcessor#processBatchingTasks(List)`.
|===

|===
|Abstract method |Description

|`void processBatchingTasks(List<BatchingTask<T>> batchingTasks)`
|When the size or time limit is reached, this method is called with the buffered `batchingTasks`. +
After processing a batch of tasks, you *MUST* call `BatchingTask#completion` 's `DeferredCompletion#complete()` or `BatchingTask#context` 's `ProcessingContext#retry()` for each `BatchingTask` (see the sketch after this table). +
These methods are not called automatically, even when an error occurs inside this method, so make sure they are eventually called yourself. Otherwise consumption will get stuck. +
`BatchingProcessor` realizes this behavior through `ProcessingContext#deferCompletion()`; reading its description will help you understand the completion semantics. +
This method runs on a different thread from the one that calls `process(ProcessingContext<T>, T)`.
|===
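
As a sketch of that rule, an implementation can make sure every `BatchingTask` is resolved even when the batch write fails. This is only an illustration: the `dao.insertAll` call and the `log` field are hypothetical, and whether to complete or retry failed tasks is up to your application.

[source,java]
----
@Override
protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks) {
    try {
        // Hypothetical batch write to the downstream DB.
        dao.insertAll(batchingTasks.stream()
                                   .map(BatchingTask::task)
                                   .collect(Collectors.toList()));
        batchingTasks.forEach(task -> task.completion().complete());
    } catch (Exception e) {
        // Completions are not resolved automatically when this method throws; an
        // unresolved completion keeps the consumer from committing offsets.
        // Here we log and still complete every task; use BatchingTask#context's
        // ProcessingContext#retry() instead if the tasks should be reprocessed.
        log.error("Failed to flush batch of {} tasks", batchingTasks.size(), e);
        batchingTasks.forEach(task -> task.completion().complete());
    }
}
----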

[CAUTION]
====
* Batch-flush is done in `BatchingProcessor` 's scheduled executor thread.
* This means the parallelism of flushing has to be controlled through the `ProcessorScope`, not only through the `decaton.partition.concurrency` config (see the sketch below this block), i.e.:

** Parallelize flushing per partition: `ProcessorScope.PARTITION`
** Parallelize flushing per processor thread: `ProcessorScope.THREAD`
====
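
For example, the flush parallelism is decided by the scope passed to `thenProcess`. This is a fragment based on the setup shown below; `extractor`, `lingerMillis`, and `capacity` are defined as in `TaskBatchingMain`:

[source,java]
----
// Flushing parallelized per partition: one BatchingProcessor instance per partition.
ProcessorsBuilder.consuming("my-decaton-topic", extractor)
                 .thenProcess(() -> new InsertHelloTaskBatchingProcessor(lingerMillis, capacity),
                              ProcessorScope.PARTITION);

// Flushing parallelized per processor thread: one instance per thread, as in TaskBatchingMain below.
ProcessorsBuilder.consuming("my-decaton-topic", extractor)
                 .thenProcess(() -> new InsertHelloTaskBatchingProcessor(lingerMillis, capacity),
                              ProcessorScope.THREAD);
----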

Before getting started with the example, let's create a `HelloTask` domain object which can be used to simulate the scenario we described:

[source,java]
.HelloTask.java
----
public class HelloTask {
    private String name;
    private int age;
    private String text;

    // ... (Getter and Setter)
}
----

Contributor:

Let's make this example look more "real" by naming this class "InsertHelloTask" or something? :)

Contributor Author:

Sorry, maybe I don't understand this comment properly.

I created InsertHelloTaskBatchingProcessor and wrote the insertion points in the comment, but please let me know if there are any problems.
https://github.com/line/decaton/pull/150/files#diff-6cfdc5f934448411f9f9ff81c4cf114233a240ab01765dec212c7eac322f9961R70

Create a class that extends `BatchingProcessor`, as shown in the following `InsertHelloTaskBatchingProcessor` example:

[source,java]
.InsertHelloTaskBatchingProcessor.java
----
public class InsertHelloTaskBatchingProcessor extends BatchingProcessor<HelloTask> {
    public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) {
        super(lingerMillis, capacity); // <1>
    }

    @Override
    protected void processBatchingTasks(List<BatchingTask<HelloTask>> batchingTasks) { // <2>
        List<HelloTask> helloTasks =
                batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList());
        helloTasks.forEach(
                task -> System.out.println("Processing task: " + task) // <3> (In a real application, insert the tasks into the DB here.)
        );
        batchingTasks.forEach(batchingTask -> batchingTask.completion().complete());
    }
}
----
Then, use this `InsertHelloTaskBatchingProcessor` class in your `ProcessorSubscription` setup:

[source,java]
.TaskBatchingMain.java
----
public class TaskBatchingMain {
    public static void main(String[] args) {
        // ... (Set up other options like default DecatonProcessor)
        long lingerMillis = 1000;
        int capacity = 100;
        ProcessorSubscription subscription =
                SubscriptionBuilder.newBuilder("my-decaton-processor")
                                   .processorsBuilder(
                                           ProcessorsBuilder
                                                   .consuming("my-decaton-topic", extractor)
                                                   .thenProcess(() -> createBatchingProcessor(lingerMillis,
                                                                                              capacity),
                                                                ProcessorScope.THREAD)
                                   )
                                   // ... (Set up other options and build)
    }

    private static BatchingProcessor<HelloTask> createBatchingProcessor(long lingerMillis, int capacity) {
        return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); // <1>
    }
}
----
<1> Pass `lingerMillis` and `capacity` to the constructor.
<2> Implement `processBatchingTasks(List)`.
<3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`.

Contributor:

As in the comment above, I guess the practical usage would be implementing a class that extends BatchingProcessor.

Then how about fixing this example like so?

Contributor Author:

I did the same. #150 (comment)
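
The `extractor` referenced in `TaskBatchingMain` is an ordinary `TaskExtractor<HelloTask>`. The runnable example added under `docs/example` in this PR builds one with Jackson, roughly like this sketch:

[source,java]
----
TaskExtractor<HelloTask> extractor = bytes -> {
    TaskMetadata metadata = TaskMetadata.builder().build();
    HelloTask data;
    try {
        // Deserialize the raw record value into the HelloTask domain object.
        data = new ObjectMapper().readValue(bytes, HelloTask.class);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    return new DecatonTask<>(metadata, data, bytes);
};
----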