From 3f42c7f48af929d8fd1be2abfb4d4d3bbfd1746a Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Tue, 14 Jan 2025 14:04:42 -0500 Subject: [PATCH] GEOMESA-3436 NiFi - Fix FSDS path cache classpath * Use Caffeine 2.x-compatible API calls --- .../fs/storage/common/utils/PathCache.scala | 73 ++++++++++++++----- .../storage/common/utils/PathCacheTest.scala | 49 +++++++++++++ 2 files changed, 103 insertions(+), 19 deletions(-) create mode 100644 geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCacheTest.scala diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala index 7878c509331..f8dd2877052 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala @@ -12,7 +12,9 @@ import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator} import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty -import java.util.concurrent.TimeUnit +import java.io.FileNotFoundException +import java.util.concurrent.{CompletableFuture, Executor, TimeUnit} +import java.util.function.BiConsumer /** * Caches file statuses to avoid repeated file system operations. Status expires after a @@ -32,14 +34,6 @@ object PathCache { } ) - // cache for individual file status - private val statusCache = - Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( - new CacheLoader[(FileSystem, Path), FileStatus]() { - override def load(key: (FileSystem, Path)): FileStatus = key._1.getFileStatus(key._2) - } - ) - // cache for checking directory contents private val listCache = Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( @@ -49,6 +43,33 @@ object PathCache { } ) + // cache for individual file status + private val statusCache = + Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build( + new CacheLoader[(FileSystem, Path), FileStatus]() { + override def load(key: (FileSystem, Path)): FileStatus = { + try { key._1.getFileStatus(key._2) } catch { + case _: FileNotFoundException => null + } + } + + override def asyncLoad(key: (FileSystem, Path), executor: Executor): CompletableFuture[FileStatus] = { + super.asyncLoad(key, executor) + .whenCompleteAsync(new ListCacheRefresh(key), executor) + .asInstanceOf[CompletableFuture[FileStatus]] + } + + override def asyncReload( + key: (FileSystem, Path), + oldValue: FileStatus, + executor: Executor): CompletableFuture[FileStatus] = { + super.asyncReload(key, oldValue, executor) + .whenCompleteAsync(new ListCacheRefresh(key), executor) + .asInstanceOf[CompletableFuture[FileStatus]] + } + } + ) + /** * Register a path as existing * @@ -57,14 +78,7 @@ object PathCache { */ def register(fs: FileSystem, path: Path): Unit = { pathCache.put((fs, path), java.lang.Boolean.TRUE) - val status = statusCache.refresh((fs, path)) - val parent = path.getParent - if (parent != null) { - listCache.getIfPresent((fs, parent)) match { - case null => // no-op - case list => listCache.put((fs, parent), list :+ status.get()) - } - } + statusCache.refresh((fs, path)) // also triggers listCache update } /** @@ -83,7 +97,7 @@ object PathCache { } /** - * Gets the file status for a path + * Gets the file status for a path. Path must exist. * * @param fs file system * @param path path @@ -116,7 +130,12 @@ object PathCache { * @param fs file system * @param path path */ - def invalidate(fs: FileSystem, path: Path): Unit = Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path))) + def invalidate(fs: FileSystem, path: Path): Unit = { + Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path))) + if (path.getParent != null) { + listCache.invalidate((fs, path.getParent)) + } + } object RemoteIterator { def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] { @@ -124,4 +143,20 @@ object PathCache { override def next(): T = iter.next } } + + private class ListCacheRefresh(key: (FileSystem, Path)) extends BiConsumer[FileStatus, Throwable] { + override def accept(status: FileStatus, u: Throwable): Unit = { + if (status != null) { // could be null if load fails + val (fs, path) = key + val parent = path.getParent + if (parent != null) { + listCache.asMap().computeIfPresent((fs, parent), load(status)_) + } + } + } + + // noinspection ScalaUnusedSymbol + private def load(status: FileStatus)(ignored: (FileSystem, Path), list: Stream[FileStatus]): Stream[FileStatus] = + list.filterNot(f => f.getPath == status.getPath) :+ status + } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCacheTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCacheTest.scala new file mode 100644 index 00000000000..a1cfe810fba --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCacheTest.scala @@ -0,0 +1,49 @@ +/*********************************************************************** + * Copyright (c) 2013-2025 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.common.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.junit.runner.RunWith +import org.specs2.mutable.Specification +import org.specs2.runner.JUnitRunner + +import java.nio.file.Files +import scala.concurrent.duration.DurationInt + +@RunWith(classOf[JUnitRunner]) +class PathCacheTest extends Specification { + + "PathCache" should { + "update list cache when registering a new file" >> { + val root = new Path(Files.createTempDirectory("geomesa").toFile.getPath) + val fs = FileSystem.get(root.toUri, new Configuration()) + try { + val file = new Path(root, "test") + PathCache.exists(fs, file) must beFalse + PathCache.list(fs, root) must beEmpty + // create the file + fs.create(file).close() + fs.exists(file) must beTrue + // verify cache has not been updated + PathCache.exists(fs, file) must beFalse + PathCache.list(fs, root) must beEmpty + // register the file + PathCache.register(fs, file) + // verify cached values have been updated + PathCache.exists(fs, file) must beTrue + eventually(10, 100.millis)(PathCache.list(fs, root).toList must haveLength(1)) + // note: it's hard to verify this is a cached value, since it doesn't cache if a file doesn't exist... + PathCache.status(fs, file) must not(beNull) + } finally { + fs.delete(root, true) + } + } + } +}