From f23bc6af7cfff0d2bf58edfcbc6fc48b5a7c71dd Mon Sep 17 00:00:00 2001
From: Sergii Mikhtoniuk
Date: Sun, 12 Jan 2020 13:28:17 -0800
Subject: [PATCH] Experimental support for S3 volumes

---
 CHANGELOG.md                                  |  12 ++
 core.manifests                                |   2 +-
 core.utils                                    |   2 +-
 docs/developer_guide.md                       |   1 +
 docs/directory_structure.md                   |  72 +++++++
 src/main/scala/dev/kamu/cli/CliParser.scala   |  70 +++++++
 .../kamu/cli/GenericResourceRepository.scala  | 181 ++++++++++++++++++
 src/main/scala/dev/kamu/cli/Kamu.scala        |  33 ++++
 src/main/scala/dev/kamu/cli/KamuApp.scala     |   1 +
 .../dev/kamu/cli/MetadataRepository.scala     | 176 +++++++----------
 .../scala/dev/kamu/cli/WorkspaceLayout.scala  |   4 +-
 .../dev/kamu/cli/commands/AddCommand.scala    |   2 +-
 .../cli/commands/AddInteractiveCommand.scala  |   2 +-
 .../kamu/cli/commands/CompletionCommand.scala |  18 +-
 .../dev/kamu/cli/commands/DeleteCommand.scala |   4 +-
 .../dev/kamu/cli/commands/InitCommand.scala   |   1 +
 .../dev/kamu/cli/commands/PullCommand.scala   |  71 +++++--
 .../dev/kamu/cli/commands/PurgeCommand.scala  |   4 +-
 .../dev/kamu/cli/commands/PushCommand.scala   |  69 +++++++
 .../kamu/cli/commands/VolumeAddCommand.scala  |  77 ++++++++
 .../cli/commands/VolumeDeleteCommand.scala    |  37 ++++
 .../kamu/cli/commands/VolumeListCommand.scala |  42 ++++
 .../dev/kamu/cli/external/SparkRunner.scala   |   7 +-
 .../kamu/cli/external/VolumeOperator.scala    |  51 +++++
 .../cli/external/VolumeOperatorS3Cli.scala    | 140 ++++++++++++++
 .../scala/dev/kamu/cli/KamuTestBase.scala     |  24 +--
 .../dev/kamu/cli/MetadataRepositorySpec.scala |  11 +-
 27 files changed, 942 insertions(+), 172 deletions(-)
 create mode 100644 docs/directory_structure.md
 create mode 100644 src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
 create mode 100644 src/main/scala/dev/kamu/cli/commands/PushCommand.scala
 create mode 100644 src/main/scala/dev/kamu/cli/commands/VolumeAddCommand.scala
 create mode 100644 src/main/scala/dev/kamu/cli/commands/VolumeDeleteCommand.scala
 create mode 100644 src/main/scala/dev/kamu/cli/commands/VolumeListCommand.scala
 create mode 100644 src/main/scala/dev/kamu/cli/external/VolumeOperator.scala
 create mode 100644 src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6815dd6bcd..7c8b90a61b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.8.0] - 2020-01-12
+### Added
+- Experimental support for remote S3 volumes
+
+## [0.7.1] - 2019-12-29
+### Changed
+- Bumped ingest version
+
+## [0.7.0] - 2019-12-29
+### Changed
+- Using snake_case dataset vocabulary
+
 ## [0.6.0] - 2019-12-15
 ### Added
 - Richer set of CSV reader options
diff --git a/core.manifests b/core.manifests
index 63bc109389..00b8d197c1 160000
--- a/core.manifests
+++ b/core.manifests
@@ -1 +1 @@
-Subproject commit 63bc109389c888e1f872e91eae7436cde4613782
+Subproject commit 00b8d197c1be0a133d5cc27109b3f167d20480bf
diff --git a/core.utils b/core.utils
index 48d0f69a42..a3a94a8fd6 160000
--- a/core.utils
+++ b/core.utils
@@ -1 +1 @@
-Subproject commit 48d0f69a42df0aba5dac0817714dee5b14b2c622
+Subproject commit a3a94a8fd6f85fb7e1d294fd2b065547d193a52d
diff --git a/docs/developer_guide.md b/docs/developer_guide.md
index a2e86aae86..a828bda07e 100644
--- a/docs/developer_guide.md
+++ b/docs/developer_guide.md
@@ -16,6 +16,7 @@ What you'll need:
 * Sbt
 * Maven
 * Spark (optional)
+* AWS account and configured AWS CLI (optional, needed for S3 volumes)
 
 Note: Use [SdkMan!](https://sdkman.io/) to install these dependencies:
 
diff --git a/docs/directory_structure.md b/docs/directory_structure.md
new file mode 100644
index 0000000000..5da16372a9
--- /dev/null
+++ b/docs/directory_structure.md
@@ -0,0 +1,72 @@
+# Directory Structure
+
+- [Directory Structure](#directory-structure)
+  - [Workspace](#workspace)
+  - [Metadata Repository](#metadata-repository)
+  - [Data Volume](#data-volume)
+
+## Workspace
+
+A workspace is a directory set up to keep track of a certain set of datasets. It consists of:
+
+- `.kamu` - Metadata repository
+- `.kamu.local` - Local data volume
+
+A workspace is usually part of a `git` (or any other version control) repository. The VCS stores everything needed to reconstruct exactly the same state of data on any other machine, but does not store the data itself.
+
+```
+├── .git
+├── .kamu
+│   └── 
+├── .kamu.local
+│   └── 
+└── ...
+```
+
+## Metadata Repository
+
+The metadata repository consists of:
+
+- `datasets` - definitions of all datasets tracked by the workspace
+- `volumes` - definitions of all volumes used for storing dataset data externally
+- `kamu.yaml` - local workspace configuration
+
+```
+├── datasets
+│   ├── com.example.deriv.yaml
+│   └── com.example.root.yaml
+├── volumes
+│   └── public.s3.example.com.yaml
+├── .gitignore
+└── kamu.yaml
+```
+
+## Data Volume
+
+Volumes store the actual data records either locally within the workspace or externally (e.g. S3, GCS). A volume consists of:
+
+- `datasets` - definitions of the datasets present in the volume
+- `data` - actual data records
+- `checkpoints` - information necessary to resume the data processing
+- `cache` (local volume only) - used for caching results between ingestion stages to speed up iterations
+
+```
+├── datasets
+│   ├── com.example.deriv.yaml
+│   └── com.example.root.yaml
+├── data
+│   ├── com.example.deriv
+│   │   └── *.parquet
+│   └── com.example.root
+│       └── *.parquet
+├── checkpoints
+│   ├── com.example.root
+│   │   ├── foo_1
+│   │   │   └── 
+│   │   └── bar_n
+│   │       └── ...
+│   └── com.example.deriv
+│       └── primary
+│           └── 
+└── kamu.yaml
+```
diff --git a/src/main/scala/dev/kamu/cli/CliParser.scala b/src/main/scala/dev/kamu/cli/CliParser.scala
index b763899143..cf721d06b0 100644
--- a/src/main/scala/dev/kamu/cli/CliParser.scala
+++ b/src/main/scala/dev/kamu/cli/CliParser.scala
@@ -322,6 +322,76 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
 
   /////////////////////////////////////////////////////////////////////////////
 
+  val push = new KamuSubcommand("push") {
+    descr("Push data to a remote volume")
+
+    val volume = opt[String](
+      "volume",
+      descr = "ID of the volume to push data to",
+      required = true
+    )
+
+    val all = opt[Boolean](
+      "all",
+      descr = "Push all datasets"
+    )
+
+    val recursive = opt[Boolean](
+      "recursive",
+      descr = "Push datasets and their dependencies"
+    )
+
+    val ids = trailArg[List[String]](
+      "ids",
+      required = false,
+      descr = "IDs of the datasets to push",
+      default = Some(List.empty)
+    )
+  }
+  addSubcommand(push)
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  val volume = new KamuSubcommand("volume") {
+    descr("List or manipulate volumes")
+
+    val list = new TabularOutputSubcommand("list") {
+      banner("List existing volumes")
+    }
+    addSubcommand(list)
+
+    val add = new Subcommand("add") {
+      banner("Add new volumes")
+
+      val manifests = trailArg[List[java.net.URI]](
+        "manifests",
+        required = false,
+        descr = "Manifest URLs/files containing volume definitions",
+        default = Some(List.empty)
+      )
+
+      val replace = opt[Boolean](
+        "replace",
+        descr = "Delete and replace the volumes that already exist"
+      )
+    }
+    addSubcommand(add)
+
+    val delete = new Subcommand("delete") {
+      banner("Delete existing volumes")
+
+      val ids = trailArg[List[String]](
+        "ids",
+        "IDs of the volumes to delete",
+        default = Some(List.empty)
+      )
+    }
+    addSubcommand(delete)
+  }
+  addSubcommand(volume)
+
+  /////////////////////////////////////////////////////////////////////////////
+
   val sql = new TabularOutputSubcommand("sql") {
     descr("Executes an SQL query or drops you into an SQL shell")
 
diff --git a/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala b/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
new file mode 100644
index 0000000000..cf19744862
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */ + +package dev.kamu.cli + +import java.net.URI + +import dev.kamu.core.manifests.{DatasetID, Manifest, Resource} +import dev.kamu.core.manifests.parsing.pureconfig.yaml +import dev.kamu.core.utils.fs._ +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.log4j.LogManager +import pureconfig.{ConfigReader, ConfigWriter, Derivation} + +import scala.reflect.ClassTag + +class GenericResourceRepository[TRes <: Resource[TRes]: ClassTag, TID]( + fileSystem: FileSystem, + storagePath: Path, + resourceKind: String, + idFromString: String => TID, + idFromRes: TRes => TID +)( + implicit rderivation: Derivation[ConfigReader[Manifest[TRes]]], + wderivation: Derivation[ConfigWriter[Manifest[TRes]]] +) { + private val logger = LogManager.getLogger(getClass.getName) + + def getResourcePath(id: TID): Path = { + storagePath.resolve(id.toString + ".yaml") + } + + def getAllResourceIDs(): Seq[TID] = { + fileSystem + .listStatus(storagePath) + .map(_.getPath.getName) + .map(filename => filename.substring(0, filename.length - ".yaml".length)) + .map(idFromString) + } + + def getResource(id: TID): TRes = { + getResourceOpt(id) match { + case None => throw new DoesNotExistException(id.toString, resourceKind) + case Some(ds) => ds + } + } + + def getResourceOpt(id: TID): Option[TRes] = { + val path = getResourcePath(id) + + if (!fileSystem.exists(path)) + None + else + Some(loadResourceFromFile(path)) + } + + def getAllResources(): Seq[TRes] = { + val resourceFiles = fileSystem + .listStatus(storagePath) + .map(_.getPath) + + resourceFiles.map(loadResourceFromFile) + } + + def addResource(res: TRes): Unit = { + val id = idFromRes(res) + val path = getResourcePath(id) + + if (fileSystem.exists(path)) + throw new AlreadyExistsException( + id.toString, + resourceKind + ) + + saveResource(res) + } + + def deleteResource(id: TID): Unit = { + val path = getResourcePath(id) + + if (!fileSystem.exists(path)) + throw new DoesNotExistException(id.toString, resourceKind) + + fileSystem.delete(path, false) + } + + def loadResourceFromFile(p: Path): TRes = { + val inputStream = fileSystem.open(p) + try { + yaml.load[Manifest[TRes]](inputStream).content + } catch { + case e: Exception => + logger.error(s"Error while loading $resourceKind from file: $p") + throw e + } finally { + inputStream.close() + } + } + + def saveResource(res: TRes): Unit = { + val path = getResourcePath(idFromRes(res)) + saveResourceToFile(res, path) + } + + def saveResourceToFile(res: TRes, path: Path): Unit = { + val outputStream = fileSystem.create(path) + + try { + yaml.save(res.asManifest, outputStream) + } catch { + case e: Exception => + outputStream.close() + fileSystem.delete(path, false) + throw e + } finally { + outputStream.close() + } + } + + def loadResourceFromURI(uri: URI): TRes = { + uri.getScheme match { + case "https" => loadResourceFromURL(uri.toURL) + case "http" => loadResourceFromURL(uri.toURL) + case null | "file" => loadResourceFromFile(new Path(uri.getPath)) + case s => throw new SchemaNotSupportedException(s) + } + } + + private def loadResourceFromURL(url: java.net.URL): TRes = { + val source = scala.io.Source.fromURL(url) + try { + yaml.load[Manifest[TRes]](source.mkString).content + } catch { + case e: Exception => + logger.error( + s"Error while loading ${resourceKind} manifest from URL: $url" + ) + throw e + } finally { + source.close() + } + } +} + +///////////////////////////////////////////////////////////////////////////////////////// +// Exceptions 
+/////////////////////////////////////////////////////////////////////////////////////////
+
+class DoesNotExistException(
+  val id: String,
+  val kind: String
+) extends Exception(s"${kind.capitalize} $id does not exist")
+
+class AlreadyExistsException(
+  val id: String,
+  val kind: String
+) extends Exception(s"${kind.capitalize} $id already exists")
+
+class MissingReferenceException(
+  val fromID: String,
+  val fromKind: String,
+  val toID: String,
+  val toKind: String
+) extends Exception(
+      s"${fromKind.capitalize} $fromID refers to non-existent $toKind $toID"
+    )
+
+class SchemaNotSupportedException(val schema: String)
+    extends Exception(s"$schema")
+
+class DanglingReferenceException(
+  val fromIDs: Seq[DatasetID],
+  val toID: DatasetID
+) extends Exception(
+      s"Dataset $toID is referenced by: " + fromIDs.mkString(", ")
+    )
diff --git a/src/main/scala/dev/kamu/cli/Kamu.scala b/src/main/scala/dev/kamu/cli/Kamu.scala
index 4c248e759d..7e889573da 100644
--- a/src/main/scala/dev/kamu/cli/Kamu.scala
+++ b/src/main/scala/dev/kamu/cli/Kamu.scala
@@ -24,12 +24,16 @@ class Kamu(
   val workspaceLayout = WorkspaceLayout(
     metadataRootDir = config.kamuRoot,
     datasetsDir = config.kamuRoot.resolve("datasets"),
+    volumesDir = config.kamuRoot.resolve("volumes"),
     localVolumeDir = config.localVolume
   ).toAbsolute(fileSystem)
 
   val metadataRepository =
     new MetadataRepository(fileSystem, workspaceLayout)
 
+  val volumeOperatorFactory =
+    new VolumeOperatorFactory(fileSystem, workspaceLayout, metadataRepository)
+
   def run(cliArgs: CliArgs): Unit = {
     val command = getCommand(cliArgs)
 
@@ -90,6 +94,7 @@ class Kamu(
           fileSystem,
           workspaceLayout,
           metadataRepository,
+          volumeOperatorFactory,
           getSparkRunner(
             c.localSpark(),
             if (c.debug()) Level.INFO else c.sparkLogLevel()
@@ -98,6 +103,34 @@ class Kamu(
           c.pull.all(),
           c.pull.recursive()
         )
+      case List(c.push) =>
+        new PushCommand(
+          fileSystem,
+          workspaceLayout,
+          metadataRepository,
+          volumeOperatorFactory,
+          c.push.volume(),
+          c.push.ids(),
+          c.push.all(),
+          c.push.recursive()
+        )
+      case List(c.volume, c.volume.list) =>
+        new VolumeListCommand(
+          metadataRepository,
+          getOutputFormatter(c.volume.list.getOutputFormat)
+        )
+      case List(c.volume, c.volume.add) =>
+        new VolumeAddCommand(
+          metadataRepository,
+          volumeOperatorFactory,
+          c.volume.add.manifests(),
+          c.volume.add.replace()
+        )
+      case List(c.volume, c.volume.delete) =>
+        new VolumeDeleteCommand(
+          metadataRepository,
+          c.volume.delete.ids()
+        )
       case List(c.sql) =>
         new SQLShellCommand(
           metadataRepository,
diff --git a/src/main/scala/dev/kamu/cli/KamuApp.scala b/src/main/scala/dev/kamu/cli/KamuApp.scala
index d1f1b59b07..db04bd02a6 100644
--- a/src/main/scala/dev/kamu/cli/KamuApp.scala
+++ b/src/main/scala/dev/kamu/cli/KamuApp.scala
@@ -21,6 +21,7 @@ object KamuApp extends App {
   val config = KamuConfig()
 
   val fileSystem = FileSystem.get(new Configuration())
+  FileSystem.enableSymlinks()
   fileSystem.setWriteChecksum(false)
   fileSystem.setVerifyChecksum(false)
 
diff --git a/src/main/scala/dev/kamu/cli/MetadataRepository.scala b/src/main/scala/dev/kamu/cli/MetadataRepository.scala
index 8934fb9f76..d2baa9ae9c 100644
--- a/src/main/scala/dev/kamu/cli/MetadataRepository.scala
+++ b/src/main/scala/dev/kamu/cli/MetadataRepository.scala
@@ -10,7 +10,13 @@ package dev.kamu.cli
 
 import java.io.PrintWriter
 
-import dev.kamu.core.manifests.{Dataset, DatasetID, Manifest, VolumeLayout}
+import dev.kamu.core.manifests.{
+  Dataset,
+  DatasetID,
+  Volume,
+  VolumeID,
+  VolumeLayout
+}
 import dev.kamu.cli.utility.DependencyGraph
import org.apache.hadoop.fs.{FileSystem, Path} import dev.kamu.core.manifests.parsing.pureconfig.yaml @@ -27,17 +33,30 @@ class MetadataRepository( ) { private val logger = LogManager.getLogger(getClass.getName) + protected val datasetRepo = + new GenericResourceRepository[Dataset, DatasetID]( + fileSystem, + workspaceLayout.datasetsDir, + "dataset", + DatasetID, + (ds: Dataset) => ds.id + ) + + protected val volumeRepo = + new GenericResourceRepository[Volume, VolumeID]( + fileSystem, + workspaceLayout.volumesDir, + "volume", + VolumeID, + (vol: Volume) => vol.id + ) + //////////////////////////////////////////////////////////////////////////// // Datasets //////////////////////////////////////////////////////////////////////////// def getDataset(id: DatasetID): Dataset = { - val path = getDatasetDefinitionPath(id) - - if (!fileSystem.exists(path)) - throw new DoesNotExistsException(id) - - loadDatasetFromFile(path) + datasetRepo.getResource(id) } def getDatasetsInDependencyOrder( @@ -57,85 +76,35 @@ class MetadataRepository( } def getAllDatasetIDs(): Seq[DatasetID] = { - fileSystem - .listStatus(workspaceLayout.datasetsDir) - .map(_.getPath.getName) - .map(filename => filename.substring(0, filename.length - ".yaml".length)) - .map(DatasetID) + datasetRepo.getAllResourceIDs() } def getAllDatasets(): Seq[Dataset] = { - val manifestFiles = fileSystem - .listStatus(workspaceLayout.datasetsDir) - .map(_.getPath) - - manifestFiles.map(loadDatasetFromFile) - } - - def loadDatasetFromFile(p: Path): Dataset = { - val inputStream = fileSystem.open(p) - try { - yaml.load[Manifest[Dataset]](inputStream).content - } catch { - case e: Exception => - logger.error(s"Error while loading data source manifest from file: $p") - throw e - } finally { - inputStream.close() - } + datasetRepo.getAllResources() } def loadDatasetFromURI(uri: URI): Dataset = { - uri.getScheme match { - case "https" => loadDatasetFromURL(uri.toURL) - case "http" => loadDatasetFromURL(uri.toURL) - case "file" => loadDatasetFromFile(new Path(uri.getPath)) - case s => throw new SchemaNotSupportedException(s) - } - } - - def loadDatasetFromURL(url: java.net.URL): Dataset = { - val source = scala.io.Source.fromURL(url) - try { - yaml.load[Manifest[Dataset]](source.mkString).content - } catch { - case e: Exception => - logger.error(s"Error while loading data source manifest from URL: $url") - throw e - } finally { - source.close() - } + datasetRepo.loadResourceFromURI(uri) } def addDataset(ds: Dataset): Unit = { - val path = getDatasetDefinitionPath(ds.id) - - if (fileSystem.exists(path)) - throw new AlreadyExistsException(ds.id) + if (datasetRepo.getResourceOpt(ds.id).isDefined) + throw new AlreadyExistsException(ds.id.toString, "dataset") try { ds.dependsOn.map(getDataset) + ds.remoteSource.map(rs => getVolume(rs.volumeID)) } catch { - case e: DoesNotExistsException => - throw new MissingReferenceException(ds.id, e.datasetID) + case e: DoesNotExistException => + throw new MissingReferenceException( + ds.id.toString, + "dataset", + e.id, + e.kind + ) } - saveDataset(ds, path) - } - - def saveDataset(ds: Dataset, path: Path): Unit = { - val outputStream = fileSystem.create(path) - - try { - yaml.save(ds.asManifest, outputStream) - } catch { - case e: Exception => - outputStream.close() - fileSystem.delete(path, false) - throw e - } finally { - outputStream.close() - } + datasetRepo.addResource(ds) } def deleteDataset(id: DatasetID): Unit = { @@ -144,12 +113,9 @@ class MetadataRepository( if (referencedBy.nonEmpty) throw new 
DanglingReferenceException(referencedBy.map(_.id), id) - val path = getDatasetDefinitionPath(id) - if (!fileSystem.exists(path)) - throw new DoesNotExistsException(id) - getAllDataPaths(id).foreach(p => fileSystem.delete(p, true)) - fileSystem.delete(path, false) + + datasetRepo.deleteResource(id) } def purgeDataset(id: DatasetID): Unit = { @@ -158,19 +124,20 @@ class MetadataRepository( // TODO: Purging a dataset that is used by non-empty derivatives should raise an error } + def exportDataset(ds: Dataset, path: Path): Unit = { + datasetRepo.saveResourceToFile(ds, path) + } + protected def getAllDataPaths(id: DatasetID): Seq[Path] = { - val volume = getVolumeFor(id) + val volumeLayout = getLocalVolume() Seq( - volume.dataDir.resolve(id.toString), - volume.checkpointsDir.resolve(id.toString) + volumeLayout.dataDir.resolve(id.toString), + volumeLayout.cacheDir.resolve(id.toString), + volumeLayout.checkpointsDir.resolve(id.toString) ) } - protected def getDatasetDefinitionPath(id: DatasetID): Path = { - workspaceLayout.datasetsDir.resolve(id.toString + ".yaml") - } - //////////////////////////////////////////////////////////////////////////// // Volumes //////////////////////////////////////////////////////////////////////////// @@ -188,35 +155,34 @@ class MetadataRepository( VolumeLayout( datasetsDir = workspaceLayout.localVolumeDir.resolve("datasets"), checkpointsDir = workspaceLayout.localVolumeDir.resolve("checkpoints"), - dataDir = workspaceLayout.localVolumeDir.resolve("data") + dataDir = workspaceLayout.localVolumeDir.resolve("data"), + cacheDir = workspaceLayout.localVolumeDir.resolve("cache") ) } - def getVolumeFor(id: DatasetID): VolumeLayout = { - // Only local volumes are supported now - getLocalVolume() + def loadVolumeFromURI(uri: URI): Volume = { + volumeRepo.loadResourceFromURI(uri) } -} -///////////////////////////////////////////////////////////////////////////////////////// -// Exceptions -///////////////////////////////////////////////////////////////////////////////////////// + def getAllVolumeIDs(): Seq[VolumeID] = { + volumeRepo.getAllResourceIDs() + } -class DoesNotExistsException(val datasetID: DatasetID) - extends Exception(s"Dataset $datasetID does not exist") + def getAllVolumes(): Seq[Volume] = { + volumeRepo.getAllResources() + } -class AlreadyExistsException(val datasetID: DatasetID) - extends Exception(s"Dataset $datasetID already exists") + def getVolume(volumeID: VolumeID): Volume = { + volumeRepo.getResource(volumeID) + } -class MissingReferenceException(val fromID: DatasetID, val toID: DatasetID) - extends Exception(s"Dataset $fromID refers to non existent dataset $toID") + def addVolume(volume: Volume): Unit = { + volumeRepo.addResource(volume) + } -class SchemaNotSupportedException(val schema: String) - extends Exception(s"$schema") + def deleteVolume(volumeID: VolumeID): Unit = { + // TODO: validate references + volumeRepo.deleteResource(volumeID) + } -class DanglingReferenceException( - val fromIDs: Seq[DatasetID], - val toID: DatasetID -) extends Exception( - s"Dataset $toID is referenced by: " + fromIDs.mkString(", ") - ) +} diff --git a/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala b/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala index f0b2f4f2fb..22f2944f08 100644 --- a/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala +++ b/src/main/scala/dev/kamu/cli/WorkspaceLayout.scala @@ -17,6 +17,8 @@ case class WorkspaceLayout( metadataRootDir: Path, /** Contains dataset definitions */ datasetsDir: Path, + /** Contains volume definitions */ + 
volumesDir: Path, /** Root directory of a local storage volume */ localVolumeDir: Path ) { @@ -25,6 +27,7 @@ case class WorkspaceLayout( copy( metadataRootDir = fs.toAbsolute(metadataRootDir), datasetsDir = fs.toAbsolute(datasetsDir), + volumesDir = fs.toAbsolute(volumesDir), localVolumeDir = fs.toAbsolute(localVolumeDir) ) } @@ -34,7 +37,6 @@ case class WorkspaceLayout( object WorkspaceLayout { val GITIGNORE_CONTENT: String = """ - |/poll |""".stripMargin val LOCAL_VOLUME_GITIGNORE_CONTENT: String = diff --git a/src/main/scala/dev/kamu/cli/commands/AddCommand.scala b/src/main/scala/dev/kamu/cli/commands/AddCommand.scala index 0878fd26ec..a7e91ff310 100644 --- a/src/main/scala/dev/kamu/cli/commands/AddCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/AddCommand.scala @@ -68,6 +68,6 @@ class AddCommand( .map(addDataset) .count(added => added) - logger.info(s"Added $numAdded datasets") + logger.info(s"Added $numAdded dataset(s)") } } diff --git a/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala b/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala index f81717e7a1..076b3a70f3 100644 --- a/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala @@ -37,7 +37,7 @@ class AddInteractiveCommand( logger.info("Added dataset") } else { val path = new Path("./" + dataset.id + ".yaml") - metadataRepository.saveDataset( + metadataRepository.exportDataset( dataset, path ) diff --git a/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala b/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala index 8be967bb7b..a43f94acbd 100644 --- a/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/CompletionCommand.scala @@ -76,12 +76,18 @@ class CompletionCommand( val flags = builder.opts.filter(!_.isPositional).map("--" + _.name) - val positional = builder.opts.filter(_.isPositional).map(_.name) match { - case Seq("ids") => - Seq("options+=`ls .kamu/datasets | sed -e 's/\\.[^.]*$//'`") - case _ => - Seq.empty - } + val positional = builder.opts + .filter(_.isPositional) + .flatMap(opt => { + opt.name match { + case "ids" if opt.descr.contains("dataset") => + Seq("options+=`ls .kamu/datasets | sed -e 's/\\.[^.]*$//'`") + case "ids" if opt.descr.contains("volume") => + Seq("options+=`ls .kamu/volumes | sed -e 's/\\.[^.]*$//'`") + case _ => + Seq.empty + } + }) val finale = Seq( "if [[ $cur == -* ]]; then", diff --git a/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala b/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala index f17e6d7793..c3a96fa83c 100644 --- a/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/DeleteCommand.scala @@ -10,7 +10,7 @@ package dev.kamu.cli.commands import dev.kamu.cli.{ DanglingReferenceException, - DoesNotExistsException, + DoesNotExistException, MetadataRepository } import dev.kamu.core.manifests.DatasetID @@ -30,7 +30,7 @@ class DeleteCommand( metadataRepository.deleteDataset(id) 1 } catch { - case e: DoesNotExistsException => + case e: DoesNotExistException => logger.error(e.getMessage) 0 case e: DanglingReferenceException => diff --git a/src/main/scala/dev/kamu/cli/commands/InitCommand.scala b/src/main/scala/dev/kamu/cli/commands/InitCommand.scala index 319bd89c9a..7ce59e5984 100644 --- a/src/main/scala/dev/kamu/cli/commands/InitCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/InitCommand.scala @@ -28,6 +28,7 @@ class InitCommand( 
throw new UsageException("Already a kamu workspace") fileSystem.mkdirs(workspaceLayout.datasetsDir) + fileSystem.mkdirs(workspaceLayout.volumesDir) val outputStream = fileSystem.create(workspaceLayout.metadataRootDir.resolve(".gitignore")) diff --git a/src/main/scala/dev/kamu/cli/commands/PullCommand.scala b/src/main/scala/dev/kamu/cli/commands/PullCommand.scala index b62cc3ac7f..5a6934face 100644 --- a/src/main/scala/dev/kamu/cli/commands/PullCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/PullCommand.scala @@ -8,15 +8,20 @@ package dev.kamu.cli.commands -import dev.kamu.cli.external.SparkRunner -import dev.kamu.cli.{MetadataRepository, WorkspaceLayout} -import dev.kamu.core.manifests.{Dataset, DatasetID, ExternalSourceKind} +import dev.kamu.cli.external.{SparkRunner, VolumeOperatorFactory} +import dev.kamu.cli.{ + DoesNotExistException, + MetadataRepository, + UsageException, + WorkspaceLayout +} import dev.kamu.core.ingest.polling +import dev.kamu.core.manifests.parsing.pureconfig.yaml +import dev.kamu.core.manifests.{Dataset, DatasetID, ExternalSourceKind} import dev.kamu.core.transform.streaming +import dev.kamu.core.utils.fs._ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.LogManager -import dev.kamu.core.manifests.parsing.pureconfig.yaml -import dev.kamu.core.utils.fs._ import yaml.defaults._ import pureconfig.generic.auto._ @@ -24,6 +29,7 @@ class PullCommand( fileSystem: FileSystem, workspaceLayout: WorkspaceLayout, metadataRepository: MetadataRepository, + volumeOperatorFactory: VolumeOperatorFactory, sparkRunner: SparkRunner, ids: Seq[String], all: Boolean, @@ -37,11 +43,16 @@ class PullCommand( else ids.map(DatasetID) } - val plan = metadataRepository - .getDatasetsInDependencyOrder( - datasetIDs, - recursive || all // All implies recursive, which is more efficient - ) + val plan = try { + metadataRepository + .getDatasetsInDependencyOrder( + datasetIDs, + recursive || all // All implies recursive, which is more efficient + ) + } catch { + case e: DoesNotExistException => + throw new UsageException(e.getMessage) + } logger.debug(s"Pulling datasets in following order:") plan.foreach(d => logger.debug(s" ${d.id.toString}")) @@ -56,12 +67,13 @@ class PullCommand( val (batch, rest) = plan.span(_.kind == kind) kind match { - case Dataset.Kind.Root => pullRoot(batch) + case Dataset.Kind.Root => + pullRoot(batch) case Dataset.Kind.Derivative => // TODO: Streaming transform currently doesn't handle dependencies batch.foreach(ds => pullDerivative(Seq(ds))) case Dataset.Kind.Remote => - throw new NotImplementedError("Cannot pull remote datasets") + pullRemote(batch) } batch.length + pullBatched(rest) @@ -96,12 +108,13 @@ class PullCommand( val pollConfig = polling.AppConf( tasks = datasets .map(ds => { - val volume = metadataRepository.getVolumeFor(ds.id) + val volumeLayout = metadataRepository.getLocalVolume() polling.IngestTask( datasetToIngest = ds, - checkpointsPath = volume.checkpointsDir.resolve(ds.id.toString), - pollCachePath = volume.checkpointsDir.resolve(ds.id.toString), - dataPath = volume.dataDir.resolve(ds.id.toString) + checkpointsPath = + volumeLayout.checkpointsDir.resolve(ds.id.toString), + pollCachePath = volumeLayout.cacheDir.resolve(ds.id.toString), + dataPath = volumeLayout.dataDir.resolve(ds.id.toString) ) }) ) @@ -139,15 +152,17 @@ class PullCommand( val transformConfig = streaming.AppConfig( tasks = datasets.map(ds => { - val volume = metadataRepository.getVolumeFor(ds.id) + val volumeLayout = metadataRepository.getLocalVolume() 
streaming.TransformTaskConfig( datasetToTransform = ds, inputDataPaths = ds.derivativeSource.get.inputs - .map(i => i.id.toString -> volume.dataDir.resolve(i.id.toString)) + .map( + i => i.id.toString -> volumeLayout.dataDir.resolve(i.id.toString) + ) .toMap, - checkpointsPath = volume.checkpointsDir.resolve(ds.id.toString), - outputDataPath = volume.dataDir.resolve(ds.id.toString) + checkpointsPath = volumeLayout.checkpointsDir.resolve(ds.id.toString), + outputDataPath = volumeLayout.dataDir.resolve(ds.id.toString) ) }) ) @@ -170,4 +185,20 @@ class PullCommand( true } + + /////////////////////////////////////////////////////////////////////////////////////// + // Remote + /////////////////////////////////////////////////////////////////////////////////////// + + def pullRemote(batch: Seq[Dataset]): Boolean = { + for (ds <- batch) { + val sourceVolume = metadataRepository + .getVolume(ds.remoteSource.get.volumeID) + + val volumeOperator = volumeOperatorFactory.buildFor(sourceVolume) + + volumeOperator.pull(Seq(ds)) + } + true + } } diff --git a/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala b/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala index fe9703521c..15086b1f6b 100644 --- a/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala +++ b/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala @@ -8,7 +8,7 @@ package dev.kamu.cli.commands -import dev.kamu.cli.{DoesNotExistsException, MetadataRepository} +import dev.kamu.cli.{DoesNotExistException, MetadataRepository} import dev.kamu.core.manifests.DatasetID import org.apache.log4j.LogManager @@ -33,7 +33,7 @@ class PurgeCommand( metadataRepository.purgeDataset(id) 1 } catch { - case e: DoesNotExistsException => + case e: DoesNotExistException => logger.error(e.getMessage) 0 } diff --git a/src/main/scala/dev/kamu/cli/commands/PushCommand.scala b/src/main/scala/dev/kamu/cli/commands/PushCommand.scala new file mode 100644 index 0000000000..5376e148b7 --- /dev/null +++ b/src/main/scala/dev/kamu/cli/commands/PushCommand.scala @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2018 kamu.dev + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */
+
+package dev.kamu.cli.commands
+
+import dev.kamu.cli.external.VolumeOperatorFactory
+import dev.kamu.cli.{
+  DoesNotExistException,
+  MetadataRepository,
+  UsageException,
+  WorkspaceLayout
+}
+import dev.kamu.core.manifests.{Dataset, DatasetID, Volume, VolumeID}
+import org.apache.hadoop.fs.FileSystem
+import org.apache.log4j.LogManager
+
+class PushCommand(
+  fileSystem: FileSystem,
+  workspaceLayout: WorkspaceLayout,
+  metadataRepository: MetadataRepository,
+  volumeOperatorFactory: VolumeOperatorFactory,
+  volumeID: String,
+  ids: Seq[String],
+  all: Boolean,
+  recursive: Boolean
+) extends Command {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def run(): Unit = {
+    val volume = try {
+      metadataRepository.getVolume(VolumeID(volumeID))
+    } catch {
+      case e: DoesNotExistException =>
+        throw new UsageException(e.getMessage)
+    }
+
+    val datasetIDs = {
+      if (all) metadataRepository.getAllDatasetIDs()
+      else ids.map(DatasetID)
+    }
+
+    val plan = try {
+      metadataRepository
+        .getDatasetsInDependencyOrder(
+          datasetIDs,
+          recursive || all // All implies recursive, which is more efficient
+        )
+    } catch {
+      case e: DoesNotExistException =>
+        throw new UsageException(e.getMessage)
+    }
+
+    logger.debug(s"Pushing the following dataset(s): ${plan.map(_.id).mkString(", ")}")
+
+    val numPushed = pushToVolume(plan, volume)
+    logger.info(s"Pushed $numPushed dataset(s)")
+  }
+
+  def pushToVolume(datasets: Seq[Dataset], volume: Volume): Int = {
+    val volumeOperator = volumeOperatorFactory.buildFor(volume)
+    volumeOperator.push(datasets)
+    datasets.length
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/commands/VolumeAddCommand.scala b/src/main/scala/dev/kamu/cli/commands/VolumeAddCommand.scala
new file mode 100644
index 0000000000..2ad391cb8d
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/commands/VolumeAddCommand.scala
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli.commands
+
+import dev.kamu.cli.external.VolumeOperatorFactory
+import dev.kamu.cli.{
+  AlreadyExistsException,
+  MetadataRepository,
+  MissingReferenceException,
+  SchemaNotSupportedException
+}
+import dev.kamu.core.manifests.Volume
+import org.apache.log4j.LogManager
+
+class VolumeAddCommand(
+  metadataRepository: MetadataRepository,
+  volumeOperatorFactory: VolumeOperatorFactory,
+  manifests: Seq[java.net.URI],
+  replace: Boolean
+) extends Command {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def run(): Unit = {
+    val volumes = {
+      try {
+        manifests.map(manifestURI => {
+          logger.debug(s"Loading volume from: $manifestURI")
+          metadataRepository.loadVolumeFromURI(manifestURI)
+        })
+      } catch {
+        case e: java.io.FileNotFoundException =>
+          logger.error(s"File not found: ${e.getMessage} - aborted")
+          Seq.empty
+        case e: SchemaNotSupportedException =>
+          logger.error(s"URI schema not supported: ${e.getMessage} - aborted")
+          Seq.empty
+      }
+    }
+
+    @scala.annotation.tailrec
+    def addVolume(volume: Volume): Boolean = {
+      try {
+        volumeOperatorFactory.ensureSupported(volume)
+        metadataRepository.addVolume(volume)
+        true
+      } catch {
+        case e: NotImplementedError =>
+          logger.warn(e.getMessage + " - skipping")
+          false
+        case e: AlreadyExistsException =>
+          if (replace) {
+            logger.warn(e.getMessage + " - replacing")
+            metadataRepository.deleteVolume(volume.id)
+            addVolume(volume)
+          } else {
+            logger.warn(e.getMessage + " - skipping")
+            false
+          }
+        case e: MissingReferenceException =>
+          logger.warn(e.getMessage + " - skipping")
+          false
+      }
+    }
+
+    val numAdded = volumes
+      .map(addVolume)
+      .count(added => added)
+
+    logger.info(s"Added $numAdded volume(s)")
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/commands/VolumeDeleteCommand.scala b/src/main/scala/dev/kamu/cli/commands/VolumeDeleteCommand.scala
new file mode 100644
index 0000000000..bd63269cd8
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/commands/VolumeDeleteCommand.scala
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli.commands
+
+import dev.kamu.cli.{DoesNotExistException, MetadataRepository}
+import dev.kamu.core.manifests.VolumeID
+import org.apache.log4j.LogManager
+
+class VolumeDeleteCommand(
+  metadataRepository: MetadataRepository,
+  ids: Seq[String]
+) extends Command {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def run(): Unit = {
+    val numDeleted = ids
+      .map(VolumeID)
+      .map(id => {
+        try {
+          metadataRepository.deleteVolume(id)
+          1
+        } catch {
+          case e: DoesNotExistException =>
+            logger.error(e.getMessage)
+            0
+        }
+      }).sum
+
+    logger.info(s"Deleted $numDeleted volume(s)")
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/commands/VolumeListCommand.scala b/src/main/scala/dev/kamu/cli/commands/VolumeListCommand.scala
new file mode 100644
index 0000000000..e7a16f9d8c
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/commands/VolumeListCommand.scala
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli.commands
+
+import dev.kamu.cli.MetadataRepository
+import dev.kamu.cli.output.{OutputFormatter, SimpleResultSet}
+import org.apache.log4j.LogManager
+
+class VolumeListCommand(
+  metadataRepository: MetadataRepository,
+  outputFormatter: OutputFormatter
+) extends Command {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def run(): Unit = {
+
+    val rs = new SimpleResultSet()
+    rs.addColumn("ID")
+    rs.addColumn("URL")
+
+    metadataRepository
+      .getAllVolumes()
+      .map(
+        v => (v.id.toString, v.url.toString)
+      )
+      .sortBy {
+        case (id, _) => id
+      }
+      .foreach {
+        case (id, url) =>
+          rs.addRow(id, url)
+      }
+
+    outputFormatter.format(rs)
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/external/SparkRunner.scala b/src/main/scala/dev/kamu/cli/external/SparkRunner.scala
index b56394a6ae..f21e08f03e 100644
--- a/src/main/scala/dev/kamu/cli/external/SparkRunner.scala
+++ b/src/main/scala/dev/kamu/cli/external/SparkRunner.scala
@@ -105,11 +105,6 @@ abstract class SparkRunner(
   }
 
   protected def tempDir: Path = {
-    // Note: not using "java.io.tmpdir" because on Mac this resolves to /var/folders for whatever reason
-    // and this directory is not mounted into docker's VM
-    val p = fileSystem.toAbsolute(new Path(".kamu/run"))
-    if (!fileSystem.exists(p))
-      fileSystem.mkdirs(p)
-    p
+    new Path(sys.props("java.io.tmpdir"))
   }
 }
diff --git a/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala b/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala
new file mode 100644
index 0000000000..83f1bcfb91
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/external/VolumeOperator.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli.external
+
+import dev.kamu.cli.{MetadataRepository, WorkspaceLayout}
+import dev.kamu.core.manifests.{Dataset, Volume}
+import org.apache.hadoop.fs.FileSystem
+
+trait VolumeOperator {
+  def push(datasets: Seq[Dataset]): Unit
+
+  def pull(datasets: Seq[Dataset]): Unit
+}
+
+class VolumeOperatorFactory(
+  fileSystem: FileSystem,
+  workspaceLayout: WorkspaceLayout,
+  metadataRepository: MetadataRepository
+) {
+  def ensureSupported(volume: Volume): Unit = {
+    volume.url.getScheme match {
+      case "s3" =>
+      case _ =>
+        throw new NotImplementedError(
+          s"No volume operator found that supports scheme: ${volume.url.getScheme}"
+        )
+    }
+  }
+
+  def buildFor(volume: Volume): VolumeOperator = {
+    volume.url.getScheme match {
+      case "s3" =>
+        new VolumeOperatorS3Cli(
+          fileSystem,
+          workspaceLayout,
+          metadataRepository,
+          volume
+        )
+      case _ =>
+        throw new NotImplementedError(
+          s"No volume operator found that supports scheme: ${volume.url.getScheme}"
+        )
+    }
+  }
+}
diff --git a/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala b/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala
new file mode 100644
index 0000000000..f9e890bfcb
--- /dev/null
+++ b/src/main/scala/dev/kamu/cli/external/VolumeOperatorS3Cli.scala
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */ + +package dev.kamu.cli.external + +import dev.kamu.cli.{MetadataRepository, WorkspaceLayout} +import dev.kamu.core.manifests.{Dataset, Volume, VolumeLayout} +import dev.kamu.core.utils.fs._ +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.log4j.LogManager + +import scala.sys.process.Process + +class VolumeOperatorS3Cli( + fileSystem: FileSystem, + workspaceLayout: WorkspaceLayout, + metadataRepository: MetadataRepository, + volume: Volume +) extends VolumeOperator { + private val logger = LogManager.getLogger(getClass.getName) + + override def push(datasets: Seq[Dataset]): Unit = { + withVolumeLayout(datasets) { localDir => + s3Sync(localDir, new Path(volume.url)) + } + } + + override def pull(datasets: Seq[Dataset]): Unit = { + val localVolumeLayout = metadataRepository.getLocalVolume() + + val remoteVolumeLayout = VolumeLayout( + datasetsDir = new Path("datasets"), + checkpointsDir = new Path("checkpoints"), + dataDir = new Path("data"), + cacheDir = new Path("cache") + ) + + for (ds <- datasets) { + val sourceLayout = remoteVolumeLayout.relativeTo(new Path(volume.url)) + + s3Copy( + sourceLayout.datasetsDir.resolve(ds.id.toString + ".yaml"), + localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") + ) + + s3Sync( + sourceLayout.checkpointsDir.resolve(ds.id.toString), + localVolumeLayout.checkpointsDir.resolve(ds.id.toString) + ) + + s3Sync( + sourceLayout.dataDir.resolve(ds.id.toString), + localVolumeLayout.dataDir.resolve(ds.id.toString) + ) + } + } + + protected def withVolumeLayout[T]( + datasets: Seq[Dataset] + )(func: Path => T): T = { + Temp.withRandomTempDir(fileSystem, "kamu-volume-") { tempDir => + val localVolumeLayout = + metadataRepository.getLocalVolume().toAbsolute(fileSystem) + + val targetLayout = VolumeLayout( + datasetsDir = new Path("datasets"), + checkpointsDir = new Path("checkpoints"), + dataDir = new Path("data"), + cacheDir = new Path("cache") + ).relativeTo(tempDir) + + targetLayout.allDirs.foreach(fileSystem.mkdirs) + + for (ds <- datasets) { + fileSystem.createSymlink( + localVolumeLayout.dataDir.resolve(ds.id.toString), + targetLayout.dataDir.resolve(ds.id.toString), + false + ) + + fileSystem.createSymlink( + localVolumeLayout.checkpointsDir.resolve(ds.id.toString), + targetLayout.checkpointsDir.resolve(ds.id.toString), + false + ) + + val dsManifestPath = + if (fileSystem.exists( + localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") + )) + localVolumeLayout.datasetsDir.resolve(ds.id.toString + ".yaml") + else + workspaceLayout.datasetsDir.resolve(ds.id.toString + ".yaml") + + fileSystem.createSymlink( + dsManifestPath, + targetLayout.datasetsDir.resolve(ds.id.toString + ".yaml"), + false + ) + } + + func(tempDir) + } + } + + def s3Sync(from: Path, to: Path) = { + command( + Array( + "aws", + "s3", + "sync", + from.toString.replace("file:", ""), + to.toString.replace("file:", "") + ) + ) + } + + def s3Copy(from: Path, to: Path) = { + command( + Array( + "aws", + "s3", + "cp", + from.toString.replace("file:", ""), + to.toString.replace("file:", "") + ) + ) + } + + def command(cmd: Seq[String]) = { + logger.debug(s"Executing command: ${cmd.mkString(" ")}") + if (Process(cmd).! 
!= 0) + throw new Exception(s"Command failed: ${cmd.mkString(" ")}") + } +} diff --git a/src/test/scala/dev/kamu/cli/KamuTestBase.scala b/src/test/scala/dev/kamu/cli/KamuTestBase.scala index ed05e6e3fc..2b1f54acd7 100644 --- a/src/test/scala/dev/kamu/cli/KamuTestBase.scala +++ b/src/test/scala/dev/kamu/cli/KamuTestBase.scala @@ -8,39 +8,21 @@ package dev.kamu.cli -import java.util.UUID - import dev.kamu.core.utils.fs._ import dev.kamu.core.utils.test.KamuDataFrameSuite import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.FileSystem import org.scalatest._ trait KamuTestBase extends KamuDataFrameSuite { self: Suite => val fileSystem = FileSystem.get(new Configuration()) - def getSystemTempDir(): Path = - new Path(System.getProperty("java.io.tmpdir")) - - def getRandomDir(): Path = - getSystemTempDir() - .resolve("kamu-test-" + UUID.randomUUID.toString) - def withEmptyDir[T](func: KamuTestAdapter => T): T = { - val testDir = getRandomDir() - fileSystem.mkdirs(testDir) - - try { - val config = KamuConfig( - workspaceRoot = testDir - ) - + Temp.withRandomTempDir(fileSystem, "kamu-test-") { tempDir => + val config = KamuConfig(workspaceRoot = tempDir) val kamu = new KamuTestAdapter(config, fileSystem, spark) - func(kamu) - } finally { - fileSystem.delete(testDir, true) } } diff --git a/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala b/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala index 5652249e4e..001cb682a2 100644 --- a/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala +++ b/src/test/scala/dev/kamu/cli/MetadataRepositorySpec.scala @@ -77,7 +77,7 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { new Path(kamu.config.workspaceRoot, "server") fileSystem.mkdirs(serverDir) val path: Path = new Path(serverDir, "test-dataset.yaml") - kamu.metadataRepository.saveDataset(expected, path) + kamu.metadataRepository.exportDataset(expected, path) // start up the server and host the directory val serverPort = 80 // httpd:2.4 default port @@ -123,11 +123,12 @@ class MetadataRepositorySpec extends FunSuite with Matchers with KamuTestBase { // works for names with colons and spaces s"${kamu.config.workspaceRoot.toString}${SEPARATOR}test: add from file" ) - val path: Path = new Path(testDir, "test-dataset.yaml") + fileSystem.mkdirs(testDir) - kamu.metadataRepository.saveDataset(expected, path) - val actual = - kamu.metadataRepository.loadDatasetFromFile(path) + val datasetPath: Path = new Path(testDir, "test-dataset.yaml") + kamu.metadataRepository.exportDataset(expected, datasetPath) + + val actual = kamu.metadataRepository.loadDatasetFromURI(datasetPath.toUri) actual shouldEqual expected }
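
The patch adds the `.kamu/volumes` directory but does not include a sample volume manifest. A minimal sketch of what `kamu volume add` might consume, assuming the same `Manifest` YAML envelope used for datasets (the `apiVersion`/`kind`/`content` keys are assumptions); `id` and `url` are the only `Volume` fields this patch actually reads:

```
# Hypothetical manifest, e.g. .kamu/volumes/public.s3.example.com.yaml;
# envelope keys are assumptions, `id` and `url` are the fields read by VolumeListCommand
apiVersion: 1
kind: Volume
content:
  id: public.s3.example.com
  url: s3://public.s3.example.com
```

A dataset stored in such a volume is treated as `Dataset.Kind.Remote`: `MetadataRepository.addDataset` validates its `remoteSource.volumeID` reference, and `PullCommand.pullRemote` uses it to locate the volume to download from. A sketch of such a dataset manifest, with all field names except `remoteSource.volumeID` being assumptions:

```
# Hypothetical remote dataset manifest
apiVersion: 1
kind: Dataset
content:
  id: com.example.root
  remoteSource:
    volumeID: public.s3.example.com
```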
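
End to end, the intended workflow appears to be the following. The flags match the `CliParser` definitions above; the manifest file name and the volume and dataset IDs are illustrative:

```
# Register a remote volume (use --replace to overwrite an existing definition)
kamu volume add my-volume.yaml
kamu volume list

# Upload a dataset and all of its dependencies to the volume
kamu push --volume public.s3.example.com --recursive com.example.deriv

# In another workspace, pulling a dataset of kind Remote fetches it from its volume
kamu pull com.example.root
```

Under the hood, `push` stages symlinks to the local volume's `datasets`, `data`, and `checkpoints` entries in a temporary directory and hands it to `aws s3 sync`, so both pushing and pulling require a configured AWS CLI on the `PATH`.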