diff --git a/core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java b/core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java index c573b7e919..df88b3c506 100644 --- a/core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java +++ b/core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java @@ -12,6 +12,7 @@ package kafka.automq.zonerouter; import kafka.server.MetadataCache; +import kafka.server.RequestLocal; import kafka.server.streamaspect.ElasticKafkaApis; import org.apache.kafka.common.Node; @@ -44,7 +45,9 @@ public NoopProduceRouter(ElasticKafkaApis kafkaApis, MetadataCache metadataCache public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, int timeout, short requiredAcks, boolean internalTopicsAllowed, String transactionId, Map entriesPerPartition, Consumer> responseCallback, - Consumer> recordValidationStatsCallback) { + Consumer> recordValidationStatsCallback, + RequestLocal requestLocal + ) { kafkaApis.handleProduceAppendJavaCompatible( timeout, requiredAcks, @@ -59,7 +62,8 @@ public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, in recordValidationStatsCallback.accept(rst); return null; }, - apiVersion + apiVersion, + requestLocal ); } diff --git a/core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java b/core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java index 34fd1ba017..3b3a67ece4 100644 --- a/core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java +++ b/core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java @@ -11,6 +11,8 @@ package kafka.automq.zonerouter; +import kafka.server.RequestLocal; + import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.MetadataResponseData; @@ -36,7 +38,8 @@ void handleProduceRequest( String transactionId, Map entriesPerPartition, Consumer> responseCallback, - Consumer> recordValidationStatsCallback + Consumer> recordValidationStatsCallback, + RequestLocal requestLocal ); CompletableFuture handleZoneRouterRequest(byte[] metadata); diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala index 047567b489..1aec8ed74c 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala @@ -395,6 +395,7 @@ class ElasticKafkaApis( authorizedRequestInfo.asJava, sendResponseCallbackJava, processingStatsCallbackJava, + requestLocal, ) // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected; @@ -425,7 +426,9 @@ class ElasticKafkaApis( entriesPerPartition: util.Map[TopicPartition, MemoryRecords], responseCallback: util.Map[TopicPartition, PartitionResponse] => Unit, recordValidationStatsCallback: util.Map[TopicPartition, RecordValidationStats] => Unit = _ => (), - apiVersion: Short): Unit = { + apiVersion: Short, + requestLocal: RequestLocal + ): Unit = { val transactionSupportedOperation = if (apiVersion > 10) genericError else defaultError replicaManager.handleProduceAppend( timeout = timeout, @@ -435,7 +438,8 @@ class ElasticKafkaApis( entriesPerPartition = entriesPerPartition.asScala, responseCallback = rst => responseCallback.apply(rst.asJava), recordValidationStatsCallback = rst => recordValidationStatsCallback.apply(rst.asJava), - transactionSupportedOperation = transactionSupportedOperation + transactionSupportedOperation = transactionSupportedOperation, + requestLocal = requestLocal, ) }