From d295c18829c5b92abf9114992f026a05afebc0dc Mon Sep 17 00:00:00 2001 From: jmaxwell Date: Wed, 31 Jul 2019 15:05:18 -0500 Subject: [PATCH 1/2] #665 Adds 2.2.1 and 2.3.0 support fixes #667 fixes #668 --- app/controllers/Cluster.scala | 25 ++++++++++--------- app/controllers/Logkafka.scala | 8 ++++++ app/controllers/Topic.scala | 8 +++++- app/kafka/manager/KafkaManager.scala | 17 +++++++++---- .../actor/cluster/ClusterManagerActor.scala | 3 ++- .../actor/cluster/KafkaStateActor.scala | 2 +- app/kafka/manager/model/model.scala | 11 +++++++- .../manager/utils/LogkafkaNewConfigs.scala | 4 ++- app/kafka/manager/utils/TopicConfigs.scala | 4 ++- build.sbt | 6 +++-- test/controller/api/TestKafkaStateCheck.scala | 5 +++- test/kafka/manager/TestKafkaManager.scala | 6 ++--- .../kafka/manager/TestKafkaManagerActor.scala | 17 +++++++------ .../manager/model/KafkaVersionTest.scala | 8 ++++-- .../manager/utils/TestClusterConfig.scala | 16 ++++++++++++ 15 files changed, 101 insertions(+), 39 deletions(-) diff --git a/app/controllers/Cluster.scala b/app/controllers/Cluster.scala index 5e002cca3..d793446a0 100644 --- a/app/controllers/Cluster.scala +++ b/app/controllers/Cluster.scala @@ -8,6 +8,7 @@ package controllers import features.{ApplicationFeatures, KMClusterManagerFeature} import kafka.manager.ApiError import kafka.manager.model._ +import kafka.manager.KafkaManager._ import models.FollowLink import models.form._ import models.navigation.Menus @@ -101,19 +102,19 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag , "tuning" -> optional( mapping( "brokerViewUpdatePeriodSeconds" -> optional(number(10, 1000)) - , "clusterManagerThreadPoolSize" -> optional(number(2, 1000)) + , "clusterManagerThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "clusterManagerThreadPoolQueueSize" -> optional(number(10, 10000)) - , "kafkaCommandThreadPoolSize" -> optional(number(2, 1000)) + , "kafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "kafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000)) - , "logkafkaCommandThreadPoolSize" -> optional(number(2, 1000)) + , "logkafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "logkafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000)) , "logkafkaUpdatePeriodSeconds" -> optional(number(10, 1000)) , "partitionOffsetCacheTimeoutSecs" -> optional(number(5, 100)) - , "brokerViewThreadPoolSize" -> optional(number(2, 1000)) + , "brokerViewThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "brokerViewThreadPoolQueueSize" -> optional(number(10, 10000)) - , "offsetCacheThreadPoolSize" -> optional(number(2, 1000)) + , "offsetCacheThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000)) - , "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000)) + , "kafkaAdminClientThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000)) , "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000)) , "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000)) @@ -145,19 +146,19 @@ class Cluster (val cc: ControllerComponents, val kafkaManagerContext: KafkaManag "tuning" -> optional( mapping( "brokerViewUpdatePeriodSeconds" -> optional(number(10, 1000)) - , "clusterManagerThreadPoolSize" -> optional(number(2, 1000)) + , "clusterManagerThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "clusterManagerThreadPoolQueueSize" -> optional(number(10, 10000)) - , "kafkaCommandThreadPoolSize" -> optional(number(2, 1000)) + , "kafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "kafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000)) - , "logkafkaCommandThreadPoolSize" -> optional(number(2, 1000)) + , "logkafkaCommandThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "logkafkaCommandThreadPoolQueueSize" -> optional(number(10, 10000)) , "logkafkaUpdatePeriodSeconds" -> optional(number(10, 1000)) , "partitionOffsetCacheTimeoutSecs" -> optional(number(5, 100)) - , "brokerViewThreadPoolSize" -> optional(number(2, 1000)) + , "brokerViewThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "brokerViewThreadPoolQueueSize" -> optional(number(10, 10000)) - , "offsetCacheThreadPoolSize" -> optional(number(2, 1000)) + , "offsetCacheThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "offsetCacheThreadPoolQueueSize" -> optional(number(10, 10000)) - , "kafkaAdminClientThreadPoolSize" -> optional(number(2, 1000)) + , "kafkaAdminClientThreadPoolSize" -> optional(number(DefaultMinThreadPoolSize, 1000)) , "kafkaAdminClientThreadPoolQueueSize" -> optional(number(10, 10000)) , "kafkaManagedOffsetMetadataCheckMillis" -> optional(number(10000, 120000)) , "kafkaManagedOffsetGroupCacheSize" -> optional(number(10000, 100000000)) diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index 441f93892..cdccdc175 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -97,6 +97,10 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_2_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_2_2_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_2_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_2_2_1_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_2_2_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_2_3_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_2_3_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -157,6 +161,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext) case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext) case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext) + case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext) + case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext) } } } @@ -261,6 +267,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_2_1_0 => LogkafkaNewConfigs.configNames(Kafka_2_1_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_2_1_1 => LogkafkaNewConfigs.configNames(Kafka_2_1_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_2_2_0 => LogkafkaNewConfigs.configNames(Kafka_2_2_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_2_2_1 => LogkafkaNewConfigs.configNames(Kafka_2_2_1).map(n => (n,LKConfig(n,None))).toMap + case Kafka_2_3_0 => LogkafkaNewConfigs.configNames(Kafka_2_3_0).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index 82dfd7dc3..d16990fe7 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -67,6 +67,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager val kafka_2_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_2_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_2_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_2_2_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_2_3_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val defaultCreateForm = Form( mapping( @@ -168,7 +170,9 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_0_0 => (defaultCreateForm.fill(kafka_2_0_0_Default), clusterContext) case Kafka_2_1_0 => (defaultCreateForm.fill(kafka_2_1_0_Default), clusterContext) case Kafka_2_1_1 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext) - case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_1_1_Default), clusterContext) + case Kafka_2_2_0 => (defaultCreateForm.fill(kafka_2_2_0_Default), clusterContext) + case Kafka_2_2_1 => (defaultCreateForm.fill(kafka_2_2_1_Default), clusterContext) + case Kafka_2_3_0 => (defaultCreateForm.fill(kafka_2_3_0_Default), clusterContext) } } } @@ -419,6 +423,8 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_2_1_0 => TopicConfigs.configNamesAndDoc(Kafka_2_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_2_1_1 => TopicConfigs.configNamesAndDoc(Kafka_2_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_2_2_0 => TopicConfigs.configNamesAndDoc(Kafka_2_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_2_2_1 => TopicConfigs.configNamesAndDoc(Kafka_2_2_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_2_3_0 => TopicConfigs.configNamesAndDoc(Kafka_2_3_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } } val updatedConfigMap = ti.config.toMap val updatedConfigList = defaultConfigs.map { diff --git a/app/kafka/manager/KafkaManager.scala b/app/kafka/manager/KafkaManager.scala index 7e563c3d2..9fe036f76 100644 --- a/app/kafka/manager/KafkaManager.scala +++ b/app/kafka/manager/KafkaManager.scala @@ -7,6 +7,7 @@ package kafka.manager import java.util.Properties import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} +import java.lang.Runtime.{getRuntime, _} import akka.actor.{ActorPath, ActorSystem, Props} import akka.util.Timeout @@ -15,7 +16,7 @@ import grizzled.slf4j.Logging import kafka.manager.actor.{KafkaManagerActor, KafkaManagerActorConfig} import kafka.manager.base.LongRunningPoolConfig import kafka.manager.model._ -import ActorModel._ +import kafka.manager.model.ActorModel._ import kafka.manager.actor.cluster.KafkaManagedOffsetCacheConfig import kafka.manager.utils.UtilException import kafka.manager.utils.zero81.ReassignPartitionErrors.ReplicationOutOfSync @@ -59,6 +60,12 @@ object ApiError extends Logging { object KafkaManager { + val AvailableProcessors: Int = getRuntime().availableProcessors(); + val DefaultMinThreadPoolSize: Int = 2; + val DefaultMinThreadPoolSizeAsString: String = DefaultMinThreadPoolSize.toString; + val DefaultThreadPoolSize: Int = if (AvailableProcessors < DefaultMinThreadPoolSize) DefaultMinThreadPoolSize else AvailableProcessors; + val DefaultThreadPoolSizeAsString: String = DefaultThreadPoolSize.toString; + val ConsumerPropertiesFile = "kafka-manager.consumer.properties.file" val BaseZkPath = "kafka-manager.base-zk-path" val PinnedDispatchName = "kafka-manager.pinned-dispatcher-name" @@ -94,18 +101,18 @@ object KafkaManager { DeleteClusterUpdateSeconds -> "10", DeletionBatchSize -> "2", MaxQueueSize -> "100", - ThreadPoolSize -> "2", + ThreadPoolSize -> DefaultMinThreadPoolSizeAsString, MutexTimeoutMillis -> "4000", StartDelayMillis -> "1000", ApiTimeoutMillis -> "5000", ClusterActorsAskTimeoutMillis -> "2000", PartitionOffsetCacheTimeoutSecs -> "5", SimpleConsumerSocketTimeoutMillis -> "10000", - BrokerViewThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString, + BrokerViewThreadPoolSize -> DefaultThreadPoolSizeAsString, BrokerViewMaxQueueSize -> "1000", - OffsetCacheThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString, + OffsetCacheThreadPoolSize -> DefaultThreadPoolSizeAsString, OffsetCacheMaxQueueSize -> "1000", - KafkaAdminClientThreadPoolSize -> Runtime.getRuntime.availableProcessors().toString, + KafkaAdminClientThreadPoolSize -> DefaultThreadPoolSizeAsString, KafkaAdminClientMaxQueueSize -> "1000", KafkaManagedOffsetMetadataCheckMillis -> KafkaManagedOffsetCacheConfig.defaultGroupMemberMetadataCheckMillis.toString, KafkaManagedOffsetGroupCacheSize -> KafkaManagedOffsetCacheConfig.defaultGroupTopicPartitionOffsetMaxSize.toString, diff --git a/app/kafka/manager/actor/cluster/ClusterManagerActor.scala b/app/kafka/manager/actor/cluster/ClusterManagerActor.scala index 252161dd1..e1b5d75a2 100644 --- a/app/kafka/manager/actor/cluster/ClusterManagerActor.scala +++ b/app/kafka/manager/actor/cluster/ClusterManagerActor.scala @@ -49,6 +49,7 @@ object ClusterManagerActor { } import kafka.manager.model.ActorModel._ +import kafka.manager.KafkaManager._ case class ClusterManagerActorConfig(pinnedDispatcherName: String , baseZkPath : String @@ -168,7 +169,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig) LogkafkaViewCacheActorConfig( logkafkaStateActor.get, clusterContext, - LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000), + LongRunningPoolConfig(AvailableProcessors, 1000), FiniteDuration(clusterConfig.tuning.get.logkafkaUpdatePeriodSeconds.get, TimeUnit.SECONDS) ) ) diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 4fcd9116b..20515f4a8 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -176,7 +176,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_3_0) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index bf8e996bb..c89e16f6c 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -100,6 +100,14 @@ case object Kafka_2_2_0 extends KafkaVersion { override def toString = "2.2.0" } +case object Kafka_2_2_1 extends KafkaVersion { + override def toString = "2.2.1" +} + +case object Kafka_2_3_0 extends KafkaVersion { + override def toString = "2.3.0" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -124,7 +132,8 @@ object KafkaVersion { "2.0.0" -> Kafka_2_0_0, "2.1.0" -> Kafka_2_1_0, "2.1.1" -> Kafka_2_1_1, - "2.2.0" -> Kafka_2_2_0 + "2.2.0" -> Kafka_2_2_0, + "2.3.0" -> Kafka_2_3_0 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 0c20f41a5..ccabc2821 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -39,7 +39,9 @@ object LogkafkaNewConfigs { Kafka_2_0_0 -> logkafka82.LogConfig, Kafka_2_1_0 -> logkafka82.LogConfig, Kafka_2_1_1 -> logkafka82.LogConfig, - Kafka_2_2_0 -> logkafka82.LogConfig + Kafka_2_2_0 -> logkafka82.LogConfig, + Kafka_2_2_1 -> logkafka82.LogConfig, + Kafka_2_3_0 -> logkafka82.LogConfig, ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index b8032061a..6979ad589 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -42,7 +42,9 @@ object TopicConfigs { Kafka_2_0_0 -> two00.LogConfig, Kafka_2_1_0 -> two00.LogConfig, Kafka_2_1_1 -> two00.LogConfig, - Kafka_2_2_0 -> two00.LogConfig + Kafka_2_2_0 -> two00.LogConfig, + Kafka_2_2_1 -> two00.LogConfig, + Kafka_2_3_0 -> two00.LogConfig ) def configNames(version: KafkaVersion): Seq[String] = { diff --git a/build.sbt b/build.sbt index 42b8a98a3..6897fc380 100644 --- a/build.sbt +++ b/build.sbt @@ -9,6 +9,8 @@ version := "2.0.0.2" scalaVersion := "2.12.8" +val kafkaVersion = "2.2.1" + scalacOptions ++= Seq("-Xlint:-missing-interpolator","-Xfatal-warnings","-deprecation","-feature","-language:implicitConversions","-language:postfixOps","-Xmax-classfile-name","240") // From https://www.playframework.com/documentation/2.3.x/ProductionDist @@ -37,8 +39,8 @@ libraryDependencies ++= Seq( "org.slf4j" % "log4j-over-slf4j" % "1.7.25", "com.adrianhurt" %% "play-bootstrap" % "1.4-P26-B4" exclude("com.typesafe.play", "*"), "org.clapper" %% "grizzled-slf4j" % "1.3.3", - "org.apache.kafka" %% "kafka" % "2.2.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), - "org.apache.kafka" % "kafka-streams" % "2.2.0", + "org.apache.kafka" %% "kafka" % kafkaVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.kafka" % "kafka-streams" % kafkaVersion, "com.beachape" %% "enumeratum" % "1.5.13", "com.github.ben-manes.caffeine" % "caffeine" % "2.6.2", "com.typesafe.play" %% "play-logback" % "2.6.21", diff --git a/test/controller/api/TestKafkaStateCheck.scala b/test/controller/api/TestKafkaStateCheck.scala index 672fda694..58f124374 100644 --- a/test/controller/api/TestKafkaStateCheck.scala +++ b/test/controller/api/TestKafkaStateCheck.scala @@ -29,6 +29,7 @@ import scala.concurrent.{Await, ExecutionContext} import scala.util.Try class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with MockitoSugar { + val KafkaVersion = "2.3.0" private[this] val broker = new SeededBroker("controller-api-test", 4) override val kafkaServerZkPath = broker.getZookeeperConnectionString private[this] val duration = FiniteDuration(10, SECONDS) @@ -76,9 +77,11 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M super.afterAll() } + + private[this] def createCluster() = { val future = kafkaManagerContext.get.getKafkaManager.addCluster( - testClusterName, "2.2.0", kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None + testClusterName, KafkaVersion, kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None ) val result = Await.result(future, duration) result.toEither.left.foreach(apiError => sys.error(apiError.msg)) diff --git a/test/kafka/manager/TestKafkaManager.scala b/test/kafka/manager/TestKafkaManager.scala index b4da3dd01..3ef4e0c1e 100644 --- a/test/kafka/manager/TestKafkaManager.scala +++ b/test/kafka/manager/TestKafkaManager.scala @@ -125,7 +125,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("add cluster") { - val future = kafkaManager.addCluster("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val future = kafkaManager.addCluster("dev","2.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) Thread.sleep(2000) @@ -392,7 +392,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) + val future = kafkaManager.updateCluster("dev","2.3.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxSsl = false, jmxPass = None, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) @@ -449,7 +449,7 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled and activeOffsetCache enabled") { - val future = kafkaManager.updateCluster("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) + val future = kafkaManager.updateCluster("dev","2.3.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol = "PLAINTEXT", saslMechanism = None, jaasConfig = None) val result = Await.result(future,duration) assert(result.isRight === true) diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala index b093ebbf8..1824ad568 100644 --- a/test/kafka/manager/TestKafkaManagerActor.scala +++ b/test/kafka/manager/TestKafkaManagerActor.scala @@ -27,7 +27,7 @@ import scala.util.Try * @author hiral */ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { - + private[this] val KafkaVersion: String = "2.3.0" private[this] val akkaConfig: Properties = new Properties() akkaConfig.setProperty("pinned-dispatcher.type","PinnedDispatcher") akkaConfig.setProperty("pinned-dispatcher.executor","thread-pool-executor") @@ -68,8 +68,9 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { fn(result) } + test("add cluster") { - val cc = ClusterConfig("dev","2.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc = ClusterConfig("dev",KafkaVersion,testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -80,7 +81,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -112,7 +113,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -139,7 +140,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { println(result) result.msg.contains("dev") } - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -156,7 +157,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled") { - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -168,7 +169,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { test("update cluster tuning") { val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1) - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => @@ -185,7 +186,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster security protocol") { - val cc2 = ClusterConfig("dev","2.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val cc2 = ClusterConfig("dev",KafkaVersion,kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) diff --git a/test/kafka/manager/model/KafkaVersionTest.scala b/test/kafka/manager/model/KafkaVersionTest.scala index 220ebf3de..4b1f4ce79 100644 --- a/test/kafka/manager/model/KafkaVersionTest.scala +++ b/test/kafka/manager/model/KafkaVersionTest.scala @@ -34,7 +34,9 @@ class KafkaVersionTest extends FunSuite { "2.0.0" -> Kafka_2_0_0, "2.1.0" -> Kafka_2_1_0, "2.1.1" -> Kafka_2_1_1, - "2.2.0" -> Kafka_2_2_0 + "2.2.0" -> Kafka_2_2_0, + "2.2.1" -> Kafka_2_2_1, + "2.3.0" -> Kafka_2_3_0 ) test("apply method: supported version.") { @@ -75,7 +77,9 @@ class KafkaVersionTest extends FunSuite { ("2.0.0","2.0.0"), ("2.1.0","2.1.0"), ("2.1.1","2.1.1"), - ("2.2.0","2.2.0") + ("2.2.0","2.2.0"), + ("2.2.1","2.2.1"), + ("2.3.0","2.3.0") ) assertResult(expected)(KafkaVersion.formSelectList) } diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 082d40667..9bf0c7897 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -229,4 +229,20 @@ class TestClusterConfig extends FunSuite with Matchers { assert(cc == deserialize.get) } + test("serialize and deserialize 2.2.1") { + val cc = ClusterConfig("qa", "2.2.1", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + + test("serialize and deserialize 2.3.0") { + val cc = ClusterConfig("qa", "2.3.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } + } From a2067a8619c1f4dd55c0e2ed122edbb50905fa95 Mon Sep 17 00:00:00 2001 From: jmaxwell Date: Thu, 8 Aug 2019 10:36:21 -0500 Subject: [PATCH 2/2] #665 missed model.scala for 2.2.1 --- app/kafka/manager/model/model.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index c89e16f6c..aedb6c766 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -133,6 +133,7 @@ object KafkaVersion { "2.1.0" -> Kafka_2_1_0, "2.1.1" -> Kafka_2_1_1, "2.2.0" -> Kafka_2_2_0, + "2.2.1" -> Kafka_2_2_1, "2.3.0" -> Kafka_2_3_0 )