diff --git a/.idea/copyright/LINE_OSS.xml b/.idea/copyright/LINE_OSS.xml deleted file mode 100644 index 23464434..00000000 --- a/.idea/copyright/LINE_OSS.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - diff --git a/.idea/copyright/LY_OSS.xml b/.idea/copyright/LY_OSS.xml new file mode 100644 index 00000000..6451e0fd --- /dev/null +++ b/.idea/copyright/LY_OSS.xml @@ -0,0 +1,7 @@ + + + + diff --git a/.idea/copyright/profiles_settings.xml b/.idea/copyright/profiles_settings.xml index ae76eed1..e8ccc104 100644 --- a/.idea/copyright/profiles_settings.xml +++ b/.idea/copyright/profiles_settings.xml @@ -1,3 +1,7 @@ - - + + + + + + \ No newline at end of file diff --git a/benchmark/src/main/java/com/linecorp/decaton/benchmark/DecatonRunner.java b/benchmark/src/main/java/com/linecorp/decaton/benchmark/DecatonRunner.java index 4f8ffbd0..cd5090ed 100644 --- a/benchmark/src/main/java/com/linecorp/decaton/benchmark/DecatonRunner.java +++ b/benchmark/src/main/java/com/linecorp/decaton/benchmark/DecatonRunner.java @@ -118,11 +118,11 @@ public String get(String key) { .subPartitionRuntime(subPartitionRuntime) .processorsBuilder( ProcessorsBuilder.consuming(config.topic(), - (TaskExtractor) bytes -> { + (TaskExtractor) record -> { Task task = config.taskDeserializer() - .deserialize(config.topic(), bytes); + .deserialize(config.topic(), record.value()); return new DecatonTask<>( - TaskMetadata.builder().build(), task, bytes); + TaskMetadata.builder().build(), task, record.value()); }) .thenProcess( (ctx, task) -> recording.process(task))) diff --git a/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java b/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java index 699d58d4..1b23d6ff 100644 --- a/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java +++ b/client/src/main/java/com/linecorp/decaton/client/DecatonClientBuilder.java @@ -27,9 +27,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import com.linecorp.decaton.client.internal.DecatonClientImpl; -import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer; import com.linecorp.decaton.common.Serializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import lombok.AccessLevel; import lombok.Setter; @@ -54,10 +52,10 @@ public class DecatonClientBuilder { public static class DefaultKafkaProducerSupplier implements KafkaProducerSupplier { @Override - public Producer getProducer(Properties config) { + public Producer getProducer(Properties config) { return new KafkaProducer<>(config, new ByteArraySerializer(), - new ProtocolBuffersKafkaSerializer<>()); + new ByteArraySerializer()); } } diff --git a/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java b/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java index 8d48efd2..ebfcf752 100644 --- a/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java +++ b/client/src/main/java/com/linecorp/decaton/client/KafkaProducerSupplier.java @@ -21,7 +21,6 @@ import org.apache.kafka.clients.producer.Producer; import com.linecorp.decaton.client.internal.DecatonClientImpl; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; /** * An interface to specify a custom instantiation function for {@link Producer}. @@ -39,5 +38,5 @@ public interface KafkaProducerSupplier { * @return an Kafka producer instance which implements {@link Producer}. The returned instance will be * closed along with {@link DecatonClient#close} being called. 
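A minimal sketch of a custom KafkaProducerSupplier under the reworked contract, where record values are plain byte[] handled by ByteArraySerializer as in DefaultKafkaProducerSupplier above (the linger.ms tweak is illustrative only):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import com.linecorp.decaton.client.KafkaProducerSupplier;

public class CustomProducerSupplier implements KafkaProducerSupplier {
    @Override
    public Producer<byte[], byte[]> getProducer(Properties config) {
        // Illustrative override applied on top of the config Decaton passes in
        config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5");
        return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
    }
}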
*/ - Producer getProducer(Properties config); + Producer getProducer(Properties config); } diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java index 1cb4aeaf..f5d53300 100644 --- a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java +++ b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonClientImpl.java @@ -21,14 +21,13 @@ import java.util.function.Consumer; import java.util.function.Supplier; -import com.google.protobuf.ByteString; +import org.apache.kafka.clients.producer.ProducerRecord; import com.linecorp.decaton.client.DecatonClient; import com.linecorp.decaton.client.KafkaProducerSupplier; import com.linecorp.decaton.client.PutTaskResult; import com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; import com.linecorp.decaton.common.Serializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; public class DecatonClientImpl implements DecatonClient { @@ -52,7 +51,7 @@ public class DecatonClientImpl implements DecatonClient { this.serializer = serializer; this.applicationId = applicationId; this.instanceId = instanceId; - producer = new DecatonTaskProducer(topic, producerConfig, producerSupplier); + producer = new DecatonTaskProducer(producerConfig, producerSupplier); this.timestampSupplier = timestampSupplier; } @@ -109,13 +108,11 @@ private CompletableFuture put(String key, T task, TaskMetadataPro byte[] serializedKey = keySerializer.serialize(topic, key); byte[] serializedTask = serializer.serialize(task); - DecatonTaskRequest request = - DecatonTaskRequest.newBuilder() - .setMetadata(taskMetadataProto) - .setSerializedTask(ByteString.copyFrom(serializedTask)) - .build(); + ProducerRecord record = new ProducerRecord<>( + topic, partition, serializedKey, serializedTask); + TaskMetadataUtil.writeAsHeader(taskMetadataProto, record.headers()); - return producer.sendRequest(serializedKey, request, partition); + return producer.sendRequest(record); } private TaskMetadataProto convertToTaskMetadataProto(TaskMetadata overrideTaskMetadata) { diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java index 3fb59b93..39727b1b 100644 --- a/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java +++ b/client/src/main/java/com/linecorp/decaton/client/internal/DecatonTaskProducer.java @@ -28,10 +28,9 @@ import com.linecorp.decaton.client.DecatonClient; import com.linecorp.decaton.client.KafkaProducerSupplier; import com.linecorp.decaton.client.PutTaskResult; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; /** - * A raw interface to put a built {@link DecatonTaskRequest} directly. + * A raw interface to put decaton tasks. * This interface isn't expected to be used by applications unless it's really necessary. * Use {@link DecatonClient} to put task into a Decaton topic instead. 
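For reference, the reworked send path is the one DecatonClientImpl#put uses above: the caller assembles a ProducerRecord of byte[] key and value, attaches the metadata header, and hands it to sendRequest. A sketch with placeholder variables (topic, partition, serializedKey, serializedTask, taskMetadataProto):

ProducerRecord<byte[], byte[]> record =
        new ProducerRecord<>(topic, partition, serializedKey, serializedTask);
TaskMetadataUtil.writeAsHeader(taskMetadataProto, record.headers()); // metadata rides in the dt_meta header
CompletableFuture<PutTaskResult> future = producer.sendRequest(record);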
*/ @@ -44,8 +43,7 @@ public class DecatonTaskProducer implements AutoCloseable { presetProducerConfig.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); } - private final Producer producer; - private final String topic; + private final Producer producer; private static Properties completeProducerConfig(Properties producerConfig) { final Properties result = new Properties(); @@ -54,20 +52,13 @@ private static Properties completeProducerConfig(Properties producerConfig) { return result; } - public DecatonTaskProducer(String topic, Properties producerConfig, + public DecatonTaskProducer(Properties producerConfig, KafkaProducerSupplier producerSupplier) { Properties completeProducerConfig = completeProducerConfig(producerConfig); producer = producerSupplier.getProducer(completeProducerConfig); - this.topic = topic; } - public CompletableFuture sendRequest(byte[] key, DecatonTaskRequest request, - Integer partition) { - ProducerRecord record = new ProducerRecord<>(topic, partition, key, request); - return sendRequest(record); - } - - private CompletableFuture sendRequest(ProducerRecord record) { + public CompletableFuture sendRequest(ProducerRecord record) { CompletableFuture result = new CompletableFuture<>(); producer.send(record, (metadata, exception) -> { if (exception == null) { diff --git a/client/src/main/java/com/linecorp/decaton/client/internal/TaskMetadataUtil.java b/client/src/main/java/com/linecorp/decaton/client/internal/TaskMetadataUtil.java new file mode 100644 index 00000000..dc1eae7d --- /dev/null +++ b/client/src/main/java/com/linecorp/decaton/client/internal/TaskMetadataUtil.java @@ -0,0 +1,58 @@ +/* + * Copyright 2024 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. 
+ */ + +package com.linecorp.decaton.client.internal; + +import java.io.UncheckedIOException; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import com.google.protobuf.InvalidProtocolBufferException; + +import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; + +public class TaskMetadataUtil { + private static final String METADATA_HEADER_KEY = "dt_meta"; + + /** + * Write metadata to {@link Headers} + * @param metadata task metadata to be written + * @param headers record header to write to + */ + public static void writeAsHeader(TaskMetadataProto metadata, Headers headers) { + headers.remove(METADATA_HEADER_KEY); + headers.add(METADATA_HEADER_KEY, metadata.toByteArray()); + } + + /** + * Read metadata from given {@link Headers} + * @param headers record header to read from + * @return parsed {@link TaskMetadataProto} or null if header is absent + * @throws IllegalStateException if metadata bytes is invalid + */ + public static TaskMetadataProto readFromHeader(Headers headers) { + Header header = headers.lastHeader(METADATA_HEADER_KEY); + if (header == null) { + return null; + } + try { + return TaskMetadataProto.parseFrom(header.value()); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/client/src/main/java/com/linecorp/decaton/client/kafka/ProtocolBuffersKafkaSerializer.java b/client/src/main/java/com/linecorp/decaton/client/kafka/ProtocolBuffersKafkaSerializer.java deleted file mode 100644 index 6c7650f2..00000000 --- a/client/src/main/java/com/linecorp/decaton/client/kafka/ProtocolBuffersKafkaSerializer.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2020 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. 
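A quick round-trip through the TaskMetadataUtil helper added above, using Kafka's RecordHeaders directly (a sketch):

Headers headers = new RecordHeaders();
TaskMetadataProto meta = TaskMetadataProto.newBuilder()
        .setTimestampMillis(System.currentTimeMillis())
        .build();
TaskMetadataUtil.writeAsHeader(meta, headers);                       // replaces any existing dt_meta header
TaskMetadataProto parsed = TaskMetadataUtil.readFromHeader(headers); // null when the header is absent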
- */ - -package com.linecorp.decaton.client.kafka; - -import java.util.Map; - -import org.apache.kafka.common.serialization.Serializer; - -import com.google.protobuf.MessageLite; - -public class ProtocolBuffersKafkaSerializer implements Serializer { - @Override - public void configure(Map config, boolean isKey) { - // noop - } - - @Override - public byte[] serialize(String topic, T data) { - if (data == null) { - return null; - } - return data.toByteArray(); - } - - @Override - public void close() { - // noop - } -} diff --git a/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java b/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java index d44a0838..8db0a6f6 100644 --- a/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/DecatonClientBuilderTest.java @@ -33,20 +33,20 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; @ExtendWith(MockitoExtension.class) public class DecatonClientBuilderTest { @Mock - private Producer producer; + private Producer producer; @Captor - private ArgumentCaptor> recordCaptor; + private ArgumentCaptor> recordCaptor; - private ProducerRecord doProduce(DecatonClient dclient) { + private ProducerRecord doProduce(DecatonClient dclient) { dclient.put(null, HelloTask.getDefaultInstance()); verify(producer, times(1)).send(recordCaptor.capture(), any(Callback.class)); return recordCaptor.getValue(); @@ -66,10 +66,10 @@ public void testBuild() { .producerSupplier(config -> producer) .build(); - ProducerRecord record = doProduce(dclient); + ProducerRecord record = doProduce(dclient); assertEquals(topic, record.topic()); - TaskMetadataProto metadata = record.value().getMetadata(); + TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers()); assertEquals(applicationId, metadata.getSourceApplicationId()); assertEquals(instanceId, metadata.getSourceInstanceId()); } diff --git a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java index 89174a10..56aa1103 100644 --- a/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java +++ b/client/src/test/java/com/linecorp/decaton/client/internal/DecatonClientImplTest.java @@ -43,7 +43,7 @@ import com.linecorp.decaton.client.DecatonClient.TaskMetadata; import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; +import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; @ExtendWith(MockitoExtension.class) @@ -54,7 +54,7 @@ public class DecatonClientImplTest { private static final String INSTANCE_ID = "instance"; @Mock - private Producer producer; + private Producer producer; @Mock private Supplier timestampSupplier; @@ -62,7 +62,7 @@ public class DecatonClientImplTest { private DecatonClientImpl client; @Captor - private ArgumentCaptor> captor; + private ArgumentCaptor> captor; @BeforeEach public void setUp() { @@ -78,9 +78,9 @@ public void testTimestampFieldSetInternally() { 
client.put("key", HelloTask.getDefaultInstance()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); - assertEquals(1234, record.value().getMetadata().getTimestampMillis()); + assertEquals(1234, TaskMetadataUtil.readFromHeader(record.headers()).getTimestampMillis()); } @Test @@ -90,9 +90,9 @@ public void testTimestampFieldSetInternallyWithCallback() { client.put("key", HelloTask.getDefaultInstance(), ignored -> {}); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); - assertEquals(1234, record.value().getMetadata().getTimestampMillis()); + assertEquals(1234, TaskMetadataUtil.readFromHeader(record.headers()).getTimestampMillis()); } @Test @@ -102,9 +102,9 @@ public void testTimestampFieldSetExternally() { client.put("key", HelloTask.getDefaultInstance(), 5678L); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); - assertEquals(5678L, record.value().getMetadata().getTimestampMillis()); + assertEquals(5678, TaskMetadataUtil.readFromHeader(record.headers()).getTimestampMillis()); } @Test @@ -115,9 +115,9 @@ public void testTimestampFieldSetExternallyWithCallback() { }); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); - assertEquals(5678, record.value().getMetadata().getTimestampMillis()); + assertEquals(5678, TaskMetadataUtil.readFromHeader(record.headers()).getTimestampMillis()); } @Test @@ -150,10 +150,11 @@ public void testWithEmptyTaskMetaDataSetExternally() { client.put("key", HelloTask.getDefaultInstance(), TaskMetadata.builder().build()); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); - assertTrue(record.value().getMetadata().getTimestampMillis() > 0); - assertNotNull(record.value().getMetadata().getSourceApplicationId()); - assertNotNull(record.value().getMetadata().getSourceInstanceId()); + ProducerRecord record = captor.getValue(); + TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers()); + assertTrue(metadata.getTimestampMillis() > 0); + assertNotNull(metadata.getSourceApplicationId()); + assertNotNull(metadata.getSourceInstanceId()); } @Test @@ -167,12 +168,13 @@ public void testSpecifyingPartition() { .build(), 4); verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNotNull(record.partition()); assertEquals(4, record.partition().intValue()); assertNull(record.timestamp()); - assertEquals(5678L, record.value().getMetadata().getTimestampMillis()); - assertEquals(6912L, record.value().getMetadata().getScheduledTimeMillis()); + TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers()); + assertEquals(5678L, metadata.getTimestampMillis()); + assertEquals(6912L, metadata.getScheduledTimeMillis()); } @Test @@ -182,18 +184,19 @@ public void testSpecifyingPartitionWithoutMetadata() { client.put("key", HelloTask.getDefaultInstance(), null, 4); verify(producer, times(1)).send(captor.capture(), 
any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNotNull(record.partition()); assertEquals(4, record.partition().intValue()); } private void verifyAndAssertTaskMetadata(long timestamp, long scheduledTime) { verify(producer, times(1)).send(captor.capture(), any(Callback.class)); - ProducerRecord record = captor.getValue(); + ProducerRecord record = captor.getValue(); assertNull(record.timestamp()); - assertEquals(timestamp, record.value().getMetadata().getTimestampMillis()); - assertEquals(scheduledTime, record.value().getMetadata().getScheduledTimeMillis()); - assertNotNull(record.value().getMetadata().getSourceApplicationId()); - assertNotNull(record.value().getMetadata().getSourceInstanceId()); + TaskMetadataProto metadata = TaskMetadataUtil.readFromHeader(record.headers()); + assertEquals(timestamp, metadata.getTimestampMillis()); + assertEquals(scheduledTime, metadata.getScheduledTimeMillis()); + assertNotNull(metadata.getSourceApplicationId()); + assertNotNull(metadata.getSourceInstanceId()); } } diff --git a/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java b/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java index ba4594dc..7ee960f5 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/ArbitraryTopicTypeTest.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.ProcessorSubscription; import com.linecorp.decaton.processor.runtime.ProcessorsBuilder; @@ -77,10 +78,10 @@ private TestTaskExtractor(String topic, Deserializer deserializer) { } @Override - public DecatonTask extract(byte[] bytes) { - final T value = deserializer.deserialize(topic, bytes); + public DecatonTask extract(ConsumedRecord record) { + final T value = deserializer.deserialize(topic, record.value()); final TaskMetadata metadata = TaskMetadata.builder().build(); - return new DecatonTask<>(metadata, value, bytes); + return new DecatonTask<>(metadata, value, record.value()); } } diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index ac77f135..06bd2dca 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -16,15 +16,15 @@ package com.linecorp.decaton.processor; -import java.io.IOException; -import java.io.UncheckedIOException; import java.time.Duration; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -33,13 +33,14 @@ import org.junit.jupiter.api.extension.RegisterExtension; import com.linecorp.decaton.processor.Completion.TimeoutChoice; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; +import 
com.linecorp.decaton.processor.runtime.DynamicProperty; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.RetryConfig; import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.processor.runtime.TaskExtractor; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.testing.KafkaClusterExtension; import com.linecorp.decaton.testing.TestUtils; import com.linecorp.decaton.testing.processor.ProcessedRecord; @@ -107,16 +108,10 @@ public void doAssert() { private static class TestTaskExtractor implements TaskExtractor { @Override - public DecatonTask extract(byte[] bytes) { - try { - TaskMetadata meta = TaskMetadata.builder().build(); - DecatonTaskRequest request = DecatonTaskRequest.parseFrom(bytes); - TestTask task = new TestTask.TestTaskDeserializer().deserialize( - request.getSerializedTask().toByteArray()); - return new DecatonTask<>(meta, task, bytes); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + public DecatonTask extract(ConsumedRecord record) { + TaskMetadata meta = TaskMetadata.builder().build(); + TestTask task = new TestTask.TestTaskDeserializer().deserialize(record.value()); + return new DecatonTask<>(meta, task, record.value()); } } @@ -259,4 +254,45 @@ public void testRetryQueueingFromCompletionTimeoutCallback() throws Exception { .build() .run(); } + + @Test + @Timeout(60) + public void testRetryQueueingMigrateToHeader() throws Exception { + DynamicProperty metadataAsHeader = + new DynamicProperty<>(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); + metadataAsHeader.set(false); + + AtomicInteger processCount = new AtomicInteger(0); + CountDownLatch migrationLatch = new CountDownLatch(1); + ProcessorTestSuite + .builder(rule) + .numTasks(100) + .propertySupplier(StaticPropertySupplier.of(metadataAsHeader)) + .produceTasksWithHeaderMetadata(false) + .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { + if (ctx.metadata().retryCount() == 0) { + int cnt = processCount.incrementAndGet(); + // Enable header-mode after 50 tasks are processed + if (cnt < 50) { + ctx.retry(); + } else if (cnt == 50) { + metadataAsHeader.set(true); + migrationLatch.countDown(); + ctx.retry(); + } else { + migrationLatch.await(); + ctx.retry(); + } + } + })) + .retryConfig(RetryConfig.builder() + .retryTopic(retryTopic) + .backoff(Duration.ofMillis(10)) + .build()) + .excludeSemantics(GuaranteeType.PROCESS_ORDERING, + GuaranteeType.SERIAL_PROCESSING) + .customSemantics(new ProcessRetriedTask()) + .build() + .run(); + } } diff --git a/processor/src/it/java/com/linecorp/decaton/processor/metrics/MetricsTest.java b/processor/src/it/java/com/linecorp/decaton/processor/metrics/MetricsTest.java index 3ca3a4f8..df433e68 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/metrics/MetricsTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/metrics/MetricsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -37,7 +38,7 @@ import com.linecorp.decaton.client.DecatonClient; import 
com.linecorp.decaton.client.kafka.PrintableAsciiStringSerializer; -import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.ProcessorSubscription; @@ -45,7 +46,7 @@ import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.StaticPropertySupplier; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; +import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; import com.linecorp.decaton.testing.KafkaClusterExtension; import com.linecorp.decaton.testing.TestUtils; @@ -120,19 +121,15 @@ public void testDetectStuckPartitions() throws Exception { .addProperties(StaticPropertySupplier.of( Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 1), Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 1)))); - Producer producer = + Producer producer = new KafkaProducer<>(props, new PrintableAsciiStringSerializer(), - new ProtocolBuffersKafkaSerializer<>())) { + new ByteArraySerializer())) { - DecatonTaskRequest req = DecatonTaskRequest.newBuilder() - .setSerializedTask( - HelloTask.getDefaultInstance().toByteString()) - .build(); for (int i = 0; i < PARTITIONS; i++) { // What we need here is just simple round-robin but it's not possible with null keys anymore // since https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner - producer.send(new ProducerRecord<>(topicName, i, null, req)); + producer.send(new ProducerRecord<>(topicName, i, null, HelloTask.getDefaultInstance().toByteArray())); } processLatch.await(); @@ -230,19 +227,17 @@ public void testDeferredCompletionLeak() throws Exception { .addProperties(StaticPropertySupplier.of( Property.ofStatic(ProcessorProperties.CONFIG_PARTITION_CONCURRENCY, 2), Property.ofStatic(ProcessorProperties.CONFIG_MAX_PENDING_RECORDS, 3)))); - Producer producer = + Producer producer = new KafkaProducer<>(props, new PrintableAsciiStringSerializer(), - new ProtocolBuffersKafkaSerializer<>())) { + new ByteArraySerializer())) { for (int i = 0; i < count; i++) { - DecatonTaskRequest req = DecatonTaskRequest.newBuilder() - .setSerializedTask( - HelloTask.newBuilder() - .setAge(i) - .build().toByteString()) - .build(); // All requests will be sent on partition 0 for simplicity - producer.send(new ProducerRecord<>(topicName, 0, null, req)); + ProducerRecord record = new ProducerRecord<>( + topicName, 0, null, HelloTask.newBuilder().setAge(i).build().toByteArray()); + // simulate produces from DecatonClient + TaskMetadataUtil.writeAsHeader(TaskMetadataProto.getDefaultInstance(), record.headers()); + producer.send(record); } processLatch.await(); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java new file mode 100644 index 00000000..590386b3 --- /dev/null +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java @@ -0,0 +1,46 @@ +/* + * Copyright 2024 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import org.apache.kafka.common.header.Headers; + +import lombok.Builder; +import lombok.Value; +import lombok.experimental.Accessors; + +/** + * Represents a single record consumed by Decaton and to be extracted as task + */ +@Value +@Builder +@Accessors(fluent = true) +public class ConsumedRecord { + /** + * Headers of the record + */ + Headers headers; + + /** + * Key of the record + */ + byte[] key; + + /** + * Value of the record + */ + byte[] value; +} diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/DecatonTask.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/DecatonTask.java index d34bd847..2769b76a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/DecatonTask.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/DecatonTask.java @@ -16,6 +16,8 @@ package com.linecorp.decaton.processor.runtime; +import org.apache.kafka.clients.consumer.ConsumerRecord; + import com.linecorp.decaton.processor.TaskMetadata; import lombok.Value; @@ -62,7 +64,7 @@ public class DecatonTask { /** * Holds serialized task bytes. *
<p>
- * This field must be exactly same as the bytes passed to {@link TaskExtractor#extract}. + * This field must be exactly the same as the {@link ConsumerRecord#value()} bytes passed to {@link TaskExtractor#extract}. *
</p>
*/ byte[] taskDataBytes; diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java index a41571a4..b71d047c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorProperties.java @@ -30,6 +30,7 @@ import com.linecorp.decaton.processor.runtime.internal.AbstractDecatonProperties; import com.linecorp.decaton.processor.runtime.internal.OutOfOrderCommitControl; import com.linecorp.decaton.processor.runtime.internal.RateLimiter; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; /** * Collection of properties that can be configured to adjust {@link DecatonProcessor}'s behavior. @@ -224,6 +225,20 @@ public class ProcessorProperties extends AbstractDecatonProperties { PropertyDefinition.define("decaton.processor.threads.termination.timeout.ms", Long.class, Long.MAX_VALUE, v -> v instanceof Long && (Long) v >= 0); + /** + * Controls whether to produce retry tasks with task metadata as headers, instead of as deprecated + * {@link DecatonTaskRequest} format. + *
<p>
+ * CAUTION!!! YOU MAY NEED TO SET THIS TO FALSE WHEN YOU UPGRADE FROM 8.0.1 OR EARLIER + *
<p>
+ * Please read the Decaton 9.0.0 release notes carefully. + *
<p>
+ * Reloadable: yes + */ + public static final PropertyDefinition CONFIG_TASK_METADATA_AS_HEADER = + PropertyDefinition.define("decaton.task.metadata.as.header", Boolean.class, true, + v -> v instanceof Boolean); + public static final List> PROPERTY_DEFINITIONS = Collections.unmodifiableList(Arrays.asList( CONFIG_IGNORE_KEYS, @@ -237,7 +252,8 @@ public class ProcessorProperties extends AbstractDecatonProperties { CONFIG_BIND_CLIENT_METRICS, CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS, CONFIG_PROCESSOR_THREADS_TERMINATION_TIMEOUT_MS, - CONFIG_PER_KEY_QUOTA_PROCESSING_RATE)); + CONFIG_PER_KEY_QUOTA_PROCESSING_RATE, + CONFIG_TASK_METADATA_AS_HEADER)); /** * Find and return a {@link PropertyDefinition} from its name. diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index 09f1a17d..b519ac63 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -27,6 +27,7 @@ import com.linecorp.decaton.processor.runtime.internal.Processors; import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; /** @@ -65,29 +66,14 @@ public static ProcessorsBuilder consuming(String topic, Deserializer d /** * Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type - * which can be parsed by valueParser. + * which can be parsed by taskExtractor. * @param topic the name of topic to consume. * @param taskExtractor the extractor to extract task of type {@link T} from message bytes. * @param the type of instantiated tasks. * @return an instance of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, TaskExtractor taskExtractor) { - DefaultTaskExtractor outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes); - TaskExtractor retryTaskExtractor = bytes -> { - // Retry tasks are serialized as PB DecatonTaskRequest. - // First, deserialize PB from raw bytes. - DecatonTask wrappedTask = outerExtractor.extract(bytes); - - // Original raw task bytes is stored in DecatonTaskRequest#serializedTask. - // Extract DecatonTask from DecatonTaskRequest#serializedTask using given taskExtractor. - DecatonTask task = taskExtractor.extract(wrappedTask.taskData()); - - // Instantiate DecatonTask. - // Use wrappedTask#metadata because retry count is stored in wrappedTask#metada not task#metadata - return new DecatonTask<>(wrappedTask.metadata(), task.taskData(), task.taskDataBytes()); - }; - - return new ProcessorsBuilder<>(topic, taskExtractor, retryTaskExtractor); + return new ProcessorsBuilder<>(topic, taskExtractor, new RetryTaskExtractor<>(taskExtractor)); } /** @@ -133,4 +119,31 @@ public ProcessorsBuilder thenProcess(DecatonProcessor processor) { Processors build(DecatonProcessorSupplier retryProcessorSupplier) { return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); } + + @RequiredArgsConstructor + private static class RetryTaskExtractor implements TaskExtractor { + private final DefaultTaskExtractor outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes); + private final TaskExtractor innerExtractor; + + @Override + public DecatonTask extract(ConsumedRecord record) { + // Retry tasks might be stored in retry-topic in DecatonTaskRequest format depending on + // decaton.task.metadata.as.header configuration. 
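To make the upgrade path concrete: an application coming from 8.0.1 or earlier can keep producing retry tasks in the legacy DecatonTaskRequest envelope by supplying the property defined above with value false, then flip it to true once every consumer understands header metadata. A sketch using only APIs visible elsewhere in this diff (testRetryQueueingMigrateToHeader earlier drives the same switch with a DynamicProperty):

StaticPropertySupplier legacyRetryFormat = StaticPropertySupplier.of(
        Property.ofStatic(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER, false));
// Pass this supplier to the subscription's properties so retry produces stay readable by 8.x consumers.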
+ // Hence, we need to extract the task with DefaultTaskExtractor to "unwrap" the task first, + // then extract the task with the given taskExtractor. + DecatonTask outerTask = outerExtractor.extract(record); + ConsumedRecord inner = ConsumedRecord + .builder() + .headers(record.headers()) + .key(record.key()) + .value(outerTask.taskDataBytes()) + .build(); + DecatonTask extracted = innerExtractor.extract(inner); + return new DecatonTask<>( + // Use outerTask#metadata because retry count is stored in rawTask#metada not extracted#metadata + outerTask.metadata(), + extracted.taskData(), + extracted.taskDataBytes()); + } + } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 77b42029..7b68be0c 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -303,7 +303,7 @@ private DecatonProcessorSupplier maybeRetryProcessorSupplier(Subscriptio KafkaProducerSupplier producerSupplier = Optional.ofNullable(retryConfig.producerSupplier()) .orElseGet(DefaultKafkaProducerSupplier::new); return new DecatonProcessorSupplierImpl<>(() -> { - DecatonTaskProducer producer = new DecatonTaskProducer(scope.retryTopic().get(), producerConfig, producerSupplier); + DecatonTaskProducer producer = new DecatonTaskProducer(producerConfig, producerSupplier); return new DecatonTaskRetryQueueingProcessor(scope, producer); }, ProcessorScope.SINGLETON); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java index 92f562e2..398acbe8 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java @@ -17,16 +17,16 @@ package com.linecorp.decaton.processor.runtime; /** - * An interface for classes extracting {@link DecatonTask} from given bytes. + * An interface for classes extracting {@link DecatonTask} from given record. * @param type of task. */ public interface TaskExtractor { /** * Extract object of type {@link DecatonTask} from given bytes. - * @param bytes raw message bytes. + * @param record {@link ConsumedRecord} to extract task from. * @return object of type {@link DecatonTask}. - * @throws RuntimeException this method can throw arbitrary {@link RuntimeException} if given bytes is invalid. + * @throws RuntimeException this method can throw arbitrary {@link RuntimeException} if given record is invalid. * If the method throws an exception, the task will be discarded and processor continues to process subsequent tasks. 
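As the updated javadoc describes, extractors now receive the whole ConsumedRecord instead of raw bytes, so the key and headers are available too. A minimal sketch in the same shape as TestTaskExtractor in ArbitraryTopicTypeTest above (MyTask and myTaskDeserializer are placeholders):

TaskExtractor<MyTask> extractor = record -> {
    MyTask value = myTaskDeserializer.deserialize(record.value());
    // Keep taskDataBytes identical to the consumed record value, as DecatonTask requires
    return new DecatonTask<>(TaskMetadata.builder().build(), value, record.value());
};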
*/ - DecatonTask extract(byte[] bytes); + DecatonTask extract(ConsumedRecord record); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java index 101080b9..1535145a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessor.java @@ -19,17 +19,22 @@ import java.time.Duration; import java.util.concurrent.CompletableFuture; +import org.apache.kafka.clients.producer.ProducerRecord; + import com.google.protobuf.ByteString; import com.linecorp.decaton.client.internal.DecatonTaskProducer; import com.linecorp.decaton.client.PutTaskResult; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.metrics.Metrics; import com.linecorp.decaton.processor.metrics.Metrics.RetryMetrics; +import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.RetryConfig; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import lombok.extern.slf4j.Slf4j; @@ -39,11 +44,15 @@ public class DecatonTaskRetryQueueingProcessor implements DecatonProcessor metadataAsHeaderProperty; public DecatonTaskRetryQueueingProcessor(SubscriptionScope scope, DecatonTaskProducer producer) { RetryConfig retryConfig = scope.retryConfig().get(); // This won't be instantiated unless it present this.producer = producer; backoff = retryConfig.backoff(); + retryTopic = scope.retryTopic().get(); // This won't be instantiated unless it present + metadataAsHeaderProperty = scope.props().get(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER); metrics = Metrics.withTags("subscription", scope.subscriptionId()).new RetryMetrics(); } @@ -59,14 +68,32 @@ public void process(ProcessingContext context, byte[] serializedTask) .setRetryCount(nextRetryCount) .setScheduledTimeMillis(nextTryTimeMillis) .build(); - DecatonTaskRequest request = - DecatonTaskRequest.newBuilder() - .setMetadata(taskMetadata) - .setSerializedTask(ByteString.copyFrom(serializedTask)) - .build(); + + final ProducerRecord record; + if (metadataAsHeaderProperty.value()) { + record = new ProducerRecord<>( + retryTopic, + null, + context.key(), + serializedTask, + context.headers()); + TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); + } else { + DecatonTaskRequest request = + DecatonTaskRequest.newBuilder() + .setMetadata(taskMetadata) + .setSerializedTask(ByteString.copyFrom(serializedTask)) + .build(); + record = new ProducerRecord<>( + retryTopic, + null, + context.key(), + request.toByteArray(), + context.headers()); + } metrics.retryTaskRetries.record(nextRetryCount); - CompletableFuture future = producer.sendRequest(context.key(), request, null); + CompletableFuture future = producer.sendRequest(record); future.whenComplete((r, e) -> { if (e == null) { metrics.retryQueuedTasks.increment(); diff --git 
a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 1fce5e18..458454d0 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -19,10 +19,13 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.linecorp.decaton.common.Deserializer; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.processor.TaskMetadata; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; +import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import lombok.RequiredArgsConstructor; @@ -31,18 +34,27 @@ public class DefaultTaskExtractor implements TaskExtractor { private final Deserializer taskDeserializer; @Override - public DecatonTask extract(byte[] bytes) { - try { - DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(bytes); - TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); - byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); - + public DecatonTask extract(ConsumedRecord record) { + TaskMetadataProto headerMeta = TaskMetadataUtil.readFromHeader(record.headers()); + if (headerMeta != null) { + byte[] taskDataBytes = record.value(); return new DecatonTask<>( - metadata, + TaskMetadata.fromProto(headerMeta), taskDeserializer.deserialize(taskDataBytes), taskDataBytes); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException(e); + } else { + try { + DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); + TaskMetadata metadata = TaskMetadata.fromProto(taskRequest.getMetadata()); + byte[] taskDataBytes = taskRequest.getSerializedTask().toByteArray(); + + return new DecatonTask<>( + metadata, + taskDeserializer.deserialize(taskDataBytes), + taskDataBytes); + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); + } } } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index c7039689..61d5265a 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -29,6 +29,7 @@ import com.linecorp.decaton.processor.LoggingContext; import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.metrics.Metrics.TaskMetrics; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.TaskExtractor; @@ -124,7 +125,12 @@ public CompletionStage scheduleThenProcess(TaskRequest request) throws Int // visible for testing DecatonTask extract(TaskRequest request) { final DecatonTask extracted; - extracted = taskExtractor.extract(request.rawRequestBytes()); + 
extracted = taskExtractor.extract( + ConsumedRecord.builder() + .headers(request.headers()) + .key(request.key()) + .value(request.rawRequestBytes()) + .build()); if (!validateTask(extracted)) { throw new RuntimeException("Invalid task"); } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java index 2a6c1bf9..fdcd09ff 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/processors/CompactionProcessorTest.java @@ -101,6 +101,7 @@ private TaskInput put(DecatonProcessor processor, taskData.toByteArray()); TaskRequest request = new TaskRequest( new TopicPartition("topic", 1), 1, null, name.getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, null, null); + ProcessingContext context = spy(new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processor, downstream), null, diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index f3feff3b..86bfd396 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -139,9 +139,9 @@ private static ProcessorSubscription subscription(Consumer consu SubscriptionScope scope = scope(tp.topic(), 0L); ProcessorsBuilder builder = ProcessorsBuilder.consuming(scope.originTopic(), - (byte[] bytes) -> new DecatonTask<>( + (ConsumedRecord record) -> new DecatonTask<>( TaskMetadata.builder().build(), - new String(bytes), bytes)); + new String(record.value()), record.value())); if (processor != null) { builder.thenProcess(processor); } @@ -277,8 +277,8 @@ public synchronized ConsumerRecords poll(Duration timeout) { consumer, NoopQuotaApplier.INSTANCE, ProcessorsBuilder.consuming(scope.originTopic(), - (byte[] bytes) -> new DecatonTask<>( - TaskMetadata.builder().build(), "dummy", bytes)) + (ConsumedRecord record) -> new DecatonTask<>( + TaskMetadata.builder().build(), "dummy", record.value())) .thenProcess(processor) .build(null), scope.props(), diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java index 010f0912..5d45733f 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DecatonTaskRetryQueueingProcessorTest.java @@ -16,12 +16,13 @@ package com.linecorp.decaton.processor.runtime.internal; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static 
org.mockito.Mockito.never; @@ -33,6 +34,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -44,15 +46,17 @@ import org.mockito.quality.Strictness; import com.linecorp.decaton.client.internal.DecatonTaskProducer; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.ProcessingContext; import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.ProcessorProperties; +import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.RetryConfig; import com.linecorp.decaton.processor.runtime.SubPartitionRuntime; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.protocol.Sample.HelloTask; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) @@ -78,13 +82,14 @@ public class DecatonTaskRetryQueueingProcessorTest { @BeforeEach public void setUp() { processor = new DecatonTaskRetryQueueingProcessor(scope, producer); - doReturn(CompletableFuture.completedFuture(null)).when(producer).sendRequest(any(), any(), any()); + doReturn(CompletableFuture.completedFuture(null)).when(producer).sendRequest(any()); doReturn(new CompletionImpl()).when(context).deferCompletion(); doReturn("key".getBytes(StandardCharsets.UTF_8)).when(context).key(); doReturn(TaskMetadata.builder().build()).when(context).metadata(); } @Test + @SuppressWarnings("unchecked") public void testRetryRequest() throws InterruptedException { byte[] key = "key".getBytes(StandardCharsets.UTF_8); TaskMetadata meta = @@ -103,13 +108,13 @@ public void testRetryRequest() throws InterruptedException { long currentTime = System.currentTimeMillis(); processor.process(context, task.toByteArray()); - ArgumentCaptor captor = ArgumentCaptor.forClass(DecatonTaskRequest.class); - verify(producer, times(1)).sendRequest(eq(key), captor.capture(), eq(null)); + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(producer, times(1)).sendRequest(captor.capture()); - DecatonTaskRequest request = captor.getValue(); - assertEquals(task.toByteString(), request.getSerializedTask()); + ProducerRecord record = captor.getValue(); + assertArrayEquals(task.toByteArray(), record.value()); - TaskMetadata gotMeta = TaskMetadata.fromProto(request.getMetadata()); + TaskMetadata gotMeta = TaskMetadata.fromProto(TaskMetadataUtil.readFromHeader(record.headers())); assertEquals(meta.sourceApplicationId(), gotMeta.sourceApplicationId()); assertEquals(meta.sourceInstanceId(), gotMeta.sourceInstanceId()); assertEquals(meta.timestampMillis(), gotMeta.timestampMillis()); @@ -123,7 +128,7 @@ public void testDeferCompletion() throws InterruptedException { CompletionImpl comp = new CompletionImpl(); doReturn(comp).when(context).deferCompletion(); - doReturn(future).when(producer).sendRequest(any(), any(), any()); + doReturn(future).when(producer).sendRequest(any()); processor.process(context, HelloTask.getDefaultInstance().toByteArray()); @@ -136,7 +141,7 @@ public void testDeferCompletion() throws InterruptedException { @Test public 
void testDeferCompletion_EXCEPTION() throws InterruptedException { - doThrow(new KafkaException("kafka")).when(producer).sendRequest(any(), any(), any()); + doThrow(new KafkaException("kafka")).when(producer).sendRequest(any()); try { processor.process(context, HelloTask.getDefaultInstance().toByteArray()); @@ -146,4 +151,51 @@ public void testDeferCompletion_EXCEPTION() throws InterruptedException { verify(context, never()).deferCompletion(); } + + @Test + public void testLegacyRetryTaskFormat() throws Exception { + byte[] key = "key".getBytes(StandardCharsets.UTF_8); + TaskMetadata meta = + TaskMetadata.builder() + .sourceApplicationId("unit-test") + .sourceInstanceId("testing") + .timestampMillis(12345) + .retryCount(1) + .scheduledTimeMillis(67891) + .build(); + doReturn(key).when(context).key(); + doReturn(meta).when(context).metadata(); + + SubscriptionScope scope = new SubscriptionScope( + "subscription", "topic", + SubPartitionRuntime.THREAD_POOL, + Optional.of(RetryConfig.builder().backoff(RETRY_BACKOFF).build()), Optional.empty(), + ProcessorProperties.builder() + .set(Property.ofStatic(ProcessorProperties.CONFIG_TASK_METADATA_AS_HEADER, + false)) + .build(), NoopTracingProvider.INSTANCE, + ConsumerSupplier.DEFAULT_MAX_POLL_RECORDS, + DefaultSubPartitioner::new); + + DecatonTaskRetryQueueingProcessor processor = new DecatonTaskRetryQueueingProcessor(scope, producer); + + HelloTask task = HelloTask.getDefaultInstance(); + long currentTime = System.currentTimeMillis(); + processor.process(context, task.toByteArray()); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(ProducerRecord.class); + verify(producer, times(1)).sendRequest(captor.capture()); + + ProducerRecord record = captor.getValue(); + DecatonTaskRequest taskRequest = DecatonTaskRequest.parseFrom(record.value()); + + assertArrayEquals(task.toByteArray(), taskRequest.getSerializedTask().toByteArray()); + assertNull(TaskMetadataUtil.readFromHeader(record.headers())); + + assertEquals(meta.sourceApplicationId(), taskRequest.getMetadata().getSourceApplicationId()); + assertEquals(meta.sourceInstanceId(), taskRequest.getMetadata().getSourceInstanceId()); + assertEquals(meta.timestampMillis(), taskRequest.getMetadata().getTimestampMillis()); + assertEquals(meta.retryCount() + 1, taskRequest.getMetadata().getRetryCount()); + assertTrue(taskRequest.getMetadata().getScheduledTimeMillis() >= currentTime + RETRY_BACKOFF.toMillis()); + } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index 299b182d..2208ec85 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -19,18 +19,20 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import org.apache.kafka.common.header.internals.RecordHeaders; import org.junit.jupiter.api.Test; +import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import 
com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; public class DefaultTaskExtractorTest { private static final HelloTask TASK = HelloTask.getDefaultInstance(); - private static final DecatonTaskRequest REQUEST = + private static final DecatonTaskRequest LEGACY_REQUEST = DecatonTaskRequest.newBuilder() .setMetadata(TaskMetadataProto.newBuilder().setTimestampMillis(1561709151628L).build()) .setSerializedTask(TASK.toByteString()) @@ -40,9 +42,15 @@ public void testExtract() { DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( new ProtocolBuffersDeserializer<>(HelloTask.parser())); - DecatonTask extracted = extractor.extract(REQUEST.toByteArray()); + ConsumedRecord record = ConsumedRecord + .builder() + .headers(new RecordHeaders()) + .value(LEGACY_REQUEST.toByteArray()) + .build(); - assertEquals(REQUEST.getMetadata(), extracted.metadata().toProto()); + DecatonTask extracted = extractor.extract(record); + + assertEquals(LEGACY_REQUEST.getMetadata(), extracted.metadata().toProto()); assertEquals(TASK, extracted.taskData()); assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java index 1801e74e..ce0bd277 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipelineTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -65,8 +66,6 @@ import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider.NoopTrace; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; -import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; @ExtendWith(MockitoExtension.class) @@ -74,12 +73,6 @@ public class ProcessPipelineTest { private static final HelloTask TASK = HelloTask.getDefaultInstance(); - private static final DecatonTaskRequest REQUEST = - DecatonTaskRequest.newBuilder() - .setMetadata(TaskMetadataProto.getDefaultInstance()) - .setSerializedTask(TASK.toByteString()) - .build(); - private final DynamicProperty completionTimeoutMsProp = new DynamicProperty<>(ProcessorProperties.CONFIG_DEFERRED_COMPLETE_TIMEOUT_MS); @@ -103,7 +96,7 @@ public class ProcessPipelineTest { private static TaskRequest taskRequest() { return new TaskRequest( - new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, REQUEST.toByteArray(), null); + new TopicPartition("topic", 1), 1, new OffsetState(1234), "TEST".getBytes(StandardCharsets.UTF_8), null, NoopTrace.INSTANCE, TASK.toByteArray(), null); } @Mock @@ -131,11 +124,11 @@ public void setUp() { @Test public void testScheduleThenProcess_SYNC_COMPLETE() throws InterruptedException { when(extractorMock.extract(any())) - .thenReturn(new DecatonTask<>(TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray())); + .thenReturn(new 
DecatonTask<>(TaskMetadata.builder().build(), TASK, TASK.toByteArray())); TaskRequest request = taskRequest(); pipeline.scheduleThenProcess(request); - verify(schedulerMock, times(1)).schedule(eq(TaskMetadata.fromProto(REQUEST.getMetadata()))); + verify(schedulerMock, times(1)).schedule(eq(TaskMetadata.builder().build())); verify(processorMock, times(1)).process(any(), eq(TASK)); assertTrue(request.offsetState().completion().isComplete()); assertEquals(completionTimeoutMsProp.value() + clock.millis(), @@ -145,7 +138,7 @@ public void testScheduleThenProcess_SYNC_COMPLETE() throws InterruptedException @Test public void testScheduleThenProcess_ASYNC_COMPLETE() throws InterruptedException { when(extractorMock.extract(any())) - .thenReturn(new DecatonTask<>(TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray())); + .thenReturn(new DecatonTask<>(TaskMetadata.builder().build(), TASK, TASK.toByteArray())); CountDownLatch beforeComplete = new CountDownLatch(1); CountDownLatch afterComplete = new CountDownLatch(1); doAnswer(invocation -> { @@ -166,7 +159,7 @@ public void testScheduleThenProcess_ASYNC_COMPLETE() throws InterruptedException TaskRequest request = taskRequest(); pipeline.scheduleThenProcess(request); - verify(schedulerMock, times(1)).schedule(eq(TaskMetadata.fromProto(REQUEST.getMetadata()))); + verify(schedulerMock, times(1)).schedule(eq(TaskMetadata.builder().build())); verify(processorMock, times(1)).process(any(), eq(TASK)); // Should complete only after processor completes it @@ -221,7 +214,7 @@ public void testScheduleThenProcess_ExtractThrows() throws InterruptedException @Test public void testExtract_PurgeRawRequestBytes() { when(extractorMock.extract(any())) - .thenReturn(new DecatonTask<>(TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray())); + .thenReturn(new DecatonTask<>(TaskMetadata.builder().build(), TASK, TASK.toByteArray())); TaskRequest request = taskRequest(); pipeline.extract(request); @@ -231,7 +224,7 @@ public void testExtract_PurgeRawRequestBytes() { @Test public void testScheduleThenProcess_SynchronousFailure() throws InterruptedException { - DecatonTask<HelloTask> task = new DecatonTask<>(TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray()); + DecatonTask<HelloTask> task = new DecatonTask<>(TaskMetadata.builder().build(), TASK, TASK.toByteArray()); when(extractorMock.extract(any())).thenReturn(task); doThrow(new RuntimeException()).when(processorMock).process(any(), eq(TASK)); @@ -245,7 +238,7 @@ public void testScheduleThenProcess_SynchronousFailure() throws InterruptedExcep @Test @Timeout(5) public void testScheduleThenProcess_Terminate() throws InterruptedException { - DecatonTask<HelloTask> task = new DecatonTask<>(TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray()); + DecatonTask<HelloTask> task = new DecatonTask<>(TaskMetadata.builder().build(), TASK, TASK.toByteArray()); when(extractorMock.extract(any())).thenReturn(task); CountDownLatch atSchedule = new CountDownLatch(1); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java index 0333952d..b0960c6d 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java @@ -63,8 +63,6 @@ import com.linecorp.decaton.processor.tracing.TestTracingProvider;
import com.linecorp.decaton.processor.tracing.TracingProvider.RecordTraceHandle; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider.NoopTrace; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; -import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; import lombok.RequiredArgsConstructor; @@ -110,12 +108,6 @@ public void process(ProcessingContext<byte[]> context, byte[] task) throws Inter private static final HelloTask TASK = HelloTask.getDefaultInstance(); - private static final DecatonTaskRequest REQUEST = - DecatonTaskRequest.newBuilder() - .setMetadata(TaskMetadataProto.getDefaultInstance()) - .setSerializedTask(TASK.toByteString()) - .build(); - @Mock private NamedProcessor processorMock; @@ -129,9 +121,9 @@ private static ProcessingContextImpl<HelloTask> context(RecordTraceHandle traceH DecatonProcessor<HelloTask>... processors) { TaskRequest request = new TaskRequest( new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), - null, traceHandle, REQUEST.toByteArray(), null); + null, traceHandle, TASK.toByteArray(), null); DecatonTask<HelloTask> task = new DecatonTask<>( - TaskMetadata.fromProto(REQUEST.getMetadata()), TASK, TASK.toByteArray()); + TaskMetadata.builder().build(), TASK, TASK.toByteArray()); return new ProcessingContextImpl<>("subscription", request, task, Arrays.asList(processors), null, ProcessorProperties.builder().build()); } @@ -374,9 +366,9 @@ public void testRetry() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor<byte[]> retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, REQUEST.toByteArray(), null); + new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask<byte[]> task = new DecatonTask<>( - TaskMetadata.fromProto(REQUEST.getMetadata()), TASK.toByteArray(), TASK.toByteArray()); + TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); ProcessingContextImpl<byte[]> context = spy(new ProcessingContextImpl<>("subscription", request, task, @@ -405,9 +397,9 @@ public void testRetryAtCompletionTimeout() throws InterruptedException { CountDownLatch retryLatch = new CountDownLatch(1); DecatonProcessor<byte[]> retryProcessor = spy(new AsyncCompleteProcessor(retryLatch)); TaskRequest request = new TaskRequest( - new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, REQUEST.toByteArray(), null); + new TopicPartition("topic", 1), 1, null, "TEST".getBytes(StandardCharsets.UTF_8), null, null, TASK.toByteArray(), null); DecatonTask<byte[]> task = new DecatonTask<>( - TaskMetadata.fromProto(REQUEST.getMetadata()), TASK.toByteArray(), TASK.toByteArray()); + TaskMetadata.builder().build(), TASK.toByteArray(), TASK.toByteArray()); DecatonProcessor<byte[]> processor = (context, ignored) -> context.deferCompletion(comp -> { try { diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java index 72456efc..ed832b05 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorUnitTest.java @@ -35,13 +32,10 @@ import org.mockito.Mock; import
org.mockito.junit.jupiter.MockitoExtension; -import com.linecorp.decaton.processor.DeferredCompletion; import com.linecorp.decaton.processor.runtime.DefaultSubPartitioner; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.SubPartitionRuntime; import com.linecorp.decaton.processor.tracing.internal.NoopTracingProvider; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; -import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; @ExtendWith(MockitoExtension.class) @@ -69,13 +66,8 @@ public void setUp() { 0); unit = spy(new ProcessorUnit(scope, pipeline, Executors.newSingleThreadExecutor())); - DecatonTaskRequest request = - DecatonTaskRequest.newBuilder() - .setMetadata(TaskMetadataProto.getDefaultInstance()) - .setSerializedTask(HelloTask.getDefaultInstance().toByteString()) - .build(); - taskRequest = new TaskRequest(topicPartition, 1, new OffsetState(1234), null, null, null, request.toByteArray(), null); + taskRequest = new TaskRequest(topicPartition, 1, new OffsetState(1234), null, null, null, HelloTask.getDefaultInstance().toByteArray(), null); } @Test diff --git a/protocol/src/main/proto/decaton.proto b/protocol/src/main/proto/decaton.proto index 90a04c27..f1cc49b9 100644 --- a/protocol/src/main/proto/decaton.proto +++ b/protocol/src/main/proto/decaton.proto @@ -20,9 +20,3 @@ message TaskMetadataProto { // but it isn't guaranteed to be executed exactly at the time. int64 scheduled_time_millis = 5; } - -// The topic level message struct of the message to request Decaton for processing a task. -message DecatonTaskRequest { - TaskMetadataProto metadata = 1; - bytes serialized_task = 2; -} diff --git a/protocol/src/main/proto/decaton_internal.proto b/protocol/src/main/proto/decaton_internal.proto new file mode 100644 index 00000000..be873383 --- /dev/null +++ b/protocol/src/main/proto/decaton_internal.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; +import "decaton.proto"; + +package com.linecorp.decaton.protocol.internal; + +// The topic level message struct of the message to request Decaton for processing a task. +// +// When Decaton had started, Kafka didn't have record headers yet so we needed to wrap tasks +// in a protocol to propagate task metadata. +// As we can use headers now, decaton client has started to support propagating metadata +// through headers and sending tasks as record value directly. +// This protocol could be removed in the future release. 
+message DecatonTaskRequest { + option deprecated = true; + + TaskMetadataProto metadata = 1; + bytes serialized_task = 2; +} diff --git a/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java b/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java index e621b2e1..646b7694 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/TestUtils.java @@ -39,7 +39,6 @@ import com.google.protobuf.MessageLite; import com.linecorp.decaton.client.DecatonClient; -import com.linecorp.decaton.client.kafka.ProtocolBuffersKafkaSerializer; import com.linecorp.decaton.common.Serializer; import com.linecorp.decaton.processor.runtime.ProcessorSubscription; import com.linecorp.decaton.processor.runtime.Property; @@ -48,7 +47,6 @@ import com.linecorp.decaton.processor.runtime.SubscriptionStateListener; import com.linecorp.decaton.processor.runtime.SubscriptionStateListener.State; import com.linecorp.decaton.protobuf.ProtocolBuffersSerializer; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; public class TestUtils { private static final AtomicInteger sequence = new AtomicInteger(0); @@ -110,10 +108,10 @@ public static <T extends MessageLite> DecatonClient<T> client(String topic, * @param bootstrapServers bootstrap servers to connect * @return {@link Producer} instance with preset configurations */ - public static Producer<byte[], DecatonTaskRequest> producer(String bootstrapServers) { + public static Producer<byte[], byte[]> producer(String bootstrapServers) { return producer(bootstrapServers, new ByteArraySerializer(), - new ProtocolBuffersKafkaSerializer<>()); + new ByteArraySerializer()); } /** diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java index 95fc7d5e..bdc12249 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java @@ -52,6 +52,7 @@ import com.google.protobuf.ByteString; import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; +import com.linecorp.decaton.client.internal.TaskMetadataUtil; import com.linecorp.decaton.processor.Completion; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.runtime.PerKeyQuotaConfig; @@ -69,8 +70,8 @@ import com.linecorp.decaton.processor.runtime.TaskExtractor; import com.linecorp.decaton.processor.runtime.internal.RateLimiter; import com.linecorp.decaton.processor.tracing.TracingProvider; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; +import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import com.linecorp.decaton.testing.KafkaClusterExtension; import com.linecorp.decaton.testing.TestUtils; import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; @@ -109,8 +110,9 @@ public class ProcessorTestSuite { private final Set semantics; private final SubscriptionStatesListener statesListener; private final TracingProvider tracingProvider; - private final Function<String, Producer<byte[], DecatonTaskRequest>> producerSupplier; + private final Function<String, Producer<byte[], byte[]>> producerSupplier; private final TaskExtractor customTaskExtractor; + private final boolean produceTasksWithHeaderMetadata; private static final int DEFAULT_NUM_TASKS = 10000; private static final int NUM_KEYS = 100; @@ -179,11 +181,15 @@ public static class
Builder { * Expected use case: * supply a producer which adds tracing id to each message to test tracing-functionality in e2e */ - private Function<String, Producer<byte[], DecatonTaskRequest>> producerSupplier = TestUtils::producer; + private Function<String, Producer<byte[], byte[]>> producerSupplier = TestUtils::producer; /** * Supply custom {@link TaskExtractor} to be used to extract a task. */ private TaskExtractor customTaskExtractor; + /** + * Specify whether to produce tasks with header metadata instead of DecatonTaskRequest format + */ + private boolean produceTasksWithHeaderMetadata = true; /** * Exclude semantics from assertion. @@ -231,7 +237,8 @@ public ProcessorTestSuite build() { statesListener, tracingProvider, producerSupplier, - customTaskExtractor); + customTaskExtractor, + produceTasksWithHeaderMetadata); } } @@ -250,7 +257,7 @@ public void run() throws InterruptedException, ExecutionException, TimeoutExcept CountDownLatch rollingRestartLatch = new CountDownLatch(numTasks / 2); ProcessorSubscription[] subscriptions = new ProcessorSubscription[NUM_SUBSCRIPTION_INSTANCES]; - try (Producer<byte[], DecatonTaskRequest> producer = producerSupplier.apply(rule.bootstrapServers())) { + try (Producer<byte[], byte[]> producer = producerSupplier.apply(rule.bootstrapServers())) { ConcurrentMap queuedTaskOffsets = new ConcurrentHashMap<>(); for (int i = 0; i < subscriptions.length; i++) { subscriptions[i] = newSubscription(i, topic, Optional.of(rollingRestartLatch), queuedTaskOffsets); @@ -328,7 +335,7 @@ private ProcessorSubscription newSubscription( if (retryConfig != null) { RetryConfigBuilder retryConfigBuilder = retryConfig.toBuilder(); retryConfigBuilder.producerSupplier(props -> { - final Producer<byte[], DecatonTaskRequest> producer; + final Producer<byte[], byte[]> producer; if (retryConfig.producerSupplier() != null) { producer = retryConfig.producerSupplier().getProducer(props); } else { @@ -412,7 +419,7 @@ private void awaitAllOffsetsCommitted(Map<Integer, Long> producedOffsets) * @return A CompletableFuture of Map, which holds partition as the key and max offset as the value */ private CompletableFuture<Map<Integer, Long>> produceTasks( - Producer<byte[], DecatonTaskRequest> producer, + Producer<byte[], byte[]> producer, String topic, Consumer onProduce) { @SuppressWarnings("unchecked") @@ -428,13 +435,19 @@ private CompletableFuture<Map<Integer, Long>> produceTasks( .setSourceApplicationId("test-application") .setSourceInstanceId("test-instance") .build(); - DecatonTaskRequest request = - DecatonTaskRequest.newBuilder() - .setMetadata(taskMetadata) - .setSerializedTask(ByteString.copyFrom(serializer.serialize(task))) - .build(); - ProducerRecord<byte[], DecatonTaskRequest> record = - new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, request); + final ProducerRecord<byte[], byte[]> record; + if (produceTasksWithHeaderMetadata) { + record = new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, serializer.serialize(task)); + TaskMetadataUtil.writeAsHeader(taskMetadata, record.headers()); + } else { + DecatonTaskRequest request = + DecatonTaskRequest.newBuilder() + .setMetadata(taskMetadata) + .setSerializedTask(ByteString.copyFrom(serializer.serialize(task))) + .build(); + record = new ProducerRecord<>(topic, null, taskMetadata.getTimestampMillis(), key, request.toByteArray()); + } + CompletableFuture future = new CompletableFuture<>(); produceFutures[i] = future; diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java index 7f9cfd8d..f225f892 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java +++
b/testing/src/main/java/com/linecorp/decaton/testing/processor/TestTracingProducer.java @@ -27,14 +27,13 @@ import org.apache.kafka.common.header.internals.RecordHeader; import com.linecorp.decaton.processor.tracing.TestTracingProvider; -import com.linecorp.decaton.protocol.Decaton.DecatonTaskRequest; -public class TestTracingProducer extends ProducerAdaptor<byte[], DecatonTaskRequest> { - public TestTracingProducer(Producer<byte[], DecatonTaskRequest> delegate) { +public class TestTracingProducer extends ProducerAdaptor<byte[], byte[]> { + public TestTracingProducer(Producer<byte[], byte[]> delegate) { super(delegate); } - private static void propagateCurrentTrace(ProducerRecord<byte[], DecatonTaskRequest> record) { + private static void propagateCurrentTrace(ProducerRecord<byte[], byte[]> record) { String traceId = TestTracingProvider.getCurrentTraceId(); if (null == traceId) { traceId = "trace-" + UUID.randomUUID(); @@ -45,13 +44,13 @@ private static void propagateCurrentTrace(ProducerRecord } @Override - public Future<RecordMetadata> send(ProducerRecord<byte[], DecatonTaskRequest> record) { + public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) { propagateCurrentTrace(record); return super.send(record); } @Override - public Future<RecordMetadata> send(ProducerRecord<byte[], DecatonTaskRequest> record, Callback callback) { + public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record, Callback callback) { propagateCurrentTrace(record); return super.send(record, callback); }
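
To make the new wire format concrete, here is a minimal end-to-end sketch (illustrative only, not part of this patch) built solely from APIs that appear in the hunks above: TaskMetadataUtil for the header round-trip, ConsumedRecord plus DefaultTaskExtractor for the legacy path, and the now-deprecated internal DecatonTaskRequest. The class name, topic, key, and payload are placeholders, the import paths follow the files touched above, and readFromHeader is assumed to return the TaskMetadataProto it finds (or null), as its use in the retry test suggests.

// Minimal sketch, not part of this patch. Assumes the APIs shown in the hunks above;
// class name, topic, key, and payload are illustrative only.
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

import com.google.protobuf.ByteString;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;
import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest;

public class HeaderMetadataSketch {
    public static void main(String[] args) {
        TaskMetadataProto metadata = TaskMetadataProto.newBuilder()
                .setTimestampMillis(System.currentTimeMillis())
                .build();
        byte[] key = "key".getBytes(StandardCharsets.UTF_8);
        byte[] serializedTask = HelloTask.getDefaultInstance().toByteArray();

        // New format: the record value is the bare serialized task and the metadata
        // travels as a record header, as DecatonClientImpl and ProcessorTestSuite now do.
        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
                "example-topic", null, metadata.getTimestampMillis(), key, serializedTask);
        TaskMetadataUtil.writeAsHeader(metadata, record.headers());
        // On the consuming side the header can be read back directly
        // (assumed to return the proto, or null when the header is absent).
        TaskMetadataProto restored = TaskMetadataUtil.readFromHeader(record.headers());

        // Legacy format: tasks wrapped in the deprecated DecatonTaskRequest are still
        // extracted through ConsumedRecord, as DefaultTaskExtractorTest does above.
        DecatonTaskRequest legacyValue = DecatonTaskRequest.newBuilder()
                .setMetadata(metadata)
                .setSerializedTask(ByteString.copyFrom(serializedTask))
                .build();
        ConsumedRecord legacyRecord = ConsumedRecord.builder()
                .headers(new RecordHeaders())
                .value(legacyValue.toByteArray())
                .build();
        DefaultTaskExtractor<HelloTask> extractor =
                new DefaultTaskExtractor<>(new ProtocolBuffersDeserializer<>(HelloTask.parser()));
        DecatonTask<HelloTask> extracted = extractor.extract(legacyRecord);

        System.out.println(restored.getTimestampMillis() + " / " + extracted.taskData());
    }
}

The split mirrors the rationale documented in decaton_internal.proto: with record headers available, metadata no longer needs to wrap the task payload, while the deprecated wrapper keeps tasks produced by older clients readable.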