Skip to content

Commit

Permalink
Ensure watermark is monotonic
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Nov 4, 2020
1 parent a68a334 commit c13e2ca
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
6 changes: 0 additions & 6 deletions Makefile

This file was deleted.

6 changes: 5 additions & 1 deletion kamu-cli/src/commands/log_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{Command, Error};
use kamu::domain::*;
use opendatafabric::*;

use chrono::prelude::*;
use console::style;
use std::cell::RefCell;
use std::fmt::Display;
Expand Down Expand Up @@ -36,7 +37,10 @@ impl LogCommand {
}

if let Some(ref wm) = block.output_watermark {
self.render_property("Output.Watermark", &wm);
self.render_property(
"Output.Watermark",
&wm.to_rfc3339_opts(SecondsFormat::AutoSi, true),
);
}

if let Some(ref slices) = block.input_slices {
Expand Down
5 changes: 4 additions & 1 deletion kamu-cli/src/commands/set_watermark_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,17 @@ impl Command for SetWatermarkCommand {
.borrow_mut()
.set_watermark(dataset_id, watermark.into())
{
Ok(PullResult::UpToDate) => {
eprintln!("{}", console::style("Watermark was up-to-date").yellow());
Ok(())
}
Ok(PullResult::Updated { block_hash }) => {
eprintln!(
"{}",
console::style(format!("Committed new block {}", block_hash)).green()
);
Ok(())
}
Ok(_) => panic!("Unexpected result"),
Err(e) => Err(DomainError::InfraError(e.into()).into()),
}
}
Expand Down
10 changes: 10 additions & 0 deletions kamu-core/src/infra/pull_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ impl PullService for PullServiceImpl {
.borrow_mut()
.get_metadata_chain(dataset_id)?;

if let Some(last_watermark) = chain
.iter_blocks()
.filter_map(|b| b.output_watermark)
.next()
{
if last_watermark >= watermark {
return Ok(PullResult::UpToDate);
}
}

let last_hash = chain.read_ref(&BlockRef::Head);

let new_block = MetadataBlock {
Expand Down
56 changes: 56 additions & 0 deletions kamu-core/tests/infra/test_pull_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use kamu::infra::*;
use kamu_test::*;
use opendatafabric::*;

use chrono::prelude::*;
use itertools::Itertools;
use std::cell::RefCell;
use std::convert::TryFrom;
Expand Down Expand Up @@ -98,6 +99,61 @@ fn test_pull_batching() {
);
}

#[test]
fn test_set_watermark() {
let tmp_dir = tempfile::tempdir().unwrap();
let repo = Rc::new(RefCell::new(MetadataRepositoryImpl::new(
&WorkspaceLayout::create(tmp_dir.path()).unwrap(),
)));
let test_ingest_svc = Rc::new(RefCell::new(TestIngestService::new()));
let test_transform_svc = Rc::new(RefCell::new(TestTransformService::new()));
let mut pull_svc = PullServiceImpl::new(
repo.clone(),
test_ingest_svc.clone(),
test_transform_svc.clone(),
slog::Logger::root(slog::Discard, slog::o!()),
);

let dataset_id = DatasetIDBuf::try_from("foo").unwrap();

repo.borrow_mut()
.add_dataset(MetadataFactory::dataset_snapshot().id(&dataset_id).build())
.unwrap();

let num_blocks = || {
repo.borrow()
.get_metadata_chain(&dataset_id)
.unwrap()
.iter_blocks()
.count()
};
assert_eq!(num_blocks(), 1);

assert!(matches!(
pull_svc.set_watermark(&dataset_id, Utc.ymd(2000, 1, 2).and_hms(0, 0, 0)),
Ok(PullResult::Updated { .. })
));
assert_eq!(num_blocks(), 2);

assert!(matches!(
pull_svc.set_watermark(&dataset_id, Utc.ymd(2000, 1, 3).and_hms(0, 0, 0)),
Ok(PullResult::Updated { .. })
));
assert_eq!(num_blocks(), 3);

assert!(matches!(
pull_svc.set_watermark(&dataset_id, Utc.ymd(2000, 1, 3).and_hms(0, 0, 0)),
Ok(PullResult::UpToDate)
));
assert_eq!(num_blocks(), 3);

assert!(matches!(
pull_svc.set_watermark(&dataset_id, Utc.ymd(2000, 1, 2).and_hms(0, 0, 0)),
Ok(PullResult::UpToDate)
));
assert_eq!(num_blocks(), 3);
}

pub struct TestIngestService {
calls: Vec<Vec<DatasetIDBuf>>,
}
Expand Down

0 comments on commit c13e2ca

Please sign in to comment.