diff --git a/build/cqs.tsv b/build/cqs.tsv
index c6f4df9eb104..b4886cfc17ec 100644
--- a/build/cqs.tsv
+++ b/build/cqs.tsv
@@ -345,8 +345,9 @@ org.slf4j:jul-to-slf4j 1.7.36 test
org.specs2:specs2-core_2.12 4.20.5 test
org.specs2:specs2-junit_2.12 4.20.5 test
org.specs2:specs2-mock_2.12 4.20.5 test
-org.testcontainers:kafka 1.19.7 test
-org.testcontainers:postgresql 1.19.7 test
-org.testcontainers:testcontainers 1.19.7 test
+org.testcontainers:kafka 1.20.3 test
+org.testcontainers:minio 1.20.3 test
+org.testcontainers:postgresql 1.20.3 test
+org.testcontainers:testcontainers 1.20.3 test
org.wololo:jts2geojson 0.16.1 test
org.xerial.snappy:snappy-java 1.1.10.5 test
diff --git a/build/test/resources/log4j.xml b/build/test/resources/log4j.xml
index 051b86fef69d..487166bd90da 100644
--- a/build/test/resources/log4j.xml
+++ b/build/test/resources/log4j.xml
@@ -7,9 +7,16 @@
+    <!-- seven new logger entries (element content not recoverable from this excerpt) -->
diff --git a/geomesa-fs/geomesa-fs-datastore/pom.xml b/geomesa-fs/geomesa-fs-datastore/pom.xml
index 44313b16956a..8bd3cf828057 100644
--- a/geomesa-fs/geomesa-fs-datastore/pom.xml
+++ b/geomesa-fs/geomesa-fs-datastore/pom.xml
@@ -83,11 +83,30 @@
       <artifactId>geomesa-fs-storage-orc_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>s3-transfer-manager</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.testcontainers</groupId>
       <artifactId>testcontainers</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.testcontainers</groupId>
+      <artifactId>minio</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.geomesa.testcontainers</groupId>
       <artifactId>testcontainers-accumulo</artifactId>
diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
index f1d096c8aa5e..3a4dea6519c4 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStore.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.data
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.api.feature.`type`.Name
import org.geotools.api.feature.simple.SimpleFeatureType
@@ -18,6 +18,7 @@ import org.geotools.data.store.{ContentDataStore, ContentEntry, ContentFeatureSo
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.StorageKeys
import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata
+import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
@@ -27,8 +28,19 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
+/**
+ * File system data store
+ *
+ * @param fs file system - note, this is expected to be a shared resource, and is not cleaned up on data store dispose
+ * @param conf hadoop configuration
+ * @param root root path
+ * @param readThreads number of threads used for read ops
+ * @param writeTimeout timeout for write ops
+ * @param defaultEncoding default file encoding
+ * @param namespace geoserver namespace
+ */
class FileSystemDataStore(
- fc: FileContext,
+ fs: FileSystem,
conf: Configuration,
root: Path,
readThreads: Int,
@@ -37,9 +49,21 @@ class FileSystemDataStore(
namespace: Option[String]
) extends ContentDataStore with HasGeoMesaStats with LazyLogging {
+ // noinspection ScalaUnusedSymbol
+ @deprecated("Use FileSystem instead of FileContext")
+ def this(
+ fc: FileContext,
+ conf: Configuration,
+ root: Path,
+ readThreads: Int,
+ writeTimeout: Duration,
+ defaultEncoding: Option[String],
+ namespace: Option[String]) =
+ this(FileSystem.get(root.toUri, conf), conf, root, readThreads, writeTimeout, defaultEncoding, namespace)
+
namespace.foreach(setNamespaceURI)
- private val manager = FileSystemStorageManager(fc, conf, root, namespace)
+ private val manager = FileSystemStorageManager(fs, conf, root, namespace)
override val stats: GeoMesaStats = new UnoptimizedRunnableStats(this)
@@ -82,13 +106,15 @@ class FileSystemDataStore(
val fileSize = sft.removeTargetFileSize()
val path = manager.defaultPath(sft.getTypeName)
- val context = FileSystemContext(fc, conf, path, namespace)
+ val context = FileSystemContext(fs, conf, path, namespace)
val metadata =
StorageMetadataFactory.create(context, meta, Metadata(sft, encoding, scheme, leafStorage, fileSize))
try { manager.register(path, FileSystemStorageFactory(context, metadata)) } catch {
case NonFatal(e) => CloseQuietly(metadata).foreach(e.addSuppressed); throw e
}
+ PathCache.register(fs, root)
+ PathCache.register(fs, path)
}
}
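
For context on the new primary constructor, a minimal construction sketch, assuming a placeholder local root and arbitrary read-thread and timeout values; the deprecated FileContext overload above delegates to this same code path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.data.FileSystemDataStore

import scala.concurrent.duration.DurationInt

val conf = new Configuration()
val root = new Path("file:///tmp/geomesa-fs") // placeholder root
// FileSystem.get returns a shared, JVM-cached instance, which is why the
// scaladoc notes it is not cleaned up on data store dispose
val fs = FileSystem.get(root.toUri, conf)
val ds = new FileSystemDataStore(fs, conf, root, 4, 60.seconds, None, None)
```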
diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala
index d754402e351d..c8714acb364a 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemDataStoreFactory.scala
@@ -8,9 +8,8 @@
package org.locationtech.geomesa.fs.data
-import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.DataAccessFactory.Param
import org.geotools.api.data.{DataStore, DataStoreFactorySpi}
import org.locationtech.geomesa.fs.storage.api.FileSystemStorageFactory
@@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration
class FileSystemDataStoreFactory extends DataStoreFactorySpi {
import FileSystemDataStoreFactory.FileSystemDataStoreParams._
- import FileSystemDataStoreFactory.fileContextCache
override def createDataStore(params: java.util.Map[String, _]): DataStore = {
@@ -45,8 +43,6 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {
conf
}
- val fc = fileContextCache.get(conf)
-
val path = new Path(PathParam.lookup(params))
val encoding = EncodingParam.lookupOpt(params).filterNot(_.isEmpty)
@@ -63,7 +59,9 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {
val namespace = NamespaceParam.lookupOpt(params)
- new FileSystemDataStore(fc, conf, path, readThreads, writeTimeout, encoding, namespace)
+ val fs = FileSystem.get(path.toUri, conf)
+
+ new FileSystemDataStore(fs, conf, path, readThreads, writeTimeout, encoding, namespace)
}
override def createNewDataStore(params: java.util.Map[String, _]): DataStore =
@@ -105,12 +103,6 @@ object FileSystemDataStoreFactory extends GeoMesaDataStoreInfo {
private val configuration = new Configuration()
- private val fileContextCache: LoadingCache[Configuration, FileContext] = Caffeine.newBuilder().build(
- new CacheLoader[Configuration, FileContext]() {
- override def load(conf: Configuration): FileContext = FileContext.getFileContext(conf)
- }
- )
-
object FileSystemDataStoreParams extends NamespaceParams {
val WriterFileTimeout: SystemProperty = SystemProperty("geomesa.fs.writer.partition.timeout", "60s")
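
Dropping the Caffeine fileContextCache works because, unlike FileContext.getFileContext, FileSystem.get already serves instances out of Hadoop's internal cache (keyed by scheme, authority, and user). A small sketch of that behavior, using a placeholder local path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val uri = new Path("file:///tmp/geomesa-fs").toUri // placeholder
val a = FileSystem.get(uri, conf)
val b = FileSystem.get(uri, conf)
// same cached instance, unless fs.<scheme>.impl.disable.cache is set to true
assert(a eq b)
```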
diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala
index 239ed4322bb2..f879bef5dbb9 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemFeatureStore.scala
@@ -85,7 +85,6 @@ class FileSystemFeatureStore(
override def canEvent: Boolean = false
override def canReproject: Boolean = false
override def canSort: Boolean = false
-
override def canFilter: Boolean = true
override def canRetype: Boolean = true
diff --git a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala
index c4631675582c..56273964511c 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/main/scala/org/locationtech/geomesa/fs/data/FileSystemStorageManager.scala
@@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.data
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.utils.io.CloseQuietly
@@ -23,11 +23,11 @@ import scala.util.control.NonFatal
/**
* Manages the storages and associated simple feature types underneath a given path
*
- * @param fc file context
+ * @param fs file system
* @param conf configuration
* @param root root path for the data store
*/
-class FileSystemStorageManager private (fc: FileContext, conf: Configuration, root: Path, namespace: Option[String])
+class FileSystemStorageManager private (fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String])
extends MethodProfiling with LazyLogging {
import scala.collection.JavaConverters._
@@ -42,7 +42,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
*/
def storage(typeName: String): Option[FileSystemStorage] = {
cache.get(typeName).map(_._2) // check cached values
- .orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fc, _)).flatMap(loadPath)) // check expected (default) path
+ .orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fs, _)).flatMap(loadPath)) // check expected (default) path
.orElse(loadAll().find(_.metadata.sft.getTypeName == typeName)) // check other paths until we find it
}
@@ -80,8 +80,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
* @return
*/
private def loadAll(): Iterator[FileSystemStorage] = {
- if (!PathCache.exists(fc, root)) { Iterator.empty } else {
- val dirs = PathCache.list(fc, root).filter(_.isDirectory).map(_.getPath)
+ if (!PathCache.exists(fs, root)) { Iterator.empty } else {
+ val dirs = PathCache.list(fs, root).filter(_.isDirectory).map(_.getPath)
dirs.filterNot(path => cache.exists { case (_, (p, _)) => p == path }).flatMap(loadPath)
}
}
@@ -99,7 +99,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
logger.debug(s"${ if (storage.isDefined) "Loaded" else "No" } storage at path '$path' in ${time}ms")
profile(complete _) {
- val context = FileSystemContext(fc, conf, path, namespace)
+ val context = FileSystemContext(fs, conf, path, namespace)
StorageMetadataFactory.load(context).map { meta =>
try {
val storage = FileSystemStorageFactory(context, meta)
@@ -116,8 +116,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
object FileSystemStorageManager {
private val cache = Caffeine.newBuilder().build(
- new CacheLoader[(FileContext, Configuration, Path, Option[String]), FileSystemStorageManager]() {
- override def load(key: (FileContext, Configuration, Path, Option[String])): FileSystemStorageManager =
+ new CacheLoader[(FileSystem, Configuration, Path, Option[String]), FileSystemStorageManager]() {
+ override def load(key: (FileSystem, Configuration, Path, Option[String])): FileSystemStorageManager =
new FileSystemStorageManager(key._1, key._2, key._3, key._4)
}
)
@@ -125,11 +125,11 @@ object FileSystemStorageManager {
/**
* Load a cached storage manager instance
*
- * @param fc file context
+ * @param fs file system
* @param conf configuration
* @param root data store root path
* @return
*/
- def apply(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
- cache.get((fc, conf, root, namespace))
+ def apply(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
+ cache.get((fs, conf, root, namespace))
}
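
Call sites change only in the first argument; a usage sketch with a hypothetical type name:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.locationtech.geomesa.fs.data.FileSystemStorageManager

val conf = new Configuration()
val root = new Path("file:///tmp/geomesa-fs") // placeholder
val fs = root.getFileSystem(conf)
// managers are cached per (fs, conf, root, namespace) tuple
val manager = FileSystemStorageManager(fs, conf, root, None)
val storage = manager.storage("my-type") // None until a storage exists under root
```

One caveat with the tuple key: Hadoop's Configuration does not override equals, so cache hits require passing the same Configuration instance each time.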
diff --git a/geomesa-fs/geomesa-fs-datastore/src/test/resources/day-scheme.conf b/geomesa-fs/geomesa-fs-datastore/src/test/resources/day-scheme.conf
deleted file mode 100644
index e69de29bb2d1..000000000000
diff --git a/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala b/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala
index 0cbca4b563a4..9fc298c83ee5 100644
--- a/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala
+++ b/geomesa-fs/geomesa-fs-datastore/src/test/scala/org/locationtech/geomesa/fs/converter/ConverterDataStoreTest.scala
@@ -9,53 +9,89 @@
package org.locationtech.geomesa.fs.converter
import com.typesafe.config.{ConfigFactory, ConfigRenderOptions}
+import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveOutputStream}
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Options.CreateOpts
+import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}
import org.geotools.api.data.{DataStoreFinder, Query, Transaction}
import org.geotools.api.feature.simple.SimpleFeature
import org.geotools.api.filter.Filter
import org.junit.runner.RunWith
+import org.locationtech.geomesa.utils.collection.SelfClosingIterator
+import org.locationtech.geomesa.utils.io.WithClose
+import org.slf4j.LoggerFactory
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner
+import org.specs2.specification.BeforeAfterAll
+import org.testcontainers.containers.MinIOContainer
+import org.testcontainers.containers.output.Slf4jLogConsumer
+import org.testcontainers.utility.DockerImageName
+import java.io.{BufferedOutputStream, ByteArrayInputStream}
+import java.nio.charset.StandardCharsets
import scala.collection.mutable
-/**
- * Created by hulbert on 6/21/17.
- */
+
@RunWith(classOf[JUnitRunner])
-class ConverterDataStoreTest extends Specification {
+class ConverterDataStoreTest extends Specification with BeforeAfterAll {
import scala.collection.JavaConverters._
sequential
+ var minio: MinIOContainer = _
+ val bucket = "geomesa"
+
+ override def beforeAll(): Unit = {
+ minio =
+ new MinIOContainer(
+ DockerImageName.parse("minio/minio").withTag(sys.props.getOrElse("minio.docker.tag", "RELEASE.2024-10-29T16-01-48Z")))
+ minio.start()
+ minio.followOutput(new Slf4jLogConsumer(LoggerFactory.getLogger("minio")))
+ minio.execInContainer("mc", "alias", "set", "localhost", "http://localhost:9000", minio.getUserName, minio.getPassword)
+ minio.execInContainer("mc", "mb", s"localhost/$bucket")
+ }
+
+ override def afterAll(): Unit = {
+ if (minio != null) {
+ minio.close()
+ }
+ }
+
def fsConfig(converter: String, path: String): String = {
- s"""
- |
+ val props = Seq(
+ prop("fs.options.converter.path", path),
+ prop("fs.partition-scheme.name", "datetime"),
+ prop("fs.partition-scheme.opts.datetime-format", "yyyy/DDD/HH/mm"),
+ prop("fs.partition-scheme.opts.step-unit", "MINUTES"),
+ prop("fs.partition-scheme.opts.step", "15"),
+ prop("fs.partition-scheme.opts.dtg-attribute", "dtg"),
+ prop("fs.options.leaf-storage", "true"),
+ )
+ s"""
|$converter
-       |  <property><name>fs.options.converter.path</name><value>$path</value></property>
-       |  <property><name>fs.partition-scheme.name</name><value>datetime</value></property>
-       |  <property><name>fs.partition-scheme.opts.datetime-format</name><value>yyyy/DDD/HH/mm</value></property>
-       |  <property><name>fs.partition-scheme.opts.step-unit</name><value>MINUTES</value></property>
-       |  <property><name>fs.partition-scheme.opts.step</name><value>15</value></property>
-       |  <property><name>fs.partition-scheme.opts.dtg-attribute</name><value>dtg</value></property>
-       |  <property><name>fs.options.leaf-storage</name><value>true</value></property>
+ |${props.mkString("\n")}
       |</configuration>
|""".stripMargin
}
def sftByName(name: String): String = {
- s"""
-       |  <property><name>fs.options.sft.name</name><value>$name</value></property>
-       |  <property><name>fs.options.converter.name</name><value>$name</value></property>
- |""".stripMargin
+ Seq(
+ prop("fs.options.sft.name", name),
+ prop("fs.options.converter.name", name),
+ ).mkString("\n")
}
def sftByConf(conf: String): String = {
- s"""
-       |  <property><name>fs.options.sft.conf</name><value>$conf</value></property>
-       |  <property><name>fs.options.converter.conf</name><value>$conf</value></property>
- |""".stripMargin
+ Seq(
+ prop("fs.options.sft.conf", conf),
+ prop("fs.options.converter.conf", conf),
+ ).mkString("\n")
}
+  def prop(key: String, value: String): String = s"  <property><name>$key</name><value>$value</value></property>"
+
"ConverterDataStore" should {
"work with one datastore" >> {
val ds = DataStoreFinder.getDataStore(Map(
@@ -70,12 +106,8 @@ class ConverterDataStoreTest extends Specification {
types.head mustEqual "fs-test"
val q = new Query("fs-test", Filter.INCLUDE)
- val fr = ds.getFeatureReader(q, Transaction.AUTO_COMMIT)
- val feats = mutable.ListBuffer.empty[SimpleFeature]
- while (fr.hasNext) {
- feats += fr.next()
- }
- feats.size mustEqual 4
+ val feats = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).toList
+ feats must haveLength(4)
}
"work with something else" >> {
@@ -91,12 +123,68 @@ class ConverterDataStoreTest extends Specification {
types.head mustEqual "fs-test"
val q = new Query("fs-test", Filter.INCLUDE)
- val fr = ds.getFeatureReader(q, Transaction.AUTO_COMMIT)
- val feats = mutable.ListBuffer.empty[SimpleFeature]
- while (fr.hasNext) {
- feats += fr.next()
+ val feats = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).toList
+ feats must haveLength(4)
+ }
+
+ "read tar.gz files from s3 storage" >> {
+ val bucket = s"s3a://${this.bucket}/"
+ val config = {
+ val props = Seq(
+ sftByName("fs-test"),
+ prop("fs.s3a.endpoint", minio.getS3URL),
+ prop("fs.s3a.access.key", minio.getUserName),
+ prop("fs.s3a.secret.key", minio.getPassword),
+ prop("fs.s3a.path.style.access", "true"),
+ prop("dfs.client.use.datanode.hostname", "true"),
+ ).mkString("\n")
+ fsConfig(props, "datastore1")
}
- feats.size mustEqual 4
+ val fc = {
+ val conf = new Configuration()
+ conf.addResource(new ByteArrayInputStream(config.getBytes(StandardCharsets.UTF_8)))
+ FileContext.getFileContext(conf)
+ }
+ // number of times to write the sample files into our tgz
+ // note: we need fairly large files to trigger GEOMESA-3411
+ val multiplier = 177156
+ Seq("00", "15", "30", "45").foreach { file =>
+ val path = s"datastore1/2017/001/01/$file"
+ val contents = WithClose(getClass.getClassLoader.getResourceAsStream(s"example/$path"))(IOUtils.toByteArray)
+ WithClose(fc.create(new Path(s"$bucket$path.tgz"), java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent())) { os =>
+ WithClose(new BufferedOutputStream(os)) { buf =>
+ WithClose(new GzipCompressorOutputStream(buf)) { gz =>
+ WithClose(new TarArchiveOutputStream(gz)) { tar =>
+ val entry = new TarArchiveEntry(file)
+ entry.setSize(contents.length * multiplier)
+ tar.putArchiveEntry(entry)
+ var i = 0
+ while (i < multiplier) {
+ tar.write(contents)
+ i += 1
+ }
+ tar.closeArchiveEntry()
+ tar.finish()
+ }
+ }
+ }
+ }
+ }
+
+ val ds = DataStoreFinder.getDataStore(Map(
+ "fs.path" -> bucket,
+ "fs.encoding" -> "converter",
+ "fs.config.xml" -> config,
+ ).asJava)
+ ds must not(beNull)
+
+ val types = ds.getTypeNames
+ types must haveSize(1)
+ types.head mustEqual "fs-test"
+
+ val q = new Query("fs-test", Filter.INCLUDE)
+ val count = SelfClosingIterator(ds.getFeatureReader(q, Transaction.AUTO_COMMIT)).length
+ count mustEqual multiplier * 4
}
"load sft as a string" >> {
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala
index 9259f8a8996c..9bf4a90d1370 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-api/src/main/scala/org/locationtech/geomesa/fs/storage/api/package.scala
@@ -9,7 +9,7 @@
package org.locationtech.geomesa.fs.storage
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
@@ -22,11 +22,15 @@ package object api {
/**
* Holder for file system references
*
- * @param fc file context
+ * @param fs file system
* @param conf configuration
* @param root root path
*/
- case class FileSystemContext(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String] = None)
+ case class FileSystemContext(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String] = None)
+
+ object FileSystemContext {
+ def apply(root: Path, conf: Configuration): FileSystemContext = FileSystemContext(root.getFileSystem(conf), conf, root)
+ }
/**
* Identifier plus configuration
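
A sketch of the new convenience constructor with a hypothetical bucket; it simply derives the FileSystem from the path:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.locationtech.geomesa.fs.storage.api.FileSystemContext

val conf = new Configuration()
val root = new Path("s3a://my-bucket/geomesa") // hypothetical bucket
// equivalent to FileSystemContext(root.getFileSystem(conf), conf, root)
val context = FileSystemContext(root, conf)
```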
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala
index d8cc86722ff1..5fcfb47ac9cf 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/AbstractFileSystemStorage.scala
@@ -91,7 +91,7 @@ abstract class AbstractFileSystemStorage(
val baseDir = StorageUtils.baseDirectory(context.root, partition, metadata.leafStorage)
p.files.flatMap { file =>
val path = new Path(baseDir, file.name)
- if (PathCache.exists(context.fc, path)) {
+ if (PathCache.exists(context.fs, path)) {
Seq(StorageFilePath(file, path))
} else {
logger.warn(s"Inconsistent metadata for ${metadata.sft.getTypeName}: $path")
@@ -201,10 +201,10 @@ abstract class AbstractFileSystemStorage(
val failures = ListBuffer.empty[Path]
toCompact.foreach { file =>
- if (!context.fc.delete(file.path, false)) {
+ if (!context.fs.delete(file.path, false)) {
failures.append(file.path)
}
- PathCache.invalidate(context.fc, file.path)
+ PathCache.invalidate(context.fs, file.path)
}
if (failures.nonEmpty) {
@@ -233,7 +233,7 @@ abstract class AbstractFileSystemStorage(
def pathAndObserver: WriterConfig = {
val path = StorageUtils.nextFile(context.root, partition, metadata.leafStorage, extension, fileType)
- PathCache.register(context.fc, path)
+ PathCache.register(context.fs, path)
val updateObserver = new UpdateObserver(partition, path, action)
val observer = if (observers.isEmpty) { updateObserver } else {
new CompositeObserver(observers.map(_.apply(path)).+:(updateObserver))
@@ -278,7 +278,7 @@ abstract class AbstractFileSystemStorage(
writer.close()
writer = null
// adjust our estimate to account for the actual bytes written
- total += context.fc.getFileStatus(path).getLen
+ total += context.fs.getFileStatus(path).getLen
estimator.update(total, count)
remaining = estimator.estimate(0L)
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala
index cf41b1af9df6..dbe908323675 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/SizeableFileSystemStorage.scala
@@ -39,7 +39,7 @@ trait SizeableFileSystemStorage extends FileSystemStorage {
* @return true if the file is appropriately sized
*/
def fileIsSized(path: Path, target: Long): Boolean = {
- val size = context.fc.getFileStatus(path).getLen
+ val size = context.fs.getFileStatus(path).getLen
math.abs((size.toDouble / target) - 1d) <= fileSizeError
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala
index dae3c9351c1c..02c8e6f1ea70 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/jobs/PartitionOutputFormat.scala
@@ -9,7 +9,7 @@
package org.locationtech.geomesa.fs.storage.common.jobs
import com.typesafe.scalalogging.LazyLogging
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidJobConfException
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
@@ -57,7 +57,7 @@ class PartitionOutputFormat(delegate: SingleFileOutputFormat) extends OutputForm
private val storage = {
val conf = context.getConfiguration
val root = StorageConfiguration.getRootPath(conf)
- val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root)
+ val fsc = FileSystemContext(root, conf)
val metadata = StorageMetadataFactory.load(fsc).getOrElse {
throw new IllegalArgumentException(s"No storage defined under path '$root'")
}
@@ -171,7 +171,7 @@ class PartitionOutputFormat(delegate: SingleFileOutputFormat) extends OutputForm
writer.close(context)
writer = null
// adjust our estimate to account for the actual bytes written
- total += storage.context.fc.getFileStatus(path).getLen
+ total += storage.context.fs.getFileStatus(path).getLen
estimator.update(total, count)
remaining = estimator.estimate(0L)
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala
index 4c2ef533d127..e8ea0f492ecc 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadata.scala
@@ -11,7 +11,6 @@ package org.locationtech.geomesa.fs.storage.common.metadata
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import com.typesafe.config._
import com.typesafe.scalalogging.LazyLogging
-import org.apache.hadoop.fs.Options.CreateOpts
import org.apache.hadoop.fs._
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.PartitionMetadata
@@ -46,14 +45,14 @@ import scala.util.control.NonFatal
* reload. In general this does not cause problems, as reads and writes happen in different JVMs (ingest
* vs query).
*
- * @param fc file context
+ * @param fs file system
* @param directory metadata root path
* @param sft simple feature type
* @param meta basic metadata config
* @param converter file converter
*/
class FileBasedMetadata(
- private val fc: FileContext,
+ private val fs: FileSystem,
val directory: Path,
val sft: SimpleFeatureType,
private val meta: Metadata,
@@ -94,7 +93,7 @@ class FileBasedMetadata(
override def set(key: String, value: String): Unit = {
kvs.put(key, value)
- FileBasedMetadataFactory.write(fc, directory.getParent, meta.copy(config = kvs.asScala.toMap))
+ FileBasedMetadataFactory.write(fs, directory.getParent, meta.copy(config = kvs.asScala.toMap))
}
override def getPartitions(prefix: Option[String]): Seq[PartitionMetadata] = {
@@ -204,12 +203,12 @@ class FileBasedMetadata(
val encoded = StringSerialization.alphaNumericSafeString(config.name)
val name = s"$UpdatePartitionPrefix$encoded-${UUID.randomUUID()}${converter.suffix}"
val file = new Path(directory, name)
- WithClose(fc.create(file, java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent)) { out =>
+ WithClose(fs.create(file, false)) { out =>
out.write(data.getBytes(StandardCharsets.UTF_8))
out.hflush()
out.hsync()
}
- PathCache.register(fc, file)
+ PathCache.register(fs, file)
file
}
}
@@ -225,19 +224,18 @@ class FileBasedMetadata(
}
profile("Persisted compacted partition configuration") {
val file = new Path(directory, CompactedPrefix + converter.suffix)
- val flags = java.util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- WithClose(fc.create(file, flags, CreateOpts.createParent)) { out =>
+ WithClose(fs.create(file, true)) { out =>
out.write(data.getBytes(StandardCharsets.UTF_8))
out.hflush()
out.hsync()
}
- PathCache.register(fc, file)
+ PathCache.register(fs, file)
// generally we overwrite the existing file but if we change rendering the name will change
val toRemove =
new Path(directory, if (converter.suffix == HoconPathSuffix) { CompactedJson } else { CompactedHocon })
- if (PathCache.exists(fc, toRemove, reload = true)) {
- fc.delete(toRemove, false)
- PathCache.invalidate(fc, toRemove)
+ if (PathCache.exists(fs, toRemove, reload = true)) {
+ fs.delete(toRemove, false)
+ PathCache.invalidate(fs, toRemove)
}
}
}
@@ -257,7 +255,7 @@ class FileBasedMetadata(
val pool = new CachedThreadPool(threads)
// use a phaser to track worker thread completion
val phaser = new Phaser(2) // 1 for the initial directory worker + 1 for this thread
- pool.submit(new DirectoryWorker(pool, phaser, fc.listStatus(directory), result))
+ pool.submit(new DirectoryWorker(pool, phaser, fs.listStatusIterator(directory), result))
// wait for the worker threads to complete
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive())
@@ -306,7 +304,7 @@ class FileBasedMetadata(
private def readPartitionConfig(file: Path): Option[PartitionConfig] = {
try {
val config = profile("Loaded partition configuration") {
- WithClose(new InputStreamReader(fc.open(file), StandardCharsets.UTF_8)) { in =>
+ WithClose(new InputStreamReader(fs.open(file), StandardCharsets.UTF_8)) { in =>
ConfigFactory.parseReader(in, ConfigParseOptions.defaults().setSyntax(getSyntax(file.getName)))
}
}
@@ -327,7 +325,7 @@ class FileBasedMetadata(
private def readCompactedConfig(file: Path): Seq[PartitionConfig] = {
try {
val config = profile("Loaded compacted partition configuration") {
- WithClose(new InputStreamReader(fc.open(file), StandardCharsets.UTF_8)) { in =>
+ WithClose(new InputStreamReader(fs.open(file), StandardCharsets.UTF_8)) { in =>
ConfigFactory.parseReader(in, ConfigParseOptions.defaults().setSyntax(getSyntax(file.getName)))
}
}
@@ -347,11 +345,11 @@ class FileBasedMetadata(
*/
private def delete(paths: Iterable[Path], threads: Int): Unit = {
if (threads < 2) {
- paths.foreach(fc.delete(_, false))
+ paths.foreach(fs.delete(_, false))
} else {
val ec = new CachedThreadPool(threads)
try {
- paths.toList.map(p => ec.submit(new Runnable() { override def run(): Unit = fc.delete(p, false)})).foreach(_.get)
+ paths.toList.map(p => ec.submit(new Runnable() { override def run(): Unit = fs.delete(p, false)})).foreach(_.get)
} finally {
ec.shutdown()
}
@@ -376,7 +374,7 @@ class FileBasedMetadata(
if (status.isDirectory) {
i += 1
      // use a tiered phaser on each directory to avoid the limit of 65535 registered parties
- es.submit(new DirectoryWorker(es, new Phaser(phaser, 1), fc.listStatus(path), result))
+ es.submit(new DirectoryWorker(es, new Phaser(phaser, 1), fs.listStatusIterator(path), result))
} else if (name.startsWith(UpdatePartitionPrefix)) {
// pull out the partition name but don't parse the file yet
val encoded = name.substring(8, name.length - 42) // strip out prefix and suffix
@@ -491,7 +489,7 @@ object FileBasedMetadata {
* @return
*/
def copy(m: FileBasedMetadata): FileBasedMetadata =
- new FileBasedMetadata(m.fc, m.directory, m.sft, m.meta, m.converter)
+ new FileBasedMetadata(m.fs, m.directory, m.sft, m.meta, m.converter)
private def getSyntax(file: String): ConfigSyntax = {
if (file.endsWith(HoconPathSuffix)) {
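
The create-flag translation used throughout this file, shown standalone with a hypothetical temp path: FileSystem.create(path, overwrite) covers both EnumSet combinations and creates missing parent directories by default, matching CreateOpts.createParent.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
val file = new Path("file:///tmp/geomesa-fs/metadata/example.json") // hypothetical
val fs = file.getFileSystem(conf)

// before: fc.create(file, EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent)
fs.create(file, false).close() // fails if the file already exists
// before: fc.create(file, EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), CreateOpts.createParent)
fs.create(file, true).close()  // creates or replaces
```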
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala
index a4689347a3a1..1ada242f483a 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataFactory.scala
@@ -9,8 +9,7 @@
package org.locationtech.geomesa.fs.storage.common.metadata
import com.typesafe.scalalogging.LazyLogging
-import org.apache.hadoop.fs.Options.CreateOpts
-import org.apache.hadoop.fs.{CreateFlag, FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata.Config
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
@@ -58,9 +57,9 @@ class FileBasedMetadataFactory extends StorageMetadataFactory {
PartitionSchemeFactory.load(sft, meta.scheme)
val renderer = config.get(Config.RenderKey).map(MetadataConverter.apply).getOrElse(RenderCompact)
MetadataJson.writeMetadata(context, NamedOptions(name, config + (Config.RenderKey -> renderer.name)))
- FileBasedMetadataFactory.write(context.fc, context.root, meta)
+ FileBasedMetadataFactory.write(context.fs, context.root, meta)
val directory = new Path(context.root, FileBasedMetadataFactory.MetadataDirectory)
- val metadata = new FileBasedMetadata(context.fc, directory, sft, meta, renderer)
+ val metadata = new FileBasedMetadata(context.fs, directory, sft, meta, renderer)
FileBasedMetadataFactory.cache.put(FileBasedMetadataFactory.key(context), metadata)
metadata
}
@@ -80,12 +79,12 @@ object FileBasedMetadataFactory extends MethodProfiling with LazyLogging {
val loader = new java.util.function.Function[String, FileBasedMetadata]() {
override def apply(ignored: String): FileBasedMetadata = {
val file = new Path(context.root, StoragePath)
- if (!PathCache.exists(context.fc, file)) { null } else {
+ if (!PathCache.exists(context.fs, file)) { null } else {
val directory = new Path(context.root, MetadataDirectory)
- val meta = WithClose(context.fc.open(file))(MetadataSerialization.deserialize)
+ val meta = WithClose(context.fs.open(file))(MetadataSerialization.deserialize)
val sft = namespaced(meta.sft, context.namespace)
val renderer = config.get(Config.RenderKey).map(MetadataConverter.apply).getOrElse(RenderPretty)
- new FileBasedMetadata(context.fc, directory, sft, meta, renderer)
+ new FileBasedMetadata(context.fs, directory, sft, meta, renderer)
}
}
}
@@ -95,18 +94,17 @@ object FileBasedMetadataFactory extends MethodProfiling with LazyLogging {
/**
* Write basic metadata to disk. This should be done once, when the storage is created
*
- * @param fc file context
+ * @param fs file system
* @param root root path
* @param metadata simple feature type, file encoding, partition scheme, etc
*/
- private [metadata] def write(fc: FileContext, root: Path, metadata: Metadata): Unit = {
+ private [metadata] def write(fs: FileSystem, root: Path, metadata: Metadata): Unit = {
val file = new Path(root, StoragePath)
- val flags = java.util.EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
- WithClose(fc.create(file, flags, CreateOpts.createParent)) { out =>
+ WithClose(fs.create(file, true)) { out =>
MetadataSerialization.serialize(out, metadata)
out.hflush()
out.hsync()
}
- PathCache.register(fc, file)
+ PathCache.register(fs, file)
}
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala
index c1d4d3915b9c..5822d7a08ba5 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJson.scala
@@ -49,9 +49,9 @@ object MetadataJson extends MethodProfiling {
// using an atomic operation or cache loader can cause problems, as we sometimes insert into the
// map during the load, which is not allowed
val file = new Path(context.root, MetadataPath)
- if (PathCache.exists(context.fc, file)) {
+ if (PathCache.exists(context.fs, file)) {
val config = profile("Loaded metadata configuration") {
- WithClose(new InputStreamReader(context.fc.open(file), StandardCharsets.UTF_8)) { in =>
+ WithClose(new InputStreamReader(context.fs.open(file), StandardCharsets.UTF_8)) { in =>
ConfigFactory.load(ConfigFactory.parseReader(in, ParseOptions)) // call load to resolve sys props
}
}
@@ -61,8 +61,8 @@ object MetadataJson extends MethodProfiling {
}
cache.put(key, cached)
} else {
- context.fc.rename(file, new Path(context.root, s"$MetadataPath.bak"))
- PathCache.invalidate(context.fc, file)
+ context.fs.rename(file, new Path(context.root, s"$MetadataPath.bak"))
+ PathCache.invalidate(context.fs, file)
transitionMetadata(context, config).foreach { meta =>
cached = meta // will be set in the cache in the transition code
}
@@ -80,7 +80,7 @@ object MetadataJson extends MethodProfiling {
*/
def writeMetadata(context: FileSystemContext, metadata: NamedOptions): Unit = {
val file = new Path(context.root, MetadataPath)
- if (PathCache.exists(context.fc, file, reload = true)) {
+ if (PathCache.exists(context.fs, file, reload = true)) {
throw new IllegalArgumentException(
s"Trying to create a new storage instance but metadata already exists at '$file'")
}
@@ -92,7 +92,7 @@ object MetadataJson extends MethodProfiling {
// either side of the expression (typesafe will concatenate them), i.e. "foo ${bar}" -> "foo "${bar}""
val interpolated = data.replaceAll("\\$\\{[a-zA-Z0-9_.]+}", "\"$0\"")
profile("Persisted metadata configuration") {
- WithClose(context.fc.create(file, java.util.EnumSet.of(CreateFlag.CREATE), CreateOpts.createParent)) { out =>
+ WithClose(context.fs.create(file, false)) { out =>
out.write(interpolated.getBytes(StandardCharsets.UTF_8))
out.hflush()
out.hsync()
@@ -104,7 +104,7 @@ object MetadataJson extends MethodProfiling {
.loadOrThrow[NamedOptions]
}
cache.put(context.root.toUri.toString, toCache)
- PathCache.register(context.fc, file)
+ PathCache.register(context.fs, file)
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala
index cf11d66ec340..519fe4efaa38 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/main/scala/org/locationtech/geomesa/fs/storage/common/utils/PathCache.scala
@@ -9,7 +9,7 @@
package org.locationtech.geomesa.fs.storage.common.utils
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
-import org.apache.hadoop.fs.{FileContext, FileStatus, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty
import java.util.concurrent.TimeUnit
@@ -27,97 +27,96 @@ object PathCache {
// cache for checking existence of files
private val pathCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
- new CacheLoader[(FileContext, Path), java.lang.Boolean]() {
- override def load(key: (FileContext, Path)): java.lang.Boolean = key._1.util.exists(key._2)
+ new CacheLoader[(FileSystem, Path), java.lang.Boolean]() {
+ override def load(key: (FileSystem, Path)): java.lang.Boolean = key._1.exists(key._2)
}
)
// cache for individual file status
private val statusCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
- new CacheLoader[(FileContext, Path), FileStatus]() {
- override def load(key: (FileContext, Path)): FileStatus = key._1.getFileStatus(key._2)
+ new CacheLoader[(FileSystem, Path), FileStatus]() {
+ override def load(key: (FileSystem, Path)): FileStatus = key._1.getFileStatus(key._2)
}
)
// cache for checking directory contents
private val listCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
- new CacheLoader[(FileContext, Path), Stream[FileStatus]]() {
- override def load(key: (FileContext, Path)): Stream[FileStatus] =
- RemoteIterator(key._1.listStatus(key._2)).toStream
+ new CacheLoader[(FileSystem, Path), Stream[FileStatus]]() {
+ override def load(key: (FileSystem, Path)): Stream[FileStatus] =
+ RemoteIterator(key._1.listStatusIterator(key._2)).toStream
}
)
/**
 * Register a path as existing
*
- * @param fc file context
+ * @param fs file system
* @param path path
- * @param status file status, if available
- * @param list directory contents, if available
*/
- def register(
- fc: FileContext,
- path: Path,
- status: Option[FileStatus] = None,
- list: Option[Stream[FileStatus]] = None): Unit = {
- pathCache.put((fc, path), java.lang.Boolean.TRUE)
- status.foreach(statusCache.put((fc, path), _))
- list.foreach(listCache.put((fc, path), _))
+ def register(fs: FileSystem, path: Path): Unit = {
+ pathCache.put((fs, path), java.lang.Boolean.TRUE)
+ val status = statusCache.refresh((fs, path))
+ val parent = path.getParent
+ if (parent != null) {
+ listCache.getIfPresent((fs, parent)) match {
+ case null => // no-op
+ case list => listCache.put((fs, parent), list :+ status.get())
+ }
+ }
}
/**
* Check to see if a path exists
*
- * @param fc file context
+ * @param fs file system
* @param path path
* @param reload reload the file status from the underlying file system before checking
* @return
*/
- def exists(fc: FileContext, path: Path, reload: Boolean = false): Boolean = {
+ def exists(fs: FileSystem, path: Path, reload: Boolean = false): Boolean = {
if (reload) {
- invalidate(fc, path)
+ invalidate(fs, path)
}
- pathCache.get((fc, path)).booleanValue()
+ pathCache.get((fs, path)).booleanValue()
}
/**
* Gets the file status for a path
*
- * @param fc file context
+ * @param fs file system
* @param path path
* @return
*/
- def status(fc: FileContext, path: Path, reload: Boolean = false): FileStatus = {
+ def status(fs: FileSystem, path: Path, reload: Boolean = false): FileStatus = {
if (reload) {
- invalidate(fc, path)
+ invalidate(fs, path)
}
- statusCache.get((fc, path))
+ statusCache.get((fs, path))
}
/**
* List the children of a path
*
- * @param fc file context
+ * @param fs file system
* @param dir directory path
* @return
*/
- def list(fc: FileContext, dir: Path, reload: Boolean = false): Iterator[FileStatus] = {
+ def list(fs: FileSystem, dir: Path, reload: Boolean = false): Iterator[FileStatus] = {
if (reload) {
- invalidate(fc, dir)
+ invalidate(fs, dir)
}
- listCache.get((fc, dir)).iterator
+ listCache.get((fs, dir)).iterator
}
/**
* Invalidate any cached values for the path - they will be re-loaded on next access
*
- * @param fc file context
+ * @param fs file system
* @param path path
*/
- def invalidate(fc: FileContext, path: Path): Unit =
- Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fc, path)))
+ def invalidate(fs: FileSystem, path: Path): Unit = Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path)))
object RemoteIterator {
def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] {
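
A sketch of the revised PathCache contract, assuming a hypothetical local file: register marks a freshly written path as existing, refreshes its cached status, and splices it into any cached listing of its parent directory.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.locationtech.geomesa.fs.storage.common.utils.PathCache

val conf = new Configuration()
val path = new Path("file:///tmp/geomesa-fs/data/part-0000.parquet") // hypothetical
val fs = path.getFileSystem(conf)

fs.create(path, true).close()
PathCache.register(fs, path)              // cache existence + refreshed status
assert(PathCache.exists(fs, path))        // answered from cache
PathCache.invalidate(fs, path)            // drop all cached entries for the path
PathCache.exists(fs, path, reload = true) // force a round trip to the file system
```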
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala
index c5d4094068e3..9c66a0581096 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/FileBasedMetadataTest.scala
@@ -10,8 +10,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.permission.FsPermission
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.junit.runner.RunWith
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile, StorageFileAction}
import org.locationtech.geomesa.fs.storage.api._
@@ -33,7 +32,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
import scala.collection.JavaConverters._
lazy val conf = new Configuration()
- lazy val fc = FileContext.getFileContext(conf)
+ lazy val fs = FileSystem.get(conf)
val sft = SimpleFeatureTypes.createType("metadata",
"name:String,dtg:Date,*geom:Point:srid=4326;geomesa.user-data.prefix=desc,desc.name=姓名,desc.dtg=ひづけ,desc.geom=좌표")
val encoding = "parquet"
@@ -53,7 +52,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
"create and persist an empty metadata file" in {
withPath { context =>
val created = factory.create(context, Map.empty, meta)
- PathCache.invalidate(fc, context.root)
+ PathCache.invalidate(fs, context.root)
factory.load(context) must beSome(created)
foreach(Seq(created, FileBasedMetadata.copy(created))) { metadata =>
metadata.encoding mustEqual encoding
@@ -69,7 +68,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
created.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L))
created.addPartition(PartitionMetadata("1", Seq(f2,f3), new Envelope(-11, 11, -5, 5), 20L))
created.addPartition(PartitionMetadata("2", Seq(f5, f6), new Envelope(-1, 1, -5, 5), 20L))
- PathCache.invalidate(fc, context.root)
+ PathCache.invalidate(fs, context.root)
factory.load(context) must beSome(created)
foreach(Seq(created, FileBasedMetadata.copy(created))) { metadata =>
metadata.encoding mustEqual encoding
@@ -87,10 +86,10 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
val created = factory.create(context, Map.empty, meta)
created.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L))
created.addPartition(PartitionMetadata("1", Seq(f2, f3), new Envelope(-11, 11, -5, 5), 20L))
- fc.mkdir(new Path(context.root, "metadata/nested/"), FsPermission.getDirDefault, false)
- fc.util.listStatus(new Path(context.root, "metadata")).foreach { file =>
+ fs.mkdirs(new Path(context.root, "metadata/nested/"))
+ fs.listStatus(new Path(context.root, "metadata")).foreach { file =>
if (file.getPath.getName.startsWith("update-")) {
- fc.rename(file.getPath, new Path(context.root, s"metadata/nested/${file.getPath.getName}"))
+ fs.rename(file.getPath, new Path(context.root, s"metadata/nested/${file.getPath.getName}"))
}
}
factory.load(context) must beSome(created)
@@ -176,7 +175,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
metadata.addPartition(PartitionMetadata("1", Seq(f1), new Envelope(-10, 10, -5, 5), 10L))
val updates = list(metadata.directory).filter(_.startsWith("update"))
updates must haveLength(1)
- val update = WithClose(fc.open(new Path(metadata.directory, updates.head))) { in =>
+ val update = WithClose(fs.open(new Path(metadata.directory, updates.head))) { in =>
IOUtils.toString(in, StandardCharsets.UTF_8)
}
update must not(beEmpty)
@@ -185,7 +184,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
metadata.compact(None)
val compactions = list(metadata.directory).filter(_.startsWith("compact"))
compactions must haveLength(1)
- val compaction = WithClose(fc.open(new Path(metadata.directory, compactions.head))) { in =>
+ val compaction = WithClose(fs.open(new Path(metadata.directory, compactions.head))) { in =>
IOUtils.toString(in, StandardCharsets.UTF_8)
}
compaction must not(beEmpty)
@@ -198,7 +197,7 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
val path = new Path(url.toURI)
val conf = new Configuration()
conf.set("parquet.compression", "gzip")
- val context = FileSystemContext(FileContext.getFileContext(url.toURI), conf, path)
+ val context = FileSystemContext(path, conf)
val metadata = StorageMetadataFactory.load(context).orNull
metadata must beAnInstanceOf[FileBasedMetadata]
val partitions = metadata.getPartitions()
@@ -215,14 +214,14 @@ class FileBasedMetadataTest extends Specification with AllExpectations {
def withPath[R](code: FileSystemContext => R): R = {
val file = Files.createTempDirectory("geomesa").toFile.getPath
- try { code(FileSystemContext(fc, conf, new Path(file))) } finally {
+ try { code(FileSystemContext(fs, conf, new Path(file))) } finally {
FileUtils.deleteDirectory(new File(file))
}
}
def list(dir: Path): Seq[String] = {
val builder = Seq.newBuilder[String]
- val iter = fc.listStatus(dir)
+ val iter = fs.listStatusIterator(dir)
while (iter.hasNext) {
val status = iter.next
val path = status.getPath
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala
index aa7569bc47f1..bb35a0ac5ba8 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/JdbcMetadataTest.scala
@@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata
import com.typesafe.scalalogging.LazyLogging
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.junit.runner.RunWith
import org.locationtech.geomesa.fs.storage.api.StorageMetadata.{PartitionBounds, PartitionMetadata, StorageFile, StorageFileAction}
import org.locationtech.geomesa.fs.storage.api.{FileSystemContext, Metadata, NamedOptions, PartitionSchemeFactory}
@@ -35,7 +35,7 @@ class JdbcMetadataTest extends Specification with LazyLogging with BeforeAfterAl
import scala.collection.JavaConverters._
lazy val conf = new Configuration()
- lazy val fc = FileContext.getFileContext(conf)
+ lazy val fs = FileSystem.get(conf)
val sft = SimpleFeatureTypes.createType("metadata",
"name:String,dtg:Date,*geom:Point:srid=4326;geomesa.user-data.prefix=desc,desc.name=姓名,desc.dtg=ひづけ,desc.geom=좌표")
val encoding = "parquet"
@@ -252,7 +252,7 @@ class JdbcMetadataTest extends Specification with LazyLogging with BeforeAfterAl
def withPath[R](code: FileSystemContext => R): R = {
val file = Files.createTempDirectory("geomesa").toFile.getPath
- try { code(FileSystemContext(fc, conf, new Path(file))) } finally {
+ try { code(FileSystemContext(fs, conf, new Path(file))) } finally {
FileUtils.deleteDirectory(new File(file))
}
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala
index 8a615fffdb7c..9b2c735e9d12 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-common/src/test/scala/org/locationtech/geomesa/fs/storage/common/metadata/MetadataJsonTest.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.common.metadata
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.junit.runner.RunWith
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.metadata.MetadataJson.MetadataPath
@@ -26,7 +26,7 @@ import java.nio.file.Files
class MetadataJsonTest extends Specification {
lazy val conf = new Configuration()
- lazy val fc = FileContext.getFileContext(conf)
+ lazy val fs = FileSystem.get(conf)
"MetadataJson" should {
"persist and replace system properties (and environment variables)" in {
@@ -39,7 +39,7 @@ class MetadataJsonTest extends Specification {
val opts = NamedOptions("jdbc", Map("user" -> "root", "password" -> interpolated))
MetadataJson.writeMetadata(context, opts)
val file = new Path(context.root, MetadataPath)
- val serialized = WithClose(context.fc.open(file))(is => IOUtils.toString(is, StandardCharsets.UTF_8))
+ val serialized = WithClose(context.fs.open(file))(is => IOUtils.toString(is, StandardCharsets.UTF_8))
serialized must contain(interpolated)
serialized must not(contain("bar"))
val returned = MetadataJson.readMetadata(context)
@@ -53,7 +53,7 @@ class MetadataJsonTest extends Specification {
def withPath[R](code: FileSystemContext => R): R = {
val file = Files.createTempDirectory("geomesa").toFile.getPath
- try { code(FileSystemContext(fc, conf, new Path(file))) } finally {
+ try { code(FileSystemContext(fs, conf, new Path(file))) } finally {
FileUtils.deleteDirectory(new File(file))
}
}
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala
index 741bc33fa956..6db37de760bc 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter
import com.typesafe.scalalogging.StrictLogging
import org.apache.commons.compress.archivers.ArchiveStreamFactory
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
import org.locationtech.geomesa.convert.EvaluationContext
@@ -25,7 +25,7 @@ import java.util.Locale
import scala.util.control.NonFatal
class ConverterFileSystemReader(
- fc: FileContext,
+ fs: FileSystem,
converter: SimpleFeatureConverter,
filter: Option[Filter],
transform: Option[(String, SimpleFeatureType)]
@@ -37,9 +37,9 @@ class ConverterFileSystemReader(
logger.debug(s"Opening file $path")
val iter = try {
val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match {
- case TAR => new HadoopTarHandle(fc, path)
- case ZIP | JAR => new HadoopZipHandle(fc, path)
- case _ => new HadoopFileHandle(fc, path)
+ case TAR => new HadoopTarHandle(fs, path)
+ case ZIP | JAR => new HadoopZipHandle(fs, path)
+ case _ => new HadoopFileHandle(fs, path)
}
handle.open.flatMap { case (name, is) =>
val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path))
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala
index 0ceda79f1348..c0fd6db0fa62 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterMetadata.scala
@@ -38,9 +38,9 @@ class ConverterMetadata(
override def getPartition(name: String): Option[PartitionMetadata] = {
val path = new Path(context.root, name)
- if (!PathCache.exists(context.fc, path)) { None } else {
+ if (!PathCache.exists(context.fs, path)) { None } else {
val files = if (leafStorage) { Seq(StorageFile(name, 0L)) } else {
- PathCache.list(context.fc, path).map(fs => StorageFile(fs.getPath.getName, 0L)).toList
+ PathCache.list(context.fs, path).map(fs => StorageFile(fs.getPath.getName, 0L)).toList
}
Some(PartitionMetadata(name, files, None, -1L))
}
@@ -49,7 +49,7 @@ class ConverterMetadata(
override def getPartitions(prefix: Option[String]): Seq[PartitionMetadata] = {
buildPartitionList(prefix, dirty.compareAndSet(true, false)).map { name =>
val files = if (leafStorage) { Seq(StorageFile(name, 0L)) } else {
- PathCache.list(context.fc, new Path(context.root, name)).map(fs => StorageFile(fs.getPath.getName, 0L)).toList
+ PathCache.list(context.fs, new Path(context.root, name)).map(fs => StorageFile(fs.getPath.getName, 0L)).toList
}
PartitionMetadata(name, files, None, -1L)
}
@@ -73,9 +73,9 @@ class ConverterMetadata(
private def buildPartitionList(prefix: Option[String], invalidate: Boolean): List[String] = {
if (invalidate) {
- PathCache.invalidate(context.fc, context.root)
+ PathCache.invalidate(context.fs, context.root)
}
- val top = PathCache.list(context.fc, context.root)
+ val top = PathCache.list(context.fs, context.root)
top.flatMap(f => buildPartitionList(f.getPath, "", prefix, 1, invalidate)).toList
}
@@ -86,9 +86,9 @@ class ConverterMetadata(
curDepth: Int,
invalidate: Boolean): List[String] = {
if (invalidate) {
- PathCache.invalidate(context.fc, path)
+ PathCache.invalidate(context.fs, path)
}
- if (curDepth > scheme.depth || !PathCache.status(context.fc, path).isDirectory) {
+ if (curDepth > scheme.depth || !PathCache.status(context.fs, path).isDirectory) {
val file = s"$prefix${path.getName}"
if (filter.forall(file.startsWith)) { List(file) } else { List.empty }
} else {
@@ -97,7 +97,7 @@ class ConverterMetadata(
if (next.length >= f.length) { next.startsWith(f) } else { next == f.substring(0, next.length) }
}
if (continue) {
- PathCache.list(context.fc, path).toList.flatMap { f =>
+ PathCache.list(context.fs, path).toList.flatMap { f =>
buildPartitionList(f.getPath, s"$next/", filter, curDepth + 1, invalidate)
}
} else {
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala
index 4e92c5fddb80..8a4503fb2b80 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala
@@ -36,13 +36,13 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co
override protected def createReader(
filter: Option[Filter],
transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = {
- new ConverterFileSystemReader(context.fc, converter, filter, transform)
+ new ConverterFileSystemReader(context.fs, converter, filter, transform)
}
override def getFilePaths(partition: String): Seq[StorageFilePath] = {
val path = new Path(context.root, partition)
if (metadata.leafStorage) { Seq(StorageFilePath(StorageFile(path.getName, 0L), path)) } else {
- PathCache.list(context.fc, path).map(p => StorageFilePath(StorageFile(p.getPath.getName, 0L), p.getPath)).toList
+ PathCache.list(context.fs, path).map(p => StorageFilePath(StorageFile(p.getPath.getName, 0L), p.getPath)).toList
}
}
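
A hedged sketch of the PathCache call pattern after this change: only the handle type moves from FileContext to FileSystem, while the exists/list flow stays the same.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.storage.common.utils.PathCache

// mirrors the partition listing above: empty when the dir is missing,
// otherwise one file name per child path
def partitionFiles(fs: FileSystem, root: Path, name: String): List[String] = {
  val path = new Path(root, name)
  if (!PathCache.exists(fs, path)) { List.empty } else {
    PathCache.list(fs, path).map(_.getPath.getName).toList
  }
}
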
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala
index 8bfbe41eb7bc..5d6c2eb6683b 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.filter.text.ecql.ECQL
import org.junit.runner.RunWith
@@ -60,7 +60,7 @@ class ConverterFileSystemStorageTest extends Specification with LazyLogging {
conf.set(ConverterStorageFactory.PartitionSchemeParam, "daily")
conf.set(ConverterStorageFactory.LeafStorageParam, "false")
- val context = FileSystemContext(FileContext.getFileContext(dir), conf, new Path(dir))
+ val context = FileSystemContext(new Path(dir), conf)
val metadata = StorageMetadataFactory.load(context).orNull
metadata must not(beNull)
val storage = FileSystemStorageFactory(context, metadata)
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala
index 18e5608ac4fa..9a53cf9e11a1 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-orc/src/test/scala/org/locationtech/geomesa/fs/storage/orc/OrcFileSystemStorageTest.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.orc
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
@@ -59,7 +59,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging {
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true))
@@ -145,7 +145,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging {
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true))
@@ -202,7 +202,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging {
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true))
@@ -264,7 +264,7 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging {
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "orc", scheme, leafStorage = true))
@@ -315,11 +315,11 @@ class OrcFileSystemStorageTest extends Specification with LazyLogging {
"transition old metadata files" in {
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val meta = new Path(dir, "metadata.json")
- context.fc.util.copy(new Path(getClass.getClassLoader.getResource("metadata-old.json").toURI), meta)
- context.fc.util.exists(meta) must beTrue
- PathCache.invalidate(context.fc, meta)
+ context.fs.copyFromLocalFile(new Path(getClass.getClassLoader.getResource("metadata-old.json").toURI), meta)
+ context.fs.exists(meta) must beTrue
+ PathCache.invalidate(context.fs, meta)
val metadata = new FileBasedMetadataFactory().load(context)
metadata must beSome
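
The copy idiom changes shape as well; a small sketch, assuming a local resource and destination path like the test above:

import org.apache.hadoop.fs.{FileSystem, Path}

// FileContext exposed copy through fc.util; FileSystem has copyFromLocalFile directly
def stage(fs: FileSystem, localResource: java.net.URI, dest: Path): Unit = {
  fs.copyFromLocalFile(new Path(localResource), dest)
  require(fs.exists(dest), s"failed to stage $dest")
}
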
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala
index b016357343f2..19f6096c3a54 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/CompactionTest.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.parquet
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.filter.text.ecql.ECQL
import org.geotools.util.factory.Hints
@@ -35,15 +35,15 @@ class CompactionTest extends Specification with AllExpectations {
sequential
- val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326")
- val tempDir = Files.createTempDirectory("geomesa")
- val fc = FileContext.getFileContext(tempDir.toUri)
+ lazy val sft = SimpleFeatureTypes.createType("test", "name:String,age:Int,dtg:Date,*geom:Point:srid=4326")
+ lazy val tempDir = Files.createTempDirectory("geomesa")
+ lazy val fs = FileSystem.get(tempDir.toUri, new Configuration())
"ParquetFileSystemStorage" should {
"compact partitions" >> {
val conf = new Configuration()
conf.set("parquet.compression", "gzip")
- val context = FileSystemContext(fc, conf, new Path(tempDir.toUri))
+ val context = FileSystemContext(fs, conf, new Path(tempDir.toUri))
val metadata =
new FileBasedMetadataFactory()
@@ -107,7 +107,7 @@ class CompactionTest extends Specification with AllExpectations {
// compact to a given file size
// verify if file is appropriately sized, it won't be modified
val paths = fsStorage.getFilePaths(partition).map(_.path)
- val size = paths.map(f => fc.getFileStatus(f).getLen).sum
+ val size = paths.map(f => fs.getFileStatus(f).getLen).sum
fsStorage.compact(Some(partition), Some(size))
fsStorage.getFilePaths(partition).map(_.path) mustEqual paths
// verify files are split into smaller ones
diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
index 995482f535ba..1207d259da17 100644
--- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
+++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-parquet/src/test/scala/org/locationtech/geomesa/parquet/ParquetStorageTest.scala
@@ -11,7 +11,7 @@ package org.locationtech.geomesa.parquet
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType}
import org.geotools.api.filter.Filter
@@ -62,7 +62,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true))
@@ -148,7 +148,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true))
@@ -205,7 +205,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true))
@@ -267,7 +267,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
}
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true))
@@ -333,7 +333,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
val targetSize = 2100L
withTestDir { dir =>
- val context = FileSystemContext(FileContext.getFileContext(dir.toUri), config, dir)
+ val context = FileSystemContext(dir, config)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", scheme, leafStorage = true, Some(targetSize)))
@@ -358,7 +358,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
foreach(partitions) { partition =>
val paths = storage.getFilePaths(partition)
paths.size must beGreaterThan(1)
- foreach(paths)(p => context.fc.getFileStatus(p.path).getLen must beCloseTo(targetSize, targetSize / 10))
+ foreach(paths)(p => context.fs.getFileStatus(p.path).getLen must beCloseTo(targetSize, targetSize / 10))
}
}
}
@@ -367,7 +367,7 @@ class ParquetStorageTest extends Specification with AllExpectations with LazyLog
val url = getClass.getClassLoader.getResource("data/2.3.0/example-csv/")
url must not(beNull)
val path = new Path(url.toURI)
- val context = FileSystemContext(FileContext.getFileContext(url.toURI), config, path)
+ val context = FileSystemContext(path, config)
val metadata = StorageMetadataFactory.load(context).orNull
metadata must not(beNull)
val storage = FileSystemStorageFactory(context, metadata)
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala
index 1dd163d6fe09..f7a3ef948af9 100644
--- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/FileSystemCompactionJob.scala
@@ -63,7 +63,7 @@ trait FileSystemCompactionJob extends StorageConfiguration with JobWithLibJars {
job.setOutputKeyClass(classOf[Void])
job.setOutputValueClass(classOf[SimpleFeature])
- val qualifiedTempPath = tempPath.map(storage.context.fc.makeQualified)
+ val qualifiedTempPath = tempPath.map(storage.context.fs.makeQualified)
StorageConfiguration.setRootPath(job.getConfiguration, storage.context.root)
StorageConfiguration.setPartitions(job.getConfiguration, partitions.map(_.name).toArray)
@@ -105,7 +105,7 @@ trait FileSystemCompactionJob extends StorageConfiguration with JobWithLibJars {
existingDataFiles.foreach { case (partition, files) =>
val counter = StorageConfiguration.Counters.partition(partition.name)
val count = Option(job.getCounters.findCounter(StorageConfiguration.Counters.Group, counter)).map(_.getValue)
- files.foreach(f => storage.context.fc.delete(f.path, false))
+ files.foreach(f => storage.context.fs.delete(f.path, false))
storage.metadata.removePartition(partition.copy(count = count.getOrElse(0L)))
val removed = count.map(c => s"containing $c features ").getOrElse("")
Command.user.info(s"Removed ${TextTools.getPlural(files.size, "file")} ${removed}in partition ${partition.name}")
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala
index 8af2bfc64b35..8895273e76ca 100644
--- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala
@@ -8,7 +8,7 @@
package org.locationtech.geomesa.fs.tools.compact
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.geotools.api.data.Query
@@ -35,7 +35,7 @@ class PartitionInputFormat extends InputFormat[Void, SimpleFeature] {
val conf = context.getConfiguration
val root = StorageConfiguration.getRootPath(conf)
- val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root)
+ val fsc = FileSystemContext(root, conf)
val fileSize = StorageConfiguration.getTargetFileSize(conf)
val metadata = StorageMetadataFactory.load(fsc).getOrElse {
@@ -49,7 +49,7 @@ class PartitionInputFormat extends InputFormat[Void, SimpleFeature] {
var size = 0L
val files = storage.getFilePaths(partition).filter { f =>
if (sizeCheck.exists(_.apply(f.path))) { false } else {
- size += PathCache.status(fsc.fc, f.path).getLen
+ size += PathCache.status(fsc.fs, f.path).getLen
true
}
}
@@ -127,7 +127,7 @@ object PartitionInputFormat {
override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
val conf = context.getConfiguration
val root = StorageConfiguration.getRootPath(conf)
- val fsc = FileSystemContext(FileContext.getFileContext(root.toUri, conf), conf, root)
+ val fsc = FileSystemContext(root, conf)
val metadata = StorageMetadataFactory.load(fsc).getOrElse {
throw new IllegalArgumentException(s"No storage defined under path '$root'")
}
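
Both input-format code paths now build the context from the root path alone; a sketch of the two-arg form, assuming FileSystemContext derives its FileSystem from the root URI internally:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.locationtech.geomesa.fs.storage.api._

val conf = new Configuration()
val root = new Path("file:///tmp/geomesa-storage") // hypothetical root
val fsc = FileSystemContext(root, conf)
val metadata = StorageMetadataFactory.load(fsc).getOrElse {
  throw new IllegalArgumentException(s"No storage defined under path '$root'")
}
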
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala
index ccc61826e487..d751fd719057 100644
--- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FileSystemConverterJob.scala
@@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.tools.ingest
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -137,9 +137,8 @@ object FileSystemConverterJob {
override def setup(context: Context): Unit = {
val root = StorageConfiguration.getRootPath(context.getConfiguration)
- val fc = FileContext.getFileContext(root.toUri, context.getConfiguration)
// note: we don't call `reload` (to get the partition metadata) as we aren't using it
- metadata = StorageMetadataFactory.load(FileSystemContext(fc, context.getConfiguration, root)).getOrElse {
+ metadata = StorageMetadataFactory.load(FileSystemContext(root, context.getConfiguration)).getOrElse {
throw new IllegalArgumentException(s"Could not load storage instance at path $root")
}
serializer = KryoFeatureSerializer(metadata.sft, SerializationOptions.none)
@@ -179,9 +178,8 @@ object FileSystemConverterJob {
override def setup(context: Context): Unit = {
val root = StorageConfiguration.getRootPath(context.getConfiguration)
- val fc = FileContext.getFileContext(root.toUri, context.getConfiguration)
// note: we don't call `reload` (to get the partition metadata) as we aren't using it
- val metadata = StorageMetadataFactory.load(FileSystemContext(fc, context.getConfiguration, root)).getOrElse {
+ val metadata = StorageMetadataFactory.load(FileSystemContext(root, context.getConfiguration)).getOrElse {
throw new IllegalArgumentException(s"Could not load storage instance at path $root")
}
serializer = KryoFeatureSerializer(metadata.sft, SerializationOptions.none)
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala
index f3b7f047c46d..fc900c3e8fe7 100644
--- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsIngestCommand.scala
@@ -53,13 +53,13 @@ class FsIngestCommand extends IngestCommand[FileSystemDataStore] with FsDistribu
throw new ParameterException("Please specify --num-reducers for distributed ingest")
}
val storage = ds.storage(sft.getTypeName)
- val tmpPath = Option(params.tempPath).map(d => storage.context.fc.makeQualified(new Path(d)))
+ val tmpPath = Option(params.tempPath).map(d => storage.context.fs.makeQualified(new Path(d)))
val targetFileSize = storage.metadata.get(Metadata.TargetFileSize).map(_.toLong)
tmpPath.foreach { tp =>
- if (storage.context.fc.util.exists(tp)) {
+ if (storage.context.fs.exists(tp)) {
Command.user.info(s"Deleting temp path $tp")
- storage.context.fc.delete(tp, true)
+ storage.context.fs.delete(tp, true)
}
}
diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala
index ad8c33aa8276..9dafbd8738a1 100644
--- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommand.scala
@@ -228,7 +228,7 @@ object FsManageMetadataCommand {
* added to onDisk
*/
private def listRoot(): Unit = {
- val iter = storage.context.fc.listStatus(storage.context.root)
+ val iter = storage.context.fs.listStatusIterator(storage.context.root)
// use a phaser to track worker thread completion
val phaser = new Phaser(2) // 1 for this thread + 1 for the worker
pool.submit(new TopLevelListWorker(phaser, iter))
@@ -263,7 +263,7 @@ object FsManageMetadataCommand {
partitions.forall(_.exists(p => p == name || p.startsWith(s"$name/")))) {
i += 1
// use a tiered phaser on each directory to avoid the limit of 65535 registered parties
- pool.submit(new ListWorker(new Phaser(phaser, 1), name, storage.context.fc.listStatus(path)))
+ pool.submit(new ListWorker(new Phaser(phaser, 1), name, storage.context.fs.listStatusIterator(path)))
}
} else if (name != MetadataJson.MetadataPath) {
onDisk.computeIfAbsent("", computeFunction).put(name, java.lang.Boolean.TRUE)
@@ -293,7 +293,7 @@ object FsManageMetadataCommand {
if (partitions.forall(_.exists(p => p == nextPartition || p.startsWith(s"$nextPartition/")))) {
i += 1
// use a tiered phaser on each directory to avoid the limit of 65535 registered parties
- pool.submit(new ListWorker(new Phaser(phaser, 1), nextPartition, storage.context.fc.listStatus(path)))
+ pool.submit(new ListWorker(new Phaser(phaser, 1), nextPartition, storage.context.fs.listStatusIterator(path)))
}
} else {
val leafPartition =
diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala
index 3ea889ef558b..f5aa815c7e90 100644
--- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/CompactCommandTest.scala
@@ -160,7 +160,7 @@ class CompactCommandTest extends Specification {
val storage = ds.storage(sft.getTypeName)
foreach(storage.metadata.getPartitions()) { partition =>
partition.files.size must beGreaterThan(1)
- val sizes = storage.getFilePaths(partition.name).map(p => storage.context.fc.getFileStatus(p.path).getLen)
+ val sizes = storage.getFilePaths(partition.name).map(p => storage.context.fs.getFileStatus(p.path).getLen)
// hard to get very close with 2 different formats and small files...
foreach(sizes)(_ must beCloseTo(targetFileSize, 4000))
}
diff --git a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala
index cd74c3229766..88325a39ceb0 100644
--- a/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala
+++ b/geomesa-fs/geomesa-fs-tools/src/test/scala/org/locationtech/geomesa/fs/tools/ingest/FsManageMetadataCommandTest.scala
@@ -61,7 +61,7 @@ class FsManageMetadataCommandTest extends Specification {
val files = storage.metadata.getPartitions().flatMap(_.files.map(_.name)).toList
// move a file - it's not in the right partition so it won't be matched correctly by filters,
// but it's good enough for a test
- storage.context.fc.rename(new Path(storage.context.root, "2022"), new Path(storage.context.root, "2019"))
+ storage.context.fs.rename(new Path(storage.context.root, "2022"), new Path(storage.context.root, "2019"))
// verify we can't retrieve the moved file
SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toList must
containTheSameElementsAs(features.take(2))
@@ -94,7 +94,7 @@ class FsManageMetadataCommandTest extends Specification {
}
val storage = ds.storage(sft.getTypeName)
// delete a file
- storage.context.fc.delete(new Path(storage.context.root, "2022"), true)
+ storage.context.fs.delete(new Path(storage.context.root, "2022"), true)
// delete a partition from the metadata
storage.metadata.getPartition("2021/01/01") match {
case None => ko("Expected Some for partition but got none")
diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
index f2ec865e6aab..48f7c8ecee7e 100644
--- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
+++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/export/ExportCommand.scala
@@ -601,7 +601,7 @@ object ExportCommand extends LazyLogging {
// lowest level - keep track of the bytes we write
// do this before any compression, buffering, etc so we get an accurate count
- private val counter = new CountingOutputStream(out.write(CreateMode.Create, createParents = true))
+ private val counter = new CountingOutputStream(out.write(CreateMode.Create))
private val stream = {
val compressed = gzip match {
case None => counter
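
A sketch of the byte-counting order preserved by this change, with a stand-in in-memory sink: the counter wraps the destination stream beneath the gzip layer, so it reports the bytes that actually reach the target.

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream
import org.apache.commons.io.output.CountingOutputStream

val sink = new ByteArrayOutputStream() // stand-in for the file handle's write stream
val counter = new CountingOutputStream(sink)
val out = new GZIPOutputStream(counter)
out.write("exported features".getBytes("UTF-8"))
out.close() // flushes the gzip trailer through the counter
println(s"bytes written: ${counter.getByteCount}")
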
diff --git a/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala b/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala
index b32eb4133a0f..47bfc909f608 100644
--- a/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala
+++ b/geomesa-tools/src/test/scala/org/locationtech/geomesa/tools/export/ExportToFsTest.scala
@@ -9,7 +9,7 @@
package org.locationtech.geomesa.tools.`export`
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileContext, Path}
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.{DataStore, Query, SimpleFeatureStore}
import org.geotools.data.collection.ListFeatureCollection
import org.geotools.data.memory.MemoryDataStore
@@ -59,7 +59,8 @@ class ExportToFsTest extends Specification {
.addFeatures(new ListFeatureCollection(sft, features: _*))
val storage = {
- val context = FileSystemContext(FileContext.getFileContext, new Configuration(), new Path(out.toUri))
+ val conf = new Configuration()
+ val context = FileSystemContext(new Path(out.toUri), conf)
val metadata =
new FileBasedMetadataFactory()
.create(context, Map.empty, Metadata(sft, "parquet", NamedOptions("daily"), leafStorage = true))
diff --git a/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala
index 8576e9c289f8..9693a6479181 100644
--- a/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala
+++ b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/HadoopDelegate.scala
@@ -9,12 +9,11 @@
package org.locationtech.geomesa.utils.hadoop
import com.typesafe.scalalogging.LazyLogging
-import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream, ArchiveStreamFactory}
import org.apache.commons.compress.archivers.zip.ZipFile
+import org.apache.commons.compress.archivers.{ArchiveEntry, ArchiveInputStream, ArchiveStreamFactory}
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Options.CreateOpts
import org.apache.hadoop.fs._
import org.locationtech.geomesa.utils.collection.CloseableIterator
import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle}
@@ -47,19 +46,19 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate {
override def getHandle(path: String): FileHandle = {
val p = new Path(path)
- val fc = FileContext.getFileContext(p.toUri, conf)
+ val fs = FileSystem.get(p.toUri, conf)
PathUtils.getUncompressedExtension(p.getName).toLowerCase(Locale.US) match {
- case TAR => new HadoopTarHandle(fc, p)
- case ZIP | JAR => new HadoopZipHandle(fc, p)
- case _ => new HadoopFileHandle(fc, p)
+ case TAR => new HadoopTarHandle(fs, p)
+ case ZIP | JAR => new HadoopZipHandle(fs, p)
+ case _ => new HadoopFileHandle(fs, p)
}
}
// based on logic from hadoop FileInputFormat
override def interpretPath(path: String): Seq[FileHandle] = {
val p = new Path(path)
- val fc = FileContext.getFileContext(p.toUri, conf)
- val files = fc.util.globStatus(p, HiddenFileFilter)
+ val fs = FileSystem.get(p.toUri, conf)
+ val files = fs.globStatus(p, HiddenFileFilter)
if (files == null) {
throw new IllegalArgumentException(s"Input path does not exist: $path")
@@ -74,7 +73,7 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate {
val file = remaining.dequeue()
if (file.isDirectory) {
if (recursive) {
- val children = fc.listLocatedStatus(file.getPath)
+ val children = fs.listLocatedStatus(file.getPath)
val iter = new Iterator[LocatedFileStatus] {
override def hasNext: Boolean = children.hasNext
override def next(): LocatedFileStatus = children.next
@@ -83,9 +82,9 @@ class HadoopDelegate(conf: Configuration) extends FileSystemDelegate {
}
} else {
PathUtils.getUncompressedExtension(file.getPath.getName).toLowerCase(Locale.US) match {
- case TAR => result += new HadoopTarHandle(fc, file.getPath)
- case ZIP | JAR => result += new HadoopZipHandle(fc, file.getPath)
- case _ => result += new HadoopFileHandle(fc, file.getPath)
+ case TAR => result += new HadoopTarHandle(fs, file.getPath)
+ case ZIP | JAR => result += new HadoopZipHandle(fs, file.getPath)
+ case _ => result += new HadoopFileHandle(fs, file.getPath)
}
}
}
@@ -129,63 +128,57 @@ object HadoopDelegate extends LazyLogging {
}
}
- class HadoopFileHandle(fc: FileContext, file: Path) extends FileHandle {
+ class HadoopFileHandle(fs: FileSystem, file: Path) extends FileHandle {
override def path: String = file.toString
- override def exists: Boolean = fc.util.exists(file)
+ override def exists: Boolean = fs.exists(file)
- override def length: Long = if (exists) { fc.getFileStatus(file).getLen } else { 0L }
+ override def length: Long = if (exists) { fs.getFileStatus(file).getLen } else { 0L }
override def open: CloseableIterator[(Option[String], InputStream)] = {
- val is = PathUtils.handleCompression(fc.open(file), file.getName)
+ val is = PathUtils.handleCompression(fs.open(file), file.getName)
CloseableIterator.single(None -> is, is.close())
}
- override def write(mode: CreateMode, createParents: Boolean): OutputStream = {
+ override def write(mode: CreateMode): OutputStream = {
mode.validate()
- val flags = java.util.EnumSet.noneOf(classOf[CreateFlag])
if (mode.append) {
- flags.add(CreateFlag.APPEND)
- } else if (mode.overwrite) {
- flags.add(CreateFlag.OVERWRITE)
- }
- if (mode.create) {
- flags.add(CreateFlag.CREATE)
+ fs.append(file)
+ } else {
+ fs.create(file, mode.overwrite) // TODO do we need to hsync/hflush?
}
- val ops = if (createParents) { CreateOpts.createParent() } else { CreateOpts.donotCreateParent() }
- fc.create(file, flags, ops) // TODO do we need to hsync/hflush?
}
override def delete(recursive: Boolean): Unit = {
- if (!fc.delete(file, recursive)) {
+ if (!fs.delete(file, recursive)) {
throw new IOException(s"Could not delete file: $path")
}
}
}
- class HadoopZipHandle(fc: FileContext, file: Path) extends HadoopFileHandle(fc, file) {
+ class HadoopZipHandle(fs: FileSystem, file: Path) extends HadoopFileHandle(fs, file) {
override def open: CloseableIterator[(Option[String], InputStream)] = {
// we have to read the bytes into memory to get random access reads
- val bytes = WithClose(PathUtils.handleCompression(fc.open(file), file.getName)) { is =>
+ val bytes = WithClose(PathUtils.handleCompression(fs.open(file), file.getName)) { is =>
IOUtils.toByteArray(is)
}
new ZipFileIterator(new ZipFile(new SeekableInMemoryByteChannel(bytes)), file.toString)
}
- override def write(mode: CreateMode, createParents: Boolean): OutputStream =
- factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode, createParents))
+ override def write(mode: CreateMode): OutputStream =
+ factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode))
}
- class HadoopTarHandle(fc: FileContext, file: Path) extends HadoopFileHandle(fc, file) {
+ class HadoopTarHandle(fs: FileSystem, file: Path) extends HadoopFileHandle(fs, file) {
override def open: CloseableIterator[(Option[String], InputStream)] = {
- val uncompressed = PathUtils.handleCompression(fc.open(file), file.getName)
+ val uncompressed = PathUtils.handleCompression(fs.open(file), file.getName)
val archive: ArchiveInputStream[_ <: ArchiveEntry] =
factory.createArchiveInputStream(ArchiveStreamFactory.TAR, uncompressed)
new ArchiveFileIterator(archive, file.toString)
}
- override def write(mode: CreateMode, createParents: Boolean): OutputStream =
- factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode, createParents))
+ override def write(mode: CreateMode): OutputStream =
+ factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode))
}
}
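
The createParents flag disappears above because FileSystem.create makes missing parent directories by default; its boolean argument is the overwrite flag, not a parent-creation switch. A condensed sketch of the resulting branch:

import java.io.OutputStream
import org.apache.hadoop.fs.{FileSystem, Path}

def write(fs: FileSystem, file: Path, append: Boolean, overwrite: Boolean): OutputStream =
  if (append) { fs.append(file) } else { fs.create(file, overwrite) }
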
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala
index 726ee9f80645..0e1e2132ac44 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/CompressionUtils.scala
@@ -24,7 +24,9 @@ trait CompressionUtils {
def isCompressedFilename(filename: String): Boolean
def getUncompressedFilename(filename: String): String
def getCompressedFilename(filename: String): String
- def compress(is: InputStream): InputStream
+ @deprecated("misleading name - replaced with decompress")
+ def compress(is: InputStream): InputStream = decompress(is)
+ def decompress(is: InputStream): InputStream
}
object CompressionUtils {
@@ -59,7 +61,7 @@ object CompressionUtils {
org.apache.commons.compress.compressors.gzip.GzipUtils.getUncompressedFilename(filename)
override def getCompressedFilename(filename: String): String =
org.apache.commons.compress.compressors.gzip.GzipUtils.getCompressedFilename(filename)
- override def compress(is: InputStream): InputStream = new GZIPInputStream(is)
+ override def decompress(is: InputStream): InputStream = new GZIPInputStream(is)
}
case object XZUtils extends CompressionUtils {
@@ -69,7 +71,7 @@ object CompressionUtils {
org.apache.commons.compress.compressors.xz.XZUtils.getUncompressedFilename(filename)
override def getCompressedFilename(filename: String): String =
org.apache.commons.compress.compressors.xz.XZUtils.getCompressedFilename(filename)
- override def compress(is: InputStream): InputStream = new XZCompressorInputStream(is)
+ override def decompress(is: InputStream): InputStream = new XZCompressorInputStream(is)
}
case object BZip2Utils extends CompressionUtils {
@@ -79,6 +81,6 @@ object CompressionUtils {
org.apache.commons.compress.compressors.bzip2.BZip2Utils.getUncompressedFilename(filename)
override def getCompressedFilename(filename: String): String =
org.apache.commons.compress.compressors.bzip2.BZip2Utils.getCompressedFilename(filename)
- override def compress(is: InputStream): InputStream = new BZip2CompressorInputStream(is)
+ override def decompress(is: InputStream): InputStream = new BZip2CompressorInputStream(is)
}
}
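
Call sites are unchanged apart from the name; a hedged sketch of the pattern (mirroring the PathUtils change that follows), with a hypothetical local file:

import java.io.{BufferedInputStream, FileInputStream, InputStream}
import org.locationtech.geomesa.utils.io.CompressionUtils

def openMaybeCompressed(filename: String): InputStream = {
  val buffered = new BufferedInputStream(new FileInputStream(filename))
  CompressionUtils.Utils.find(_.isCompressedFilename(filename)) match {
    case None        => buffered
    case Some(utils) => utils.decompress(buffered) // was the misleadingly-named compress
  }
}
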
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala
index 58e11f093822..e17a9c09e55a 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/PathUtils.scala
@@ -93,7 +93,7 @@ object PathUtils extends FileSystemDelegate with LazyLogging {
val buffered = new BufferedInputStream(is)
CompressionUtils.Utils.find(_.isCompressedFilename(filename)) match {
case None => buffered
- case Some(utils) => utils.compress(buffered)
+ case Some(utils) => utils.decompress(buffered)
}
}
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala
index c5d4aa6b1272..47fcaec70ebc 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/FileSystemDelegate.scala
@@ -43,7 +43,7 @@ trait FileSystemDelegate extends LazyLogging {
def getUrl(path: String): URL
}
-object FileSystemDelegate {
+object FileSystemDelegate extends LazyLogging {
/**
* Creation mode for files
@@ -125,10 +125,22 @@ object FileSystemDelegate {
* Open the file for writing
*
* @param mode write mode
- * @param createParents if the file does not exist, create its parents. Note that this only makes sense
- * with `CreateMode.Create`
*/
- def write(mode: CreateMode, createParents: Boolean = false): OutputStream
+ def write(mode: CreateMode): OutputStream
+
+ /**
+ * Open the file for writing
+ *
+ * @param mode write mode
+ * @param createParents create parent dirs as necessary
+ */
+ @deprecated("createParents is always true")
+ def write(mode: CreateMode, createParents: Boolean): OutputStream = {
+ if (!createParents) {
+ logger.warn("Call to write with createParents=false, which is not supported")
+ }
+ write(mode)
+ }
/**
* Delete the file
diff --git a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala
index 4430472cd712..a4a186413e6f 100644
--- a/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala
+++ b/geomesa-utils-parent/geomesa-utils/src/main/scala/org/locationtech/geomesa/utils/io/fs/LocalDelegate.scala
@@ -105,7 +105,7 @@ object LocalDelegate {
CloseableIterator.single(None -> is, is.close())
}
- override def write(mode: CreateMode, createParents: Boolean): OutputStream = {
+ override def write(mode: CreateMode): OutputStream = {
mode.validate()
if (file.exists()) {
if (mode.append) {
@@ -120,12 +120,8 @@ object LocalDelegate {
throw new FileNotFoundException(s"File does not exist: $path")
} else {
val parent = file.getParentFile
- if (parent != null && !parent.exists()) {
- if (!createParents) {
- throw new FileNotFoundException(s"Parent file does not exist: $path")
- } else if (!parent.mkdirs()) {
- throw new IOException(s"Parent file does not exist and could not be created: $path")
- }
+ if (parent != null && !parent.exists() && !parent.mkdirs()) {
+ throw new IOException(s"Parent file does not exist and could not be created: $path")
}
new FileOutputStream(file)
}
@@ -143,8 +139,8 @@ object LocalDelegate {
class LocalZipHandle(file: File) extends LocalFileHandle(file) {
override def open: CloseableIterator[(Option[String], InputStream)] =
new ZipFileIterator(new ZipFile(file), file.getAbsolutePath)
- override def write(mode: CreateMode, createParents: Boolean): OutputStream =
- factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode, createParents))
+ override def write(mode: CreateMode): OutputStream =
+ factory.createArchiveOutputStream(ArchiveStreamFactory.ZIP, super.write(mode))
}
class LocalTarHandle(file: File) extends LocalFileHandle(file) {
@@ -154,8 +150,8 @@ object LocalDelegate {
factory.createArchiveInputStream(ArchiveStreamFactory.TAR, uncompressed)
new ArchiveFileIterator(archive, file.getAbsolutePath)
}
- override def write(mode: CreateMode, createParents: Boolean): OutputStream =
- factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode, createParents))
+ override def write(mode: CreateMode): OutputStream =
+ factory.createArchiveOutputStream(ArchiveStreamFactory.TAR, super.write(mode))
}
private class StdInHandle(in: InputStream) extends FileHandle {
@@ -164,7 +160,7 @@ object LocalDelegate {
override def length: Long = Try(in.available().toLong).getOrElse(0L) // .available will throw if stream is closed
override def open: CloseableIterator[(Option[String], InputStream)] =
CloseableIterator.single(None -> CloseShieldInputStream.wrap(in))
- override def write(mode: CreateMode, createParents: Boolean): OutputStream = System.out
+ override def write(mode: CreateMode): OutputStream = System.out
override def delete(recursive: Boolean): Unit = {}
}
diff --git a/pom.xml b/pom.xml
index eec18c7c0e9e..7f13a0f3d7e2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,8 +121,8 @@
4.20.5
4.13.2
5.9.3
- 1.19.7
- 1.4.0
+ 1.20.3
+ 1.4.1
2.28.2
@@ -201,10 +201,11 @@
2
false
            <test.accumulo.docker.tag>2.1.3</test.accumulo.docker.tag>
-            <test.redis.docker.tag>7-alpine</test.redis.docker.tag>
-            <test.postgres.docker.tag>15.1</test.postgres.docker.tag>
-            <test.postgis.docker.tag>15-3.4</test.postgis.docker.tag>
            <test.confluent.docker.tag>7.6.0</test.confluent.docker.tag>
+            <test.minio.docker.tag>RELEASE.2024-10-29T16-01-48Z</test.minio.docker.tag>
+            <test.postgis.docker.tag>15-3.4</test.postgis.docker.tag>
+            <test.postgres.docker.tag>15.1</test.postgres.docker.tag>
+            <test.redis.docker.tag>7-alpine</test.redis.docker.tag>
            <test.zookeeper.docker.tag>3.9.2</test.zookeeper.docker.tag>
@@ -2892,6 +2893,12 @@
          <version>${testcontainers.version}</version>
          <scope>test</scope>
        </dependency>
+        <dependency>
+          <groupId>org.testcontainers</groupId>
+          <artifactId>minio</artifactId>
+          <version>${testcontainers.version}</version>
+          <scope>test</scope>
+        </dependency>
        <dependency>
          <groupId>org.geomesa.testcontainers</groupId>
          <artifactId>testcontainers-accumulo</artifactId>
@@ -3098,10 +3105,11 @@
logging.properties
slf4j
            <test.accumulo.docker.tag>${test.accumulo.docker.tag}</test.accumulo.docker.tag>
+            <test.confluent.docker.tag>${test.confluent.docker.tag}</test.confluent.docker.tag>
+            <test.minio.docker.tag>${test.minio.docker.tag}</test.minio.docker.tag>
            <test.postgis.docker.tag>${test.postgis.docker.tag}</test.postgis.docker.tag>
            <test.postgres.docker.tag>${test.postgres.docker.tag}</test.postgres.docker.tag>
            <test.redis.docker.tag>${test.redis.docker.tag}</test.redis.docker.tag>
-            <test.confluent.docker.tag>${test.confluent.docker.tag}</test.confluent.docker.tag>
            <test.zookeeper.docker.tag>${test.zookeeper.docker.tag}</test.zookeeper.docker.tag>
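
A hedged sketch of how the minio module added above can back s3a-based tests: the image tag matches the pom property, while the bucket-less endpoint wiring uses standard hadoop-aws configuration keys rather than anything GeoMesa-specific.

import org.apache.hadoop.conf.Configuration
import org.testcontainers.containers.MinIOContainer

val minio = new MinIOContainer("minio/minio:RELEASE.2024-10-29T16-01-48Z")
minio.start()

val conf = new Configuration()
conf.set("fs.s3a.endpoint", minio.getS3URL)
conf.set("fs.s3a.access.key", minio.getUserName)
conf.set("fs.s3a.secret.key", minio.getPassword)
conf.set("fs.s3a.path.style.access", "true") // minio serves buckets by path, not vhost
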