From 3b8a913036a12f61e9a434ba4dfdb08043a9d978 Mon Sep 17 00:00:00 2001 From: Oleksandr Deundiak Date: Sat, 28 Dec 2024 01:00:36 +0100 Subject: [PATCH] attempt to implement priority shutdown --- .../src/tcp_interceptor/workers/listener.rs | 4 +- .../src/tcp_interceptor/workers/processor.rs | 4 +- .../rust/ockam/ockam_core/src/compat.rs | 2 +- .../ockam/ockam_node/src/context/context.rs | 52 ++--- .../src/context/context_lifecycle.rs | 9 +- .../ockam/ockam_node/src/context/stop_env.rs | 8 +- .../rust/ockam/ockam_node/src/executor.rs | 6 +- .../rust/ockam/ockam_node/src/node.rs | 8 +- .../ockam/ockam_node/src/processor_builder.rs | 34 +++- .../rust/ockam/ockam_node/src/router/mod.rs | 155 +-------------- .../ockam/ockam_node/src/router/processor.rs | 47 +++-- .../ockam/ockam_node/src/router/record.rs | 175 +++++++---------- .../ockam/ockam_node/src/router/router.rs | 178 ++++++++++++++++++ .../ockam/ockam_node/src/router/shutdown.rs | 99 +++++++--- .../rust/ockam/ockam_node/src/router/state.rs | 105 ----------- .../ockam/ockam_node/src/router/worker.rs | 29 +-- .../ockam/ockam_node/src/worker_builder.rs | 34 +++- .../rust/ockam/ockam_node/tests/tests.rs | 2 +- .../rust/ockam/ockam_transport_ble/src/lib.rs | 2 - .../ockam_transport_ble/src/router/mod.rs | 5 - .../src/workers/listener.rs | 4 - .../src/workers/receiver.rs | 4 - .../ockam_transport_ble/src/workers/sender.rs | 2 - .../rust/ockam/ockam_transport_tcp/src/lib.rs | 2 - .../src/workers/listener.rs | 10 +- .../src/workers/receiver.rs | 5 +- .../ockam_transport_tcp/src/workers/sender.rs | 5 +- .../rust/ockam/ockam_transport_udp/src/lib.rs | 2 - .../ockam_transport_udp/src/transport/bind.rs | 3 +- .../src/workers/receiver.rs | 4 - .../src/router/uds_router.rs | 4 - .../src/workers/listener.rs | 4 - .../src/workers/receiver.rs | 4 - .../ockam_transport_uds/src/workers/sender.rs | 2 - .../ockam_transport_websocket/src/lib.rs | 2 - .../src/router/mod.rs | 4 - .../src/workers/listener.rs | 4 - .../src/workers/receiver.rs | 4 - .../src/workers/sender.rs | 1 - 39 files changed, 496 insertions(+), 532 deletions(-) create mode 100644 implementations/rust/ockam/ockam_node/src/router/router.rs delete mode 100644 implementations/rust/ockam/ockam_node/src/router/state.rs diff --git a/examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs b/examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs index 53ff6d6c46d..8be30c0bb5c 100644 --- a/examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs +++ b/examples/rust/mitm_node/src/tcp_interceptor/workers/listener.rs @@ -1,4 +1,4 @@ -use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport, CLUSTER_NAME}; +use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport}; use ockam_core::{async_trait, compat::net::SocketAddr}; use ockam_core::{Address, Processor, Result}; use ockam_node::Context; @@ -45,8 +45,6 @@ impl Processor for TcpMitmListenProcessor { type Context = Context; async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(CLUSTER_NAME)?; - self.registry.add_listener(ctx.primary_address()); Ok(()) diff --git a/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs b/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs index e11a71e88c4..6a91ba4492d 100644 --- a/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs +++ b/examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs @@ -1,4 +1,4 @@ -use crate::tcp_interceptor::{Role, TcpMitmRegistry, 
CLUSTER_NAME}; +use crate::tcp_interceptor::{Role, TcpMitmRegistry}; use ockam_core::compat::sync::Arc; use ockam_core::{async_trait, Address, AllowAll}; use ockam_core::{Processor, Result}; @@ -59,8 +59,6 @@ impl Processor for TcpMitmProcessor { type Context = Context; async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(CLUSTER_NAME)?; - self.registry .add_processor(ctx.primary_address(), self.role, self.write_half.clone()); diff --git a/implementations/rust/ockam/ockam_core/src/compat.rs b/implementations/rust/ockam/ockam_core/src/compat.rs index dc8d008cf17..3bdb1347067 100644 --- a/implementations/rust/ockam/ockam_core/src/compat.rs +++ b/implementations/rust/ockam/ockam_core/src/compat.rs @@ -305,7 +305,7 @@ pub mod sync { #[cfg(feature = "std")] pub mod sync { pub use std::sync::{Arc, Weak}; - pub use std::sync::{Mutex, RwLock, RwLockWriteGuard}; + pub use std::sync::{Mutex, RwLock}; } /// Provides `std::task` for `no_std` targets. diff --git a/implementations/rust/ockam/ockam_node/src/context/context.rs b/implementations/rust/ockam/ockam_node/src/context/context.rs index 6a1ed6af223..b8aa6a46a05 100644 --- a/implementations/rust/ockam/ockam_node/src/context/context.rs +++ b/implementations/rust/ockam/ockam_node/src/context/context.rs @@ -4,7 +4,7 @@ use core::sync::atomic::AtomicUsize; use ockam_core::compat::collections::HashMap; use ockam_core::compat::sync::{Arc, RwLock}; use ockam_core::compat::time::Duration; -use ockam_core::compat::{string::String, vec::Vec}; +use ockam_core::compat::vec::Vec; use ockam_core::flow_control::FlowControls; #[cfg(feature = "std")] use ockam_core::OpenTelemetryContext; @@ -31,6 +31,36 @@ pub enum ContextMode { Attached, } +/// Higher value means the worker is shutdown earlier +#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)] +pub enum WorkerShutdownPriority { + /// 1 + Priority1, + /// 2 + Priority2, + /// 3 + Priority3, + /// 4 + #[default] + Priority4, + /// 5 + Priority5, + /// 6 + Priority6, + /// 7 + Priority7, +} + +impl WorkerShutdownPriority { + /// All possible values in descending order + pub fn all_descending_order() -> [WorkerShutdownPriority; 7] { + use WorkerShutdownPriority::*; + [ + Priority7, Priority6, Priority5, Priority4, Priority3, Priority2, Priority1, + ] + } +} + /// Context contains Node state and references to the runtime. pub struct Context { pub(super) mailboxes: Mailboxes, @@ -121,26 +151,6 @@ impl Context { } impl Context { - /// Assign the current worker to a cluster - /// - /// A cluster is a set of workers that should be stopped together - /// when the node is stopped or parts of the system are reloaded. - /// **This is not to be confused with supervisors!** - /// - /// By adding your worker to a cluster you signal to the runtime - /// that your worker may be depended on by other workers that - /// should be stopped first. - /// - /// **Your cluster name MUST NOT start with `_internals.` or - /// `ockam.`!** - /// - /// Clusters are de-allocated in reverse order of their - /// initialisation when the node is stopped. - pub fn set_cluster>(&self, label: S) -> Result<()> { - self.router()? 
- .set_cluster(self.primary_address(), label.into()) - } - /// Return a list of all available worker addresses on a node pub fn list_workers(&self) -> Result> { Ok(self.router()?.list_workers()) diff --git a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs index 9a46a0af64a..b7fa1fb7a7f 100644 --- a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs +++ b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs @@ -237,8 +237,13 @@ impl Context { // Create a new context and get access to the mailbox senders let (ctx, sender, _) = self.new_with_mailboxes(mailboxes, ContextMode::Detached); - self.router()? - .add_worker(ctx.mailboxes(), sender, true, self.mailbox_count.clone())?; + self.router()?.add_worker( + ctx.mailboxes(), + sender, + true, + Default::default(), + self.mailbox_count.clone(), + )?; Ok(ctx) } diff --git a/implementations/rust/ockam/ockam_node/src/context/stop_env.rs b/implementations/rust/ockam/ockam_node/src/context/stop_env.rs index a3d2234170d..926f1c85aac 100644 --- a/implementations/rust/ockam/ockam_node/src/context/stop_env.rs +++ b/implementations/rust/ockam/ockam_node/src/context/stop_env.rs @@ -17,6 +17,12 @@ impl Context { /// This call will hang until a safe shutdown has been completed /// or the desired timeout has been reached. pub async fn stop_timeout(&self, seconds: u8) -> Result<()> { - self.router()?.stop_graceful(seconds).await + self.router()?.stop_graceful(seconds).await?; + + // TODO: Would be cool to shutdown the Runtime here with a timeout in case router timed out, + // that would require more transparent ownership over the Runtime, since shutdown is + // consuming the value. + + Ok(()) } } diff --git a/implementations/rust/ockam/ockam_node/src/executor.rs b/implementations/rust/ockam/ockam_node/src/executor.rs index d83ed5b8f64..8fc21beb446 100644 --- a/implementations/rust/ockam/ockam_node/src/executor.rs +++ b/implementations/rust/ockam/ockam_node/src/executor.rs @@ -48,7 +48,7 @@ pub struct Executor { impl Executor { /// Create a new Ockam node [`Executor`] instance pub fn new(runtime: Arc, flow_controls: &FlowControls) -> Self { - let router = Arc::new(Router::new(runtime.handle().clone(), flow_controls)); + let router = Arc::new(Router::new(flow_controls)); #[cfg(feature = "metrics")] let metrics = Metrics::new(runtime.handle().clone(), router.get_metrics_readout()); Self { @@ -110,7 +110,9 @@ impl Executor { match future.await { Ok(val) => { // TODO: Add timeout here - router.wait_termination().await; + debug!("Wait for router termination..."); + router.wait_termination().await.unwrap(); // FIXME + debug!("Router terminated successfully!..."); Ok(val) } Err(e) => { diff --git a/implementations/rust/ockam/ockam_node/src/node.rs b/implementations/rust/ockam/ockam_node/src/node.rs index 48548fdf86f..51e83487ce8 100644 --- a/implementations/rust/ockam/ockam_node/src/node.rs +++ b/implementations/rust/ockam/ockam_node/src/node.rs @@ -142,7 +142,13 @@ impl NodeBuilder { // Register this mailbox handle with the executor router - .add_worker(ctx.mailboxes(), sender, true, ctx.mailbox_count()) + .add_worker( + ctx.mailboxes(), + sender, + true, + Default::default(), + ctx.mailbox_count(), + ) .expect("router initialization failed"); // Then return the root context and executor diff --git a/implementations/rust/ockam/ockam_node/src/processor_builder.rs b/implementations/rust/ockam/ockam_node/src/processor_builder.rs index 
5d1198946e5..c60f94fe8df 100644 --- a/implementations/rust/ockam/ockam_node/src/processor_builder.rs +++ b/implementations/rust/ockam/ockam_node/src/processor_builder.rs @@ -1,4 +1,4 @@ -use crate::{debugger, ContextMode}; +use crate::{debugger, ContextMode, WorkerShutdownPriority}; use crate::{relay::ProcessorRelay, Context}; use ockam_core::compat::string::String; use ockam_core::compat::sync::Arc; @@ -72,6 +72,7 @@ where processor: self.processor, address: address.into(), metadata, + shutdown_priority: Default::default(), } } @@ -79,6 +80,7 @@ where pub fn with_mailboxes(self, mailboxes: Mailboxes) -> ProcessorBuilderMultipleAddresses
<P>
{ ProcessorBuilderMultipleAddresses { mailboxes, + shutdown_priority: Default::default(), processor: self.processor, } } @@ -89,6 +91,7 @@ where P: Processor, { mailboxes: Mailboxes, + shutdown_priority: WorkerShutdownPriority, processor: P, } @@ -98,7 +101,18 @@ where { /// Consume this builder and start a new Ockam [`Processor`] from the given context pub async fn start(self, context: &Context) -> Result<()> { - start(context, self.mailboxes, self.processor).await + start( + context, + self.mailboxes, + self.shutdown_priority, + self.processor, + ) + .await + } + + pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self { + self.shutdown_priority = shutdown_priority; + self } } @@ -111,6 +125,7 @@ where address: Address, processor: P, metadata: Option, + shutdown_priority: WorkerShutdownPriority, } impl
<P> ProcessorBuilderOneAddress<P>
@@ -158,6 +173,7 @@ where ), vec![], ), + self.shutdown_priority, self.processor, ) .await @@ -203,10 +219,20 @@ where self.outgoing_ac = outgoing_access_control.clone(); self } + + pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self { + self.shutdown_priority = shutdown_priority; + self + } } /// Consume this builder and start a new Ockam [`Processor`] from the given context -pub async fn start
<P>
(context: &Context, mailboxes: Mailboxes, processor: P) -> Result<()> +pub async fn start
<P>
( + context: &Context, + mailboxes: Mailboxes, + shutdown_priority: WorkerShutdownPriority, + processor: P, +) -> Result<()> where P: Processor, { @@ -223,7 +249,7 @@ where debugger::log_inherit_context("PROCESSOR", context, &ctx); let router = context.router()?; - router.add_processor(ctx.mailboxes(), sender)?; + router.add_processor(ctx.mailboxes(), sender, shutdown_priority)?; // Then initialise the processor message relay ProcessorRelay::
<P>
::init(context.runtime(), processor, ctx, ctrl_rx); diff --git a/implementations/rust/ockam/ockam_node/src/router/mod.rs b/implementations/rust/ockam/ockam_node/src/router/mod.rs index d1d3bc5b0e3..171dab262b9 100644 --- a/implementations/rust/ockam/ockam_node/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_node/src/router/mod.rs @@ -1,157 +1,8 @@ mod processor; mod record; +#[allow(clippy::module_inception)] +mod router; mod shutdown; -mod state; pub mod worker; -#[cfg(feature = "metrics")] -use core::sync::atomic::AtomicUsize; - -use crate::channel_types::{MessageSender, OneshotSender}; -use crate::relay::CtrlSignal; -use crate::tokio::runtime::Handle; -use crate::{NodeError, NodeReason}; -use alloc::string::String; -use alloc::vec::Vec; -use ockam_core::compat::collections::hash_map::Entry; -use ockam_core::compat::collections::HashMap; -use ockam_core::compat::sync::RwLock as SyncRwLock; -use ockam_core::errcode::{Kind, Origin}; -use ockam_core::flow_control::FlowControls; -use ockam_core::{Address, AddressMetadata, Error, RelayMessage, Result, TransportType}; -use record::{AddressRecord, InternalMap, WorkerMeta}; -use state::{NodeState, RouterState}; - -/// A pair of senders to a worker relay -#[derive(Debug)] -pub struct SenderPair { - pub msgs: MessageSender, - pub ctrl: OneshotSender, -} - -/// A combined address type and local worker router -/// -/// This router supports two routing modes: internal, and external. -/// -/// Internal routing resolves `type=0` addresses to local workers. -/// -/// External routing is supported only after a plugin component -/// registers itself with this router. Only one router can be -/// registered per address type. -pub struct Router { - runtime_handle: Handle, - /// Keep track of some additional router state information - state: RouterState, - /// Internal address state - map: InternalMap, - /// Externally registered router components - external: SyncRwLock>, -} - -enum RouteType { - Internal, - External(TransportType), -} - -fn determine_type(next: &Address) -> RouteType { - if next.transport_type().is_local() { - RouteType::Internal - } else { - RouteType::External(next.transport_type()) - } -} - -impl Router { - pub fn new(runtime_handle: Handle, flow_controls: &FlowControls) -> Self { - Self { - runtime_handle, - state: RouterState::new(), - map: InternalMap::new(flow_controls), - external: Default::default(), - } - } - - pub fn set_cluster(&self, addr: &Address, label: String) -> Result<()> { - self.map.set_cluster(addr, label) - } - - pub fn list_workers(&self) -> Vec
<Address>
{ - self.map.list_workers() - } - - pub fn is_worker_registered_at(&self, address: &Address) -> bool { - self.map.is_worker_registered_at(address) - } - - pub fn stop_ack(&self, primary_address: &Address) -> Result<()> { - let running = self.state.is_running(); - debug!(%running, "Handling shutdown ACK for {}", primary_address); - - let empty = self.map.stop_ack(primary_address); - if !running && empty { - self.state.set_to_stopped(); - } - - Ok(()) - } - - pub fn find_terminal_address<'a>( - &self, - addresses: impl Iterator, - ) -> Option<(&'a Address, AddressMetadata)> { - self.map.find_terminal_address(addresses) - } - - pub fn get_address_metadata(&self, address: &Address) -> Option { - self.map.get_address_metadata(address) - } - - pub fn register_router(&self, tt: TransportType, addr: Address) -> Result<()> { - if let Entry::Vacant(e) = self.external.write().unwrap().entry(tt) { - e.insert(addr); - Ok(()) - } else { - // already exists - Err(Error::new( - Origin::Node, - Kind::AlreadyExists, - "Router already exists", - )) - } - } - - pub fn resolve(&self, addr: &Address) -> Result> { - let addr = match determine_type(addr) { - RouteType::Internal => addr, - // TODO: Remove after other transport implementations are moved to new architecture - RouteType::External(tt) => &self.address_for_transport(tt)?, - }; - self.map.resolve(addr) - } - - fn address_for_transport(&self, tt: TransportType) -> Result
<Address>
{ - let guard = self.external.read().unwrap(); - guard - .get(&tt) - .cloned() - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal()) - } - - pub async fn wait_termination(&self) { - self.state.wait_until_stopped().await; - } - - /// Stop the worker - pub fn stop_address(&self, addr: &Address, skip_sending_stop_signal: bool) -> Result<()> { - debug!("Stopping address '{}'", addr); - - self.map.stop(addr, skip_sending_stop_signal)?; - - Ok(()) - } - - #[cfg(feature = "metrics")] - pub(crate) fn get_metrics_readout(&self) -> (Arc, Arc) { - self.map.get_metrics() - } -} +pub use router::*; diff --git a/implementations/rust/ockam/ockam_node/src/router/processor.rs b/implementations/rust/ockam/ockam_node/src/router/processor.rs index 10fc6edce5f..0a7182393d6 100644 --- a/implementations/rust/ockam/ockam_node/src/router/processor.rs +++ b/implementations/rust/ockam/ockam_node/src/router/processor.rs @@ -1,27 +1,35 @@ -use super::{AddressRecord, NodeState, Router, SenderPair, WorkerMeta}; +use super::{Router, RouterState, SenderPair}; +use crate::router::record::{AddressRecord, WorkerMeta}; +use crate::WorkerShutdownPriority; use ockam_core::compat::sync::Arc; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Error, Mailboxes, Result}; impl Router { /// Start a processor - pub(crate) fn add_processor(&self, mailboxes: &Mailboxes, senders: SenderPair) -> Result<()> { - if self.state.is_running() { - self.add_processor_impl(mailboxes, senders) - } else { - match self.state.node_state() { - NodeState::Stopping => Err(Error::new( - Origin::Node, - Kind::Shutdown, - "The node is shutting down", - ))?, - NodeState::Running => unreachable!(), - NodeState::Stopped => unreachable!(), - } + pub(crate) fn add_processor( + &self, + mailboxes: &Mailboxes, + senders: SenderPair, + shutdown_priority: WorkerShutdownPriority, + ) -> Result<()> { + if *self.state.read().unwrap() != RouterState::Running { + return Err(Error::new( + Origin::Node, + Kind::Shutdown, + "The node is shutting down", + ))?; } + + self.add_processor_impl(mailboxes, senders, shutdown_priority) } - fn add_processor_impl(&self, mailboxes: &Mailboxes, senders: SenderPair) -> Result<()> { + fn add_processor_impl( + &self, + mailboxes: &Mailboxes, + senders: SenderPair, + shutdown_priority: WorkerShutdownPriority, + ) -> Result<()> { debug!("Starting new processor '{}'", mailboxes.primary_address()); let SenderPair { msgs, ctrl } = senders; @@ -30,16 +38,17 @@ impl Router { mailboxes.additional_addresses().cloned().collect(), msgs, ctrl, + WorkerMeta { + processor: true, + detached: false, + }, + shutdown_priority, // We don't keep track of the mailbox count for processors // because, while they are able to send and receive messages // via their mailbox, most likely this metric is going to be // irrelevant. We may want to re-visit this decision in the // future, if the way processors are used changes. 
Arc::new(0.into()), - WorkerMeta { - processor: true, - detached: false, - }, ); self.map.insert_address_record(record, mailboxes)?; diff --git a/implementations/rust/ockam/ockam_node/src/router/record.rs b/implementations/rust/ockam/ockam_node/src/router/record.rs index 1732195c1e4..fb837438d6e 100644 --- a/implementations/rust/ockam/ockam_node/src/router/record.rs +++ b/implementations/rust/ockam/ockam_node/src/router/record.rs @@ -1,11 +1,11 @@ use crate::channel_types::{MessageSender, OneshotSender}; use crate::error::{NodeError, NodeReason}; use crate::relay::CtrlSignal; -use alloc::string::String; +use crate::WorkerShutdownPriority; use core::default::Default; use core::fmt::Debug; use core::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; -use ockam_core::compat::collections::hash_map::{Entry, EntryRef}; +use ockam_core::compat::collections::hash_map::Entry; use ockam_core::compat::sync::RwLock as SyncRwLock; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{ @@ -31,23 +31,16 @@ struct AddressMaps { metadata: SyncRwLock>, } -#[derive(Default)] -struct ClusterMaps { - /// The order in which clusters are allocated and de-allocated - order: Vec, - /// Key is Label - map: HashMap>, // Only primary addresses here -} - /// Address states and associated logic pub struct InternalMap { // NOTE: It's crucial that if more that one of these structures is needed to perform an // operation, we should always acquire locks in the order they're declared here. Otherwise, it // can cause a deadlock. address_maps: AddressMaps, - cluster_maps: SyncRwLock, - /// Track addresses that are being stopped atm + /// Track addresses that are being stopped stopping: SyncRwLock>, + /// Track addresses that are being stopped during shutdown + stopping_shutdown: SyncRwLock>, /// Access to [`FlowControls`] to clean resources flow_controls: FlowControls, /// Metrics collection and sharing @@ -87,8 +80,8 @@ impl InternalMap { pub(super) fn new(flow_controls: &FlowControls) -> Self { Self { address_maps: Default::default(), - cluster_maps: Default::default(), stopping: Default::default(), + stopping_shutdown: Default::default(), flow_controls: flow_controls.clone(), #[cfg(feature = "metrics")] metrics: Default::default(), @@ -96,6 +89,11 @@ impl InternalMap { } } +pub struct StopAckResult { + pub stopping_is_empty: bool, + pub stopping_shutdown_is_empty: bool, +} + impl InternalMap { pub(super) fn stop(&self, address: &Address, skip_sending_stop_signal: bool) -> Result<()> { // To guarantee consistency we'll first acquire lock on all the maps we need to touch @@ -133,7 +131,11 @@ impl InternalMap { // Detached doesn't need any stop confirmation, since they don't have a Relay = don't have // an async task running in a background that should be stopped. if !record.meta.detached { - stopping.insert(primary_address); + let res = stopping.insert(primary_address); + debug!( + "Inserted {} into stopping. Res = {}", + record.primary_address, res + ); } record.stop(skip_sending_stop_signal)?; @@ -141,12 +143,33 @@ impl InternalMap { Ok(()) } - pub(super) fn stop_ack(&self, primary_address: &Address) -> bool { + pub(super) fn stop_ack(&self, primary_address: &Address) -> StopAckResult { // FIXME: Detached workers - let mut stopping = self.stopping.write().unwrap(); - stopping.remove(primary_address); + let stopping_is_empty = { + let mut stopping = self.stopping.write().unwrap(); + let res = stopping.remove(primary_address); + + debug!("Removing {} from stopping. 
Res = {}", primary_address, res); + + stopping.is_empty() + }; + + let stopping_shutdown_is_empty = { + let mut stopping_shutdown = self.stopping_shutdown.write().unwrap(); + + let res = stopping_shutdown.remove(primary_address); + debug!( + "Removing {} from stopping_shutdown. Res = {}", + primary_address, res + ); - stopping.is_empty() + stopping_shutdown.is_empty() + }; + + StopAckResult { + stopping_is_empty, + stopping_shutdown_is_empty, + } } pub(super) fn is_worker_registered_at(&self, primary_address: &Address) -> bool { @@ -292,10 +315,6 @@ impl InternalMap { self.address_maps.records.read().unwrap().len(), Ordering::Release, ); - self.metrics.1.store( - self.cluster_maps.read().unwrap().map.len(), - Ordering::Release, - ); } #[cfg(feature = "metrics")] @@ -308,60 +327,34 @@ impl InternalMap { self.metrics.0.load(Ordering::Acquire) } - /// Add an address to a particular cluster - pub(super) fn set_cluster(&self, primary: &Address, label: String) -> Result<()> { - if !self - .address_maps - .records - .read() - .unwrap() - .contains_key(primary) - { - return Err(NodeError::Address(primary.clone()).not_found())?; - } - - // If this is the first time we see this cluster ID - let mut cluster_maps = self.cluster_maps.write().unwrap(); - match cluster_maps.map.entry_ref(&label) { - EntryRef::Occupied(mut occupied_entry) => { - occupied_entry.get_mut().insert(primary.clone()); - } - EntryRef::Vacant(vacant_entry) => { - vacant_entry.insert_entry(HashSet::from([primary.clone()])); - cluster_maps.order.push(label.clone()); - } - } - - Ok(()) - } - /// Stop all workers not in a cluster, returns their primary addresses - pub(super) fn stop_all_non_cluster_workers(&self) -> bool { - // All clustered addresses - let clustered = self.cluster_maps.read().unwrap().map.iter().fold( - HashSet::new(), - |mut acc, (_, set)| { - acc.extend(set.iter().cloned()); - acc - }, - ); - - let mut records = self.address_maps.records.write().unwrap(); - let mut stopping = self.stopping.write().unwrap(); + pub(super) fn stop_workers(&self, shutdown_priority: WorkerShutdownPriority) -> bool { + let records_to_stop: Vec = { + let mut records = self.address_maps.records.write().unwrap(); + + records + .extract_if(|_addr, record| { + // Filter all clustered workers + record.shutdown_order == shutdown_priority + }) + .map(|(_addr, record)| record) + .collect() + }; - let records_to_stop = records.extract_if(|addr, _record| { - // Filter all clustered workers - !clustered.contains(addr) - }); + let mut stopping_shutdown = self.stopping_shutdown.write().unwrap(); let mut was_empty = true; - for (_, record) in records_to_stop { + for record in records_to_stop { // Detached doesn't need any stop confirmation, since they don't have a Relay = don't have // an async task running in a background that should be stopped. if !record.meta.detached { was_empty = false; - stopping.insert(record.primary_address.clone()); + let res = stopping_shutdown.insert(record.primary_address.clone()); + debug!( + "Inserted {} into stopping_shutdown. 
Res = {}", + record.primary_address, res + ); } if let Err(err) = record.stop(false) { @@ -373,47 +366,6 @@ impl InternalMap { was_empty } - pub(super) fn stop_all_cluster_workers(&self) -> bool { - let mut records = self.address_maps.records.write().unwrap(); - let mut cluster_maps = self.cluster_maps.write().unwrap(); - let mut stopping = self.stopping.write().unwrap(); - - let mut was_empty = true; - while let Some(label) = cluster_maps.order.pop() { - if let Some(addrs) = cluster_maps.map.remove(&label) { - for addr in addrs { - match records.remove(&addr) { - Some(record) => { - was_empty = false; - // Detached doesn't need any stop confirmation, since they don't have a Relay = don't have - // an async task running in a background that should be stopped. - if !record.meta.detached { - stopping.insert(record.primary_address.clone()); - } - match record.stop(false) { - Ok(_) => {} - Err(err) => { - error!( - "Error stopping address {} from cluster {}. Err={}", - addr, label, err - ); - } - } - } - None => { - warn!( - "Stopping address {} from cluster {} but it doesn't exist", - addr, label - ); - } - } - } - } - } - - was_empty - } - pub(super) fn force_clear_records(&self) -> Vec
<Address>
{ let mut records = self.address_maps.records.write().unwrap(); @@ -437,6 +389,7 @@ pub struct AddressRecord { ctrl_tx: OneshotSender, state: AtomicU8, meta: WorkerMeta, + shutdown_order: WorkerShutdownPriority, msg_count: Arc, } @@ -460,8 +413,9 @@ impl AddressRecord { additional_addresses: Vec
<Address>
, sender: MessageSender, ctrl_tx: OneshotSender, - msg_count: Arc, meta: WorkerMeta, + shutdown_order: WorkerShutdownPriority, + msg_count: Arc, ) -> Self { AddressRecord { primary_address, @@ -469,8 +423,9 @@ impl AddressRecord { sender, ctrl_tx, state: AtomicU8::new(AddressState::Running as u8), - msg_count, meta, + shutdown_order, + msg_count, } } diff --git a/implementations/rust/ockam/ockam_node/src/router/router.rs b/implementations/rust/ockam/ockam_node/src/router/router.rs new file mode 100644 index 00000000000..5d81c0a8a01 --- /dev/null +++ b/implementations/rust/ockam/ockam_node/src/router/router.rs @@ -0,0 +1,178 @@ +#[cfg(feature = "metrics")] +use core::sync::atomic::AtomicUsize; + +use super::record::InternalMap; +use crate::channel_types::{MessageSender, OneshotSender}; +use crate::relay::CtrlSignal; +use crate::{NodeError, NodeReason}; +use alloc::vec::Vec; +use ockam_core::compat::collections::hash_map::Entry; +use ockam_core::compat::collections::HashMap; +use ockam_core::compat::sync::RwLock as SyncRwLock; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::flow_control::FlowControls; +use ockam_core::{Address, AddressMetadata, Error, RelayMessage, Result, TransportType}; + +/// A pair of senders to a worker relay +#[derive(Debug)] +pub struct SenderPair { + pub msgs: MessageSender, + pub ctrl: OneshotSender, +} + +enum RouteType { + Internal, + External(TransportType), +} + +fn determine_type(next: &Address) -> RouteType { + if next.transport_type().is_local() { + RouteType::Internal + } else { + RouteType::External(next.transport_type()) + } +} + +/// A combined address type and local worker router +/// +/// This router supports two routing modes: internal, and external. +/// +/// Internal routing resolves `type=0` addresses to local workers. +/// +/// External routing is supported only after a plugin component +/// registers itself with this router. Only one router can be +/// registered per address type. +pub struct Router { + /// Keep track of some additional router state information + pub(super) state: SyncRwLock, // TODO: Could be AtomicU8 + /// Internal address state + pub(super) map: InternalMap, + /// Externally registered router components + pub(super) external: SyncRwLock>, + pub(super) shutdown_yield_sender: SyncRwLock>>, + pub(super) stopped_broadcast_sender: SyncRwLock>>, +} + +/// Node state +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum RouterState { + Running, + Stopping, + Stopped, +} + +impl Router { + pub fn new(flow_controls: &FlowControls) -> Self { + let (stopped_broadcast_sender, _stopped_broadcast_receiver) = + tokio::sync::broadcast::channel(1); + + Self { + state: RouterState::Running.into(), + map: InternalMap::new(flow_controls), + external: Default::default(), + shutdown_yield_sender: Default::default(), + stopped_broadcast_sender: SyncRwLock::new(Some(stopped_broadcast_sender)), + } + } + + pub fn list_workers(&self) -> Vec
<Address>
{ + self.map.list_workers() + } + + pub fn is_worker_registered_at(&self, address: &Address) -> bool { + self.map.is_worker_registered_at(address) + } + + pub fn stop_ack(&self, primary_address: &Address) -> Result<()> { + let state = *self.state.read().unwrap(); + debug!("Handling shutdown ACK for {}", primary_address); + + let stop_ack_result = self.map.stop_ack(primary_address); + + if state == RouterState::Stopping { + // FIXME: This is dubious and far from perfect + if stop_ack_result.stopping_shutdown_is_empty && stop_ack_result.stopping_is_empty { + let sender = self.shutdown_yield_sender.write().unwrap().take(); + + // FIXME: Why it's sometime None? + if let Some(sender) = sender { + debug!("Sending stop_ack signal"); + if sender.send(()).is_err() { + warn!("shutdown_yield send Errored"); + } + } else { + warn!("shutdown_yield_sender is None"); + } + } + } + + Ok(()) + } + + pub fn find_terminal_address<'a>( + &self, + addresses: impl Iterator, + ) -> Option<(&'a Address, AddressMetadata)> { + self.map.find_terminal_address(addresses) + } + + pub fn get_address_metadata(&self, address: &Address) -> Option { + self.map.get_address_metadata(address) + } + + pub fn register_router(&self, tt: TransportType, addr: Address) -> Result<()> { + if let Entry::Vacant(e) = self.external.write().unwrap().entry(tt) { + e.insert(addr); + Ok(()) + } else { + // already exists + Err(Error::new( + Origin::Node, + Kind::AlreadyExists, + "Router already exists", + )) + } + } + + pub fn resolve(&self, addr: &Address) -> Result> { + let addr = match determine_type(addr) { + RouteType::Internal => addr, + // TODO: Remove after other transport implementations are moved to new architecture + RouteType::External(tt) => &self.address_for_transport(tt)?, + }; + self.map.resolve(addr) + } + + fn address_for_transport(&self, tt: TransportType) -> Result
<Address>
{ + let guard = self.external.read().unwrap(); + guard + .get(&tt) + .cloned() + .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal()) + } + + /// Stop the worker + pub fn stop_address(&self, addr: &Address, skip_sending_stop_signal: bool) -> Result<()> { + debug!("Stopping address '{}'", addr); + + self.map.stop(addr, skip_sending_stop_signal)?; + + Ok(()) + } + + pub async fn wait_termination(&self) -> Result<()> { + let mut receiver = match self.stopped_broadcast_sender.read().unwrap().as_ref() { + None => return Ok(()), + Some(sender) => sender.subscribe(), + }; + + receiver.recv().await.unwrap(); // FIXME + + Ok(()) + } + + #[cfg(feature = "metrics")] + pub(crate) fn get_metrics_readout(&self) -> (Arc, Arc) { + self.map.get_metrics() + } +} diff --git a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs index a91da3168e1..fe6dcd3942b 100644 --- a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs +++ b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs @@ -1,5 +1,7 @@ -use super::Router; +use super::{Router, RouterState}; +use crate::channel_types::oneshot_channel; use crate::tokio::time; +use crate::WorkerShutdownPriority; use core::time::Duration; use ockam_core::compat::sync::Arc; use ockam_core::Result; @@ -13,34 +15,38 @@ impl Router { /// Implement the graceful stop strategy #[cfg_attr(not(feature = "std"), allow(unused_variables))] pub async fn stop_graceful(self: Arc, seconds: u8) -> Result<()> { - // Mark the router as shutting down to prevent spawning - info!("Initiate graceful node shutdown"); // This changes the router state to `Stopping` - let receiver = self.state.set_to_stopping(); - let receiver = if let Some(receiver) = receiver { - receiver - } else { - debug!("Node is already terminated"); - return Ok(()); - }; + let state = { + let mut state = self.state.write().unwrap(); - // Start by shutting down clusterless workers - let no_non_cluster_workers = self.map.stop_all_non_cluster_workers(); + let state_val = *state; + if state_val == RouterState::Running { + *state = RouterState::Stopping; + } - // Not stop cluster addresses - let no_cluster_workers = self.map.stop_all_cluster_workers(); + state_val + }; - if no_non_cluster_workers && no_cluster_workers { - // No stop ack will arrive because we didn't stop anything - self.state.set_to_stopped(); - return Ok(()); + match state { + RouterState::Running => {} + RouterState::Stopping => { + info!("Router is already stopping"); + self.wait_termination().await?; + return Ok(()); + } + RouterState::Stopped => { + info!("Router is already stopped down"); + return Ok(()); + } } + info!("Initiate graceful node shutdown"); + // Start a timeout task to interrupt us... 
let dur = Duration::from_secs(seconds as u64); let r = self.clone(); - let timeout_handle = self.runtime_handle.spawn(async move { + let timeout = async move { time::sleep(dur).await; warn!("Shutdown timeout reached; aborting node!"); @@ -53,14 +59,59 @@ impl Router { uncleared_addresses ); } + }; + + let r = self.clone(); + let shutdown = async move { + for shutdown_priority in WorkerShutdownPriority::all_descending_order() { + let (shutdown_yield_sender, shutdown_yield_receiver) = oneshot_channel(); - r.state.set_to_stopped(); - }); + *r.shutdown_yield_sender.write().unwrap() = Some(shutdown_yield_sender); - receiver.await.unwrap(); //FIXME + debug!("Stopping workers with priority: {:?}", shutdown_priority); + let was_empty = r.map.stop_workers(shutdown_priority); + + if was_empty { + debug!( + "There was no workers with priority: {:?}", + shutdown_priority + ); + continue; + } + + debug!( + "Waiting for yield for workers with priority: {:?}", + shutdown_priority + ); + // Wait for stop ack + shutdown_yield_receiver.await.unwrap(); + + debug!( + "Received yield for workers with priority: {:?}", + shutdown_priority + ); + } + }; + + tokio::select! { + _ = shutdown => { + debug!("Router shutdown finished"); + } + _ = timeout => { + warn!("Shutdown timeout reached; aborting node!"); + } + } - #[cfg(feature = "std")] - timeout_handle.abort(); + debug!("Setting Router state to Stopped"); + *self.state.write().unwrap() = RouterState::Stopped; + debug!("Sending Router stopped broadcast"); + _ = self + .stopped_broadcast_sender + .write() + .unwrap() + .take() + .unwrap() + .send(()); // FIXME Ok(()) } diff --git a/implementations/rust/ockam/ockam_node/src/router/state.rs b/implementations/rust/ockam/ockam_node/src/router/state.rs deleted file mode 100644 index 1e8e46188a6..00000000000 --- a/implementations/rust/ockam/ockam_node/src/router/state.rs +++ /dev/null @@ -1,105 +0,0 @@ -//! Router run state utilities -use crate::channel_types::{OneshotReceiver, OneshotSender}; -use alloc::vec::Vec; -use core::ops::{Deref, DerefMut}; -use core::sync::atomic::AtomicBool; -use ockam_core::compat::sync::Mutex as SyncMutex; - -// TODO: Merge RouterState and NodeState. -/// Node state -#[derive(Clone)] -pub enum NodeState { - Running, - Stopping, - Stopped, -} - -pub struct RouterState { - node_state: SyncMutex, // TODO: Use AtomicU8 instead and remove is_running field - termination_senders: SyncMutex>>, - is_running: AtomicBool, -} - -impl RouterState { - pub fn new() -> Self { - Self { - node_state: SyncMutex::new(NodeState::Running), - termination_senders: SyncMutex::new(Default::default()), - is_running: AtomicBool::new(true), - } - } - - /// Set the router state to `Stopping` and return a receiver - /// to wait for the stop to complete. - /// When `None` is returned, the router is already terminated. 
- pub(super) fn set_to_stopping(&self) -> Option> { - let mut node_state = self.node_state.lock().unwrap(); - match node_state.deref_mut() { - NodeState::Running => { - let (sender, receiver) = crate::channel_types::oneshot_channel(); - *node_state = NodeState::Stopping; - self.is_running - .store(false, core::sync::atomic::Ordering::Relaxed); - self.termination_senders.lock().unwrap().push(sender); - Some(receiver) - } - NodeState::Stopping => { - let (sender, receiver) = crate::channel_types::oneshot_channel(); - self.termination_senders.lock().unwrap().push(sender); - Some(receiver) - } - NodeState::Stopped => None, - } - } - - /// Set the router to `Stopped` state and notify all tasks waiting for shutdown - pub(super) fn set_to_stopped(&self) { - self.is_running - .store(false, core::sync::atomic::Ordering::Relaxed); - let previous = { - let mut guard = self.node_state.lock().unwrap(); - core::mem::replace(guard.deref_mut(), NodeState::Stopped) - }; - - match previous { - NodeState::Running | NodeState::Stopping => { - info!("No more workers left. Goodbye!"); - let senders = { - let mut guard = self.termination_senders.lock().unwrap(); - core::mem::take(guard.deref_mut()) - }; - for sender in senders { - let _ = sender.send(()); - } - } - NodeState::Stopped => {} - } - } - - pub(super) async fn wait_until_stopped(&self) { - let receiver = { - let guard = self.node_state.lock().unwrap(); - match guard.deref() { - NodeState::Running | NodeState::Stopping => { - let (sender, receiver) = crate::channel_types::oneshot_channel(); - self.termination_senders.lock().unwrap().push(sender); - receiver - } - NodeState::Stopped => { - return; - } - } - }; - receiver.await.unwrap() // FIXME - } - - pub(super) fn is_running(&self) -> bool { - self.is_running.load(core::sync::atomic::Ordering::Relaxed) - } - - /// Check if this router is still `running`, meaning allows - /// spawning new workers and processors - pub(super) fn node_state(&self) -> NodeState { - self.node_state.lock().unwrap().clone() - } -} diff --git a/implementations/rust/ockam/ockam_node/src/router/worker.rs b/implementations/rust/ockam/ockam_node/src/router/worker.rs index ca8ea9a012a..463054f6eae 100644 --- a/implementations/rust/ockam/ockam_node/src/router/worker.rs +++ b/implementations/rust/ockam/ockam_node/src/router/worker.rs @@ -1,4 +1,6 @@ -use crate::router::{AddressRecord, NodeState, Router, SenderPair, WorkerMeta}; +use crate::router::record::{AddressRecord, WorkerMeta}; +use crate::router::{Router, RouterState, SenderPair}; +use crate::WorkerShutdownPriority; use core::sync::atomic::AtomicUsize; use ockam_core::errcode::{Kind, Origin}; use ockam_core::{compat::sync::Arc, Error, Mailboxes, Result}; @@ -10,21 +12,18 @@ impl Router { mailboxes: &Mailboxes, senders: SenderPair, detached: bool, + shutdown_priority: WorkerShutdownPriority, metrics: Arc, ) -> Result<()> { - if !self.state.is_running() { - match self.state.node_state() { - NodeState::Stopping => Err(Error::new( - Origin::Node, - Kind::Shutdown, - "The node is shutting down", - ))?, - NodeState::Running => unreachable!(), - NodeState::Stopped => unreachable!(), - } - } else { - self.add_worker_impl(mailboxes, senders, detached, metrics) + if *self.state.read().unwrap() != RouterState::Running { + return Err(Error::new( + Origin::Node, + Kind::Shutdown, + "The node is shutting down", + ))?; } + + self.add_worker_impl(mailboxes, senders, detached, shutdown_priority, metrics) } fn add_worker_impl( @@ -32,6 +31,7 @@ impl Router { mailboxes: &Mailboxes, senders: 
SenderPair, detached: bool, + shutdown_priority: WorkerShutdownPriority, metrics: Arc, ) -> Result<()> { debug!("Starting new worker '{}'", mailboxes.primary_address()); @@ -43,11 +43,12 @@ impl Router { mailboxes.additional_addresses().cloned().collect(), msgs, ctrl, - metrics, WorkerMeta { processor: false, detached, }, + shutdown_priority, + metrics, ); self.map.insert_address_record(address_record, mailboxes)?; diff --git a/implementations/rust/ockam/ockam_node/src/worker_builder.rs b/implementations/rust/ockam/ockam_node/src/worker_builder.rs index 7eb3328ce6b..07d15f9b0e2 100644 --- a/implementations/rust/ockam/ockam_node/src/worker_builder.rs +++ b/implementations/rust/ockam/ockam_node/src/worker_builder.rs @@ -1,4 +1,4 @@ -use crate::{debugger, ContextMode}; +use crate::{debugger, ContextMode, WorkerShutdownPriority}; use crate::{relay::WorkerRelay, Context}; use ockam_core::compat::string::String; use ockam_core::compat::sync::Arc; @@ -69,6 +69,7 @@ where worker: self.worker, address: address.into(), metadata, + shutdown_priority: Default::default(), } } @@ -76,6 +77,7 @@ where pub fn with_mailboxes(self, mailboxes: Mailboxes) -> WorkerBuilderMultipleAddresses { WorkerBuilderMultipleAddresses { mailboxes, + shutdown_priority: Default::default(), worker: self.worker, } } @@ -86,6 +88,7 @@ where W: Worker, { mailboxes: Mailboxes, + shutdown_priority: WorkerShutdownPriority, worker: W, } @@ -95,7 +98,12 @@ where { /// Consume this builder and start a new Ockam [`Worker`] from the given context pub async fn start(self, context: &Context) -> Result<()> { - start(context, self.mailboxes, self.worker).await + start(context, self.mailboxes, self.shutdown_priority, self.worker).await + } + + pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self { + self.shutdown_priority = shutdown_priority; + self } } @@ -108,6 +116,7 @@ where address: Address, worker: W, metadata: Option, + shutdown_priority: WorkerShutdownPriority, } impl WorkerBuilderOneAddress @@ -142,6 +151,11 @@ where self } + pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self { + self.shutdown_priority = shutdown_priority; + self + } + /// Consume this builder and start a new Ockam [`Worker`] from the given context pub async fn start(self, context: &Context) -> Result<()> { start( @@ -155,6 +169,7 @@ where ), vec![], ), + self.shutdown_priority, self.worker, ) .await @@ -203,7 +218,12 @@ where } /// Consume this builder and start a new Ockam [`Worker`] from the given context -async fn start(context: &Context, mailboxes: Mailboxes, worker: W) -> Result<()> +async fn start( + context: &Context, + mailboxes: Mailboxes, + shutdown_priority: WorkerShutdownPriority, + worker: W, +) -> Result<()> where W: Worker, { @@ -220,7 +240,13 @@ where debugger::log_inherit_context("WORKER", context, &ctx); let router = context.router()?; - router.add_worker(ctx.mailboxes(), sender, false, context.mailbox_count())?; + router.add_worker( + ctx.mailboxes(), + sender, + false, + shutdown_priority, + context.mailbox_count(), + )?; // Then initialise the worker message relay WorkerRelay::init(context.runtime(), worker, ctx, ctrl_rx); diff --git a/implementations/rust/ockam/ockam_node/tests/tests.rs b/implementations/rust/ockam/ockam_node/tests/tests.rs index e0a1a9ec30f..f6d781528bc 100644 --- a/implementations/rust/ockam/ockam_node/tests/tests.rs +++ b/implementations/rust/ockam/ockam_node/tests/tests.rs @@ -39,7 +39,7 @@ async fn 
receive_timeout__1_sec__should_return_from_call(ctx: &mut Context) -> R #[allow(non_snake_case)] #[test] fn start_and_shutdown_node__many_iterations__should_not_fail() { - for _ in 0..1 { + for _ in 0..50 { let (ctx, mut executor) = NodeBuilder::new().build(); executor .execute(async move { diff --git a/implementations/rust/ockam/ockam_transport_ble/src/lib.rs b/implementations/rust/ockam/ockam_transport_ble/src/lib.rs index 651d66335ad..b4dd95dded3 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/lib.rs @@ -39,5 +39,3 @@ pub use types::*; /// BLE address type constant pub const BLE: TransportType = TransportType::new(4); - -pub(crate) const CLUSTER_NAME: &str = "_internals.transport.ble"; diff --git a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs index 05cefc8cca7..4e5e9499b77 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs @@ -109,11 +109,6 @@ impl Worker for BleRouter { type Context = Context; type Message = Any; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - Ok(()) - } - async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { let msg_addr = msg.msg_addr(); diff --git a/implementations/rust/ockam/ockam_transport_ble/src/workers/listener.rs b/implementations/rust/ockam/ockam_transport_ble/src/workers/listener.rs index 5696bac7740..128264d3190 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/workers/listener.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/workers/listener.rs @@ -55,10 +55,6 @@ where { type Context = Context; - async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn process(&mut self, ctx: &mut Self::Context) -> Result { if self.inner.is_none() { return Ok(true); diff --git a/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs index 0d762c4163f..949ea192192 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/workers/receiver.rs @@ -38,10 +38,6 @@ where { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn process(&mut self, ctx: &mut Context) -> Result { let mut buffer = [0_u8; crate::driver::MAX_OCKAM_MESSAGE_LENGTH]; diff --git a/implementations/rust/ockam/ockam_transport_ble/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_ble/src/workers/sender.rs index ca59918f006..e2476e2aace 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/workers/sender.rs @@ -83,8 +83,6 @@ where type Message = TransportMessage; async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - debug!("initialize for peer: {:?}", self.peer); if let Some(rx_stream) = self.rx_stream.take() { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs index 43dffb09877..e83c06d6a81 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs +++ 
b/implementations/rust/ockam/ockam_transport_tcp/src/lib.rs @@ -41,8 +41,6 @@ pub use transport::*; /// eBPF backed TCP portals that works on TCP level rather than on top of TCP pub mod privileged_portal; -pub(crate) const CLUSTER_NAME: &str = "_internals.transport.tcp"; - /// Transport type for TCP addresses pub const TCP: ockam_core::TransportType = ockam_core::TransportType::new(1); diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/listener.rs index 563569c0d92..32c9e18b6a6 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/listener.rs @@ -2,7 +2,7 @@ use crate::workers::{Addresses, TcpRecvProcessor}; use crate::{TcpConnectionMode, TcpListenerInfo, TcpListenerOptions, TcpRegistry, TcpSendWorker}; use ockam_core::{async_trait, compat::net::SocketAddr}; use ockam_core::{Address, Processor, Result}; -use ockam_node::Context; +use ockam_node::{Context, ProcessorBuilder, WorkerShutdownPriority}; use ockam_transport_core::TransportError; use tokio::net::TcpListener; use tracing::{debug, instrument}; @@ -43,7 +43,11 @@ impl TcpListenProcessor { options, }; - ctx.start_processor(address.clone(), processor).await?; + ProcessorBuilder::new(processor) + .with_address(address.clone()) + .with_shutdown_priority(WorkerShutdownPriority::Priority5) + .start(ctx) + .await?; Ok((saddr, address)) } @@ -55,8 +59,6 @@ impl Processor for TcpListenProcessor { #[instrument(skip_all, name = "TcpListenProcessor::initialize")] async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - self.registry.add_listener_processor(TcpListenerInfo::new( ctx.primary_address().clone(), self.socket_address, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs index 914458a908b..d66f136e0fb 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs @@ -13,7 +13,7 @@ use ockam_core::{ OutgoingAccessControl, }; use ockam_core::{Processor, Result}; -use ockam_node::{Context, ProcessorBuilder}; +use ockam_node::{Context, ProcessorBuilder, WorkerShutdownPriority}; use ockam_transport_core::TransportError; use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf}; use tracing::{info, instrument, trace}; @@ -94,6 +94,7 @@ impl TcpRecvProcessor { ); ProcessorBuilder::new(receiver) .with_mailboxes(Mailboxes::new(mailbox, vec![internal])) + .with_shutdown_priority(WorkerShutdownPriority::Priority1) .start(ctx) .await?; @@ -121,8 +122,6 @@ impl Processor for TcpRecvProcessor { #[instrument(skip_all, name = "TcpRecvProcessor::initialize")] async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - self.registry.add_receiver_processor(TcpReceiverInfo::new( ctx.primary_address().clone(), self.addresses.sender_address().clone(), diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs index 4c7b98dc56f..241585e2f2e 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs @@ -7,7 +7,7 @@ use ockam_core::{ AddressMetadata, AllowAll, AllowSourceAddress, DenyAll, 
LocalMessage, }; use ockam_core::{Any, Decodable, Mailbox, Mailboxes, Message, Result, Routed, Worker}; -use ockam_node::{Context, WorkerBuilder}; +use ockam_node::{Context, WorkerBuilder, WorkerShutdownPriority}; use crate::transport_message::TcpTransportMessage; use ockam_transport_core::TransportError; @@ -108,6 +108,7 @@ impl TcpSendWorker { WorkerBuilder::new(sender_worker) .with_mailboxes(Mailboxes::new(main_mailbox.clone(), vec![internal_mailbox])) + .with_shutdown_priority(WorkerShutdownPriority::Priority1) .start(ctx) .await?; @@ -169,8 +170,6 @@ impl Worker for TcpSendWorker { #[instrument(skip_all, name = "TcpSendWorker::initialize")] async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - self.registry.add_sender_worker(TcpSenderInfo::new( self.addresses.sender_address().clone(), self.addresses.receiver_address().clone(), diff --git a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs index 70bfe40b390..023cfd7b551 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/lib.rs @@ -27,8 +27,6 @@ pub use options::UdpBindOptions; pub use puncture::*; pub use transport::{UdpBind, UdpBindArguments, UdpTransport, UdpTransportExtension}; -pub(crate) const CLUSTER_NAME: &str = "_internals.transport.udp"; - /// Transport type for UDP addresses pub const UDP: ockam_core::TransportType = ockam_core::TransportType::new(2); diff --git a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs index c558f76b21e..5356e609d4b 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/transport/bind.rs @@ -6,7 +6,7 @@ use ockam_core::errcode::{Kind, Origin}; use ockam_core::flow_control::FlowControlId; use ockam_core::{Address, AllowAll, DenyAll, Error, Result}; use ockam_node::compat::asynchronous::resolve_peer; -use ockam_node::{ProcessorBuilder, WorkerBuilder}; +use ockam_node::{ProcessorBuilder, WorkerBuilder, WorkerShutdownPriority}; use ockam_transport_core::{parse_socket_addr, TransportError}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::net::UdpSocket; @@ -128,6 +128,7 @@ impl UdpTransport { .with_address(addresses.receiver_address().clone()) .with_incoming_access_control(DenyAll) .with_outgoing_access_control_arc(receiver_outgoing_access_control) + .with_shutdown_priority(WorkerShutdownPriority::Priority1) .start(&self.ctx) .await?; diff --git a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs index 94ae1e876cc..ffe979f1efa 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/workers/receiver.rs @@ -144,10 +144,6 @@ impl UdpReceiverProcessor { impl Processor for UdpReceiverProcessor { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn process(&mut self, ctx: &mut Self::Context) -> Result { trace!("Waiting for incoming UDP datagram..."); diff --git a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs index c40c79f32b8..ee81ae65c46 100644 --- 
a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs @@ -268,10 +268,6 @@ impl Worker for UdsRouter { type Context = Context; type Message = Any; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/listener.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/listener.rs index 37e4d283c25..a9858bf0aa2 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/listener.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/listener.rs @@ -60,10 +60,6 @@ impl UdsListenProcessor { impl Processor for UdsListenProcessor { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - /// Listen for and accept incoming UDS connections. /// /// Register the peers socket address, and create a worker to communicate with the peer. diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs index 4e0c95e282c..e04c6b0cc37 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/receiver.rs @@ -36,10 +36,6 @@ impl UdsRecvProcessor { impl Processor for UdsRecvProcessor { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - /// Get the next message from the connection if there are any /// available and forward it to the next hop in the route. async fn process(&mut self, ctx: &mut Context) -> Result { diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs index 0c2e55965a7..0e47e4bbbc7 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs @@ -180,8 +180,6 @@ impl Worker for UdsSendWorker { /// /// Spawn a UDS Recceiver worker to processes incoming UDS messages async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME)?; - let path = match self.peer.as_pathname() { Some(p) => p, None => { diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/lib.rs b/implementations/rust/ockam/ockam_transport_websocket/src/lib.rs index 8a129b02726..b35a128d78c 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/lib.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/lib.rs @@ -101,8 +101,6 @@ mod workers; /// WebSocket address type constant. 
pub const WS: TransportType = TransportType::new(3); -pub(crate) const CLUSTER_NAME: &str = "_internals.transport.ws"; - fn parse_socket_addr>(s: S) -> Result { Ok(s.as_ref() .parse() diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs index 59acefd049a..421503f3b49 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs @@ -118,10 +118,6 @@ impl Worker for WebSocketRouter { type Message = Any; type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/listener.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/listener.rs index 13fdb338446..8a237e0df50 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/listener.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/listener.rs @@ -48,10 +48,6 @@ impl WebSocketListenProcessor { impl Processor for WebSocketListenProcessor { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - async fn process(&mut self, ctx: &mut Self::Context) -> Result { debug!("Waiting for incoming TCP connection..."); diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs index 6de472569dc..5b9c8253dbd 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/receiver.rs @@ -45,10 +45,6 @@ where { type Context = Context; - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - ctx.set_cluster(crate::CLUSTER_NAME) - } - /// Get next message from the WebSocket stream if there is /// any available, and forward it to the next hop in the route. async fn process(&mut self, ctx: &mut Context) -> Result { diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs index 4005c145d84..ca95b826ed9 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs @@ -168,7 +168,6 @@ where return Err(TransportError::GenericIo)?; } - ctx.set_cluster(crate::CLUSTER_NAME)?; self.schedule_heartbeat()?; Ok(()) }
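---

Usage sketch (not part of the patch): with the cluster registry gone, a worker that used to call ctx.set_cluster(CLUSTER_NAME) inside initialize() now declares its place in the shutdown order when it is started. This is a minimal sketch, assuming WorkerBuilder exposes the same with_address entry point as the ProcessorBuilder call site in TcpListenProcessor above; EchoWorker and the "echo" address are hypothetical, and the builder chain mirrors the TcpSendWorker / TcpRecvProcessor call sites in this patch:

    use ockam_core::{async_trait, Any, Result, Worker};
    use ockam_node::{Context, WorkerBuilder, WorkerShutdownPriority};

    // Hypothetical worker; relies on the Worker trait's default
    // initialize/shutdown/handle_message implementations. Note the
    // absence of the removed ctx.set_cluster(..) call.
    struct EchoWorker;

    #[async_trait]
    impl Worker for EchoWorker {
        type Context = Context;
        type Message = Any;
    }

    async fn start_echo(ctx: &Context) -> Result<()> {
        WorkerBuilder::new(EchoWorker)
            .with_address("echo")
            // Priority1 puts this worker in the last group to be stopped,
            // like the TCP sender/receiver workers in this patch; workers
            // that skip this call default to Priority4.
            .with_shutdown_priority(WorkerShutdownPriority::Priority1)
            .start(ctx)
            .await
    }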
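The order itself comes from all_descending_order(): stop_graceful() walks the groups from Priority7 down to Priority1, stopping every worker in a group and waiting for its stop ACKs before moving to the next. A small check of the invariants that loop relies on, again as a sketch assuming the derives shown in the enum definition above:

    use ockam_node::WorkerShutdownPriority;

    #[test]
    fn shutdown_order_invariants() {
        let order = WorkerShutdownPriority::all_descending_order();
        // Higher-numbered priorities are stopped earlier...
        assert_eq!(order[0], WorkerShutdownPriority::Priority7);
        assert_eq!(order[6], WorkerShutdownPriority::Priority1);
        // ...and an unset priority lands in the middle group.
        assert_eq!(WorkerShutdownPriority::default(), WorkerShutdownPriority::Priority4);
    }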