Skip to content

Commit

Permalink
feat(table): auto create table topic control topic (#2186)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Nov 28, 2024
1 parent 2583617 commit fba35b6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ public class Topic {
public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state";
public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state";
public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata";

// AutoMQ inject start
public static final String AUTO_BALANCER_METRICS_TOPIC_NAME = "__auto_balancer_metrics";
public static final String TABLE_TOPIC_CONTROL_TOPIC_NAME = "__automq_table_control";
public static final String TABLE_TOPIC_DATA_TOPIC_NAME = "__automq_table_data";
// AutoMQ inject end

public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition(
CLUSTER_METADATA_TOPIC_NAME,
0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TABLE_TOPIC_CONTROL_TOPIC_NAME, TABLE_TOPIC_DATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
Expand Down Expand Up @@ -247,7 +247,7 @@ class DefaultAutoTopicCreationManager(
txnCoordinator.transactionTopicConfigs))

// AutoMQ inject start
case "__automq_table_control" => {
case TABLE_TOPIC_CONTROL_TOPIC_NAME => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
Expand All @@ -256,7 +256,7 @@ class DefaultAutoTopicCreationManager(
.setReplicationFactor(1)
.setConfigs(convertToTopicConfigCollections(configs))
}
case "__automq_table_data" => {
case TABLE_TOPIC_DATA_TOPIC_NAME => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupCoordinator}
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{MetadataVersion}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_11_0_IV0, IBP_2_3_IV0}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, FetchParams, FetchPartitionData}
Expand Down Expand Up @@ -1288,6 +1288,14 @@ class KafkaApis(val requestChannel: RequestChannel,
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(nonExistingTopics, controllerMutationQuota, Some(request.context))
} else {
// AutoMQ inject start
for (tableTopic <- Set(Topic.TABLE_TOPIC_CONTROL_TOPIC_NAME, Topic.TABLE_TOPIC_DATA_TOPIC_NAME)) {
if (nonExistingTopics.contains(tableTopic)) {
val controllerMutationQuota = quotas.controllerMutation.newPermissiveQuotaFor(request)
autoTopicCreationManager.createTopics(Set(tableTopic), controllerMutationQuota, Some(request.context))
}
}
// AutoMQ inject end
nonExistingTopics.map { topic =>
val error = try {
Topic.validate(topic)
Expand Down

0 comments on commit fba35b6

Please sign in to comment.