Remove use of Hadoop FS
sergiimk committed Jun 28, 2020
1 parent 4f6d87f commit 1180b12
Showing 57 changed files with 352 additions and 499 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.19.0] - 2020-06-28
+### Changed
+- Improving Windows support by removing Hadoop FS dependencies
+
 ## [0.18.2] - 2020-06-26
 ### Changed
 - Upgraded Flink engine to `0.3.3`
25 changes: 21 additions & 4 deletions build.sbt
@@ -38,7 +38,11 @@ lazy val kamuCoreCoordinator = project
   .enablePlugins(AutomateHeaderPlugin)
   .settings(
     libraryDependencies ++= Seq(
-      deps.jcabiLog,
+      //deps.apacheCommonsCompress,
+      deps.betterFiles,
+      deps.log4jApi,
+      deps.log4jCore,
+      deps.log4jBridge,
       deps.scallop,
       deps.hadoopCommon,
       deps.sqlLine,
@@ -56,7 +60,8 @@ lazy val kamuCoreUtils = project
   .enablePlugins(AutomateHeaderPlugin)
   .settings(
     libraryDependencies ++= Seq(
-      deps.hadoopCommon,
+      deps.betterFiles,
+      deps.log4jApi,
       deps.scalaTest % "test",
      deps.sparkCore % "provided",
      deps.sparkHive % "provided",
@@ -77,7 +82,7 @@ lazy val kamuCoreManifests = project
   .enablePlugins(AutomateHeaderPlugin)
   .settings(
     libraryDependencies ++= Seq(
-      deps.hadoopCommon,
+      deps.betterFiles,
       deps.pureConfig,
       deps.pureConfigYaml,
       deps.spire
@@ -90,11 +95,14 @@ lazy val kamuCoreManifests = project
 //////////////////////////////////////////////////////////////////////////////
 
 lazy val versions = new {
+  val apacheCommonsCompress = "1.20"
+  val betterFiles = "3.9.1"
   val geoSpark = "1.2.0"
   val hadoopCommon = "2.6.5"
   val json4sJackson = "3.5.3"
   val jacksonCore = "2.6.7"
   val jacksonDatabind = "2.6.7.1"
+  val log4j = "2.13.3"
   val pureConfig = "0.11.1"
   val scalajHttp = "2.4.1"
   val spark = "2.4.0"
@@ -104,8 +112,14 @@
 
 lazy val deps =
   new {
-    val jcabiLog = "com.jcabi" % "jcabi-log" % "0.17.3"
+    val log4jApi = "org.apache.logging.log4j" % "log4j-api" % versions.log4j
+    val log4jCore = "org.apache.logging.log4j" % "log4j-core" % versions.log4j
+    val log4jBridge = "org.apache.logging.log4j" % "log4j-1.2-api" % versions.log4j
+    //val jcabiLog = "com.jcabi" % "jcabi-log" % "0.17.3"
     val scallop = "org.rogach" %% "scallop" % "3.3.1"
+    // File system
+    val betterFiles = "com.github.pathikrit" %% "better-files" % versions.betterFiles
+    val apacheCommonsCompress = "org.apache.commons" % "commons-compress" % versions.apacheCommonsCompress
     // Configs
     val pureConfig = "com.github.pureconfig" %% "pureconfig" % versions.pureConfig
     val pureConfigYaml = "com.github.pureconfig" %% "pureconfig-yaml" % versions.pureConfig
@@ -126,6 +140,7 @@ lazy val deps =
     // Hadoop File System
     val hadoopCommon =
       ("org.apache.hadoop" % "hadoop-common" % versions.hadoopCommon)
+        .exclude("log4j", "log4j")
         .exclude("commons-beanutils", "commons-beanutils")
         .exclude("commons-beanutils", "commons-beanutils-core")
     // SQL Shell
@@ -176,6 +191,8 @@ lazy val assemblySettings = Seq(
     case PathList("org", "xerial", xs @ _*) => MergeStrategy.last
     case PathList("com", "thoughtworks", "paranamer", xs @ _*) =>
       MergeStrategy.last
+    case PathList(ps @ _*) if ps.last == "Log4j2Plugins.dat" =>
+      MergeStrategy.discard
     // end insanity
     case "overview.html" => MergeStrategy.discard
     case "plugin.xml" => MergeStrategy.discard
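
Note the shape of the migration these dependency changes enable: Hadoop's `FileSystem`/`Path` API gives way to `java.nio.file.Path` plus the better-files wrapper, and log4j 1.x gives way to Log4j 2. A minimal sketch of the file-system side of the pattern, using illustrative paths rather than code from this commit:

    import java.nio.file.{Path, Paths}
    import better.files.File

    // Before: services received a Hadoop FileSystem and used its Path type:
    //   val fs = org.apache.hadoop.fs.FileSystem.get(new Configuration())
    //   fs.exists(new org.apache.hadoop.fs.Path(".kamu"))

    // After: plain NIO paths, with better-files for the ergonomic operations.
    val root: Path = Paths.get(".").toAbsolutePath
    val kamuDir: Path = root.resolve(".kamu")
    if (File(kamuDir).isDirectory)
      println(s"workspace found at $root")

Dropping the Hadoop bootstrap is presumably the Windows pain point the changelog refers to: Hadoop's local-filesystem shims are known to require winutils.exe on Windows, even for purely local disk access.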
44 changes: 0 additions & 44 deletions core.coordinator/src/main/resources/log4j.xml

This file was deleted.

22 changes: 22 additions & 0 deletions core.coordinator/src/main/resources/log4j2.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<Configuration>
+    <Appenders>
+        <Console name="console-generic" target="SYSTEM_ERR">
+            <PatternLayout pattern="[%highlight{%level}{FATAL=bg_red, ERROR=red, WARN=yellow, INFO=green, DEBUG=blue}] %c - %msg%n" />
+        </Console>
+        <Console name="console-cli" target="SYSTEM_ERR">
+            <PatternLayout pattern="[%highlight{%level}{FATAL=bg_red, ERROR=red, WARN=yellow, INFO=green, DEBUG=blue}] %msg%n" />
+        </Console>
+    </Appenders>
+
+    <Loggers>
+        <Logger name="org.apache.hive.jdbc" level="warn"/>
+        <Logger name="dev.kamu.cli" level="info" additivity="false">
+            <AppenderRef ref="console-cli"/>
+        </Logger>
+        <Root level="info">
+            <AppenderRef ref="console-generic"/>
+        </Root>
+    </Loggers>
+
+</Configuration>
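
This config splits output across two appenders: loggers under `dev.kamu.cli` print bare messages through `console-cli`, with `additivity="false"` keeping the root logger from emitting a second copy, while everything else flows to `console-generic`, whose pattern includes the logger name (`%c`). A sketch of the resulting behavior, with illustrative logger names:

    import org.apache.logging.log4j.LogManager

    // Routed to console-cli: the user sees "[INFO] Pulling dataset..." only.
    val cliLog = LogManager.getLogger("dev.kamu.cli.PullCommand")
    cliLog.info("Pulling dataset...")

    // Routed to console-generic via <Root>: the logger name (%c) is shown,
    // which helps attribute noise from third-party libraries. The config
    // caps org.apache.hive.jdbc at warn, so its info/debug are dropped.
    val libLog = LogManager.getLogger("org.apache.hive.jdbc.HiveConnection")
    libLog.warn("connection retry")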
6 changes: 2 additions & 4 deletions core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala
@@ -8,11 +8,11 @@
 
 package dev.kamu.cli
 
+import java.nio.file.Path
 import java.time.Instant
 
 import dev.kamu.cli.output.OutputFormat
-import org.apache.hadoop.fs.Path
-import org.apache.log4j.Level
+import org.apache.logging.log4j.Level
 import org.rogach.scallop._
 
 import scala.concurrent.duration.Duration
@@ -156,8 +156,6 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
   )
 
   implicit val _logLevelConverter = singleArgConverter[Level](Level.toLevel)
-  implicit val _pathConverter = singleArgConverter[Path](s => new Path(s))
-  implicit val _pathListConverter = listArgConverter[Path](s => new Path(s))
   val _envVarConverter = new ValueConverter[Map[String, String]] {
     def resolveEnvVar(s: String): (String, String) = {
       if (s.indexOf("=") < 0)
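
The two Hadoop `Path` converters are dropped without replacements, presumably because scallop can already produce `java.nio.file.Path` values for options typed that way. If explicit converters were ever needed again, the NIO equivalents are one-liners (hypothetical names, not part of this commit):

    import java.nio.file.{Path, Paths}
    import org.rogach.scallop._

    implicit val nioPathConverter: ValueConverter[Path] =
      singleArgConverter[Path](Paths.get(_))
    implicit val nioPathListConverter: ValueConverter[List[Path]] =
      listArgConverter[Path](Paths.get(_))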
34 changes: 12 additions & 22 deletions core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala
@@ -10,34 +10,33 @@ package dev.kamu.cli
 
 import java.io.PrintStream
 
+import better.files.File
 import dev.kamu.cli.commands._
 import dev.kamu.cli.transform.{EngineFactory, TransformService}
 import dev.kamu.cli.external._
 import dev.kamu.cli.ingest.IngestService
 import dev.kamu.cli.metadata.MetadataRepository
 import dev.kamu.cli.output._
-import dev.kamu.core.utils.{Clock, DockerClient}
 import dev.kamu.core.utils.fs._
-import org.apache.hadoop.fs.FileSystem
-import org.apache.log4j.Level
+import dev.kamu.core.utils.{Clock, DockerClient}
+import org.apache.logging.log4j.Level
 
 class Kamu(
     config: KamuConfig,
-    fileSystem: FileSystem,
     systemClock: Clock
 ) {
   val workspaceLayout = WorkspaceLayout(
     kamuRootDir = config.kamuRoot,
-    metadataDir = config.kamuRoot.resolve("datasets"),
-    remotesDir = config.kamuRoot.resolve("remotes"),
+    metadataDir = config.kamuRoot / "datasets",
+    remotesDir = config.kamuRoot / "remotes",
     localVolumeDir = config.localVolume
-  ).toAbsolute(fileSystem)
+  ).toAbsolute
 
   val metadataRepository =
-    new MetadataRepository(fileSystem, workspaceLayout, systemClock)
+    new MetadataRepository(workspaceLayout, systemClock)
 
   val remoteOperatorFactory =
-    new RemoteOperatorFactory(fileSystem, workspaceLayout, metadataRepository)
+    new RemoteOperatorFactory(workspaceLayout, metadataRepository)
 
   def run(cliArgs: CliArgs): Unit = {
     val command = getCommand(cliArgs)
@@ -61,10 +60,7 @@
         if (c.init.pullImages())
           new PullImagesCommand(getDockerClient())
         else
-          new InitCommand(
-            fileSystem,
-            workspaceLayout
-          )
+          new InitCommand(workspaceLayout)
       case List(c.list) =>
         new ListCommand(
           metadataRepository,
@@ -73,12 +69,10 @@
       case List(c.add) =>
         if (c.add.interactive())
           new AddInteractiveCommand(
-            fileSystem,
             metadataRepository
           )
         else
           new AddCommand(
-            fileSystem,
             metadataRepository,
             c.add.manifests(),
             c.add.replace()
@@ -102,14 +96,12 @@
           )
         new PullCommand(
           new IngestService(
-            fileSystem,
             workspaceLayout,
             metadataRepository,
             engineFactory,
             systemClock
           ),
           new TransformService(
-            fileSystem,
             metadataRepository,
             systemClock,
             engineFactory
@@ -135,7 +127,6 @@
           )
       case List(c.push) =>
         new PushCommand(
-          fileSystem,
           workspaceLayout,
           metadataRepository,
           remoteOperatorFactory,
@@ -179,7 +170,6 @@
         )
       case List(c.notebook) =>
         new NotebookCommand(
-          fileSystem,
           metadataRepository,
           getDockerClient(),
           c.notebook.env()
@@ -201,16 +191,16 @@
   }
 
   protected def ensureWorkspace(): Unit = {
-    if (!fileSystem.exists(workspaceLayout.kamuRootDir))
+    if (!File(workspaceLayout.kamuRootDir).isDirectory)
       throw new UsageException("Not a kamu workspace")
   }
 
   protected def getDockerClient(): DockerClient = {
-    new DockerClient(fileSystem)
+    new DockerClient()
   }
 
   protected def getEngineFactory(logLevel: Level): EngineFactory = {
-    new EngineFactory(fileSystem, workspaceLayout, logLevel)
+    new EngineFactory(workspaceLayout, logLevel)
   }
 
   protected def getOutputStream(): PrintStream = {
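
The `config.kamuRoot / "datasets"` syntax above comes from `dev.kamu.core.utils.fs._`, a module this commit does not show. It presumably provides an extension on `java.nio.file.Path` roughly like this hypothetical reconstruction (not code from the repository):

    import java.nio.file.Path

    object fs {
      // Assumed sketch: a '/' operator on NIO Path, mirroring better-files.
      implicit class PathExt(val p: Path) extends AnyVal {
        def /(child: String): Path = p.resolve(child)
      }
    }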
25 changes: 11 additions & 14 deletions core.coordinator/src/main/scala/dev/kamu/cli/KamuApp.scala
@@ -8,10 +8,11 @@
 
 package dev.kamu.cli
 
+import java.nio.file.Paths
+
 import dev.kamu.core.utils.AutoClock
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.log4j.{Level, LogManager}
+import org.apache.logging.log4j.core.config.Configurator
+import org.apache.logging.log4j.{Level, LogManager}
 
 class UsageException(message: String = "", cause: Throwable = None.orNull)
     extends RuntimeException(message, cause)
@@ -21,25 +22,21 @@ object KamuApp extends App {
 
   val systemClock = new AutoClock()
 
-  val fileSystem = FileSystem.get(new Configuration())
-  FileSystem.enableSymlinks()
-  fileSystem.setWriteChecksum(false)
-  fileSystem.setVerifyChecksum(false)
-
   val config = KamuConfig(
     workspaceRoot = KamuConfig
-      .findWorkspaceRoot(fileSystem, new Path("."))
-      .getOrElse(new Path("."))
+      .findWorkspaceRoot(Paths.get(""))
+      .getOrElse(Paths.get(""))
   )
 
   try {
     val cliArgs = new CliArgs(args)
 
-    LogManager
-      .getLogger(getClass.getPackage.getName)
-      .setLevel(if (cliArgs.debug()) Level.ALL else cliArgs.logLevel())
+    Configurator.setLevel(
+      getClass.getPackage.getName,
+      if (cliArgs.debug()) Level.ALL else cliArgs.logLevel()
+    )
 
-    new Kamu(config, fileSystem, systemClock)
+    new Kamu(config, systemClock)
       .run(cliArgs)
   } catch {
     case e: UsageException =>
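
`Configurator.setLevel` is the Log4j 2 way to adjust a logger's level at runtime, replacing the mutable 1.x `LogManager` call deleted above. The `log4j-1.2-api` bridge added in build.sbt is likely there because Hadoop and Spark still log through the 1.x API; with the bridge on the classpath, those calls land in the Log4j 2 appenders configured earlier (sketch, with an illustrative logger name):

    // A legacy 1.x-style call, as made inside Hadoop/Spark internals:
    val legacy = org.apache.log4j.Logger.getLogger("org.apache.hadoop.fs")
    legacy.info("handled by the log4j2.xml appenders via log4j-1.2-api")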
22 changes: 10 additions & 12 deletions core.coordinator/src/main/scala/dev/kamu/cli/KamuConfig.scala
@@ -8,8 +8,9 @@
 
 package dev.kamu.cli
 
-import org.apache.hadoop.fs.{FileSystem, Path}
-import dev.kamu.core.utils.fs._
+import java.nio.file.Path
+
+import better.files.File
 
 case class KamuConfig(
     workspaceRoot: Path
@@ -26,21 +27,18 @@
 object KamuConfig {
   val ROOT_DIR_NAME = ".kamu"
 
-  def findWorkspaceRoot(fileSystem: FileSystem, dir: Path): Option[Path] = {
-    findWorkspaceRootRec(fileSystem, fileSystem.toAbsolute(dir))
+  def findWorkspaceRoot(dir: Path): Option[Path] = {
+    findWorkspaceRootRec(dir.toAbsolutePath)
   }
 
   @scala.annotation.tailrec
-  private def findWorkspaceRootRec(
-      fileSystem: FileSystem,
-      dir: Path
-  ): Option[Path] = {
-    if (fileSystem.exists(dir.resolve(ROOT_DIR_NAME))) {
-      Some(dir)
-    } else if (dir.isRoot) {
+  private def findWorkspaceRootRec(dir: Path): Option[Path] = {
+    if (dir == null) {
       None
+    } else if (File(dir.resolve(ROOT_DIR_NAME)).exists) {
+      Some(dir)
     } else {
-      findWorkspaceRootRec(fileSystem, dir.getParent)
+      findWorkspaceRootRec(dir.getParent)
     }
   }
 }
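
The rewritten search walks upward from the starting directory until it finds `.kamu` or until `getParent` returns null at the filesystem root. A usage sketch with an illustrative layout:

    import java.nio.file.Paths

    // Given /home/alice/demo/.kamu, any start point inside the workspace
    // resolves to the directory that contains .kamu:
    val root = KamuConfig.findWorkspaceRoot(Paths.get("/home/alice/demo/data"))
    assert(root.contains(Paths.get("/home/alice/demo")))

    // Outside a workspace the walk reaches "/" and then null, yielding None:
    assert(KamuConfig.findWorkspaceRoot(Paths.get("/tmp")).isEmpty)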
(Diffs for the remaining 49 changed files were not loaded.)
