diff --git a/Cargo.lock b/Cargo.lock index 2e01782417b..dd199bc2992 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3968,6 +3968,7 @@ dependencies = [ name = "low_latency_portal" version = "0.1.0" dependencies = [ + "async-trait", "hex", "log", "ockam", diff --git a/examples/rust/low_latency_portal/Cargo.toml b/examples/rust/low_latency_portal/Cargo.toml index e69475838ac..1512d550a19 100644 --- a/examples/rust/low_latency_portal/Cargo.toml +++ b/examples/rust/low_latency_portal/Cargo.toml @@ -9,6 +9,7 @@ rust-version = "1.70.0" [dependencies] ockam = { path = "../../../implementations/rust/ockam/ockam", features = ["aws-lc"] } +async-trait = "0.1" serde = { version = "1.0.215", features = ["derive"] } serde_json = "1.0" hex = "0.4.3" diff --git a/examples/rust/low_latency_portal/examples/inlet-node.rs b/examples/rust/low_latency_portal/examples/inlet-node.rs index 7bf535eaa05..f9fcaf93367 100644 --- a/examples/rust/low_latency_portal/examples/inlet-node.rs +++ b/examples/rust/low_latency_portal/examples/inlet-node.rs @@ -1,17 +1,22 @@ use log::info; -use low_latency_portal::{parse, InletConfig}; +use low_latency_portal::{parse, HashMapRepository, InletConfig}; +use ockam::compat::str::FromStr; +use ockam::compat::sync::Arc; use ockam::identity::models::ChangeHistory; -use ockam::identity::{Identifier, SecureChannelOptions, SecureChannels, TrustIdentifierPolicy, Vault}; +use ockam::identity::{ + Identifier, Identities, SecureChannelOptions, SecureChannelRegistry, SecureChannels, TrustIdentifierPolicy, Vault, +}; use ockam::tcp::{TcpConnectionOptions, TcpInletOptions, TcpTransport}; -use ockam::vault::{EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSigning}; +use ockam::vault::{ + EdDSACurve25519SecretKey, SigningSecret, SoftwareVaultForSecureChannels, SoftwareVaultForSigning, + SoftwareVaultForVerifyingSignatures, +}; use ockam::{route, Context, Result}; -use std::str::FromStr; #[ockam::node] async fn main(ctx: Context) -> Result<()> { let args: Vec = std::env::args().collect(); - info!("A"); let config = if let Some(config) = args.get(1) { config.clone() } else { @@ -19,43 +24,41 @@ async fn main(ctx: Context) -> Result<()> { String::from_utf8(config).unwrap() }; - info!("B"); let config: InletConfig = parse(&config)?; - info!("C"); let outlet_identifier = Identifier::from_str(&config.outlet_identifier)?; let inlet_identity_key = hex::decode(config.inlet_identity_key).unwrap(); let inlet_change_history = ChangeHistory::import_from_string(&config.inlet_change_history)?; - info!("D"); + let hash_map_storage = Arc::new(HashMapRepository::default()); let tcp = TcpTransport::create(&ctx).await?; - info!("E"); - - let identity_vault = SoftwareVaultForSigning::create().await?; // FIXME: 16ms - - info!("F"); + let identity_vault = Arc::new(SoftwareVaultForSigning::new(hash_map_storage.clone())); + let secure_channel_vault = Arc::new(SoftwareVaultForSecureChannels::new(hash_map_storage.clone())); + let credential_vault = Arc::new(SoftwareVaultForSigning::new(hash_map_storage.clone())); + let verifying_vault = Arc::new(SoftwareVaultForVerifyingSignatures::new()); let relay_identity_key = EdDSACurve25519SecretKey::new(inlet_identity_key.try_into().unwrap()); - info!("G"); let relay_identity_key = SigningSecret::EdDSACurve25519(relay_identity_key); - info!("H"); - identity_vault.import_key(relay_identity_key).await?; - info!("J"); - - let mut vault = Vault::create().await?; // FIXME: 25ms - vault.identity_vault = identity_vault; - - info!("K"); - - let secure_channels = SecureChannels::builder().await?.with_vault(vault).build(); // FIXME: 16 ms - - info!("L"); + let vault = Vault::new(identity_vault, secure_channel_vault, credential_vault, verifying_vault); + + let identities = Identities::new( + vault, + hash_map_storage.clone(), + hash_map_storage.clone(), + hash_map_storage.clone(), + hash_map_storage.clone(), + ); + let secure_channels = SecureChannels::new( + Arc::new(identities), + SecureChannelRegistry::new(), + hash_map_storage.clone(), + ); let inlet_identifier = secure_channels .identities() @@ -63,14 +66,10 @@ async fn main(ctx: Context) -> Result<()> { .import_from_change_history(None, inlet_change_history) .await?; - info!("M"); - let tcp_connection_to_relay = tcp .connect(config.relay_address.to_string(), TcpConnectionOptions::new()) .await?; - info!("N"); - let secure_channel_to_outlet = secure_channels .create_secure_channel( &ctx, @@ -84,8 +83,6 @@ async fn main(ctx: Context) -> Result<()> { ) .await?; - info!("O"); - tcp.create_inlet( config.inlet_address.to_string(), route![secure_channel_to_outlet, "outlet"], @@ -93,13 +90,9 @@ async fn main(ctx: Context) -> Result<()> { ) .await?; - info!("P"); - info!("Initialized successfully"); ctx.stop().await?; - info!("Q"); - Ok(()) } diff --git a/examples/rust/low_latency_portal/src/hash_map_repository.rs b/examples/rust/low_latency_portal/src/hash_map_repository.rs new file mode 100644 index 00000000000..6e3e41f2553 --- /dev/null +++ b/examples/rust/low_latency_portal/src/hash_map_repository.rs @@ -0,0 +1,314 @@ +use async_trait::async_trait; +use ockam::compat::asynchronous::RwLock as AsyncRwLock; +use ockam::compat::collections::HashMap; +use ockam::compat::sync::RwLock as SyncRwLock; +use ockam::identity::models::{ChangeHistory, CredentialAndPurposeKey, PurposeKeyAttestation}; +use ockam::identity::storage::PurposeKeysRepository; +use ockam::identity::{ + AttributesEntry, ChangeHistoryRepository, CredentialRepository, Identifier, Identity, IdentityAttributesRepository, + IdentityError, IdentityHistoryComparison, PersistedSecureChannel, Purpose, SecureChannelRepository, + TimestampInSeconds, Vault, +}; +use ockam::vault::storage::SecretsRepository; +use ockam::vault::{ + AeadSecret, AeadSecretKeyHandle, SigningSecret, SigningSecretKeyHandle, X25519SecretKey, X25519SecretKeyHandle, +}; +use ockam::{Address, Result}; + +#[derive(Default)] +pub struct HashMapRepository { + signing_secrets: SyncRwLock>, + x25519_secrets: SyncRwLock>, + aead_secrets: SyncRwLock>, + + secure_channels: SyncRwLock>, + + identities: AsyncRwLock>, + + #[allow(clippy::type_complexity)] + attributes: SyncRwLock, AttributesEntry)>>>, + + purpose_keys: SyncRwLock>>, + + #[allow(clippy::type_complexity)] + credentials: + SyncRwLock>>, +} + +#[async_trait] +impl SecretsRepository for HashMapRepository { + async fn store_signing_secret(&self, handle: &SigningSecretKeyHandle, secret: SigningSecret) -> Result<()> { + self.signing_secrets.write().unwrap().insert(handle.clone(), secret); + + Ok(()) + } + + async fn delete_signing_secret(&self, handle: &SigningSecretKeyHandle) -> Result { + Ok(self.signing_secrets.write().unwrap().remove(handle).is_some()) + } + + async fn get_signing_secret(&self, handle: &SigningSecretKeyHandle) -> Result> { + Ok(self.signing_secrets.read().unwrap().get(handle).cloned()) + } + + async fn get_signing_secret_handles(&self) -> Result> { + Ok(self.signing_secrets.read().unwrap().keys().cloned().collect()) + } + + async fn store_x25519_secret(&self, handle: &X25519SecretKeyHandle, secret: X25519SecretKey) -> Result<()> { + self.x25519_secrets.write().unwrap().insert(handle.clone(), secret); + + Ok(()) + } + + async fn delete_x25519_secret(&self, handle: &X25519SecretKeyHandle) -> Result { + Ok(self.x25519_secrets.write().unwrap().remove(handle).is_some()) + } + + async fn get_x25519_secret(&self, handle: &X25519SecretKeyHandle) -> Result> { + Ok(self.x25519_secrets.read().unwrap().get(handle).cloned()) + } + + async fn get_x25519_secret_handles(&self) -> Result> { + Ok(self.x25519_secrets.read().unwrap().keys().cloned().collect()) + } + + async fn store_aead_secret(&self, handle: &AeadSecretKeyHandle, secret: AeadSecret) -> Result<()> { + self.aead_secrets.write().unwrap().insert(handle.clone(), secret); + + Ok(()) + } + + async fn delete_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result { + Ok(self.aead_secrets.write().unwrap().remove(handle).is_some()) + } + + async fn get_aead_secret(&self, handle: &AeadSecretKeyHandle) -> Result> { + Ok(self.aead_secrets.read().unwrap().get(handle).cloned()) + } + + async fn delete_all(&self) -> Result<()> { + self.signing_secrets.write().unwrap().clear(); + self.x25519_secrets.write().unwrap().clear(); + self.aead_secrets.write().unwrap().clear(); + + Ok(()) + } +} + +#[async_trait] +impl SecureChannelRepository for HashMapRepository { + async fn get(&self, decryptor_remote_address: &Address) -> Result> { + Ok(self + .secure_channels + .read() + .unwrap() + .get(decryptor_remote_address) + .cloned()) + } + + async fn put(&self, secure_channel: PersistedSecureChannel) -> Result<()> { + self.secure_channels + .write() + .unwrap() + .insert(secure_channel.decryptor_remote().clone(), secure_channel); + + Ok(()) + } + + async fn delete(&self, decryptor_remote_address: &Address) -> Result<()> { + self.secure_channels.write().unwrap().remove(decryptor_remote_address); + + Ok(()) + } +} + +#[async_trait] +impl ChangeHistoryRepository for HashMapRepository { + async fn update_identity(&self, identity: &Identity, ignore_older: bool) -> Result<()> { + let mut identities = self.identities.write().await; + + let do_insert = match identities.get(identity.identifier()) { + Some(existing_identity) => { + let known_identity = Identity::import_from_change_history( + Some(identity.identifier()), + existing_identity.clone(), + Vault::create_verifying_vault(), + ) + .await?; + + match identity.compare(&known_identity) { + IdentityHistoryComparison::Conflict => { + return Err(IdentityError::ConsistencyError)?; + } + IdentityHistoryComparison::Older => { + if ignore_older { + false + } else { + return Err(IdentityError::ConsistencyError)?; + } + } + + IdentityHistoryComparison::Newer => true, + IdentityHistoryComparison::Equal => false, + } + } + None => true, + }; + + if do_insert { + identities.insert(identity.identifier().clone(), identity.change_history().clone()); + } + + Ok(()) + } + + async fn store_change_history(&self, identifier: &Identifier, change_history: ChangeHistory) -> Result<()> { + self.identities.write().await.insert(identifier.clone(), change_history); + + Ok(()) + } + + async fn delete_change_history(&self, identifier: &Identifier) -> Result<()> { + self.identities.write().await.remove(identifier); + + Ok(()) + } + + async fn get_change_history(&self, identifier: &Identifier) -> Result> { + Ok(self.identities.read().await.get(identifier).cloned()) + } + + async fn get_change_histories(&self) -> Result> { + Ok(self.identities.read().await.values().cloned().collect()) + } +} + +#[async_trait] +impl IdentityAttributesRepository for HashMapRepository { + async fn get_attributes(&self, subject: &Identifier, attested_by: &Identifier) -> Result> { + Ok(self.attributes.read().unwrap().get(subject).and_then(|attrs| { + attrs.iter().find_map(|(e_identifier, e_entry)| { + if e_identifier.as_ref() == Some(attested_by) { + Some(e_entry.clone()) + } else { + None + } + }) + })) + } + + async fn put_attributes(&self, subject: &Identifier, entry: AttributesEntry) -> Result<()> { + self.attributes + .write() + .unwrap() + .entry(subject.clone()) + .or_default() + .push((entry.attested_by(), entry)); + + Ok(()) + } + + async fn delete_expired_attributes(&self, now: TimestampInSeconds) -> Result<()> { + for value in self.attributes.write().unwrap().values_mut() { + _ = value.retain(|entry| entry.1.expires_at() > Some(now)); + } + + Ok(()) + } +} + +#[async_trait] +impl PurposeKeysRepository for HashMapRepository { + async fn set_purpose_key( + &self, + subject: &Identifier, + purpose: Purpose, + purpose_key_attestation: &PurposeKeyAttestation, + ) -> Result<()> { + self.purpose_keys + .write() + .unwrap() + .entry(subject.clone()) + .or_default() + .push((purpose, purpose_key_attestation.clone())); + + Ok(()) + } + + async fn delete_purpose_key(&self, subject: &Identifier, _purpose: Purpose) -> Result<()> { + self.purpose_keys.write().unwrap().remove(subject); + + Ok(()) + } + + async fn get_purpose_key( + &self, + identifier: &Identifier, + purpose: Purpose, + ) -> Result> { + Ok(self.purpose_keys.read().unwrap().get(identifier).and_then(|e| { + e.iter().find_map(|(e_purpose, attestation)| { + if e_purpose == &purpose { + Some(attestation.clone()) + } else { + None + } + }) + })) + } + + async fn delete_all(&self) -> Result<()> { + self.purpose_keys.write().unwrap().clear(); + + Ok(()) + } +} + +#[async_trait] +impl CredentialRepository for HashMapRepository { + async fn get( + &self, + subject: &Identifier, + issuer: &Identifier, + scope: &str, + ) -> Result> { + if let Some(e) = self.credentials.read().unwrap().get(subject) { + return Ok(e.iter().find_map(|(e_issuer, e_scope, _expires, cred)| { + if e_issuer == issuer && e_scope == scope { + Some(cred.clone()) + } else { + None + } + })); + } + + Ok(None) + } + + async fn put( + &self, + subject: &Identifier, + issuer: &Identifier, + scope: &str, + expires_at: TimestampInSeconds, + credential: CredentialAndPurposeKey, + ) -> Result<()> { + self.credentials + .write() + .unwrap() + .entry(subject.clone()) + .or_default() + .push((issuer.clone(), scope.to_string(), expires_at, credential.clone())); + + Ok(()) + } + + async fn delete(&self, subject: &Identifier, issuer: &Identifier, scope: &str) -> Result<()> { + if let Some(e) = self.credentials.write().unwrap().get_mut(subject) { + e.retain(|(e_issuer, e_scope, _e_expires, _e_cred)| e_issuer != issuer || e_scope != scope); + } + + Ok(()) + } +} diff --git a/examples/rust/low_latency_portal/src/lib.rs b/examples/rust/low_latency_portal/src/lib.rs index f93e0db7574..02faaafff4e 100644 --- a/examples/rust/low_latency_portal/src/lib.rs +++ b/examples/rust/low_latency_portal/src/lib.rs @@ -1,3 +1,5 @@ mod configs; +mod hash_map_repository; pub use configs::*; +pub use hash_map_repository::*;