Watermarking support
sergiimk committed Jun 23, 2020
1 parent 700c946 commit 1a76960
Showing 9 changed files with 151 additions and 48 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,10 @@
 All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [0.18.0] - 2020-06-23
+### Added
+- Watermarking support
+
 ## [0.17.0] - 2020-06-14
 ### Added
 - Added support for [Apache Flink](https://github.com/kamu-data/kamu-engine-flink) engine!
16 changes: 16 additions & 0 deletions core.coordinator/src/main/scala/dev/kamu/cli/CliParser.scala
@@ -8,11 +8,15 @@
 
 package dev.kamu.cli
 
+import java.time.Instant
+
 import dev.kamu.cli.output.OutputFormat
 import org.apache.hadoop.fs.Path
 import org.apache.log4j.Level
 import org.rogach.scallop._
 
+import scala.concurrent.duration.Duration
+
 ///////////////////////////////////////////////////////////////////////////////
 
 class KamuSubcommand(name: String) extends Subcommand(name) {
@@ -177,6 +181,13 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
     }
     val argType = ArgType.LIST
   }
+  implicit val _instantConverter = singleArgConverter[Instant](s => {
+    if (s.endsWith(" ago")) {
+      Instant.now().minusMillis(Duration(s.substring(0, s.length - 4)).toMillis)
+    } else {
+      Instant.parse(s)
+    }
+  })
 
   editBuilder(s => s.copy(helpFormatter = new BetterScallopHelpFormatter()))
 
@@ -308,6 +319,11 @@ class CliArgs(arguments: Seq[String]) extends ScallopConf(arguments) {
     descr = "Pull datasets and their dependencies"
   )
 
+  val setWatermark = opt[Instant](
+    name = "set-watermark",
+    descr = "Manually assigns the watermark to the dataset"
+  )
+
   val ids = trailArg[List[String]](
     "ids",
     required = false,
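For reference, the converter above accepts either an absolute ISO-8601 instant or a relative "<duration> ago" phrase, with the duration parsed by scala.concurrent.duration.Duration. A minimal standalone sketch of that behavior (illustrative, not part of the commit):

import java.time.Instant
import scala.concurrent.duration.Duration

def parseWatermark(s: String): Instant =
  if (s.endsWith(" ago"))
    // Relative form: strip the trailing " ago" and subtract the duration,
    // e.g. "12 hours ago" or "30 minutes ago"
    Instant.now().minusMillis(Duration(s.substring(0, s.length - 4)).toMillis)
  else
    // Absolute form, e.g. "2020-06-23T00:00:00Z"
    Instant.parse(s)
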
51 changes: 30 additions & 21 deletions core.coordinator/src/main/scala/dev/kamu/cli/Kamu.scala
@@ -96,29 +96,38 @@ class Kamu(
           c.delete.ids()
         )
       case List(c.pull) =>
-        val engineFactory = getEngineFactory(
-          if (c.debug()) Level.INFO else c.sparkLogLevel()
-        )
-        new PullCommand(
-          new IngestService(
-            fileSystem,
-            workspaceLayout,
-            metadataRepository,
-            engineFactory,
-            systemClock
-          ),
-          new TransformService(
-            fileSystem,
-            metadataRepository,
-            systemClock,
-            engineFactory
-          ),
-          metadataRepository,
-          remoteOperatorFactory,
-          c.pull.ids(),
-          c.pull.all(),
-          c.pull.recursive()
-        )
+        if (c.pull.setWatermark.isEmpty) {
+          val engineFactory = getEngineFactory(
+            if (c.debug()) Level.INFO else c.sparkLogLevel()
+          )
+          new PullCommand(
+            new IngestService(
+              fileSystem,
+              workspaceLayout,
+              metadataRepository,
+              engineFactory,
+              systemClock
+            ),
+            new TransformService(
+              fileSystem,
+              metadataRepository,
+              systemClock,
+              engineFactory
+            ),
+            metadataRepository,
+            remoteOperatorFactory,
+            c.pull.ids(),
+            c.pull.all(),
+            c.pull.recursive()
+          )
+        } else {
+          new AssignWatermarkCommand(
+            systemClock,
+            metadataRepository,
+            c.pull.ids(),
+            c.pull.setWatermark()
+          )
+        }
       case List(c.log) =>
         new LogCommand(
           metadataRepository,
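In short: a plain `kamu pull` still wires up the ingest and transform services as before, while a pull with --set-watermark skips the engines entirely and routes to the new AssignWatermarkCommand. Hypothetical invocations (the dataset ID is illustrative):

kamu pull com.example.dataset --set-watermark "2020-06-23T00:00:00Z"
kamu pull com.example.dataset --set-watermark "12 hours ago"
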
51 changes: 51 additions & 0 deletions core.coordinator/src/main/scala/dev/kamu/cli/commands/AssignWatermarkCommand.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2018 kamu.dev
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dev.kamu.cli.commands
+
+import java.time.Instant
+
+import dev.kamu.cli.metadata.MetadataRepository
+import dev.kamu.core.manifests.{DatasetID, MetadataBlock}
+import dev.kamu.core.utils.Clock
+import org.apache.log4j.LogManager
+
+class AssignWatermarkCommand(
+  systemClock: Clock,
+  metadataRepository: MetadataRepository,
+  ids: Seq[String],
+  watermark: Instant
+) extends Command {
+  private val logger = LogManager.getLogger(getClass.getName)
+
+  def run(): Unit = {
+    val datasetIDs = ids.map(DatasetID)
+
+    val numUpdated = datasetIDs.map(assignWatermark).count(b => b)
+
+    logger.info(s"Updated $numUpdated datasets")
+  }
+
+  def assignWatermark(datasetID: DatasetID): Boolean = {
+    val metaChain = metadataRepository.getMetadataChain(datasetID)
+
+    val newBlock = metaChain.append(
+      MetadataBlock(
+        prevBlockHash = metaChain.getBlocks().last.blockHash,
+        systemTime = systemClock.instant(),
+        outputWatermark = Some(watermark)
+      )
+    )
+
+    logger.info(
+      s"Committed new block: $datasetID (${newBlock.blockHash})"
+    )
+
+    true
+  }
+}
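A minimal usage sketch, mirroring how Kamu.scala constructs the command (the dataset ID is hypothetical, and systemClock/metadataRepository are assumed to be wired up as elsewhere in the coordinator):

new AssignWatermarkCommand(
  systemClock,                          // dev.kamu.core.utils.Clock
  metadataRepository,                   // dev.kamu.cli.metadata.MetadataRepository
  Seq("com.example.dataset"),           // hypothetical dataset ID
  Instant.parse("2020-06-23T00:00:00Z") // watermark to record
).run()
// Each affected dataset gets a new MetadataBlock whose outputWatermark is set,
// without running any ingest or transform engine.
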
core.coordinator/src/main/scala/dev/kamu/cli/commands/LogCommand.scala
@@ -39,14 +39,20 @@ class LogCommand(
     block.outputSlice.foreach { s =>
       println(renderProperty("Output.Records", s.numRecords))
       println(renderProperty("Output.Interval", s.interval.format()))
-      println(renderProperty("Output.Hash", s.hash))
+      if (s.hash.nonEmpty)
+        println(renderProperty("Output.Hash", s.hash))
     }
 
+    block.outputWatermark.foreach { w =>
+      println(renderProperty("Output.Watermark", w))
+    }
+
     block.inputSlices.zipWithIndex.foreach {
       case (s, i) =>
         println(renderProperty(s"Input[$i].Records", s.numRecords))
         println(renderProperty(s"Input[$i].Interval", s.interval.format()))
-        println(renderProperty(s"Input[$i].Hash", s.hash))
+        if (s.hash.nonEmpty)
+          println(renderProperty(s"Input[$i].Hash", s.hash))
     }
 
     block.source.foreach {
core.coordinator/src/main/scala/dev/kamu/cli/external/DockerImages.scala
@@ -9,14 +9,15 @@
 package dev.kamu.cli.external
 
 object DockerImages {
-  val SPARK = "kamudata/engine-spark:0.4.0"
+  val SPARK = "kamudata/engine-spark:0.5.0"
+  val FLINK = "kamudata/engine-flink:0.2.0"
 
   val LIVY = SPARK
   val JUPYTER = "kamudata/jupyter-uber:0.0.1"
 
-  val FLINK = "kamudata/engine-flink:0.1.0"
-
   val ALL = Array(
     SPARK,
     FLINK,
     LIVY,
     JUPYTER
   ).distinct
FlinkEngine.scala
@@ -167,7 +167,10 @@ class FlinkEngine(
     if (!fileSystem.exists(checkpointsDir))
       return None
 
-    val allSavepoints = fileSystem.listStatus(checkpointsDir).map(_.getPath)
+    val allSavepoints = fileSystem
+      .listStatus(checkpointsDir)
+      .map(_.getPath)
+      .filter(fileSystem.isDirectory)
 
     // TODO: Atomicity
     if (allSavepoints.length > 1)
core.coordinator/src/main/scala/dev/kamu/cli/transform/TransformService.scala
@@ -10,18 +10,27 @@ package dev.kamu.cli.transform
 
 import dev.kamu.cli.metadata.{MetadataChain, MetadataRepository}
 import dev.kamu.core.manifests._
-import dev.kamu.core.manifests.infra.{ExecuteQueryRequest, ExecuteQueryResult}
+import dev.kamu.core.manifests.infra.{
+  ExecuteQueryRequest,
+  ExecuteQueryResult,
+  InputDataSlice,
+  Watermark
+}
 import dev.kamu.core.utils.Clock
 import org.apache.hadoop.fs.FileSystem
 import org.apache.log4j.LogManager
 import spire.math.Interval
-import spire.math.interval.{Unbound, ValueBound}
+import spire.math.interval.{Closed, Unbound, ValueBound}
 
 case class TransformBatch(
   source: SourceKind.Derivative,
-  inputSlices: Map[DatasetID, DataSlice]
+  inputSlices: Map[DatasetID, InputDataSlice]
 ) {
-  def isEmpty: Boolean = inputSlices.values.forall(_.interval.isEmpty)
+  def isEmpty: Boolean = {
+    inputSlices.values.forall(
+      s => s.interval.isEmpty && s.explicitWatermarks.isEmpty
+    )
+  }
 }
 
 class TransformService(
@@ -151,19 +160,7 @@ class TransformService(
     inputIndex: Int,
     inputMetaChain: MetadataChain,
     outputMetaChain: MetadataChain
-  ): DataSlice = {
-
-    // Determine available data range
-    // Result is either: () or (-inf, upper]
-    val ivAvailable = inputMetaChain
-      .getBlocks()
-      .reverse
-      .flatMap(_.outputSlice)
-      .find(_.interval.nonEmpty)
-      .map(_.interval)
-      .map(i => Interval.fromBounds(Unbound(), i.upperBound))
-      .getOrElse(Interval.empty)
-
+  ): InputDataSlice = {
     // Determine processed data range
     // Result is either: () or (inf, upper] or (lower, upper]
     val ivProcessed = outputMetaChain
@@ -184,16 +181,32 @@ class TransformService(
       Interval.all
     }
 
+    // Filter unprocessed input blocks
+    val blocksUnprocessed = inputMetaChain
+      .getBlocks()
+      .reverse
+      .takeWhile(b => ivUnprocessed.contains(b.systemTime))
+
+    // Determine available data/watermark range
+    // Result is either: () or (-inf, upper]
+    val ivAvailable = blocksUnprocessed.headOption
+      .map(b => Interval.fromBounds(Unbound(), Closed(b.systemTime)))
+      .getOrElse(Interval.empty)
+
     // Result is either: () or (lower, upper]
     val ivToProcess = ivAvailable & ivUnprocessed
 
+    val explicitWatermarks = blocksUnprocessed.reverse
+      .filter(_.outputWatermark.isDefined)
+      .map(b => Watermark(b.systemTime, b.outputWatermark.get))
+
     logger.debug(
       s"Input range for $inputID is: $ivToProcess (available: $ivAvailable, processed: $ivProcessed)"
     )
-    DataSlice(
+
+    InputDataSlice(
       interval = ivToProcess,
-      hash = "",
-      numRecords = -1
+      explicitWatermarks = explicitWatermarks
     )
   }
 }
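The slice computation above is plain spire interval algebra: the available range (-inf, upper] is intersected with the unprocessed range to yield the (lower, upper] batch to process. A tiny standalone illustration using integer stand-ins for block system times (not part of the commit):

import spire.implicits._
import spire.math.Interval
import spire.math.interval.{Closed, Open, Unbound}

// Available: everything up to the newest unprocessed block, i.e. (-inf, 10]
val ivAvailable = Interval.fromBounds(Unbound[Int](), Closed(10))
// Unprocessed: everything after the last processed block, i.e. (4, +inf)
val ivUnprocessed = Interval.fromBounds(Open(4), Unbound[Int]())

val ivToProcess = ivAvailable & ivUnprocessed // (4, 10]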
