Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ocadaruma committed Jul 19, 2024
1 parent e52c367 commit 7984c60
Show file tree
Hide file tree
Showing 28 changed files with 129 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.Serializer;
import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;

public class DecatonClientImpl<T> implements DecatonClient<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* under the License.
*/

package com.linecorp.decaton.common;
package com.linecorp.decaton.client.internal;

import java.io.UncheckedIOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.mockito.quality.Strictness;

import com.linecorp.decaton.client.DecatonClient.TaskMetadata;
import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand All @@ -36,6 +35,7 @@
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;
Expand Down Expand Up @@ -78,7 +78,7 @@ private TestTaskExtractor(String topic, Deserializer<T> deserializer) {
}

@Override
public DecatonTask<T> extract(ConsumerRecord<byte[], byte[]> record) {
public DecatonTask<T> extract(ConsumedRecord record) {
final T value = deserializer.deserialize(topic, record.value());
final TaskMetadata metadata = TaskMetadata.builder().build();
return new DecatonTask<>(metadata, value, record.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.decaton.processor.Completion.TimeoutChoice;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void doAssert() {

private static class TestTaskExtractor implements TaskExtractor<TestTask> {
@Override
public DecatonTask<TestTask> extract(ConsumerRecord<byte[], byte[]> record) {
public DecatonTask<TestTask> extract(ConsumedRecord record) {
TaskMetadata meta = TaskMetadata.builder().build();
TestTask task = new TestTask.TestTaskDeserializer().deserialize(record.value());
return new DecatonTask<>(meta, task, record.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer;
import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.ProcessorSubscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public LoggingContext(boolean enabled, String subscriptionId, TaskRequest reques
this.enabled = enabled;
if (enabled) {
MDC.put(METADATA_KEY, metadata.toString());
MDC.put(TASK_KEY, ByteArrays.toString(request.record().key()));
MDC.put(TASK_KEY, ByteArrays.toString(request.key()));
MDC.put(SUBSCRIPTION_ID_KEY, subscriptionId);
MDC.put(OFFSET_KEY, String.valueOf(request.record().offset()));
MDC.put(TOPIC_KEY, request.record().topic());
MDC.put(PARTITION_KEY, String.valueOf(request.record().partition()));
MDC.put(OFFSET_KEY, String.valueOf(request.recordOffset()));
MDC.put(TOPIC_KEY, request.topicPartition().topic());
MDC.put(PARTITION_KEY, String.valueOf(request.topicPartition().partition()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,33 @@
* under the License.
*/

dependencies {
api project(":protocol")
implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
package com.linecorp.decaton.processor.runtime;

import org.apache.kafka.common.header.Headers;

import lombok.Builder;
import lombok.Value;
import lombok.experimental.Accessors;

/**
 * Represents a single record consumed by Decaton and to be extracted as task.
 * <p>
 * Lombok's {@code @Value} makes every field private-final with generated accessors,
 * {@code @Builder} supplies {@code ConsumedRecord.builder()} for construction, and
 * {@code @Accessors(fluent = true)} names the accessors after the fields (e.g. {@code value()}).
 * <p>
 * NOTE(review): the generated accessors appear to hand out the {@code byte[]} fields
 * as-is (no defensive copy) — callers presumably must not mutate them; confirm.
 */
@Value
@Builder
@Accessors(fluent = true)
public class ConsumedRecord {
    /**
     * Headers of the record, as consumed from Kafka.
     */
    Headers headers;

    /**
     * Key of the record; raw bytes as consumed from Kafka.
     */
    byte[] key;

    /**
     * Value of the record; raw bytes to be extracted into a task.
     */
    byte[] value;
}
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public class ProcessorProperties extends AbstractDecatonProperties {
* <p>
* <b>CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER</b>
* <p>
* Please read <a href="https://github.com/line/decaton/releases/tag/v8.0.1">Decaton 9.0.0 Release Note</a> carefully.
* Please read <a href="https://github.com/line/decaton/releases/tag/v9.0.0">Decaton 9.0.0 Release Note</a> carefully.
* <p>
* Reloadable: yes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import java.util.List;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.runtime.internal.DecatonProcessorSupplierImpl;
Expand Down Expand Up @@ -128,25 +126,18 @@ private static class RetryTaskExtractor<T> implements TaskExtractor<T> {
private final TaskExtractor<T> innerExtractor;

@Override
public DecatonTask<T> extract(ConsumerRecord<byte[], byte[]> record) {
public DecatonTask<T> extract(ConsumedRecord record) {
// Retry tasks might be stored in retry-topic in DecatonTaskRequest format depending on
// decaton.task.metadata.as.header configuration.
// Hence, we need to extract the task with DefaultTaskExtractor to "unwrap" the task first,
// then extract the task with the given taskExtractor.
DecatonTask<byte[]> outerTask = outerExtractor.extract(record);
ConsumerRecord<byte[], byte[]> inner = new ConsumerRecord<>(
record.topic(),
record.partition(),
record.offset(),
record.timestamp(),
record.timestampType(),
record.serializedKeySize(),
outerTask.taskDataBytes().length,
record.key(),
outerTask.taskDataBytes(),
record.headers(),
record.leaderEpoch()
);
ConsumedRecord inner = ConsumedRecord
.builder()
.headers(record.headers())
.key(record.key())
.value(outerTask.taskDataBytes())
.build();
DecatonTask<T> extracted = innerExtractor.extract(inner);
return new DecatonTask<>(
// Use rawTask#metadata because retry count is stored in rawTask#metadata not extracted#metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,17 @@

package com.linecorp.decaton.processor.runtime;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * An interface for classes extracting {@link DecatonTask} from given record.
 * <p>
 * Implementations receive the raw consumed record (headers, key and value bytes)
 * and are responsible for turning it into a typed task along with its metadata.
 * @param <T> type of task.
 */
public interface TaskExtractor<T> {
    /**
     * Extract object of type {@link DecatonTask} from given {@link ConsumedRecord}.
     * @param record {@link ConsumedRecord} to extract task from.
     * @return object of type {@link DecatonTask}.
     * @throws RuntimeException this method can throw arbitrary {@link RuntimeException} if given record is invalid.
     * If the method throws an exception, the task will be discarded and processor continues to process subsequent tasks.
     */
    DecatonTask<T> extract(ConsumedRecord record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

import com.linecorp.decaton.client.internal.DecatonTaskProducer;
import com.linecorp.decaton.client.PutTaskResult;
import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.TaskMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@

package com.linecorp.decaton.processor.runtime.internal;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.common.TaskMetadataUtil;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
import com.linecorp.decaton.processor.TaskMetadata;
Expand All @@ -35,7 +34,7 @@ public class DefaultTaskExtractor<T> implements TaskExtractor<T> {
private final Deserializer<T> taskDeserializer;

@Override
public DecatonTask<T> extract(ConsumerRecord<byte[], byte[]> record) {
public DecatonTask<T> extract(ConsumedRecord record) {
TaskMetadataProto headerMeta = TaskMetadataUtil.readFromHeader(record.headers());
if (headerMeta != null) {
byte[] taskDataBytes = record.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,9 @@ public void addRecord(ConsumerRecord<byte[], byte[]> record,
RecordTraceHandle traceHandle,
QuotaApplier quotaApplier) {
if (!quotaApplier.apply(record, offsetState, maybeRecordQuotaUsage(record.key()))) {
TaskRequest request = new TaskRequest(offsetState, record, traceHandle, maybeRecordQuotaUsage(record.key()));
TaskRequest request = new TaskRequest(
scope.topicPartition(), record.offset(), offsetState, record.key(),
record.headers(), traceHandle, record.value(), maybeRecordQuotaUsage(record.key()));
subPartitions.addTask(request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linecorp.decaton.processor.LoggingContext;
import com.linecorp.decaton.processor.ProcessingContext;
import com.linecorp.decaton.processor.metrics.Metrics.TaskMetrics;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.TaskExtractor;
Expand Down Expand Up @@ -124,10 +125,16 @@ public CompletionStage<Void> scheduleThenProcess(TaskRequest request) throws Int
// visible for testing
/**
 * Extracts a {@link DecatonTask} from the given request via the configured extractor.
 * <p>
 * The span previously contained a stale duplicate assignment using the removed
 * {@code request.record()} accessor; only the {@link ConsumedRecord}-based call is kept,
 * and the declaration is merged with its single assignment.
 *
 * @param request the queued task request holding the raw consumed record parts
 * @return the extracted, validated task
 * @throws RuntimeException if the extracted task fails validation
 */
DecatonTask<T> extract(TaskRequest request) {
    // Rebuild a ConsumedRecord from the retained parts of the request for the extractor.
    final DecatonTask<T> extracted = taskExtractor.extract(
            ConsumedRecord.builder()
                          .headers(request.headers())
                          .key(request.key())
                          .value(request.rawRequestBytes())
                          .build());
    if (!validateTask(extracted)) {
        throw new RuntimeException("Invalid task");
    }
    // Raw bytes are no longer needed once extraction succeeded; drop them to lessen
    // heap pressure while the request stays alive until processing completes.
    request.purgeRawRequestBytes();

    return extracted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ public TaskMetadata metadata() {

@Override
public byte[] key() {
return request.record().key();
return request.key();
}

@Override
public Headers headers() {
return request.record().headers();
return request.headers();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,50 @@

package com.linecorp.decaton.processor.runtime.internal;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;

import com.linecorp.decaton.processor.runtime.internal.PerKeyQuotaManager.QuotaUsage;
import com.linecorp.decaton.processor.tracing.TracingProvider.RecordTraceHandle;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.Accessors;

@ToString
@Getter
@Accessors(fluent = true)
@AllArgsConstructor
public class TaskRequest {
private final TopicPartition topicPartition;
private final long recordOffset;
private final OffsetState offsetState;
private final ConsumerRecord<byte[], byte[]> record;
@ToString.Exclude
private final byte[] key;
@ToString.Exclude
private final Headers headers;
@ToString.Exclude
private final RecordTraceHandle trace;
@ToString.Exclude
private byte[] rawRequestBytes;
@ToString.Exclude
private final QuotaUsage quotaUsage;

public String id() {
// TaskRequest object is held alive through associated ProcessingContext's lifetime, hence holding
// any value as its field makes memory occupation worse. Since this ID field is rarely used (typically
// when trace level logging is enabled), it is better to take short lived object allocation and cpu cost
// rather than building it once and cache as an object field.
return "topic=" + record.topic() + " partition=" + record.partition() +
" offset=" + record.offset();
return "topic=" + topicPartition.topic() + " partition=" + topicPartition.partition() +
" offset=" + recordOffset;
}

@Override
public String toString() {
return "TaskRequest(topicPartition=" + record.topic() + '-' + record.partition() +
", recordOffset=" + record.offset()
+ ", offsetState=" + offsetState() + ')';
/**
* This class will live until the task process has been completed.
* To lessen heap pressure, rawRequestBytes should be purged by calling this once the task is extracted.
*/
public void purgeRawRequestBytes() {
rawRequestBytes = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ThreadPoolSubPartitions(PartitionScope scope, Processors<?> processors) {

@Override
public void addTask(TaskRequest request) {
int threadId = subPartitioner.subPartitionFor(request.record().key());
int threadId = subPartitioner.subPartitionFor(request.key());
SubPartition subPartition = subPartitions[threadId];
if (subPartition == null) {
ThreadScope threadScope = new ThreadScope(scope, threadId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public VirtualThreadSubPartitions(PartitionScope scope, Processors<?> processors

@Override
public void addTask(TaskRequest request) {
int threadId = subPartitioner.subPartitionFor(request.record().key());
int threadId = subPartitioner.subPartitionFor(request.key());
units.computeIfAbsent(threadId, key -> {
ExecutorService executor = Executors.newSingleThreadExecutor(
Utils.namedVirtualThreadFactory("PartitionProcessorVThread-" + scope));
Expand Down
Loading

0 comments on commit 7984c60

Please sign in to comment.