From 170504b57783f4f985a2c74a9e5723588c9ec970 Mon Sep 17 00:00:00 2001 From: Petar Radovic Date: Mon, 26 Feb 2024 14:45:49 +0100 Subject: [PATCH] fix(rust): shutdown processor and worker on init fail --- .../ockam_command/tests/bats/load/base.bash | 4 +- .../tests/bats/local/portals.bats | 14 ++-- .../ockam/ockam_command/tests/bats/run.sh | 2 +- .../ockam_node/src/relay/processor_relay.rs | 36 ++++++--- .../ockam_node/src/relay/worker_relay.rs | 6 ++ .../rust/ockam/ockam_node/tests/tests.rs | 80 +++++++++++++++++++ 6 files changed, 121 insertions(+), 21 deletions(-) diff --git a/implementations/rust/ockam/ockam_command/tests/bats/load/base.bash b/implementations/rust/ockam/ockam_command/tests/bats/load/base.bash index 231cb044357..9ff6ff22154 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/load/base.bash +++ b/implementations/rust/ockam/ockam_command/tests/bats/load/base.bash @@ -30,7 +30,9 @@ if [[ -z $BATS_LIB ]]; then # export BATS_LIB=$NVM_DIR/versions/node/v18.8.0/lib/node_modules # linux fi -export PYTHON_SERVER_PORT=5000 +if [[ -z $PYTHON_SERVER_PORT ]]; then + export PYTHON_SERVER_PORT=5000 +fi mkdir -p "$HOME/.bats-tests" diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats index 61b84602284..7550301d9d4 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats @@ -257,23 +257,23 @@ teardown() { } @test "portals - local inlet and outlet, removing and re-creating the outlet" { - port="$(random_port)" node_port="$(random_port)" - run_success "$OCKAM" node create blue --tcp-listener-address "127.0.0.1:$node_port" run_success "$OCKAM" tcp-outlet create --at /node/blue --to 127.0.0.1:$PYTHON_SERVER_PORT + + inlet_port="$(random_port)" run_success "$OCKAM" node create green - run_success "$OCKAM" tcp-inlet create --at /node/green --from "127.0.0.1:$port" --to /node/blue/secure/api/service/outlet - run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port" + run_success "$OCKAM" tcp-inlet create --at /node/green --from "127.0.0.1:$inlet_port" --to /node/blue/secure/api/service/outlet + run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$inlet_port" run_success "$OCKAM" node delete blue --yes - run_failure curl -sfI -m 3 "127.0.0.1:$port" + run_failure curl -sfI -m 3 "127.0.0.1:$inlet_port" run_success "$OCKAM" node create blue --tcp-listener-address "127.0.0.1:$node_port" run_success "$OCKAM" tcp-outlet create --at /node/blue --to 127.0.0.1:$PYTHON_SERVER_PORT - sleep 20 - run_success curl -sfI --retry-connrefused --retry-delay 2 --retry 10 -m 5 "127.0.0.1:$port" + sleep 15 + run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$inlet_port" } @test "portals - local inlet and outlet in reverse order" { diff --git a/implementations/rust/ockam/ockam_command/tests/bats/run.sh b/implementations/rust/ockam/ockam_command/tests/bats/run.sh index 51983765c9f..b8ef73e529a 100755 --- a/implementations/rust/ockam/ockam_command/tests/bats/run.sh +++ b/implementations/rust/ockam/ockam_command/tests/bats/run.sh @@ -10,7 +10,7 @@ export BATS_TEST_TIMEOUT=240 current_directory=$(dirname "$0") echo "Running local suite..." -bats "$current_directory/local" --timing -j 8 +bats "$current_directory/local" --timing -j 3 if [ -z "${ORCHESTRATOR_TESTS}" ]; then exit 0 diff --git a/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs b/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs index 62c12a70e0b..f3f2d63114a 100644 --- a/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs +++ b/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs @@ -33,6 +33,8 @@ where ctx.address(), e ); + shutdown_and_stop_ack(&mut processor, &mut ctx, &ctx_addr).await; + return; } } @@ -89,18 +91,7 @@ where }; // If we reach this point the router has signaled us to shut down - match processor.shutdown(&mut ctx).await { - Ok(()) => {} - Err(e) => { - error!("Failure during '{}' processor shutdown: {}", ctx_addr, e); - } - } - - // Finally send the router a stop ACK -- log errors - trace!("Sending shutdown ACK"); - if let Err(e) = ctx.send_stop_ack().await { - error!("Error occurred during stop ACK sending: {}", e); - } + shutdown_and_stop_ack(&mut processor, &mut ctx, &ctx_addr).await; } /// Create a processor relay with two node contexts @@ -114,3 +105,24 @@ where rt.spawn(relay.run(ctrl_rx)); } } + +async fn shutdown_and_stop_ack

