From 0b5959e52492ada006cec971275ca4b6933fcaec Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sun, 10 Apr 2022 21:14:05 +0900 Subject: [PATCH 01/10] Add task batching doc --- docs/task-batching.adoc | 97 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 docs/task-batching.adoc diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc new file mode 100644 index 00000000..06b2e5f1 --- /dev/null +++ b/docs/task-batching.adoc @@ -0,0 +1,97 @@ += Task Batching +:base_version: 4.0.0 +:modules: processor + +== Introduction +Task Batching is a feature to batch several tasks of type `T` to `List` and process them at once. + +Decaton provides a *time-based* way and *size-based* way to batch multiple tasks. +The *time-based* means that tasks in past linger time are processed on every linger time. +The *size-based* means that tasks in past before reaching capacity are processed every time tasks’size reaches capacity. + +== When to use (Example) +When downstream-DB supports batching I/O (which often very efficient) + +== Usage +To use `Task Batching`, you only need to instantiate `BatchingProcessor`. + +=== BatchingProcessor +`BatchingProcessor` is an abstract `DecatonProcessor` to batch several tasks of type `T` to `List`. To instantiate this class, pass two arguments to constructor and implement one abstract method: + +|=== +|parameter |Description + +|`lingerMillis` +|Time limit for this processor. On every lingerMillis milliseconds, tasks in past lingerMillis milliseconds are pushed to `BatchingProcessor#processBatchingTasks(List)`. + +|`capacity` +|Capacity size limit for this processor. Every time tasks’size reaches capacity, tasks in past before reaching capacity are pushed to `BatchingProcessor#processBatchingTasks(List)`. +|=== + +|=== +|abstract method |Description + +|`void processBatchingTasks(List> batchingTasks)` +|When the size or time reach each limit, this method is called with stored `batchingTasks`. + +After complete processing batch of tasks, *MUST* call `BatchingTask#completion` 's `DeferredCompletion#complete()` or `BatchingTask#context` 's `ProcessingContext#retry()` method for each `BatchingTask`. + +The above methods is not called automatically even when an error occurs in this method, so design it so that they are called finally by yourself. Otherwise, consumption will stick. + +`BatchingProcessor` realizes its function by using `ProcessingContext#deferCompletion()`. Reading `ProcessingContext#deferCompletion()` 's description will help you. + +This method runs in different thread from the `process(ProcessingContext, T)` thread. +|=== + +Before getting started with the example, let's create a `HelloTask` domain object which can be used to simulate the scenario we described: + +[source,java] +.HelloTask.java +---- +public class HelloTask { + private String name; + private int age; + private String text; + + // ... (Getter and Setter) +} +---- +Instantiate `BatchingProcessor`, as shown in the following example `BathingProcessor`: + +[source,java] +.TaskBatchingMain.java +---- +public class TaskBatchingMain { + public static void main(String[] args) { + // ... (Set up other options like default DecatonProcessor) + long lingerMillis = 1000; + int capacity = 100; + ProcessorSubscription subscription = + SubscriptionBuilder.newBuilder("my-decaton-processor") + .processorsBuilder( + ProcessorsBuilder + .consuming("my-decaton-topic", extractor) + .thenProcess(TaskBatchingMain.createBatchingProcessor(lingerMillis, capacity), + ProcessorScope.THREAD) + ) + } + + private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { + return new BatchingProcessor(lingerMillis, capacity) { // <1> + @Override + protected void processBatchingTasks(List> batchingTasks) { // <2> + List helloTasks = + batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); + // ... (Process helloTasks) + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> + } + }; + } +} +---- +<1> Pass `lingerMillis` and `capacity` to the constructor. +<2> Implement `processBatchingTasks(List)` +<3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. + +== Implementation +In this section, we will briefly explain how is Task Batching implemented. +All the magic happened in `BatchingProcessor` when a task comes to this processor the following things happen: + +1. The task will be put into an in-memory window. +2. When the size or time reach each limit, `processBatchingTasks(List)` is called with stored `batchingTasks`. From fbedebd6e542875d111a638e6ec98d12af8fb263 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 13:24:50 +0900 Subject: [PATCH 02/10] Fix task batching doc to explain how to create a class that inherits BatchingProcessor instead of direct instantiation. --- docs/task-batching.adoc | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 06b2e5f1..52a4fff5 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -13,10 +13,10 @@ The *size-based* means that tasks in past before reaching capacity are processed When downstream-DB supports batching I/O (which often very efficient) == Usage -To use `Task Batching`, you only need to instantiate `BatchingProcessor`. +To use `Task Batching`, you need to create a class that inherits `BatchingProcessor`. === BatchingProcessor -`BatchingProcessor` is an abstract `DecatonProcessor` to batch several tasks of type `T` to `List`. To instantiate this class, pass two arguments to constructor and implement one abstract method: +`BatchingProcessor` is an abstract `DecatonProcessor` to batch several tasks of type `T` to `List`. To implement this class, pass two arguments to the constructor and implement one abstract method: |=== |parameter |Description @@ -52,7 +52,26 @@ public class HelloTask { // ... (Getter and Setter) } ---- -Instantiate `BatchingProcessor`, as shown in the following example `BathingProcessor`: +Create a class that inherits `BatchingProcessor`, as shown in the following example `InsertHelloTask`: + +[source,java] +.InsertHelloTask.java +---- +public class InsertHelloTask extends BatchingProcessor { + public InsertHelloTask(long lingerMillis, int capacity) { + super(lingerMillis, capacity); // <1> + } + + @Override + protected void processBatchingTasks(List> batchingTasks) { // <2> + List helloTasks = + batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); + // ... (Process helloTasks) + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> + } +} +---- +Then, use this `InsertHelloTask` class in your `ProcessorSubscription` setup: [source,java] .TaskBatchingMain.java @@ -73,20 +92,12 @@ public class TaskBatchingMain { } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { - return new BatchingProcessor(lingerMillis, capacity) { // <1> - @Override - protected void processBatchingTasks(List> batchingTasks) { // <2> - List helloTasks = - batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); - // ... (Process helloTasks) - batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> - } - }; + return new InsertHelloTask(lingerMillis, capacity); // <1> } } ---- <1> Pass `lingerMillis` and `capacity` to the constructor. -<2> Implement `processBatchingTasks(List)` +<2> Implement `processBatchingTasks(List)`. <3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. == Implementation From 98cd9987cb31b70c371483a6ccad09e1b8102dcf Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 13:27:26 +0900 Subject: [PATCH 03/10] Remove Implementation section from task batching doc --- docs/task-batching.adoc | 7 ------- 1 file changed, 7 deletions(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 52a4fff5..bf99ba99 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -99,10 +99,3 @@ public class TaskBatchingMain { <1> Pass `lingerMillis` and `capacity` to the constructor. <2> Implement `processBatchingTasks(List)`. <3> Call `BatchingTask#completion` 's `DeferredCompletion#complete()`. - -== Implementation -In this section, we will briefly explain how is Task Batching implemented. -All the magic happened in `BatchingProcessor` when a task comes to this processor the following things happen: - -1. The task will be put into an in-memory window. -2. When the size or time reach each limit, `processBatchingTasks(List)` is called with stored `batchingTasks`. From 777ee8f0d15f041ee480e57089ad0f43c73453e4 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 13:27:54 +0900 Subject: [PATCH 04/10] Upgrade task batching doc base version --- docs/task-batching.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index bf99ba99..55010795 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -1,5 +1,5 @@ = Task Batching -:base_version: 4.0.0 +:base_version: 8.0.0 :modules: processor == Introduction From fb2f1c295746e7a99e46334399219af02f00fac2 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 15:34:15 +0900 Subject: [PATCH 05/10] Add TaskBatchingMain --- .../main/java/example/TaskBatchingMain.java | 78 +++++++++++++++++++ .../example/processors/InsertHelloTask.java | 37 +++++++++ 2 files changed, 115 insertions(+) create mode 100644 docs/example/src/main/java/example/TaskBatchingMain.java create mode 100644 docs/example/src/main/java/example/processors/InsertHelloTask.java diff --git a/docs/example/src/main/java/example/TaskBatchingMain.java b/docs/example/src/main/java/example/TaskBatchingMain.java new file mode 100644 index 00000000..a26bd524 --- /dev/null +++ b/docs/example/src/main/java/example/TaskBatchingMain.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package example; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import com.linecorp.decaton.processor.TaskMetadata; +import com.linecorp.decaton.processor.processors.BatchingProcessor; +import com.linecorp.decaton.processor.runtime.DecatonTask; +import com.linecorp.decaton.processor.runtime.ProcessorScope; +import com.linecorp.decaton.processor.runtime.ProcessorSubscription; +import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; +import com.linecorp.decaton.processor.runtime.SubscriptionBuilder; +import com.linecorp.decaton.processor.runtime.TaskExtractor; +import com.linecorp.decaton.protocol.Sample.HelloTask; + +import example.processors.InsertHelloTask; + +public class TaskBatchingMain { + public static void main(String[] args) throws Exception { + Properties consumerConfig = new Properties(); + consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "my-decaton-processor"); + String bootstrapServers = System.getProperty("bootstrap.servers", "localhost:9092"); + consumerConfig.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-decaton-processor"); + + TaskExtractor extractor = bytes -> { + TaskMetadata metadata = TaskMetadata.builder().build(); + HelloTask data; + try { + data = new ObjectMapper().readValue(bytes, HelloTask.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new DecatonTask<>(metadata, data, bytes); + }; + long lingerMillis = 1000; + int capacity = 100; + ProcessorSubscription subscription = + SubscriptionBuilder.newBuilder("my-decaton-processor") + .processorsBuilder( + ProcessorsBuilder + .consuming("my-decaton-topic", extractor) + .thenProcess(() -> createBatchingProcessor(lingerMillis, + capacity), + ProcessorScope.THREAD) + ) + .consumerConfig(consumerConfig) + .buildAndStart(); + + Thread.sleep(10000); + subscription.close(); + } + + private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { + return new InsertHelloTask(lingerMillis, capacity); // <1> + } +} diff --git a/docs/example/src/main/java/example/processors/InsertHelloTask.java b/docs/example/src/main/java/example/processors/InsertHelloTask.java new file mode 100644 index 00000000..1ce136be --- /dev/null +++ b/docs/example/src/main/java/example/processors/InsertHelloTask.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package example.processors; + +import java.util.List; +import java.util.stream.Collectors; + +import com.linecorp.decaton.processor.processors.BatchingProcessor; +import com.linecorp.decaton.protocol.Sample.HelloTask; + +public class InsertHelloTask extends BatchingProcessor { + public InsertHelloTask(long lingerMillis, int capacity) { + super(lingerMillis, capacity); + } + + @Override + protected void processBatchingTasks(List> batchingTasks) { + List helloTasks = + batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); + helloTasks.forEach(task -> System.out.println("Processing task: " + task)); // (Process helloTasks) + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); + } +} From c4fdfcfdc3d40d8894794ce3a83be526e09a4142 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 15:34:41 +0900 Subject: [PATCH 06/10] Upgrade example's PROTOBUF_VERSION --- docs/example/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/example/build.gradle b/docs/example/build.gradle index eef2ffc7..6dc8617a 100644 --- a/docs/example/build.gradle +++ b/docs/example/build.gradle @@ -7,7 +7,7 @@ plugins { ext { DECATON_VERSION = getProperty("version") + (getProperty("snapshot").toBoolean() ? "-SNAPSHOT" : "") - PROTOBUF_VERSION = "3.3.0" + PROTOBUF_VERSION = "3.22.3" CENTRALDOGMA_VERSION = "0.52.5" KAFKA_VERSION = "2.4.0" } From 332262a397ada492036ce6c01698288ce1618a0f Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 15:37:43 +0900 Subject: [PATCH 07/10] Fix task batching doc to have the same code example.TaskBatchingMain --- docs/task-batching.adoc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 55010795..7607b69e 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -65,7 +65,7 @@ public class InsertHelloTask extends BatchingProcessor { @Override protected void processBatchingTasks(List> batchingTasks) { // <2> List helloTasks = - batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); + batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); // ... (Process helloTasks) batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> } @@ -86,9 +86,11 @@ public class TaskBatchingMain { .processorsBuilder( ProcessorsBuilder .consuming("my-decaton-topic", extractor) - .thenProcess(TaskBatchingMain.createBatchingProcessor(lingerMillis, capacity), + .thenProcess(() -> createBatchingProcessor(lingerMillis, + capacity), ProcessorScope.THREAD) ) + // ... (Set up other options and build) } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { From 675d58bf780b9aa6b51fd86307209d4880adba48 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Sat, 27 Jul 2024 15:59:33 +0900 Subject: [PATCH 08/10] Rename task batching doc classes --- .../src/main/java/example/TaskBatchingMain.java | 4 ++-- ....java => InsertHelloTaskBatchingProcessor.java} | 8 +++++--- docs/task-batching.adoc | 14 ++++++++------ 3 files changed, 15 insertions(+), 11 deletions(-) rename docs/example/src/main/java/example/processors/{InsertHelloTask.java => InsertHelloTaskBatchingProcessor.java} (79%) diff --git a/docs/example/src/main/java/example/TaskBatchingMain.java b/docs/example/src/main/java/example/TaskBatchingMain.java index a26bd524..d26a6b10 100644 --- a/docs/example/src/main/java/example/TaskBatchingMain.java +++ b/docs/example/src/main/java/example/TaskBatchingMain.java @@ -33,7 +33,7 @@ import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.protocol.Sample.HelloTask; -import example.processors.InsertHelloTask; +import example.processors.InsertHelloTaskBatchingProcessor; public class TaskBatchingMain { public static void main(String[] args) throws Exception { @@ -73,6 +73,6 @@ public static void main(String[] args) throws Exception { } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { - return new InsertHelloTask(lingerMillis, capacity); // <1> + return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); // <1> } } diff --git a/docs/example/src/main/java/example/processors/InsertHelloTask.java b/docs/example/src/main/java/example/processors/InsertHelloTaskBatchingProcessor.java similarity index 79% rename from docs/example/src/main/java/example/processors/InsertHelloTask.java rename to docs/example/src/main/java/example/processors/InsertHelloTaskBatchingProcessor.java index 1ce136be..cbc8405f 100644 --- a/docs/example/src/main/java/example/processors/InsertHelloTask.java +++ b/docs/example/src/main/java/example/processors/InsertHelloTaskBatchingProcessor.java @@ -22,8 +22,8 @@ import com.linecorp.decaton.processor.processors.BatchingProcessor; import com.linecorp.decaton.protocol.Sample.HelloTask; -public class InsertHelloTask extends BatchingProcessor { - public InsertHelloTask(long lingerMillis, int capacity) { +public class InsertHelloTaskBatchingProcessor extends BatchingProcessor { + public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) { super(lingerMillis, capacity); } @@ -31,7 +31,9 @@ public InsertHelloTask(long lingerMillis, int capacity) { protected void processBatchingTasks(List> batchingTasks) { List helloTasks = batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); - helloTasks.forEach(task -> System.out.println("Processing task: " + task)); // (Process helloTasks) + helloTasks.forEach( + task -> System.out.println("Processing task: " + task) // (If it's real, insert tasks.) + ); batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); } } diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 7607b69e..2c5011da 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -55,10 +55,10 @@ public class HelloTask { Create a class that inherits `BatchingProcessor`, as shown in the following example `InsertHelloTask`: [source,java] -.InsertHelloTask.java +.InsertHelloTaskBatchingProcessor.java ---- -public class InsertHelloTask extends BatchingProcessor { - public InsertHelloTask(long lingerMillis, int capacity) { +public class InsertHelloTaskBatchingProcessor extends BatchingProcessor { + public InsertHelloTaskBatchingProcessor(long lingerMillis, int capacity) { super(lingerMillis, capacity); // <1> } @@ -66,8 +66,10 @@ public class InsertHelloTask extends BatchingProcessor { protected void processBatchingTasks(List> batchingTasks) { // <2> List helloTasks = batchingTasks.stream().map(BatchingTask::task).collect(Collectors.toList()); - // ... (Process helloTasks) - batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); // <3> + helloTasks.forEach( + task -> System.out.println("Processing task: " + task) // <3> (If it's real, insert tasks.) + ); + batchingTasks.forEach(batchingTask -> batchingTask.completion().complete()); } } ---- @@ -94,7 +96,7 @@ public class TaskBatchingMain { } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { - return new InsertHelloTask(lingerMillis, capacity); // <1> + return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); // <1> } } ---- From 475302932c231ee83b934d30d40843a505af9e53 Mon Sep 17 00:00:00 2001 From: RyotaYamagishi Date: Mon, 29 Jul 2024 19:26:29 +0900 Subject: [PATCH 09/10] Fix task batching doc's details --- docs/example/src/main/java/example/TaskBatchingMain.java | 2 +- docs/task-batching.adoc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/example/src/main/java/example/TaskBatchingMain.java b/docs/example/src/main/java/example/TaskBatchingMain.java index d26a6b10..197faf80 100644 --- a/docs/example/src/main/java/example/TaskBatchingMain.java +++ b/docs/example/src/main/java/example/TaskBatchingMain.java @@ -73,6 +73,6 @@ public static void main(String[] args) throws Exception { } private static BatchingProcessor createBatchingProcessor(long lingerMillis, int capacity) { - return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); // <1> + return new InsertHelloTaskBatchingProcessor(lingerMillis, capacity); } } diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 2c5011da..8a615eaa 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -52,7 +52,7 @@ public class HelloTask { // ... (Getter and Setter) } ---- -Create a class that inherits `BatchingProcessor`, as shown in the following example `InsertHelloTask`: +Create a class that inherits `BatchingProcessor`, as shown in the following example `InsertHelloTaskBatchingProcessor`: [source,java] .InsertHelloTaskBatchingProcessor.java @@ -73,7 +73,7 @@ public class InsertHelloTaskBatchingProcessor extends BatchingProcessor Date: Tue, 30 Jul 2024 18:11:41 +0900 Subject: [PATCH 10/10] Add CAUTION to task batching doc --- docs/task-batching.adoc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/task-batching.adoc b/docs/task-batching.adoc index 8a615eaa..514fa561 100644 --- a/docs/task-batching.adoc +++ b/docs/task-batching.adoc @@ -39,6 +39,15 @@ The above methods is not called automatically even when an error occurs in this This method runs in different thread from the `process(ProcessingContext, T)` thread. |=== +[CAUTION] +==== +* Batch-flush is done in `BatchingProcessor` 's scheduled executor thread. +* This means the parallelism of flushing has to be controlled by `ProcessorScope`, not only by `decaton.partition.concurrency` config. i.e.: + +** Parallelize flushing per partition: `ProcessorScope.PARTITION` +** Parallelize flushing per processor thread: `ProcessorScope.THREAD` +==== + Before getting started with the example, let's create a `HelloTask` domain object which can be used to simulate the scenario we described: [source,java]