Skip to content

Commit

Permalink
Added --force-uncacheable mode to pull
Browse files Browse the repository at this point in the history
  • Loading branch information
sergiimk committed Oct 9, 2020
1 parent e51b818 commit c558742
Show file tree
Hide file tree
Showing 18 changed files with 340 additions and 200 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.31.0] - 2020-10-08
### Added
- `pull` command now supports `--force-uncacheable` flag for refreshing uncacheable datasets.

## [0.30.1] - 2020-09-07
### Fixed
- Add back `.gitignore` file when creating local volume dir.
Expand Down
405 changes: 234 additions & 171 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion kamu-cli/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kamu-cli"
version = "0.30.1"
version = "0.31.0"
description = "Decentralized data management tool"
authors = ["Sergii Mikhtoniuk <mikhtoniuk@gmail.com>"]
license = "MPL-2.0"
Expand Down
4 changes: 2 additions & 2 deletions kamu-cli/examples/pull-root-multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn main() {
["a"].iter(),
false,
false,
false,
&OutputConfig::default(),
);
cmd.run().unwrap();
Expand Down Expand Up @@ -124,8 +125,7 @@ impl PullService for TestPullService {
fn pull_multi(
&mut self,
_dataset_ids_iter: &mut dyn Iterator<Item = &DatasetID>,
_recursive: bool,
_all: bool,
_options: PullOptions,
ingest_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)> {
Expand Down
4 changes: 2 additions & 2 deletions kamu-cli/examples/pull-root-single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn main() {
["a"].iter(),
false,
false,
false,
&OutputConfig::default(),
);
cmd.run().unwrap();
Expand All @@ -26,8 +27,7 @@ impl PullService for TestPullService {
fn pull_multi(
&mut self,
_dataset_ids_iter: &mut dyn Iterator<Item = &DatasetID>,
_recursive: bool,
_all: bool,
_options: PullOptions,
ingest_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
_transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)> {
Expand Down
3 changes: 3 additions & 0 deletions kamu-cli/src/cli_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ pub fn cli(binary_name: &'static str, version: &'static str) -> App<'static, 'st
.short("r")
.long("recursive")
.help("Also pull all transitive dependencies of specified datasets"),
Arg::with_name("force-uncacheable")
.long("force-uncacheable")
.help("Pull latest data from the uncacheable data sources"),
Arg::with_name("dataset")
.multiple(true)
.index(1)
Expand Down
23 changes: 19 additions & 4 deletions kamu-cli/src/commands/pull_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct PullCommand {
ids: Vec<String>,
all: bool,
recursive: bool,
force_uncacheable: bool,
output_config: OutputConfig,
}

Expand All @@ -27,6 +28,7 @@ impl PullCommand {
ids: I,
all: bool,
recursive: bool,
force_uncacheable: bool,
output_config: &OutputConfig,
) -> Self
where
Expand All @@ -38,6 +40,7 @@ impl PullCommand {
ids: ids.map(|s| s.as_ref().to_owned()).collect(),
all: all,
recursive: recursive,
force_uncacheable: force_uncacheable,
output_config: output_config.clone(),
}
}
Expand All @@ -48,8 +51,14 @@ impl PullCommand {
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)> {
self.pull_svc.borrow_mut().pull_multi(
&mut dataset_ids.iter().map(|id| id.as_ref()),
self.recursive,
self.all,
PullOptions {
recursive: self.recursive,
all: self.all,
ingest_options: IngestOptions {
force_uncacheable: self.force_uncacheable,
exhaust_sources: true,
},
},
None,
None,
)
Expand All @@ -68,8 +77,14 @@ impl PullCommand {

let results = self.pull_svc.borrow_mut().pull_multi(
&mut dataset_ids.iter().map(|id| id.as_ref()),
self.recursive,
self.all,
PullOptions {
recursive: self.recursive,
all: self.all,
ingest_options: IngestOptions {
force_uncacheable: self.force_uncacheable,
exhaust_sources: true,
},
},
Some(listener.clone()),
Some(listener.clone()),
);
Expand Down
1 change: 1 addition & 0 deletions kamu-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ fn main() {
submatches.values_of("dataset").unwrap_or_default(),
submatches.is_present("all"),
submatches.is_present("recursive"),
submatches.is_present("force-uncacheable"),
&output_format,
))
}
Expand Down
2 changes: 1 addition & 1 deletion kamu-core-test/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kamu-test"
version = "0.30.1"
version = "0.31.0"
authors = ["Sergii Mikhtoniuk <mikhtoniuk@gmail.com>"]
edition = "2018"

Expand Down
2 changes: 1 addition & 1 deletion kamu-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kamu"
version = "0.30.1"
version = "0.31.0"
authors = ["Sergii Mikhtoniuk <mikhtoniuk@gmail.com>"]
edition = "2018"

Expand Down
21 changes: 20 additions & 1 deletion kamu-core/src/domain/ingest_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,36 @@ pub trait IngestService {
fn ingest(
&mut self,
dataset_id: &DatasetID,
options: IngestOptions,
listener: Option<Arc<Mutex<dyn IngestListener>>>,
) -> Result<IngestResult, IngestError>;

fn ingest_multi(
&mut self,
dataset_ids: &mut dyn Iterator<Item = &DatasetID>,
exhaust_sources: bool,
options: IngestOptions,
listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<IngestResult, IngestError>)>;
}

/// Options that control a single ingest run.
///
/// The derived `Default` leaves every flag switched off (`false`), so callers
/// opt in to each behavior explicitly, e.g.
/// `IngestOptions { force_uncacheable: true, ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IngestOptions {
    /// Fetch latest data from uncacheable data sources
    pub force_uncacheable: bool,
    /// Pull sources that yield multiple data files until they are
    /// fully exhausted
    pub exhaust_sources: bool,
}

#[derive(Debug)]
pub enum IngestResult {
UpToDate,
Expand Down
23 changes: 21 additions & 2 deletions kamu-core/src/domain/pull_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ pub trait PullService {
fn pull_multi(
&mut self,
dataset_ids: &mut dyn Iterator<Item = &DatasetID>,
recursive: bool,
all: bool,
options: PullOptions,
ingest_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)>;
Expand All @@ -27,6 +26,26 @@ pub trait PullService {
) -> Result<PullResult, PullError>;
}

#[derive(Debug, Clone)]
pub struct PullOptions {
/// Pull all dataset dependencies recursively in depth-first order
pub recursive: bool,
/// Pull all known datasets
pub all: bool,
/// Ingest-specific options
pub ingest_options: IngestOptions,
}

impl Default for PullOptions {
fn default() -> Self {
Self {
recursive: false,
all: false,
ingest_options: IngestOptions::default(),
}
}
}

#[derive(Debug)]
pub enum PullResult {
UpToDate,
Expand Down
5 changes: 4 additions & 1 deletion kamu-core/src/infra/ingest/ingest_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::sync::{Arc, Mutex};

pub struct IngestTask {
dataset_id: DatasetIDBuf,
options: IngestOptions,
layout: DatasetLayout,
meta_chain: RefCell<Box<dyn MetadataChain>>,
source: DatasetSourceRoot,
Expand All @@ -28,6 +29,7 @@ pub struct IngestTask {
impl IngestTask {
pub fn new<'a>(
dataset_id: &DatasetID,
options: IngestOptions,
layout: DatasetLayout,
meta_chain: Box<dyn MetadataChain>,
vocab: DatasetVocabulary,
Expand All @@ -43,6 +45,7 @@ impl IngestTask {

Self {
dataset_id: dataset_id.to_owned(),
options: options,
layout: layout,
meta_chain: RefCell::new(meta_chain),
source: source,
Expand Down Expand Up @@ -139,7 +142,7 @@ impl IngestTask {
"FetchCheckpoint",
|old_checkpoint: Option<FetchCheckpoint>| {
if let Some(ref cp) = old_checkpoint {
if !cp.is_cacheable() {
if !cp.is_cacheable() && !self.options.force_uncacheable {
return Ok(ExecutionResult {
was_up_to_date: true,
checkpoint: old_checkpoint.unwrap(),
Expand Down
8 changes: 7 additions & 1 deletion kamu-core/src/infra/ingest_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl IngestService for IngestServiceImpl {
fn ingest(
&mut self,
dataset_id: &DatasetID,
options: IngestOptions,
maybe_listener: Option<Arc<Mutex<dyn IngestListener>>>,
) -> Result<IngestResult, IngestError> {
let null_listener: Arc<Mutex<dyn IngestListener>> =
Expand Down Expand Up @@ -107,6 +108,7 @@ impl IngestService for IngestServiceImpl {

let mut ingest_task = IngestTask::new(
dataset_id,
options,
layout,
meta_chain,
vocab,
Expand All @@ -123,7 +125,7 @@ impl IngestService for IngestServiceImpl {
fn ingest_multi(
&mut self,
dataset_ids: &mut dyn Iterator<Item = &DatasetID>,
exhaust_sources: bool,
options: IngestOptions,
maybe_multi_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<IngestResult, IngestError>)> {
let null_multi_listener: Arc<Mutex<dyn IngestMultiListener>> =
Expand All @@ -140,6 +142,7 @@ impl IngestService for IngestServiceImpl {
let meta_chain = self.metadata_repo.borrow().get_metadata_chain(&id).unwrap();
let vocab = self.metadata_repo.borrow().get_summary(&id).unwrap().vocab;
let engine_factory = self.engine_factory.clone();
let task_options = options.clone();

let null_listener = Arc::new(Mutex::new(NullIngestListener {}));
let listener = multi_listener
Expand All @@ -153,8 +156,11 @@ impl IngestService for IngestServiceImpl {
std::thread::Builder::new()
.name("ingest_multi".to_owned())
.spawn(move || {
let exhaust_sources = task_options.exhaust_sources;

let mut ingest_task = IngestTask::new(
&id,
task_options,
layout,
meta_chain,
vocab,
Expand Down
9 changes: 4 additions & 5 deletions kamu-core/src/infra/pull_service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,11 @@ impl PullService for PullServiceImpl {
fn pull_multi(
&mut self,
dataset_ids: &mut dyn Iterator<Item = &DatasetID>,
recursive: bool,
all: bool,
options: PullOptions,
ingest_listener: Option<Arc<Mutex<dyn IngestMultiListener>>>,
transform_listener: Option<Arc<Mutex<dyn TransformMultiListener>>>,
) -> Vec<(DatasetIDBuf, Result<PullResult, PullError>)> {
let starting_dataset_ids: std::collections::HashSet<DatasetIDBuf> = if !all {
let starting_dataset_ids: std::collections::HashSet<DatasetIDBuf> = if !options.all {
dataset_ids.map(|id| id.to_owned()).collect()
} else {
self.metadata_repo.borrow().get_all_datasets().collect()
Expand All @@ -134,7 +133,7 @@ impl PullService for PullServiceImpl {
let datasets_labeled = self
.get_datasets_ordered_by_depth(&mut starting_dataset_ids.iter().map(|id| id.as_ref()));

let datasets_to_pull = if recursive || all {
let datasets_to_pull = if options.recursive || options.all {
datasets_labeled
} else {
datasets_labeled
Expand All @@ -155,7 +154,7 @@ impl PullService for PullServiceImpl {
.borrow_mut()
.ingest_multi(
&mut level.iter().map(|(id, _)| id.as_ref()),
true,
options.ingest_options.clone(),
ingest_listener.clone(),
)
.into_iter()
Expand Down
4 changes: 3 additions & 1 deletion kamu-core/tests/infra/engine/test_ingest_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ fn test_ingest_with_engine() {
.add_dataset(dataset_snapshot)
.unwrap();

let res = ingest_svc.borrow_mut().ingest(&dataset_id, None);
let res = ingest_svc
.borrow_mut()
.ingest(&dataset_id, IngestOptions::default(), None);
assert_ok!(res, IngestResult::Updated {..});

let dataset_layout = DatasetLayout::new(&volume_layout, &dataset_id);
Expand Down
8 changes: 6 additions & 2 deletions kamu-core/tests/infra/engine/test_transform_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ fn test_transform_with_engine_spark() {
.add_dataset(root_snapshot)
.unwrap();

ingest_svc.ingest(&root_id, None).unwrap();
ingest_svc
.ingest(&root_id, IngestOptions::default(), None)
.unwrap();

///////////////////////////////////////////////////////////////////////////
// Derivative setup
Expand Down Expand Up @@ -272,7 +274,9 @@ fn test_transform_with_engine_flink() {
.add_dataset(root_snapshot)
.unwrap();

ingest_svc.ingest(&root_id, None).unwrap();
ingest_svc
.ingest(&root_id, IngestOptions::default(), None)
.unwrap();

///////////////////////////////////////////////////////////////////////////
// Derivative setup
Expand Down
Loading

0 comments on commit c558742

Please sign in to comment.