diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d42bd43fd..1f24c5ad79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [0.36.0] - 2020-11-16 ### Changed - Engine errors will not list all relevant log files - UI improvements diff --git a/Cargo.lock b/Cargo.lock index f113b73fb2..2aa5c2de1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,7 +63,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow#e2f1e0107c555a544b43b0721cb7f4ded4f074a3" +source = "git+https://github.com/apache/arrow#e5fce7f6a70c370c758b133f025444c98cdd305d" dependencies = [ "chrono", "csv", @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "kamu" -version = "0.35.0" +version = "0.36.0" dependencies = [ "assert_matches", "bytes", @@ -1040,7 +1040,7 @@ dependencies = [ [[package]] name = "kamu-cli" -version = "0.35.0" +version = "0.36.0" dependencies = [ "chrono", "chrono-humanize", @@ -1070,7 +1070,7 @@ dependencies = [ [[package]] name = "kamu-test" -version = "0.35.0" +version = "0.36.0" dependencies = [ "chrono", "opendatafabric", @@ -1218,7 +1218,7 @@ checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" dependencies = [ "log", "mio", - "miow 0.3.5", + "miow 0.3.6", "winapi 0.3.9", ] @@ -1247,9 +1247,9 @@ dependencies = [ [[package]] name = "miow" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b88fb9795d4d36d62a012dfbf49a8f5cf12751f36d31a9dbe66d528e58979e" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" dependencies = [ "socket2", "winapi 0.3.9", @@ -1388,7 +1388,7 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "opendatafabric" -version = "0.35.0" +version = "0.36.0" dependencies = [ "byteorder", "chrono", @@ -1444,7 +1444,7 @@ dependencies = [ [[package]] name = "parquet" version = "3.0.0-SNAPSHOT" -source = "git+https://github.com/apache/arrow#e2f1e0107c555a544b43b0721cb7f4ded4f074a3" +source = "git+https://github.com/apache/arrow#e5fce7f6a70c370c758b133f025444c98cdd305d" dependencies = [ "arrow", "base64 0.13.0", @@ -2160,9 +2160,9 @@ checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] name = "standback" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4e0831040d2cf2bdfd51b844be71885783d489898a192f254ae25d57cce725c" +checksum = "cf906c8b8fc3f6ecd1046e01da1d8ddec83e48c8b08b84dcc02b585a6bedf5a8" dependencies = [ "version_check", ] diff --git a/kamu-cli/Cargo.toml b/kamu-cli/Cargo.toml index 1685d39939..fa0113812d 100644 --- a/kamu-cli/Cargo.toml +++ b/kamu-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu-cli" -version = "0.35.0" +version = "0.36.0" description = "Decentralized data management tool" authors = ["Sergii Mikhtoniuk "] license = "MPL-2.0" diff --git a/kamu-cli/src/commands/notebook_command.rs b/kamu-cli/src/commands/notebook_command.rs index 92fbcc1b0d..a86c25f6ab 100644 --- a/kamu-cli/src/commands/notebook_command.rs +++ b/kamu-cli/src/commands/notebook_command.rs @@ -1,5 +1,6 @@ use super::{Command, Error}; use crate::output::OutputConfig; +use kamu::domain::PullImageListener; use kamu::infra::explore::*; use kamu::infra::*; @@ -65,6 +66,9 @@ impl Command for NotebookCommand { .collect::, _>>()?; let spinner = if self.output_config.verbosity_level == 0 { + let mut pull_progress = PullImageProgress { progress_bar: None }; + NotebookServerImpl::ensure_images(&mut pull_progress); + let s = indicatif::ProgressBar::new_spinner(); s.set_style( indicatif::ProgressStyle::default_spinner().template("{spinner:.cyan} {msg}"), @@ -99,3 +103,21 @@ impl Command for NotebookCommand { Ok(()) } } + +struct PullImageProgress { + progress_bar: Option, +} + +impl PullImageListener for PullImageProgress { + fn begin(&mut self, image: &str) { + let s = indicatif::ProgressBar::new_spinner(); + s.set_style(indicatif::ProgressStyle::default_spinner().template("{spinner:.cyan} {msg}")); + s.set_message(&format!("Pulling docker image {}", image)); + s.enable_steady_tick(100); + self.progress_bar = Some(s); + } + + fn success(&mut self) { + self.progress_bar = None; + } +} diff --git a/kamu-cli/src/commands/pull_command.rs b/kamu-cli/src/commands/pull_command.rs index b6028f3a36..dfa6f034d6 100644 --- a/kamu-cli/src/commands/pull_command.rs +++ b/kamu-cli/src/commands/pull_command.rs @@ -397,13 +397,33 @@ impl IngestListener for PrettyIngestProgress { console::style("Failed to update root dataset").red(), )); } + + fn get_pull_image_listener(&mut self) -> Option<&mut dyn PullImageListener> { + Some(self) + } +} + +impl PullImageListener for PrettyIngestProgress { + fn begin(&mut self, image: &str) { + // This currently happens during the Read stage + self.curr_progress.set_message(&Self::spinner_message( + &self.dataset_id, + IngestStage::Read as u32, + format!("Pulling engine image {}", image), + )); + } + + fn success(&mut self) { + self.curr_progress.finish(); + self.on_stage_progress(self.stage, 0, 0); + } } /////////////////////////////////////////////////////////////////////////////// struct PrettyTransformProgress { dataset_id: DatasetIDBuf, - //multi_progress: Arc, + multi_progress: Arc, curr_progress: indicatif::ProgressBar, } @@ -416,7 +436,7 @@ impl PrettyTransformProgress { 0, "Applying derivative transformations", ))), - //multi_progress: multi_progress, + multi_progress: multi_progress, } } @@ -462,4 +482,29 @@ impl TransformListener for PrettyTransformProgress { console::style("Failed to update derivative dataset").red(), )); } + + fn get_pull_image_listener(&mut self) -> Option<&mut dyn PullImageListener> { + Some(self) + } +} + +impl PullImageListener for PrettyTransformProgress { + fn begin(&mut self, image: &str) { + self.curr_progress.set_message(&Self::spinner_message( + &self.dataset_id, + 0, + format!("Pulling engine image {}", image), + )); + } + + fn success(&mut self) { + self.curr_progress.finish(); + self.curr_progress = self + .multi_progress + .add(Self::new_spinner(&Self::spinner_message( + &self.dataset_id, + 0, + "Applying derivative transformations", + ))); + } } diff --git a/kamu-cli/src/commands/sql_shell_command.rs b/kamu-cli/src/commands/sql_shell_command.rs index ba9267e294..ca8c418eeb 100644 --- a/kamu-cli/src/commands/sql_shell_command.rs +++ b/kamu-cli/src/commands/sql_shell_command.rs @@ -1,5 +1,6 @@ use super::{Command, Error}; use crate::output::*; +use kamu::domain::PullImageListener; use kamu::infra::explore::*; use kamu::infra::*; @@ -34,6 +35,9 @@ impl SqlShellCommand { impl Command for SqlShellCommand { fn run(&mut self) -> Result<(), Error> { let spinner = if self.output_config.verbosity_level == 0 { + let mut pull_progress = PullImageProgress { progress_bar: None }; + SqlShellImpl::ensure_images(&mut pull_progress); + let s = indicatif::ProgressBar::new_spinner(); s.set_style( indicatif::ProgressStyle::default_spinner().template("{spinner:.cyan} {msg}"), @@ -64,3 +68,21 @@ impl Command for SqlShellCommand { Ok(()) } } + +struct PullImageProgress { + progress_bar: Option, +} + +impl PullImageListener for PullImageProgress { + fn begin(&mut self, image: &str) { + let s = indicatif::ProgressBar::new_spinner(); + s.set_style(indicatif::ProgressStyle::default_spinner().template("{spinner:.cyan} {msg}")); + s.set_message(&format!("Pulling docker image {}", image)); + s.enable_steady_tick(100); + self.progress_bar = Some(s); + } + + fn success(&mut self) { + self.progress_bar = None; + } +} diff --git a/kamu-core-test/Cargo.toml b/kamu-core-test/Cargo.toml index 833da30708..5e0310b198 100644 --- a/kamu-core-test/Cargo.toml +++ b/kamu-core-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu-test" -version = "0.35.0" +version = "0.36.0" authors = ["Sergii Mikhtoniuk "] edition = "2018" diff --git a/kamu-core/Cargo.toml b/kamu-core/Cargo.toml index 2586032edb..665f0d6475 100644 --- a/kamu-core/Cargo.toml +++ b/kamu-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "kamu" -version = "0.35.0" +version = "0.36.0" authors = ["Sergii Mikhtoniuk "] edition = "2018" diff --git a/kamu-core/src/domain/engine.rs b/kamu-core/src/domain/engine.rs index 508d2dbbf1..3c53ced2ac 100644 --- a/kamu-core/src/domain/engine.rs +++ b/kamu-core/src/domain/engine.rs @@ -16,6 +16,14 @@ pub trait Engine { fn transform(&self, request: ExecuteQueryRequest) -> Result; } +pub trait PullImageListener { + fn begin(&mut self, _image: &str) {} + fn success(&mut self) {} +} + +pub struct NullPullImageListener; +impl PullImageListener for NullPullImageListener {} + /////////////////////////////////////////////////////////////////////////////// // Request / Response DTOs /////////////////////////////////////////////////////////////////////////////// diff --git a/kamu-core/src/domain/ingest_service.rs b/kamu-core/src/domain/ingest_service.rs index d5efe1fe30..70a2a736b7 100644 --- a/kamu-core/src/domain/ingest_service.rs +++ b/kamu-core/src/domain/ingest_service.rs @@ -1,4 +1,4 @@ -use super::EngineError; +use super::{EngineError, PullImageListener}; use opendatafabric::{DatasetID, DatasetIDBuf, Sha3_256}; use std::backtrace::Backtrace; @@ -75,6 +75,10 @@ pub trait IngestListener: Send { fn success(&mut self, _result: &IngestResult) {} fn uncacheable(&mut self) {} fn error(&mut self, _error: &IngestError) {} + + fn get_pull_image_listener(&mut self) -> Option<&mut dyn PullImageListener> { + None + } } pub struct NullIngestListener; diff --git a/kamu-core/src/domain/transform_service.rs b/kamu-core/src/domain/transform_service.rs index e005258e14..86e61550b8 100644 --- a/kamu-core/src/domain/transform_service.rs +++ b/kamu-core/src/domain/transform_service.rs @@ -1,4 +1,4 @@ -use super::EngineError; +use super::{EngineError, PullImageListener}; use opendatafabric::{DatasetID, DatasetIDBuf, Sha3_256}; use std::backtrace::Backtrace; @@ -37,6 +37,10 @@ pub trait TransformListener: Send { fn begin(&mut self) {} fn success(&mut self, _result: &TransformResult) {} fn error(&mut self, _error: &TransformError) {} + + fn get_pull_image_listener(&mut self) -> Option<&mut dyn PullImageListener> { + None + } } pub struct NullTransformListener; diff --git a/kamu-core/src/infra/engine/engine_factory.rs b/kamu-core/src/infra/engine/engine_factory.rs index 4b4f6176ae..7d543e8062 100644 --- a/kamu-core/src/infra/engine/engine_factory.rs +++ b/kamu-core/src/infra/engine/engine_factory.rs @@ -1,4 +1,5 @@ use crate::domain::*; +use crate::infra::utils::docker_client::DockerClient; use crate::infra::utils::docker_images; use crate::infra::*; @@ -6,11 +7,15 @@ use super::engine_flink::*; use super::engine_spark::*; use slog::{o, Logger}; +use std::collections::HashSet; +use std::process::Stdio; use std::sync::{Arc, Mutex}; pub struct EngineFactory { spark_engine: Arc>, flink_engine: Arc>, + docker_client: DockerClient, + known_images: HashSet, } impl EngineFactory { @@ -26,14 +31,47 @@ impl EngineFactory { workspace_layout, logger.new(o!("engine" => "flink")), ))), + docker_client: DockerClient::new(), + known_images: HashSet::new(), } } - pub fn get_engine(&mut self, engine_id: &str) -> Result>, EngineError> { - match engine_id { - "spark" => Ok(self.spark_engine.clone()), - "flink" => Ok(self.flink_engine.clone()), + pub fn get_engine( + &mut self, + engine_id: &str, + maybe_listener: Option<&mut dyn PullImageListener>, + ) -> Result>, EngineError> { + let (engine, image) = match engine_id { + "spark" => Ok(( + self.spark_engine.clone() as Arc>, + docker_images::SPARK, + )), + "flink" => Ok(( + self.flink_engine.clone() as Arc>, + docker_images::FLINK, + )), _ => Err(EngineError::not_found(engine_id)), + }?; + + if !self.known_images.contains(image) { + if !self.docker_client.has_image(image) { + let mut null_listener = NullPullImageListener; + let listener = maybe_listener.unwrap_or(&mut null_listener); + + listener.begin(image); + + // TODO: Return better errors + self.docker_client + .pull_cmd(image) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status()?; + + listener.success(); + } + self.known_images.insert(image.to_owned()); } + + Ok(engine) } } diff --git a/kamu-core/src/infra/explore/notebook_server_impl.rs b/kamu-core/src/infra/explore/notebook_server_impl.rs index 8bc798a235..a383f9d25a 100644 --- a/kamu-core/src/infra/explore/notebook_server_impl.rs +++ b/kamu-core/src/infra/explore/notebook_server_impl.rs @@ -1,17 +1,24 @@ +use crate::domain::PullImageListener; use crate::infra::utils::docker_client::*; use crate::infra::utils::docker_images; use crate::infra::*; +use slog::{info, Logger}; use std::fs::File; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use slog::{info, Logger}; pub struct NotebookServerImpl; impl NotebookServerImpl { + pub fn ensure_images(listener: &mut dyn PullImageListener) { + let docker_client = DockerClient::new(); + docker_client.ensure_image(docker_images::LIVY, Some(listener)); + docker_client.ensure_image(docker_images::JUPYTER, Some(listener)); + } + pub fn run( workspace_layout: &WorkspaceLayout, volume_layout: &VolumeLayout, @@ -37,28 +44,28 @@ impl NotebookServerImpl { let jupyter_stdout_path = workspace_layout.run_info_dir.join("jupyter.out.txt"); let jupyter_stderr_path = workspace_layout.run_info_dir.join("jupyter.err.txt"); - let mut livy_cmd = docker_client - .run_cmd(DockerRunArgs { - image: docker_images::LIVY.to_owned(), - container_name: Some("kamu-livy".to_owned()), - hostname: Some("kamu-livy".to_owned()), - network: Some(network_name.to_owned()), - args: vec!["livy".to_owned()], - user: Some("root".to_owned()), - volume_map: if volume_layout.data_dir.exists() { - vec![( - volume_layout.data_dir.clone(), - PathBuf::from("/opt/spark/work-dir"), - )] - } else { - vec![] - }, - ..DockerRunArgs::default() - }); + let mut livy_cmd = docker_client.run_cmd(DockerRunArgs { + image: docker_images::LIVY.to_owned(), + container_name: Some("kamu-livy".to_owned()), + hostname: Some("kamu-livy".to_owned()), + network: Some(network_name.to_owned()), + args: vec!["livy".to_owned()], + user: Some("root".to_owned()), + volume_map: if volume_layout.data_dir.exists() { + vec![( + volume_layout.data_dir.clone(), + PathBuf::from("/opt/spark/work-dir"), + )] + } else { + vec![] + }, + ..DockerRunArgs::default() + }); info!(logger, "Starting Livy container"; "command" => ?livy_cmd); - let mut livy = livy_cmd.stdout(if inherit_stdio { + let mut livy = livy_cmd + .stdout(if inherit_stdio { Stdio::inherit() } else { Stdio::from(File::create(&livy_stdout_path)?) @@ -71,16 +78,15 @@ impl NotebookServerImpl { .spawn()?; let _drop_livy = DropContainer::new(docker_client.clone(), "kamu-livy"); - let mut jupyter_cmd = docker_client - .run_cmd(DockerRunArgs { - image: docker_images::JUPYTER.to_owned(), - container_name: Some("kamu-jupyter".to_owned()), - network: Some(network_name.to_owned()), - expose_ports: vec![80], - volume_map: vec![(cwd.clone(), PathBuf::from("/opt/workdir"))], - environment_vars: environment_vars, - ..DockerRunArgs::default() - }); + let mut jupyter_cmd = docker_client.run_cmd(DockerRunArgs { + image: docker_images::JUPYTER.to_owned(), + container_name: Some("kamu-jupyter".to_owned()), + network: Some(network_name.to_owned()), + expose_ports: vec![80], + volume_map: vec![(cwd.clone(), PathBuf::from("/opt/workdir"))], + environment_vars: environment_vars, + ..DockerRunArgs::default() + }); info!(logger, "Starting Jupyter container"; "command" => ?jupyter_cmd); diff --git a/kamu-core/src/infra/explore/sql_shell_impl.rs b/kamu-core/src/infra/explore/sql_shell_impl.rs index 7d53d1ef2e..9839385bf3 100644 --- a/kamu-core/src/infra/explore/sql_shell_impl.rs +++ b/kamu-core/src/infra/explore/sql_shell_impl.rs @@ -1,3 +1,4 @@ +use crate::domain::PullImageListener; use crate::infra::utils::docker_client::*; use crate::infra::utils::docker_images; use crate::infra::*; @@ -13,6 +14,11 @@ pub struct SqlShellImpl; // TODO: Need to allocate pseudo-terminal to perfectly forward to the shell impl SqlShellImpl { + pub fn ensure_images(listener: &mut dyn PullImageListener) { + let docker_client = DockerClient::new(); + docker_client.ensure_image(docker_images::SPARK, Some(listener)); + } + pub fn run( workspace_layout: &WorkspaceLayout, volume_layout: &VolumeLayout, @@ -53,10 +59,7 @@ impl SqlShellImpl { PathBuf::from("/opt/spark/kamu_data"), ), (cwd, PathBuf::from("/opt/spark/kamu_shell")), - ( - init_script_path, - PathBuf::from("/opt/spark/shell_init.sql"), - ), + (init_script_path, PathBuf::from("/opt/spark/shell_init.sql")), ] } else { vec![] diff --git a/kamu-core/src/infra/ingest/ingest_task.rs b/kamu-core/src/infra/ingest/ingest_task.rs index 8b308a54fc..d7c95d0763 100644 --- a/kamu-core/src/infra/ingest/ingest_task.rs +++ b/kamu-core/src/infra/ingest/ingest_task.rs @@ -241,6 +241,7 @@ impl IngestTask { prep_result.checkpoint.last_prepared, old_checkpoint, &self.layout.cache_dir.join("prepared.bin"), + self.listener.clone(), ) }, ) diff --git a/kamu-core/src/infra/ingest/read_service.rs b/kamu-core/src/infra/ingest/read_service.rs index d2824c53f6..4ecab32778 100644 --- a/kamu-core/src/infra/ingest/read_service.rs +++ b/kamu-core/src/infra/ingest/read_service.rs @@ -33,8 +33,12 @@ impl ReadService { for_prepared_at: DateTime, _old_checkpoint: Option, src_path: &Path, + listener: Arc>, ) -> Result, IngestError> { - let engine = self.engine_factory.lock().unwrap().get_engine("spark")?; + let engine = self.engine_factory.lock().unwrap().get_engine( + "spark", + listener.lock().unwrap().get_pull_image_listener(), + )?; let request = IngestRequest { dataset_id: dataset_id.to_owned(), diff --git a/kamu-core/src/infra/transform_service_impl.rs b/kamu-core/src/infra/transform_service_impl.rs index 79d29d3f79..c8bafe3e8d 100644 --- a/kamu-core/src/infra/transform_service_impl.rs +++ b/kamu-core/src/infra/transform_service_impl.rs @@ -39,7 +39,7 @@ impl TransformServiceImpl { ) -> Result { listener.lock().unwrap().begin(); - match Self::do_transform_inner(request, meta_chain, engine_factory) { + match Self::do_transform_inner(request, meta_chain, engine_factory, listener.clone()) { Ok(res) => { listener.lock().unwrap().success(&res); Ok(res) @@ -56,15 +56,16 @@ impl TransformServiceImpl { request: ExecuteQueryRequest, mut meta_chain: Box, engine_factory: Arc>, + listener: Arc>, ) -> Result { let prev_hash = meta_chain.read_ref(&BlockRef::Head); - let engine = engine_factory - .lock() - .unwrap() - .get_engine(match request.source.transform { + let engine = engine_factory.lock().unwrap().get_engine( + match request.source.transform { Transform::Sql(ref sql) => &sql.engine, - })?; + }, + listener.lock().unwrap().get_pull_image_listener(), + )?; let result = engine.lock().unwrap().transform(request)?; diff --git a/kamu-core/src/infra/utils/docker_client.rs b/kamu-core/src/infra/utils/docker_client.rs index e282ff9100..3c97a311ed 100644 --- a/kamu-core/src/infra/utils/docker_client.rs +++ b/kamu-core/src/infra/utils/docker_client.rs @@ -1,3 +1,5 @@ +use crate::domain::{NullPullImageListener, PullImageListener}; + use std::path::PathBuf; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; @@ -88,6 +90,25 @@ impl DockerClient { .success() } + pub fn ensure_image(&self, image: &str, maybe_listener: Option<&mut dyn PullImageListener>) { + let mut null_listener = NullPullImageListener; + let listener = maybe_listener.unwrap_or(&mut null_listener); + + if !self.has_image(image) { + listener.begin(image); + + // TODO: Handle pull errors gracefully + self.pull_cmd(image) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .status() + .expect("Failed to pull image"); + + listener.success(); + } + } + pub fn pull_cmd(&self, image: &str) -> Command { let mut cmd = Command::new("docker"); cmd.arg("pull"); diff --git a/opendatafabric/Cargo.toml b/opendatafabric/Cargo.toml index 75b9ffa778..daffe20358 100644 --- a/opendatafabric/Cargo.toml +++ b/opendatafabric/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "opendatafabric" -version = "0.35.0" +version = "0.36.0" authors = ["Sergii Mikhtoniuk "] edition = "2018"