Use new metadata chain prototype
sergiimk committed Mar 8, 2020
1 parent f23bc6a commit 380993c
Showing 31 changed files with 549 additions and 208 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.9.0] - 2020-03-08
### Changed
- Using new metadata chain prototype!

## [0.8.0] - 2020-01-12
### Added
- Experimental support for remote S3 volumes
13 changes: 11 additions & 2 deletions build.sbt
@@ -48,12 +48,15 @@ lazy val kamuCoreUtils = project
libraryDependencies ++= Seq(
deps.hadoopCommon,
deps.scalaTest % "test",
deps.sparkCore % "provided",
deps.sparkHive % "provided",
deps.geoSpark % "test",
deps.geoSparkSql % "test",
deps.sparkTestingBase % "test",
deps.sparkHive % "test"
),
commonSettings
commonSettings,
sparkTestingSettings
)

lazy val kamuCoreManifests = project
@@ -66,7 +69,8 @@ lazy val kamuCoreManifests = project
libraryDependencies ++= Seq(
deps.hadoopCommon,
deps.pureConfig,
deps.pureConfigYaml
deps.pureConfigYaml,
deps.spire
),
commonSettings
)
@@ -121,6 +125,7 @@ lazy val versions = new {
val scalajHttp = "2.4.1"
val spark = "2.4.0"
val sparkTestingBase = s"${spark}_0.11.0"
val spire = "0.13.0" // Used by spark too
}

lazy val deps =
@@ -145,6 +150,10 @@ lazy val deps =
.exclude("commons-beanutils", "commons-beanutils-core")
// SQL Shell
val sqlLine = "sqlline" % "sqlline" % "1.8.0"
// Math
// TODO: Using older version as it's also used by Spark
//val spire = "org.typelevel" %% "spire" % versions.spire
val spire = "org.spire-math" %% "spire" % versions.spire
// Test
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8"
val sparkHive = "org.apache.spark" %% "spark-hive" % versions.spark
2 changes: 1 addition & 1 deletion core.transform.streaming
28 changes: 28 additions & 0 deletions docs/index.md
@@ -0,0 +1,28 @@
# User Manual

> This page is under construction
- Getting Data In and Out
  - Root datasets
  - Supported polling sources
  - Exporting Data
- Collaboration
  - Derivative datasets
  - Transformation model
  - Transactional guarantees
- Dataset Evolution
  - Schema Evolution
    - Upstream schema changes
    - Adding / deprecating columns
    - Backwards incompatible changes
  - Root Dataset Evolution
    - Handling source URL changes
    - Handling upstream format changes
  - Derivative Dataset Evolution
    - Handling upstream changes
    - Evolving transformations
- Handling Bad Data
  - Corrections and compensations
  - Bad data upon ingestion
  - Bad data in upstream datasets
  - PII and sensitive data
17 changes: 17 additions & 0 deletions src/main/scala/dev/kamu/cli/CliParser.scala
@@ -260,6 +260,23 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
descr = "Manifest URLs/files containing dataset definitions",
default = Some(List.empty)
)

val remote = new Subcommand("remote") {
banner("Add a reference to a dataset stored in remote volume")

val volumeID = trailArg[String](
"volumeID",
required = true,
descr = "IDs of the remote volume"
)

val datasetID = trailArg[String](
"datasetID",
required = true,
descr = "IDs of the dataset"
)
}
addSubcommand(remote)
}
addSubcommand(add)

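For context, a minimal sketch of how the new `remote` subcommand surfaces through Scallop (the IDs are hypothetical, and it assumes `CliArgs` calls Scallop's `verify()` in its constructor, as the existing commands require):

```scala
val c = new CliArgs(Seq("add", "remote", "my-volume", "my-dataset"))

// Scallop records the selected subcommand chain, which Kamu.scala
// pattern-matches on to pick the command implementation:
assert(c.subcommands == List(c.add, c.add.remote))
assert(c.add.remote.volumeID() == "my-volume")
assert(c.add.remote.datasetID() == "my-dataset")
```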
4 changes: 2 additions & 2 deletions src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
@@ -26,8 +26,8 @@ class GenericResourceRepository[TRes <: Resource[TRes]: ClassTag, TID](
idFromString: String => TID,
idFromRes: TRes => TID
)(
implicit rderivation: Derivation[ConfigReader[Manifest[TRes]]],
wderivation: Derivation[ConfigWriter[Manifest[TRes]]]
implicit readerDerivation: Derivation[ConfigReader[Manifest[TRes]]],
writerDerivation: Derivation[ConfigWriter[Manifest[TRes]]]
) {
private val logger = LogManager.getLogger(getClass.getName)

18 changes: 14 additions & 4 deletions src/main/scala/dev/kamu/cli/Kamu.scala
@@ -13,6 +13,7 @@ import java.io.PrintStream
import dev.kamu.cli.commands._
import dev.kamu.cli.external._
import dev.kamu.cli.output._
import dev.kamu.core.utils.AutoClock
import dev.kamu.core.utils.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Level
@@ -21,15 +22,17 @@ class Kamu(
config: KamuConfig,
fileSystem: FileSystem
) {
val systemClock = new AutoClock()

val workspaceLayout = WorkspaceLayout(
metadataRootDir = config.kamuRoot,
datasetsDir = config.kamuRoot.resolve("datasets"),
kamuRootDir = config.kamuRoot,
metadataDir = config.kamuRoot.resolve("datasets"),
volumesDir = config.kamuRoot.resolve("volumes"),
localVolumeDir = config.localVolume
).toAbsolute(fileSystem)

val metadataRepository =
new MetadataRepository(fileSystem, workspaceLayout)
new MetadataRepository(fileSystem, workspaceLayout, systemClock)

val volumeOperatorFactory =
new VolumeOperatorFactory(fileSystem, workspaceLayout, metadataRepository)
@@ -78,6 +81,13 @@ class Kamu(
c.add.manifests(),
c.add.replace()
)
case List(c.add, c.add.remote) =>
new AddRemoteCommand(
fileSystem,
metadataRepository,
c.add.remote.volumeID(),
c.add.remote.datasetID()
)
case List(c.purge) =>
new PurgeCommand(
metadataRepository,
@@ -171,7 +181,7 @@ class Kamu(
}

def ensureWorkspace(): Unit = {
if (!fileSystem.exists(workspaceLayout.metadataRootDir))
if (!fileSystem.exists(workspaceLayout.kamuRootDir))
throw new UsageException("Not a kamu workspace")
}

10 changes: 7 additions & 3 deletions src/main/scala/dev/kamu/cli/KamuApp.scala
@@ -9,7 +9,7 @@
package dev.kamu.cli

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.{Level, LogManager}

class UsageException(message: String = "", cause: Throwable = None.orNull)
@@ -18,13 +18,17 @@ class UsageException(message: String = "", cause: Throwable = None.orNull)
object KamuApp extends App {
val logger = LogManager.getLogger(getClass.getName)

val config = KamuConfig()

val fileSystem = FileSystem.get(new Configuration())
FileSystem.enableSymlinks()
fileSystem.setWriteChecksum(false)
fileSystem.setVerifyChecksum(false)

val config = KamuConfig(
workspaceRoot = KamuConfig
.findWorkspaceRoot(fileSystem, new Path("."))
.getOrElse(new Path("."))
)

try {
val cliArgs = new CliArgs(args)

28 changes: 25 additions & 3 deletions src/main/scala/dev/kamu/cli/KamuConfig.scala
@@ -8,22 +8,44 @@

package dev.kamu.cli

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}
import dev.kamu.core.utils.fs._

case class KamuConfig(
workspaceRoot: Path = new Path("."),
workspaceRoot: Path,
spark: SparkConfig = SparkConfig()
) {
def kamuRoot: Path = {
workspaceRoot.resolve(".kamu")
workspaceRoot.resolve(KamuConfig.ROOT_DIR_NAME)
}

def localVolume: Path = {
workspaceRoot.resolve(".kamu.local")
}
}

object KamuConfig {
val ROOT_DIR_NAME = ".kamu"

def findWorkspaceRoot(fileSystem: FileSystem, dir: Path): Option[Path] = {
findWorkspaceRootRec(fileSystem, fileSystem.toAbsolute(dir))
}

@scala.annotation.tailrec
private def findWorkspaceRootRec(
fileSystem: FileSystem,
dir: Path
): Option[Path] = {
if (fileSystem.exists(dir.resolve(ROOT_DIR_NAME))) {
Some(dir)
} else if (dir.isRoot) {
None
} else {
findWorkspaceRootRec(fileSystem, dir.getParent)
}
}
}

case class SparkConfig(
driverMemory: String = "2g"
)
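The new workspace discovery deserves a quick illustration. A hedged sketch of the expected behavior under a hypothetical directory layout (git-style upward search for the `.kamu` marker directory):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// Hypothetical layout: /home/user/project/.kamu exists.
// Searching from a nested directory walks up parents until ".kamu" is found:
KamuConfig.findWorkspaceRoot(fs, new Path("/home/user/project/deep/subdir"))
// => Some(/home/user/project)

// Reaching the filesystem root without finding ".kamu" yields None:
KamuConfig.findWorkspaceRoot(fs, new Path("/tmp/unrelated"))
// => None
```

This is what lets KamuApp above run from any subdirectory of a workspace, falling back to the current directory when no root is found.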