From 753fd16084eb55531b2449582549f07c6a97677f Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Mon, 25 Nov 2024 16:20:34 -0500 Subject: [PATCH] GEOMESA-3420 Add Kafka readiness endpoint --- geomesa-gs-kafka-status-endpoint/README.md | 7 ++ geomesa-gs-kafka-status-endpoint/pom.xml | 44 +++++++++ .../src/main/resources/applicationContext.xml | 11 +++ .../status/KafkaLoadStatusController.scala | 94 +++++++++++++++++++ pom.xml | 13 +++ 5 files changed, 169 insertions(+) create mode 100644 geomesa-gs-kafka-status-endpoint/README.md create mode 100644 geomesa-gs-kafka-status-endpoint/pom.xml create mode 100644 geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml create mode 100644 geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala diff --git a/geomesa-gs-kafka-status-endpoint/README.md b/geomesa-gs-kafka-status-endpoint/README.md new file mode 100644 index 0000000..36f53a3 --- /dev/null +++ b/geomesa-gs-kafka-status-endpoint/README.md @@ -0,0 +1,7 @@ +# geomesa-gs-kafka-status-endpoint + +This module provides a readiness check available as a REST endpoint which indicates when any initial Kafka loads have +completed. In order for this to matter, Kafka DataStores need to have `kafka.consumer.start-on-demand` set to false and +`kafka.consumer.read-back` set to a non-zero value. + +The endpoint is available at `/geoserver/rest/kafka` (assuming a default context path). diff --git a/geomesa-gs-kafka-status-endpoint/pom.xml b/geomesa-gs-kafka-status-endpoint/pom.xml new file mode 100644 index 0000000..acdab98 --- /dev/null +++ b/geomesa-gs-kafka-status-endpoint/pom.xml @@ -0,0 +1,44 @@ + + + 4.0.0 + + org.geomesa.geoserver + geomesa-geoserver_2.12 + 5.2.0-SNAPSHOT + + + geomesa-gs-kafka-status-endpoint_2.12 + GeoMesa GeoServer Kafka Status Endpoint + + + + org.geoserver + gs-rest + + + org.locationtech.geomesa + geomesa-kafka-datastore_${scala.binary.version} + provided + + + org.springframework + spring-web + provided + + + org.springframework + spring-webmvc + provided + + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + + diff --git a/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml b/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml new file mode 100644 index 0000000..118af19 --- /dev/null +++ b/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml @@ -0,0 +1,11 @@ + + + + diff --git a/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala b/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala new file mode 100644 index 0000000..2db0100 --- /dev/null +++ b/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala @@ -0,0 +1,94 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials are + * made available under the terms of the GNU GENERAL PUBLIC LICENSE, + * Version 2 which accompanies this distribution and is available at + * https://opensource.org/licenses/GPL-2.0. + ***********************************************************************/ + +package org.geomesa.gs.kafka.status + +import com.typesafe.scalalogging.StrictLogging +import org.geoserver.catalog.event._ +import org.geoserver.catalog.{Catalog, DataStoreInfo, FeatureTypeInfo} +import org.geoserver.rest.RestBaseController +import org.locationtech.geomesa.kafka.data.KafkaCacheLoader +import org.locationtech.geomesa.utils.concurrent.CachedThreadPool +import org.springframework.beans.factory.InitializingBean +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.{HttpStatus, MediaType, ResponseEntity} +import org.springframework.web.bind.annotation.{GetMapping, RequestMapping, RestController} + +@RestController +@RequestMapping(path = Array("/rest/kafka"), produces = Array(MediaType.APPLICATION_JSON_VALUE)) +class KafkaLoadStatusController extends RestBaseController with CatalogListener with InitializingBean with StrictLogging { + + import scala.collection.JavaConverters._ + + @Autowired + private var catalog: Catalog = _ + + @volatile + private var loaded: Boolean = false + + @GetMapping + // noinspection ScalaUnusedSymbol + def status(): ResponseEntity[String] = { + if (loaded && KafkaCacheLoader.LoaderStatus.allLoaded()) { + new ResponseEntity("", HttpStatus.OK) + } else { + new ResponseEntity("Kafka layers are still loading", HttpStatus.SERVICE_UNAVAILABLE) + } + } + + override def afterPropertiesSet(): Unit = { + catalog.addListener(this) + reloaded() + } + + override def handleAddEvent(event: CatalogAddEvent): Unit = loadStore(event) + override def handleModifyEvent(event: CatalogModifyEvent): Unit = loadStore(event) + override def handlePostModifyEvent(event: CatalogPostModifyEvent): Unit = loadStore(event) + override def handleRemoveEvent(event: CatalogRemoveEvent): Unit = {} + + override def reloaded(): Unit = { + logger.info("Starting to load all datastores") + val start = System.currentTimeMillis() + CachedThreadPool.submit(() => { + try { + val futures = catalog.getDataStores.asScala.toList.map { dsi => + CachedThreadPool.submit(() => { + val start = System.currentTimeMillis() + try { loadStore(dsi) } finally { + logger.info(s"Loaded store ${name(dsi)} in ${System.currentTimeMillis() - start}ms") + } + }) + } + futures.foreach(_.get) + logger.info(s"Finished loading datastores in ${System.currentTimeMillis() - start}ms") + } finally { + loaded = true + } + }) + } + + private def loadStore(event: CatalogEvent): Unit = { + logger.debug(s"Received event: $event") + event.getSource match { + case dsi: DataStoreInfo => loadStore(dsi) + case fti: FeatureTypeInfo => loadStore(fti.getStore) + case _ => // not a new layer - no action necessary + } + } + + private def loadStore(dsi: DataStoreInfo): Unit = { + try { dsi.getDataStore(null) } catch { + case e: Throwable => logger.error(s"Error loading store ${name(dsi)}:", e) + } + } + + private def name(dsi: DataStoreInfo): String = s"${dsi.getWorkspace.getName}:${dsi.getName}" + + def setCatalog(catalog: Catalog): Unit = this.catalog = catalog + def getCatalog: Catalog = this.catalog +} diff --git a/pom.xml b/pom.xml index 954b0e1..adbced1 100644 --- a/pom.xml +++ b/pom.xml @@ -53,6 +53,7 @@ pom + geomesa-gs-kafka-status-endpoint geomesa-gs-monitor-elasticsearch geomesa-gs-styling geomesa-gs-wfs @@ -224,6 +225,18 @@ + + org.geoserver + gs-rest + ${geoserver.version} + provided + + + * + * + + + org.geoserver.extension gs-wps-core