diff --git a/CHANGELOG.md b/CHANGELOG.md index 97b27189c9..d10366b397 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.34.0] - 2020-11-09 +### Added +- Initial version of `remote` management +- Initial version of `push` / `pull` commands working with `remote` +- Local file system `remote` backend implementation + ## [0.33.0] - 2020-11-01 ### Changed - Upgraded the Spark engine to Spark 3.0 diff --git a/Cargo.lock b/Cargo.lock index 59e5e6f4ba..210d7c5de9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,9 +8,9 @@ checksum = "ee2a4ec343196209d6594e19543ae87a39f96d5534d7174822a3ad825dd6ed7e" [[package]] name = "aho-corasick" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b476ce7103678b0c6d3d395dbbae31d48ff910bd28be979ba5d48c6351131d0d" +checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" dependencies = [ "memchr", ] @@ -63,7 +63,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" +source = "git+https://github.com/apache/arrow#3e72c7006653c97bbbd6faa6446f45b7a0c1d953" dependencies = [ "chrono", "csv", @@ -79,6 +79,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "assert_matches" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "695579f0f2520f3774bb40461e5adb066459d4e0af4d59d20175484fb8e9edf1" + [[package]] name = "atty" version = "0.2.14" @@ -116,9 +122,9 @@ checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" [[package]] name = "blake2b_simd" -version = "0.5.10" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8fb2d74254a3a0b5cac33ac9f8ed0e44aa50378d9dbb2e5d83bd21ed1dc2c8a" +checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587" dependencies = [ "arrayref", "arrayvec 0.5.2", @@ -193,9 +199,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed67cbde08356238e75fc4656be4749481eeffb09e19f320a25237d5221c985d" +checksum = "f1770ced377336a88a67c473594ccc14eca6f4559217c34f64aac8f83d641b40" dependencies = [ "jobserver", ] @@ -304,9 +310,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279" +checksum = "fc4666154fd004af3fd6f1da2e81a96fd5a81927fe8ddb6ecc79e2aa6e138b54" dependencies = [ "bstr", "csv-core", @@ -442,11 +448,11 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" [[package]] name = "filetime" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed85775dcc68644b5c950ac06a2b23768d3bc9390464151aaf27136998dcf9e" +checksum = "0c122a393ea57648015bf06fbd3d372378992e86b9ff5a7a497b076a28c79efe" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", "redox_syscall", "winapi", @@ -473,11 +479,11 @@ dependencies = [ [[package]] name = "flate2" 
-version = "1.0.18" +version = "1.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da80be589a72651dcda34d8b35bcdc9b7254ad06325611074d9cc0fbb19f60ee" +checksum = "7411863d55df97a419aa64cb4d2f167103ea9d767e2c54a1868b7ac3f6b47129" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "crc32fast", "libc", "miniz_oxide", @@ -489,6 +495,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "fs_extra" version = "1.2.0" @@ -649,8 +665,9 @@ dependencies = [ [[package]] name = "kamu" -version = "0.33.0" +version = "0.34.0" dependencies = [ + "assert_matches", "cfg-if 1.0.0", "chrono", "curl", @@ -682,7 +699,7 @@ dependencies = [ [[package]] name = "kamu-cli" -version = "0.33.0" +version = "0.34.0" dependencies = [ "chrono", "chrono-humanize", @@ -712,7 +729,7 @@ dependencies = [ [[package]] name = "kamu-test" -version = "0.33.0" +version = "0.34.0" dependencies = [ "chrono", "opendatafabric", @@ -825,9 +842,9 @@ checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" [[package]] name = "num" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab3e176191bc4faad357e3122c4747aa098ac880e88b168f106386128736cf4a" +checksum = "8b7a8e9be5e039e2ff869df49155f1c06bd01ade2117ec783e56ab0932b67a8f" dependencies = [ "num-bigint", "num-complex", @@ -839,9 +856,9 @@ dependencies = [ [[package]] name = "num-bigint" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7f3fc75e3697059fb1bc465e3d8cca6cf92f56854f201158b3f9c77d5a3cfa0" +checksum = "5e9a41747ae4633fce5adffb4d2e81ffc5e89593cb19917f8fb2cc5ff76507bf" dependencies = [ "autocfg", "num-integer", @@ -890,9 +907,9 @@ dependencies = [ [[package]] name = "num-rational" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5fa6d5f418879385b213d905f7cf5bf4aa553d4c380f0152d1d4f2749186fa9" +checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" dependencies = [ "autocfg", "num-bigint", @@ -927,7 +944,7 @@ checksum = "17b02fc0ff9a9e4b35b3342880f48e896ebf69f2967921fe8646bf5b7125956a" [[package]] name = "opendatafabric" -version = "0.33.0" +version = "0.34.0" dependencies = [ "byteorder", "chrono", @@ -983,7 +1000,7 @@ dependencies = [ [[package]] name = "parquet" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow#e7ce8cfda3a612cd54fa47d06e26ca07b83a7cd6" +source = "git+https://github.com/apache/arrow#3e72c7006653c97bbbd6faa6446f45b7a0c1d953" dependencies = [ "arrow", "base64 0.13.0", @@ -1028,9 +1045,9 @@ checksum = "b18befed8bc2b61abc79a457295e7e838417326da1586050b919414073977f19" [[package]] name = "ppv-lite86" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c36fa947111f5c62a733b652544dd0016a43ce89619538a8ef92724a6f501a20" +checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" [[package]] name = "prettytable-rs" @@ -1495,18 +1512,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.21" 
+version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "318234ffa22e0920fe9a40d7b8369b5f649d490980cf7aadcf1eb91594869b42" +checksum = "0e9ae34b84616eedaaf1e9dd6026dbe00dcafa92aa0c8077cb69df1fcfe5e53e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.21" +version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cae2447b6282786c3493999f40a9be2a6ad20cb8bd268b0a0dbf5a065535c0ab" +checksum = "9ba20f23e85b10754cd195504aebf6a27e2e6cbe28c17778a0c930724628dd56" dependencies = [ "proc-macro2", "quote", @@ -1599,10 +1616,11 @@ checksum = "f14ee04d9415b52b3aeab06258a3f07093182b88ba0f9b8d203f211a7a7d41c7" [[package]] name = "url" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb" +checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e" dependencies = [ + "form_urlencoded", "idna", "matches", "percent-encoding", diff --git a/kamu-cli/Cargo.toml b/kamu-cli/Cargo.toml index a018f38a7a..70147dc61d 100644 --- a/kamu-cli/Cargo.toml +++ b/kamu-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu-cli" -version = "0.33.0" +version = "0.34.0" description = "Decentralized data management tool" authors = ["Sergii Mikhtoniuk "] license = "MPL-2.0" diff --git a/kamu-cli/src/cli_parser.rs b/kamu-cli/src/cli_parser.rs index 1c381b70a3..f7d0345c37 100644 --- a/kamu-cli/src/cli_parser.rs +++ b/kamu-cli/src/cli_parser.rs @@ -233,9 +233,35 @@ pub fn cli(binary_name: &'static str, version: &'static str) -> App<'static, 'st Arg::with_name("set-watermark") .long("set-watermark") .takes_value(true) - .value_name("T") + .value_name("TIME") .help("Injects a manual watermark into the dataset to signify that \ - no data is expected to arrive with event time that precedes it") + no data is expected to arrive with event time that precedes it"), + Arg::with_name("remote") + .long("remote") + .takes_value(true) + .value_name("REMOTE") + .help("Specifies which remote to pull data from"), + ]), + SubCommand::with_name("push") + .about("Push local data into the remote repository") + .args(&[ + // Arg::with_name("all") + // .short("a") + // .long("all") + // .help("Push all datasets in the workspace"), + // Arg::with_name("recursive") + // .short("r") + // .long("recursive") + // .help("Also push all transitive dependencies of specified datasets"), + Arg::with_name("dataset") + .multiple(true) + .index(1) + .help("Dataset ID(s)"), + Arg::with_name("remote") + .long("remote") + .takes_value(true) + .value_name("REMOTE") + .help("Specifies which remote to push data into"), ]), SubCommand::with_name("reset") .about("Revert the dataset back to the specified state") diff --git a/kamu-cli/src/commands/complete_command.rs b/kamu-cli/src/commands/complete_command.rs index def53a0a10..a862510922 100644 --- a/kamu-cli/src/commands/complete_command.rs +++ b/kamu-cli/src/commands/complete_command.rs @@ -1,6 +1,7 @@ use super::{Command, Error}; use kamu::domain::*; +use chrono::prelude::*; use glob; use std::cell::RefCell; use std::fs; @@ -31,6 +32,10 @@ impl CompleteCommand { } } + fn complete_timestamp(&self) { + println!("{}", Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true)); + } + fn complete_dataset(&self, prefix: &str) { if let Some(repo) = self.metadata_repo.as_ref() { for dataset_id in repo.borrow().get_all_datasets() { @@ -109,14 +114,33 @@ impl Command for 
CompleteCommand { for s in last_cmd.subcommands.iter() { if s.p.meta.name == *arg { last_cmd = &s.p; - continue; } } } let empty = "".to_owned(); + let prev = args.get(self.current - 1).unwrap_or(&empty); let to_complete = args.get(self.current).unwrap_or(&empty); + // Complete option values + if prev.starts_with("--") { + for opt in last_cmd.opts.iter() { + let full_name = format!("--{}", opt.s.long.unwrap()); + if full_name == *prev { + if let Some(val_names) = &opt.v.val_names { + for (_, name) in val_names.iter() { + match *name { + "REMOTE" => self.complete_remote(to_complete), + "TIME" => self.complete_timestamp(), + _ => (), + } + } + return Ok(()); + } + } + } + } + // Complete commands for s in last_cmd.subcommands.iter() { if !s.p.is_set(clap::AppSettings::Hidden) && s.p.meta.name.starts_with(to_complete) { @@ -136,6 +160,9 @@ impl Command for CompleteCommand { // Complete flags and options if to_complete.starts_with("-") { + if "--help".starts_with(to_complete) { + println!("--help"); + } for flg in last_cmd.flags.iter() { let full_name = if flg.s.long.is_some() { format!("--{}", flg.s.long.unwrap()) diff --git a/kamu-cli/src/commands/mod.rs b/kamu-cli/src/commands/mod.rs index d77fcc5378..36238acc96 100644 --- a/kamu-cli/src/commands/mod.rs +++ b/kamu-cli/src/commands/mod.rs @@ -36,6 +36,9 @@ pub use pull_command::*; mod pull_images_command; pub use pull_images_command::*; +mod push_command; +pub use push_command::*; + mod remote_add_command; pub use remote_add_command::*; @@ -54,6 +57,9 @@ pub use sql_server_command::*; mod sql_shell_command; pub use sql_shell_command::*; +mod sync_from_command; +pub use sync_from_command::*; + pub trait Command { fn needs_workspace(&self) -> bool { true diff --git a/kamu-cli/src/commands/push_command.rs b/kamu-cli/src/commands/push_command.rs new file mode 100644 index 0000000000..38de972993 --- /dev/null +++ b/kamu-cli/src/commands/push_command.rs @@ -0,0 +1,142 @@ +use super::{Command, Error}; +use crate::output::OutputConfig; +use kamu::domain::*; +use opendatafabric::*; + +use std::backtrace::BacktraceStatus; +use std::cell::RefCell; +use std::error::Error as StdError; +use std::rc::Rc; + +/////////////////////////////////////////////////////////////////////////////// +// Command +/////////////////////////////////////////////////////////////////////////////// + +pub struct PushCommand { + sync_svc: Rc>, + ids: Vec, + remote: Option, + _output_config: OutputConfig, +} + +impl PushCommand { + pub fn new( + sync_svc: Rc>, + ids: I, + remote: Option, + output_config: &OutputConfig, + ) -> Self + where + I: Iterator, + S: AsRef, + S2: AsRef, + { + Self { + sync_svc: sync_svc, + ids: ids.map(|s| s.as_ref().to_owned()).collect(), + remote: remote.map(|v| v.as_ref().to_owned()), + _output_config: output_config.clone(), + } + } + + fn push( + &self, + dataset_ids: Vec, + ) -> Vec<(DatasetIDBuf, Result)> { + dataset_ids + .into_iter() + .map(|id| { + let result = self.sync_svc.borrow_mut().sync_to( + &id, + &id, + self.remote.as_ref().unwrap(), + SyncOptions::default(), + None, + ); + (id, result) + }) + .collect() + } +} + +impl Command for PushCommand { + fn run(&mut self) -> Result<(), Error> { + if self.ids.len() == 0 { + return Err(Error::UsageError { + msg: "Specify a dataset or pass --all".to_owned(), + }); + } + + let dataset_ids: Vec = self.ids.iter().map(|s| s.parse().unwrap()).collect(); + + let mut updated = 0; + let mut up_to_date = 0; + let mut errors = 0; + + let results = self.push(dataset_ids); + + for (_, res) in results.iter() { 
+ match res { + Ok(r) => match r { + SyncResult::UpToDate => up_to_date += 1, + SyncResult::Updated { .. } => updated += 1, + }, + Err(_) => errors += 1, + } + } + + if updated != 0 { + eprintln!( + "{}", + console::style(format!("{} dataset(s) pushed", updated)) + .green() + .bold() + ); + } + if up_to_date != 0 { + eprintln!( + "{}", + console::style(format!("{} dataset(s) up-to-date", up_to_date)) + .yellow() + .bold() + ); + } + if errors != 0 { + eprintln!( + "{}\n\n{}:", + console::style(format!("{} dataset(s) had errors", errors)) + .red() + .bold(), + console::style("Summary of errors") + ); + results + .into_iter() + .filter_map(|(id, res)| res.err().map(|e| (id, e))) + .for_each(|(id, err)| { + eprintln!( + "\n{}: {}", + console::style(format!("{}", id)).red().bold(), + err + ); + if let Some(bt) = err.backtrace() { + if bt.status() == BacktraceStatus::Captured { + eprintln!("{}", console::style(bt).dim()); + } + } + + let mut source = err.source(); + while source.is_some() { + if let Some(bt) = source.unwrap().backtrace() { + if bt.status() == BacktraceStatus::Captured { + eprintln!("\nCaused by: {}", source.unwrap()); + eprintln!("{}", console::style(bt).dim()); + } + } + source = source.unwrap().source(); + } + }); + } + + Ok(()) + } +} diff --git a/kamu-cli/src/commands/sync_from_command.rs b/kamu-cli/src/commands/sync_from_command.rs new file mode 100644 index 0000000000..510d739db1 --- /dev/null +++ b/kamu-cli/src/commands/sync_from_command.rs @@ -0,0 +1,142 @@ +use super::{Command, Error}; +use crate::output::OutputConfig; +use kamu::domain::*; +use opendatafabric::*; + +use std::backtrace::BacktraceStatus; +use std::cell::RefCell; +use std::error::Error as StdError; +use std::rc::Rc; + +/////////////////////////////////////////////////////////////////////////////// +// Command +/////////////////////////////////////////////////////////////////////////////// + +pub struct SyncFromCommand { + sync_svc: Rc>, + ids: Vec, + remote: Option, + _output_config: OutputConfig, +} + +impl SyncFromCommand { + pub fn new( + sync_svc: Rc>, + ids: I, + remote: Option, + output_config: &OutputConfig, + ) -> Self + where + I: Iterator, + S: AsRef, + S2: AsRef, + { + Self { + sync_svc: sync_svc, + ids: ids.map(|s| s.as_ref().to_owned()).collect(), + remote: remote.map(|v| v.as_ref().to_owned()), + _output_config: output_config.clone(), + } + } + + fn pull( + &self, + dataset_ids: Vec, + ) -> Vec<(DatasetIDBuf, Result)> { + dataset_ids + .into_iter() + .map(|id| { + let result = self.sync_svc.borrow_mut().sync_from( + &id, + &id, + self.remote.as_ref().unwrap(), + SyncOptions::default(), + None, + ); + (id, result) + }) + .collect() + } +} + +impl Command for SyncFromCommand { + fn run(&mut self) -> Result<(), Error> { + if self.ids.len() == 0 { + return Err(Error::UsageError { + msg: "Specify a dataset or pass --all".to_owned(), + }); + } + + let dataset_ids: Vec = self.ids.iter().map(|s| s.parse().unwrap()).collect(); + + let mut updated = 0; + let mut up_to_date = 0; + let mut errors = 0; + + let results = self.pull(dataset_ids); + + for (_, res) in results.iter() { + match res { + Ok(r) => match r { + SyncResult::UpToDate => up_to_date += 1, + SyncResult::Updated { .. 
} => updated += 1, + }, + Err(_) => errors += 1, + } + } + + if updated != 0 { + eprintln!( + "{}", + console::style(format!("{} dataset(s) pulled", updated)) + .green() + .bold() + ); + } + if up_to_date != 0 { + eprintln!( + "{}", + console::style(format!("{} dataset(s) up-to-date", up_to_date)) + .yellow() + .bold() + ); + } + if errors != 0 { + eprintln!( + "{}\n\n{}:", + console::style(format!("{} dataset(s) had errors", errors)) + .red() + .bold(), + console::style("Summary of errors") + ); + results + .into_iter() + .filter_map(|(id, res)| res.err().map(|e| (id, e))) + .for_each(|(id, err)| { + eprintln!( + "\n{}: {}", + console::style(format!("{}", id)).red().bold(), + err + ); + if let Some(bt) = err.backtrace() { + if bt.status() == BacktraceStatus::Captured { + eprintln!("{}", console::style(bt).dim()); + } + } + + let mut source = err.source(); + while source.is_some() { + if let Some(bt) = source.unwrap().backtrace() { + if bt.status() == BacktraceStatus::Captured { + eprintln!("\nCaused by: {}", source.unwrap()); + eprintln!("{}", console::style(bt).dim()); + } + } + source = source.unwrap().source(); + } + }); + } + + Ok(()) + } +} diff --git a/kamu-cli/src/main.rs b/kamu-cli/src/main.rs index ecf6f9366d..65b40acaf4 100644 --- a/kamu-cli/src/main.rs +++ b/kamu-cli/src/main.rs @@ -58,6 +58,13 @@ fn main() { transform_svc.clone(), logger.new(o!()), ))); + let remote_factory = Arc::new(Mutex::new(RemoteFactory::new(logger.new(o!())))); + let sync_svc = Rc::new(RefCell::new(SyncServiceImpl::new( + workspace_layout.clone(), + metadata_repo.clone(), + remote_factory.clone(), + logger.new(o!()), + ))); let mut command: Box = match matches.subcommand() { ("add", Some(submatches)) => Box::new(AddCommand::new( @@ -127,6 +134,13 @@ fn main() { submatches.is_present("recursive"), submatches.value_of("set-watermark").unwrap(), )) + } else if submatches.is_present("remote") { + Box::new(SyncFromCommand::new( + sync_svc.clone(), + submatches.values_of("dataset").unwrap_or_default(), + submatches.value_of("remote"), + &output_format, + )) } else { Box::new(PullCommand::new( pull_svc.clone(), @@ -138,6 +152,12 @@ fn main() { )) } } + ("push", Some(push_matches)) => Box::new(PushCommand::new( + sync_svc.clone(), + push_matches.values_of("dataset").unwrap_or_default(), + push_matches.value_of("remote"), + &output_format, + )), ("remote", Some(remote_matches)) => match remote_matches.subcommand() { ("add", Some(add_matches)) => Box::new(RemoteAddCommand::new( metadata_repo.clone(), diff --git a/kamu-core-test/Cargo.toml b/kamu-core-test/Cargo.toml index cf06681bbb..8cfb9d8858 100644 --- a/kamu-core-test/Cargo.toml +++ b/kamu-core-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu-test" -version = "0.33.0" +version = "0.34.0" authors = ["Sergii Mikhtoniuk "] edition = "2018" diff --git a/kamu-core/Cargo.toml b/kamu-core/Cargo.toml index eed79df6a8..64cd354544 100644 --- a/kamu-core/Cargo.toml +++ b/kamu-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu" -version = "0.33.0" +version = "0.34.0" authors = ["Sergii Mikhtoniuk "] edition = "2018" @@ -45,5 +45,6 @@ users = "*" # For getting uid:gid [dev-dependencies] kamu-test = { path = "../kamu-core-test" } +assert_matches = "*" filetime = "*" parquet = { git = "https://github.com/apache/arrow" } diff --git a/kamu-core/src/domain/metadata_repository.rs b/kamu-core/src/domain/metadata_repository.rs index 9552c3ae7a..e1d3aff06f 100644 --- a/kamu-core/src/domain/metadata_repository.rs +++ b/kamu-core/src/domain/metadata_repository.rs @@ -7,12 
+7,12 @@ use url::Url; pub trait MetadataRepository { fn get_all_datasets<'s>(&'s self) -> Box + 's>; - fn add_dataset(&mut self, snapshot: DatasetSnapshot) -> Result<(), DomainError>; + fn add_dataset(&mut self, snapshot: DatasetSnapshot) -> Result; fn add_datasets( &mut self, snapshots: &mut dyn Iterator, - ) -> Vec<(DatasetIDBuf, Result<(), DomainError>)>; + ) -> Vec<(DatasetIDBuf, Result)>; fn delete_dataset(&mut self, dataset_id: &DatasetID) -> Result<(), DomainError>; diff --git a/kamu-core/src/domain/mod.rs b/kamu-core/src/domain/mod.rs index 6047e99958..54434f85cf 100644 --- a/kamu-core/src/domain/mod.rs +++ b/kamu-core/src/domain/mod.rs @@ -26,5 +26,8 @@ pub use remote::*; mod resource_loader; pub use resource_loader::*; +mod sync_service; +pub use sync_service::*; + mod transform_service; pub use transform_service::*; diff --git a/kamu-core/src/domain/remote.rs b/kamu-core/src/domain/remote.rs index 5bc6744de6..1b4bb5a7a9 100644 --- a/kamu-core/src/domain/remote.rs +++ b/kamu-core/src/domain/remote.rs @@ -1,7 +1,12 @@ +use opendatafabric::{DatasetID, Sha3_256}; use serde::{Deserialize, Serialize}; use serde_with::skip_serializing_none; use url::Url; +use std::backtrace::Backtrace; +use std::path::{Path, PathBuf}; +use thiserror::Error; + pub type RemoteID = str; pub type RemoteIDBuf = String; @@ -11,3 +16,52 @@ pub type RemoteIDBuf = String; pub struct Remote { pub url: Url, } + +pub trait RemoteClient { + fn read_ref(&self, dataset_id: &DatasetID) -> Result, RemoteError>; + + fn write( + &mut self, + dataset_id: &DatasetID, + expected_head: Option, + new_head: Sha3_256, + blocks: &mut dyn Iterator)>, + data_files: &mut dyn Iterator, + checkpoint_dir: &Path, + ) -> Result<(), RemoteError>; + + fn read( + &self, + dataset_id: &DatasetID, + expected_head: Sha3_256, + last_seen_block: Option, + tmp_dir: &Path, + ) -> Result; +} + +pub struct RemoteReadResult { + pub blocks: Vec>, + pub data_files: Vec, + pub checkpoint_dir: PathBuf, +} + +type BoxedError = Box; + +#[derive(Debug, Error)] +pub enum RemoteError { + #[error("Dataset does not exist")] + DoesNotExist, + #[error("Dataset diverged")] + Diverged, + #[error("Dataset was updated concurrently")] + UpdatedConcurrently, + #[error("IO error")] + IOError { + #[from] + source: std::io::Error, + #[backtrace] + backtrace: Backtrace, + }, + #[error("Protocol error")] + ProtocolError(BoxedError), +} diff --git a/kamu-core/src/domain/sync_service.rs b/kamu-core/src/domain/sync_service.rs new file mode 100644 index 0000000000..ef99bd9b3f --- /dev/null +++ b/kamu-core/src/domain/sync_service.rs @@ -0,0 +1,111 @@ +use super::{RemoteID, RemoteIDBuf}; +use opendatafabric::{DatasetID, DatasetIDBuf, Sha3_256}; + +use std::sync::{Arc, Mutex}; +use thiserror::Error; + +/////////////////////////////////////////////////////////////////////////////// +// Service +/////////////////////////////////////////////////////////////////////////////// + +pub trait SyncService { + fn sync_from( + &mut self, + local_dataset_id: &DatasetID, + remote_dataset_id: &DatasetID, + remote_id: &RemoteID, + options: SyncOptions, + listener: Option>>, + ) -> Result; + + fn sync_to( + &mut self, + local_dataset_id: &DatasetID, + remote_dataset_id: &DatasetID, + remote_id: &RemoteID, + options: SyncOptions, + listener: Option>>, + ) -> Result; +} + +#[derive(Debug, Clone)] +pub struct SyncOptions {} + +impl Default for SyncOptions { + fn default() -> Self { + Self {} + } +} + +#[derive(Debug)] +pub enum SyncResult { + UpToDate, + Updated { + old_head: Option, + new_head: 
Sha3_256, + }, +} + +/////////////////////////////////////////////////////////////////////////////// +// Listener +/////////////////////////////////////////////////////////////////////////////// + +pub trait SyncListener: Send { + fn begin(&mut self) {} + fn success(&mut self, _result: &SyncResult) {} + fn error(&mut self, _error: &SyncError) {} +} + +pub struct NullSyncListener; +impl SyncListener for NullSyncListener {} + +pub trait SyncMultiListener { + fn begin_sync(&mut self, _dataset_id: &DatasetID) -> Option>> { + None + } +} + +pub struct NullSyncMultiListener; +impl SyncMultiListener for NullSyncMultiListener {} + +/////////////////////////////////////////////////////////////////////////////// +// Errors +/////////////////////////////////////////////////////////////////////////////// + +type BoxedError = Box; + +#[derive(Debug, Error)] +pub enum SyncError { + #[error("Dataset {dataset_id} does not exist locally")] + LocalDatasetDoesNotExist { dataset_id: DatasetIDBuf }, + #[error("Dataset {dataset_id} does not exist in remote {remote_id}")] + RemoteDatasetDoesNotExist { + remote_id: RemoteIDBuf, + dataset_id: DatasetIDBuf, + }, + #[error("Remote {remote_id} does not exist")] + RemoteDoesNotExist { remote_id: RemoteIDBuf }, + #[error("Local dataset ({local_head}) and remote ({remote_head}) have diverged")] + DatasetsDiverged { + local_head: Sha3_256, + remote_head: Sha3_256, + }, + #[error("Inconsistent metadata")] + InconsistentMetadata(BoxedError), + #[error("Protocol error")] + ProtocolError(BoxedError), + #[error("Internal error")] + InternalError(BoxedError), +} + +impl From for SyncError { + fn from(e: std::io::Error) -> Self { + Self::InternalError(e.into()) + } +} + +impl From for SyncError { + fn from(e: fs_extra::error::Error) -> Self { + Self::InternalError(e.into()) + } +} diff --git a/kamu-core/src/domain/time_interval.rs b/kamu-core/src/domain/time_interval.rs deleted file mode 100644 index c4f191c637..0000000000 --- a/kamu-core/src/domain/time_interval.rs +++ /dev/null @@ -1,270 +0,0 @@ -use chrono::prelude::*; -use intervals_general::bound_pair::BoundPair; -use intervals_general::interval::Interval; - -use serde::de::{Deserialize, Deserializer, Error, Visitor}; -use serde::{Serialize, Serializer}; -use std::convert::TryFrom; -use std::fmt; - -type Element = DateTime; - -#[derive(Debug, Clone, PartialEq)] -pub struct TimeInterval(Interval); - -#[derive(Debug, Clone)] -pub struct InvalidTimeInterval; - -impl fmt::Display for InvalidTimeInterval { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "invalid time interval") - } -} - -impl TimeInterval { - pub fn empty() -> Self { - Self(Interval::Empty) - } - - pub fn closed(left: Element, right: Element) -> Result { - match BoundPair::new(left, right) { - None => Err(InvalidTimeInterval), - Some(p) => Ok(Self(Interval::Closed { bound_pair: p })), - } - } - - pub fn open(left: Element, right: Element) -> Result { - match BoundPair::new(left, right) { - None => Err(InvalidTimeInterval), - Some(p) => Ok(Self(Interval::Open { bound_pair: p })), - } - } - - pub fn left_half_open(left: Element, right: Element) -> Result { - match BoundPair::new(left, right) { - None => Err(InvalidTimeInterval), - Some(p) => Ok(Self(Interval::LeftHalfOpen { bound_pair: p })), - } - } - - pub fn right_half_open(left: Element, right: Element) -> Result { - match BoundPair::new(left, right) { - None => Err(InvalidTimeInterval), - Some(p) => Ok(Self(Interval::RightHalfOpen { bound_pair: p })), - } - } - - pub fn 
unbounded_closed_right(right: Element) -> Self { - Self(Interval::UnboundedClosedRight { right: right }) - } - - pub fn unbounded_open_right(right: Element) -> Self { - Self(Interval::UnboundedOpenRight { right: right }) - } - - pub fn unbounded_closed_left(left: Element) -> Self { - Self(Interval::UnboundedClosedLeft { left: left }) - } - - pub fn unbounded_open_left(left: Element) -> Self { - Self(Interval::UnboundedOpenLeft { left: left }) - } - - pub fn singleton(at: Element) -> Self { - Self(Interval::Singleton { at: at }) - } - - pub fn unbounded() -> Self { - Self(Interval::Unbounded) - } - - pub fn is_empty(&self) -> bool { - match self.0 { - Interval::Empty => true, - _ => false, - } - } - - // TODO: upstream - pub fn left_complement(&self) -> TimeInterval { - match self.0 { - Interval::Empty => Self(Interval::Unbounded), - Interval::Unbounded => Self(Interval::Empty), - Interval::Singleton { at } => Self::unbounded_open_right(at), - Interval::Open { bound_pair } => { - Self::unbounded_closed_right(bound_pair.left().clone()) - } - Interval::Closed { bound_pair } => { - Self::unbounded_open_right(bound_pair.left().clone()) - } - Interval::LeftHalfOpen { bound_pair } => { - Self::unbounded_closed_right(bound_pair.left().clone()) - } - Interval::RightHalfOpen { bound_pair } => { - Self::unbounded_open_right(bound_pair.left().clone()) - } - Interval::UnboundedOpenRight { .. } => Self(Interval::Empty), - Interval::UnboundedClosedRight { .. } => Self(Interval::Empty), - Interval::UnboundedOpenLeft { left } => Self::unbounded_closed_right(left.clone()), - Interval::UnboundedClosedLeft { left } => Self::unbounded_open_right(left.clone()), - } - } - - pub fn right_complement(&self) -> TimeInterval { - match self.0 { - Interval::Empty => Self(Interval::Unbounded), - Interval::Unbounded => Self(Interval::Empty), - Interval::Singleton { at } => Self::unbounded_open_left(at), - Interval::Open { bound_pair } => { - Self::unbounded_closed_left(bound_pair.right().clone()) - } - Interval::Closed { bound_pair } => { - Self::unbounded_open_left(bound_pair.right().clone()) - } - Interval::LeftHalfOpen { bound_pair } => { - Self::unbounded_open_left(bound_pair.right().clone()) - } - Interval::RightHalfOpen { bound_pair } => { - Self::unbounded_closed_left(bound_pair.right().clone()) - } - Interval::UnboundedOpenRight { right } => Self::unbounded_closed_left(right.clone()), - Interval::UnboundedClosedRight { right } => Self::unbounded_open_left(right.clone()), - Interval::UnboundedOpenLeft { .. } => Self(Interval::Empty), - Interval::UnboundedClosedLeft { .. 
} => Self(Interval::Empty), - } - } - - pub fn contains(&self, other: &TimeInterval) -> bool { - self.0.contains(&other.0) - } - - pub fn contains_point(&self, point: &Element) -> bool { - self.contains(&TimeInterval::singleton(point.clone())) - } - - pub fn intersect(&self, other: &TimeInterval) -> TimeInterval { - Self(self.0.intersect(&other.0)) - } -} - -impl Eq for TimeInterval {} - -impl fmt::Display for TimeInterval { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fn fmt_elem(v: &Element) -> String { - v.to_rfc3339_opts(SecondsFormat::Millis, true) - } - - match self.0 { - Interval::Unbounded => f.write_str("(-inf, inf)"), - Interval::Empty => f.write_str("()"), - Interval::Closed { bound_pair: p } => { - write!(f, "[{}, {}]", fmt_elem(p.left()), fmt_elem(p.right())) - } - Interval::Open { bound_pair: p } => { - write!(f, "({}, {})", fmt_elem(p.left()), fmt_elem(p.right())) - } - Interval::LeftHalfOpen { bound_pair: p } => { - write!(f, "({}, {}]", fmt_elem(p.left()), fmt_elem(p.right())) - } - Interval::RightHalfOpen { bound_pair: p } => { - write!(f, "[{}, {})", fmt_elem(p.left()), fmt_elem(p.right())) - } - Interval::UnboundedClosedRight { right } => write!(f, "(-inf, {}]", fmt_elem(&right)), - Interval::UnboundedOpenRight { right } => write!(f, "(-inf, {})", fmt_elem(&right)), - Interval::UnboundedClosedLeft { left } => write!(f, "[{}, inf)", fmt_elem(&left)), - Interval::UnboundedOpenLeft { left } => write!(f, "({}, inf)", fmt_elem(&left)), - Interval::Singleton { at } => write!(f, "[{}, {}]", fmt_elem(&at), fmt_elem(&at)), - } - } -} - -impl TryFrom<&str> for TimeInterval { - type Error = InvalidTimeInterval; - - fn try_from(value: &str) -> Result { - if value == "()" { - return Ok(TimeInterval::empty()); - } else if value.len() < 3 { - return Err(InvalidTimeInterval); - } - - fn parse_dt(s: &str) -> Result { - DateTime::parse_from_rfc3339(s.trim()) - .map(|dt| dt.into()) - .map_err(|_| InvalidTimeInterval) - } - - let lbound = &value[0..1]; - let rbound = &value[value.len() - 1..value.len()]; - let mut split = value[1..value.len() - 1].split(','); - - let left = match split.next() { - None => return Err(InvalidTimeInterval), - Some("-inf") => None, - Some(v) => match parse_dt(v) { - Ok(dt) => Some(dt), - _ => return Err(InvalidTimeInterval), - }, - }; - - let right = match split.next() { - None => return Err(InvalidTimeInterval), - Some("inf") => None, - Some(v) => match parse_dt(v) { - Ok(dt) => Some(dt), - _ => return Err(InvalidTimeInterval), - }, - }; - - if split.next() != None { - return Err(InvalidTimeInterval); - } - - match (lbound, left, right, rbound) { - ("(", None, None, ")") => Ok(TimeInterval::unbounded()), - ("(", None, Some(r), ")") => Ok(TimeInterval::unbounded_open_right(r)), - ("(", None, Some(r), "]") => Ok(TimeInterval::unbounded_closed_right(r)), - ("(", Some(l), None, ")") => Ok(TimeInterval::unbounded_open_left(l)), - ("[", Some(l), None, ")") => Ok(TimeInterval::unbounded_closed_left(l)), - ("(", Some(l), Some(r), ")") => TimeInterval::open(l, r), - ("(", Some(l), Some(r), "]") => TimeInterval::left_half_open(l, r), - ("[", Some(l), Some(r), ")") => TimeInterval::right_half_open(l, r), - ("[", Some(l), Some(r), "]") => { - if l == r { - Ok(TimeInterval::singleton(l)) - } else { - TimeInterval::closed(l, r) - } - } - _ => Err(InvalidTimeInterval), - } - } -} - -impl Serialize for TimeInterval { - fn serialize(&self, serializer: S) -> Result { - serializer.collect_str(self) - } -} - -struct TimeIntervalVisitor; - -impl<'de> Visitor<'de> 
for TimeIntervalVisitor { - type Value = TimeInterval; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a time interval") - } - - fn visit_str(self, v: &str) -> Result { - TimeInterval::try_from(v).map_err(serde::de::Error::custom) - } -} - -// This is the trait that informs Serde how to deserialize MyMap. -impl<'de> Deserialize<'de> for TimeInterval { - fn deserialize>(deserializer: D) -> Result { - deserializer.deserialize_str(TimeIntervalVisitor) - } -} diff --git a/kamu-core/src/infra/dataset_summary.rs b/kamu-core/src/infra/dataset_summary.rs index f3dcb2e7b0..0b303784e8 100644 --- a/kamu-core/src/infra/dataset_summary.rs +++ b/kamu-core/src/infra/dataset_summary.rs @@ -11,7 +11,6 @@ use serde_with::skip_serializing_none; pub enum DatasetKind { Root, Derivative, - Remote, } #[skip_serializing_none] diff --git a/kamu-core/src/infra/metadata_chain_impl.rs b/kamu-core/src/infra/metadata_chain_impl.rs index a754cbc80b..0c29a0049c 100644 --- a/kamu-core/src/infra/metadata_chain_impl.rs +++ b/kamu-core/src/infra/metadata_chain_impl.rs @@ -22,6 +22,8 @@ impl MetadataChainImpl { meta_path: &Path, first_block: MetadataBlock, ) -> Result<(Self, Sha3_256), InfraError> { + assert_eq!(first_block.prev_block_hash, None); + std::fs::create_dir(&meta_path)?; std::fs::create_dir(meta_path.join("blocks"))?; std::fs::create_dir(meta_path.join("refs"))?; @@ -33,6 +35,32 @@ impl MetadataChainImpl { Ok((chain, hash)) } + pub fn from_blocks( + meta_path: &Path, + blocks: &mut dyn Iterator, + ) -> Result<(Self, Sha3_256), InfraError> { + std::fs::create_dir(&meta_path)?; + std::fs::create_dir(meta_path.join("blocks"))?; + std::fs::create_dir(meta_path.join("refs"))?; + + let mut chain = Self::new(meta_path); + let mut last_hash = Sha3_256::zero(); + + for b in blocks { + last_hash = if last_hash.is_zero() { + assert_eq!(b.prev_block_hash, None); + let hash = chain.write_block(&b)?; + chain.write_ref(&BlockRef::Head, &hash)?; + hash + } else { + chain.append(b) + }; + } + + assert!(!last_hash.is_zero()); + Ok((chain, last_hash)) + } + fn read_block(path: &Path) -> MetadataBlock { // TODO: Use mmap let buffer = std::fs::read(path) diff --git a/kamu-core/src/infra/metadata_repository_impl.rs b/kamu-core/src/infra/metadata_repository_impl.rs index 44fc4c7e21..5363e5f2ad 100644 --- a/kamu-core/src/infra/metadata_repository_impl.rs +++ b/kamu-core/src/infra/metadata_repository_impl.rs @@ -84,7 +84,7 @@ impl MetadataRepository for MetadataRepositoryImpl { Box::new(self.get_all_datasets_impl().unwrap()) } - fn add_dataset(&mut self, snapshot: DatasetSnapshot) -> Result<(), DomainError> { + fn add_dataset(&mut self, snapshot: DatasetSnapshot) -> Result { let dataset_metadata_dir = self.get_dataset_metadata_dir(&snapshot.id); if dataset_metadata_dir.exists() { @@ -121,7 +121,8 @@ impl MetadataRepository for MetadataRepositoryImpl { input_slices: None, }; - MetadataChainImpl::create(&dataset_metadata_dir, first_block).map_err(|e| e.into())?; + let (_chain, block_hash) = + MetadataChainImpl::create(&dataset_metadata_dir, first_block).map_err(|e| e.into())?; let summary = DatasetSummary { id: snapshot.id.clone(), @@ -134,13 +135,13 @@ impl MetadataRepository for MetadataRepositoryImpl { }; self.update_summary(&snapshot.id, summary)?; - Ok(()) + Ok(block_hash) } fn add_datasets( &mut self, snapshots: &mut dyn Iterator, - ) -> Vec<(DatasetIDBuf, Result<(), DomainError>)> { + ) -> Vec<(DatasetIDBuf, Result)> { let snapshots_ordered = 
self.sort_snapshots_in_dependency_order(snapshots.collect()); snapshots_ordered diff --git a/kamu-core/src/infra/mod.rs b/kamu-core/src/infra/mod.rs index bba48e83d0..c639002dc7 100644 --- a/kamu-core/src/infra/mod.rs +++ b/kamu-core/src/infra/mod.rs @@ -1,6 +1,9 @@ mod engine; pub use engine::*; +mod remote; +pub use remote::*; + pub mod explore; pub mod ingest; pub mod utils; @@ -46,5 +49,8 @@ pub use pull_service_impl::*; mod resource_loader_impl; pub use resource_loader_impl::*; +mod sync_service_impl; +pub use sync_service_impl::*; + mod transform_service_impl; pub use transform_service_impl::*; diff --git a/kamu-core/src/infra/remote/mod.rs b/kamu-core/src/infra/remote/mod.rs new file mode 100644 index 0000000000..0842343ff0 --- /dev/null +++ b/kamu-core/src/infra/remote/mod.rs @@ -0,0 +1,4 @@ +mod remote_factory; +pub use remote_factory::*; + +mod remote_local_fs; diff --git a/kamu-core/src/infra/remote/remote_factory.rs b/kamu-core/src/infra/remote/remote_factory.rs new file mode 100644 index 0000000000..3ef227b9ca --- /dev/null +++ b/kamu-core/src/infra/remote/remote_factory.rs @@ -0,0 +1,48 @@ +use crate::domain::*; + +use super::remote_local_fs::*; + +use slog::Logger; +use std::backtrace::Backtrace; +use std::sync::{Arc, Mutex}; +use thiserror::Error; + +pub struct RemoteFactory { + _logger: Logger, +} + +impl RemoteFactory { + pub fn new(logger: Logger) -> Self { + Self { _logger: logger } + } + + pub fn get_remote_client( + &mut self, + remote: &Remote, + ) -> Result>, RemoteFactoryError> { + match remote.url.scheme() { + "file" => Ok(Arc::new(Mutex::new(RemoteLocalFS::new( + remote.url.to_file_path().unwrap(), + )))), + s @ _ => Err(RemoteFactoryError::unsupported(s)), + } + } +} + +#[derive(Debug, Error)] +pub enum RemoteFactoryError { + #[error("No suitable remote implementation found for scheme \"{scheme}\"")] + Unsupported { + scheme: String, + backtrace: Backtrace, + }, +} + +impl RemoteFactoryError { + pub fn unsupported(scheme: &str) -> Self { + RemoteFactoryError::Unsupported { + scheme: scheme.to_owned(), + backtrace: Backtrace::capture(), + } + } +} diff --git a/kamu-core/src/infra/remote/remote_local_fs.rs b/kamu-core/src/infra/remote/remote_local_fs.rs new file mode 100644 index 0000000000..eac61716ee --- /dev/null +++ b/kamu-core/src/infra/remote/remote_local_fs.rs @@ -0,0 +1,179 @@ +use crate::domain::*; +use crate::infra::*; +use opendatafabric::{DatasetID, Sha3_256}; + +use std::ffi::OsStr; +use std::path::{Path, PathBuf}; + +pub struct RemoteLocalFS { + path: PathBuf, +} + +impl RemoteLocalFS { + pub fn new(path: PathBuf) -> Self { + Self { path: path } + } +} + +impl RemoteClient for RemoteLocalFS { + fn read_ref(&self, dataset_id: &DatasetID) -> Result, RemoteError> { + let ref_path: PathBuf = [ + self.path.as_ref() as &OsStr, + OsStr::new(dataset_id as &str), + OsStr::new("meta"), + OsStr::new("refs"), + OsStr::new("head"), + ] + .iter() + .collect(); + if ref_path.exists() { + let hash = std::fs::read_to_string(&ref_path) + .map_err(|e| RemoteError::ProtocolError(e.into()))?; + Ok(Some(Sha3_256::from_str(&hash).expect("Malformed hash"))) + } else { + Ok(None) + } + } + + // TODO: Locking + fn write( + &mut self, + dataset_id: &DatasetID, + expected_head: Option, + new_head: Sha3_256, + blocks: &mut dyn Iterator)>, + data_files: &mut dyn Iterator, + checkpoint_dir: &Path, + ) -> Result<(), RemoteError> { + if self.read_ref(dataset_id)? 
!= expected_head { + return Err(RemoteError::UpdatedConcurrently); + } + + let out_dataset_dir = self.path.join(dataset_id); + let out_meta_dir = out_dataset_dir.join("meta"); + let out_blocks_dir = out_meta_dir.join("blocks"); + let out_refs_dir = out_meta_dir.join("refs"); + let out_checkpoint_dir = out_dataset_dir.join("checkpoint"); + let out_data_dir = out_dataset_dir.join("data"); + + std::fs::create_dir_all(&out_blocks_dir)?; + std::fs::create_dir_all(&out_refs_dir)?; + std::fs::create_dir_all(&out_checkpoint_dir)?; + std::fs::create_dir_all(&out_data_dir)?; + + for in_data_path in data_files { + let out_data_path = out_data_dir.join( + in_data_path + .file_name() + .expect("Data file without file_name"), + ); + if !out_data_path.exists() { + std::fs::copy(in_data_path, out_data_path)?; + } + } + + for (hash, data) in blocks { + let block_path = out_blocks_dir.join(hash.to_string()); + std::fs::write(block_path, data)?; + } + + // TODO: This is really bad but we need to + // establish proper checkpoint naming and rotation first + if checkpoint_dir.exists() { + if out_checkpoint_dir.exists() { + std::fs::remove_dir_all(&out_checkpoint_dir)?; + } + + fs_extra::dir::copy( + &checkpoint_dir, + &out_checkpoint_dir, + &fs_extra::dir::CopyOptions { + copy_inside: true, + ..fs_extra::dir::CopyOptions::default() + }, + ) + .map_err(|e| match e.kind { + fs_extra::error::ErrorKind::Io(io_error) => io_error.into(), + _ => RemoteError::ProtocolError(e.into()), + })?; + } + + std::fs::write(out_refs_dir.join("head"), &new_head.to_string().as_bytes())?; + Ok(()) + } + + fn read( + &self, + dataset_id: &DatasetID, + expected_head: Sha3_256, + last_seen_block: Option, + tmp_dir: &Path, + ) -> Result { + let in_dataset_dir = self.path.join(dataset_id); + if !in_dataset_dir.exists() { + return Err(RemoteError::DoesNotExist); + } + + let in_meta_dir = in_dataset_dir.join("meta"); + let chain = MetadataChainImpl::new(&in_meta_dir); + + if chain.read_ref(&BlockRef::Head) != Some(expected_head) { + return Err(RemoteError::UpdatedConcurrently); + } + + let mut result = RemoteReadResult { + blocks: Vec::new(), + data_files: Vec::new(), + checkpoint_dir: tmp_dir.join("checkpoint"), + }; + + let in_blocks_dir = in_meta_dir.join("blocks"); + let in_checkpoint_dir = in_dataset_dir.join("checkpoint"); + let in_data_dir = in_dataset_dir.join("data"); + + let out_data_dir = tmp_dir.join("data"); + std::fs::create_dir_all(&result.checkpoint_dir)?; + std::fs::create_dir_all(&out_data_dir)?; + + let mut found_last_seen_block = false; + + for block in chain.iter_blocks() { + if Some(block.block_hash) == last_seen_block { + found_last_seen_block = true; + break; + } + let block_path = in_blocks_dir.join(block.block_hash.to_string()); + let data = std::fs::read(block_path)?; + result.blocks.push(data); + } + + if !found_last_seen_block && last_seen_block.is_some() { + return Err(RemoteError::Diverged); + } + + // TODO: limit the set of files based on metadata + for entry in std::fs::read_dir(&in_data_dir)? 
{ + let in_path = entry?.path(); + let out_path = + out_data_dir.join(in_path.file_name().expect("Data file without file name")); + std::fs::copy(&in_path, &out_path)?; + result.data_files.push(out_path); + } + + fs_extra::dir::copy( + &in_checkpoint_dir, + &result.checkpoint_dir, + &fs_extra::dir::CopyOptions { + content_only: true, + copy_inside: true, + ..fs_extra::dir::CopyOptions::default() + }, + ) + .map_err(|e| match e.kind { + fs_extra::error::ErrorKind::Io(io_error) => io_error.into(), + _ => RemoteError::ProtocolError(e.into()), + })?; + + Ok(result) + } +} diff --git a/kamu-core/src/infra/sync_service_impl.rs b/kamu-core/src/infra/sync_service_impl.rs new file mode 100644 index 0000000000..f31cff4b70 --- /dev/null +++ b/kamu-core/src/infra/sync_service_impl.rs @@ -0,0 +1,356 @@ +use super::*; +use crate::domain::*; +use opendatafabric::serde::yaml::*; +use opendatafabric::*; + +use chrono::Utc; +use slog::Logger; +use std::cell::RefCell; +use std::path::{Path, PathBuf}; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; + +pub struct SyncServiceImpl { + workspace_layout: WorkspaceLayout, + metadata_repo: Rc>, + remote_factory: Arc>, + _logger: Logger, +} + +impl SyncServiceImpl { + pub fn new( + workspace_layout: WorkspaceLayout, + metadata_repo: Rc>, + remote_factory: Arc>, + logger: Logger, + ) -> Self { + Self { + workspace_layout: workspace_layout, + metadata_repo: metadata_repo, + remote_factory: remote_factory, + _logger: logger, + } + } + + fn update_summary( + &self, + dataset_id: &DatasetID, + blocks: Vec, + ) -> Result<(), SyncError> { + let mut metadata_repo = self.metadata_repo.borrow_mut(); + + let mut summary = match metadata_repo.get_summary(dataset_id) { + Ok(sum) => sum, + Err(DomainError::DoesNotExist { .. }) => DatasetSummary { + id: dataset_id.to_owned(), + kind: blocks + .iter() + .flat_map(|b| b.source.as_ref()) + .next() + .map(|s| match s { + DatasetSource::Root(_) => DatasetKind::Root, + DatasetSource::Derivative(_) => DatasetKind::Derivative, + }) + .expect("Chain had no source block"), + dependencies: Vec::new(), + last_pulled: None, + num_records: 0, + data_size: 0, + vocab: DatasetVocabulary::default(), + }, + Err(e) => return Err(SyncError::InternalError(e.into())), + }; + + for b in blocks.iter() { + summary.num_records += b + .output_slice + .as_ref() + .map(|s| s.num_records as u64) + .unwrap_or(0); + + match b.source.as_ref() { + Some(DatasetSource::Derivative(deriv)) => { + for id in deriv.inputs.iter() { + if !summary.dependencies.contains(id) { + summary.dependencies.push(id.clone()); + } + } + } + _ => (), + } + } + + summary.last_pulled = Some(Utc::now()); + + let volume_layout = VolumeLayout::new(&self.workspace_layout.local_volume_dir); + let dataset_layout = DatasetLayout::new(&volume_layout, dataset_id); + summary.data_size = fs_extra::dir::get_size(dataset_layout.data_dir).unwrap_or(0); + summary.data_size += fs_extra::dir::get_size(dataset_layout.checkpoints_dir).unwrap_or(0); + + metadata_repo + .update_summary(dataset_id, summary) + .map_err(|e| SyncError::InternalError(e.into()))?; + + Ok(()) + } +} + +impl SyncService for SyncServiceImpl { + fn sync_from( + &mut self, + local_dataset_id: &DatasetID, + remote_dataset_id: &DatasetID, + remote_id: &RemoteID, + _options: SyncOptions, + _listener: Option>>, + ) -> Result { + let remote = self + .metadata_repo + .borrow() + .get_remote(remote_id) + .map_err(|e| match e { + DomainError::DoesNotExist { .. 
} => SyncError::RemoteDoesNotExist { + remote_id: remote_id.to_owned(), + }, + _ => SyncError::InternalError(e.into()), + })?; + + let client = self + .remote_factory + .lock() + .unwrap() + .get_remote_client(&remote) + .map_err(|e| SyncError::InternalError(e.into()))?; + + let cl = client.lock().unwrap(); + + let remote_head = match cl + .read_ref(remote_dataset_id) + .map_err(|e| SyncError::ProtocolError(e.into()))? + { + Some(hash) => hash, + None => { + return Err(SyncError::RemoteDatasetDoesNotExist { + remote_id: remote_id.to_owned(), + dataset_id: remote_dataset_id.to_owned(), + }) + } + }; + + let chain = match self + .metadata_repo + .borrow() + .get_metadata_chain(local_dataset_id) + { + Ok(chain) => Some(chain), + Err(DomainError::DoesNotExist { .. }) => None, + Err(e @ _) => return Err(SyncError::InternalError(e.into())), + }; + + let local_head = chain.as_ref().and_then(|c| c.read_ref(&BlockRef::Head)); + + if Some(remote_head) == local_head { + return Ok(SyncResult::UpToDate); + } + + let tmp_dir = self.workspace_layout.run_info_dir.join(local_dataset_id); + std::fs::create_dir_all(&tmp_dir)?; + + let volume_layout = VolumeLayout::new(&self.workspace_layout.local_volume_dir); + let dataset_layout = DatasetLayout::new(&volume_layout, local_dataset_id); + + let read_result = cl + .read(remote_dataset_id, remote_head, local_head, &tmp_dir) + .map_err(|e| match e { + RemoteError::DoesNotExist => SyncError::RemoteDatasetDoesNotExist { + remote_id: remote_id.to_owned(), + dataset_id: remote_dataset_id.to_owned(), + }, + RemoteError::Diverged => SyncError::DatasetsDiverged { + remote_head: remote_head, + local_head: local_head.unwrap(), + }, + _ => SyncError::InternalError(e.into()), + })?; + + let de = YamlMetadataBlockDeserializer; + let new_blocks = read_result + .blocks + .iter() + .map(|data| de.read_manifest(&data)) + .collect::, _>>() + .map_err(|e| match e { + opendatafabric::serde::Error::InvalidHash { .. 
} => { + SyncError::InconsistentMetadata(e.into()) + } + _ => SyncError::ProtocolError(e.into()), + })?; + + // TODO: This is very unsafe + if dataset_layout.checkpoints_dir.exists() { + std::fs::remove_dir_all(&dataset_layout.checkpoints_dir)?; + } + std::fs::create_dir_all(&dataset_layout.checkpoints_dir)?; + if read_result.checkpoint_dir.exists() { + fs_extra::dir::move_dir( + &read_result.checkpoint_dir, + &dataset_layout.checkpoints_dir, + &fs_extra::dir::CopyOptions { + content_only: true, + ..fs_extra::dir::CopyOptions::default() + }, + )?; + } + + std::fs::create_dir_all(&dataset_layout.data_dir)?; + for data_file in read_result.data_files.iter() { + let new_data_file_path = dataset_layout + .data_dir + .join(data_file.file_name().expect("Data file without file name")); + + if !new_data_file_path.exists() { + std::fs::rename(data_file, new_data_file_path)?; + } + } + + // TODO: Remote assumption on block ordering + match chain { + None => { + MetadataChainImpl::from_blocks( + &self.workspace_layout.datasets_dir.join(local_dataset_id), + &mut new_blocks.iter().rev().map(|b| b.clone()), + ) + .map_err(|e| SyncError::InternalError(e.into()))?; + () + } + Some(mut c) => { + for block in new_blocks.iter().rev() { + c.append(block.clone()); + } + } + } + + std::fs::remove_dir_all(&tmp_dir)?; + self.update_summary(local_dataset_id, new_blocks)?; + + // TODO: race condition on remote head + Ok(SyncResult::Updated { + old_head: local_head, + new_head: remote_head, + }) + } + + fn sync_to( + &mut self, + local_dataset_id: &DatasetID, + remote_dataset_id: &DatasetID, + remote_id: &RemoteID, + _options: SyncOptions, + _listener: Option>>, + ) -> Result { + let chain = match self + .metadata_repo + .borrow() + .get_metadata_chain(local_dataset_id) + { + Ok(c) => c, + Err(DomainError::DoesNotExist { .. }) => { + return Err(SyncError::LocalDatasetDoesNotExist { + dataset_id: local_dataset_id.to_owned(), + }) + } + Err(e) => return Err(SyncError::InternalError(e.into())), + }; + + let remote = self + .metadata_repo + .borrow() + .get_remote(remote_id) + .map_err(|e| match e { + DomainError::DoesNotExist { .. 
} => SyncError::RemoteDoesNotExist { + remote_id: remote_id.to_owned(), + }, + _ => SyncError::InternalError(e.into()), + })?; + + let client = self + .remote_factory + .lock() + .unwrap() + .get_remote_client(&remote) + .map_err(|e| SyncError::InternalError(e.into()))?; + + let mut cl = client.lock().unwrap(); + + let remote_head = cl + .read_ref(remote_dataset_id) + .map_err(|e| SyncError::InternalError(e.into()))?; + + let local_head = chain.read_ref(&BlockRef::Head).unwrap(); + + if remote_head == Some(local_head) { + return Ok(SyncResult::UpToDate); + } + + let volume_layout = VolumeLayout::new(&self.workspace_layout.local_volume_dir); + let dataset_layout = DatasetLayout::new(&volume_layout, local_dataset_id); + let metadata_dir = self.workspace_layout.datasets_dir.join(local_dataset_id); + let blocks_dir = metadata_dir.join("blocks"); + + let mut found_remote_head = false; + + let blocks_to_sync: Vec<(Sha3_256, Vec)> = chain + .iter_blocks() + .map(|b| b.block_hash) + .take_while(|h| { + if Some(*h) == remote_head { + found_remote_head = true; + false + } else { + true + } + }) + .map(|h| { + let block_path = blocks_dir.join(h.to_string()); + let data = std::fs::read(block_path)?; + Ok((h, data)) + }) + .collect::>()?; + + if !found_remote_head && remote_head.is_some() { + return Err(SyncError::DatasetsDiverged { + local_head: local_head, + remote_head: remote_head.unwrap(), + }); + } + + let data_files_to_sync: Vec = if dataset_layout.data_dir.exists() { + std::fs::read_dir(&dataset_layout.data_dir)? + .map(|e| e.unwrap().path()) + .collect() + } else { + Vec::new() + }; + + cl.write( + remote_dataset_id, + remote_head, + local_head, + &mut blocks_to_sync.into_iter(), + &mut data_files_to_sync.iter().map(|e| e as &Path), + &dataset_layout.checkpoints_dir, + ) + .map_err(|e| match e { + RemoteError::Diverged => SyncError::DatasetsDiverged { + remote_head: remote_head.unwrap(), + local_head: local_head, + }, + _ => SyncError::InternalError(e.into()), + })?; + + Ok(SyncResult::Updated { + old_head: remote_head, + new_head: local_head, + }) + } +} diff --git a/kamu-core/tests/infra/mod.rs b/kamu-core/tests/infra/mod.rs index d1e61f320a..6bfb29c844 100644 --- a/kamu-core/tests/infra/mod.rs +++ b/kamu-core/tests/infra/mod.rs @@ -5,4 +5,5 @@ mod test_metadata_repository_impl; mod test_pull_service_impl; mod test_resource_loader_impl; mod test_serde_yaml; +mod test_sync_service_impl; mod test_transform_service_impl; diff --git a/kamu-core/tests/infra/test_sync_service_impl.rs b/kamu-core/tests/infra/test_sync_service_impl.rs new file mode 100644 index 0000000000..256ecba7fb --- /dev/null +++ b/kamu-core/tests/infra/test_sync_service_impl.rs @@ -0,0 +1,308 @@ +use kamu::domain::*; +use kamu::infra::*; +use kamu_test::*; +use opendatafabric::*; + +use assert_matches::assert_matches; +use chrono::prelude::*; +use std::cell::RefCell; +use std::path::{Path, PathBuf}; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use url::Url; + +fn list_files(dir: &Path) -> Vec { + if !dir.exists() { + return Vec::new(); + } + + let mut v = _list_files_rec(dir); + + for path in v.iter_mut() { + *path = path.strip_prefix(dir).unwrap().to_owned(); + } + + v.sort(); + v +} + +fn _list_files_rec(dir: &Path) -> Vec { + std::fs::read_dir(dir) + .unwrap() + .flat_map(|e| { + let entry = e.unwrap(); + let path = entry.path(); + if path.is_dir() { + _list_files_rec(&path) + } else { + vec![path] + } + }) + .collect() +} + +fn assert_in_sync( + workspace_layout: &WorkspaceLayout, + local_dataset_id: &DatasetID, + 
remote_dataset_id: &DatasetID, + remote_dir: &Path, +) { + let local_volume_layout = VolumeLayout::new(&workspace_layout.local_volume_dir); + let local_dataset_layout = DatasetLayout::new(&local_volume_layout, local_dataset_id); + let local_meta_dir = workspace_layout.datasets_dir.join(local_dataset_id); + let local_blocks_dir = local_meta_dir.join("blocks"); + let local_refs_dir = local_meta_dir.join("refs"); + let local_checkpoint_dir = local_dataset_layout.checkpoints_dir; + let local_data_dir = local_dataset_layout.data_dir; + + let remote_dataset_dir = remote_dir.join(remote_dataset_id); + let remote_meta_dir = remote_dataset_dir.join("meta"); + let remote_blocks_dir = remote_meta_dir.join("blocks"); + let remote_refs_dir = remote_meta_dir.join("refs"); + let remote_checkpoint_dir = remote_dataset_dir.join("checkpoint"); + let remote_data_dir = remote_dataset_dir.join("data"); + + assert_eq!( + list_files(&local_blocks_dir), + list_files(&remote_blocks_dir) + ); + + assert_eq!(list_files(&local_data_dir), list_files(&remote_data_dir)); + + assert_eq!( + list_files(&local_checkpoint_dir), + list_files(&remote_checkpoint_dir), + ); + + let local_head = std::fs::read_to_string(local_refs_dir.join("head")).unwrap(); + let remote_head = std::fs::read_to_string(remote_refs_dir.join("head")).unwrap(); + assert_eq!(local_head, remote_head); +} + +fn create_fake_data_file(dataset_layout: &DatasetLayout) -> PathBuf { + let t = Utc::now(); + let file_name = format!( + "{}.snappy.parquet", + t.to_rfc3339_opts(SecondsFormat::Nanos, true), + ); + + std::fs::create_dir_all(&dataset_layout.data_dir).unwrap(); + + let path = dataset_layout.data_dir.join(file_name); + std::fs::write(&path, "".as_bytes()).unwrap(); + path +} + +#[test] +fn test_sync_to_local_fs() { + // Tests sync between "foo" -> remote -> "bar" + + let tmp_workspace_dir = tempfile::tempdir().unwrap(); + let tmp_remote_dir = tempfile::tempdir().unwrap(); + + let dataset_id = DatasetID::new_unchecked("foo"); + let dataset_id_2 = DatasetID::new_unchecked("bar"); + + let logger = slog::Logger::root(slog::Discard, slog::o!()); + let workspace_layout = WorkspaceLayout::create(tmp_workspace_dir.path()).unwrap(); + let volume_layout = VolumeLayout::new(&workspace_layout.local_volume_dir); + let dataset_layout = DatasetLayout::new(&volume_layout, dataset_id); + let metadata_repo = Rc::new(RefCell::new(MetadataRepositoryImpl::new(&workspace_layout))); + let remote_factory = Arc::new(Mutex::new(RemoteFactory::new(logger.clone()))); + + let mut sync_svc = SyncServiceImpl::new( + workspace_layout.clone(), + metadata_repo.clone(), + remote_factory.clone(), + logger.clone(), + ); + + // Add remote + let remote_id = String::from("remote"); + let remote_url = Url::from_directory_path(tmp_remote_dir.path()).unwrap(); + metadata_repo + .borrow_mut() + .add_remote(&remote_id, remote_url) + .unwrap(); + + // Dataset does not exist locally / remotely ////////////////////////////// + assert_matches!( + sync_svc.sync_to( + dataset_id, + dataset_id, + &remote_id, + SyncOptions::default(), + None, + ), + Err(SyncError::LocalDatasetDoesNotExist { .. }) + ); + + assert_matches!( + sync_svc.sync_from( + dataset_id_2, + dataset_id, + &remote_id, + SyncOptions::default(), + None, + ), + Err(SyncError::RemoteDatasetDoesNotExist { .. 
}) + ); + + // Add dataset + let snapshot = MetadataFactory::dataset_snapshot() + .id(&dataset_id) + .source(MetadataFactory::dataset_source_root().build()) + .build(); + + let b1 = metadata_repo.borrow_mut().add_dataset(snapshot).unwrap(); + + // Initial sync /////////////////////////////////////////////////////////// + assert_matches!( + sync_svc.sync_to(dataset_id, dataset_id, &remote_id, SyncOptions::default(), None), + Ok(SyncResult::Updated { + old_head: None, + new_head, + }) if new_head == b1 + ); + assert_in_sync( + &workspace_layout, + dataset_id, + dataset_id, + tmp_remote_dir.path(), + ); + + assert_matches!( + sync_svc.sync_from(dataset_id_2, dataset_id, &remote_id, SyncOptions::default(), None), + Ok(SyncResult::Updated { + old_head: None, + new_head, + }) if new_head == b1 + ); + assert_in_sync( + &workspace_layout, + dataset_id_2, + dataset_id, + tmp_remote_dir.path(), + ); + + // Subsequent sync //////////////////////////////////////////////////////// + create_fake_data_file(&dataset_layout); + let b2 = metadata_repo + .borrow_mut() + .get_metadata_chain(dataset_id) + .unwrap() + .append( + MetadataFactory::metadata_block() + .prev(&b1) + .output_slice(DataSlice { + hash: Sha3_256::zero(), + interval: TimeInterval::singleton(Utc::now()), + num_records: 10, + }) + .build(), + ); + + create_fake_data_file(&dataset_layout); + let b3 = metadata_repo + .borrow_mut() + .get_metadata_chain(dataset_id) + .unwrap() + .append( + MetadataFactory::metadata_block() + .prev(&b2) + .output_slice(DataSlice { + hash: Sha3_256::zero(), + interval: TimeInterval::singleton(Utc::now()), + num_records: 20, + }) + .build(), + ); + + let checkpoint_dir = dataset_layout.checkpoints_dir.join(b3.to_string()); + std::fs::create_dir_all(&checkpoint_dir).unwrap(); + std::fs::write( + &checkpoint_dir.join("checkpoint_data.bin"), + "".as_bytes(), + ) + .unwrap(); + + assert_matches!( + sync_svc.sync_from(dataset_id, dataset_id, &remote_id, SyncOptions::default(), None), + Err(SyncError::DatasetsDiverged { local_head, remote_head}) + if local_head == b3 && remote_head == b1 + ); + + assert_matches!( + sync_svc.sync_to(dataset_id, dataset_id, &remote_id, SyncOptions::default(), None), + Ok(SyncResult::Updated { + old_head, + new_head, + }) if old_head == Some(b1) && new_head == b3 + ); + assert_in_sync( + &workspace_layout, + dataset_id, + dataset_id, + tmp_remote_dir.path(), + ); + + assert_matches!( + sync_svc.sync_from(dataset_id_2, dataset_id, &remote_id, SyncOptions::default(), None), + Ok(SyncResult::Updated { + old_head, + new_head, + }) if old_head == Some(b1) && new_head == b3 + ); + assert_in_sync( + &workspace_layout, + dataset_id_2, + dataset_id, + tmp_remote_dir.path(), + ); + + // Up to date ///////////////////////////////////////////////////////////// + assert_matches!( + sync_svc.sync_to( + dataset_id, + dataset_id, + &remote_id, + SyncOptions::default(), + None + ), + Ok(SyncResult::UpToDate) + ); + assert_in_sync( + &workspace_layout, + dataset_id, + dataset_id, + tmp_remote_dir.path(), + ); + + assert_matches!( + sync_svc.sync_from( + dataset_id_2, + dataset_id, + &remote_id, + SyncOptions::default(), + None + ), + Ok(SyncResult::UpToDate) + ); + assert_in_sync( + &workspace_layout, + dataset_id_2, + dataset_id, + tmp_remote_dir.path(), + ); + + // Datasets diverged on push ////////////////////////////////////////////// + let mut remote_chain = + MetadataChainImpl::new(&tmp_remote_dir.path().join(dataset_id).join("meta")); + let diverged_head = 
remote_chain.append(MetadataFactory::metadata_block().prev(&b3).build()); + + assert_matches!( + sync_svc.sync_to(dataset_id, dataset_id, &remote_id, SyncOptions::default(), None), + Err(SyncError::DatasetsDiverged { local_head, remote_head }) + if local_head == b3 && remote_head == diverged_head + ); +} diff --git a/opendatafabric/Cargo.toml b/opendatafabric/Cargo.toml index b7483e39bb..3a0f2b29b6 100644 --- a/opendatafabric/Cargo.toml +++ b/opendatafabric/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opendatafabric" -version = "0.33.0" +version = "0.34.0" authors = ["Sergii Mikhtoniuk "] edition = "2018"
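
The diff above wires a SyncServiceImpl into the CLI and exercises it end to end in test_sync_service_impl.rs. For orientation, here is a minimal standalone sketch of the same push/pull flow in Rust, assembled only from the code shown in this diff. It assumes a workspace layout and metadata repository are already constructed as in kamu-cli/src/main.rs, and the remote name "origin" is hypothetical — it must have been registered beforehand (e.g. via `kamu remote add` / `MetadataRepository::add_remote`); this is an illustrative sketch, not part of the change.

    use kamu::domain::*;
    use kamu::infra::*;
    use opendatafabric::DatasetID;
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::sync::{Arc, Mutex};

    // Push local dataset "foo" to the remote, then pull it back as "bar"
    // (dataset names mirror the ones used in the test above).
    fn push_then_pull(
        workspace_layout: WorkspaceLayout,
        metadata_repo: Rc<RefCell<dyn MetadataRepository>>,
        logger: slog::Logger,
    ) -> Result<(), SyncError> {
        // Same wiring as in kamu-cli/src/main.rs
        let remote_factory = Arc::new(Mutex::new(RemoteFactory::new(logger.clone())));
        let mut sync_svc = SyncServiceImpl::new(
            workspace_layout,
            metadata_repo,
            remote_factory,
            logger,
        );

        // Push "foo" into the remote under the same dataset ID
        match sync_svc.sync_to(
            DatasetID::new_unchecked("foo"),
            DatasetID::new_unchecked("foo"),
            "origin",
            SyncOptions::default(),
            None,
        )? {
            SyncResult::UpToDate => eprintln!("foo: up-to-date"),
            SyncResult::Updated { new_head, .. } => eprintln!("foo: pushed, head {}", new_head),
        }

        // Pull the same remote dataset back into a second local dataset "bar"
        sync_svc.sync_from(
            DatasetID::new_unchecked("bar"),
            DatasetID::new_unchecked("foo"),
            "origin",
            SyncOptions::default(),
            None,
        )?;

        Ok(())
    }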