diff --git a/geomesa-gs-kafka-status-endpoint/README.md b/geomesa-gs-kafka-status-endpoint/README.md
new file mode 100644
index 0000000..36f53a3
--- /dev/null
+++ b/geomesa-gs-kafka-status-endpoint/README.md
@@ -0,0 +1,37 @@
+# geomesa-gs-kafka-status-endpoint
+
+This module provides a readiness check, exposed as a REST endpoint, that indicates when all initial Kafka loads have
+completed. For this check to be meaningful, Kafka data stores must have `kafka.consumer.start-on-demand` set to `false`
+and `kafka.consumer.read-back` set to a non-zero value; otherwise there is no initial load to wait for.
+
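+These parameters are normally set when registering the store in GeoServer. An equivalent programmatic
+configuration is sketched below (illustrative only: the broker address and read-back window are placeholder
+values, and additional connection parameters may be required):
+
+```scala
+import org.geotools.api.data.DataStoreFinder
+
+import scala.collection.JavaConverters._
+
+val params = Map(
+  "kafka.brokers"                  -> "localhost:9092", // placeholder broker address
+  "kafka.consumer.start-on-demand" -> "false",          // start consumers eagerly instead of on first query
+  "kafka.consumer.read-back"       -> "1 hour"          // replay recent messages as the initial load
+).asJava
+
+val store = DataStoreFinder.getDataStore(params)
+```
+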
+The endpoint is available at `/geoserver/rest/kafka` (assuming the default context path).
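+
+For example, a deployment script or readiness probe might poll the endpoint until it returns 200. A minimal
+sketch, assuming GeoServer is listening on `localhost:8080`:
+
+```scala
+import java.net.URI
+import java.net.http.{HttpClient, HttpRequest, HttpResponse}
+
+val client  = HttpClient.newHttpClient()
+val request = HttpRequest.newBuilder(URI.create("http://localhost:8080/geoserver/rest/kafka")).build()
+val status  = client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode()
+// 200 -> all Kafka layers have finished loading; 503 -> still loading
+```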
diff --git a/geomesa-gs-kafka-status-endpoint/pom.xml b/geomesa-gs-kafka-status-endpoint/pom.xml
new file mode 100644
index 0000000..acdab98
--- /dev/null
+++ b/geomesa-gs-kafka-status-endpoint/pom.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.geomesa.geoserver</groupId>
+        <artifactId>geomesa-geoserver_2.12</artifactId>
+        <version>5.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>geomesa-gs-kafka-status-endpoint_2.12</artifactId>
+    <name>GeoMesa GeoServer Kafka Status Endpoint</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.geoserver</groupId>
+            <artifactId>gs-rest</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.locationtech.geomesa</groupId>
+            <artifactId>geomesa-kafka-datastore_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-web</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-webmvc</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml b/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml
new file mode 100644
index 0000000..118af19
--- /dev/null
+++ b/geomesa-gs-kafka-status-endpoint/src/main/resources/applicationContext.xml
@@ -0,0 +1,11 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
+    "http://www.springframework.org/dtd/spring-beans.dtd">
+
+<beans>
+
+  <!-- register the status endpoint with GeoServer's Spring context -->
+  <bean id="kafkaLoadStatusController"
+        class="org.geomesa.gs.kafka.status.KafkaLoadStatusController"/>
+
+</beans>
diff --git a/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala b/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala
new file mode 100644
index 0000000..2db0100
--- /dev/null
+++ b/geomesa-gs-kafka-status-endpoint/src/main/scala/org/geomesa/gs/kafka/status/KafkaLoadStatusController.scala
@@ -0,0 +1,102 @@
+/***********************************************************************
+ * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc.
+ * All rights reserved. This program and the accompanying materials are
+ * made available under the terms of the GNU GENERAL PUBLIC LICENSE,
+ * Version 2 which accompanies this distribution and is available at
+ * https://opensource.org/licenses/GPL-2.0.
+ ***********************************************************************/
+
+package org.geomesa.gs.kafka.status
+
+import com.typesafe.scalalogging.StrictLogging
+import org.geoserver.catalog.event._
+import org.geoserver.catalog.{Catalog, DataStoreInfo, FeatureTypeInfo}
+import org.geoserver.rest.RestBaseController
+import org.locationtech.geomesa.kafka.data.KafkaCacheLoader
+import org.locationtech.geomesa.utils.concurrent.CachedThreadPool
+import org.springframework.beans.factory.InitializingBean
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.http.{HttpStatus, MediaType, ResponseEntity}
+import org.springframework.web.bind.annotation.{GetMapping, RequestMapping, RestController}
+
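+/**
+ * Readiness endpoint reporting whether all Kafka stores registered in the GeoServer catalog have completed
+ * their initial load: responds 200 when loading is complete, 503 while layers are still loading.
+ */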
+@RestController
+@RequestMapping(path = Array("/rest/kafka"), produces = Array(MediaType.APPLICATION_JSON_VALUE))
+class KafkaLoadStatusController extends RestBaseController with CatalogListener with InitializingBean with StrictLogging {
+
+ import scala.collection.JavaConverters._
+
+ @Autowired
+ private var catalog: Catalog = _
+
+ @volatile
+ private var loaded: Boolean = false
+
+ @GetMapping
+ // noinspection ScalaUnusedSymbol
+ def status(): ResponseEntity[String] = {
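+    // ready only once the initial pass over the catalog has finished and all Kafka cache loaders report complete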
+ if (loaded && KafkaCacheLoader.LoaderStatus.allLoaded()) {
+ new ResponseEntity("", HttpStatus.OK)
+ } else {
+ new ResponseEntity("Kafka layers are still loading", HttpStatus.SERVICE_UNAVAILABLE)
+ }
+ }
+
+ override def afterPropertiesSet(): Unit = {
+ catalog.addListener(this)
+ reloaded()
+ }
+
+ override def handleAddEvent(event: CatalogAddEvent): Unit = loadStore(event)
+ override def handleModifyEvent(event: CatalogModifyEvent): Unit = loadStore(event)
+ override def handlePostModifyEvent(event: CatalogPostModifyEvent): Unit = loadStore(event)
+ override def handleRemoveEvent(event: CatalogRemoveEvent): Unit = {}
+
+ override def reloaded(): Unit = {
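+    // eagerly load every store already in the catalog so Kafka consumers start (and initial loads begin) now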
+ logger.info("Starting to load all datastores")
+ val start = System.currentTimeMillis()
+ CachedThreadPool.submit(() => {
+ try {
+ val futures = catalog.getDataStores.asScala.toList.map { dsi =>
+ CachedThreadPool.submit(() => {
+ val start = System.currentTimeMillis()
+ try { loadStore(dsi) } finally {
+ logger.info(s"Loaded store ${name(dsi)} in ${System.currentTimeMillis() - start}ms")
+ }
+ })
+ }
+ futures.foreach(_.get)
+ logger.info(s"Finished loading datastores in ${System.currentTimeMillis() - start}ms")
+ } finally {
+ loaded = true
+ }
+ })
+ }
+
+ private def loadStore(event: CatalogEvent): Unit = {
+ logger.debug(s"Received event: $event")
+ event.getSource match {
+ case dsi: DataStoreInfo => loadStore(dsi)
+ case fti: FeatureTypeInfo => loadStore(fti.getStore)
+ case _ => // not a new layer - no action necessary
+ }
+ }
+
+ private def loadStore(dsi: DataStoreInfo): Unit = {
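+    // resolving the data store forces GeoServer to instantiate it, which kicks off the initial Kafka load
+    // for stores configured with kafka.consumer.start-on-demand=false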
+ try { dsi.getDataStore(null) } catch {
+ case e: Throwable => logger.error(s"Error loading store ${name(dsi)}:", e)
+ }
+ }
+
+ private def name(dsi: DataStoreInfo): String = s"${dsi.getWorkspace.getName}:${dsi.getName}"
+
+ def setCatalog(catalog: Catalog): Unit = this.catalog = catalog
+ def getCatalog: Catalog = this.catalog
+}
diff --git a/pom.xml b/pom.xml
index 954b0e1..adbced1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -53,6 +53,7 @@
     <packaging>pom</packaging>
 
     <modules>
+        <module>geomesa-gs-kafka-status-endpoint</module>
         <module>geomesa-gs-monitor-elasticsearch</module>
         <module>geomesa-gs-styling</module>
         <module>geomesa-gs-wfs</module>
@@ -224,6 +225,18 @@
+            <dependency>
+                <groupId>org.geoserver</groupId>
+                <artifactId>gs-rest</artifactId>
+                <version>${geoserver.version}</version>
+                <scope>provided</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>*</groupId>
+                        <artifactId>*</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
             <dependency>
                 <groupId>org.geoserver.extension</groupId>
                 <artifactId>gs-wps-core</artifactId>