Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3411 FSDS - Fix reads of tar files from s3 #3228

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions build/cqs.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ org.slf4j:jul-to-slf4j 1.7.36 test
org.specs2:specs2-core_2.12 4.20.5 test
org.specs2:specs2-junit_2.12 4.20.5 test
org.specs2:specs2-mock_2.12 4.20.5 test
org.testcontainers:kafka 1.19.7 test
org.testcontainers:postgresql 1.19.7 test
org.testcontainers:testcontainers 1.19.7 test
org.testcontainers:kafka 1.20.3 test
org.testcontainers:minio 1.20.3 test
org.testcontainers:postgresql 1.20.3 test
org.testcontainers:testcontainers 1.20.3 test
org.wololo:jts2geojson 0.16.1 test
org.xerial.snappy:snappy-java 1.1.10.5 test
7 changes: 7 additions & 0 deletions build/test/resources/log4j.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
</layout>
</appender>

<category name="org.locationtech.geomesa">
<priority value="info"/>
</category>

<category name="org.apache.hadoop">
<priority value="error"/>
</category>
<category name="software.amazon.awssdk.transfer.s3.S3TransferManager">
<priority value="error"/>
</category>
<category name="org.apache.accumulo">
<priority value="error"/>
</category>
Expand Down
19 changes: 19 additions & 0 deletions geomesa-fs/geomesa-fs-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,30 @@
<artifactId>geomesa-fs-storage-orc_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3-transfer-manager</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
</dependency>
<dependency>
<groupId>org.geomesa.testcontainers</groupId>
<artifactId>testcontainers-accumulo</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ package org.locationtech.geomesa.fs.data

import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileContext, FileSystem, Path}
import org.geotools.api.data.Query
import org.geotools.api.feature.`type`.Name
import org.geotools.api.feature.simple.SimpleFeatureType
import org.geotools.data.store.{ContentDataStore, ContentEntry, ContentFeatureSource}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.StorageKeys
import org.locationtech.geomesa.fs.storage.common.metadata.FileBasedMetadata
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.index.stats.RunnableStats.UnoptimizedRunnableStats
import org.locationtech.geomesa.index.stats.{GeoMesaStats, HasGeoMesaStats}
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
Expand All @@ -27,8 +28,19 @@ import org.locationtech.geomesa.utils.io.CloseQuietly
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

