Skip to content

Commit

Permalink
feat(config): add s3 config validation
Browse files Browse the repository at this point in the history
  • Loading branch information
lyzs90 committed Jan 5, 2025
1 parent 552fb77 commit 5f68259
Showing 1 changed file with 17 additions and 0 deletions.
17 changes: 17 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,23 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}

// configuration sanity checks
val memoryType = if (s3StreamAllocatorPolicy.isDirect) "Direct buffer" else "Heap buffer"
val memoryLimit = if (s3StreamAllocatorPolicy.isDirect) {
PlatformDependent.maxDirectMemory()
} else {
Runtime.getRuntime.maxMemory()
}
if (s3BlockCacheSize > memoryLimit) {
throw new ConfigException(s"${AutoMQConfig.S3_BLOCK_CACHE_SIZE_CONFIG} of ${s3BlockCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
}
if (s3WALCacheSize > memoryLimit) {
throw new ConfigException(s"${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize} exceeds ${memoryType} limit of ${memoryLimit}")
}
if (s3WALUploadThreshold > s3WALCacheSize) {
throw new ConfigException(s"${AutoMQConfig.S3_WAL_UPLOAD_THRESHOLD_CONFIG} of ${s3WALUploadThreshold} exceeds ${AutoMQConfig.S3_WAL_CACHE_SIZE_CONFIG} of ${s3WALCacheSize}")
}

(s3WALCacheSize, s3BlockCacheSize, s3WALUploadThreshold)
}

Expand Down

0 comments on commit 5f68259

Please sign in to comment.