Skip to content

Commit

Permalink
fix(rust): shutdown processor and worker on init fail
Browse files Browse the repository at this point in the history
  • Loading branch information
pradovic authored and adrianbenavides committed Apr 16, 2024
1 parent 07a8791 commit 170504b
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ if [[ -z $BATS_LIB ]]; then
# export BATS_LIB=$NVM_DIR/versions/node/v18.8.0/lib/node_modules # linux
fi

export PYTHON_SERVER_PORT=5000
if [[ -z $PYTHON_SERVER_PORT ]]; then
export PYTHON_SERVER_PORT=5000
fi

mkdir -p "$HOME/.bats-tests"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,23 +257,23 @@ teardown() {
}

@test "portals - local inlet and outlet, removing and re-creating the outlet" {
port="$(random_port)"
node_port="$(random_port)"

run_success "$OCKAM" node create blue --tcp-listener-address "127.0.0.1:$node_port"
run_success "$OCKAM" tcp-outlet create --at /node/blue --to 127.0.0.1:$PYTHON_SERVER_PORT

inlet_port="$(random_port)"
run_success "$OCKAM" node create green
run_success "$OCKAM" tcp-inlet create --at /node/green --from "127.0.0.1:$port" --to /node/blue/secure/api/service/outlet
run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port"
run_success "$OCKAM" tcp-inlet create --at /node/green --from "127.0.0.1:$inlet_port" --to /node/blue/secure/api/service/outlet
run_success curl -sfI --retry-connrefused --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$inlet_port"

run_success "$OCKAM" node delete blue --yes
run_failure curl -sfI -m 3 "127.0.0.1:$port"
run_failure curl -sfI -m 3 "127.0.0.1:$inlet_port"

run_success "$OCKAM" node create blue --tcp-listener-address "127.0.0.1:$node_port"
run_success "$OCKAM" tcp-outlet create --at /node/blue --to 127.0.0.1:$PYTHON_SERVER_PORT

sleep 20
run_success curl -sfI --retry-connrefused --retry-delay 2 --retry 10 -m 5 "127.0.0.1:$port"
sleep 15
run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$inlet_port"
}

@test "portals - local inlet and outlet in reverse order" {
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_command/tests/bats/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export BATS_TEST_TIMEOUT=240
current_directory=$(dirname "$0")

echo "Running local suite..."
bats "$current_directory/local" --timing -j 8
bats "$current_directory/local" --timing -j 3

if [ -z "${ORCHESTRATOR_TESTS}" ]; then
exit 0
Expand Down
36 changes: 24 additions & 12 deletions implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ where
ctx.address(),
e
);
shutdown_and_stop_ack(&mut processor, &mut ctx, &ctx_addr).await;
return;
}
}

Expand Down Expand Up @@ -89,18 +91,7 @@ where
};

// If we reach this point the router has signaled us to shut down
match processor.shutdown(&mut ctx).await {
Ok(()) => {}
Err(e) => {
error!("Failure during '{}' processor shutdown: {}", ctx_addr, e);
}
}

// Finally send the router a stop ACK -- log errors
trace!("Sending shutdown ACK");
if let Err(e) = ctx.send_stop_ack().await {
error!("Error occurred during stop ACK sending: {}", e);
}
shutdown_and_stop_ack(&mut processor, &mut ctx, &ctx_addr).await;
}

/// Create a processor relay with two node contexts
Expand All @@ -114,3 +105,24 @@ where
rt.spawn(relay.run(ctrl_rx));
}
}

async fn shutdown_and_stop_ack<P>(
processor: &mut P,
ctx: &mut Context,
ctx_addr: &ockam_core::Address,
) where
P: Processor<Context = Context>,
{
match processor.shutdown(ctx).await {
Ok(()) => {}
Err(e) => {
error!("Failure during '{}' processor shutdown: {}", ctx_addr, e);
}
}

// Finally send the router a stop ACK -- log errors
trace!("Sending shutdown ACK");
if let Err(e) = ctx.send_stop_ack().await {
error!("Error occurred during stop ACK sending: {}", e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ where
self.ctx.address(),
e
);
self.shutdown_and_stop_ack().await;
return;
}
}

Expand Down Expand Up @@ -162,6 +164,10 @@ where
}
}

self.shutdown_and_stop_ack().await;
}

async fn shutdown_and_stop_ack(&mut self) {
// Run the shutdown hook for this worker
match self.worker.shutdown(&mut self.ctx).await {
Ok(()) => {}
Expand Down
80 changes: 80 additions & 0 deletions implementations/rust/ockam/ockam_node/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,86 @@ async fn simple_worker__run_node_lifecycle__worker_lifecycle_should_be_full(
Ok(())
}

struct FailingWorkerProcessor {
shutdown_was_called: Arc<AtomicBool>,
}

#[async_trait]
impl Worker for FailingWorkerProcessor {
type Context = Context;
type Message = String;

async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> {
Err(ockam_core::Error::new(Origin::Core, Kind::Internal, "test"))
}

async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> {
self.shutdown_was_called.store(true, Ordering::Relaxed);
Ok(())
}

async fn handle_message(
&mut self,
_ctx: &mut Self::Context,
_msg: Routed<Self::Message>,
) -> Result<()> {
Ok(())
}
}

#[allow(non_snake_case)]
#[ockam_macros::test]
async fn worker_initialize_fail_should_shutdown(ctx: &mut Context) -> Result<()> {
let shutdown_was_called = Arc::new(AtomicBool::new(false));
let address = Address::from_string("failing_worker");
let worker = FailingWorkerProcessor {
shutdown_was_called: shutdown_was_called.clone(),
};
let res = ctx.start_worker(address.clone(), worker).await;
assert!(res.is_ok());
sleep(Duration::new(1, 0)).await;
assert!(shutdown_was_called.load(Ordering::Relaxed));

assert!(!ctx.list_workers().await?.contains(&address));

Ok(())
}

#[async_trait]
impl Processor for FailingWorkerProcessor {
type Context = Context;

async fn process(&mut self, _ctx: &mut Self::Context) -> Result<bool> {
Ok(true)
}

async fn initialize(&mut self, _context: &mut Self::Context) -> Result<()> {
Err(ockam_core::Error::new(Origin::Core, Kind::Internal, "test"))
}

async fn shutdown(&mut self, _context: &mut Self::Context) -> Result<()> {
self.shutdown_was_called.store(true, Ordering::Relaxed);
Ok(())
}
}

#[allow(non_snake_case)]
#[ockam_macros::test]
async fn processor_initialize_fail_should_shutdown(ctx: &mut Context) -> Result<()> {
let shutdown_was_called = Arc::new(AtomicBool::new(false));
let address = Address::from_string("failing_processor");
let processor = FailingWorkerProcessor {
shutdown_was_called: shutdown_was_called.clone(),
};
let res = ctx.start_processor(address.clone(), processor).await;
assert!(res.is_ok());
sleep(Duration::new(1, 0)).await;
assert!(shutdown_was_called.load(Ordering::Relaxed));
assert!(!ctx.list_workers().await?.contains(&address));

Ok(())
}

struct DummyProcessor;

#[async_trait]
Expand Down

0 comments on commit 170504b

Please sign in to comment.