Skip to content

Commit

Permalink
Experimental support for S3 volumes
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Jan 12, 2020
1 parent eaf91c3 commit f23bc6a
Show file tree
Hide file tree
Showing 27 changed files with 942 additions and 172 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.8.0] - 2020-01-12
### Added
- Experimental support for remote S3 volumes

## [0.7.1] - 2019-12-29
### Changed
- Bumped ingest version

## [0.7.0] - 2019-12-29
### Changed
- Using snake_case dataset vocabulary

## [0.6.0] - 2019-12-15
### Added
- Richer set of CSV reader options
Expand Down
2 changes: 1 addition & 1 deletion core.utils
1 change: 1 addition & 0 deletions docs/developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ What you'll need:
* Sbt
* Maven
* Spark (optional)
* AWS account and configured AWS CLI (optional, needed for S3 volumes)

Note: Use [SdkMan!](https://sdkman.io/) to install these dependencies:

Expand Down
72 changes: 72 additions & 0 deletions docs/directory_structure.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Directory Structure

- [Directory Structure](#directory-structure)
- [Workspace](#workspace)
- [Metadata Repository](#metadata-repository)
- [Data Volume](#data-volume)

## Workspace

Workspace is a directory that is set up to keep track of certain datasets. It consists of:

- `.kamu` - Metadata repository
- `.kamu.local` - Local data volume

Workspace is usually a part of a `git` (or any other version control) repository. VCS stores everything needed to reconstruct exactly the same state of data on any other machine, but doesn't store data itself.

```
├── .git
├── .kamu
│ └── <see metadata repository>
│── .kamu.local
│ └── <see volume>
└── ...
```

## Metadata Repository

Metadata repository consists of:

- `datasets` - definitions of all datasets tracked by the workspace
- `volumes` - definitions of all volumes that are used for externally storing the dataset data
- `kamu.yaml` - local workspace configuration

```
├── datasets
│   ├── com.example.deriv.yaml
│   └── com.example.root.yaml
├── volumes
│ └── public.s3.example.com.yaml
├── .gitignore
└── kamu.yaml
```

## Data Volume

Volumes store actual data records locally within the workspace or externally (e.g. S3, GCS).

- `datasets` - definitions of the datasets present in the volume
- `data` - actual data records
- `checkpoints` - information necessary to resume the data processing
- `cache` (local volume only) - used for caching results between ingestion stages to speed up iterations

```
├── datasets
│   ├── com.example.deriv.yaml
│   └── com.example.root.yaml
├── data
│   ├── com.example.deriv
│   │   └── *.parquet
│   └── com.example.root
│   └── *.parquet
├── checkpoints
│ ├── com.example.root
│ │ ├── foo_1
│ │ │ └── <ingest polling checkpoint files>
│ │ └── bar_n
│ │ └── ...
│   └── com.example.deriv
│ └── primary
│   └── <spark streaming checkpoint files>
└── kamu.yaml
```
70 changes: 70 additions & 0 deletions src/main/scala/dev/kamu/cli/CliParser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,76 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

/////////////////////////////////////////////////////////////////////////////

// `kamu push` — uploads local dataset data to a remote volume.
val push = new KamuSubcommand("push") {
  descr("Push the data to a remote volume")

  // Destination volume ID (corresponds to a definition in the workspace's `volumes` dir)
  val volume = opt[String](
    "volume",
    descr = "ID of the volume to push data to",
    required = true
  )

  // Selector flags: push everything ...
  val all = opt[Boolean](
    "all",
    descr = "Push all datasets"
  )

  // ... or the named datasets plus whatever they depend on
  val recursive = opt[Boolean](
    "recursive",
    descr = "Push datasets and their dependencies"
  )

  // Positional dataset IDs; optional so the flags above can be used instead
  val ids = trailArg[List[String]](
    "ids",
    required = false,
    descr = "IDs of the datasets to push",
    default = Some(List.empty)
  )
}
addSubcommand(push)

/////////////////////////////////////////////////////////////////////////////

// `kamu volume` — manages remote volume definitions in the workspace.
val volume = new KamuSubcommand("volume") {
  descr("List or manipulate the volumes")

  // `kamu volume list` — tabular listing of configured volumes
  val list = new TabularOutputSubcommand("list") {
    banner("List existing volumes")
  }
  addSubcommand(list)

  // `kamu volume add` — register volumes from manifest files or URLs
  val add = new Subcommand("add") {
    banner("Add new volumes")

    val manifests = trailArg[List[java.net.URI]](
      "manifests",
      required = false,
      descr = "Manifest URLs/files containing volume definitions",
      default = Some(List.empty)
    )

    val replace = opt[Boolean](
      "replace",
      descr = "Delete and replace the volumes that already exist"
    )
  }
  addSubcommand(add)

  // `kamu volume delete` — remove volume definitions by ID
  val delete = new Subcommand("delete") {
    banner("Delete existing volume")

    // Consistent with the other trailArgs above: `descr` passed by name and
    // the argument made explicitly optional to match its empty-list default
    // (it was previously positional and left at trailArg's required=true default).
    val ids = trailArg[List[String]](
      "ids",
      required = false,
      descr = "IDs of the volumes to delete",
      default = Some(List.empty)
    )
  }
  addSubcommand(delete)
}
addSubcommand(volume)

/////////////////////////////////////////////////////////////////////////////

val sql = new TabularOutputSubcommand("sql") {
descr("Executes an SQL query or drops you into an SQL shell")

Expand Down
181 changes: 181 additions & 0 deletions src/main/scala/dev/kamu/cli/GenericResourceRepository.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Copyright (c) 2018 kamu.dev
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

package dev.kamu.cli

import java.net.URI

import dev.kamu.core.manifests.{DatasetID, Manifest, Resource}
import dev.kamu.core.manifests.parsing.pureconfig.yaml
import dev.kamu.core.utils.fs._
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.LogManager
import pureconfig.{ConfigReader, ConfigWriter, Derivation}

import scala.reflect.ClassTag

/** Generic repository of YAML-serialized resources stored one-per-file
  * under `storagePath` (e.g. dataset or volume definitions).
  *
  * @param fileSystem   filesystem abstraction used for all I/O
  * @param storagePath  directory containing one `<id>.yaml` file per resource
  * @param resourceKind human-readable kind name used in error messages
  * @param idFromString parses a resource ID from a file name stem
  * @param idFromRes    extracts the ID from a loaded resource
  */
class GenericResourceRepository[TRes <: Resource[TRes]: ClassTag, TID](
  fileSystem: FileSystem,
  storagePath: Path,
  resourceKind: String,
  idFromString: String => TID,
  idFromRes: TRes => TID
)(
  implicit rderivation: Derivation[ConfigReader[Manifest[TRes]]],
  wderivation: Derivation[ConfigWriter[Manifest[TRes]]]
) {
  private val logger = LogManager.getLogger(getClass.getName)

  // Every resource is persisted as `<id>.yaml`
  private val resourceExtension = ".yaml"

  /** Path of the file that stores the resource with the given ID. */
  def getResourcePath(id: TID): Path = {
    storagePath.resolve(id.toString + resourceExtension)
  }

  /** IDs of all resources currently stored in the repository. */
  def getAllResourceIDs(): Seq[TID] = {
    fileSystem
      .listStatus(storagePath)
      .map(_.getPath.getName)
      // Skip unrelated files (e.g. editor backups, .gitignore) — previously any
      // file in the directory would be mangled into a bogus ID
      .filter(_.endsWith(resourceExtension))
      .map(_.dropRight(resourceExtension.length))
      .map(idFromString)
  }

  /** Loads a resource by ID.
    *
    * @throws DoesNotExistException if no resource with this ID exists
    */
  def getResource(id: TID): TRes = {
    getResourceOpt(id) match {
      case None     => throw new DoesNotExistException(id.toString, resourceKind)
      case Some(ds) => ds
    }
  }

  /** Loads a resource by ID, returning None if it does not exist. */
  def getResourceOpt(id: TID): Option[TRes] = {
    val path = getResourcePath(id)

    if (!fileSystem.exists(path))
      None
    else
      Some(loadResourceFromFile(path))
  }

  /** Loads all resources stored in the repository. */
  def getAllResources(): Seq[TRes] = {
    fileSystem
      .listStatus(storagePath)
      .map(_.getPath)
      // Only parse manifest files, consistent with getAllResourceIDs()
      .filter(_.getName.endsWith(resourceExtension))
      .map(loadResourceFromFile)
  }

  /** Adds a new resource.
    *
    * @throws AlreadyExistsException if a resource with the same ID exists
    */
  def addResource(res: TRes): Unit = {
    val id = idFromRes(res)
    val path = getResourcePath(id)

    if (fileSystem.exists(path))
      throw new AlreadyExistsException(
        id.toString,
        resourceKind
      )

    saveResource(res)
  }

  /** Deletes a resource by ID.
    *
    * @throws DoesNotExistException if no resource with this ID exists
    */
  def deleteResource(id: TID): Unit = {
    val path = getResourcePath(id)

    if (!fileSystem.exists(path))
      throw new DoesNotExistException(id.toString, resourceKind)

    fileSystem.delete(path, false)
  }

  /** Reads and deserializes a single resource manifest file. */
  def loadResourceFromFile(p: Path): TRes = {
    val inputStream = fileSystem.open(p)
    try {
      yaml.load[Manifest[TRes]](inputStream).content
    } catch {
      case e: Exception =>
        logger.error(s"Error while loading $resourceKind from file: $p")
        throw e
    } finally {
      inputStream.close()
    }
  }

  /** Serializes the resource into its canonical file location. */
  def saveResource(res: TRes): Unit = {
    val path = getResourcePath(idFromRes(res))
    saveResourceToFile(res, path)
  }

  /** Serializes the resource into the given file.
    *
    * On failure the partially-written file is removed, so a failed write
    * never leaves a corrupt manifest behind.
    */
  def saveResourceToFile(res: TRes, path: Path): Unit = {
    val outputStream = fileSystem.create(path)

    try {
      try {
        yaml.save(res.asManifest, outputStream)
      } finally {
        // Close exactly once, whether or not the write succeeded
        // (the original closed the stream twice on a write failure)
        outputStream.close()
      }
    } catch {
      case e: Exception =>
        // Clean up the partial file only after the stream is closed
        fileSystem.delete(path, false)
        throw e
    }
  }

  /** Loads a resource from a URI; supports http(s) URLs and local file paths.
    *
    * @throws SchemaNotSupportedException for any other URI scheme
    */
  def loadResourceFromURI(uri: URI): TRes = {
    uri.getScheme match {
      case "https"       => loadResourceFromURL(uri.toURL)
      case "http"        => loadResourceFromURL(uri.toURL)
      case null | "file" => loadResourceFromFile(new Path(uri.getPath))
      case s             => throw new SchemaNotSupportedException(s)
    }
  }

  private def loadResourceFromURL(url: java.net.URL): TRes = {
    val source = scala.io.Source.fromURL(url)
    try {
      yaml.load[Manifest[TRes]](source.mkString).content
    } catch {
      case e: Exception =>
        logger.error(
          s"Error while loading ${resourceKind} manifest from URL: $url"
        )
        throw e
    } finally {
      source.close()
    }
  }
}

/////////////////////////////////////////////////////////////////////////////////////////
// Exceptions
/////////////////////////////////////////////////////////////////////////////////////////

/** Raised when a lookup by ID finds no resource of the requested kind. */
class DoesNotExistException(val id: String, val kind: String)
    extends Exception(s"${kind.capitalize} $id does not exist")

/** Raised when adding a resource whose ID is already taken. */
class AlreadyExistsException(val id: String, val kind: String)
    extends Exception(s"${kind.capitalize} $id already exists")

/** Raised when one resource references another that does not exist. */
class MissingReferenceException(
  val fromID: String,
  val fromKind: String,
  val toID: String,
  val toKind: String
) extends Exception(
      fromKind.capitalize + s" $fromID refers to non existent $toKind $toID"
    )

/** Raised when a URI uses a scheme this tool cannot handle (e.g. ftp://).
  *
  * The message previously contained only the raw scheme string, which was
  * unreadable in logs and user-facing errors; it now states the problem.
  */
class SchemaNotSupportedException(val schema: String)
    extends Exception(s"Schema $schema is not supported")

/** Raised when deleting a dataset that other datasets still reference. */
class DanglingReferenceException(
  val fromIDs: Seq[DatasetID],
  val toID: DatasetID
) extends Exception(
      s"Dataset $toID is referenced by: ${fromIDs.mkString(", ")}"
    )
Loading

0 comments on commit f23bc6a

Please sign in to comment.