/**
* File system data store
*
* @param fs file system - note, this is expected to be a shared resource, and is not cleaned up on data store dispose
* @param conf conf
* @param root root path
* @param readThreads number of threads used for read ops
* @param writeTimeout timeout for write ops
* @param defaultEncoding default file encoding
* @param namespace geoserver namespace
*/
class FileSystemDataStore(
fc: FileContext,
fs: FileSystem,
conf: Configuration,
root: Path,
readThreads: Int,
Expand All @@ -37,9 +49,21 @@ class FileSystemDataStore(
namespace: Option[String]
) extends ContentDataStore with HasGeoMesaStats with LazyLogging {

// noinspection ScalaUnusedSymbol
@deprecated("Use FileSystem instead of FileContext")
def this(
fc: FileContext,
conf: Configuration,
root: Path,
readThreads: Int,
writeTimeout: Duration,
defaultEncoding: Option[String],
namespace: Option[String]) =
this(FileSystem.get(root.toUri, conf), conf, root, readThreads, writeTimeout, defaultEncoding, namespace)

namespace.foreach(setNamespaceURI)

private val manager = FileSystemStorageManager(fc, conf, root, namespace)
private val manager = FileSystemStorageManager(fs, conf, root, namespace)

override val stats: GeoMesaStats = new UnoptimizedRunnableStats(this)

Expand Down Expand Up @@ -82,13 +106,15 @@ class FileSystemDataStore(
val fileSize = sft.removeTargetFileSize()

val path = manager.defaultPath(sft.getTypeName)
val context = FileSystemContext(fc, conf, path, namespace)
val context = FileSystemContext(fs, conf, path, namespace)

val metadata =
StorageMetadataFactory.create(context, meta, Metadata(sft, encoding, scheme, leafStorage, fileSize))
try { manager.register(path, FileSystemStorageFactory(context, metadata)) } catch {
case NonFatal(e) => CloseQuietly(metadata).foreach(e.addSuppressed); throw e
}
PathCache.register(fs, root)
PathCache.register(fs, path)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@

package org.locationtech.geomesa.fs.data

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.geotools.api.data.DataAccessFactory.Param
import org.geotools.api.data.{DataStore, DataStoreFactorySpi}
import org.locationtech.geomesa.fs.storage.api.FileSystemStorageFactory
Expand All @@ -30,7 +29,6 @@ import scala.concurrent.duration.Duration
class FileSystemDataStoreFactory extends DataStoreFactorySpi {

import FileSystemDataStoreFactory.FileSystemDataStoreParams._
import FileSystemDataStoreFactory.fileContextCache

override def createDataStore(params: java.util.Map[String, _]): DataStore = {

Expand All @@ -45,8 +43,6 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {
conf
}

val fc = fileContextCache.get(conf)

val path = new Path(PathParam.lookup(params))
val encoding = EncodingParam.lookupOpt(params).filterNot(_.isEmpty)

Expand All @@ -63,7 +59,9 @@ class FileSystemDataStoreFactory extends DataStoreFactorySpi {

val namespace = NamespaceParam.lookupOpt(params)

new FileSystemDataStore(fc, conf, path, readThreads, writeTimeout, encoding, namespace)
val fs = FileSystem.get(path.toUri, conf)

new FileSystemDataStore(fs, conf, path, readThreads, writeTimeout, encoding, namespace)
}

override def createNewDataStore(params: java.util.Map[String, _]): DataStore =
Expand Down Expand Up @@ -105,12 +103,6 @@ object FileSystemDataStoreFactory extends GeoMesaDataStoreInfo {

private val configuration = new Configuration()

private val fileContextCache: LoadingCache[Configuration, FileContext] = Caffeine.newBuilder().build(
new CacheLoader[Configuration, FileContext]() {
override def load(conf: Configuration): FileContext = FileContext.getFileContext(conf)
}
)

object FileSystemDataStoreParams extends NamespaceParams {

val WriterFileTimeout: SystemProperty = SystemProperty("geomesa.fs.writer.partition.timeout", "60s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,16 @@ class FileSystemFeatureStore(
}

override def canLimit: Boolean = false
override def canLimit(query: Query): Boolean = false
override def canTransact: Boolean = false
override def canEvent: Boolean = false
override def canReproject: Boolean = false
override def canSort: Boolean = false

override def canSort(query: Query): Boolean = false
override def canFilter: Boolean = true
override def canFilter(query: Query): Boolean = true
override def canRetype: Boolean = true
override def canRetype(query: Query): Boolean = true

override protected def buildQueryCapabilities(): QueryCapabilities = FileSystemFeatureStore.capabilities
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ package org.locationtech.geomesa.fs.data
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.typesafe.scalalogging.LazyLogging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, Path}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.locationtech.geomesa.fs.storage.api._
import org.locationtech.geomesa.fs.storage.common.utils.PathCache
import org.locationtech.geomesa.utils.io.CloseQuietly
Expand All @@ -23,11 +23,11 @@ import scala.util.control.NonFatal
/**
* Manages the storages and associated simple feature types underneath a given path
*
* @param fc file context
* @param fs file system
* @param conf configuration
* @param root root path for the data store
*/
class FileSystemStorageManager private (fc: FileContext, conf: Configuration, root: Path, namespace: Option[String])
class FileSystemStorageManager private (fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String])
extends MethodProfiling with LazyLogging {

import scala.collection.JavaConverters._
Expand All @@ -42,7 +42,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
*/
def storage(typeName: String): Option[FileSystemStorage] = {
cache.get(typeName).map(_._2) // check cached values
.orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fc, _)).flatMap(loadPath)) // check expected (default) path
.orElse(Some(defaultPath(typeName)).filter(PathCache.exists(fs, _)).flatMap(loadPath)) // check expected (default) path
.orElse(loadAll().find(_.metadata.sft.getTypeName == typeName)) // check other paths until we find it
}

Expand Down Expand Up @@ -80,8 +80,8 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
* @return
*/
private def loadAll(): Iterator[FileSystemStorage] = {
if (!PathCache.exists(fc, root)) { Iterator.empty } else {
val dirs = PathCache.list(fc, root).filter(_.isDirectory).map(_.getPath)
if (!PathCache.exists(fs, root)) { Iterator.empty } else {
val dirs = PathCache.list(fs, root).filter(_.isDirectory).map(_.getPath)
dirs.filterNot(path => cache.exists { case (_, (p, _)) => p == path }).flatMap(loadPath)
}
}
Expand All @@ -99,7 +99,7 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
logger.debug(s"${ if (storage.isDefined) "Loaded" else "No" } storage at path '$path' in ${time}ms")

profile(complete _) {
val context = FileSystemContext(fc, conf, path, namespace)
val context = FileSystemContext(fs, conf, path, namespace)
StorageMetadataFactory.load(context).map { meta =>
try {
val storage = FileSystemStorageFactory(context, meta)
Expand All @@ -116,20 +116,20 @@ class FileSystemStorageManager private (fc: FileContext, conf: Configuration, ro
object FileSystemStorageManager {

private val cache = Caffeine.newBuilder().build(
new CacheLoader[(FileContext, Configuration, Path, Option[String]), FileSystemStorageManager]() {
override def load(key: (FileContext, Configuration, Path, Option[String])): FileSystemStorageManager =
new CacheLoader[(FileSystem, Configuration, Path, Option[String]), FileSystemStorageManager]() {
override def load(key: (FileSystem, Configuration, Path, Option[String])): FileSystemStorageManager =
new FileSystemStorageManager(key._1, key._2, key._3, key._4)
}
)

/**
* Load a cached storage manager instance
*
* @param fc file context
* @param fs file system
* @param conf configuration
* @param root data store root path
* @return
*/
def apply(fc: FileContext, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
cache.get((fc, conf, root, namespace))
def apply(fs: FileSystem, conf: Configuration, root: Path, namespace: Option[String]): FileSystemStorageManager =
cache.get((fs, conf, root, namespace))
}
Empty file.
Loading
Loading