From d8057ba7a9635f9db6d078073a897314b7774041 Mon Sep 17 00:00:00 2001 From: yea-hb Date: Tue, 17 Nov 2020 17:43:35 +0900 Subject: [PATCH] #monitoring add monitoring api for broker status --- app/controllers/api/KafkaStateCheck.scala | 77 ++++++++++++++++++++++- conf/routes | 4 ++ 2 files changed, 80 insertions(+), 1 deletion(-) diff --git a/app/controllers/api/KafkaStateCheck.scala b/app/controllers/api/KafkaStateCheck.scala index c7007c2ee..8e01f36ae 100644 --- a/app/controllers/api/KafkaStateCheck.scala +++ b/app/controllers/api/KafkaStateCheck.scala @@ -141,7 +141,7 @@ class KafkaStateCheck (val cc: ControllerComponents, val kafkaManagerContext: Ka } Future.sequence(cosumdTopicSummary).map(_.toMap) } - + def consumersSummaryAction(cluster: String) = Action.async { implicit request:RequestHeader => implicit val formats = org.json4s.DefaultFormats kafkaManager.getConsumerListExtended(cluster).map { errorOrConsumersSummary => @@ -161,4 +161,79 @@ class KafkaStateCheck (val cc: ControllerComponents, val kafkaManagerContext: Ka } } + def monitoring(c: String) = Action.async { implicit request:RequestHeader => + kafkaManager.getBrokerList(c).map { errorOrBrokerList => + errorOrBrokerList.fold( + error => BadRequest(Json.obj("msg" -> error.msg)), + brokerList => Ok(Json.obj("messagesInSecMean" -> brokerList.combinedMetric.map(_.messagesInPerSec.formatMeanRate), + "messagesInSecMean" -> brokerList.combinedMetric.map(_.messagesInPerSec.formatMeanRate), + "messagesInSecOneMin" -> brokerList.combinedMetric.map(_.messagesInPerSec.formatOneMinuteRate), + "messagesInSecFiveMin" -> brokerList.combinedMetric.map(_.messagesInPerSec.formatFiveMinuteRate), + "messagesInSecFifteenMin" -> brokerList.combinedMetric.map(_.messagesInPerSec.formatFifteenMinuteRate), + "bytesInSec" -> brokerList.combinedMetric.map(_.bytesInPerSec.formatMeanRate), + "bytesInSecOneMin" -> brokerList.combinedMetric.map(_.bytesInPerSec.formatOneMinuteRate), + "bytesInSecFiveMin" -> brokerList.combinedMetric.map(_.bytesInPerSec.formatFiveMinuteRate), + "bytesInSecFifteenMin" -> brokerList.combinedMetric.map(_.bytesInPerSec.formatFifteenMinuteRate), + "bytesOutSec" -> brokerList.combinedMetric.map(_.bytesOutPerSec.formatMeanRate), + "bytesOutSecOneMin" -> brokerList.combinedMetric.map(_.bytesOutPerSec.formatOneMinuteRate), + "bytesOutSecFiveMin" -> brokerList.combinedMetric.map(_.bytesOutPerSec.formatFiveMinuteRate), + "bytesOutSecFifteenMin" -> brokerList.combinedMetric.map(_.bytesOutPerSec.formatFifteenMinuteRate), + "bytesRejectSec" -> brokerList.combinedMetric.map(_.bytesRejectedPerSec.formatMeanRate), + "bytesRejectSecOneMin" -> brokerList.combinedMetric.map(_.bytesRejectedPerSec.formatOneMinuteRate), + "bytesRejectSecFiveMin" -> brokerList.combinedMetric.map(_.bytesRejectedPerSec.formatFiveMinuteRate), + "bytesRejectSecFifteenMin" -> brokerList.combinedMetric.map(_.bytesRejectedPerSec.formatFifteenMinuteRate), + "failedFetchRequestSec" -> brokerList.combinedMetric.map(_.failedFetchRequestsPerSec.formatMeanRate), + "failedFetchRequestSecOneMin" -> brokerList.combinedMetric.map(_.failedFetchRequestsPerSec.formatOneMinuteRate), + "failedFetchRequestSecFiveMin" -> brokerList.combinedMetric.map(_.failedFetchRequestsPerSec.formatFiveMinuteRate), + "failedFetchRequestSecFifteenMin" -> brokerList.combinedMetric.map(_.failedFetchRequestsPerSec.formatFifteenMinuteRate), + "failedProduceRequestSec" -> brokerList.combinedMetric.map(_.failedProduceRequestsPerSec.formatMeanRate), + "failedProduceRequestSecOneMin" -> brokerList.combinedMetric.map(_.failedProduceRequestsPerSec.formatOneMinuteRate), + "failedProduceRequestSecFiveMin" -> brokerList.combinedMetric.map(_.failedProduceRequestsPerSec.formatFiveMinuteRate), + "failedProduceRequestSecFifteenMin" -> brokerList.combinedMetric.map(_.failedProduceRequestsPerSec.formatFifteenMinuteRate)) + ).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + ) + } + } + + def brokermonitoring(c: String, b: Int) = Action.async { implicit request:RequestHeader => + implicit val formats = org.json4s.DefaultFormats + kafkaManager.getBrokerView(c,b).map { errorOrBrokerList => + errorOrBrokerList.fold( + error => BadRequest(Json.obj("msg" -> error.msg)), + brokerView => Ok(Json.obj("numTopics" -> brokerView.numTopics, + "numPartitions" -> brokerView.numPartitions, + "numPartitionsAsLeader" -> brokerView.numPartitionsAsLeader, + "perMessages" -> brokerView.stats.map(_.perMessages), + "perIncoming" -> brokerView.stats.map(_.perIncoming), + "perOutgoing" -> brokerView.stats.map(_.perOutgoing), + "messagesCount" -> brokerView.messagesPerSecCountHistory.map(v => v.last.count), + "messagesInSecMean" -> brokerView.metrics.map(_.messagesInPerSec.formatMeanRate), + "messagesInSecOneMin" -> brokerView.metrics.map(_.messagesInPerSec.formatOneMinuteRate), + "messagesInSecFiveMin" -> brokerView.metrics.map(_.messagesInPerSec.formatFiveMinuteRate), + "messagesInSecFifteenMin" -> brokerView.metrics.map(_.messagesInPerSec.formatFifteenMinuteRate), + "bytesInSec" -> brokerView.metrics.map(_.bytesInPerSec.formatMeanRate), + "bytesInSecOneMin" -> brokerView.metrics.map(_.bytesInPerSec.formatOneMinuteRate), + "bytesInSecFiveMin" -> brokerView.metrics.map(_.bytesInPerSec.formatFiveMinuteRate), + "bytesInSecFifteenMin" -> brokerView.metrics.map(_.bytesInPerSec.formatFifteenMinuteRate), + "bytesOutSec" -> brokerView.metrics.map(_.bytesOutPerSec.formatMeanRate), + "bytesOutSecOneMin" -> brokerView.metrics.map(_.bytesOutPerSec.formatOneMinuteRate), + "bytesOutSecFiveMin" -> brokerView.metrics.map(_.bytesOutPerSec.formatFiveMinuteRate), + "bytesOutSecFifteenMin" -> brokerView.metrics.map(_.bytesOutPerSec.formatFifteenMinuteRate), + "bytesRejectSec" -> brokerView.metrics.map(_.bytesRejectedPerSec.formatMeanRate), + "bytesRejectSecOneMin" -> brokerView.metrics.map(_.bytesRejectedPerSec.formatOneMinuteRate), + "bytesRejectSecFiveMin" -> brokerView.metrics.map(_.bytesRejectedPerSec.formatFiveMinuteRate), + "bytesRejectSecFifteenMin" -> brokerView.metrics.map(_.bytesRejectedPerSec.formatFifteenMinuteRate), + "failedFetchRequestSec" -> brokerView.metrics.map(_.failedFetchRequestsPerSec.formatMeanRate), + "failedFetchRequestSecOneMin" -> brokerView.metrics.map(_.failedFetchRequestsPerSec.formatOneMinuteRate), + "failedFetchRequestSecFiveMin" -> brokerView.metrics.map(_.failedFetchRequestsPerSec.formatFiveMinuteRate), + "failedFetchRequestSecFifteenMin" -> brokerView.metrics.map(_.failedFetchRequestsPerSec.formatFifteenMinuteRate), + "failedProduceRequestSec" -> brokerView.metrics.map(_.failedProduceRequestsPerSec.formatMeanRate), + "failedProduceRequestSecOneMin" -> brokerView.metrics.map(_.failedProduceRequestsPerSec.formatOneMinuteRate), + "failedProduceRequestSecFiveMin" -> brokerView.metrics.map(_.failedProduceRequestsPerSec.formatFiveMinuteRate), + "failedProduceRequestSecFifteenMin" -> brokerView.metrics.map(_.failedProduceRequestsPerSec.formatFifteenMinuteRate) + )).withHeaders("X-Frame-Options" -> "SAMEORIGIN") + ) + } + } + } diff --git a/conf/routes b/conf/routes index 27ec9de98..be5e0422b 100644 --- a/conf/routes +++ b/conf/routes @@ -79,3 +79,7 @@ GET /assets/*file controllers.Assets.a # Ping / Health Check GET /api/health controllers.ApiHealth.ping + +# Monitoring Broker Status +GET /api/status/:c/monitoring controllers.api.KafkaStateCheck.monitoring(c:String) +GET /api/status/:c/:b/brokermonitoring controllers.api.KafkaStateCheck.brokermonitoring(c:String, b:Int)