Skip to content

Commit

Permalink
feat(rust): improve ockam_node
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed Dec 30, 2024
1 parent 58907c3 commit 0572200
Show file tree
Hide file tree
Showing 134 changed files with 1,506 additions and 1,852 deletions.
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/rust/get_started/examples/bob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ impl Worker for Echoer {
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
println!("\n[✓] Address: {}, Received: {:?}", ctx.address(), msg);
println!("\n[✓] Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Echo the message body back on its return_route.
ctx.send(msg.return_route().clone(), msg.into_body()?).await
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/get_started/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ impl Worker for Echoer {
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Echo the message body back on its return_route.
ctx.send(msg.return_route().clone(), msg.into_body()?).await
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/get_started/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ impl Worker for Hop {
/// This handle function takes any incoming message and forwards
/// it to the next hop in it's onward route
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

// Send the message to the next worker on its onward_route
ctx.forward(msg.into_local_message().step_forward(&ctx.address())?)
ctx.forward(msg.into_local_message().step_forward(ctx.primary_address())?)
.await
}
}
10 changes: 7 additions & 3 deletions examples/rust/get_started/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@ impl Worker for Logger {
let payload = local_msg.payload_ref();

if let Ok(str) = String::from_utf8(payload.to_vec()) {
println!("Address: {}, Received string: {}", ctx.address(), str);
println!("Address: {}, Received string: {}", ctx.primary_address(), str);
} else {
println!("Address: {}, Received binary: {}", ctx.address(), hex::encode(payload));
println!(
"Address: {}, Received binary: {}",
ctx.primary_address(),
hex::encode(payload)
);
}

ctx.forward(local_msg.step_forward(&ctx.address())?).await
ctx.forward(local_msg.step_forward(ctx.primary_address())?).await
}
}
2 changes: 1 addition & 1 deletion examples/rust/get_started/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Worker for Relay {
/// This handle function takes any incoming message and forwards
/// it to the next hop in it's onward route
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
println!("Address: {}, Received: {:?}", ctx.address(), msg);
println!("Address: {}, Received: {:?}", ctx.primary_address(), msg);

let next_on_route = self.route.next()?.clone();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl TcpMitmTransport {
}

/// Interrupt an active TCP listener given its `Address`
pub async fn stop_listener(&self, address: &Address) -> Result<()> {
self.ctx.stop_processor(address.clone()).await
pub fn stop_listener(&self, address: &Address) -> Result<()> {
self.ctx.stop_address(address.clone())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ impl Processor for TcpMitmListenProcessor {
type Context = Context;

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(CLUSTER_NAME).await?;
ctx.set_cluster(CLUSTER_NAME)?;

self.registry.add_listener(&ctx.address());
self.registry.add_listener(ctx.primary_address());

Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
self.registry.remove_listener(&ctx.address());
self.registry.remove_listener(ctx.primary_address());

Ok(())
}
Expand Down
20 changes: 10 additions & 10 deletions examples/rust/mitm_node/src/tcp_interceptor/workers/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,20 @@ impl Processor for TcpMitmProcessor {
type Context = Context;

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
ctx.set_cluster(CLUSTER_NAME).await?;
ctx.set_cluster(CLUSTER_NAME)?;

self.registry
.add_processor(&ctx.address(), self.role, self.write_half.clone());
.add_processor(ctx.primary_address(), self.role, self.write_half.clone());

debug!("Initialize {}", ctx.address());
debug!("Initialize {}", ctx.primary_address());

Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
self.registry.remove_processor(&ctx.address());
self.registry.remove_processor(ctx.primary_address());

debug!("Shutdown {}", ctx.address());
debug!("Shutdown {}", ctx.primary_address());

Ok(())
}
Expand All @@ -83,22 +83,22 @@ impl Processor for TcpMitmProcessor {
let len = match self.read_half.read(&mut buf).await {
Ok(l) if l != 0 => l,
_ => {
info!("Connection was closed; dropping stream {}", ctx.address());
info!("Connection was closed; dropping stream {}", ctx.primary_address());

let _ = ctx.stop_processor(self.address_of_other_processor.clone()).await;
let _ = ctx.stop_address(self.address_of_other_processor.clone());

return Ok(false);
}
};

match self.write_half.lock().await.write_all(&buf[..len]).await {
Ok(_) => {
debug!("Forwarded {} bytes from {}", len, ctx.address());
debug!("Forwarded {} bytes from {}", len, ctx.primary_address());
}
_ => {
debug!("Connection was closed; dropping stream {}", ctx.address());
debug!("Connection was closed; dropping stream {}", ctx.primary_address());

let _ = ctx.stop_processor(self.address_of_other_processor.clone()).await;
let _ = ctx.stop_address(self.address_of_other_processor.clone());

return Ok(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Worker for Relay {
ctx.forward(
LocalMessage::new()
.with_onward_route(self.forward_route.clone())
.with_return_route(route![ctx.address()])
.with_return_route(route![ctx.primary_address()])
.with_payload(payload),
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ impl RelayService {
options.setup_flow_control_for_relay_service(ctx.flow_controls(), alias);
additional_mailboxes.push(Mailbox::new(
alias.clone(),
None,
options.service_incoming_access_control.clone(),
Arc::new(DenyAll),
));
Expand All @@ -45,6 +46,7 @@ impl RelayService {
.with_mailboxes(Mailboxes::new(
Mailbox::new(
address.clone(),
None,
service_incoming_access_control,
Arc::new(DenyAll),
),
Expand Down
6 changes: 4 additions & 2 deletions implementations/rust/ockam/ockam/src/remote/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ impl RemoteRelay {
) -> Mailboxes {
let main_internal = Mailbox::new(
addresses.main_internal,
None,
Arc::new(DenyAll),
outgoing_access_control,
);

let main_remote = Mailbox::new(
addresses.main_remote,
None,
Arc::new(AllowAll),
Arc::new(AllowAll),
);
Expand Down Expand Up @@ -72,7 +74,7 @@ impl RemoteRelay {
let addresses = Addresses::generate(RelayType::Static);

let mut callback_ctx = ctx
.new_detached_with_mailboxes(Mailboxes::main(
.new_detached_with_mailboxes(Mailboxes::primary(
addresses.completion_callback.clone(),
Arc::new(AllowSourceAddress(addresses.main_remote.clone())),
Arc::new(DenyAll),
Expand Down Expand Up @@ -117,7 +119,7 @@ impl RemoteRelay {
let addresses = Addresses::generate(RelayType::Ephemeral);

let mut callback_ctx = ctx
.new_detached_with_mailboxes(Mailboxes::main(
.new_detached_with_mailboxes(Mailboxes::primary(
addresses.completion_callback.clone(),
Arc::new(AllowSourceAddress(addresses.main_remote.clone())),
Arc::new(DenyAll),
Expand Down
11 changes: 5 additions & 6 deletions implementations/rust/ockam/ockam_abac/src/abac/abac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,19 @@ impl Abac {
}

impl Abac {
pub async fn get_outgoing_identifier(
pub fn get_outgoing_identifier(
ctx: &Context,
relay_msg: &RelayMessage,
) -> Result<Option<Identifier>> {
let terminal = if let Some(terminal) = ctx
.find_terminal_address(relay_msg.onward_route().clone())
.await?
let metadata = if let Some((_address, metadata)) =
ctx.find_terminal_address(relay_msg.onward_route().iter())?
{
terminal
metadata
} else {
return Ok(None);
};

if let Ok(metadata) = SecureChannelMetadata::from_terminal_address(&terminal) {
if let Ok(metadata) = SecureChannelMetadata::from_terminal_address_metadata(&metadata) {
Ok(Some(metadata.their_identifier().into()))
} else {
Ok(None)
Expand Down
2 changes: 1 addition & 1 deletion implementations/rust/ockam/ockam_abac/src/abac/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl OutgoingAbac {

/// Returns true if the sender of the message is validated by the expression stored in AbacAccessControl
pub async fn is_authorized_impl(&self, relay_msg: &RelayMessage) -> Result<bool> {
let identifier = match Abac::get_outgoing_identifier(&self.ctx, relay_msg).await? {
let identifier = match Abac::get_outgoing_identifier(&self.ctx, relay_msg)? {
Some(identifier) => identifier,
None => {
debug! {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl OutgoingAccessControl for OutgoingPolicyAccessControl {
return Ok(false);
};

let identifier = match Abac::get_outgoing_identifier(&self.ctx, relay_msg).await? {
let identifier = match Abac::get_outgoing_identifier(&self.ctx, relay_msg)? {
Some(identifier) => identifier,
None => {
debug! {
Expand Down
7 changes: 5 additions & 2 deletions implementations/rust/ockam/ockam_api/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ impl Worker for Hop {
/// it to the next hop in it's onward route
async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Any>) -> Result<()> {
// Send the message on its onward_route
ctx.forward(msg.into_local_message().step_forward(&ctx.address())?)
.await
ctx.forward(
msg.into_local_message()
.step_forward(ctx.primary_address())?,
)
.await
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl TokenLeaseRefresher {
lease_issuer_route: MultiAddr,
) -> Result<TokenLeaseRefresher, Error> {
let token = Arc::new(RwLock::new(None));
let mailboxes = Mailboxes::main(
let mailboxes = Mailboxes::primary(
Address::random_tagged("LeaseRetriever"),
Arc::new(DenyAll),
Arc::new(AllowAll),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,8 @@ impl InMemoryNode {
match self.registry.influxdb_services.get(&address).await {
None => Ok(None),
Some(_) => {
context.stop_worker(address.clone()).await?;
context
.stop_processor(format!("{address}-processor"))
.await?;
context.stop_address(address.clone())?;
context.stop_address(format!("{address}-processor"))?;
self.registry.influxdb_services.remove(&address).await;
Ok(Some(()))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async fn producer__flow_with_mock_kafka__content_encryption_and_decryption(
.await;
drop(consumer_mock_kafka);
// drop the outlet and re-create it when we need it later
context.stop_worker("kafka_consumer_outlet").await?;
context.stop_address("kafka_consumer_outlet")?;
}

let mut producer_mock_kafka = TcpServerSimulator::start("127.0.0.1:0").await;
Expand Down
Loading

0 comments on commit 0572200

Please sign in to comment.