diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java index d94c91da50..3709d8005b 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java @@ -30,7 +30,13 @@ public class Topic { public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state"; public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state"; public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata"; + + // AutoMQ inject start public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics"; + public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control"; + public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data"; + // AutoMQ inject end + public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition( CLUSTER_METADATA_TOPIC_NAME, 0 diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index 0e307b16a8..9060c464bf 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -27,7 +27,7 @@ import org.apache.kafka.clients.ClientResponse import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.InvalidTopicException import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} +import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TABLE_TOPIC_CONTROL_TOPIC_NAME, TABLE_TOPIC_DATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic @@ -247,7 +247,7 @@ class DefaultAutoTopicCreationManager( txnCoordinator.transactionTopicConfigs)) // AutoMQ inject start - case "__automq_table_control" => { + case TABLE_TOPIC_CONTROL_TOPIC_NAME => { val configs = new Properties() configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024) new CreatableTopic() @@ -256,7 +256,7 @@ class DefaultAutoTopicCreationManager( .setReplicationFactor(1) .setConfigs(convertToTopicConfigCollections(configs)) } - case "__automq_table_data" => { + case TABLE_TOPIC_DATA_TOPIC_NAME => { val configs = new Properties() configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024) new CreatableTopic() diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 65b65fa3d8..557fb7e12e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -70,7 +70,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.coordinator.group.{Group, GroupCoordinator} import org.apache.kafka.server.ClientMetricsManager import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.common.{MetadataVersion} +import org.apache.kafka.server.common.MetadataVersion import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0} import org.apache.kafka.server.record.BrokerCompressionType import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData} @@ -1288,6 +1288,14 @@ class KafkaApis(val requestChannel: RequestChannel, val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request) autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context)) } else { + // AutoMQ inject start + for (tableTopic <- Set(Topic.TABLE_TOPIC_CONTROL_TOPIC_NAME, Topic.TABLE_TOPIC_DATA_TOPIC_NAME)) { + if (nonExistingTopics.contains(tableTopic)) { + val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request) + autoTopicCreationManager.createTopics(Set(tableTopic), controllerMutationQuota, Some(request.context)) + } + } + // AutoMQ inject end nonExistingTopics.map { topic => val error = try { Topic.validate(topic)