diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7c8b90a61b..538e798b20 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.9.0] - 2020-03-08
+### Changed
+- Using new metadata chain prototype!
+
 ## [0.8.0] - 2020-01-12
 ### Added
 - Experimental support for remote S3 volumes
diff --git a/build.sbt b/build.sbt
index 9c4e3d2e6d..c25a0ac5b3 100644
--- a/build.sbt
+++ b/build.sbt
@@ -48,12 +48,15 @@ lazy val kamuCoreUtils = project
     libraryDependencies ++= Seq(
       deps.hadoopCommon,
       deps.scalaTest % "test",
+      deps.sparkCore % "provided",
+      deps.sparkHive % "provided",
       deps.geoSpark % "test",
       deps.geoSparkSql % "test",
       deps.sparkTestingBase % "test",
       deps.sparkHive % "test"
     ),
-    commonSettings
+    commonSettings,
+    sparkTestingSettings
   )
 
 lazy val kamuCoreManifests = project
@@ -66,7 +69,8 @@ lazy val kamuCoreManifests = project
     libraryDependencies ++= Seq(
      deps.hadoopCommon,
       deps.pureConfig,
-      deps.pureConfigYaml
+      deps.pureConfigYaml,
+      deps.spire
     ),
     commonSettings
   )
@@ -121,6 +125,7 @@ lazy val versions = new {
   val scalajHttp = "2.4.1"
   val spark = "2.4.0"
   val sparkTestingBase = s"${spark}_0.11.0"
+  val spire = "0.13.0" // Used by spark too
 }
 
 lazy val deps =
@@ -145,6 +150,10 @@ lazy val deps =
       .exclude("commons-beanutils", "commons-beanutils-core")
     // SQL Shell
     val sqlLine = "sqlline" % "sqlline" % "1.8.0"
+    // Math
+    // TODO: Using older version as it's also used by Spark
+    //val spire = "org.typelevel" %% "spire" % versions.spire
+    val spire = "org.spire-math" %% "spire" % versions.spire
     // Test
     val scalaTest = "org.scalatest" %% "scalatest" % "3.0.8"
     val sparkHive = "org.apache.spark" %% "spark-hive" % versions.spark
diff --git a/core.ingest.polling b/core.ingest.polling
index ef1cf05909..98522e3855 160000
--- a/core.ingest.polling
+++ b/core.ingest.polling
@@ -1 +1 @@
-Subproject commit ef1cf05909be5c84c44911ef521911b483c127de
+Subproject commit 98522e3855428eea0552a4684897350d002374e9
diff --git a/core.manifests b/core.manifests
index 00b8d197c1..ba45b08471 160000
--- a/core.manifests
+++ b/core.manifests
@@ -1 +1 @@
-Subproject commit 00b8d197c1be0a133d5cc27109b3f167d20480bf
+Subproject commit ba45b08471532536e933c684aee24c713db28c9b
diff --git a/core.transform.streaming b/core.transform.streaming
index 95e214da33..af9a097c72 160000
--- a/core.transform.streaming
+++ b/core.transform.streaming
@@ -1 +1 @@
-Subproject commit 95e214da33af29b594819e57517ec66c6bdfb536
+Subproject commit af9a097c7234d4ff24ba851a5308d88b88c44ce7
diff --git a/core.utils b/core.utils
index a3a94a8fd6..d0087a3d11 160000
--- a/core.utils
+++ b/core.utils
@@ -1 +1 @@
-Subproject commit a3a94a8fd6f85fb7e1d294fd2b065547d193a52d
+Subproject commit d0087a3d1150f7f1fcfb255cf150696df2d52d59
diff --git a/docs/index.md b/docs/index.md
new file mode 100644
index 0000000000..4e7162ef86
--- /dev/null
+++ b/docs/index.md
@@ -0,0 +1,28 @@
+# User Manual
+
+> This page is under construction
+
+- Getting Data In and Out
+  - Root datasets
+  - Supported polling sources
+  - Exporting Data
+- Collaboration
+  - Derivative datasets
+  - Transformation model
+  - Transactional guarantees
+- Dataset Evolution
+  - Schema Evolution
+    - Upstream schema changes
+    - Adding / deprecating columns
+    - Backwards incompatible changes
+  - Root Dataset Evolution
+    - Handling source URL changes
+    - Handling upstream format changes
+  - Derivative Dataset Evolution
+    - Handling upstream changes
+    - Evolving transformations
+- Handling Bad Data
+  - Corrections and compensations
+  - Bad data upon ingestion
+  - Bad data in upstream datasets
+  - PII and sensitive data
diff --git a/src/main/scala/dev/kamu/cli/CliParser.scala b/src/main/scala/dev/kamu/cli/CliParser.scala
index cf721d06b0..546de6f366 100644
--- a/src/main/scala/dev/kamu/cli/CliParser.scala
+++ b/src/main/scala/dev/kamu/cli/CliParser.scala
@@ -260,6 +260,23 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
       descr = "Manifest URLs/files containing dataset definitions",
       default = Some(List.empty)
     )
+
+    val remote = new Subcommand("remote") {
+      banner("Add a reference to a dataset stored in a remote volume")
+
+      val volumeID = trailArg[String](
+        "volumeID",
+        required = true,
+        descr = "ID of the remote volume"
+      )
+
+      val datasetID = trailArg[String](
+        "datasetID",
+        required = true,
+        descr = "ID of the dataset"
+      )
+    }
+
+    addSubcommand(remote)
   }
 
   addSubcommand(add)
