From 7e7aa244cfaf94e9703516cbd1a87461d52f5980 Mon Sep 17 00:00:00 2001 From: Emilio Date: Tue, 14 Jan 2025 16:12:08 -0500 Subject: [PATCH] GEOMESA-3436 NiFi - Fix FSDS path cache classpath with reflection (#3259) --- .../fs/storage/common/utils/PathCache.scala | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala index f8dd2877052..8e7eb70a903 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala @@ -8,11 +8,14 @@ package org.locationtech.geomesa.fs.storage.common.utils -import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache} +import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator} import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty import java.io.FileNotFoundException +import java.lang +import java.lang.reflect.Method import java.util.concurrent.{CompletableFuture, Executor, TimeUnit} import java.util.function.BiConsumer @@ -20,14 +23,14 @@ import java.util.function.BiConsumer * Caches file statuses to avoid repeated file system operations. Status expires after a * configurable period, by default 10 minutes. */ -object PathCache { +object PathCache extends LazyLogging { val CacheDurationProperty: SystemProperty = SystemProperty("geomesa.fs.file.cache.duration", "15 minutes") private val duration = CacheDurationProperty.toDuration.get.toMillis // cache for checking existence of files - private val pathCache = + private val pathCache: LoadingCache[(FileSystem, Path), lang.Boolean] = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( new CacheLoader[(FileSystem, Path), java.lang.Boolean]() { override def load(key: (FileSystem, Path)): java.lang.Boolean = key._1.exists(key._2) @@ -35,7 +38,7 @@ object PathCache { ) // cache for checking directory contents - private val listCache = + private val listCache: LoadingCache[(FileSystem, Path), Stream[FileStatus]] = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( new CacheLoader[(FileSystem, Path), Stream[FileStatus]]() { override def load(key: (FileSystem, Path)): Stream[FileStatus] = @@ -44,7 +47,7 @@ object PathCache { ) // cache for individual file status - private val statusCache = + private val statusCache: LoadingCache[(FileSystem, Path), FileStatus] = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( new CacheLoader[(FileSystem, Path), FileStatus]() { override def load(key: (FileSystem, Path)): FileStatus = { @@ -70,6 +73,12 @@ object PathCache { } ) + // we use reflection to get around Caffeine 2.x/3.x API differences in some environments + private val refresh: Method = classOf[LoadingCache[_, _]].getMethods.find(_.getName == "refresh").getOrElse { + logger.warn("Could not get refresh cache method - cache operations will be less efficient") + null + } + /** * Register a path as existing * @@ -78,7 +87,9 @@ object PathCache { */ def register(fs: FileSystem, path: Path): Unit = { pathCache.put((fs, path), java.lang.Boolean.TRUE) - statusCache.refresh((fs, path)) // also triggers listCache update + if (refresh != null) { + refresh.invoke(statusCache, (fs, path)) // also triggers listCache update + } } /**