From a9f10326cc782eda65d00aaded2ecd9651a11643 Mon Sep 17 00:00:00 2001 From: Sergii Mikhtoniuk Date: Sat, 9 May 2020 19:38:54 -0700 Subject: [PATCH] Move meta chain from manifests --- CHANGELOG.md | 5 + .../main/scala/dev/kamu/cli/CliParser.scala | 5 + .../src/main/scala/dev/kamu/cli/Kamu.scala | 3 +- .../dev/kamu/cli/commands/PurgeCommand.scala | 58 ++++--- .../dev/kamu/cli/external/DockerImages.scala | 2 +- .../dev/kamu/cli/ingest/IngestService.scala | 6 +- .../dev/kamu/cli/metadata/MetadataChain.scala | 157 ++++++++++++++++++ .../cli/metadata/MetadataRepository.scala | 46 +++-- .../kamu/cli/transform/TransformService.scala | 17 +- core.manifests | 2 +- 10 files changed, 252 insertions(+), 49 deletions(-) create mode 100644 core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala diff --git a/CHANGELOG.md b/CHANGELOG.md index fedbd40440..a68cf7b3f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.15.0] - 2020-05-09 +### Added +- `purge` command now supports `--recursive` flag +- Internal improvements and refactoring + ## [0.14.0] - 2020-05-03 ### Changed - Consolidating more logic into `engine.spark` diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala b/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala index f837cdb228..7fcd4c1d5e 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala @@ -265,6 +265,11 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) { descr = "Purge all datasets" ) + val recursive = opt[Boolean]( + "recursive", + descr = "Also purge all known dependent datasets" + ) + val ids = trailArg[List[String]]( "ids", required = false, diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala b/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala index d0fd338a59..3d408db500 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala @@ -87,7 +87,8 @@ class Kamu( new PurgeCommand( metadataRepository, c.purge.ids(), - c.purge.all() + c.purge.all(), + c.purge.recursive() ) case List(c.delete) => new DeleteCommand( diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala index d09a20d137..059d6a58e3 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/commands/PurgeCommand.scala @@ -8,38 +8,50 @@ package dev.kamu.cli.commands -import dev.kamu.cli.metadata.{DoesNotExistException, MetadataRepository} +import dev.kamu.cli.UsageException +import dev.kamu.cli.metadata._ import dev.kamu.core.manifests.DatasetID import org.apache.log4j.LogManager class PurgeCommand( metadataRepository: MetadataRepository, ids: Seq[String], - all: Boolean + all: Boolean, + recursive: Boolean ) extends Command { private val logger = LogManager.getLogger(getClass.getName) override def run(): Unit = { - val toPurge = - if (all) - metadataRepository.getAllDatasets() - else - ids.map(DatasetID) - - val numPurged = toPurge - .map(id => { - try { - logger.info(s"Purging dataset: ${id.toString}") - metadataRepository.purgeDataset(id) - 1 
- } catch { - case e: DoesNotExistException => - logger.error(e.getMessage) - 0 - } - }) - .sum - - logger.info(s"Purged $numPurged datasets") + val datasetIDs = { + if (all) metadataRepository.getAllDatasets() + else ids.map(DatasetID) + } + + val plan = try { + metadataRepository + .getDatasetsInReverseDependencyOrder( + datasetIDs, + recursive || all // All implies recursive, which is more efficient + ) + } catch { + case e: DoesNotExistException => + throw new UsageException(e.getMessage) + } + + val ghostSnapshots = plan.map(id => { + logger.info(s"Purging dataset: ${id.toString}") + val snapshot = metadataRepository.getMetadataChain(id).getSnapshot() + try { + metadataRepository.deleteDataset(id) + } catch { + case e: DanglingReferenceException => + throw new UsageException(e.getMessage) + } + snapshot + }) + + ghostSnapshots.reverse.foreach(metadataRepository.addDataset) + + logger.info(s"Purged ${ghostSnapshots.size} datasets") } } diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala index e9ec8774a9..de32849aa2 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala @@ -9,7 +9,7 @@ package dev.kamu.cli.external object DockerImages { - val SPARK = "kamudata/engine-spark:0.1.0" + val SPARK = "kamudata/engine-spark:0.2.0" val LIVY = SPARK val JUPYTER = "kamudata/jupyter-uber:0.0.1" diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala index f1cc7d2864..c97d71c057 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/ingest/IngestService.scala @@ -19,9 +19,9 @@ import dev.kamu.cli.ingest.fetch.{ SourceFactory } import dev.kamu.cli.ingest.prep.{PrepCheckpoint, PrepStepFactory} -import dev.kamu.cli.metadata.MetadataRepository +import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository} import dev.kamu.cli.transform.EngineFactory -import dev.kamu.core.manifests.infra.{IngestConfig, IngestTask, MetadataChainFS} +import dev.kamu.core.manifests.infra.{IngestConfig, IngestTask} import dev.kamu.core.manifests.{ DatasetID, DatasetVocabulary, @@ -355,7 +355,7 @@ class IngestService( def commitMetadata( datasetID: DatasetID, - metaChain: MetadataChainFS, + metaChain: MetadataChain, ingestResult: ExecutionResult[IngestCheckpoint] ): Unit = { // TODO: Avoid loading blocks again diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala new file mode 100644 index 0000000000..0a3187258b --- /dev/null +++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataChain.scala @@ -0,0 +1,157 @@ +/* + * Copyright (c) 2018 kamu.dev + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+ */ + +package dev.kamu.cli.metadata + +import java.nio.charset.StandardCharsets +import java.security.MessageDigest +import java.time.Instant + +import dev.kamu.core.manifests._ +import dev.kamu.core.manifests.parsing.pureconfig.yaml +import dev.kamu.core.manifests.parsing.pureconfig.yaml.defaults._ +import dev.kamu.core.utils.fs._ +import org.apache.hadoop.fs.{FileSystem, Path} +import pureconfig.{ConfigReader, ConfigWriter, Derivation} +import pureconfig.generic.auto._ + +import scala.reflect.ClassTag + +class MetadataChain(fileSystem: FileSystem, datasetDir: Path) { + + def init(ds: DatasetSnapshot, systemTime: Instant): Unit = { + val initialBlock = MetadataBlock( + prevBlockHash = "", + systemTime = systemTime, + source = Some(ds.source) + ).hashed() + + val initialSummary = DatasetSummary( + id = ds.id, + kind = ds.kind, + datasetDependencies = ds.dependsOn.toSet, + vocabulary = ds.vocabulary, + lastPulled = None, + numRecords = 0, + dataSize = 0 + ) + + try { + fileSystem.mkdirs(blocksDir) + saveResource(initialSummary, summaryPath) + saveResource(initialBlock, blocksDir.resolve(initialBlock.blockHash)) + } catch { + case e: Exception => + fileSystem.delete(datasetDir, true) + throw e + } + } + + // TODO: add invariant validation + def append(_block: MetadataBlock): MetadataBlock = { + val block = _block.hashed() + saveResource(block, blocksDir.resolve(block.blockHash)) + block + } + + def getSummary(): DatasetSummary = { + loadResource[DatasetSummary](summaryPath) + } + + def updateSummary( + update: DatasetSummary => DatasetSummary + ): DatasetSummary = { + val newSummary = update(getSummary()) + saveResource(newSummary, summaryPath) + newSummary + } + + def getSnapshot(): DatasetSnapshot = { + val summary = getSummary() + + val source = getBlocks().reverse + .flatMap(_.source) + .head + + DatasetSnapshot( + id = summary.id, + source = source, + vocabulary = summary.vocabulary + ) + } + + /** Returns metadata blocks in historical order */ + def getBlocks(): Vector[MetadataBlock] = { + val blocks = fileSystem + .listStatus(blocksDir) + .map(_.getPath) + .map(loadResource[MetadataBlock]) + .map(b => (b.blockHash, b)) + .toMap + + val nextBlocks = blocks.values + .map(b => (b.prevBlockHash, b.blockHash)) + .toMap + + val blocksOrdered = + new scala.collection.immutable.VectorBuilder[MetadataBlock]() + + var parentBlockHash = "" + while (nextBlocks.contains(parentBlockHash)) { + parentBlockHash = nextBlocks(parentBlockHash) + blocksOrdered += blocks(parentBlockHash) + } + + blocksOrdered.result() + } + + protected def summaryPath: Path = datasetDir.resolve("summary") + + protected def blocksDir: Path = datasetDir.resolve("blocks") + + ///////////////////////////////////////////////////////////////////////////// + // Helpers + ///////////////////////////////////////////////////////////////////////////// + + protected def saveResource[T <: Resource: ClassTag](obj: T, path: Path)( + implicit derivation: Derivation[ConfigWriter[Manifest[T]]] + ): Unit = { + val outputStream = fileSystem.create(path) + try { + yaml.save(Manifest(obj), outputStream) + } finally { + outputStream.close() + } + } + + protected def loadResource[T <: Resource: ClassTag](path: Path)( + implicit derivation: Derivation[ConfigReader[Manifest[T]]] + ): T = { + val inputStream = fileSystem.open(path) + try { + yaml.load[Manifest[T]](inputStream).content + } finally { + inputStream.close() + } + } + + protected implicit class MetadataBlockEx(b: MetadataBlock) { + def hashed(): MetadataBlock = { + val digest = 
MessageDigest.getInstance("sha-256") + val repr = yaml.saveStr(b) + + val blockHash = digest + .digest(repr.getBytes(StandardCharsets.UTF_8)) + .map("%02x".format(_)) + .mkString + + b.copy(blockHash = blockHash) + } + } + +} diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala index d778106a0c..d7030cba1f 100644 --- a/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/metadata/MetadataRepository.scala @@ -13,7 +13,6 @@ import java.net.URI import dev.kamu.cli.utility.DependencyGraph import dev.kamu.cli._ -import dev.kamu.core.manifests.infra.MetadataChainFS import dev.kamu.core.manifests._ import dev.kamu.core.utils.Clock import dev.kamu.core.manifests.parsing.pureconfig.yaml @@ -83,8 +82,8 @@ class MetadataRepository( ) } - def getMetadataChain(id: DatasetID): MetadataChainFS = { - new MetadataChainFS(fileSystem, getDatasetLayout(id).metadataDir) + def getMetadataChain(id: DatasetID): MetadataChain = { + new MetadataChain(fileSystem, getDatasetLayout(id).metadataDir) } def getDatasetKind(id: DatasetID): DatasetKind = { @@ -98,10 +97,16 @@ class MetadataRepository( def getDatasetSummary(id: DatasetID): DatasetSummary = { ensureDatasetExistsAndPulled(id) - val chain = new MetadataChainFS(fileSystem, datasetMetadataDir(id)) + val chain = new MetadataChain(fileSystem, datasetMetadataDir(id)) chain.getSummary() } + def getDatasetVocabulary(id: DatasetID): DatasetVocabulary = { + getDatasetSummary(id).vocabulary + .getOrElse(DatasetVocabularyOverrides()) + .asDatasetVocabulary() + } + def getDatasetRef(id: DatasetID): DatasetRef = { if (!isRemote(id)) throw new RuntimeException(s"Dataset $id is not remote") @@ -134,6 +139,30 @@ class MetadataRepository( depGraph.resolve(ids.toList) } + def getDatasetsInReverseDependencyOrder( + ids: Seq[DatasetID], + recursive: Boolean + ): Seq[DatasetID] = { + val inverseDependencies = + getAllDatasets() + .flatMap(id => { + getDatasetDependencies(id).map(depID => (depID, id)) + }) + .groupBy(_._1) + .map { case (id, seq) => (id, seq.map(_._2).toList) } + + def dependencyOf(id: DatasetID): List[DatasetID] = { + inverseDependencies.getOrElse(id, List.empty) + } + + val depGraph = new DependencyGraph[DatasetID](dependencyOf) + val deps = depGraph.resolve(ids.toList) + if (recursive) + deps + else + deps.filter(ids.contains) + } + def getAllDatasets(): Seq[DatasetID] = { fileSystem .listStatus(workspaceLayout.metadataDir) @@ -163,7 +192,7 @@ class MetadataRepository( ) } - val chain = new MetadataChainFS(fileSystem, datasetDir) + val chain = new MetadataChain(fileSystem, datasetDir) chain.init(ds, systemClock.instant()) } @@ -201,13 +230,6 @@ class MetadataRepository( fileSystem.delete(workspaceLayout.metadataDir.resolve(id.toString), true) } - def purgeDataset(id: DatasetID): Unit = { - // TODO: Purging a dataset that is used by non-empty derivatives should raise an error - val snapshot = getMetadataChain(id).getSnapshot() - deleteDataset(id) - addDataset(snapshot) - } - //////////////////////////////////////////////////////////////////////////// // Volumes //////////////////////////////////////////////////////////////////////////// diff --git a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala index dbd94c4d2e..ed408a52d9 100644 --- 
a/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala +++ b/core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala @@ -9,13 +9,9 @@ package dev.kamu.cli.transform import dev.kamu.cli.WorkspaceLayout -import dev.kamu.cli.metadata.MetadataRepository +import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository} import dev.kamu.core.manifests._ -import dev.kamu.core.manifests.infra.{ - MetadataChainFS, - TransformConfig, - TransformTaskConfig -} +import dev.kamu.core.manifests.infra.{TransformConfig, TransformTaskConfig} import dev.kamu.core.utils.Clock import org.apache.hadoop.fs.FileSystem import org.apache.log4j.LogManager @@ -83,6 +79,11 @@ class TransformService( inputSlices = batch.inputSlices.map { case (id, slice) => (id.toString, slice) }, + datasetVocabs = allDatasets + .map( + id => (id.toString, metadataRepository.getDatasetVocabulary(id)) + ) + .toMap, datasetLayouts = allDatasets .map(i => (i.toString, metadataRepository.getDatasetLayout(i))) .toMap, @@ -176,8 +177,8 @@ class TransformService( private def getInputSlice( inputID: DatasetID, inputIndex: Int, - inputMetaChain: MetadataChainFS, - outputMetaChain: MetadataChainFS + inputMetaChain: MetadataChain, + outputMetaChain: MetadataChain ): DataSlice = { // Determine available data range diff --git a/core.manifests b/core.manifests index 105898e33d..3f07a761ea 160000 --- a/core.manifests +++ b/core.manifests @@ -1 +1 @@ -Subproject commit 105898e33d6588a7b6f67d123a1f38c935fd8281 +Subproject commit 3f07a761ea0f48c514cf1f47a2130fc08f3513de
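For reviewers, a minimal sketch of how the coordinator-side MetadataChain introduced above is meant to be used, based only on the signatures visible in this patch (init, append, getBlocks). The helper function name, the dataset directory argument, and the assumption that the remaining MetadataBlock fields carry defaults are illustrative only, not part of the change:

import java.time.Instant

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import dev.kamu.cli.metadata.MetadataChain
import dev.kamu.core.manifests.{DatasetSnapshot, MetadataBlock}

object MetadataChainSketch {
  // `snapshot` would normally come from the workspace manifests; it is taken
  // as a parameter here to keep the sketch self-contained.
  def writeAndReadChain(snapshot: DatasetSnapshot, datasetDir: Path): Unit = {
    val fileSystem = FileSystem.get(new Configuration())
    val chain = new MetadataChain(fileSystem, datasetDir)

    // Writes the genesis block (carrying the snapshot's source) and the summary
    chain.init(snapshot, Instant.now())

    // Link a follow-up block to the current head via its hash;
    // append() returns the block with blockHash filled in
    val head = chain.getBlocks().last
    val appended = chain.append(
      MetadataBlock(
        prevBlockHash = head.blockHash,
        systemTime = Instant.now(),
        source = None
      )
    )
    println(s"appended block ${appended.blockHash}")

    // Blocks come back in historical order, oldest first
    chain.getBlocks().foreach(b => println(s"${b.systemTime} ${b.blockHash}"))
  }
}

On the CLI side, the new flag would be exercised as, for example, `kamu purge --recursive <dataset-id>` (dataset ID placeholder); passing `--all` implies recursive resolution, as noted in PurgeCommand.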