Skip to content

Commit

Permalink
feat(rust): support foreground flag when creating a node with a con…
Browse files Browse the repository at this point in the history
…fig file
  • Loading branch information
adrianbenavides committed Apr 12, 2024
1 parent 105f294 commit 2c660af
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 122 deletions.
1 change: 0 additions & 1 deletion implementations/rust/ockam/ockam_command/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ mod secure_channel;
mod service;
#[cfg(feature = "orchestrator")]
mod share;
pub mod shutdown;
mod sidecar;
mod space;
mod status;
Expand Down
30 changes: 15 additions & 15 deletions implementations/rust/ockam/ockam_command/src/node/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,6 @@ impl Default for CreateCommand {
impl Command for CreateCommand {
const NAME: &'static str = "node create";

async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> Result<()> {
let ctx = ctx.async_try_clone().await.into_diagnostic()?;
if self.has_name_arg() {
if self.foreground {
self.foreground_mode(&ctx, opts).await?;
} else {
self.background_mode(&ctx, opts).await?;
}
} else {
self.run_config(&ctx, &opts).await?;
}
Ok(())
}

#[instrument(skip_all)]
fn run(self, opts: CommandGlobalOpts) -> miette::Result<()> {
if self.has_name_arg() {
Expand All @@ -151,10 +137,24 @@ impl Command for CreateCommand {
}
} else {
return async_cmd(&self.name(), opts.clone(), |ctx| async move {
self.run_config(&ctx, &opts).await
self.run_config(&ctx, opts).await
});
}
}

async fn async_run(self, ctx: &Context, opts: CommandGlobalOpts) -> Result<()> {
let ctx = ctx.async_try_clone().await.into_diagnostic()?;
if self.has_name_arg() {
if self.foreground {
self.foreground_mode(&ctx, opts).await?;
} else {
self.background_mode(&ctx, opts).await?;
}
} else {
self.run_config(&ctx, opts).await?;
}
Ok(())
}
}

impl CreateCommand {
Expand Down
34 changes: 22 additions & 12 deletions implementations/rust/ockam/ockam_command/src/node/create/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::{instrument, Span};

impl CreateCommand {
#[instrument(skip_all, fields(app.event.command.configuration_file))]
pub async fn run_config(self, ctx: &Context, opts: &CommandGlobalOpts) -> miette::Result<()> {
pub async fn run_config(self, ctx: &Context, opts: CommandGlobalOpts) -> miette::Result<()> {
let contents = async_parse_path_or_url(&self.name).await?;
// Set environment variables from the cli command args
for (key, value) in &self.variables {
Expand All @@ -27,8 +27,13 @@ impl CreateCommand {
);

let mut config = NodeConfig::new(&contents)?;
let node_name = config.merge(self)?;
config.run(ctx, opts.clone(), &node_name).await?;
let node_name = config.merge(&self)?;
config.run(ctx, &opts, &node_name).await?;

if self.foreground {
self.wait_for_exit_signal(ctx, opts).await?;
}

Ok(())
}
}
Expand Down Expand Up @@ -59,8 +64,8 @@ impl NodeConfig {
}

/// Merge the arguments of the node defined in the config with the arguments from the
/// "create" command, giving precedence to the config values.
fn merge(&mut self, cli_args: CreateCommand) -> miette::Result<String> {
/// "create" command, giving precedence to the config values. Returns the node name.
fn merge(&mut self, cli_args: &CreateCommand) -> miette::Result<String> {
// Set environment variables from the cli command again
// to override the duplicate entries from the config file.
for (key, value) in &cli_args.variables {
Expand All @@ -84,13 +89,18 @@ impl NodeConfig {
self.node.exit_on_eof = Some(ArgValue::Bool(cli_args.exit_on_eof));
}
if self.node.tcp_listener_address.is_none() {
self.node.tcp_listener_address = Some(ArgValue::String(cli_args.tcp_listener_address));
self.node.tcp_listener_address =
Some(ArgValue::String(cli_args.tcp_listener_address.clone()));
}
if self.node.identity.is_none() {
self.node.identity = cli_args.identity.map(ArgValue::String);
self.node.identity = cli_args.identity.clone().map(ArgValue::String);
}
if self.node.project.is_none() {
self.node.project = cli_args.trust_opts.project_name.map(ArgValue::String);
self.node.project = cli_args
.trust_opts
.project_name
.clone()
.map(ArgValue::String);
}

let node_name = self.node.name.as_ref().unwrap().to_string();
Expand All @@ -100,7 +110,7 @@ impl NodeConfig {
pub async fn run(
self,
ctx: &Context,
opts: CommandGlobalOpts,
opts: &CommandGlobalOpts,
node_name: &str,
) -> miette::Result<()> {
let overrides = &ValuesOverrides::default().with_override_node_name(node_name);
Expand All @@ -117,7 +127,7 @@ impl NodeConfig {

// Run commands
for cmd in commands {
cmd.run(ctx, &opts).await?
cmd.run(ctx, opts).await?
}
Ok(())
}
Expand Down Expand Up @@ -162,7 +172,7 @@ mod tests {

// No node config, cli args should be used
let mut config = NodeConfig::parse("").unwrap();
let node_name = config.merge(cli_args.clone()).unwrap();
let node_name = config.merge(&cli_args).unwrap();
let node = config
.node
.parse_commands(&ValuesOverrides::default())
Expand All @@ -185,7 +195,7 @@ mod tests {
"#,
)
.unwrap();
let node_name = config.merge(cli_args).unwrap();
let node_name = config.merge(&cli_args).unwrap();
let node = config
.node
.parse_commands(&ValuesOverrides::default())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
use std::io;
use std::io::Read;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use colorful::Colorful;
use miette::{miette, IntoDiagnostic};
use tokio::time::{sleep, Duration};
use tracing::{debug, instrument};
use tracing::{debug, info, instrument};

use ockam::{Address, TcpListenerOptions};
use ockam::{Context, TcpTransport};
use ockam_api::fmt_ok;
use ockam_api::nodes::InMemoryNode;
use ockam_api::nodes::{
service::{NodeManagerGeneralOptions, NodeManagerTransportOptions},
NodeManagerWorker, NODEMANAGER_ADDR,
};
use ockam_api::{fmt_ok, fmt_warn};
use ockam_core::{route, LOCAL};

use crate::node::CreateCommand;
use crate::secure_channel::listener::create as secure_channel_listener;
use crate::service::config::Config;
use crate::{shutdown, CommandGlobalOpts};
use crate::CommandGlobalOpts;

impl CreateCommand {
#[instrument(skip_all, fields(node_name = self.name))]
Expand Down Expand Up @@ -116,23 +119,60 @@ impl CreateCommand {
}
}

// Create a channel for communicating back to the main thread
self.wait_for_exit_signal(ctx, opts).await
}

/// Wait until it receives a CTRL+C, EOF or a signal to exit
pub async fn wait_for_exit_signal(
&self,
ctx: &Context,
opts: CommandGlobalOpts,
) -> miette::Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::channel(2);
shutdown::wait(
opts.terminal.clone(),
self.exit_on_eof,
opts.global_args.quiet,
tx,
&mut rx,
)
.await?;

opts.shutdown();
// Register a handler for SIGINT, SIGTERM, SIGHUP
{
let tx = tx.clone();
let terminal = opts.terminal.clone();
// To avoid handling multiple CTRL+C signals at the same time
let flag = Arc::new(AtomicBool::new(true));
ctrlc::set_handler(move || {
if flag.load(std::sync::atomic::Ordering::Relaxed) {
let _ = tx.blocking_send(());
info!("Ctrl+C signal received");
if !opts.global_args.quiet {
let _ = terminal.write_line(fmt_warn!("Ctrl+C signal received"));
}
flag.store(false, std::sync::atomic::Ordering::Relaxed);
}
})
.expect("Error setting Ctrl+C handler");
}

// Try to stop node; it might have already been stopped or deleted (e.g. when running `node delete --all`)
let _ = opts.state.stop_node(&node_name, true).await;
ctx.stop().await.into_diagnostic()?;
if self.exit_on_eof {
// Spawn a thread to monitor STDIN for EOF
{
let tx = tx.clone();
let terminal = opts.terminal.clone();
std::thread::spawn(move || {
let mut buffer = Vec::new();
let mut handle = io::stdin().lock();
if handle.read_to_end(&mut buffer).is_ok() {
let _ = tx.blocking_send(());
info!("EOF received");
let _ = terminal.write_line(fmt_warn!("EOF received"));
}
});
}
}

// Wait for signal SIGINT, SIGTERM, SIGHUP or EOF; or for the tx to be closed.
rx.recv().await;

// Clean up and exit
opts.shutdown();
let _ = opts.state.stop_node(&self.name, true).await;
let _ = ctx.stop().await;
opts.terminal
.write_line(fmt_ok!("Node stopped successfully"))?;

Expand Down
65 changes: 0 additions & 65 deletions implementations/rust/ockam/ockam_command/src/shutdown.rs

This file was deleted.

15 changes: 2 additions & 13 deletions implementations/rust/ockam/ockam_command/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,6 @@ pub mod duration;
pub mod exitcode;
pub mod parsers;

/// A simple wrapper for shutting down the local embedded node (for
/// the client side of the CLI). Swallows errors and turns them into
/// eprintln logs.
///
/// TODO: We may want to change this behaviour in the future.
pub async fn stop_node(ctx: Context) {
if let Err(e) = ctx.stop().await {
eprintln!("an error occurred while shutting down local node: {e}");
}
}

pub fn local_cmd(res: miette::Result<()>) -> miette::Result<()> {
if let Err(error) = &res {
// Note: error! is also called in command_event.rs::add_command_error_event()
Expand All @@ -54,7 +43,7 @@ where
Fut: core::future::Future<Output = miette::Result<()>> + Send + 'static,
{
debug!("running '{}' asynchronously", command_name);
let res = embedded_node(opts.clone(), |ctx| {
let res = embedded_node(opts, |ctx| {
async move { f(ctx).await }.with_context(OpenTelemetryContext::current_context())
});
local_cmd(res)
Expand Down Expand Up @@ -82,7 +71,7 @@ where
.await
.expect("Embedded node child ctx can't be created");
let r = f(child_ctx).await;
stop_node(ctx).await;
let _ = ctx.stop().await;
r
}
.with_context(OpenTelemetryContext::current_context()),
Expand Down

0 comments on commit 2c660af

Please sign in to comment.