From 5f682594ba95759113ec673ced554c842edd57d3 Mon Sep 17 00:00:00 2001 From: lyzs90 Date: Mon, 6 Jan 2025 00:01:57 +0800 Subject: [PATCH] feat(config): add s3 config validation --- .../main/scala/kafka/server/KafkaConfig.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 1860a597a2..ffb029906d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -761,6 +761,23 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } + // configuration sanity checks + val memoryType = if (s3StreamAllocatorPolicy.isDirect) "Direct buffer" else "Heap buffer" + val memoryLimit = if (s3StreamAllocatorPolicy.isDirect) { + PlatformDependent.maxDirectMemory() + } else { + Runtime.getRuntime.maxMemory() + } + if (s3BlockCacheSize > memoryLimit) { + throw new ConfigException(s"${AutoMQConfig.S3_BLOCK_CACHE_SIZE_CONFIG} of ${s3BlockCacheSize} exceeds ${memoryType} limit of ${memoryLimit}") + } + if (s3WALCacheSize > memoryLimit) { + throw new ConfigException(s"${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize} exceeds ${memoryType} limit of ${memoryLimit}") + } + if (s3WALUploadThreshold > s3WALCacheSize) { + throw new ConfigException(s"${AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG} of ${s3WALUploadThreshold} exceeds ${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize}") + } + (s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold) }