From 2c660af651d8c664f14b412b1f6b82d46af7ec79 Mon Sep 17 00:00:00 2001 From: Adrian Benavides Date: Fri, 12 Apr 2024 09:56:40 +0200 Subject: [PATCH] feat(rust): support `foreground` flag when creating a node with a config file --- .../rust/ockam/ockam_command/src/lib.rs | 1 - .../ockam/ockam_command/src/node/create.rs | 30 ++++---- .../ockam_command/src/node/create/config.rs | 34 +++++---- .../src/node/create/foreground.rs | 72 ++++++++++++++----- .../rust/ockam/ockam_command/src/shutdown.rs | 65 ----------------- .../rust/ockam/ockam_command/src/util/mod.rs | 15 +--- 6 files changed, 95 insertions(+), 122 deletions(-) delete mode 100644 implementations/rust/ockam/ockam_command/src/shutdown.rs diff --git a/implementations/rust/ockam/ockam_command/src/lib.rs b/implementations/rust/ockam/ockam_command/src/lib.rs index d4bb06bac55..e40fdea50f4 100644 --- a/implementations/rust/ockam/ockam_command/src/lib.rs +++ b/implementations/rust/ockam/ockam_command/src/lib.rs @@ -61,7 +61,6 @@ mod secure_channel; mod service; #[cfg(feature = "orchestrator")] mod share; -pub mod shutdown; mod sidecar; mod space; mod status; diff --git a/implementations/rust/ockam/ockam_command/src/node/create.rs b/implementations/rust/ockam/ockam_command/src/node/create.rs index 8466b7002d0..a27af99904d 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create.rs @@ -117,20 +117,6 @@ impl Default for CreateCommand { impl Command for CreateCommand { const NAME: &'static str = "node create"; - async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> Result<()> { - let ctx = ctx.async_try_clone().await.into_diagnostic()?; - if self.has_name_arg() { - if self.foreground { - self.foreground_mode(&ctx, opts).await?; - } else { - self.background_mode(&ctx, opts).await?; - } - } else { - self.run_config(&ctx, &opts).await?; - } - Ok(()) - } - #[instrument(skip_all)] fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> { if self.has_name_arg() { @@ -151,10 +137,24 @@ impl Command for CreateCommand { } } else { return async_cmd(&self.name(), opts.clone(), |ctx| async move { - self.run_config(&ctx, &opts).await + self.run_config(&ctx, opts).await }); } } + + async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> Result<()> { + let ctx = ctx.async_try_clone().await.into_diagnostic()?; + if self.has_name_arg() { + if self.foreground { + self.foreground_mode(&ctx, opts).await?; + } else { + self.background_mode(&ctx, opts).await?; + } + } else { + self.run_config(&ctx, opts).await?; + } + Ok(()) + } } impl CreateCommand { diff --git a/implementations/rust/ockam/ockam_command/src/node/create/config.rs b/implementations/rust/ockam/ockam_command/src/node/create/config.rs index 7a33250fb07..e8e284e63f7 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/config.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/config.rs @@ -13,7 +13,7 @@ use tracing::{instrument, Span}; impl CreateCommand { #[instrument(skip_all, fields(app.event.command.configuration_file))] - pub async fn run_config(self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> { + pub async fn run_config(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> { let contents = async_parse_path_or_url(&self.name).await?; // Set environment variables from the cli command args for (key, value) in &self.variables { @@ -27,8 +27,13 @@ impl CreateCommand { ); let mut config = NodeConfig::new(&contents)?; - let node_name = config.merge(self)?; - config.run(ctx, opts.clone(), &node_name).await?; + let node_name = config.merge(&self)?; + config.run(ctx, &opts, &node_name).await?; + + if self.foreground { + self.wait_for_exit_signal(ctx, opts).await?; + } + Ok(()) } } @@ -59,8 +64,8 @@ impl NodeConfig { } /// Merge the arguments of the node defined in the config with the arguments from the - /// "create" command, giving precedence to the config values. - fn merge(&mut self, cli_args: CreateCommand) -> miette::Result { + /// "create" command, giving precedence to the config values. Returns the node name. + fn merge(&mut self, cli_args: &CreateCommand) -> miette::Result { // Set environment variables from the cli command again // to override the duplicate entries from the config file. for (key, value) in &cli_args.variables { @@ -84,13 +89,18 @@ impl NodeConfig { self.node.exit_on_eof = Some(ArgValue::Bool(cli_args.exit_on_eof)); } if self.node.tcp_listener_address.is_none() { - self.node.tcp_listener_address = Some(ArgValue::String(cli_args.tcp_listener_address)); + self.node.tcp_listener_address = + Some(ArgValue::String(cli_args.tcp_listener_address.clone())); } if self.node.identity.is_none() { - self.node.identity = cli_args.identity.map(ArgValue::String); + self.node.identity = cli_args.identity.clone().map(ArgValue::String); } if self.node.project.is_none() { - self.node.project = cli_args.trust_opts.project_name.map(ArgValue::String); + self.node.project = cli_args + .trust_opts + .project_name + .clone() + .map(ArgValue::String); } let node_name = self.node.name.as_ref().unwrap().to_string(); @@ -100,7 +110,7 @@ impl NodeConfig { pub async fn run( self, ctx: &Context, - opts: CommandGlobalOpts, + opts: &CommandGlobalOpts, node_name: &str, ) -> miette::Result<()> { let overrides = &ValuesOverrides::default().with_override_node_name(node_name); @@ -117,7 +127,7 @@ impl NodeConfig { // Run commands for cmd in commands { - cmd.run(ctx, &opts).await? + cmd.run(ctx, opts).await? } Ok(()) } @@ -162,7 +172,7 @@ mod tests { // No node config, cli args should be used let mut config = NodeConfig::parse("").unwrap(); - let node_name = config.merge(cli_args.clone()).unwrap(); + let node_name = config.merge(&cli_args).unwrap(); let node = config .node .parse_commands(&ValuesOverrides::default()) @@ -185,7 +195,7 @@ mod tests { "#, ) .unwrap(); - let node_name = config.merge(cli_args).unwrap(); + let node_name = config.merge(&cli_args).unwrap(); let node = config .node .parse_commands(&ValuesOverrides::default()) diff --git a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs index 22b18e5d52c..dcf19293600 100644 --- a/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs +++ b/implementations/rust/ockam/ockam_command/src/node/create/foreground.rs @@ -1,24 +1,27 @@ +use std::io; +use std::io::Read; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use colorful::Colorful; use miette::{miette, IntoDiagnostic}; use tokio::time::{sleep, Duration}; -use tracing::{debug, instrument}; +use tracing::{debug, info, instrument}; use ockam::{Address, TcpListenerOptions}; use ockam::{Context, TcpTransport}; -use ockam_api::fmt_ok; use ockam_api::nodes::InMemoryNode; use ockam_api::nodes::{ service::{NodeManagerGeneralOptions, NodeManagerTransportOptions}, NodeManagerWorker, NODEMANAGER_ADDR, }; +use ockam_api::{fmt_ok, fmt_warn}; use ockam_core::{route, LOCAL}; use crate::node::CreateCommand; use crate::secure_channel::listener::create as secure_channel_listener; use crate::service::config::Config; -use crate::{shutdown, CommandGlobalOpts}; +use crate::CommandGlobalOpts; impl CreateCommand { #[instrument(skip_all, fields(node_name = self.name))] @@ -116,23 +119,60 @@ impl CreateCommand { } } - // Create a channel for communicating back to the main thread + self.wait_for_exit_signal(ctx, opts).await + } + + /// Wait until it receives a CTRL+C, EOF or a signal to exit + pub async fn wait_for_exit_signal( + &self, + ctx: &Context, + opts: CommandGlobalOpts, + ) -> miette::Result<()> { let (tx, mut rx) = tokio::sync::mpsc::channel(2); - shutdown::wait( - opts.terminal.clone(), - self.exit_on_eof, - opts.global_args.quiet, - tx, - &mut rx, - ) - .await?; - opts.shutdown(); + // Register a handler for SIGINT, SIGTERM, SIGHUP + { + let tx = tx.clone(); + let terminal = opts.terminal.clone(); + // To avoid handling multiple CTRL+C signals at the same time + let flag = Arc::new(AtomicBool::new(true)); + ctrlc::set_handler(move || { + if flag.load(std::sync::atomic::Ordering::Relaxed) { + let _ = tx.blocking_send(()); + info!("Ctrl+C signal received"); + if !opts.global_args.quiet { + let _ = terminal.write_line(fmt_warn!("Ctrl+C signal received")); + } + flag.store(false, std::sync::atomic::Ordering::Relaxed); + } + }) + .expect("Error setting Ctrl+C handler"); + } - // Try to stop node; it might have already been stopped or deleted (e.g. when running `node delete --all`) - let _ = opts.state.stop_node(&node_name, true).await; - ctx.stop().await.into_diagnostic()?; + if self.exit_on_eof { + // Spawn a thread to monitor STDIN for EOF + { + let tx = tx.clone(); + let terminal = opts.terminal.clone(); + std::thread::spawn(move || { + let mut buffer = Vec::new(); + let mut handle = io::stdin().lock(); + if handle.read_to_end(&mut buffer).is_ok() { + let _ = tx.blocking_send(()); + info!("EOF received"); + let _ = terminal.write_line(fmt_warn!("EOF received")); + } + }); + } + } + + // Wait for signal SIGINT, SIGTERM, SIGHUP or EOF; or for the tx to be closed. + rx.recv().await; + // Clean up and exit + opts.shutdown(); + let _ = opts.state.stop_node(&self.name, true).await; + let _ = ctx.stop().await; opts.terminal .write_line(fmt_ok!("Node stopped successfully"))?; diff --git a/implementations/rust/ockam/ockam_command/src/shutdown.rs b/implementations/rust/ockam/ockam_command/src/shutdown.rs deleted file mode 100644 index a61f5d3e00a..00000000000 --- a/implementations/rust/ockam/ockam_command/src/shutdown.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::io; -use std::io::Read; -use std::sync::atomic::AtomicBool; -use std::sync::Arc; - -use colorful::Colorful; -use console::Term; -use ockam_api::terminal::{Terminal, TerminalStream}; -use tokio::sync::mpsc::{Receiver, Sender}; -use tracing::info; - -/// Waits for CTRL+C, EOF or a signal to exit, can provide extra shutdown events by -/// sending a message through the channel -pub async fn wait( - terminal: Terminal>, - exit_on_eof: bool, - quiet: bool, - tx: Sender<()>, - rx: &mut Receiver<()>, -) -> miette::Result { - // Register a handler for SIGINT, SIGTERM, SIGHUP - { - let tx = tx.clone(); - let terminal = terminal.clone(); - // avoid printing CTRL+C multiple times - let flag = Arc::new(AtomicBool::new(true)); - ctrlc::set_handler(move || { - if flag.load(std::sync::atomic::Ordering::Relaxed) { - let _ = tx.blocking_send(()); - info!("Ctrl+C signal received"); - if !quiet { - let _ = terminal.write_line( - format!("{} Ctrl+C signal received", "!".light_yellow()).as_str(), - ); - } - flag.store(false, std::sync::atomic::Ordering::Relaxed); - } - }) - .expect("Error setting Ctrl+C handler"); - } - - if exit_on_eof { - // Spawn a thread to monitor STDIN for EOF - { - let tx = tx.clone(); - let terminal = terminal.clone(); - std::thread::spawn(move || { - let mut buffer = Vec::new(); - let mut handle = io::stdin().lock(); - handle - .read_to_end(&mut buffer) - .expect("Error reading from stdin"); - let _ = tx.blocking_send(()); - info!("EOF received"); - if !quiet { - let _ = terminal - .write_line(format!("{} EOF received", "!".light_yellow()).as_str()); - } - }); - } - } - - // Shutdown on SIGINT, SIGTERM, SIGHUP or EOF - Ok(rx.recv().await.is_some()) -} diff --git a/implementations/rust/ockam/ockam_command/src/util/mod.rs b/implementations/rust/ockam/ockam_command/src/util/mod.rs index 6d8e81d4151..d33a5cbe6d2 100644 --- a/implementations/rust/ockam/ockam_command/src/util/mod.rs +++ b/implementations/rust/ockam/ockam_command/src/util/mod.rs @@ -29,17 +29,6 @@ pub mod duration; pub mod exitcode; pub mod parsers; -/// A simple wrapper for shutting down the local embedded node (for -/// the client side of the CLI). Swallows errors and turns them into -/// eprintln logs. -/// -/// TODO: We may want to change this behaviour in the future. -pub async fn stop_node(ctx: Context) { - if let Err(e) = ctx.stop().await { - eprintln!("an error occurred while shutting down local node: {e}"); - } -} - pub fn local_cmd(res: miette::Result<()>) -> miette::Result<()> { if let Err(error) = &res { // Note: error! is also called in command_event.rs::add_command_error_event() @@ -54,7 +43,7 @@ where Fut: core::future::Future> + Send + 'static, { debug!("running '{}' asynchronously", command_name); - let res = embedded_node(opts.clone(), |ctx| { + let res = embedded_node(opts, |ctx| { async move { f(ctx).await }.with_context(OpenTelemetryContext::current_context()) }); local_cmd(res) @@ -82,7 +71,7 @@ where .await .expect("Embedded node child ctx can't be created"); let r = f(child_ctx).await; - stop_node(ctx).await; + let _ = ctx.stop().await; r } .with_context(OpenTelemetryContext::current_context()),