Skip to content

Commit

Permalink
chore(table): set table max.message.bytes to 20MiB (#2182)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Nov 26, 2024
1 parent 57d8e5f commit 64b3865
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/streamaspect/MetaStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private Map<String, Object> getValidMetaMap() {
metaMap.put(key, ElasticLeaderEpochCheckpointMeta.decode(value.value()));
break;
default:
LOGGER.error("{} streamId {}: unknown meta key: {}", logIdent, streamId(), key);
metaMap.put(key, value.value().duplicate());
}
});
return metaMap;
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.InvalidTopicException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME}
Expand Down Expand Up @@ -246,16 +247,24 @@ class DefaultAutoTopicCreationManager(
txnCoordinator.transactionTopicConfigs))

// AutoMQ inject start
case "__automq_table_control" =>
case "__automq_table_control" => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
.setName(topic)
.setNumPartitions(1)
.setReplicationFactor(1)
case "__automq_table_data" =>
.setConfigs(convertToTopicConfigCollections(configs))
}
case "__automq_table_data" => {
val configs = new Properties()
configs.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, 20 * 1024 * 1024)
new CreatableTopic()
.setName(topic)
.setNumPartitions(50)
.setReplicationFactor(1)
.setConfigs(convertToTopicConfigCollections(configs))
}
// AutoMQ inject end

case topicName =>
Expand Down

0 comments on commit 64b3865

Please sign in to comment.