From aafef7331e17717a7945ee2186bfc9d0f3f81386 Mon Sep 17 00:00:00 2001
From: Sergii Mikhtoniuk
Date: Sun, 12 Jul 2020 15:36:30 -0700
Subject: [PATCH] Update to ODF resources
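
The coordinator now uses the new ODF resource types from core.manifests:
SourceKind becomes DatasetSource, FetchSourceKind becomes FetchStep,
ReaderKind becomes ReadStep, PrepStepKind becomes PrepStep, MergeStrategyKind
becomes MergeStrategy, EventTimeKind becomes EventTimeSource, and checkpoint
classes no longer extend the Resource marker trait. As a rough, illustrative
sketch only (not part of the diff below; the dataset ID and URL are made up),
a root dataset snapshot is now constructed along these lines, mirroring the
updated tests:

    import java.net.URI
    import dev.kamu.core.manifests._

    // Hypothetical example of the new ODF resource types in use
    val snapshot = DatasetSnapshot(
      id = DatasetID("com.example.population"),
      source = DatasetSource.Root(
        fetch = FetchStep.Url(url = new URI("https://example.org/population.csv")),
        read = ReadStep.Csv(header = Some(true)),
        merge = MergeStrategy.Snapshot(primaryKey = Vector("id"))
      )
    )
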
---
 CHANGELOG.md | 4 +-
 .../cli/commands/AddInteractiveCommand.scala | 60 ++++++++--------
 .../cli/commands/AssignWatermarkCommand.scala | 1 +
 .../dev/kamu/cli/commands/LogCommand.scala | 6 +-
 .../dev/kamu/cli/external/DockerImages.scala | 4 +-
 .../cli/ingest/CheckpointingExecutor.scala | 4 +-
 .../dev/kamu/cli/ingest/IngestService.scala | 27 ++++----
 .../convert/ConversionStepFactory.scala | 8 +--
 .../cli/ingest/convert/IngestCheckpoint.scala | 4 +-
 .../cli/ingest/fetch/DownloadCheckpoint.scala | 4 +-
 .../kamu/cli/ingest/fetch/SourceFactory.scala | 68 ++++++++++---------
 .../cli/ingest/prep/DecompressZIPStep.scala | 8 +--
 .../kamu/cli/ingest/prep/PrepCheckpoint.scala | 4 +-
 .../cli/ingest/prep/PrepStepFactory.scala | 10 +--
 .../metadata/GenericResourceRepository.scala | 4 +-
 .../dev/kamu/cli/metadata/MetadataChain.scala | 9 +--
 .../cli/metadata/MetadataRepository.scala | 2 +-
 .../kamu/cli/metadata/ResourceLoader.scala | 10 +--
 .../dev/kamu/cli/transform/EngineUtils.scala | 4 +-
 .../kamu/cli/transform/TransformService.scala | 25 +++----
 .../scala/dev/kamu/cli/DatasetFactory.scala | 49 +++++++------
 .../kamu/cli/ingest/IngestGeoJsonSpec.scala | 10 +--
 .../cli/ingest/IngestMultiSourceSpec.scala | 18 ++---
 core.manifests | 2 +-
 24 files changed, 168 insertions(+), 177 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 821bfa348b..6bfc223669 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,9 +4,11 @@ All notable changes to this project will be documented in this file.
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [UNRELEASED]
+## [0.21.0] - 2020-07-12
 ### Fixed
 - Encoding issue in `DatasetSummary` manifest
+### Changed
+- Upgraded to use ODF resources
 
 ## [0.20.0] - 2020-06-30
 ### Added

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala
index 49aec83afc..eb25d1e696 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AddInteractiveCommand.scala
@@ -10,6 +10,7 @@ package dev.kamu.cli.commands
 
 import java.nio.file.Paths
 
+import com.typesafe.config.ConfigObject
 import pureconfig.generic.auto._
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
 import yaml.defaults._
@@ -68,7 +69,7 @@ class AddInteractiveCommand(
       )
 
       // TODO: Add heuristics
-      var prepareSteps = Vector.empty[PrepStepKind]
+      var prepareSteps = Vector.empty[PrepStep]
 
       if (inputYesNo("Is the source file compressed", "", false)) {
         val compression = inputChoice(
@@ -77,7 +78,7 @@
           Seq("zip", "gzip")
         )
 
-        val subPathRegex = if (Seq("zip").contains(compression)) {
+        val subPath = if (Seq("zip").contains(compression)) {
          inputOptional(
            "Sub-path",
            "If this archive can contain multiple files - specify the path regex to " +
@@ -87,29 +88,27 @@
            None
          }
 
-        prepareSteps = prepareSteps :+ PrepStepKind.Decompress(
+        prepareSteps = prepareSteps :+ PrepStep.Decompress(
          format = compression,
-          subPathRegex = subPathRegex
+          subPath = subPath
        )
      }
 
      var format = inputChoice(
        "Format",
        "Specify which format is the source data in.",
-        Seq("csv", "tsv", "json", "geojson", "shapefile")
+        Seq("csv", "json", "geojson", "shapefile")
      )
 
-      val readerOptions = scala.collection.mutable.Map.empty[String, String]
-
-      // TODO: Add heuristics
-      if (format == "tsv") {
-        format = "csv"
-        readerOptions.put("delimiter", "\\t")
-      }
-
-      if (format == "csv") {
-        if (inputYesNo("Is the first line a header", "", true))
-          readerOptions.put("header", "true")
+      val readStep = format match {
+        case "csv" =>
+          if (inputYesNo("Is the first line a header", "", true))
+            ReadStep.Csv(header = Some(true))
+          else
+            ReadStep.Csv()
+        case "json" => ReadStep.JsonLines()
+        case "geojson" => ReadStep.GeoJson()
+        case "shapefile" => ReadStep.EsriShapefile()
      }
 
      val mergeStrategy = inputChoice(
@@ -134,9 +133,9 @@
            "Names of the columns should be compared to determine if a row has changed. " +
              "For example this can be a modification timestamp, an incremental version, " +
              "or a data hash. If not specified all data columns will be compared one by one."
-          )(s => s.split(',').map(_.trim).toVector).getOrElse(Vector.empty)
+          )(s => s.split(',').map(_.trim).toVector)
 
-          MergeStrategyKind.Snapshot(
+          MergeStrategy.Snapshot(
            primaryKey = primaryKey,
            compareColumns = compareColumns
          )
@@ -148,31 +147,30 @@
            "Which columns uniquely identify the record throughout its lifetime (comma-separated)."
          )(s => s.split(',').map(_.trim).toVector)
 
-          MergeStrategyKind.Ledger(primaryKey = primaryKey)
+          MergeStrategy.Ledger(primaryKey = primaryKey)
        case "append" =>
-          MergeStrategyKind.Append()
+          MergeStrategy.Append()
      }
 
      DatasetSnapshot(
        id = id,
-        source = SourceKind.Root(
-          fetch = FetchSourceKind.Url(url = url),
-          prepare = prepareSteps,
-          read = ReaderKind
-            .Generic(name = format, options = readerOptions.toMap),
+        source = DatasetSource.Root(
+          fetch = FetchStep.Url(url = url),
+          prepare = if (prepareSteps.isEmpty) None else Some(prepareSteps),
+          read = readStep,
          merge = mergeStrategy
        )
      )
    case "derivative" =>
      DatasetSnapshot(
        id = id,
-        source = SourceKind.Derivative(
+        source = DatasetSource.Derivative(
          inputs = Vector.empty,
-          transform = yaml.saveObj(
-            TransformKind.SparkSQL(
-              engine = "sparkSQL",
-              query = Some("SELECT * FROM input")
-            )
+          transform = yaml.load[ConfigObject](
+            """
+              |engine: sparkSQL
+              |query: 'SELECT * FROM input'
+              |""".stripMargin
          )
        )
      )

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala
index 290fabce40..199209adb0 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala
@@ -36,6 +36,7 @@ class AssignWatermarkCommand(
 
     val newBlock = metaChain.append(
       MetadataBlock(
+        blockHash = "",
        prevBlockHash = metaChain.getBlocks().last.blockHash,
        systemTime = systemClock.instant(),
        outputWatermark = Some(watermark)

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala
index 01500841b4..7dd6ea42bd 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala
@@ -47,7 +47,7 @@ class LogCommand(
      println(renderProperty("Output.Watermark", w))
    }
 
-    block.inputSlices.zipWithIndex.foreach {
+    block.inputSlices.getOrElse(Vector.empty).zipWithIndex.foreach {
      case (s, i) =>
        println(renderProperty(s"Input[$i].Records", s.numRecords))
        println(renderProperty(s"Input[$i].Interval", s.interval.format()))
@@ -56,9 +56,9 @@
    }
 
    block.source.foreach {
-      case _: SourceKind.Root =>
+      case _: DatasetSource.Root =>
        println(renderProperty("Source", ""))
-      case _: SourceKind.Derivative =>
+      case _: DatasetSource.Derivative =>
        println(renderProperty("Source", ""))
    }
 

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala
index ea9c3e9a9d..f423fd8fbc 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala
@@ -9,8 +9,8 @@
 package dev.kamu.cli.external
 
 object DockerImages {
-  val SPARK = "kamudata/engine-spark:0.7.0"
-  val FLINK = "kamudata/engine-flink:0.5.0"
+  val SPARK = "kamudata/engine-spark:0.8.0"
+  val FLINK = "kamudata/engine-flink:0.6.0"
 
   val LIVY = SPARK
   val JUPYTER = "kamudata/jupyter-uber:0.0.1"

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala
index 8cb7d693ca..e8563327c9 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/CheckpointingExecutor.scala
@@ -12,7 +12,7 @@
 import java.nio.file.Path
 
 import better.files.File
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
-import dev.kamu.core.manifests.{Manifest, Resource}
+import dev.kamu.core.manifests.Manifest
 import org.apache.logging.log4j.LogManager
 import pureconfig.{ConfigReader, ConfigWriter, Derivation}
@@ -21,7 +21,7 @@ case class ExecutionResult[TCheckpoint](
   checkpoint: TCheckpoint
 )
 
-class CheckpointingExecutor[TCheckpoint <: Resource]()(
+class CheckpointingExecutor[TCheckpoint]()(
   implicit icr: Derivation[ConfigReader[TCheckpoint]],
   icmr: Derivation[ConfigReader[Manifest[TCheckpoint]]],
   icw: Derivation[ConfigWriter[TCheckpoint]],

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala
index 23c7dcb062..3f9ee7c287 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala
@@ -24,12 +24,7 @@ import dev.kamu.cli.ingest.prep.{PrepCheckpoint, PrepStepFactory}
 import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
 import dev.kamu.cli.transform.EngineFactory
 import dev.kamu.core.manifests.infra.IngestRequest
-import dev.kamu.core.manifests.{
-  DatasetID,
-  DatasetVocabulary,
-  MetadataBlock,
-  SourceKind
-}
+import dev.kamu.core.manifests._
 import dev.kamu.core.utils.fs._
 import dev.kamu.core.utils.Clock
 import org.apache.commons.compress.compressors.bzip2.{
@@ -74,7 +69,7 @@ class IngestService(
      .reverse
      .flatMap(_.source)
      .head
-      .asInstanceOf[SourceKind.Root]
+      .asInstanceOf[DatasetSource.Root]
 
    val cachingBehavior = sourceFactory.getCachingBehavior(source.fetch)
 
@@ -123,7 +118,7 @@ class IngestService(
        prepResult.checkpoint,
        prepDataPath,
        ingestCheckpointPath,
-        summary.vocabulary.getOrElse(DatasetVocabulary())
+        summary.vocab.getOrElse(DatasetVocabulary())
      )
 
      if (ingestResult.wasUpToDate) {
@@ -151,7 +146,7 @@
  }
 
  def maybeDownload(
-    source: SourceKind.Root,
+    source: DatasetSource.Root,
    externalSource: CacheableSource,
    cachingBehavior: CachingBehavior,
    downloadCheckpointPath: Path,
@@ -186,7 +181,7 @@
 
  // TODO: Avoid copying data if prepare step is a no-op
  def maybePrepare(
-    source: SourceKind.Root,
+    source: DatasetSource.Root,
    downloadDataPath: Path,
    downloadCheckpoint: DownloadCheckpoint,
    prepCheckpointPath: Path,
@@ -202,7 +197,9 @@
        checkpoint = storedCheckpoint.get
      )
    } else {
-      val prepStep = prepStepFactory.getComposedSteps(source.prepare)
+      val prepStep = prepStepFactory.getComposedSteps(
+        source.prepare.getOrElse(Vector.empty)
+      )
      val convertStep = conversionStepFactory.getComposedSteps(source.read)
      val inputStream = File(downloadDataPath).newInputStream
 
@@ -239,7 +236,7 @@
 
  def maybeIngest(
    datasetID: DatasetID,
-    source: SourceKind.Root,
+    source: DatasetSource.Root,
    prepCheckpoint: PrepCheckpoint,
    prepDataPath: Path,
    ingestCheckpointPath: Path,
@@ -278,7 +275,7 @@
 
  def ingest(
    datasetID: DatasetID,
-    source: SourceKind.Root,
+    source: DatasetSource.Root,
    eventTime: Option[Instant],
    prepDataPath: Path,
    vocabulary: DatasetVocabulary
@@ -296,7 +293,9 @@
    )
 
    val engine =
-      engineFactory.getEngine(source.preprocessEngine.getOrElse("sparkSQL"))
+      engineFactory.getEngine(
+        source.preprocessPartial.map(_.engine).getOrElse("sparkSQL")
+      )
 
    val result = engine.ingest(request)
    result.block

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala
index f7680bd8a0..8e3055bae1 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/ConversionStepFactory.scala
@@ -8,15 +8,15 @@
 
 package dev.kamu.cli.ingest.convert
 
-import dev.kamu.core.manifests.ReaderKind
+import dev.kamu.core.manifests.ReadStep
 import org.apache.logging.log4j.LogManager
 
 class ConversionStepFactory {
  val logger = LogManager.getLogger(getClass.getName)
 
-  def getStep(readerConfig: ReaderKind): ConversionStep = {
+  def getStep(readerConfig: ReadStep): ConversionStep = {
    readerConfig match {
-      case _: ReaderKind.Geojson =>
+      case _: ReadStep.GeoJson =>
        logger.debug(s"Pre-processing as GeoJSON")
        new GeoJSONConverter()
      case _ =>
@@ -25,7 +25,7 @@
  }
 
  def getComposedSteps(
-    readerConfig: ReaderKind
+    readerConfig: ReadStep
  ): ConversionStep = {
    getStep(readerConfig)
  }

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/IngestCheckpoint.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/IngestCheckpoint.scala
index 434784e268..5a1b8d18c9 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/IngestCheckpoint.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/convert/IngestCheckpoint.scala
@@ -10,10 +10,10 @@ package dev.kamu.cli.ingest.convert
 
 import java.time.Instant
 
-import dev.kamu.core.manifests.{MetadataBlock, Resource}
+import dev.kamu.core.manifests.MetadataBlock
 
 case class IngestCheckpoint(
  prepTimestamp: Instant,
  lastIngested: Instant,
  resultingBlock: MetadataBlock
-) extends Resource
+)

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/DownloadCheckpoint.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/DownloadCheckpoint.scala
index f35a1d6367..ab766a3716 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/DownloadCheckpoint.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/DownloadCheckpoint.scala
@@ -10,13 +10,11 @@ package dev.kamu.cli.ingest.fetch
 
 import java.time.Instant
 
-import dev.kamu.core.manifests.Resource
-
 case class DownloadCheckpoint(
  eventTime: Option[Instant],
  lastDownloaded: Instant,
  lastModified: Option[Instant] = None,
  eTag: Option[String] = None
-) extends Resource {
+) {
  def isCacheable: Boolean = lastModified.isDefined || eTag.isDefined
 }

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala
index 1d7cc6dfd0..82c81e935e 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/fetch/SourceFactory.scala
@@ -11,40 +11,43 @@ package dev.kamu.cli.ingest.fetch
 import java.nio.file.Paths
 
 import better.files.File
-import dev.kamu.core.manifests.{
-  CachingKind,
-  EventTimeKind,
-  FetchSourceKind,
-  OrderingKind
-}
+import dev.kamu.core.manifests
+import dev.kamu.core.manifests._
 import dev.kamu.core.utils.Clock
 import org.apache.logging.log4j.LogManager
 
 class SourceFactory(systemClock: Clock) {
  private val logger = LogManager.getLogger(getClass.getName)
 
-  def getSource(kind: FetchSourceKind): Seq[CacheableSource] = {
-    val eventTimeSource = kind.eventTime match {
+  def getSource(fetch: FetchStep): Seq[CacheableSource] = {
+    fetch match {
+      case url: FetchStep.Url =>
+        getFetchUrlSource(url, getEventTimeSource(url.eventTime))
+      case glob: FetchStep.FilesGlob =>
+        getFetchFilesGlobSource(glob, getEventTimeSource(glob.eventTime))
+    }
+  }
+
+  def getEventTimeSource(
+    eventTimeSource: Option[manifests.EventTimeSource]
+  ): EventTimeSource = {
+    eventTimeSource match {
      case None =>
        new EventTimeSource.FromSystemTime(systemClock)
-      case Some(e: EventTimeKind.FromPath) =>
-        new EventTimeSource.FromPath(e.pattern, e.timestampFormat)
+      case Some(e: manifests.EventTimeSource.FromPath) =>
+        new EventTimeSource.FromPath(
+          e.pattern,
+          e.timestampFormat.getOrElse("yyyy-MM-dd")
+        )
      case _ =>
        throw new NotImplementedError(
-          s"Unsupported event time source: ${kind.eventTime}"
+          s"Unsupported event time source: ${eventTimeSource}"
        )
    }
-
-    kind match {
-      case fetch: FetchSourceKind.Url =>
-        getFetchUrlSource(fetch, eventTimeSource)
-      case glob: FetchSourceKind.FilesGlob =>
-        getFetchFilesGlobSource(glob, eventTimeSource)
-    }
  }
 
  def getFetchUrlSource(
-    kind: FetchSourceKind.Url,
+    kind: FetchStep.Url,
    eventTimeSource: EventTimeSource
  ): Seq[CacheableSource] = {
    kind.url.getScheme match {
@@ -82,11 +85,12 @@
  }
 
  def getFetchFilesGlobSource(
-    kind: FetchSourceKind.FilesGlob,
+    kind: FetchStep.FilesGlob,
    eventTimeSource: EventTimeSource
  ): Seq[CacheableSource] = {
-    val globbed = File(kind.path.getParent)
-      .glob(kind.path.getFileName.toString)
+    val path = Paths.get(kind.path)
+    val globbed = File(path.getParent)
+      .glob(path.getFileName.toString)
      .map(
        f =>
          new FileSystemSource(
@@ -98,15 +102,15 @@
          )
      .toVector
 
-    val orderBy = kind.orderBy.getOrElse(
-      if (kind.eventTime.isDefined) OrderingKind.ByMetadataEventTime
-      else OrderingKind.ByName
+    val orderBy = kind.order.getOrElse(
+      if (kind.eventTime.isDefined) SourceOrdering.ByEventTime()
+      else SourceOrdering.ByName()
    )
 
    val sorted = orderBy match {
-      case OrderingKind.ByName =>
+      case SourceOrdering.ByName() =>
        globbed.sortBy(_.path.getFileName.toString.toLowerCase())
-      case OrderingKind.ByMetadataEventTime =>
+      case SourceOrdering.ByEventTime() =>
        globbed.sortBy(eventTimeSource.getEventTime)
    }
 
@@ -117,14 +121,14 @@
    sorted
  }
 
-  def getCachingBehavior(kind: FetchSourceKind): CachingBehavior = {
+  def getCachingBehavior(kind: FetchStep): CachingBehavior = {
    val cacheSettings = kind match {
-      case url: FetchSourceKind.Url        => url.cache
-      case glob: FetchSourceKind.FilesGlob => glob.cache
+      case url: FetchStep.Url        => url.cache
+      case glob: FetchStep.FilesGlob => glob.cache
    }
    cacheSettings match {
-      case None                          => new CachingBehaviorDefault()
-      case Some(_: CachingKind.Forever)  => new CachingBehaviorForever()
+      case None                            => new CachingBehaviorDefault()
+      case Some(_: SourceCaching.Forever) => new CachingBehaviorForever()
    }
  }
 }

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/DecompressZIPStep.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/DecompressZIPStep.scala
index 2ba750397b..cc7204712e 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/DecompressZIPStep.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/DecompressZIPStep.scala
@@ -12,12 +12,12 @@
 import java.io.InputStream
 import java.util.regex.Pattern
 
 import dev.kamu.core.utils.ZipEntryStream
-import dev.kamu.core.manifests.PrepStepKind
+import dev.kamu.core.manifests.{PrepStep => PrepStepCfg}
 
-class DecompressZIPStep(config: PrepStepKind.Decompress) extends PrepStep {
+class DecompressZIPStep(config: PrepStepCfg.Decompress) extends PrepStep {
  override def prepare(inputStream: InputStream): InputStream = {
-    val subPathRegex = config.subPathRegex.orElse(
-      config.subPath.map(p => Pattern.quote(p.toString))
+    val subPathRegex = config.subPath.orElse(
+      config.subPath.map(p => Pattern.quote(p))
    )
    val stream = subPathRegex

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepCheckpoint.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepCheckpoint.scala
index 5d25b0e03c..b72869caba 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepCheckpoint.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepCheckpoint.scala
@@ -10,10 +10,8 @@ package dev.kamu.cli.ingest.prep
 
 import java.time.Instant
 
-import dev.kamu.core.manifests.Resource
-
 case class PrepCheckpoint(
  downloadTimestamp: Instant,
  eventTime: Option[Instant],
  lastPrepared: Instant
-) extends Resource
+)

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala
index 772d3c4cd6..304ff91057 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/prep/PrepStepFactory.scala
@@ -8,17 +8,17 @@
 
 package dev.kamu.cli.ingest.prep
 
-import dev.kamu.core.manifests.PrepStepKind
+import dev.kamu.core.manifests.{PrepStep => PrepStepCfg}
 import org.apache.logging.log4j.LogManager
 
 class PrepStepFactory() {
  private val logger = LogManager.getLogger(getClass.getName)
 
  def getStep(
-    config: PrepStepKind
+    config: PrepStepCfg
  ): PrepStep = {
    config match {
-      case dc: PrepStepKind.Decompress =>
+      case dc: PrepStepCfg.Decompress =>
        dc.format.toLowerCase match {
          case "gzip" =>
            logger.debug("Extracting gzip")
@@ -31,7 +31,7 @@
              s"Unknown compression format: ${dc.format}"
            )
        }
-      case pipe: PrepStepKind.Pipe =>
+      case pipe: PrepStepCfg.Pipe =>
        new ProcessPipeStep(pipe.command)
      case _ =>
        throw new NotImplementedError(s"Unknown prep step: $config")
@@ -39,7 +39,7 @@
  }
 
  def getComposedSteps(
-    configs: Seq[PrepStepKind]
+    configs: Seq[PrepStepCfg]
  ): PrepStep = {
    new CompositePrepStep(configs.map(getStep).toVector)
  }

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala
index 9584237c00..8c3bf67495 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/GenericResourceRepository.scala
@@ -13,14 +13,14 @@ import java.nio.file.{Path, Paths}
 
 import better.files.File
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
-import dev.kamu.core.manifests.{DatasetID, Manifest, Resource}
+import dev.kamu.core.manifests.{DatasetID, Manifest}
 import dev.kamu.core.utils.fs._
 import org.apache.logging.log4j.LogManager
 import pureconfig.{ConfigReader, ConfigWriter, Derivation}
 
 import scala.reflect.ClassTag
 
-class GenericResourceRepository[TRes <: Resource: ClassTag, TID](
+class GenericResourceRepository[TRes: ClassTag, TID](
  storagePath: Path,
  resourceKind: String,
  idFromString: String => TID,

diff --git
 a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala
index 87b7b23225..c571e0e8e3 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala
@@ -26,6 +26,7 @@ class MetadataChain(datasetDir: Path) {
 
  def init(ds: DatasetSnapshot, systemTime: Instant): Unit = {
    val initialBlock = MetadataBlock(
+      blockHash = "",
      prevBlockHash = "",
      systemTime = systemTime,
      source = Some(ds.source)
@@ -35,7 +36,7 @@
      id = ds.id,
      kind = ds.kind,
      datasetDependencies = ds.dependsOn.toSet,
-      vocabulary = ds.vocabulary,
+      vocab = ds.vocab,
      lastPulled = None,
      numRecords = 0,
      dataSize = 0
@@ -81,7 +82,7 @@
    DatasetSnapshot(
      id = summary.id,
      source = source,
-      vocabulary = summary.vocabulary
+      vocab = summary.vocab
    )
  }
 
@@ -116,13 +117,13 @@
  // Helpers
  /////////////////////////////////////////////////////////////////////////////
 
-  protected def saveResource[T <: Resource: ClassTag](obj: T, path: Path)(
+  protected def saveResource[T: ClassTag](obj: T, path: Path)(
    implicit derivation: Derivation[ConfigWriter[Manifest[T]]]
  ): Unit = {
    yaml.save(Manifest(obj), path)
  }
 
-  protected def loadResource[T <: Resource: ClassTag](path: Path)(
+  protected def loadResource[T: ClassTag](path: Path)(
    implicit derivation: Derivation[ConfigReader[Manifest[T]]]
  ): T = {
    yaml.load[Manifest[T]](path).content

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala
index 0374d83697..0afe0b9c96 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala
@@ -100,7 +100,7 @@
  }
 
  def getDatasetVocabulary(id: DatasetID): DatasetVocabulary = {
-    getDatasetSummary(id).vocabulary.getOrElse(DatasetVocabulary())
+    getDatasetSummary(id).vocab.getOrElse(DatasetVocabulary())
  }
 
  def getDatasetRef(id: DatasetID): DatasetRef = {

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
index be746b404f..eaff0a2888 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/ResourceLoader.scala
@@ -21,7 +21,7 @@ import java.nio.file.{Path, Paths}
 
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
 import yaml.defaults._
-import dev.kamu.core.manifests.{Manifest, Resource}
+import dev.kamu.core.manifests.Manifest
 import org.apache.logging.log4j.LogManager
 import pureconfig.{ConfigReader, ConfigWriter, Derivation}
 
@@ -31,19 +31,19 @@ import scala.reflect.ClassTag
 class ResourceLoader() {
  private val logger = LogManager.getLogger(getClass.getName)
 
-  def loadResourceFromFile[T <: Resource: ClassTag](p: Path)(
+  def loadResourceFromFile[T: ClassTag](p: Path)(
    implicit reader: Derivation[ConfigReader[Manifest[T]]]
  ): T = {
    yaml.load[Manifest[T]](p).content
  }
 
-  def saveResourceToFile[T <: Resource: ClassTag](res: T, path: Path)(
+  def saveResourceToFile[T: ClassTag](res: T, path: Path)(
    implicit writer: Derivation[ConfigWriter[Manifest[T]]]
  ): Unit = {
    yaml.save(Manifest(res), path)
  }
 
-  def loadResourceFromURI[T <: Resource: ClassTag](
+  def loadResourceFromURI[T: ClassTag](
    uri: URI
  )(
    implicit reader: Derivation[ConfigReader[Manifest[T]]]
@@ -57,7 +57,7 @@
    }
  }
 
-  private def loadResourceFromURL[T <: Resource: ClassTag](
+  private def loadResourceFromURL[T: ClassTag](
    url: java.net.URL
  )(
    implicit reader: Derivation[ConfigReader[Manifest[T]]]

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineUtils.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineUtils.scala
index 28a09ee07b..e71621dfff 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineUtils.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/EngineUtils.scala
@@ -33,12 +33,10 @@ trait EngineUtils {
  protected def toContainerPath(ps: String, volumePath: Path): String = {
    val p = Paths.get(ps).normalize().toAbsolutePath
    val rel = volumePath.relativize(p)
-    val x = if (!OS.isWindows) {
+    if (!OS.isWindows) {
      Paths.get(volumeDirInContainer).resolve(rel).toString
    } else {
      volumeDirInContainer + "/" + rel.toString.replace("\\", "/")
    }
-    println(s"Mapped path $ps to $x")
-    x
  }
 }

diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
index e6d7741cac..aa648e66ad 100644
--- a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
+++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
@@ -11,12 +11,7 @@ package dev.kamu.cli.transform
 import better.files.File
 import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
 import dev.kamu.core.manifests._
-import dev.kamu.core.manifests.infra.{
-  ExecuteQueryRequest,
-  ExecuteQueryResult,
-  InputDataSlice,
-  Watermark
-}
+import dev.kamu.core.manifests.infra._
 import dev.kamu.core.utils.Clock
 import org.apache.commons.io.FileUtils
 import org.apache.logging.log4j.LogManager
@@ -24,7 +19,7 @@ import spire.math.Interval
 import spire.math.interval.{Closed, Unbound, ValueBound}
 
 case class TransformBatch(
-  source: SourceKind.Derivative,
+  source: DatasetSource.Derivative,
  inputSlices: Map[DatasetID, InputDataSlice]
 ) {
  def isEmpty: Boolean = {
@@ -69,7 +64,7 @@ class TransformService(
    datasetID: DatasetID,
    batch: TransformBatch
  ): ExecuteQueryResult = {
-    val allDatasets = batch.source.inputs.map(_.id) :+ datasetID
+    val allDatasets = batch.source.inputs :+ datasetID
 
    val request = ExecuteQueryRequest(
      datasetID = datasetID,
@@ -95,7 +90,7 @@
      metadataRepository.getDatasetLayout(datasetID).checkpointsDir.toString
    )
 
-    val engine = engineFactory.getEngine(batch.source.transformEngine)
+    val engine = engineFactory.getEngine(batch.source.transformPartial.engine)
    engine.executeQuery(request)
  }
 
@@ -143,15 +138,15 @@
    if (sources.length > 1)
      throw new RuntimeException("Transform evolution is not yet supported")
 
-    val source = sources.head.asInstanceOf[SourceKind.Derivative]
+    val source = sources.head.asInstanceOf[DatasetSource.Derivative]
 
    val inputSlices = source.inputs.zipWithIndex.map {
-      case (input, index) =>
-        val inputMetaChain = metadataRepository.getMetadataChain(input.id)
+      case (inputID, index) =>
+        val inputMetaChain = metadataRepository.getMetadataChain(inputID)
        (
-          input.id,
+          inputID,
          getInputSlice(
-            input.id,
+            inputID,
            index,
            inputMetaChain,
            outputMetaChain
@@ -174,7 +169,7 @@
      .getBlocks()
      .reverse
      .filter(_.inputSlices.nonEmpty)
-      .map(_.inputSlices(inputIndex))
+      .map(_.inputSlices.get(inputIndex))
      .find(_.interval.nonEmpty)
      .map(_.interval)
      .getOrElse(Interval.empty)

diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/DatasetFactory.scala b/core.coordinator/src/test/scala/dev/kamu/cli/DatasetFactory.scala
index 557cc91390..5a5fb66043 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/DatasetFactory.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/DatasetFactory.scala
@@ -10,6 +10,7 @@ package dev.kamu.cli
 
 import java.net.URI
 
+import com.typesafe.config.ConfigObject
 import pureconfig.generic.auto._
 import dev.kamu.core.manifests._
 import dev.kamu.core.manifests.parsing.pureconfig.yaml
@@ -18,13 +19,13 @@ import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._
 import scala.util.Random
 
 object DatasetFactory {
-  val _schemes = Array(
+  private val _schemes = Array(
    "http",
    "https",
    "ftp"
  )
 
-  val _topLevelDomains = Array(
+  private val _topLevelDomains = Array(
    "com",
    "org",
    "net",
@@ -32,19 +33,19 @@
    "gov"
  )
 
-  val _organizations = (0 to 100)
+  private val _organizations = (0 to 100)
    .map(_ => Random.nextInt(10) + 3)
    .map(len => Random.alphanumeric.take(len).mkString.toLowerCase)
    .toArray
 
-  val _subdomains = Array(
+  private val _subdomains = Array(
    "api",
    "catalog",
    "data",
    "portal"
  )
 
-  val _extensions = Array(
+  private val _extensions = Array(
    "zip",
    "tar.gz",
    "gz",
@@ -57,22 +58,21 @@
    url: Option[URI] = None,
    format: Option[String] = None,
    header: Boolean = false,
-    mergeStrategy: Option[MergeStrategyKind] = None,
+    mergeStrategy: Option[MergeStrategy] = None,
    schema: Seq[String] = Seq.empty
  ): DatasetSnapshot = {
    val _id = id.getOrElse(newDatasetID())
    DatasetSnapshot(
      id = _id,
-      source = SourceKind.Root(
-        fetch = FetchSourceKind.Url(url.getOrElse(newURL(_id))),
-        read = ReaderKind.Generic(
-          name = format.getOrElse("csv"),
-          options = if (!header) Map.empty else Map("header" -> "true"),
-          schema = schema.toVector
+      source = DatasetSource.Root(
+        fetch = FetchStep.Url(url.getOrElse(newURL(_id))),
+        read = ReadStep.Csv(
+          header = Some(header),
+          schema = Some(schema.toVector)
        ),
-        merge = mergeStrategy.getOrElse(MergeStrategyKind.Append())
+        merge = mergeStrategy.getOrElse(MergeStrategy.Append())
      )
-    ).postLoad()
+    )
  }
 
  def newDerivativeDataset(
@@ -81,22 +81,19 @@
    sql: Option[String] = None
  ): DatasetSnapshot = {
    val _id = id.getOrElse(newDatasetID())
+    val _sql = sql.getOrElse(s"SELECT * FROM `$source`")
    DatasetSnapshot(
      id = _id,
-      source = SourceKind.Derivative(
-        inputs = Vector(
-          SourceKind.Derivative.Input(
-            id = source
-          )
-        ),
-        transform = yaml.saveObj(
-          TransformKind.SparkSQL(
-            engine = "sparkSQL",
-            query = Some(sql.getOrElse(s"SELECT * FROM `$source`"))
-          )
+      source = DatasetSource.Derivative(
+        inputs = Vector(source),
+        transform = yaml.load[ConfigObject](
+          s"""
+             |engine: sparkSQL
+             |query: '${_sql}'
+          """.stripMargin
        )
      )
-    ).postLoad()
+    )
  }
 
  def newDatasetID(): DatasetID = {

diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestGeoJsonSpec.scala b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestGeoJsonSpec.scala
index 1b5bd48f75..d67b3ef49b 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestGeoJsonSpec.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestGeoJsonSpec.scala
@@ -71,12 +71,12 @@ class IngestGeoJsonSpec extends FlatSpec with Matchers with KamuTestBase {
 
    val ds = DatasetSnapshot(
      id = DatasetFactory.newDatasetID(),
-      source = SourceKind.Root(
-        fetch = FetchSourceKind.Url(inputPath.toUri),
-        read = ReaderKind.Geojson(),
-        merge = MergeStrategyKind.Snapshot(primaryKey = Vector("id"))
+      source = DatasetSource.Root(
+        fetch = FetchStep.Url(inputPath.toUri),
+        read = ReadStep.GeoJson(),
+        merge = MergeStrategy.Snapshot(primaryKey = Vector("id"))
      )
-    ).postLoad()
+    )
 
    kamu.addDataset(ds)
    kamu.run("pull", ds.id.toString)

diff --git a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
index e052c2a599..59466dc242 100644
--- a/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
+++ b/core.coordinator/src/test/scala/dev/kamu/cli/ingest/IngestMultiSourceSpec.scala
@@ -36,22 +36,22 @@ class IngestMultiSourceSpec extends FlatSpec with Matchers with KamuTestBase {
 
    val ds = DatasetSnapshot(
      id = DatasetFactory.newDatasetID(),
-      source = SourceKind.Root(
-        fetch = FetchSourceKind.FilesGlob(
-          path = globPath,
+      source = DatasetSource.Root(
+        fetch = FetchStep.FilesGlob(
+          path = globPath.toString,
          eventTime = Some(
-            EventTimeKind.FromPath(
+            EventTimeSource.FromPath(
              pattern = """balances-(\d+-\d+-\d+)\.csv""",
-              timestampFormat = "yyyy-MM-dd"
+              timestampFormat = Some("yyyy-MM-dd")
            )
          )
        ),
-        read = ReaderKind.Csv(
-          schema = Vector("id INT", "name STRING", "balance INT")
+        read = ReadStep.Csv(
+          schema = Some(Vector("id INT", "name STRING", "balance INT"))
        ),
-        merge = MergeStrategyKind.Snapshot(primaryKey = Vector("id"))
+        merge = MergeStrategy.Snapshot(primaryKey = Vector("id"))
      )
-    ).postLoad()
+    )
 
    kamu.addDataset(ds)
    kamu.run("pull", ds.id.toString)

diff --git a/core.manifests b/core.manifests
index a056d8beea..8dd114fa2d 160000
--- a/core.manifests
+++ b/core.manifests
@@ -1 +1 @@
-Subproject commit a056d8beea7da25ef8f3320746862c8ccc00cc77
+Subproject commit 8dd114fa2ddb8ffd9567182c937135ce8f0ba8ac