Skip to content

Commit

Permalink
Update to ODF resources
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Jul 12, 2020
1 parent fdb5b6b commit aafef73
Show file tree
Hide file tree
Showing 24 changed files with 168 additions and 177 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [UNRELEASED]
## [0.21.0] - 2020-07-12
### Fixed
- Encoding issue in `DatasetSummary` manifest
### Changed
- Upgraded to use ODF resources

## [0.20.0] - 2020-06-30
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package dev.kamu.cli.commands

import java.nio.file.Paths

import com.typesafe.config.ConfigObject
import pureconfig.generic.auto._
import dev.kamu.core.manifests.parsing.pureconfig.yaml
import yaml.defaults._
Expand Down Expand Up @@ -68,7 +69,7 @@ class AddInteractiveCommand(
)

// TODO: Add heuristics
var prepareSteps = Vector.empty[PrepStepKind]
var prepareSteps = Vector.empty[PrepStep]

if (inputYesNo("Is the source file compressed", "", false)) {
val compression = inputChoice(
Expand All @@ -77,7 +78,7 @@ class AddInteractiveCommand(
Seq("zip", "gzip")
)

val subPathRegex = if (Seq("zip").contains(compression)) {
val subPath = if (Seq("zip").contains(compression)) {
inputOptional(
"Sub-path",
"If this archive can contain multiple files - specify the path regex to " +
Expand All @@ -87,29 +88,27 @@ class AddInteractiveCommand(
None
}

prepareSteps = prepareSteps :+ PrepStepKind.Decompress(
prepareSteps = prepareSteps :+ PrepStep.Decompress(
format = compression,
subPathRegex = subPathRegex
subPath = subPath
)
}

var format = inputChoice(
"Format",
"Specify which format is the source data in.",
Seq("csv", "tsv", "json", "geojson", "shapefile")
Seq("csv", "json", "geojson", "shapefile")
)

val readerOptions = scala.collection.mutable.Map.empty[String, String]

// TODO: Add heuristics
if (format == "tsv") {
format = "csv"
readerOptions.put("delimiter", "\\t")
}

if (format == "csv") {
if (inputYesNo("Is the first line a header", "", true))
readerOptions.put("header", "true")
val readStep = format match {
case "csv" =>
if (inputYesNo("Is the first line a header", "", true))
ReadStep.Csv(header = Some(true))
else
ReadStep.Csv()
case "json" => ReadStep.JsonLines()
case "geojson" => ReadStep.GeoJson()
case "shapefile" => ReadStep.EsriShapefile()
}

val mergeStrategy = inputChoice(
Expand All @@ -134,9 +133,9 @@ class AddInteractiveCommand(
"Names of the columns should be compared to determine if a row has changed. " +
"For example this can be a modification timestamp, an incremental version, " +
"or a data hash. If not specified all data columns will be compared one by one."
)(s => s.split(',').map(_.trim).toVector).getOrElse(Vector.empty)
)(s => s.split(',').map(_.trim).toVector)

MergeStrategyKind.Snapshot(
MergeStrategy.Snapshot(
primaryKey = primaryKey,
compareColumns = compareColumns
)
Expand All @@ -148,31 +147,30 @@ class AddInteractiveCommand(
"Which columns uniquely identify the record throughout its lifetime (comma-separated)."
)(s => s.split(',').map(_.trim).toVector)

MergeStrategyKind.Ledger(primaryKey = primaryKey)
MergeStrategy.Ledger(primaryKey = primaryKey)
case "append" =>
MergeStrategyKind.Append()
MergeStrategy.Append()
}

DatasetSnapshot(
id = id,
source = SourceKind.Root(
fetch = FetchSourceKind.Url(url = url),
prepare = prepareSteps,
read = ReaderKind
.Generic(name = format, options = readerOptions.toMap),
source = DatasetSource.Root(
fetch = FetchStep.Url(url = url),
prepare = if (prepareSteps.isEmpty) None else Some(prepareSteps),
read = readStep,
merge = mergeStrategy
)
)
case "derivative" =>
DatasetSnapshot(
id = id,
source = SourceKind.Derivative(
source = DatasetSource.Derivative(
inputs = Vector.empty,
transform = yaml.saveObj(
TransformKind.SparkSQL(
engine = "sparkSQL",
query = Some("SELECT * FROM input")
)
transform = yaml.load[ConfigObject](
"""
|engine: sparkSQL
|query: 'SELECT * FROM input'
|""".stripMargin
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class AssignWatermarkCommand(

val newBlock = metaChain.append(
MetadataBlock(
blockHash = "",
prevBlockHash = metaChain.getBlocks().last.blockHash,
systemTime = systemClock.instant(),
outputWatermark = Some(watermark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class LogCommand(
println(renderProperty("Output.Watermark", w))
}

block.inputSlices.zipWithIndex.foreach {
block.inputSlices.getOrElse(Vector.empty).zipWithIndex.foreach {
case (s, i) =>
println(renderProperty(s"Input[$i].Records", s.numRecords))
println(renderProperty(s"Input[$i].Interval", s.interval.format()))
Expand All @@ -56,9 +56,9 @@ class LogCommand(
}

block.source.foreach {
case _: SourceKind.Root =>
case _: DatasetSource.Root =>
println(renderProperty("Source", "<Root source updated>"))
case _: SourceKind.Derivative =>
case _: DatasetSource.Derivative =>
println(renderProperty("Source", "<Derivative source updated>"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package dev.kamu.cli.external

object DockerImages {
val SPARK = "kamudata/engine-spark:0.7.0"
val FLINK = "kamudata/engine-flink:0.5.0"
val SPARK = "kamudata/engine-spark:0.8.0"
val FLINK = "kamudata/engine-flink:0.6.0"

val LIVY = SPARK
val JUPYTER = "kamudata/jupyter-uber:0.0.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import java.nio.file.Path

import better.files.File
import dev.kamu.core.manifests.parsing.pureconfig.yaml
import dev.kamu.core.manifests.{Manifest, Resource}
import dev.kamu.core.manifests.Manifest
import org.apache.logging.log4j.LogManager
import pureconfig.{ConfigReader, ConfigWriter, Derivation}

Expand All @@ -21,7 +21,7 @@ case class ExecutionResult[TCheckpoint](
checkpoint: TCheckpoint
)

class CheckpointingExecutor[TCheckpoint <: Resource]()(
class CheckpointingExecutor[TCheckpoint]()(
implicit icr: Derivation[ConfigReader[TCheckpoint]],
icmr: Derivation[ConfigReader[Manifest[TCheckpoint]]],
icw: Derivation[ConfigWriter[TCheckpoint]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,7 @@ import dev.kamu.cli.ingest.prep.{PrepCheckpoint, PrepStepFactory}
import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
import dev.kamu.cli.transform.EngineFactory
import dev.kamu.core.manifests.infra.IngestRequest
import dev.kamu.core.manifests.{
DatasetID,
DatasetVocabulary,
MetadataBlock,
SourceKind
}
import dev.kamu.core.manifests._
import dev.kamu.core.utils.fs._
import dev.kamu.core.utils.Clock
import org.apache.commons.compress.compressors.bzip2.{
Expand Down Expand Up @@ -74,7 +69,7 @@ class IngestService(
.reverse
.flatMap(_.source)
.head
.asInstanceOf[SourceKind.Root]
.asInstanceOf[DatasetSource.Root]

val cachingBehavior = sourceFactory.getCachingBehavior(source.fetch)

Expand Down Expand Up @@ -123,7 +118,7 @@ class IngestService(
prepResult.checkpoint,
prepDataPath,
ingestCheckpointPath,
summary.vocabulary.getOrElse(DatasetVocabulary())
summary.vocab.getOrElse(DatasetVocabulary())
)

if (ingestResult.wasUpToDate) {
Expand Down Expand Up @@ -151,7 +146,7 @@ class IngestService(
}

def maybeDownload(
source: SourceKind.Root,
source: DatasetSource.Root,
externalSource: CacheableSource,
cachingBehavior: CachingBehavior,
downloadCheckpointPath: Path,
Expand Down Expand Up @@ -186,7 +181,7 @@ class IngestService(

// TODO: Avoid copying data if prepare step is a no-op
def maybePrepare(
source: SourceKind.Root,
source: DatasetSource.Root,
downloadDataPath: Path,
downloadCheckpoint: DownloadCheckpoint,
prepCheckpointPath: Path,
Expand All @@ -202,7 +197,9 @@ class IngestService(
checkpoint = storedCheckpoint.get
)
} else {
val prepStep = prepStepFactory.getComposedSteps(source.prepare)
val prepStep = prepStepFactory.getComposedSteps(
source.prepare.getOrElse(Vector.empty)
)
val convertStep = conversionStepFactory.getComposedSteps(source.read)

val inputStream = File(downloadDataPath).newInputStream
Expand Down Expand Up @@ -239,7 +236,7 @@ class IngestService(

def maybeIngest(
datasetID: DatasetID,
source: SourceKind.Root,
source: DatasetSource.Root,
prepCheckpoint: PrepCheckpoint,
prepDataPath: Path,
ingestCheckpointPath: Path,
Expand Down Expand Up @@ -278,7 +275,7 @@ class IngestService(

def ingest(
datasetID: DatasetID,
source: SourceKind.Root,
source: DatasetSource.Root,
eventTime: Option[Instant],
prepDataPath: Path,
vocabulary: DatasetVocabulary
Expand All @@ -296,7 +293,9 @@ class IngestService(
)

val engine =
engineFactory.getEngine(source.preprocessEngine.getOrElse("sparkSQL"))
engineFactory.getEngine(
source.preprocessPartial.map(_.engine).getOrElse("sparkSQL")
)

val result = engine.ingest(request)
result.block
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@

package dev.kamu.cli.ingest.convert

import dev.kamu.core.manifests.ReaderKind
import dev.kamu.core.manifests.ReadStep
import org.apache.logging.log4j.LogManager

class ConversionStepFactory {
val logger = LogManager.getLogger(getClass.getName)

def getStep(readerConfig: ReaderKind): ConversionStep = {
def getStep(readerConfig: ReadStep): ConversionStep = {
readerConfig match {
case _: ReaderKind.Geojson =>
case _: ReadStep.GeoJson =>
logger.debug(s"Pre-processing as GeoJSON")
new GeoJSONConverter()
case _ =>
Expand All @@ -25,7 +25,7 @@ class ConversionStepFactory {
}

def getComposedSteps(
readerConfig: ReaderKind
readerConfig: ReadStep
): ConversionStep = {
getStep(readerConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ package dev.kamu.cli.ingest.convert

import java.time.Instant

import dev.kamu.core.manifests.{MetadataBlock, Resource}
import dev.kamu.core.manifests.MetadataBlock

case class IngestCheckpoint(
prepTimestamp: Instant,
lastIngested: Instant,
resultingBlock: MetadataBlock
) extends Resource
)
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ package dev.kamu.cli.ingest.fetch

import java.time.Instant

import dev.kamu.core.manifests.Resource

case class DownloadCheckpoint(
eventTime: Option[Instant],
lastDownloaded: Instant,
lastModified: Option[Instant] = None,
eTag: Option[String] = None
) extends Resource {
) {
def isCacheable: Boolean = lastModified.isDefined || eTag.isDefined
}
Loading

0 comments on commit aafef73

Please sign in to comment.