diff --git a/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala b/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
index cf19744862..69604ef79a 100644
--- a/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
+++ b/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
@@ -26,8 +26,8 @@ class GenericResourceRepository[TRes <: Resource[TRes]: ClassTag, TID](
     idFromString: String => TID,
     idFromRes: TRes => TID
 )(
-    implicit rderivation: Derivation[ConfigReader[Manifest[TRes]]],
-    wderivation: Derivation[ConfigWriter[Manifest[TRes]]]
+    implicit readerDerivation: Derivation[ConfigReader[Manifest[TRes]]],
+    writerDerivation: Derivation[ConfigWriter[Manifest[TRes]]]
 ) {
   private val logger = LogManager.getLogger(getClass.getName)
 
diff --git a/src/main/scala/dev/kamu/cli/Kamu.scala b/src/main/scala/dev/kamu/cli/Kamu.scala
index 7e889573da..708a458bbc 100644
--- a/src/main/scala/dev/kamu/cli/Kamu.scala
+++ b/src/main/scala/dev/kamu/cli/Kamu.scala
@@ -13,6 +13,7 @@ import java.io.PrintStream
 import dev.kamu.cli.commands._
 import dev.kamu.cli.external._
 import dev.kamu.cli.output._
+import dev.kamu.core.utils.AutoClock
 import dev.kamu.core.utils.fs._
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.log4j.Level
@@ -21,15 +22,17 @@ class Kamu(
     config: KamuConfig,
     fileSystem: FileSystem
 ) {
+  val systemClock = new AutoClock()
+
   val workspaceLayout = WorkspaceLayout(
-    metadataRootDir = config.kamuRoot,
-    datasetsDir = config.kamuRoot.resolve("datasets"),
+    kamuRootDir = config.kamuRoot,
+    metadataDir = config.kamuRoot.resolve("datasets"),
     volumesDir = config.kamuRoot.resolve("volumes"),
     localVolumeDir = config.localVolume
   ).toAbsolute(fileSystem)
 
   val metadataRepository =
-    new MetadataRepository(fileSystem, workspaceLayout)
+    new MetadataRepository(fileSystem, workspaceLayout, systemClock)
 
   val volumeOperatorFactory =
     new VolumeOperatorFactory(fileSystem, workspaceLayout, metadataRepository)
@@ -78,6 +81,13 @@ class Kamu(
           c.add.manifests(),
           c.add.replace()
         )
+      case List(c.add, c.add.remote) =>
+        new AddRemoteCommand(
+          fileSystem,
+          metadataRepository,
+          c.add.remote.volumeID(),
+          c.add.remote.datasetID()
+        )
       case List(c.purge) =>
         new PurgeCommand(
           metadataRepository,
@@ -171,7 +181,7 @@ class Kamu(
   }
 
   def ensureWorkspace(): Unit = {
-    if (!fileSystem.exists(workspaceLayout.metadataRootDir))
+    if (!fileSystem.exists(workspaceLayout.kamuRootDir))
       throw new UsageException("Not a kamu workspace")
   }
 
diff --git 
a/src/main/scala/dev/kamu/cli/KamuApp.scala b/src/main/scala/dev/kamu/cli/KamuApp.scala index db04bd02a6..f3d679645b 100644 --- a/src/main/scala/dev/kamu/cli/KamuApp.scala +++ b/src/main/scala/dev/kamu/cli/KamuApp.scala @@ -9,7 +9,7 @@ package dev.kamu.cli import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.{Level, LogManager} class UsageException(message: String = "", cause: Throwable = None.orNull) @@ -18,13 +18,17 @@ class UsageException(message: String = "", cause: Throwable = None.orNull) object KamuApp extends App { val logger = LogManager.getLogger(getClass.getName) - val config = KamuConfig() - val fileSystem = FileSystem.get(new Configuration()) FileSystem.enableSymlinks() fileSystem.setWriteChecksum(false) fileSystem.setVerifyChecksum(false) + val config = KamuConfig( + workspaceRoot = KamuConfig + .findWorkspaceRoot(fileSystem, new Path(".")) + .getOrElse(new Path(".")) + ) + try { val cliArgs = new CliArgs(args) diff --git a/src/main/scala/dev/kamu/cli/KamuConfig.scala b/src/main/scala/dev/kamu/cli/KamuConfig.scala index 9a29214eaa..f0416563be 100644 --- a/src/main/scala/dev/kamu/cli/KamuConfig.scala +++ b/src/main/scala/dev/kamu/cli/KamuConfig.scala @@ -8,15 +8,15 @@ package dev.kamu.cli -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import dev.kamu.core.utils.fs._ case class KamuConfig( - workspaceRoot: Path = new Path("."), + workspaceRoot: Path, spark: SparkConfig = SparkConfig() ) { def kamuRoot: Path = { - workspaceRoot.resolve(".kamu") + workspaceRoot.resolve(KamuConfig.ROOT_DIR_NAME) } def localVolume: Path = { @@ -24,6 +24,28 @@ case class KamuConfig( } } +object KamuConfig { + val ROOT_DIR_NAME = ".kamu" + + def findWorkspaceRoot(fileSystem: FileSystem, dir: Path): Option[Path] = { + findWorkspaceRootRec(fileSystem, fileSystem.toAbsolute(dir)) + } + + @scala.annotation.tailrec + private def findWorkspaceRootRec( + fileSystem: FileSystem, + dir: Path + ): Option[Path] = { + if (fileSystem.exists(dir.resolve(ROOT_DIR_NAME))) { + Some(dir) + } else if (dir.isRoot) { + None + } else { + findWorkspaceRootRec(fileSystem, dir.getParent) + } + } +} + case class SparkConfig( driverMemory: String = "2g" ) diff --git a/src/main/scala/dev/kamu/cli/MetadataRepository.scala b/src/main/scala/dev/kamu/cli/MetadataRepository.scala index d2baa9ae9c..0e8e08c083 100644 --- a/src/main/scala/dev/kamu/cli/MetadataRepository.scala +++ b/src/main/scala/dev/kamu/cli/MetadataRepository.scala @@ -11,8 +11,12 @@ package dev.kamu.cli import java.io.PrintWriter import dev.kamu.core.manifests.{ - Dataset, DatasetID, + DatasetKind, + DatasetLayout, + DatasetRef, + DatasetSnapshot, + DatasetSummary, Volume, VolumeID, VolumeLayout @@ -22,6 +26,8 @@ import org.apache.hadoop.fs.{FileSystem, Path} import dev.kamu.core.manifests.parsing.pureconfig.yaml import java.net.URI +import dev.kamu.core.manifests.infra.MetadataChainFS +import dev.kamu.core.utils.Clock import yaml.defaults._ import pureconfig.generic.auto._ import dev.kamu.core.utils.fs._ @@ -29,19 +35,11 @@ import org.apache.log4j.LogManager class MetadataRepository( fileSystem: FileSystem, - workspaceLayout: WorkspaceLayout + workspaceLayout: WorkspaceLayout, + systemClock: Clock ) { private val logger = LogManager.getLogger(getClass.getName) - protected val datasetRepo = - new GenericResourceRepository[Dataset, DatasetID]( - fileSystem, - workspaceLayout.datasetsDir, - "dataset", - DatasetID, - (ds: Dataset) => 
ds.id - ) - protected val volumeRepo = new GenericResourceRepository[Volume, VolumeID]( fileSystem, @@ -55,45 +53,115 @@ class MetadataRepository( // Datasets //////////////////////////////////////////////////////////////////////////// - def getDataset(id: DatasetID): Dataset = { - datasetRepo.getResource(id) + protected def remoteRefFilePath(id: DatasetID): Path = { + workspaceLayout.metadataDir.resolve(id.toString).resolve("remote.yaml") + } + + protected def datasetMetadataDir(id: DatasetID): Path = { + if (isRemote(id)) + getLocalVolume().metadataDir.resolve(id.toString) + else + workspaceLayout.metadataDir.resolve(id.toString) + } + + protected def ensureDatasetExists(id: DatasetID): Unit = { + if (!fileSystem.exists(workspaceLayout.metadataDir.resolve(id.toString))) + throw new DoesNotExistException(id.toString, "dataset") + } + + protected def ensureDatasetExistsAndPulled(id: DatasetID): Unit = { + ensureDatasetExists(id) + if (!fileSystem.exists(datasetMetadataDir(id))) + throw new DoesNotExistException(id.toString, "dataset") + } + + def isRemote(id: DatasetID): Boolean = { + val refPath = remoteRefFilePath(id) + fileSystem.exists(refPath) + } + + def getDatasetLayout(id: DatasetID): DatasetLayout = { + ensureDatasetExists(id) + + val localVolume = getLocalVolume() + + DatasetLayout( + metadataDir = datasetMetadataDir(id), + dataDir = localVolume.dataDir.resolve(id.toString), + checkpointsDir = localVolume.checkpointsDir.resolve(id.toString), + cacheDir = localVolume.cacheDir.resolve(id.toString) + ) + } + + def getMetadataChain(id: DatasetID): MetadataChainFS = { + new MetadataChainFS(fileSystem, getDatasetLayout(id).metadataDir) + } + + def getDatasetKind(id: DatasetID): DatasetKind = { + ensureDatasetExists(id) + if (isRemote(id)) + DatasetKind.Remote + else + getDatasetSummary(id).kind + } + + def getDatasetSummary(id: DatasetID): DatasetSummary = { + ensureDatasetExistsAndPulled(id) + + val chain = new MetadataChainFS(fileSystem, datasetMetadataDir(id)) + chain.getSummary() + } + + def getDatasetVolumeID(id: DatasetID): VolumeID = { + if (!isRemote(id)) + throw new RuntimeException(s"Dataset $id is not remote") + val refFile = remoteRefFilePath(id) + val ref = new ResourceLoader(fileSystem) + .loadResourceFromFile[DatasetRef](refFile) + ref.volumeID + } + + protected def getDatasetDependencies(id: DatasetID): List[DatasetID] = { + if (isRemote(id)) + List.empty + else + getDatasetSummary(id).datasetDependencies.toList } def getDatasetsInDependencyOrder( ids: Seq[DatasetID], recursive: Boolean - ): Seq[Dataset] = { - if (recursive) { - val dependsOn = getDataset(_: DatasetID).dependsOn.toList - val depGraph = new DependencyGraph[DatasetID](dependsOn) - depGraph.resolve(ids.toList).map(getDataset) - } else { - val dependsOn = getDataset(_: DatasetID).dependsOn.toList - .filter(ids.contains) - val depGraph = new DependencyGraph[DatasetID](dependsOn) - depGraph.resolve(ids.toList).map(getDataset) - } + ): Seq[DatasetID] = { + // TODO: Check recursive implemented correctly + val dependsOn = + if (recursive) + getDatasetDependencies _ + else + getDatasetDependencies(_: DatasetID).filter(ids.contains) + + val depGraph = new DependencyGraph[DatasetID](dependsOn) + depGraph.resolve(ids.toList) } - def getAllDatasetIDs(): Seq[DatasetID] = { - datasetRepo.getAllResourceIDs() + def getAllDatasets(): Seq[DatasetID] = { + fileSystem + .listStatus(workspaceLayout.metadataDir) + .map(_.getPath.getName) + .map(DatasetID) } - def getAllDatasets(): Seq[Dataset] = { - 
datasetRepo.getAllResources() + def loadDatasetSnapshotFromURI(uri: URI): DatasetSnapshot = { + new ResourceLoader(fileSystem).loadResourceFromURI[DatasetSnapshot](uri) } - def loadDatasetFromURI(uri: URI): Dataset = { - datasetRepo.loadResourceFromURI(uri) - } + def addDataset(ds: DatasetSnapshot): Unit = { + val datasetDir = workspaceLayout.metadataDir.resolve(ds.id.toString) - def addDataset(ds: Dataset): Unit = { - if (datasetRepo.getResourceOpt(ds.id).isDefined) + if (fileSystem.exists(datasetDir)) throw new AlreadyExistsException(ds.id.toString, "dataset") try { - ds.dependsOn.map(getDataset) - ds.remoteSource.map(rs => getVolume(rs.volumeID)) + ds.dependsOn.foreach(ensureDatasetExists) } catch { case e: DoesNotExistException => throw new MissingReferenceException( @@ -104,38 +172,47 @@ class MetadataRepository( ) } - datasetRepo.addResource(ds) + val chain = new MetadataChainFS(fileSystem, datasetDir) + chain.init(ds, systemClock.instant()) + } + + def addDatasetReference(id: DatasetID, volumeID: VolumeID): Unit = { + val datasetDir = workspaceLayout.metadataDir.resolve(id.toString) + + if (fileSystem.exists(datasetDir)) + throw new AlreadyExistsException(id.toString, "dataset") + + getVolume(volumeID) + + fileSystem.mkdirs(datasetDir) + new ResourceLoader(fileSystem) + .saveResourceToFile(DatasetRef(volumeID), remoteRefFilePath(id)) } def deleteDataset(id: DatasetID): Unit = { + ensureDatasetExists(id) + // Validate references - val referencedBy = getAllDatasets().filter(_.dependsOn.contains(id)) + val referencedBy = getAllDatasets() + .map(getDatasetSummary) + .filter(_.datasetDependencies.contains(id)) + if (referencedBy.nonEmpty) throw new DanglingReferenceException(referencedBy.map(_.id), id) - getAllDataPaths(id).foreach(p => fileSystem.delete(p, true)) - - datasetRepo.deleteResource(id) + val layout = getDatasetLayout(id) + fileSystem.delete(layout.cacheDir, true) + fileSystem.delete(layout.dataDir, true) + fileSystem.delete(layout.checkpointsDir, true) + fileSystem.delete(layout.metadataDir, true) + fileSystem.delete(workspaceLayout.metadataDir.resolve(id.toString), true) } def purgeDataset(id: DatasetID): Unit = { - getDataset(id) - getAllDataPaths(id).foreach(p => fileSystem.delete(p, true)) // TODO: Purging a dataset that is used by non-empty derivatives should raise an error - } - - def exportDataset(ds: Dataset, path: Path): Unit = { - datasetRepo.saveResourceToFile(ds, path) - } - - protected def getAllDataPaths(id: DatasetID): Seq[Path] = { - val volumeLayout = getLocalVolume() - - Seq( - volumeLayout.dataDir.resolve(id.toString), - volumeLayout.cacheDir.resolve(id.toString), - volumeLayout.checkpointsDir.resolve(id.toString) - ) + val snapshot = getMetadataChain(id).getSnapshot() + deleteDataset(id) + addDataset(snapshot) } //////////////////////////////////////////////////////////////////////////// @@ -153,7 +230,7 @@ class MetadataRepository( } VolumeLayout( - datasetsDir = workspaceLayout.localVolumeDir.resolve("datasets"), + metadataDir = workspaceLayout.localVolumeDir.resolve("datasets"), checkpointsDir = workspaceLayout.localVolumeDir.resolve("checkpoints"), dataDir = workspaceLayout.localVolumeDir.resolve("data"), cacheDir = workspaceLayout.localVolumeDir.resolve("cache") diff --git a/src/main/scala/dev/kamu/cli/ResourceLoader.scala b/src/main/scala/dev/kamu/cli/ResourceLoader.scala new file mode 100644 index 0000000000..d82bfac652 --- /dev/null +++ b/src/main/scala/dev/kamu/cli/ResourceLoader.scala @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2018 kamu.dev + * + * 
This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli
+
+import java.net.URI
+
+import dev.kamu.core.manifests.parsing.pureconfig.yaml
+import yaml.defaults._
+import dev.kamu.core.manifests.{Manifest, Resource}
+import dev.kamu.core.utils.fs._
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.log4j.LogManager
+import pureconfig.{ConfigReader, ConfigWriter, Derivation}
+
+import scala.reflect.ClassTag
+
+// TODO: Remove this class?
+class ResourceLoader(
+    fileSystem: FileSystem
+) {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def loadResourceFromFile[T <: Resource[T]: ClassTag](
+      p: Path
+  )(
+      implicit reader: Derivation[ConfigReader[Manifest[T]]]
+  ): T = {
+    val inputStream = fileSystem.open(p)
+    try {
+      yaml.load[Manifest[T]](inputStream).content
+    } finally {
+      inputStream.close()
+    }
+  }
+
+  def saveResourceToFile[T <: Resource[T]: ClassTag](
+      res: T,
+      path: Path
+  )(
+      implicit writer: Derivation[ConfigWriter[Manifest[T]]]
+  ): Unit = {
+    val outputStream = fileSystem.create(path)
+
+    try {
+      yaml.save(res.asManifest, outputStream)
+    } catch {
+      case e: Exception =>
+        outputStream.close()
+        fileSystem.delete(path, false)
+        throw e
+    } finally {
+      outputStream.close()
+    }
+  }
+
+  def loadResourceFromURI[T <: Resource[T]: ClassTag](
+      uri: URI
+  )(
+      implicit reader: Derivation[ConfigReader[Manifest[T]]]
+  ): T = {
+    uri.getScheme match {
+      case "https" => loadResourceFromURL(uri.toURL)
+      case "http" => loadResourceFromURL(uri.toURL)
+      case null | "file" => loadResourceFromFile(new Path(uri.getPath))
+      case s => throw new SchemaNotSupportedException(s)
+    }
+  }
+
+  private def loadResourceFromURL[T <: Resource[T]: ClassTag](
+      url: java.net.URL
+  )(
+      implicit reader: Derivation[ConfigReader[Manifest[T]]]
+  ): T = {
+    val source = scala.io.Source.fromURL(url)
+    try {
+      yaml.load[Manifest[T]](source.mkString).content
+    } finally {
+      source.close()
+    }
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala b/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala
index 22f2944f08..c397ebb180 100644
--- a/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala
+++ b/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala
@@ -13,10 +13,10 @@ import dev.kamu.core.utils.fs._
 
 /** Describes the layout of the workspace on disk */
 case class WorkspaceLayout(
-    /** Root directory of the workspace metadata */
-    metadataRootDir: Path,
-    /** Contains dataset definitions */
-    datasetsDir: Path,
+    /** Root directory of the workspace */
+    kamuRootDir: Path,
+    /** Contains dataset metadata */
+    metadataDir: Path,
     /** Contains volume definitions */
     volumesDir: Path,
     /** Root directory of a local storage volume */
@@ -25,8 +25,8 @@ case class WorkspaceLayout(
 
   def toAbsolute(fs: FileSystem): WorkspaceLayout = {
     copy(
-      metadataRootDir = fs.toAbsolute(metadataRootDir),
-      datasetsDir = fs.toAbsolute(datasetsDir),
+      kamuRootDir = fs.toAbsolute(kamuRootDir),
+      metadataDir = fs.toAbsolute(metadataDir),
       volumesDir = fs.toAbsolute(volumesDir),
       localVolumeDir = fs.toAbsolute(localVolumeDir)
     )
diff --git 
a/src/main/scala/dev/kamu/cli/commands/AddCommand.scala b/src/main/scala/dev/kamu/cli/commands/AddCommand.scala index a7e91ff310..3d51700eca 100644 --- a/src/main/scala/dev/kamu/cli/commands/AddCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/AddCommand.scala @@ -14,7 +14,7 @@ import dev.kamu.cli.{ MissingReferenceException, SchemaNotSupportedException } -import dev.kamu.core.manifests.Dataset +import dev.kamu.core.manifests.DatasetSnapshot import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager @@ -31,7 +31,7 @@ class AddCommand( try { manifests.map(manifestURI => { logger.debug(s"Loading dataset from: $manifestURI") - metadataRepository.loadDatasetFromURI(manifestURI) + metadataRepository.loadDatasetSnapshotFromURI(manifestURI) }) } catch { case e: java.io.FileNotFoundException => @@ -44,7 +44,7 @@ class AddCommand( } @scala.annotation.tailrec - def addDataset(ds: Dataset): Boolean = { + def addDataset(ds: DatasetSnapshot): Boolean = { try { metadataRepository.addDataset(ds) true diff --git a/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala b/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala index 076b3a70f3..4403d34765 100644 --- a/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala @@ -8,7 +8,10 @@ package dev.kamu.cli.commands -import dev.kamu.cli.{MetadataRepository, UsageException} +import pureconfig.generic.auto._ +import dev.kamu.core.manifests.parsing.pureconfig.yaml +import yaml.defaults._ +import dev.kamu.cli.{MetadataRepository, ResourceLoader} import dev.kamu.core.manifests._ import dev.kamu.core.utils.fs._ import org.apache.hadoop.fs.{FileSystem, Path} @@ -37,15 +40,12 @@ class AddInteractiveCommand( logger.info("Added dataset") } else { val path = new Path("./" + dataset.id + ".yaml") - metadataRepository.exportDataset( - dataset, - path - ) + new ResourceLoader(fileSystem).saveResourceToFile(dataset, path) logger.info(s"Saved dataset to: ${fileSystem.toAbsolute(path)}") } } - def runDatasetWizard(): Dataset = { + def runDatasetWizard(): DatasetSnapshot = { val id = input( "Dataset ID", "Specify the ID of the new dataset.\nIt is recommended that you use dot-separated " + @@ -153,7 +153,7 @@ class AddInteractiveCommand( MergeStrategyKind.Append() } - Dataset( + DatasetSnapshot( id = id, rootPollingSource = Some( RootPollingSource( @@ -166,7 +166,7 @@ class AddInteractiveCommand( ) ) case "derivative" => - Dataset( + DatasetSnapshot( id = id, derivativeSource = Some( DerivativeSource( diff --git a/src/main/scala/dev/kamu/cli/commands/AddRemoteCommand.scala b/src/main/scala/dev/kamu/cli/commands/AddRemoteCommand.scala new file mode 100644 index 0000000000..18c4889ee0 --- /dev/null +++ b/src/main/scala/dev/kamu/cli/commands/AddRemoteCommand.scala @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2018 kamu.dev + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package dev.kamu.cli.commands + +import dev.kamu.cli.{ + AlreadyExistsException, + MetadataRepository, + MissingReferenceException +} +import dev.kamu.core.manifests.{DatasetID, VolumeID} +import org.apache.hadoop.fs.FileSystem +import org.apache.log4j.LogManager + +class AddRemoteCommand( + fileSystem: FileSystem, + metadataRepository: MetadataRepository, + volumeID: String, + datasetID: String +) extends Command { + private val logger = LogManager.getLogger(getClass.getName) + + def run(): Unit = { + val added = try { + metadataRepository.addDatasetReference( + DatasetID(datasetID), + VolumeID(volumeID) + ) + true + } catch { + case e: AlreadyExistsException => + logger.warn(e.getMessage + " - skipping") + false + case e: MissingReferenceException => + logger.warn(e.getMessage + " - skipping") + false + } + + val numAdded = if (added) 1 else 0 + + logger.info(s"Added $numAdded dataset") + } +} diff --git a/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala b/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala index a43f94acbd..4b4ca05ce5 100644 --- a/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala @@ -40,11 +40,19 @@ class CompletionCommand( path: Seq[String], builder: Scallop ): String = { - val preable = path match { + val preamble = path match { case Seq() => Seq( "local cur prev words cword", - "_init_completion || return" + "_init_completion || return", + "", + "# Find workspace", + "WORKSPACE_ROOT=`pwd`", + "while : ; do", + " KAMU_ROOT=\"$WORKSPACE_ROOT/.kamu\"", + " [[ \"$WORKSPACE_ROOT\" != \"/\" ]] && [[ ! -d \"$KAMU_ROOT\" ]] || break", + " WORKSPACE_ROOT=`dirname \"$WORKSPACE_ROOT\"`", + "done" ) case _ => Seq.empty @@ -81,9 +89,17 @@ class CompletionCommand( .flatMap(opt => { opt.name match { case "ids" if opt.descr.contains("dataset") => - Seq("options+=`ls .kamu/datasets | sed -e 's/\\.[^.]*$//'`") + Seq( + "if [[ -d \"$KAMU_ROOT\" ]]; then", + " options+=`ls \"$KAMU_ROOT/datasets\"`", + "fi" + ) case "ids" if opt.descr.contains("volume") => - Seq("options+=`ls .kamu/volumes | sed -e 's/\\.[^.]*$//'`") + Seq( + "if [[ -d \"$KAMU_ROOT\" ]]; then", + " options+=`ls \"$KAMU_ROOT/volumes\" | sed -e 's/\\.[^.]*$//'`", + "fi" + ) case _ => Seq.empty } @@ -101,7 +117,7 @@ class CompletionCommand( ) val body = ( - preable.break + preamble.break ++ dispatch.break ++ Seq("options=()") ++ subcommands diff --git a/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala b/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala index 0800ce1807..bea232b139 100644 --- a/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/DependencyGraphCommand.scala @@ -9,7 +9,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.MetadataRepository -import dev.kamu.core.manifests.{Dataset, DatasetID} +import dev.kamu.core.manifests.{DatasetID, DatasetKind} import org.apache.log4j.LogManager class DependencyGraphCommand( @@ -22,21 +22,23 @@ class DependencyGraphCommand( def quote(id: DatasetID) = "\"" + id.toString + "\"" + // TODO: Not include dependencies of remote datasets? 
val datasets = metadataRepository .getAllDatasets() + .map(metadataRepository.getDatasetSummary) .sortBy(_.id.toString) datasets.foreach( ds => - ds.dependsOn + ds.datasetDependencies .foreach(d => println(s"${quote(ds.id)} -> ${quote(d)};")) ) datasets.foreach( ds => - if (ds.kind == Dataset.Kind.Root) + if (ds.kind == DatasetKind.Root) println(s"${quote(ds.id)} [style=filled, fillcolor=darkolivegreen1];") - else if (ds.kind == Dataset.Kind.Derivative) + else if (ds.kind == DatasetKind.Derivative) println(s"${quote(ds.id)} [style=filled, fillcolor=lightblue];") ) diff --git a/src/main/scala/dev/kamu/cli/commands/InitCommand.scala b/src/main/scala/dev/kamu/cli/commands/InitCommand.scala index 7ce59e5984..68a0e90ecb 100644 --- a/src/main/scala/dev/kamu/cli/commands/InitCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/InitCommand.scala @@ -24,14 +24,14 @@ class InitCommand( override def requiresWorkspace: Boolean = false def run(): Unit = { - if (fileSystem.exists(workspaceLayout.metadataRootDir)) + if (fileSystem.exists(workspaceLayout.kamuRootDir)) throw new UsageException("Already a kamu workspace") - fileSystem.mkdirs(workspaceLayout.datasetsDir) + fileSystem.mkdirs(workspaceLayout.metadataDir) fileSystem.mkdirs(workspaceLayout.volumesDir) val outputStream = - fileSystem.create(workspaceLayout.metadataRootDir.resolve(".gitignore")) + fileSystem.create(workspaceLayout.kamuRootDir.resolve(".gitignore")) val writer = new PrintWriter(outputStream) writer.write(WorkspaceLayout.GITIGNORE_CONTENT) diff --git a/src/main/scala/dev/kamu/cli/commands/ListCommand.scala b/src/main/scala/dev/kamu/cli/commands/ListCommand.scala index 8256ebd61b..ab7b011ab6 100644 --- a/src/main/scala/dev/kamu/cli/commands/ListCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/ListCommand.scala @@ -27,7 +27,7 @@ class ListCommand( metadataRepository .getAllDatasets() .map( - ds => (ds.id, ds.kind.toString) + id => (id, metadataRepository.getDatasetKind(id).toString) ) .sortBy { case (id, _) => id.toString diff --git a/src/main/scala/dev/kamu/cli/commands/PullCommand.scala b/src/main/scala/dev/kamu/cli/commands/PullCommand.scala index 5a6934face..c30a51d6b4 100644 --- a/src/main/scala/dev/kamu/cli/commands/PullCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/PullCommand.scala @@ -16,8 +16,8 @@ import dev.kamu.cli.{ WorkspaceLayout } import dev.kamu.core.ingest.polling +import dev.kamu.core.manifests.{DatasetID, DatasetKind, ExternalSourceKind} import dev.kamu.core.manifests.parsing.pureconfig.yaml -import dev.kamu.core.manifests.{Dataset, DatasetID, ExternalSourceKind} import dev.kamu.core.transform.streaming import dev.kamu.core.utils.fs._ import org.apache.hadoop.fs.{FileSystem, Path} @@ -39,7 +39,7 @@ class PullCommand( def run(): Unit = { val datasetIDs = { - if (all) metadataRepository.getAllDatasetIDs() + if (all) metadataRepository.getAllDatasets() else ids.map(DatasetID) } @@ -54,29 +54,32 @@ class PullCommand( throw new UsageException(e.getMessage) } - logger.debug(s"Pulling datasets in following order:") - plan.foreach(d => logger.debug(s" ${d.id.toString}")) + logger.debug("Pulling datasets in following order: " + plan.mkString(", ")) val numUpdated = pullBatched(plan) logger.info(s"Updated $numUpdated datasets") } - def pullBatched(plan: Seq[Dataset]): Int = { + def pullBatched(plan: Seq[DatasetID]): Int = { if (plan.nonEmpty) { - val kind = plan.head.kind - val (batch, rest) = plan.span(_.kind == kind) + val withKinds = + plan.map(id => (id, metadataRepository.getDatasetKind(id))) + + 
val kind = withKinds.head._2 + val (first, rest) = withKinds.span(_._2 == kind) + val batch = first.map(_._1) kind match { - case Dataset.Kind.Root => + case DatasetKind.Root => pullRoot(batch) - case Dataset.Kind.Derivative => + case DatasetKind.Derivative => // TODO: Streaming transform currently doesn't handle dependencies batch.foreach(ds => pullDerivative(Seq(ds))) - case Dataset.Kind.Remote => + case DatasetKind.Remote => pullRemote(batch) } - batch.length + pullBatched(rest) + batch.length + pullBatched(rest.map(_._1)) } else { 0 } @@ -86,15 +89,24 @@ class PullCommand( // Root /////////////////////////////////////////////////////////////////////////////////////// - def pullRoot(batch: Seq[Dataset]): Boolean = { + def pullRoot(batch: Seq[DatasetID]): Boolean = { val datasets = batch.toVector logger.debug( - "Pulling root datasets: " + datasets.map(_.id.toString).mkString(", ") + "Pulling root datasets: " + datasets.map(_.toString).mkString(", ") ) + // TODO: Costly chain traversal + // TODO: Account for missing files val extraMounts = datasets - .map(_.rootPollingSource.get.fetch) + .flatMap(id => { + val metaChain = metadataRepository.getMetadataChain(id) + + metaChain + .getBlocks() + .flatMap(_.rootPollingSource) + .map(_.fetch) + }) .flatMap({ case furl: ExternalSourceKind.FetchUrl => furl.url.getScheme match { @@ -107,14 +119,10 @@ class PullCommand( val pollConfig = polling.AppConf( tasks = datasets - .map(ds => { - val volumeLayout = metadataRepository.getLocalVolume() + .map(id => { polling.IngestTask( - datasetToIngest = ds, - checkpointsPath = - volumeLayout.checkpointsDir.resolve(ds.id.toString), - pollCachePath = volumeLayout.cacheDir.resolve(ds.id.toString), - dataPath = volumeLayout.dataDir.resolve(ds.id.toString) + datasetToIngest = id, + datasetLayout = metadataRepository.getDatasetLayout(id) ) }) ) @@ -130,7 +138,7 @@ class PullCommand( logger.debug( s"Successfully pulled root datasets: " + datasets - .map(_.id.toString) + .map(_.toString) .mkString(", ") ) @@ -141,28 +149,33 @@ class PullCommand( // Derivative /////////////////////////////////////////////////////////////////////////////////////// - def pullDerivative(batch: Seq[Dataset]): Boolean = { + def pullDerivative(batch: Seq[DatasetID]): Boolean = { val datasets = batch.toVector logger.debug( s"Running transformations for derivative datasets: " + datasets - .map(_.id.toString) + .map(_.toString) .mkString(", ") ) val transformConfig = streaming.AppConfig( - tasks = datasets.map(ds => { - val volumeLayout = metadataRepository.getLocalVolume() + tasks = datasets.map(id => { + val metaChain = metadataRepository.getMetadataChain(id) + + // TODO: Costly chain traversal + val allInputs = metaChain + .getBlocks() + .flatMap(_.derivativeSource) + .flatMap(_.inputs) + .map(_.id) + + val allDatasets = allInputs :+ id streaming.TransformTaskConfig( - datasetToTransform = ds, - inputDataPaths = ds.derivativeSource.get.inputs - .map( - i => i.id.toString -> volumeLayout.dataDir.resolve(i.id.toString) - ) - .toMap, - checkpointsPath = volumeLayout.checkpointsDir.resolve(ds.id.toString), - outputDataPath = volumeLayout.dataDir.resolve(ds.id.toString) + datasetToTransform = id, + datasetLayouts = allDatasets + .map(i => (i.toString, metadataRepository.getDatasetLayout(i))) + .toMap ) }) ) @@ -179,7 +192,7 @@ class PullCommand( logger.debug( s"Successfully applied transformations for derivative datasets: " + datasets - .map(_.id.toString) + .map(_.toString) .mkString(", ") ) @@ -190,14 +203,22 @@ class PullCommand( // Remote 
/////////////////////////////////////////////////////////////////////////////////////// - def pullRemote(batch: Seq[Dataset]): Boolean = { - for (ds <- batch) { - val sourceVolume = metadataRepository - .getVolume(ds.remoteSource.get.volumeID) + def pullRemote(batch: Seq[DatasetID]): Boolean = { + val datasets = batch.toVector + + logger.debug( + "Pulling remote datasets: " + datasets.map(_.toString).mkString(", ") + ) + + for (id <- datasets) { + val sourceVolume = metadataRepository.getVolume( + metadataRepository + .getDatasetVolumeID(id) + ) val volumeOperator = volumeOperatorFactory.buildFor(sourceVolume) - volumeOperator.pull(Seq(ds)) + volumeOperator.pull(Seq(id)) } true } diff --git a/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala b/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala index 15086b1f6b..bc74b960a4 100644 --- a/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala @@ -22,7 +22,7 @@ class PurgeCommand( override def run(): Unit = { val toPurge = if (all) - metadataRepository.getAllDatasetIDs() + metadataRepository.getAllDatasets() else ids.map(DatasetID) diff --git a/src/main/scala/dev/kamu/cli/commands/PushCommand.scala b/src/main/scala/dev/kamu/cli/commands/PushCommand.scala index 5376e148b7..1f46b85569 100644 --- a/src/main/scala/dev/kamu/cli/commands/PushCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/PushCommand.scala @@ -15,7 +15,7 @@ import dev.kamu.cli.{ UsageException, WorkspaceLayout } -import dev.kamu.core.manifests.{Dataset, DatasetID, Volume, VolumeID} +import dev.kamu.core.manifests.{DatasetID, Volume, VolumeID} import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager @@ -40,7 +40,7 @@ class PushCommand( } val datasetIDs = { - if (all) metadataRepository.getAllDatasetIDs() + if (all) metadataRepository.getAllDatasets() else ids.map(DatasetID) } @@ -61,7 +61,7 @@ class PushCommand( logger.info(s"Pushed $numPushed dataset(s)") } - def pushToVolume(datasets: Seq[Dataset], volume: Volume): Int = { + def pushToVolume(datasets: Seq[DatasetID], volume: Volume): Int = { val volumeOperator = volumeOperatorFactory.buildFor(volume) volumeOperator.push(datasets) datasets.length diff --git a/src/main/scala/dev/kamu/cli/external/SparkRunnerDocker.scala b/src/main/scala/dev/kamu/cli/external/SparkRunnerDocker.scala index a5d5f02e13..b3978505ac 100644 --- a/src/main/scala/dev/kamu/cli/external/SparkRunnerDocker.scala +++ b/src/main/scala/dev/kamu/cli/external/SparkRunnerDocker.scala @@ -42,7 +42,7 @@ class SparkRunnerDocker( .toMap val workspaceVolumes = - Seq(workspaceLayout.metadataRootDir, workspaceLayout.localVolumeDir) + Seq(workspaceLayout.kamuRootDir, workspaceLayout.localVolumeDir) .filter(fileSystem.exists) .map(p => (p, p)) .toMap diff --git a/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala b/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala index 83f1bcfb91..dff6faf094 100644 --- a/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala +++ b/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala @@ -9,13 +9,13 @@ package dev.kamu.cli.external import dev.kamu.cli.{MetadataRepository, WorkspaceLayout} -import dev.kamu.core.manifests.{Dataset, Volume} +import dev.kamu.core.manifests.{DatasetID, Volume} import org.apache.hadoop.fs.FileSystem trait VolumeOperator { - def push(datasets: Seq[Dataset]) + def push(datasets: Seq[DatasetID]) - def pull(datasets: Seq[Dataset]) + def pull(datasets: Seq[DatasetID]) } class VolumeOperatorFactory( @@ 
-38,7 +38,6 @@ class VolumeOperatorFactory( case "s3" => new VolumeOperatorS3Cli( fileSystem, - workspaceLayout, metadataRepository, volume ) diff --git a/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala b/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala index f9e890bfcb..08aa88f341 100644 --- a/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala +++ b/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala @@ -9,7 +9,7 @@ package dev.kamu.cli.external import dev.kamu.cli.{MetadataRepository, WorkspaceLayout} -import dev.kamu.core.manifests.{Dataset, Volume, VolumeLayout} +import dev.kamu.core.manifests.{DatasetID, Volume, VolumeLayout} import dev.kamu.core.utils.fs._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.LogManager @@ -18,57 +18,53 @@ import scala.sys.process.Process class VolumeOperatorS3Cli( fileSystem: FileSystem, - workspaceLayout: WorkspaceLayout, metadataRepository: MetadataRepository, volume: Volume ) extends VolumeOperator { private val logger = LogManager.getLogger(getClass.getName) - override def push(datasets: Seq[Dataset]): Unit = { + override def push(datasets: Seq[DatasetID]): Unit = { withVolumeLayout(datasets) { localDir => s3Sync(localDir, new Path(volume.url)) } } - override def pull(datasets: Seq[Dataset]): Unit = { - val localVolumeLayout = metadataRepository.getLocalVolume() - + override def pull(datasets: Seq[DatasetID]): Unit = { val remoteVolumeLayout = VolumeLayout( - datasetsDir = new Path("datasets"), + metadataDir = new Path("datasets"), checkpointsDir = new Path("checkpoints"), dataDir = new Path("data"), cacheDir = new Path("cache") ) - for (ds <- datasets) { + for (id <- datasets) { + // TODO: Do one sync instead since volume layouts should match val sourceLayout = remoteVolumeLayout.relativeTo(new Path(volume.url)) + val destLayout = metadataRepository.getDatasetLayout(id) - s3Copy( - sourceLayout.datasetsDir.resolve(ds.id.toString + ".yaml"), - localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") + s3Sync( + sourceLayout.metadataDir.resolve(id.toString), + destLayout.metadataDir ) s3Sync( - sourceLayout.checkpointsDir.resolve(ds.id.toString), - localVolumeLayout.checkpointsDir.resolve(ds.id.toString) + sourceLayout.checkpointsDir.resolve(id.toString), + destLayout.checkpointsDir ) s3Sync( - sourceLayout.dataDir.resolve(ds.id.toString), - localVolumeLayout.dataDir.resolve(ds.id.toString) + sourceLayout.dataDir.resolve(id.toString), + destLayout.dataDir ) } } protected def withVolumeLayout[T]( - datasets: Seq[Dataset] + datasets: Seq[DatasetID] )(func: Path => T): T = { Temp.withRandomTempDir(fileSystem, "kamu-volume-") { tempDir => - val localVolumeLayout = - metadataRepository.getLocalVolume().toAbsolute(fileSystem) - val targetLayout = VolumeLayout( - datasetsDir = new Path("datasets"), + metadataDir = new Path("datasets"), checkpointsDir = new Path("checkpoints"), dataDir = new Path("data"), cacheDir = new Path("cache") @@ -76,30 +72,24 @@ class VolumeOperatorS3Cli( targetLayout.allDirs.foreach(fileSystem.mkdirs) - for (ds <- datasets) { + for (id <- datasets) { + val datasetLayout = metadataRepository.getDatasetLayout(id) + fileSystem.createSymlink( - localVolumeLayout.dataDir.resolve(ds.id.toString), - targetLayout.dataDir.resolve(ds.id.toString), + datasetLayout.dataDir, + targetLayout.dataDir.resolve(id.toString), false ) fileSystem.createSymlink( - localVolumeLayout.checkpointsDir.resolve(ds.id.toString), - targetLayout.checkpointsDir.resolve(ds.id.toString), + 
datasetLayout.checkpointsDir, + targetLayout.checkpointsDir.resolve(id.toString), false ) - val dsManifestPath = - if (fileSystem.exists( - localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") - )) - localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") - else - workspaceLayout.datasetsDir.resolve(ds.id.toString + ".yaml") - fileSystem.createSymlink( - dsManifestPath, - targetLayout.datasetsDir.resolve(ds.id.toString + ".yaml"), + datasetLayout.metadataDir, + targetLayout.metadataDir.resolve(id.toString), false ) } @@ -108,7 +98,7 @@ class VolumeOperatorS3Cli( } } - def s3Sync(from: Path, to: Path) = { + protected def s3Sync(from: Path, to: Path): Unit = { command( Array( "aws", @@ -120,7 +110,7 @@ class VolumeOperatorS3Cli( ) } - def s3Copy(from: Path, to: Path) = { + protected def s3Copy(from: Path, to: Path): Unit = { command( Array( "aws", @@ -132,7 +122,7 @@ class VolumeOperatorS3Cli( ) } - def command(cmd: Seq[String]) = { + protected def command(cmd: Seq[String]): Unit = { logger.debug(s"Executing command: ${cmd.mkString(" ")}") if (Process(cmd).! != 0) throw new Exception(s"Command failed: ${cmd.mkString(" ")}") diff --git a/src/test/scala/dev/kamu/cli/DatasetFactory.scala b/src/test/scala/dev/kamu/cli/DatasetFactory.scala index 363f445224..c142a21076 100644 --- a/src/test/scala/dev/kamu/cli/DatasetFactory.scala +++ b/src/test/scala/dev/kamu/cli/DatasetFactory.scala @@ -56,9 +56,9 @@ object DatasetFactory { header: Boolean = false, mergeStrategy: Option[MergeStrategyKind] = None, schema: Seq[String] = Seq.empty - ): Dataset = { + ): DatasetSnapshot = { val _id = id.getOrElse(newDatasetID()) - Dataset( + DatasetSnapshot( id = _id, rootPollingSource = Some( RootPollingSource( @@ -78,9 +78,9 @@ object DatasetFactory { source: DatasetID, id: Option[DatasetID] = None, sql: Option[String] = None - ): Dataset = { + ): DatasetSnapshot = { val _id = id.getOrElse(newDatasetID()) - Dataset( + DatasetSnapshot( id = _id, derivativeSource = Some( DerivativeSource( @@ -90,8 +90,7 @@ object DatasetFactory { ) ), steps = Vector( - ProcessingStepSQL( - view = _id.toString, + ProcessingStepKind.SparkSQL( query = sql.getOrElse(s"SELECT * FROM `$source`") ) ) diff --git a/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala b/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala index aef1ae1571..0e7a3181a6 100644 --- a/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala +++ b/src/test/scala/dev/kamu/cli/KamuTestAdapter.scala @@ -13,7 +13,7 @@ import java.nio.charset.StandardCharsets import dev.kamu.cli.output._ import dev.kamu.core.utils.fs._ -import dev.kamu.core.manifests.{Dataset, DatasetID} +import dev.kamu.core.manifests.{DatasetSnapshot, DatasetID} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{DataFrame, SparkSession} @@ -73,11 +73,11 @@ class KamuTestAdapter( res } - def addDataset(ds: Dataset): Unit = { + def addDataset(ds: DatasetSnapshot): Unit = { metadataRepository.addDataset(ds) } - def addDataset(ds: Dataset, df: DataFrame): Unit = { + def addDataset(ds: DatasetSnapshot, df: DataFrame): Unit = { metadataRepository.addDataset(ds) val volume = metadataRepository.getLocalVolume() diff --git a/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala b/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala index 001cb682a2..11e720605a 100644 --- a/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala +++ b/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala @@ -10,6 +10,8 @@ package dev.kamu.cli import java.net.URI +import 
pureconfig.generic.auto._ +import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._ import dev.kamu.cli.external.{DockerClient, DockerProcessBuilder, DockerRunArgs} import dev.kamu.core.manifests.DatasetID import org.apache.hadoop.fs.Path @@ -45,7 +47,6 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { Seq(dsA.id, dsB.id, dsC.id), recursive = false ) - .map(_.id) actual1 shouldEqual Seq(dsC.id, dsB.id, dsA.id) @@ -54,7 +55,6 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { Seq(dsA.id, dsB.id), recursive = false ) - .map(_.id) actual2 shouldEqual Seq(dsB.id, dsA.id) @@ -63,7 +63,6 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { Seq(dsA.id), recursive = true ) - .map(_.id) actual3 shouldEqual Seq(dsC.id, dsB.id, dsA.id) } @@ -77,7 +76,7 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { new Path(kamu.config.workspaceRoot, "server") fileSystem.mkdirs(serverDir) val path: Path = new Path(serverDir, "test-dataset.yaml") - kamu.metadataRepository.exportDataset(expected, path) + new ResourceLoader(fileSystem).saveResourceToFile(expected, path) // start up the server and host the directory val serverPort = 80 // httpd:2.4 default port @@ -102,7 +101,7 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { // pull the dataset from the server val actual = - kamu.metadataRepository.loadDatasetFromURI( + kamu.metadataRepository.loadDatasetSnapshotFromURI( new URI(s"http://localhost:${hostPort}/test-dataset.yaml") ) @@ -126,9 +125,10 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { fileSystem.mkdirs(testDir) val datasetPath: Path = new Path(testDir, "test-dataset.yaml") - kamu.metadataRepository.exportDataset(expected, datasetPath) + new ResourceLoader(fileSystem).saveResourceToFile(expected, datasetPath) - val actual = kamu.metadataRepository.loadDatasetFromURI(datasetPath.toUri) + val actual = + kamu.metadataRepository.loadDatasetSnapshotFromURI(datasetPath.toUri) actual shouldEqual expected }