diff --git a/CHANGELOG.md b/CHANGELOG.md index e3d65a5ad1..f2335f5d9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.19.0] - 2020-06-28 +### Changed +- Improving Windows support by removing Hadoop FS dependencies + ## [0.18.2] - 2020-06-26 ### Changed - Upgraded Flink engine to `0.3.3` diff --git a/build.sbt b/build.sbt index 8310561e13..14a598075f 100644 --- a/build.sbt +++ b/build.sbt @@ -38,7 +38,11 @@ lazy val kamuCoreCoordinator = project .enablePlugins(AutomateHeaderPlugin) .settings( libraryDependencies ++= Seq( - deps.jcabiLog, + //deps.apacheCommonsCompress, + deps.betterFiles, + deps.log4jApi, + deps.log4jCore, + deps.log4jBridge, deps.scallop, deps.hadoopCommon, deps.sqlLine, @@ -56,7 +60,8 @@ lazy val kamuCoreUtils = project .enablePlugins(AutomateHeaderPlugin) .settings( libraryDependencies ++= Seq( - deps.hadoopCommon, + deps.betterFiles, + deps.log4jApi, deps.scalaTest % "test", deps.sparkCore % "provided", deps.sparkHive % "provided", @@ -77,7 +82,7 @@ lazy val kamuCoreManifests = project .enablePlugins(AutomateHeaderPlugin) .settings( libraryDependencies ++= Seq( - deps.hadoopCommon, + deps.betterFiles, deps.pureConfig, deps.pureConfigYaml, deps.spire @@ -90,11 +95,14 @@ lazy val kamuCoreManifests = project ////////////////////////////////////////////////////////////////////////////// lazy val versions = new { + val apacheCommonsCompress = "1.20" + val betterFiles = "3.9.1" val geoSpark = "1.2.0" val hadoopCommon = "2.6.5" val json4sJackson = "3.5.3" val jacksonCore = "2.6.7" val jacksonDatabind = "2.6.7.1" + val log4j = "2.13.3" val pureConfig = "0.11.1" val scalajHttp = "2.4.1" val spark = "2.4.0" @@ -104,8 +112,14 @@ lazy val versions = new { lazy val deps = new { - val jcabiLog = "com.jcabi" % "jcabi-log" % "0.17.3" + val log4jApi = "org.apache.logging.log4j" % "log4j-api" % versions.log4j + val log4jCore = "org.apache.logging.log4j" % "log4j-core" % versions.log4j + val log4jBridge = "org.apache.logging.log4j" % "log4j-1.2-api" % versions.log4j + //val jcabiLog = "com.jcabi" % "jcabi-log" % "0.17.3" val scallop = "org.rogach" %% "scallop" % "3.3.1" + // File system + val betterFiles = "com.github.pathikrit" %% "better-files" % versions.betterFiles + val apacheCommonsCompress = "org.apache.commons" % "commons-compress" % versions.apacheCommonsCompress // Configs val pureConfig = "com.github.pureconfig" %% "pureconfig" % versions.pureConfig val pureConfigYaml = "com.github.pureconfig" %% "pureconfig-yaml" % versions.pureConfig @@ -126,6 +140,7 @@ lazy val deps = // Hadoop File System val hadoopCommon = ("org.apache.hadoop" % "hadoop-common" % versions.hadoopCommon) + .exclude("log4j", "log4j") .exclude("commons-beanutils", "commons-beanutils") .exclude("commons-beanutils", "commons-beanutils-core") // SQL Shell @@ -176,6 +191,8 @@ lazy val assemblySettings = Seq( case PathList("org", "xerial", xs @ _*) => MergeStrategy.last case PathList("com", "thoughtworks", "paranamer", xs @ _*) => MergeStrategy.last + case PathList(ps @ _*) if ps.last == "Log4j2Plugins.dat" => + MergeStrategy.discard // end insanity case "overview.html" => MergeStrategy.discard case "plugin.xml" => MergeStrategy.discard diff --git a/core.coordinator/src/main/resources/log4j.xml b/core.coordinator/src/main/resources/log4j.xml deleted file mode 
100644 index 0af1cb175a..0000000000 --- a/core.coordinator/src/main/resources/log4j.xml +++ /dev/null @@ -1,44 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/core.coordinator/src/main/resources/log4j2.xml b/core.coordinator/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..0cd48a58ad --- /dev/null +++ b/core.coordinator/src/main/resources/log4j2.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala b/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala index 0f3da352cf..f1a3546871 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala @@ -8,11 +8,11 @@ package dev.kamu.cli +import java.nio.file.Path import java.time.Instant import dev.kamu.cli.output.OutputFormat -import org.apache.hadoop.fs.Path -import org.apache.log4j.Level +import org.apache.logging.log4j.Level import org.rogach.scallop._ import scala.concurrent.duration.Duration @@ -156,8 +156,6 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) { ) implicit val _logLevelConverter = singleArgConverter[Level](Level.toLevel) - implicit val _pathConverter = singleArgConverter[Path](s => new Path(s)) - implicit val _pathListConverter = listArgConverter[Path](s => new Path(s)) val _envVarConverter = new ValueConverter[Map[String, String]] { def resolveEnvVar(s: String): (String, String) = { if (s.indexOf("=") < 0) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala b/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala index 257df2f9cb..7f04ed43e9 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala @@ -10,34 +10,33 @@ package dev.kamu.cli import java.io.PrintStream +import better.files.File import dev.kamu.cli.commands._ import dev.kamu.cli.transform.{EngineFactory, TransformService} import dev.kamu.cli.external._ import dev.kamu.cli.ingest.IngestService import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.cli.output._ -import dev.kamu.core.utils.{Clock, DockerClient} import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.Level +import dev.kamu.core.utils.{Clock, DockerClient} +import org.apache.logging.log4j.Level class Kamu( config: KamuConfig, - fileSystem: FileSystem, systemClock: Clock ) { val workspaceLayout = WorkspaceLayout( kamuRootDir = config.kamuRoot, - metadataDir = config.kamuRoot.resolve("datasets"), - remotesDir = config.kamuRoot.resolve("remotes"), + metadataDir = config.kamuRoot / "datasets", + remotesDir = config.kamuRoot / "remotes", localVolumeDir = config.localVolume - ).toAbsolute(fileSystem) + ).toAbsolute val metadataRepository = - new MetadataRepository(fileSystem, workspaceLayout, systemClock) + new MetadataRepository(workspaceLayout, systemClock) val remoteOperatorFactory = - new RemoteOperatorFactory(fileSystem, workspaceLayout, metadataRepository) + new RemoteOperatorFactory(workspaceLayout, metadataRepository) def run(cliArgs: CliArgs): Unit = { val command = getCommand(cliArgs) @@ -61,10 +60,7 @@ class Kamu( if (c.init.pullImages()) new PullImagesCommand(getDockerClient()) else - new InitCommand( - fileSystem, - workspaceLayout - ) + new InitCommand(workspaceLayout) case List(c.list) => new ListCommand( metadataRepository, @@ -73,12 +69,10 @@ class Kamu( case List(c.add) => if 
(c.add.interactive()) new AddInteractiveCommand( - fileSystem, metadataRepository ) else new AddCommand( - fileSystem, metadataRepository, c.add.manifests(), c.add.replace() @@ -102,14 +96,12 @@ class Kamu( ) new PullCommand( new IngestService( - fileSystem, workspaceLayout, metadataRepository, engineFactory, systemClock ), new TransformService( - fileSystem, metadataRepository, systemClock, engineFactory @@ -135,7 +127,6 @@ class Kamu( ) case List(c.push) => new PushCommand( - fileSystem, workspaceLayout, metadataRepository, remoteOperatorFactory, @@ -179,7 +170,6 @@ class Kamu( ) case List(c.notebook) => new NotebookCommand( - fileSystem, metadataRepository, getDockerClient(), c.notebook.env() @@ -201,16 +191,16 @@ class Kamu( } protected def ensureWorkspace(): Unit = { - if (!fileSystem.exists(workspaceLayout.kamuRootDir)) + if (!File(workspaceLayout.kamuRootDir).isDirectory) throw new UsageException("Not a kamu workspace") } protected def getDockerClient(): DockerClient = { - new DockerClient(fileSystem) + new DockerClient() } protected def getEngineFactory(logLevel: Level): EngineFactory = { - new EngineFactory(fileSystem, workspaceLayout, logLevel) + new EngineFactory(workspaceLayout, logLevel) } protected def getOutputStream(): PrintStream = { diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala b/core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala index 72bdc7836a..b0b0a62f92 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala @@ -8,10 +8,11 @@ package dev.kamu.cli +import java.nio.file.Paths + import dev.kamu.core.utils.AutoClock -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.{Level, LogManager} +import org.apache.logging.log4j.core.config.Configurator +import org.apache.logging.log4j.{Level, LogManager} class UsageException(message: String = "", cause: Throwable = None.orNull) extends RuntimeException(message, cause) @@ -21,25 +22,21 @@ object KamuApp extends App { val systemClock = new AutoClock() - val fileSystem = FileSystem.get(new Configuration()) - FileSystem.enableSymlinks() - fileSystem.setWriteChecksum(false) - fileSystem.setVerifyChecksum(false) - val config = KamuConfig( workspaceRoot = KamuConfig - .findWorkspaceRoot(fileSystem, new Path(".")) - .getOrElse(new Path(".")) + .findWorkspaceRoot(Paths.get("")) + .getOrElse(Paths.get("")) ) try { val cliArgs = new CliArgs(args) - LogManager - .getLogger(getClass.getPackage.getName) - .setLevel(if (cliArgs.debug()) Level.ALL else cliArgs.logLevel()) + Configurator.setLevel( + getClass.getPackage.getName, + if (cliArgs.debug()) Level.ALL else cliArgs.logLevel() + ) - new Kamu(config, fileSystem, systemClock) + new Kamu(config, systemClock) .run(cliArgs) } catch { case e: UsageException => diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/KamuConfig.scala b/core.coordinator/src/main/scala/dev/kamu/cli/KamuConfig.scala index 57932d4a4c..f5472e44c6 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/KamuConfig.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/KamuConfig.scala @@ -8,8 +8,9 @@ package dev.kamu.cli -import org.apache.hadoop.fs.{FileSystem, Path} -import dev.kamu.core.utils.fs._ +import java.nio.file.Path + +import better.files.File case class KamuConfig( workspaceRoot: Path @@ -26,21 +27,18 @@ case class KamuConfig( object KamuConfig { val ROOT_DIR_NAME = ".kamu" - def findWorkspaceRoot(fileSystem: FileSystem, dir: 
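The logging hunks in KamuApp.scala above follow the usual log4j 1.x to Log4j2 migration: the 1.x `LogManager.getLogger(...).setLevel(...)` call has no direct 2.x equivalent, so levels are set through `Configurator`, while the `log4j-1.2-api` bridge added in build.sbt keeps legacy `org.apache.log4j` callers (e.g. inside the Hadoop/Spark dependencies) routed into Log4j2. A minimal self-contained sketch of the new pattern; the logger name and the hard-coded level are illustrative only:

```scala
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.config.Configurator

object LoggingSketch extends App {
  // Parse a user-supplied level string; Level.toLevel falls back to DEBUG
  // for unrecognised values (same converter the CLI uses for --log-level).
  val requested: Level = Level.toLevel("INFO")

  // Log4j2 configures levels per logger name via Configurator rather than
  // mutating the Logger instance as log4j 1.x did.
  Configurator.setLevel("dev.kamu.cli", requested)

  private val logger = LogManager.getLogger("dev.kamu.cli.LoggingSketch")
  logger.info("Logger configured at level {}", requested)
}
```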
Path): Option[Path] = { - findWorkspaceRootRec(fileSystem, fileSystem.toAbsolute(dir)) + def findWorkspaceRoot(dir: Path): Option[Path] = { + findWorkspaceRootRec(dir.toAbsolutePath) } @scala.annotation.tailrec - private def findWorkspaceRootRec( - fileSystem: FileSystem, - dir: Path - ): Option[Path] = { - if (fileSystem.exists(dir.resolve(ROOT_DIR_NAME))) { - Some(dir) - } else if (dir.isRoot) { + private def findWorkspaceRootRec(dir: Path): Option[Path] = { + if (dir == null) { None + } else if (File(dir.resolve(ROOT_DIR_NAME)).exists) { + Some(dir) } else { - findWorkspaceRootRec(fileSystem, dir.getParent) + findWorkspaceRootRec(dir.getParent) } } } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala b/core.coordinator/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala index 23e6c4e1f7..f502632332 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala @@ -8,7 +8,8 @@ package dev.kamu.cli -import org.apache.hadoop.fs.{FileSystem, Path} +import java.nio.file.Path + import dev.kamu.core.utils.fs._ /** Describes the layout of the workspace on disk */ @@ -23,12 +24,12 @@ case class WorkspaceLayout( localVolumeDir: Path ) { - def toAbsolute(fs: FileSystem): WorkspaceLayout = { + def toAbsolute: WorkspaceLayout = { copy( - kamuRootDir = fs.toAbsolute(kamuRootDir), - metadataDir = fs.toAbsolute(metadataDir), - remotesDir = fs.toAbsolute(remotesDir), - localVolumeDir = fs.toAbsolute(localVolumeDir) + kamuRootDir = kamuRootDir.toAbsolutePath, + metadataDir = metadataDir.toAbsolutePath, + remotesDir = remotesDir.toAbsolutePath, + localVolumeDir = localVolumeDir.toAbsolutePath ) } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddCommand.scala index a3dca5fbd9..f9309fa316 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddCommand.scala @@ -19,11 +19,9 @@ import dev.kamu.cli.metadata.{ SchemaNotSupportedException } import dev.kamu.core.manifests.{DatasetID, DatasetRef, DatasetSnapshot} -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class AddCommand( - fileSystem: FileSystem, metadataRepository: MetadataRepository, urls: Seq[URI], replace: Boolean diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala index ea1ce4daf9..49aec83afc 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala @@ -8,21 +8,21 @@ package dev.kamu.cli.commands +import java.nio.file.Paths + import pureconfig.generic.auto._ import dev.kamu.core.manifests.parsing.pureconfig.yaml import yaml.defaults._ import dev.kamu.cli.metadata.{MetadataRepository, ResourceLoader} import dev.kamu.core.manifests._ import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import scala.io.StdIn class InvalidInputException(msg: String) extends Exception(msg) class AddInteractiveCommand( - fileSystem: FileSystem, metadataRepository: MetadataRepository ) extends Command { private val logger = 
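For the `KamuConfig.findWorkspaceRoot` change above: java.nio's `Path.getParent` returns `null` once the filesystem root has been passed, which is what now terminates the recursion that previously relied on Hadoop's `dir.isRoot`. A minimal sketch using only java.nio (the PR itself checks existence through better-files' `File(...)`, which is equivalent here):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.annotation.tailrec

object WorkspaceRootSketch {
  val RootDirName = ".kamu"

  /** Walks from `dir` upwards until a `.kamu` directory is found. */
  def findWorkspaceRoot(dir: Path): Option[Path] =
    findRec(dir.toAbsolutePath)

  @tailrec
  private def findRec(dir: Path): Option[Path] = {
    if (dir == null) None // getParent returned null: we ran past the FS root
    else if (Files.exists(dir.resolve(RootDirName))) Some(dir)
    else findRec(dir.getParent)
  }

  def main(args: Array[String]): Unit =
    println(findWorkspaceRoot(Paths.get("")).getOrElse("not in a workspace"))
}
```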
LogManager.getLogger(getClass.getName) @@ -39,9 +39,9 @@ class AddInteractiveCommand( metadataRepository.addDataset(dataset) logger.info("Added dataset") } else { - val path = new Path("./" + dataset.id + ".yaml") - new ResourceLoader(fileSystem).saveResourceToFile(dataset, path) - logger.info(s"Saved dataset to: ${fileSystem.toAbsolute(path)}") + val path = Paths.get("./" + dataset.id + ".yaml") + new ResourceLoader().saveResourceToFile(dataset, path) + logger.info(s"Saved dataset to: ${path.toAbsolutePath}") } } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala index d34d35eab1..290fabce40 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala @@ -13,7 +13,7 @@ import java.time.Instant import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.manifests.{DatasetID, MetadataBlock} import dev.kamu.core.utils.Clock -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class AssignWatermarkCommand( systemClock: Clock, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala index 059a6db6d4..58a95234e2 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala @@ -9,7 +9,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.{CliArgs, UsageException} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import org.rogach.scallop.Scallop class CompletionCommand( diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala index 0da75e1295..d6d70fabb5 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala @@ -14,7 +14,7 @@ import dev.kamu.cli.metadata.{ MetadataRepository } import dev.kamu.core.manifests.DatasetID -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class DeleteCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala index 31be5d4545..a0ca0d52d7 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.manifests.{DatasetID, DatasetKind} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class DependencyGraphCommand( metadataRepository: MetadataRepository @@ -31,7 +31,7 @@ class DependencyGraphCommand( datasets.foreach( ds => ds.datasetDependencies - .foreach(d => println(s"${quote(ds.id)} -> ${quote(d)};")) + .foreach(d => println(s"${quote(d)} -> ${quote(ds.id)};")) ) datasets.foreach( diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/InitCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/InitCommand.scala index 
d55e857d67..0a91896ef0 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/InitCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/InitCommand.scala @@ -10,13 +10,11 @@ package dev.kamu.cli.commands import java.io.PrintWriter +import better.files.File import dev.kamu.cli.{UsageException, WorkspaceLayout} -import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class InitCommand( - fileSystem: FileSystem, workspaceLayout: WorkspaceLayout ) extends Command { private val logger = LogManager.getLogger(getClass.getName) @@ -24,14 +22,14 @@ class InitCommand( override def requiresWorkspace: Boolean = false def run(): Unit = { - if (fileSystem.exists(workspaceLayout.kamuRootDir)) + if (File(workspaceLayout.kamuRootDir).isDirectory) throw new UsageException("Already a kamu workspace") - fileSystem.mkdirs(workspaceLayout.metadataDir) - fileSystem.mkdirs(workspaceLayout.remotesDir) + File(workspaceLayout.metadataDir).createDirectories() + File(workspaceLayout.remotesDir).createDirectories() - val outputStream = - fileSystem.create(workspaceLayout.kamuRootDir.resolve(".gitignore")) + val outputStream = File(workspaceLayout.kamuRootDir.resolve(".gitignore")) + .newOutputStream() val writer = new PrintWriter(outputStream) writer.write(WorkspaceLayout.GITIGNORE_CONTENT) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/ListCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/ListCommand.scala index e5c39f5659..d3fd208d22 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/ListCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/ListCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository} import dev.kamu.cli.output.{FormatHint, OutputFormatter, SimpleResultSet} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class ListCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala index f2f7845a19..01500841b4 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository} import dev.kamu.core.manifests._ -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class LogCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/NotebookCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/NotebookCommand.scala index 0217e35940..f118a21d5e 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/NotebookCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/NotebookCommand.scala @@ -11,11 +11,9 @@ package dev.kamu.cli.commands import dev.kamu.cli.external.NotebookRunnerDocker import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.utils.DockerClient -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class NotebookCommand( - fileSystem: FileSystem, metadataRepository: MetadataRepository, dockerClient: DockerClient, environmentVars: Map[String, 
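The InitCommand hunk above swaps Hadoop's `mkdirs`/`create` for better-files. A hedged sketch of the same pattern in isolation, assuming better-files 3.9.x as pinned in build.sbt; the directory names mirror the workspace layout but the `.gitignore` content is a placeholder:

```scala
import java.io.PrintWriter
import java.nio.file.Paths

import better.files.File

object InitSketch extends App {
  val kamuRoot = Paths.get(".kamu")

  // createDirectories() is a no-op if the directory already exists,
  // mirroring Hadoop's mkdirs semantics.
  File(kamuRoot.resolve("datasets")).createDirectories()
  File(kamuRoot.resolve("remotes")).createDirectories()

  // newOutputStream yields a plain java.io.OutputStream, so the existing
  // PrintWriter-based code keeps working unchanged.
  val out = File(kamuRoot.resolve(".gitignore")).newOutputStream
  val writer = new PrintWriter(out)
  writer.write("/config\n") // illustrative content only
  writer.close()
}
```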
String] @@ -24,7 +22,6 @@ class NotebookCommand( def run(): Unit = { new NotebookRunnerDocker( - fileSystem, dockerClient, metadataRepository, environmentVars diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullCommand.scala index 107cd11838..3d3c9ea283 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullCommand.scala @@ -16,7 +16,7 @@ import dev.kamu.cli.UsageException import dev.kamu.core.manifests.{DatasetID, DatasetKind} import dev.kamu.core.manifests.parsing.pureconfig.yaml import dev.kamu.core.utils.fs._ -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import yaml.defaults._ import pureconfig.generic.auto._ diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullImagesCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullImagesCommand.scala index 636a2eccf2..c0209697ec 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullImagesCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PullImagesCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.external.DockerImages import dev.kamu.core.utils.DockerClient -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class PullImagesCommand( dockerClient: DockerClient diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala index 059d6a58e3..0f162e0922 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala @@ -11,7 +11,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.UsageException import dev.kamu.cli.metadata._ import dev.kamu.core.manifests.DatasetID -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class PurgeCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PushCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PushCommand.scala index 791c7034be..73e3acb480 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PushCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PushCommand.scala @@ -12,11 +12,9 @@ import dev.kamu.cli.external.RemoteOperatorFactory import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository} import dev.kamu.cli.{UsageException, WorkspaceLayout} import dev.kamu.core.manifests.{DatasetID, Remote, RemoteID} -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class PushCommand( - fileSystem: FileSystem, workspaceLayout: WorkspaceLayout, metadataRepository: MetadataRepository, remoteOperatorFactory: RemoteOperatorFactory, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteAddCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteAddCommand.scala index aa927cfef9..9315c0002f 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteAddCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteAddCommand.scala @@ -16,7 +16,7 @@ import dev.kamu.cli.metadata.{ SchemaNotSupportedException } import dev.kamu.core.manifests.Remote -import org.apache.log4j.LogManager +import 
org.apache.logging.log4j.LogManager class RemoteAddCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteDeleteCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteDeleteCommand.scala index 52900d936a..6d014ebda8 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteDeleteCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteDeleteCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository} import dev.kamu.core.manifests.RemoteID -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class RemoteDeleteCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteListCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteListCommand.scala index 37b8c41528..017ae5105a 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteListCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/RemoteListCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.cli.output.{OutputFormatter, SimpleResultSet} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class RemoteListCommand( metadataRepository: MetadataRepository, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLServerCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLServerCommand.scala index 93fe4af5af..064f33322e 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLServerCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLServerCommand.scala @@ -11,7 +11,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.external.LivyDockerProcessBuilder import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.utils.DockerClient -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import scala.concurrent.duration._ diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLShellCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLShellCommand.scala index 99e2bdafb6..c7e7fdc32a 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLShellCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/SQLShellCommand.scala @@ -10,13 +10,13 @@ package dev.kamu.cli.commands import java.io.PrintStream import java.net.URI +import java.nio.file.Path import dev.kamu.cli.external.LivyDockerProcessBuilder import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.cli.output.OutputFormat import dev.kamu.core.utils.{DockerClient, IOHandlerPresets} -import org.apache.hadoop.fs.Path -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import sqlline.SqlLine import scala.concurrent.duration._ diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala index eb33eb26c3..f8f5ab80ec 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala @@ -9,8 +9,8 @@ package dev.kamu.cli.external object DockerImages { - val SPARK = "kamudata/engine-spark:0.5.0" - val FLINK = "kamudata/engine-flink:0.3.3" + val SPARK = 
"kamudata/engine-spark:0.6.0" + val FLINK = "kamudata/engine-flink:0.4.0" val LIVY = SPARK val JUPYTER = "kamudata/jupyter-uber:0.0.1" diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/JupyterDockerProcessBuilder.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/JupyterDockerProcessBuilder.scala index 3db4e8aca2..ab2343e47a 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/JupyterDockerProcessBuilder.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/JupyterDockerProcessBuilder.scala @@ -10,20 +10,20 @@ package dev.kamu.cli.external import java.awt.Desktop import java.net.URI +import java.nio.file.Paths +import dev.kamu.core.utils.fs._ import dev.kamu.core.utils.{ DockerClient, DockerProcess, DockerProcessBuilder, DockerRunArgs } -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import scala.sys.process.{ProcessBuilder, ProcessIO} class JupyterDockerProcessBuilder( - fileSystem: FileSystem, dockerClient: DockerClient, network: String, environmentVars: Map[String, String] @@ -36,9 +36,8 @@ class JupyterDockerProcessBuilder( hostname = Some("kamu-jupyter"), network = Some(network), exposePorts = List(80), - volumeMap = Map( - fileSystem.getWorkingDirectory -> new Path("/opt/workdir") - ), + volumeMap = + Map(Paths.get("").toAbsolutePath -> Paths.get("/opt/workdir")), environmentVars = environmentVars ) ) { @@ -72,7 +71,7 @@ class JupyterDockerProcessBuilder( DockerRunArgs( image = runArgs.image, volumeMap = - Map(fileSystem.getWorkingDirectory -> new Path("/opt/workdir")) + Map(Paths.get("").toAbsolutePath -> Paths.get("/opt/workdir")) ), shellCommand ) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/LivyDockerProcessBuilder.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/LivyDockerProcessBuilder.scala index ccb3e955aa..1d70d35555 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/LivyDockerProcessBuilder.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/LivyDockerProcessBuilder.scala @@ -8,9 +8,10 @@ package dev.kamu.cli.external +import java.nio.file.Paths + import dev.kamu.core.manifests.VolumeLayout import dev.kamu.core.utils.{DockerClient, DockerProcessBuilder, DockerRunArgs} -import org.apache.hadoop.fs.Path class LivyDockerProcessBuilder( volumeLayout: VolumeLayout, @@ -31,7 +32,7 @@ class LivyDockerProcessBuilder( exposePorts = exposePorts, exposePortMap = exposePortMap, volumeMap = Map( - volumeLayout.dataDir -> new Path("/opt/spark/work-dir") + volumeLayout.dataDir -> Paths.get("/opt/spark/work-dir") ), network = network ) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/NotebookRunnerDocker.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/NotebookRunnerDocker.scala index af28f3e2aa..4100ac3dfb 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/NotebookRunnerDocker.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/NotebookRunnerDocker.scala @@ -10,12 +10,10 @@ package dev.kamu.cli.external import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.utils.{DockerClient, DockerProcess, IOHandlerPresets} -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import sun.misc.{Signal, SignalHandler} class NotebookRunnerDocker( - fileSystem: FileSystem, dockerClient: DockerClient, metadataRepository: MetadataRepository, environmentVars: Map[String, 
String] @@ -34,7 +32,6 @@ class NotebookRunnerDocker( val jupyterBuilder = new JupyterDockerProcessBuilder( - fileSystem, dockerClient, network, environmentVars diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperator.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperator.scala index 57fc097b73..8c3d0da143 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperator.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperator.scala @@ -11,7 +11,6 @@ package dev.kamu.cli.external import dev.kamu.cli.WorkspaceLayout import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.manifests.{DatasetID, Remote} -import org.apache.hadoop.fs.FileSystem trait RemoteOperator { def push(datasets: Seq[DatasetID]) @@ -20,7 +19,6 @@ trait RemoteOperator { } class RemoteOperatorFactory( - fileSystem: FileSystem, workspaceLayout: WorkspaceLayout, metadataRepository: MetadataRepository ) { @@ -38,7 +36,6 @@ class RemoteOperatorFactory( remote.url.getScheme match { case "s3" => new RemoteOperatorS3Cli( - fileSystem, metadataRepository, remote ) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperatorS3Cli.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperatorS3Cli.scala index 7e1b529c75..dbbe0e0482 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperatorS3Cli.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/RemoteOperatorS3Cli.scala @@ -8,16 +8,17 @@ package dev.kamu.cli.external +import java.nio.file.{Files, Path, Paths} + +import better.files.File import dev.kamu.cli.metadata.MetadataRepository import dev.kamu.core.manifests.{DatasetID, Remote, VolumeLayout} -import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import dev.kamu.core.utils.Temp +import org.apache.logging.log4j.LogManager import scala.sys.process.Process class RemoteOperatorS3Cli( - fileSystem: FileSystem, metadataRepository: MetadataRepository, remote: Remote ) extends RemoteOperator { @@ -25,21 +26,21 @@ class RemoteOperatorS3Cli( override def push(datasets: Seq[DatasetID]): Unit = { withVolumeLayout(datasets) { localDir => - s3Sync(localDir, new Path(remote.url)) + s3Sync(localDir, Paths.get(remote.url)) } } override def pull(datasets: Seq[DatasetID]): Unit = { val remoteVolumeLayout = VolumeLayout( - metadataDir = new Path("datasets"), - checkpointsDir = new Path("checkpoints"), - dataDir = new Path("data"), - cacheDir = new Path("cache") + metadataDir = Paths.get("datasets"), + checkpointsDir = Paths.get("checkpoints"), + dataDir = Paths.get("data"), + cacheDir = Paths.get("cache") ) for (id <- datasets) { // TODO: Do one sync instead since volume layouts should match - val sourceLayout = remoteVolumeLayout.relativeTo(new Path(remote.url)) + val sourceLayout = remoteVolumeLayout.relativeTo(Paths.get(remote.url)) val destLayout = metadataRepository.getDatasetLayout(id) s3Sync( @@ -62,36 +63,33 @@ class RemoteOperatorS3Cli( protected def withVolumeLayout[T]( datasets: Seq[DatasetID] )(func: Path => T): T = { - Temp.withRandomTempDir(fileSystem, "kamu-volume-") { tempDir => + Temp.withRandomTempDir("kamu-volume-") { tempDir => val targetLayout = VolumeLayout( - metadataDir = new Path("datasets"), - checkpointsDir = new Path("checkpoints"), - dataDir = new Path("data"), - cacheDir = new Path("cache") + metadataDir = Paths.get("datasets"), + checkpointsDir = Paths.get("checkpoints"), + dataDir = 
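RemoteOperatorS3Cli shells out to an external tool for the actual transfer (the class imports `scala.sys.process.Process`). A hedged sketch of what such a sync call can look like, assuming the AWS CLI is on the PATH; kamu's real `s3Sync` helper may differ:

```scala
import java.nio.file.Path

import scala.sys.process._

object S3SyncSketch {
  /** Mirrors a local directory to an S3 URL (or vice versa) via the AWS CLI. */
  def s3Sync(from: String, to: String): Unit = {
    val exitCode = Process(Seq("aws", "s3", "sync", from, to)).!
    if (exitCode != 0)
      throw new RuntimeException(s"s3 sync failed with exit code $exitCode")
  }

  def push(localDir: Path, remoteUrl: String): Unit =
    s3Sync(localDir.toString, remoteUrl)
}
```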
Paths.get("data"), + cacheDir = Paths.get("cache") ).relativeTo(tempDir) - targetLayout.allDirs.foreach(fileSystem.mkdirs) + targetLayout.allDirs.foreach(File(_).createDirectories()) for (id <- datasets) { val datasetLayout = metadataRepository.getDatasetLayout(id) - fileSystem.createSymlink( - datasetLayout.dataDir, + Files.createSymbolicLink( targetLayout.dataDir.resolve(id.toString), - false + datasetLayout.dataDir ) - if (fileSystem.exists(datasetLayout.checkpointsDir)) - fileSystem.createSymlink( - datasetLayout.checkpointsDir, + if (File(datasetLayout.checkpointsDir).exists) + Files.createSymbolicLink( targetLayout.checkpointsDir.resolve(id.toString), - false + datasetLayout.checkpointsDir ) - fileSystem.createSymlink( - datasetLayout.metadataDir, + Files.createSymbolicLink( targetLayout.metadataDir.resolve(id.toString), - false + datasetLayout.metadataDir ) } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala index 2755812ead..8cb7d693ca 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala @@ -8,10 +8,12 @@ package dev.kamu.cli.ingest +import java.nio.file.Path + +import better.files.File import dev.kamu.core.manifests.parsing.pureconfig.yaml import dev.kamu.core.manifests.{Manifest, Resource} -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import pureconfig.{ConfigReader, ConfigWriter, Derivation} case class ExecutionResult[TCheckpoint]( @@ -19,9 +21,7 @@ case class ExecutionResult[TCheckpoint]( checkpoint: TCheckpoint ) -class CheckpointingExecutor[TCheckpoint <: Resource]( - fileSystem: FileSystem -)( +class CheckpointingExecutor[TCheckpoint <: Resource]()( implicit icr: Derivation[ConfigReader[TCheckpoint]], icmr: Derivation[ConfigReader[Manifest[TCheckpoint]]], icw: Derivation[ConfigWriter[TCheckpoint]], @@ -56,23 +56,19 @@ class CheckpointingExecutor[TCheckpoint <: Resource]( def readCheckpoint( checkpointPath: Path ): Option[TCheckpoint] = { - if (!fileSystem.exists(checkpointPath)) + if (!File(checkpointPath).exists) return None - val inputStream = fileSystem.open(checkpointPath) - val cacheInfo = yaml.load[Manifest[TCheckpoint]](inputStream).content + val cacheInfo = yaml.load[Manifest[TCheckpoint]](checkpointPath).content // TODO: detect when cache should be invalidated Some(cacheInfo) } def writeCheckpoint(checkpointPath: Path, checkpoint: TCheckpoint): Unit = { - if (!fileSystem.exists(checkpointPath.getParent)) - fileSystem.mkdirs(checkpointPath.getParent) - - val outputStream = fileSystem.create(checkpointPath) + if (!File(checkpointPath.getParent).exists) + File(checkpointPath.getParent).createDirectories() - yaml.save(Manifest(checkpoint), outputStream) - outputStream.close() + yaml.save(Manifest(checkpoint), checkpointPath) } } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala index 3a20cb3ab0..ca9d9d7b4e 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala @@ -8,8 +8,10 @@ package dev.kamu.cli.ingest +import java.nio.file.Path import java.time.Instant +import better.files.File import dev.kamu.cli.WorkspaceLayout import 
dev.kamu.cli.ingest.convert.{ConversionStepFactory, IngestCheckpoint} import dev.kamu.cli.ingest.fetch.{ @@ -34,15 +36,13 @@ import org.apache.commons.compress.compressors.bzip2.{ BZip2CompressorInputStream, BZip2CompressorOutputStream } -import org.apache.commons.io.IOUtils -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.commons.io.{FileUtils, IOUtils} +import org.apache.logging.log4j.LogManager import pureconfig.generic.auto._ import dev.kamu.core.manifests.parsing.pureconfig.yaml import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._ class IngestService( - fileSystem: FileSystem, workspaceLayout: WorkspaceLayout, metadataRepository: MetadataRepository, engineFactory: EngineFactory, @@ -56,15 +56,12 @@ class IngestService( private val logger = LogManager.getLogger(getClass.getName) - private val sourceFactory = new SourceFactory(fileSystem, systemClock) + private val sourceFactory = new SourceFactory(systemClock) private val conversionStepFactory = new ConversionStepFactory() - private val prepStepFactory = new PrepStepFactory(fileSystem) - private val downloadExecutor = - new CheckpointingExecutor[DownloadCheckpoint](fileSystem) - private val prepExecutor = - new CheckpointingExecutor[PrepCheckpoint](fileSystem) - private val ingestExecutor = - new CheckpointingExecutor[IngestCheckpoint](fileSystem) + private val prepStepFactory = new PrepStepFactory() + private val downloadExecutor = new CheckpointingExecutor[DownloadCheckpoint]() + private val prepExecutor = new CheckpointingExecutor[PrepCheckpoint]() + private val ingestExecutor = new CheckpointingExecutor[IngestCheckpoint]() def pollAndIngest(datasetIDs: Seq[DatasetID]): Unit = { for (datasetID <- datasetIDs) { @@ -86,30 +83,20 @@ class IngestService( s"Processing data source: $datasetID:${externalSource.sourceID}" ) - val downloadCheckpointPath = datasetLayout.checkpointsDir - .resolve(externalSource.sourceID) - .resolve(downloadCheckpointFileName) - val downloadDataPath = datasetLayout.cacheDir - .resolve(externalSource.sourceID) - .resolve(downloadDataFileName) - val prepCheckpointPath = datasetLayout.checkpointsDir - .resolve(externalSource.sourceID) - .resolve(prepCheckpointFileName) - val prepDataPath = datasetLayout.cacheDir - .resolve(externalSource.sourceID) - .resolve(prepDataFileName) - val ingestCheckpointPath = datasetLayout.checkpointsDir - .resolve(externalSource.sourceID) - .resolve(ingestCheckpointFileName) + val downloadCheckpointPath = datasetLayout.checkpointsDir / externalSource.sourceID / downloadCheckpointFileName + val downloadDataPath = datasetLayout.cacheDir / externalSource.sourceID / downloadDataFileName + val prepCheckpointPath = datasetLayout.checkpointsDir / externalSource.sourceID / prepCheckpointFileName + val prepDataPath = datasetLayout.cacheDir / externalSource.sourceID / prepDataFileName + val ingestCheckpointPath = datasetLayout.checkpointsDir / externalSource.sourceID / ingestCheckpointFileName Seq( downloadCheckpointPath, downloadDataPath, prepCheckpointPath, prepDataPath - ).map(_.getParent) - .filter(!fileSystem.exists(_)) - .foreach(fileSystem.mkdirs) + ).map(p => File(p.getParent)) + .filter(!_.exists) + .foreach(_.createDirectories()) logger.debug(s"Stage: polling") val downloadResult = maybeDownload( @@ -157,10 +144,7 @@ class IngestService( ) // Clean up the source cache dir - fileSystem.delete( - datasetLayout.cacheDir.resolve(externalSource.sourceID), - true - ) + 
File(datasetLayout.cacheDir.resolve(externalSource.sourceID)).delete() } } } @@ -180,9 +164,8 @@ class IngestService( storedCheckpoint, cachingBehavior, body => { - val outputStream = fileSystem.create(downloadDataPath, true) - val compressedStream = - new BZip2CompressorOutputStream(outputStream) + val outputStream = File(downloadDataPath).newOutputStream() + val compressedStream = new BZip2CompressorOutputStream(outputStream) try { IOUtils.copy(body, compressedStream) } finally { @@ -222,10 +205,10 @@ class IngestService( val prepStep = prepStepFactory.getComposedSteps(source.prepare) val convertStep = conversionStepFactory.getComposedSteps(source.read) - val inputStream = fileSystem.open(downloadDataPath) + val inputStream = File(downloadDataPath).newInputStream val decompressedInStream = new BZip2CompressorInputStream(inputStream) - val outputStream = fileSystem.create(prepDataPath, true) + val outputStream = File(prepDataPath).newOutputStream val compressedOutStream = new BZip2CompressorOutputStream(outputStream) @@ -328,9 +311,9 @@ class IngestService( ) ) - val dataSize = fileSystem - .getContentSummary(metadataRepository.getDatasetLayout(datasetID).dataDir) - .getSpaceConsumed + val dataSize = FileUtils.sizeOfDirectory( + metadataRepository.getDatasetLayout(datasetID).dataDir.toFile + ) // TODO: Atomicity metaChain.updateSummary( diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala index 9765fa22eb..f7680bd8a0 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala @@ -9,7 +9,7 @@ package dev.kamu.cli.ingest.convert import dev.kamu.core.manifests.ReaderKind -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class ConversionStepFactory { val logger = LogManager.getLogger(getClass.getName) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CacheableSource.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CacheableSource.scala index 65b0ac3733..a4d07bc5fc 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CacheableSource.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CacheableSource.scala @@ -11,7 +11,7 @@ package dev.kamu.cli.ingest.fetch import java.io.InputStream import dev.kamu.cli.ingest.ExecutionResult -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager trait CacheableSource { protected val logger = LogManager.getLogger(getClass.getName) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CachingBehavior.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CachingBehavior.scala index 91a63b6d61..1b02c0128a 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CachingBehavior.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/CachingBehavior.scala @@ -8,7 +8,7 @@ package dev.kamu.cli.ingest.fetch -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager trait CachingBehavior { protected val logger = LogManager.getLogger(getClass.getName) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/FileSystemSource.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/FileSystemSource.scala index 511e8ff16b..1717c5ec6e 100644 --- 
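The download and prepare hunks above keep Apache Commons Compress for bzip2-compressing cached source data, only swapping Hadoop streams for better-files ones. A hedged sketch of that round trip, assuming commons-compress and commons-io on the classpath (both already appear in IngestService's imports):

```scala
import java.nio.file.Path

import better.files.File
import org.apache.commons.compress.compressors.bzip2.{
  BZip2CompressorInputStream,
  BZip2CompressorOutputStream
}
import org.apache.commons.io.IOUtils

object Bzip2CacheSketch {
  /** Compresses whatever `body` yields into `cachePath` as bzip2. */
  def writeCompressed(body: java.io.InputStream, cachePath: Path): Unit = {
    val out = new BZip2CompressorOutputStream(File(cachePath).newOutputStream)
    try IOUtils.copy(body, out)
    finally out.close()
  }

  /** Opens a cached bzip2 file as a plain decompressed stream. */
  def readDecompressed(cachePath: Path): java.io.InputStream =
    new BZip2CompressorInputStream(File(cachePath).newInputStream)
}
```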
a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/FileSystemSource.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/FileSystemSource.scala @@ -9,15 +9,15 @@ package dev.kamu.cli.ingest.fetch import java.io.InputStream +import java.nio.file.Path import java.time.Instant +import better.files.File import dev.kamu.cli.ingest.ExecutionResult import dev.kamu.core.utils.Clock -import org.apache.hadoop.fs.{FileSystem, Path} class FileSystemSource( val sourceID: String, - fileSystem: FileSystem, systemClock: Clock, val path: Path, eventTimeSource: EventTimeSource @@ -35,10 +35,9 @@ class FileSystemSource( ) logger.debug(s"FS stat $path") - val fs = path.getFileSystem(fileSystem.getConf) val lastModified = - Instant.ofEpochMilli(fs.getFileStatus(path).getModificationTime) + Instant.ofEpochMilli(path.toFile.lastModified()) val needsPull = checkpoint .flatMap(_.lastModified) @@ -46,7 +45,7 @@ class FileSystemSource( if (needsPull) { logger.debug(s"FS reading $path") - handler(fs.open(path)) + handler(File(path).newInputStream) ExecutionResult( wasUpToDate = false, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala index 621c0a6fdf..1d7cc6dfd0 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala @@ -8,6 +8,9 @@ package dev.kamu.cli.ingest.fetch +import java.nio.file.Paths + +import better.files.File import dev.kamu.core.manifests.{ CachingKind, EventTimeKind, @@ -15,10 +18,9 @@ import dev.kamu.core.manifests.{ OrderingKind } import dev.kamu.core.utils.Clock -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager -class SourceFactory(fileSystem: FileSystem, systemClock: Clock) { +class SourceFactory(systemClock: Clock) { private val logger = LogManager.getLogger(getClass.getName) def getSource(kind: FetchSourceKind): Seq[CacheableSource] = { @@ -69,9 +71,8 @@ class SourceFactory(fileSystem: FileSystem, systemClock: Clock) { Seq( new FileSystemSource( "primary", - fileSystem, systemClock, - new Path(kind.url), + Paths.get(kind.url), eventTimeSource ) ) @@ -84,18 +85,18 @@ class SourceFactory(fileSystem: FileSystem, systemClock: Clock) { kind: FetchSourceKind.FilesGlob, eventTimeSource: EventTimeSource ): Seq[CacheableSource] = { - val globbed = fileSystem - .globStatus(kind.path) + val globbed = File(kind.path.getParent) + .glob(kind.path.getFileName.toString) .map( f => new FileSystemSource( - f.getPath.getName, - fileSystem, + f.name, systemClock, - f.getPath, + f.path, eventTimeSource ) ) + .toVector val orderBy = kind.orderBy.getOrElse( if (kind.eventTime.isDefined) OrderingKind.ByMetadataEventTime @@ -104,16 +105,13 @@ class SourceFactory(fileSystem: FileSystem, systemClock: Clock) { val sorted = orderBy match { case OrderingKind.ByName => - globbed.sortWith( - (lhs, rhs) => - lhs.path.getName.compareToIgnoreCase(rhs.path.getName) < 0 - ) + globbed.sortBy(_.path.getFileName.toString.toLowerCase()) case OrderingKind.ByMetadataEventTime => globbed.sortBy(eventTimeSource.getEventTime) } logger.debug( - s"Glob pattern resolved to: ${sorted.map(_.path.getName).mkString(", ")}" + s"Glob pattern resolved to: ${sorted.map(_.path.getFileName).mkString(", ")}" ) sorted diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStep.scala 
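The SourceFactory hunk above resolves file globs with better-files instead of Hadoop's `globStatus`: the glob path is split into a parent directory and a file-name pattern, and ordering falls back to a case-insensitive name sort. A compact sketch, with the directory and pattern as placeholder values:

```scala
import java.nio.file.{Path, Paths}

import better.files.File

object GlobSourcesSketch extends App {
  // e.g. "data/*.csv"; the directory and pattern here are placeholders.
  val globPath: Path = Paths.get("data").resolve("*.csv")

  val matched = File(globPath.getParent)
    .glob(globPath.getFileName.toString)
    .toVector

  // OrderingKind.ByName: case-insensitive name sort.
  val byName = matched.sortBy(_.name.toLowerCase)

  // Ordering by filesystem modification time, as FileSystemSource now reads
  // it from java.io.File.lastModified instead of Hadoop's getFileStatus.
  val byModificationTime = matched.sortBy(_.path.toFile.lastModified())

  println(byName.map(_.name).mkString(", "))
}
```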
b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStep.scala index 6d8a2a201a..d6088618a9 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStep.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStep.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.ingest.prep import java.io.InputStream -import org.apache.log4j.{LogManager, Logger} +import org.apache.logging.log4j.{LogManager, Logger} trait PrepStep { protected val logger: Logger = LogManager.getLogger(getClass.getName) diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala index 8721e63348..772d3c4cd6 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala @@ -9,10 +9,9 @@ package dev.kamu.cli.ingest.prep import dev.kamu.core.manifests.PrepStepKind -import org.apache.hadoop.fs.FileSystem -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager -class PrepStepFactory(fileSystem: FileSystem) { +class PrepStepFactory() { private val logger = LogManager.getLogger(getClass.getName) def getStep( diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala index 286379ad7a..9584237c00 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala @@ -9,18 +9,18 @@ package dev.kamu.cli.metadata import java.net.URI +import java.nio.file.{Path, Paths} +import better.files.File import dev.kamu.core.manifests.parsing.pureconfig.yaml import dev.kamu.core.manifests.{DatasetID, Manifest, Resource} import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager import pureconfig.{ConfigReader, ConfigWriter, Derivation} import scala.reflect.ClassTag class GenericResourceRepository[TRes <: Resource: ClassTag, TID]( - fileSystem: FileSystem, storagePath: Path, resourceKind: String, idFromString: String => TID, @@ -36,11 +36,11 @@ class GenericResourceRepository[TRes <: Resource: ClassTag, TID]( } def getAllResourceIDs(): Seq[TID] = { - fileSystem - .listStatus(storagePath) - .map(_.getPath.getName) + File(storagePath).list + .map(_.name) .map(filename => filename.substring(0, filename.length - ".yaml".length)) .map(idFromString) + .toSeq } def getResource(id: TID): TRes = { @@ -53,25 +53,23 @@ class GenericResourceRepository[TRes <: Resource: ClassTag, TID]( def getResourceOpt(id: TID): Option[TRes] = { val path = getResourcePath(id) - if (!fileSystem.exists(path)) + if (!File(path).exists) None else Some(loadResourceFromFile(path)) } def getAllResources(): Seq[TRes] = { - val resourceFiles = fileSystem - .listStatus(storagePath) - .map(_.getPath) - - resourceFiles.map(loadResourceFromFile) + File(storagePath).list + .map(f => loadResourceFromFile(f.path)) + .toSeq } def addResource(res: TRes): Unit = { val id = idFromRes(res) val path = getResourcePath(id) - if (fileSystem.exists(path)) + if (File(path).exists) throw new AlreadyExistsException( id.toString, resourceKind @@ -81,24 +79,21 @@ class GenericResourceRepository[TRes <: Resource: ClassTag, TID]( } def deleteResource(id: TID): Unit = { - val 
path = getResourcePath(id) + val file = File(getResourcePath(id)) - if (!fileSystem.exists(path)) + if (!file.isRegularFile) throw new DoesNotExistException(id.toString, resourceKind) - fileSystem.delete(path, false) + file.delete() } def loadResourceFromFile(p: Path): TRes = { - val inputStream = fileSystem.open(p) try { - yaml.load[Manifest[TRes]](inputStream).content + yaml.load[Manifest[TRes]](p).content } catch { case e: Exception => logger.error(s"Error while loading $resourceKind from file: $p") throw e - } finally { - inputStream.close() } } @@ -108,25 +103,14 @@ class GenericResourceRepository[TRes <: Resource: ClassTag, TID]( } def saveResourceToFile(res: TRes, path: Path): Unit = { - val outputStream = fileSystem.create(path) - - try { - yaml.save(Manifest(res), outputStream) - } catch { - case e: Exception => - outputStream.close() - fileSystem.delete(path, false) - throw e - } finally { - outputStream.close() - } + yaml.save(Manifest(res), path) } def loadResourceFromURI(uri: URI): TRes = { uri.getScheme match { case "https" => loadResourceFromURL(uri.toURL) case "http" => loadResourceFromURL(uri.toURL) - case null | "file" => loadResourceFromFile(new Path(uri.getPath)) + case null | "file" => loadResourceFromFile(Paths.get(uri.getPath)) case s => throw new SchemaNotSupportedException(s) } } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala index 0a3187258b..87b7b23225 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala @@ -9,20 +9,20 @@ package dev.kamu.cli.metadata import java.nio.charset.StandardCharsets +import java.nio.file.Path import java.security.MessageDigest import java.time.Instant +import better.files.File import dev.kamu.core.manifests._ import dev.kamu.core.manifests.parsing.pureconfig.yaml import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._ -import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.{FileSystem, Path} import pureconfig.{ConfigReader, ConfigWriter, Derivation} import pureconfig.generic.auto._ import scala.reflect.ClassTag -class MetadataChain(fileSystem: FileSystem, datasetDir: Path) { +class MetadataChain(datasetDir: Path) { def init(ds: DatasetSnapshot, systemTime: Instant): Unit = { val initialBlock = MetadataBlock( @@ -42,12 +42,12 @@ class MetadataChain(fileSystem: FileSystem, datasetDir: Path) { ) try { - fileSystem.mkdirs(blocksDir) + File(blocksDir).createDirectories() saveResource(initialSummary, summaryPath) saveResource(initialBlock, blocksDir.resolve(initialBlock.blockHash)) } catch { case e: Exception => - fileSystem.delete(datasetDir, true) + File(datasetDir).delete() throw e } } @@ -87,10 +87,8 @@ class MetadataChain(fileSystem: FileSystem, datasetDir: Path) { /** Returns metadata blocks in historical order */ def getBlocks(): Vector[MetadataBlock] = { - val blocks = fileSystem - .listStatus(blocksDir) - .map(_.getPath) - .map(loadResource[MetadataBlock]) + val blocks = File(blocksDir).list + .map(f => loadResource[MetadataBlock](f.path)) .map(b => (b.blockHash, b)) .toMap @@ -121,23 +119,13 @@ class MetadataChain(fileSystem: FileSystem, datasetDir: Path) { protected def saveResource[T <: Resource: ClassTag](obj: T, path: Path)( implicit derivation: Derivation[ConfigWriter[Manifest[T]]] ): Unit = { - val outputStream = fileSystem.create(path) - try { - yaml.save(Manifest(obj), outputStream) - 
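Across GenericResourceRepository, MetadataChain and ResourceLoader, the stream-based manifest IO collapses into path-based `yaml.load`/`yaml.save` calls on kamu's pureconfig-yaml helpers, which also removes the manual stream closing and error-cleanup code. A condensed sketch of the resulting read/write pattern; `Manifest`, `Resource` and the `yaml` helper are kamu's own types, and the path-based overloads are assumed from this PR:

```scala
import java.nio.file.Path

import better.files.File
import dev.kamu.core.manifests.{Manifest, Resource}
import dev.kamu.core.manifests.parsing.pureconfig.yaml
import pureconfig.{ConfigReader, ConfigWriter, Derivation}

object ManifestIoSketch {
  def readManifest[T <: Resource](path: Path)(
    implicit r: Derivation[ConfigReader[Manifest[T]]]
  ): Option[T] = {
    if (!File(path).exists) None
    else Some(yaml.load[Manifest[T]](path).content) // no stream to open or close
  }

  def writeManifest[T <: Resource](obj: T, path: Path)(
    implicit w: Derivation[ConfigWriter[Manifest[T]]]
  ): Unit = {
    // Ensure the parent directory exists, as writeCheckpoint does above.
    File(path.getParent).createDirectories()
    yaml.save(Manifest(obj), path)
  }
}
```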
} finally { - outputStream.close() - } + yaml.save(Manifest(obj), path) } protected def loadResource[T <: Resource: ClassTag](path: Path)( implicit derivation: Derivation[ConfigReader[Manifest[T]]] ): T = { - val inputStream = fileSystem.open(path) - try { - yaml.load[Manifest[T]](inputStream).content - } finally { - inputStream.close() - } + yaml.load[Manifest[T]](path).content } protected implicit class MetadataBlockEx(b: MetadataBlock) { diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala index bb0eac56ee..c596abca01 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala @@ -10,7 +10,9 @@ package dev.kamu.cli.metadata import java.io.PrintWriter import java.net.URI +import java.nio.file.Path +import better.files.File import dev.kamu.cli.utility.DependencyGraph import dev.kamu.cli._ import dev.kamu.core.manifests._ @@ -19,11 +21,9 @@ import dev.kamu.core.manifests.parsing.pureconfig.yaml import yaml.defaults._ import pureconfig.generic.auto._ import dev.kamu.core.utils.fs._ -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.log4j.LogManager +import org.apache.logging.log4j.LogManager class MetadataRepository( - fileSystem: FileSystem, workspaceLayout: WorkspaceLayout, systemClock: Clock ) { @@ -31,7 +31,6 @@ class MetadataRepository( protected val volumeRepo = new GenericResourceRepository[Remote, RemoteID]( - fileSystem, workspaceLayout.remotesDir, "remote", RemoteID, @@ -54,19 +53,18 @@ class MetadataRepository( } protected def ensureDatasetExists(id: DatasetID): Unit = { - if (!fileSystem.exists(workspaceLayout.metadataDir.resolve(id.toString))) + if (!File(workspaceLayout.metadataDir.resolve(id.toString)).isDirectory) throw new DoesNotExistException(id.toString, "dataset") } protected def ensureDatasetExistsAndPulled(id: DatasetID): Unit = { ensureDatasetExists(id) - if (!fileSystem.exists(datasetMetadataDir(id))) + if (!File(datasetMetadataDir(id)).isDirectory) throw new DoesNotExistException(id.toString, "dataset") } def isRemote(id: DatasetID): Boolean = { - val refPath = remoteRefFilePath(id) - fileSystem.exists(refPath) + File(remoteRefFilePath(id)).isRegularFile } def getDatasetLayout(id: DatasetID): DatasetLayout = { @@ -83,7 +81,7 @@ class MetadataRepository( } def getMetadataChain(id: DatasetID): MetadataChain = { - new MetadataChain(fileSystem, getDatasetLayout(id).metadataDir) + new MetadataChain(getDatasetLayout(id).metadataDir) } def getDatasetKind(id: DatasetID): DatasetKind = { @@ -97,7 +95,7 @@ class MetadataRepository( def getDatasetSummary(id: DatasetID): DatasetSummary = { ensureDatasetExistsAndPulled(id) - val chain = new MetadataChain(fileSystem, datasetMetadataDir(id)) + val chain = new MetadataChain(datasetMetadataDir(id)) chain.getSummary() } @@ -111,8 +109,7 @@ class MetadataRepository( val refFile = remoteRefFilePath(id) - new ResourceLoader(fileSystem) - .loadResourceFromFile[DatasetRef](refFile) + new ResourceLoader().loadResourceFromFile[DatasetRef](refFile) } protected def getDatasetDependencies(id: DatasetID): List[DatasetID] = { @@ -162,20 +159,20 @@ class MetadataRepository( } def getAllDatasets(): Seq[DatasetID] = { - fileSystem - .listStatus(workspaceLayout.metadataDir) - .map(_.getPath.getName) + File(workspaceLayout.metadataDir).list + .map(_.name) .map(DatasetID) + .toSeq } def 
   def loadDatasetSnapshotFromURI(uri: URI): DatasetSnapshot = {
-    new ResourceLoader(fileSystem).loadResourceFromURI[DatasetSnapshot](uri)
+    new ResourceLoader().loadResourceFromURI[DatasetSnapshot](uri)
   }
 
   def addDataset(ds: DatasetSnapshot): Unit = {
     val datasetDir = workspaceLayout.metadataDir.resolve(ds.id.toString)
 
-    if (fileSystem.exists(datasetDir))
+    if (File(datasetDir).exists)
       throw new AlreadyExistsException(ds.id.toString, "dataset")
 
     try {
@@ -190,7 +187,7 @@
       )
     }
 
-    val chain = new MetadataChain(fileSystem, datasetDir)
+    val chain = new MetadataChain(datasetDir)
     chain.init(ds, systemClock.instant())
   }
 
@@ -199,13 +196,13 @@
     val datasetDir =
       workspaceLayout.metadataDir.resolve(localDatasetID.toString)
 
-    if (fileSystem.exists(datasetDir))
+    if (File(datasetDir).exists)
       throw new AlreadyExistsException(localDatasetID.toString, "dataset")
 
     getRemote(datasetRef.remoteID)
 
-    fileSystem.mkdirs(datasetDir)
-    new ResourceLoader(fileSystem)
+    File(datasetDir).createDirectories()
+    new ResourceLoader()
       .saveResourceToFile(datasetRef, remoteRefFilePath(localDatasetID))
   }
 
@@ -221,11 +218,14 @@
       throw new DanglingReferenceException(referencedBy.map(_.id), id)
 
     val layout = getDatasetLayout(id)
-    fileSystem.delete(layout.cacheDir, true)
-    fileSystem.delete(layout.dataDir, true)
-    fileSystem.delete(layout.checkpointsDir, true)
-    fileSystem.delete(layout.metadataDir, true)
-    fileSystem.delete(workspaceLayout.metadataDir.resolve(id.toString), true)
+
+    Seq(
+      layout.cacheDir,
+      layout.dataDir,
+      layout.checkpointsDir,
+      layout.metadataDir,
+      workspaceLayout.metadataDir.resolve(id.toString)
+    ).foreach(p => File(p).delete())
   }
 
   ////////////////////////////////////////////////////////////////////////////
@@ -233,10 +234,13 @@
   ////////////////////////////////////////////////////////////////////////////
 
   def getLocalVolume(): VolumeLayout = {
-    if (!fileSystem.exists(workspaceLayout.localVolumeDir)) {
-      fileSystem.mkdirs(workspaceLayout.localVolumeDir)
-      val outputStream =
-        fileSystem.create(workspaceLayout.localVolumeDir.resolve(".gitignore"))
+    if (!File(workspaceLayout.localVolumeDir).isDirectory) {
+      File(workspaceLayout.localVolumeDir).createDirectories()
+
+      val outputStream = File(
+        workspaceLayout.localVolumeDir.resolve(".gitignore")
+      ).newOutputStream
+
       val writer = new PrintWriter(outputStream)
       writer.write(WorkspaceLayout.LOCAL_VOLUME_GITIGNORE_CONTENT)
       writer.close()
diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
index e8cff6348a..be746b404f 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
@@ -17,54 +17,30 @@ package dev.kamu.cli.metadata
  */
 
 import java.net.URI
+import java.nio.file.{Path, Paths}
 
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
 import yaml.defaults._
 import dev.kamu.core.manifests.{Manifest, Resource}
-import dev.kamu.core.utils.fs._
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.log4j.LogManager
+import org.apache.logging.log4j.LogManager
 import pureconfig.{ConfigReader, ConfigWriter, Derivation}
 
 import scala.reflect.ClassTag
 
 // TODO: Remove this class?
-class ResourceLoader(
-  fileSystem: FileSystem
-) {
+class ResourceLoader() {
   private val logger = LogManager.getLogger(getClass.getName)
 
-  def loadResourceFromFile[T <: Resource: ClassTag](
-    p: Path
-  )(
+  def loadResourceFromFile[T <: Resource: ClassTag](p: Path)(
     implicit reader: Derivation[ConfigReader[Manifest[T]]]
   ): T = {
-    val inputStream = fileSystem.open(p)
-    try {
-      yaml.load[Manifest[T]](inputStream).content
-    } finally {
-      inputStream.close()
-    }
+    yaml.load[Manifest[T]](p).content
   }
 
-  def saveResourceToFile[T <: Resource: ClassTag](
-    res: T,
-    path: Path
-  )(
+  def saveResourceToFile[T <: Resource: ClassTag](res: T, path: Path)(
     implicit writer: Derivation[ConfigWriter[Manifest[T]]]
   ): Unit = {
-    val outputStream = fileSystem.create(path)
-
-    try {
-      yaml.save(Manifest(res), outputStream)
-    } catch {
-      case e: Exception =>
-        outputStream.close()
-        fileSystem.delete(path, false)
-        throw e
-    } finally {
-      outputStream.close()
-    }
+    yaml.save(Manifest(res), path)
   }
 
   def loadResourceFromURI[T <: Resource: ClassTag](
@@ -73,10 +49,11 @@ class ResourceLoader(
     implicit reader: Derivation[ConfigReader[Manifest[T]]]
   ): T = {
     uri.getScheme match {
-      case "https"       => loadResourceFromURL(uri.toURL)
-      case "http"        => loadResourceFromURL(uri.toURL)
-      case null | "file" => loadResourceFromFile(new Path(uri.getPath))
-      case s             => throw new SchemaNotSupportedException(s)
+      case "https" => loadResourceFromURL(uri.toURL)
+      case "http"  => loadResourceFromURL(uri.toURL)
+      case "file"  => loadResourceFromFile(Paths.get(uri))
+      case null    => loadResourceFromFile(Paths.get(uri.toString))
+      case s       => throw new SchemaNotSupportedException(s)
     }
   }
diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineFactory.scala
index d5d67810c0..e5241769a7 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineFactory.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineFactory.scala
@@ -16,8 +16,7 @@ import dev.kamu.core.manifests.infra.{
   IngestResult
 }
 import dev.kamu.core.utils.DockerClient
-import org.apache.hadoop.fs.FileSystem
-import org.apache.log4j.Level
+import org.apache.logging.log4j.Level
 
 trait Engine {
   def ingest(request: IngestRequest): IngestResult
 }
 
 class EngineFactory(
-  fileSystem: FileSystem,
   workspaceLayout: WorkspaceLayout,
   logLevel: Level
 ) {
@@ -33,16 +31,14 @@ class EngineFactory(
     engineID match {
       case "sparkSQL" =>
         new SparkEngine(
-          fileSystem,
           workspaceLayout,
           logLevel,
-          new DockerClient(fileSystem)
+          new DockerClient()
         )
       case "flink" =>
         new FlinkEngine(
-          fileSystem,
           workspaceLayout,
-          new DockerClient(fileSystem)
+          new DockerClient()
        )
      case _ => throw new NotImplementedError(s"Unsupported engine: $engineID")
    }
diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/FlinkEngine.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/FlinkEngine.scala
index bd7536fb8b..63d84c5ab5 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/FlinkEngine.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/FlinkEngine.scala
@@ -8,8 +8,9 @@
 
 package dev.kamu.cli.transform
 
-import java.io.OutputStream
+import java.nio.file.{Path, Paths}
 
+import better.files.File
 import dev.kamu.cli.WorkspaceLayout
 
 import scala.concurrent.duration._
@@ -25,6 +26,7 @@ import dev.kamu.core.manifests.infra.{
   IngestResult
 }
 import dev.kamu.core.utils.fs._
+import dev.kamu.core.utils.Temp
 import dev.kamu.core.utils.{
   DockerClient,
   DockerProcessBuilder,
@@ -32,11 +34,9 @@ import dev.kamu.core.utils.{
   ExecArgs,
   IOHandlerPresets
 }
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.slf4j.LoggerFactory
 
 class FlinkEngine(
-  fileSystem: FileSystem,
   workspaceLayout: WorkspaceLayout,
   dockerClient: DockerClient,
   image: String = DockerImages.FLINK,
@@ -51,20 +51,19 @@ class FlinkEngine(
   override def executeQuery(
     request: ExecuteQueryRequest
   ): ExecuteQueryResult = {
-    val inOutDirInContainer = new Path("/opt/engine/in-out")
-    val engineJarInContainer = new Path("/opt/engine/bin/engine.flink.jar")
+    val inOutDirInContainer = Paths.get("/opt/engine/in-out")
+    val engineJarInContainer = Paths.get("/opt/engine/bin/engine.flink.jar")
 
     val workspaceVolumes =
       Seq(workspaceLayout.kamuRootDir, workspaceLayout.localVolumeDir)
-        .filter(fileSystem.exists)
+        .filter(p => File(p).exists)
         .map(p => (p, p))
         .toMap
 
     Temp.withRandomTempDir(
-      fileSystem,
       "kamu-inout-"
     ) { inOutDir =>
-      yaml.save(Manifest(request), fileSystem, inOutDir.resolve("request.yaml"))
+      yaml.save(Manifest(request), inOutDir / "request.yaml")
 
       dockerClient.withNetwork(networkName) {
@@ -151,12 +150,7 @@
         }
       }
 
-      yaml
-        .load[Manifest[ExecuteQueryResult]](
-          fileSystem,
-          inOutDir.resolve("result.yaml")
-        )
-        .content
+      yaml.load[Manifest[ExecuteQueryResult]](inOutDir / "result.yaml").content
     }
   }
 
@@ -164,13 +158,12 @@
     val checkpointsDir =
       request.datasetLayouts(request.datasetID.toString).checkpointsDir
 
-    if (!fileSystem.exists(checkpointsDir))
+    if (!File(checkpointsDir).exists)
       return None
 
-    val allSavepoints = fileSystem
-      .listStatus(checkpointsDir)
-      .map(_.getPath)
-      .filter(fileSystem.isDirectory)
+    val allSavepoints = File(checkpointsDir).list
+      .filter(_.isDirectory)
+      .toList
 
     // TODO: Atomicity
     if (allSavepoints.length > 1)
@@ -180,7 +173,7 @@
 
     logger.debug("Using savepoint: {}", allSavepoints.headOption)
 
-    allSavepoints.headOption
+    allSavepoints.map(_.path).headOption
   }
 
   // TODO: Atomicity
@@ -190,6 +183,6 @@
 
     logger.debug("Deleting savepoint: {}", oldSavepoint)
 
-    oldSavepoint.foreach(fileSystem.delete(_, true))
+    oldSavepoint.foreach(p => File(p).delete())
   }
 }
diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/SparkEngine.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/SparkEngine.scala
index c20d96cf44..389b280c38 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/SparkEngine.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/SparkEngine.scala
@@ -9,7 +9,9 @@ package dev.kamu.cli.transform
 
 import java.io.OutputStream
+import java.nio.file.{Path, Paths}
 
+import better.files.File
 import pureconfig.generic.auto._
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
 import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._
@@ -24,12 +26,11 @@ import dev.kamu.core.manifests.infra.{
 }
 import dev.kamu.core.utils.{DockerClient, DockerRunArgs}
 import dev.kamu.core.utils.fs._
+import dev.kamu.core.utils.Temp
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.log4j.{Level, LogManager, Logger}
+import org.apache.logging.log4j.{Level, LogManager, Logger}
 
 class SparkEngine(
-  fileSystem: FileSystem,
   workspaceLayout: WorkspaceLayout,
   logLevel: Level,
   dockerClient: DockerClient,
@@ -39,20 +40,15 @@ class SparkEngine(
 
   override def ingest(request: IngestRequest): IngestResult = {
     Temp.withRandomTempDir(
-      fileSystem,
       "kamu-inout-"
    ) { inOutDir =>
-      yaml.save(
-        Manifest(request),
-        fileSystem,
-        inOutDir.resolve("request.yaml")
-      )
+      yaml.save(Manifest(request), inOutDir / "request.yaml")
 
       // TODO: Account for missing files
       val extraMounts = request.source.fetch match {
         case furl: FetchSourceKind.Url =>
           furl.url.getScheme match {
-            case "file" | null => List(new Path(furl.url))
+            case "file" | null => List(Paths.get(furl.url))
             case _             => List.empty
           }
         case glob: FetchSourceKind.FilesGlob =>
@@ -65,12 +61,7 @@ class SparkEngine(
 
       submit("dev.kamu.engine.spark.ingest.IngestApp", inOutDir, extraMounts)
 
-      yaml
-        .load[Manifest[IngestResult]](
-          fileSystem,
-          inOutDir.resolve("result.yaml")
-        )
-        .content
+      yaml.load[Manifest[IngestResult]](inOutDir / "result.yaml").content
     }
   }
 
@@ -78,14 +69,9 @@ class SparkEngine(
     request: ExecuteQueryRequest
   ): ExecuteQueryResult = {
     Temp.withRandomTempDir(
-      fileSystem,
       "kamu-inout-"
     ) { inOutDir =>
-      yaml.save(
-        Manifest(request),
-        fileSystem,
-        inOutDir.resolve("request.yaml")
-      )
+      yaml.save(Manifest(request), inOutDir / "request.yaml")
 
       submit(
         "dev.kamu.engine.spark.transform.TransformApp",
@@ -93,12 +79,7 @@ class SparkEngine(
         Seq.empty
       )
 
-      yaml
-        .load[Manifest[ExecuteQueryResult]](
-          fileSystem,
-          inOutDir.resolve("result.yaml")
-        )
-        .content
+      yaml.load[Manifest[ExecuteQueryResult]](inOutDir / "result.yaml").content
     }
   }
 
@@ -107,22 +88,21 @@ class SparkEngine(
     inOutDir: Path,
     extraMounts: Seq[Path]
   ): Unit = {
-    val inOutDirInContainer = new Path("/opt/engine/in-out")
-    val engineJarInContainer = new Path("/opt/engine/bin/engine.spark.jar")
+    val inOutDirInContainer = Paths.get("/opt/engine/in-out")
+    val engineJarInContainer = Paths.get("/opt/engine/bin/engine.spark.jar")
 
     Temp.withTempFile(
-      fileSystem,
       "kamu-logging-cfg-",
       writeLog4jConfig
     ) { loggingConfigPath =>
       val workspaceVolumes =
         Seq(workspaceLayout.kamuRootDir, workspaceLayout.localVolumeDir)
-          .filter(fileSystem.exists)
+          .filter(p => File(p).exists)
           .map(p => (p, p))
           .toMap
 
       val appVolumes = Map(
-        loggingConfigPath -> new Path("/opt/spark/conf/log4j.properties"),
+        loggingConfigPath -> Paths.get("/opt/spark/conf/log4j.properties"),
         inOutDir -> inOutDirInContainer
       )
diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
index 5439db1173..0d07152cac 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
@@ -8,6 +8,7 @@
 
 package dev.kamu.cli.transform
 
+import better.files.File
 import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
 import dev.kamu.core.manifests._
 import dev.kamu.core.manifests.infra.{
@@ -17,8 +18,8 @@ import dev.kamu.core.manifests.infra.{
   Watermark
 }
 import dev.kamu.core.utils.Clock
-import org.apache.hadoop.fs.FileSystem
-import org.apache.log4j.LogManager
+import org.apache.commons.io.FileUtils
+import org.apache.logging.log4j.LogManager
 import spire.math.Interval
 import spire.math.interval.{Closed, Unbound, ValueBound}
@@ -34,7 +35,6 @@ case class TransformBatch(
 }
 
 class TransformService(
-  fileSystem: FileSystem,
   metadataRepository: MetadataRepository,
   systemClock: Clock,
   engineFactory: EngineFactory
 ) {
@@ -47,8 +47,7 @@ class TransformService(
     val missingInputs = batch.inputSlices.keys.filter(
       inputID =>
-        !fileSystem
-          .exists(metadataRepository.getDatasetLayout(inputID).dataDir)
+        !File(metadataRepository.getDatasetLayout(inputID).dataDir).exists
     )
     if (missingInputs.nonEmpty) {
@@ -104,8 +103,8 @@ class TransformService(
       )
 
       val dataSize = Some(metadataRepository.getDatasetLayout(datasetID).dataDir)
-        .filter(fileSystem.exists)
-        .map(p => fileSystem.getContentSummary(p).getSpaceConsumed)
+        .filter(p => File(p).exists)
+        .map(p => FileUtils.sizeOfDirectory(p.toFile))
        .getOrElse(0L)
 
      outputMetaChain.updateSummary(
diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala b/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala
index 05173ee5fc..cd6d9617d3 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala
@@ -10,12 +10,13 @@ package dev.kamu.cli
 
 import java.io.{ByteArrayOutputStream, PrintStream, PrintWriter}
 import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 
+import better.files.File
 import dev.kamu.cli.output._
 import dev.kamu.core.utils.fs._
 import dev.kamu.core.manifests.{DatasetID, DatasetSnapshot}
 import dev.kamu.core.utils.ManualClock
-import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.util.Random
@@ -39,10 +40,9 @@ case class CommandResult(
 
 class KamuTestAdapter(
   val config: KamuConfig, // config should be public for tests to access workspaceRoot
-  fileSystem: FileSystem,
   spark: SparkSession,
   val systemClock: ManualClock
-) extends Kamu(config, fileSystem, systemClock) {
+) extends Kamu(config, systemClock) {
 
   val _captureFormatter = new CaptureOutputFormatter
   val _captureOutput = new ByteArrayOutputStream()
@@ -78,8 +78,8 @@ class KamuTestAdapter(
     metadataRepository.addDataset(ds)
 
     val volume = metadataRepository.getLocalVolume()
-    if (!fileSystem.exists(volume.dataDir))
-      fileSystem.mkdirs(volume.dataDir)
+    if (!File(volume.dataDir).exists)
+      File(volume.dataDir).createDirectories()
 
     df.write.parquet(
       volume.dataDir.resolve(ds.id.toString).toUri.getPath
@@ -93,7 +93,7 @@ class KamuTestAdapter(
 
   def writeData(content: String, name: String): Path = {
     val path = config.workspaceRoot.resolve(name)
-    val writer = new PrintWriter(fileSystem.create(path))
+    val writer = new PrintWriter(File(path).newOutputStream)
     writer.write(content)
     writer.close()
@@ -109,11 +109,11 @@ class KamuTestAdapter(
       .option("header", "true")
       .csv(path.toUri.getPath)
 
-    fileSystem
-      .listStatus(path)
-      .filter(_.getPath.getName.startsWith("part"))
+    File(path).list
+      .filter(_.name.startsWith("part"))
+      .toSeq
+      .map(_.path)
       .head
-      .getPath
   }
 
   def readDataset(id: DatasetID): DataFrame = {
diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestBase.scala b/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestBase.scala
index 6b0b73473a..63379dafbf 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestBase.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/KamuTestBase.scala
@@ -9,22 +9,18 @@
 package dev.kamu.cli
 
 import dev.kamu.core.utils.ManualClock
-import dev.kamu.core.utils.fs._
+import dev.kamu.core.utils.Temp
 import dev.kamu.core.utils.test.KamuDataFrameSuite
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
 import org.scalatest._
 
 trait KamuTestBase extends KamuDataFrameSuite { self: Suite =>
-  val fileSystem = FileSystem.get(new Configuration())
-
   def withEmptyDir[T](func: KamuTestAdapter => T): T = {
-    Temp.withRandomTempDir(fileSystem, "kamu-test-") { tempDir =>
+    Temp.withRandomTempDir("kamu-test-") { tempDir =>
       val config = KamuConfig(workspaceRoot = tempDir)
      val clock = new ManualClock()
       clock.advance()
 
-      val kamu = new KamuTestAdapter(config, fileSystem, spark, clock)
+      val kamu = new KamuTestAdapter(config, spark, clock)
       func(kamu)
     }
   }
diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala b/core.coordinator/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala
index 2c980fecb3..febcdc29cd 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala
@@ -9,14 +9,15 @@ package dev.kamu.cli
 
 import java.net.URI
+import java.nio.file.Paths
 
+import better.files.File
 import pureconfig.generic.auto._
 import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._
 import dev.kamu.cli.metadata.ResourceLoader
 import dev.kamu.core.manifests.DatasetID
+import dev.kamu.core.utils.fs._
 import dev.kamu.core.utils.{DockerClient, DockerProcessBuilder, DockerRunArgs}
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.Path.SEPARATOR
 import org.scalatest._
 
 import scala.concurrent.duration._
@@ -73,11 +74,11 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase {
     withEmptyWorkspace { kamu =>
       val expected = DatasetFactory.newRootDataset()
       // create a temporary directory with the dataset to host
-      val serverDir =
-        new Path(kamu.config.workspaceRoot, "server")
-      fileSystem.mkdirs(serverDir)
-      val path: Path = new Path(serverDir, "test-dataset.yaml")
-      new ResourceLoader(fileSystem).saveResourceToFile(expected, path)
+      val serverDir = kamu.config.workspaceRoot / "server"
+      File(serverDir).createDirectories()
+
+      val path = serverDir / "test-dataset.yaml"
+      new ResourceLoader().saveResourceToFile(expected, path)
 
       // start up the server and host the directory
       val serverPort = 80 // httpd:2.4 default port
@@ -85,11 +86,11 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase {
       val testHttpServerArgs = DockerRunArgs(
         image = "httpd:2.4",
         exposePorts = List(serverPort),
-        volumeMap = Map(serverDir -> new Path("/usr/local/apache2/htdocs")), // httpd:2.4 default location
+        volumeMap = Map(serverDir -> Paths.get("/usr/local/apache2/htdocs")), // httpd:2.4 default location
         containerName = Some(testServerName),
         detached = true
       )
-      val testHttpServer = new DockerClient(fileSystem)
+      val testHttpServer = new DockerClient()
       try {
         val testHttpServerProc = new DockerProcessBuilder(
           "http",
@@ -117,16 +118,11 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase {
   test(raw"'kamu add' from file") {
     withEmptyWorkspace { kamu =>
       val expected = DatasetFactory.newRootDataset()
-      val testDir =
-        new Path(
-          // Path(parent, child) throws an exception, while SEPARATOP
-          // works for names with colons and spaces
-          s"${kamu.config.workspaceRoot.toString}${SEPARATOR}test: add from file"
-        )
+      val testDir = kamu.config.workspaceRoot / "my folder"
 
-      fileSystem.mkdirs(testDir)
-      val datasetPath: Path = new Path(testDir, "test-dataset.yaml")
-      new ResourceLoader(fileSystem).saveResourceToFile(expected, datasetPath)
+      File(testDir).createDirectories()
+      val datasetPath = testDir / "test-dataset.yaml"
+      new ResourceLoader().saveResourceToFile(expected, datasetPath)
 
       val actual =
         kamu.metadataRepository.loadDatasetSnapshotFromURI(datasetPath.toUri)
diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
index c0d49e809e..e052c2a599 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
@@ -10,7 +10,6 @@ package dev.kamu.cli.ingest
 
 import dev.kamu.cli.{DatasetFactory, KamuTestBase}
 import dev.kamu.core.manifests._
-import dev.kamu.core.utils.fs._
 import org.scalatest._
 
 class IngestMultiSourceSpec extends FlatSpec with Matchers with KamuTestBase {
diff --git a/core.manifests b/core.manifests
index 14c7935bd3..56938115c0 160000
--- a/core.manifests
+++ b/core.manifests
@@ -1 +1 @@
-Subproject commit 14c7935bd30ea4f2033578d13e038919ccf67b16
+Subproject commit 56938115c010349a4e689eedbde2324527f793b5
diff --git a/core.utils b/core.utils
index 733078597f..af7a18f669 160000
--- a/core.utils
+++ b/core.utils
@@ -1 +1 @@
-Subproject commit 733078597f2ba6028927747633347a329419f871
+Subproject commit af7a18f66904dab116fe7b053abf4b6572ff67d7
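
The substitution pattern repeated across the files above is mechanical: Hadoop's `FileSystem`/`Path` API gives way to plain `java.nio.file.Path` values, wrapped in `better-files` wherever richer operations (create, list, recursive delete) are needed. The following is a minimal sketch of those recurring substitutions, not part of the patch itself; the object and method names are illustrative only, and it assumes nothing beyond `better-files` on the classpath.

```scala
import java.nio.file.{Path, Paths}

import better.files.File

// Illustrative sketch of the Hadoop-FS -> better-files substitutions used in this patch.
object FsMigrationSketch {

  // fileSystem.mkdirs(dir)  ->  File(dir).createDirectories()
  def ensureDir(dir: Path): Unit =
    File(dir).createDirectories()

  // fileSystem.exists(p)  ->  File(p).exists (or isDirectory / isRegularFile for stricter checks)
  def hasData(dataDir: Path): Boolean =
    File(dataDir).exists

  // fileSystem.listStatus(dir).map(_.getPath)  ->  File(dir).list.map(_.path)
  def listPartFiles(dir: Path): Seq[Path] =
    File(dir).list
      .filter(_.name.startsWith("part"))
      .map(_.path)
      .toSeq

  // fileSystem.delete(p, true)  ->  File(p).delete(), which is recursive by default
  def deleteIfPresent(p: Path): Unit = {
    val f = File(p)
    if (f.exists) f.delete()
  }

  // new Path("/opt/engine/in-out")  ->  Paths.get("/opt/engine/in-out")
  val inOutDirInContainer: Path = Paths.get("/opt/engine/in-out")
}
```

Because `better-files`' `delete()` already removes directories recursively, the explicit `recursive = true` flag from the Hadoop API simply disappears in the migrated calls.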