( + processor: &mut P, + ctx: &mut Context, + ctx_addr: &ockam_core::Address, +) where + P: Processor, +{ + match processor.shutdown(ctx).await { + Ok(()) => {} + Err(e) => { + error!("Failure during '{}' processor shutdown: {}", ctx_addr, e); + } + } + + // Finally send the router a stop ACK -- log errors + trace!("Sending shutdown ACK"); + if let Err(e) = ctx.send_stop_ack().await { + error!("Error occurred during stop ACK sending: {}", e); + } +} diff --git a/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs b/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs index 269da07303f..63f2f00d9c9 100644 --- a/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs +++ b/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs @@ -106,6 +106,8 @@ where self.ctx.address(), e ); + self.shutdown_and_stop_ack().await; + return; } } @@ -162,6 +164,10 @@ where } } + self.shutdown_and_stop_ack().await; + } + + async fn shutdown_and_stop_ack(&mut self) { // Run the shutdown hook for this worker match self.worker.shutdown(&mut self.ctx).await { Ok(()) => {} diff --git a/implementations/rust/ockam/ockam_node/tests/tests.rs b/implementations/rust/ockam/ockam_node/tests/tests.rs index 890e3b83520..7db1548b0a8 100644 --- a/implementations/rust/ockam/ockam_node/tests/tests.rs +++ b/implementations/rust/ockam/ockam_node/tests/tests.rs @@ -133,6 +133,86 @@ async fn simple_worker__run_node_lifecycle__worker_lifecycle_should_be_full( Ok(()) } +struct FailingWorkerProcessor { + shutdown_was_called: Arc, +} + +#[async_trait] +impl Worker for FailingWorkerProcessor { + type Context = Context; + type Message = String; + + async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { + Err(ockam_core::Error::new(Origin::Core, Kind::Internal, "test")) + } + + async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> { + self.shutdown_was_called.store(true, Ordering::Relaxed); + Ok(()) + } + + async fn handle_message( + &mut self, + _ctx: &mut Self::Context, + _msg: Routed, + ) -> Result<()> { + Ok(()) + } +} + +#[allow(non_snake_case)] +#[ockam_macros::test] +async fn worker_initialize_fail_should_shutdown(ctx: &mut Context) -> Result<()> { + let shutdown_was_called = Arc::new(AtomicBool::new(false)); + let address = Address::from_string("failing_worker"); + let worker = FailingWorkerProcessor { + shutdown_was_called: shutdown_was_called.clone(), + }; + let res = ctx.start_worker(address.clone(), worker).await; + assert!(res.is_ok()); + sleep(Duration::new(1, 0)).await; + assert!(shutdown_was_called.load(Ordering::Relaxed)); + + assert!(!ctx.list_workers().await?.contains(&address)); + + Ok(()) +} + +#[async_trait] +impl Processor for FailingWorkerProcessor { + type Context = Context; + + async fn process(&mut self, _ctx: &mut Self::Context) -> Result { + Ok(true) + } + + async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> { + Err(ockam_core::Error::new(Origin::Core, Kind::Internal, "test")) + } + + async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> { + self.shutdown_was_called.store(true, Ordering::Relaxed); + Ok(()) + } +} + +#[allow(non_snake_case)] +#[ockam_macros::test] +async fn processor_initialize_fail_should_shutdown(ctx: &mut Context) -> Result<()> { + let shutdown_was_called = Arc::new(AtomicBool::new(false)); + let address = Address::from_string("failing_processor"); + let processor = FailingWorkerProcessor { + shutdown_was_called: shutdown_was_called.clone(), + }; + let res = ctx.start_processor(address.clone(), processor).await; + assert!(res.is_ok()); + sleep(Duration::new(1, 0)).await; + assert!(shutdown_was_called.load(Ordering::Relaxed)); + assert!(!ctx.list_workers().await?.contains(&address)); + + Ok(()) +} + struct DummyProcessor; #[async_trait]