Skip to content

Commit

Permalink
attempt to implement priority shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Dec 30, 2024
1 parent 2de28ae commit 3b8a913
Show file tree
Hide file tree
Showing 39 changed files with 496 additions and 532 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport, CLUSTER_NAME};
use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport};
use ockam_core::{async_trait, compat::net::SocketAddr};
use ockam_core::{Address, Processor, Result};
use ockam_node::Context;
Expand Down Expand Up @@ -45,8 +45,6 @@ impl Processor for TcpMitmListenProcessor {
type Context = Context;

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(CLUSTER_NAME)?;

self.registry.add_listener(ctx.primary_address());

Ok(())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::tcp_interceptor::{Role, TcpMitmRegistry, CLUSTER_NAME};
use crate::tcp_interceptor::{Role, TcpMitmRegistry};
use ockam_core::compat::sync::Arc;
use ockam_core::{async_trait, Address, AllowAll};
use ockam_core::{Processor, Result};
Expand Down Expand Up @@ -59,8 +59,6 @@ impl Processor for TcpMitmProcessor {
type Context = Context;

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(CLUSTER_NAME)?;

self.registry
.add_processor(ctx.primary_address(), self.role, self.write_half.clone());

Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ pub mod sync {
#[cfg(feature = "std")]
pub mod sync {
pub use std::sync::{Arc, Weak};
pub use std::sync::{Mutex, RwLock, RwLockWriteGuard};
pub use std::sync::{Mutex, RwLock};
}

/// Provides `std::task` for `no_std` targets.
Expand Down
52 changes: 31 additions & 21 deletions implementations/rust/ockam/ockam_node/src/context/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use core::sync::atomic::AtomicUsize;
use ockam_core::compat::collections::HashMap;
use ockam_core::compat::sync::{Arc, RwLock};
use ockam_core::compat::time::Duration;
use ockam_core::compat::{string::String, vec::Vec};
use ockam_core::compat::vec::Vec;
use ockam_core::flow_control::FlowControls;
#[cfg(feature = "std")]
use ockam_core::OpenTelemetryContext;
Expand All @@ -31,6 +31,36 @@ pub enum ContextMode {
Attached,
}

/// Higher value means the worker is shutdown earlier
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum WorkerShutdownPriority {
/// 1
Priority1,
/// 2
Priority2,
/// 3
Priority3,
/// 4
#[default]
Priority4,
/// 5
Priority5,
/// 6
Priority6,
/// 7
Priority7,
}

impl WorkerShutdownPriority {
/// All possible values in descending order
pub fn all_descending_order() -> [WorkerShutdownPriority; 7] {
use WorkerShutdownPriority::*;
[
Priority7, Priority6, Priority5, Priority4, Priority3, Priority2, Priority1,
]
}
}

/// Context contains Node state and references to the runtime.
pub struct Context {
pub(super) mailboxes: Mailboxes,
Expand Down Expand Up @@ -121,26 +151,6 @@ impl Context {
}

impl Context {
/// Assign the current worker to a cluster
///
/// A cluster is a set of workers that should be stopped together
/// when the node is stopped or parts of the system are reloaded.
/// **This is not to be confused with supervisors!**
///
/// By adding your worker to a cluster you signal to the runtime
/// that your worker may be depended on by other workers that
/// should be stopped first.
///
/// **Your cluster name MUST NOT start with `_internals.` or
/// `ockam.`!**
///
/// Clusters are de-allocated in reverse order of their
/// initialisation when the node is stopped.
pub fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()> {
self.router()?
.set_cluster(self.primary_address(), label.into())
}

/// Return a list of all available worker addresses on a node
pub fn list_workers(&self) -> Result<Vec<Address>> {
Ok(self.router()?.list_workers())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,13 @@ impl Context {
// Create a new context and get access to the mailbox senders
let (ctx, sender, _) = self.new_with_mailboxes(mailboxes, ContextMode::Detached);

self.router()?
.add_worker(ctx.mailboxes(), sender, true, self.mailbox_count.clone())?;
self.router()?.add_worker(
ctx.mailboxes(),
sender,
true,
Default::default(),
self.mailbox_count.clone(),
)?;

Ok(ctx)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ impl Context {
/// This call will hang until a safe shutdown has been completed
/// or the desired timeout has been reached.
pub async fn stop_timeout(&self, seconds: u8) -> Result<()> {
self.router()?.stop_graceful(seconds).await
self.router()?.stop_graceful(seconds).await?;

// TODO: Would be cool to shutdown the Runtime here with a timeout in case router timed out,
// that would require more transparent ownership over the Runtime, since shutdown is
// consuming the value.

Ok(())
}
}
6 changes: 4 additions & 2 deletions implementations/rust/ockam/ockam_node/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub struct Executor {
impl Executor {
/// Create a new Ockam node [`Executor`] instance
pub fn new(runtime: Arc<Runtime>, flow_controls: &FlowControls) -> Self {
let router = Arc::new(Router::new(runtime.handle().clone(), flow_controls));
let router = Arc::new(Router::new(flow_controls));
#[cfg(feature = "metrics")]
let metrics = Metrics::new(runtime.handle().clone(), router.get_metrics_readout());
Self {
Expand Down Expand Up @@ -110,7 +110,9 @@ impl Executor {
match future.await {
Ok(val) => {
// TODO: Add timeout here
router.wait_termination().await;
debug!("Wait for router termination...");
router.wait_termination().await.unwrap(); // FIXME
debug!("Router terminated successfully!...");
Ok(val)
}
Err(e) => {
Expand Down
8 changes: 7 additions & 1 deletion implementations/rust/ockam/ockam_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ impl NodeBuilder {

// Register this mailbox handle with the executor
router
.add_worker(ctx.mailboxes(), sender, true, ctx.mailbox_count())
.add_worker(
ctx.mailboxes(),
sender,
true,
Default::default(),
ctx.mailbox_count(),
)
.expect("router initialization failed");

// Then return the root context and executor
Expand Down
34 changes: 30 additions & 4 deletions implementations/rust/ockam/ockam_node/src/processor_builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{debugger, ContextMode};
use crate::{debugger, ContextMode, WorkerShutdownPriority};
use crate::{relay::ProcessorRelay, Context};
use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
Expand Down Expand Up @@ -72,13 +72,15 @@ where
processor: self.processor,
address: address.into(),
metadata,
shutdown_priority: Default::default(),
}
}

/// Worker with multiple [`Address`]es
pub fn with_mailboxes(self, mailboxes: Mailboxes) -> ProcessorBuilderMultipleAddresses<P> {
ProcessorBuilderMultipleAddresses {
mailboxes,
shutdown_priority: Default::default(),
processor: self.processor,
}
}
Expand All @@ -89,6 +91,7 @@ where
P: Processor<Context = Context>,
{
mailboxes: Mailboxes,
shutdown_priority: WorkerShutdownPriority,
processor: P,
}

Expand All @@ -98,7 +101,18 @@ where
{
/// Consume this builder and start a new Ockam [`Processor`] from the given context
pub async fn start(self, context: &Context) -> Result<()> {
start(context, self.mailboxes, self.processor).await
start(
context,
self.mailboxes,
self.shutdown_priority,
self.processor,
)
.await
}

pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self {
self.shutdown_priority = shutdown_priority;
self
}
}

Expand All @@ -111,6 +125,7 @@ where
address: Address,
processor: P,
metadata: Option<AddressMetadata>,
shutdown_priority: WorkerShutdownPriority,
}

impl<P> ProcessorBuilderOneAddress<P>
Expand Down Expand Up @@ -158,6 +173,7 @@ where
),
vec![],
),
self.shutdown_priority,
self.processor,
)
.await
Expand Down Expand Up @@ -203,10 +219,20 @@ where
self.outgoing_ac = outgoing_access_control.clone();
self
}

pub fn with_shutdown_priority(mut self, shutdown_priority: WorkerShutdownPriority) -> Self {
self.shutdown_priority = shutdown_priority;
self
}
}

/// Consume this builder and start a new Ockam [`Processor`] from the given context
pub async fn start<P>(context: &Context, mailboxes: Mailboxes, processor: P) -> Result<()>
pub async fn start<P>(
context: &Context,
mailboxes: Mailboxes,
shutdown_priority: WorkerShutdownPriority,
processor: P,
) -> Result<()>
where
P: Processor<Context = Context>,
{
Expand All @@ -223,7 +249,7 @@ where
debugger::log_inherit_context("PROCESSOR", context, &ctx);

let router = context.router()?;
router.add_processor(ctx.mailboxes(), sender)?;
router.add_processor(ctx.mailboxes(), sender, shutdown_priority)?;

// Then initialise the processor message relay
ProcessorRelay::<P>::init(context.runtime(), processor, ctx, ctrl_rx);
Expand Down
Loading

0 comments on commit 3b8a913

Please sign in to comment.