From 2b9606b244247e6e3ed06329c52fa1ed6ed8767e Mon Sep 17 00:00:00 2001 From: smehnov Date: Fri, 4 Oct 2024 23:28:34 +1100 Subject: [PATCH] add naive online status + change store code structure --- .gitignore | 1 + src/commands/docker.rs | 10 +- src/commands/mod.rs | 16 +- src/core/core.rs | 43 ++- src/external_api/messages.rs | 13 +- src/external_api/unix_socket.rs | 5 +- src/external_api/web_socket.rs | 2 +- src/main.rs | 16 +- src/node/node.rs | 14 +- src/store.rs | 633 -------------------------------- src/store/config.rs | 58 +++ src/store/job_manager.rs | 131 +++++++ src/store/key_manager.rs | 77 ++++ src/store/messages.rs | 126 +++++++ src/store/mod.rs | 6 + src/store/network_manager.rs | 44 +++ src/store/robot_manager.rs | 267 ++++++++++++++ 17 files changed, 786 insertions(+), 676 deletions(-) delete mode 100644 src/store.rs create mode 100644 src/store/config.rs create mode 100644 src/store/job_manager.rs create mode 100644 src/store/key_manager.rs create mode 100644 src/store/messages.rs create mode 100644 src/store/mod.rs create mode 100644 src/store/network_manager.rs create mode 100644 src/store/robot_manager.rs diff --git a/.gitignore b/.gitignore index 93aee8b..b767ddf 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ debug/ .DS_Store *.key script.py +scripts diff --git a/src/commands/docker.rs b/src/commands/docker.rs index 3b400b4..9666c4d 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -12,7 +12,7 @@ use base64::{engine::general_purpose, Engine as _}; use crate::{ commands::{RobotJob, RobotJobResult}, - store::Jobs, + store::job_manager::Jobs, utils::files::create_job_data_dir, }; @@ -185,14 +185,14 @@ impl DockerLaunch { loop { let channel_message = channel_to_job_rx.recv().await.unwrap(); match channel_message { - crate::store::ChannelMessageToJob::TerminalMessage( + crate::store::messages::ChannelMessageToJob::TerminalMessage( data, ) => { for byte in data.as_bytes().iter() { input.write_all(&[*byte]).await.ok(); } } - crate::store::ChannelMessageToJob::ArchiveMessage { + crate::store::messages::ChannelMessageToJob::ArchiveMessage { encoded_tar, path, } => { @@ -226,7 +226,7 @@ impl DockerLaunch { info!("Error while decoded tar"); } } - crate::store::ChannelMessageToJob::ArchiveRequest { + crate::store::messages::ChannelMessageToJob::ArchiveRequest { .. } => {} } @@ -249,7 +249,7 @@ impl DockerLaunch { while let Some(Ok(output)) = output.next().await { let job_manager = shared_jobs.lock().unwrap(); if let Some(tx) = job_manager.get_channel_from_job(&robot_job.id) { - tx.send(crate::store::ChannelMessageFromJob::TerminalMessage( + tx.send(crate::store::messages::ChannelMessageFromJob::TerminalMessage( output.to_string(), )) .unwrap(); diff --git a/src/commands/mod.rs b/src/commands/mod.rs index f6dad5d..9dcb3cd 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -7,8 +7,8 @@ use tokio::sync::broadcast::Sender; use tracing::{error, info}; use crate::store; -use crate::store::ChannelMessageFromJob; -use crate::store::Message; +use crate::store::messages::{Message, ChannelMessageFromJob}; +use crate::store::job_manager::Jobs; mod docker; @@ -60,7 +60,7 @@ pub enum TunnnelClient { }, } -pub async fn launch_new_job(robot_job: RobotJob, jobs: &store::Jobs) { +pub async fn launch_new_job(robot_job: RobotJob, jobs: &Jobs) { info!("{:?}", robot_job); let jobs = Arc::clone(jobs); let mut job_manager = jobs.lock().unwrap(); @@ -100,7 +100,7 @@ pub async fn start_tunnel_messanger( info!("sending stdout: {:?}", stdout); let _ = from_robot_tx.send( serde_json::to_string(&Message::new( - store::MessageContent::TunnelResponseMessage { + store::messages::MessageContent::TunnelResponseMessage { job_id: job_id.clone(), message: ChannelMessageFromJob::TerminalMessage(stdout), }, @@ -125,7 +125,7 @@ pub async fn start_tunnel_messanger( } } -pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &store::Jobs) { +pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &Jobs) { info!("Start tunnel request"); let jobs = Arc::clone(jobs); let mut job_manager = jobs.lock().unwrap(); @@ -158,7 +158,7 @@ pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &s } } -pub async fn message_to_robot(message: MessageToRobot, jobs: &store::Jobs) { +pub async fn message_to_robot(message: MessageToRobot, jobs: &Jobs) { info!("Message to robot request"); info!("Message to robot: {:?}", message); @@ -171,12 +171,12 @@ pub async fn message_to_robot(message: MessageToRobot, jobs: &store::Jobs) { match content { MessageContent::Terminal { stdin } => { channel - .send(store::ChannelMessageToJob::TerminalMessage(stdin)) + .send(store::messages::ChannelMessageToJob::TerminalMessage(stdin)) .unwrap(); } MessageContent::Archive { dest_path, data } => { channel - .send(store::ChannelMessageToJob::ArchiveMessage { + .send(store::messages::ChannelMessageToJob::ArchiveMessage { encoded_tar: data, path: dest_path, }) diff --git a/src/core/core.rs b/src/core/core.rs index b0fd481..ab6a23d 100644 --- a/src/core/core.rs +++ b/src/core/core.rs @@ -1,19 +1,17 @@ -use crate::store::Message; -use crate::store::MessageContent; -use crate::store::MessageRequest; -use crate::store::MessageResponse; -use crate::store::RobotRole; -use crate::store::SignedMessage; +use crate::store::messages::{Message, MessageContent, MessageRequest, MessageResponse, SignedMessage}; +use crate::store::robot_manager::{RobotRole, Robots}; +use crate::store::job_manager::{JobManager, Jobs}; use crate::commands; use crate::cli::Args; -use crate::store::{JobManager, Jobs, Robots}; use std::sync::{Arc, Mutex}; use tokio::select; use tokio::sync::broadcast; use tokio::task::JoinHandle; use tracing::{error, info}; +use tokio::time::{interval, Duration}; + use std::error::Error; @@ -27,6 +25,8 @@ pub async fn main_normal( let mut to_message_rx = to_message_tx.subscribe(); + let mut handshake_timer = interval(Duration::from_secs(30)); + loop { select! { msg = to_message_rx.recv()=>match msg{ @@ -39,6 +39,7 @@ pub async fn main_normal( let robot_manager = robots.lock().unwrap(); if message.to.unwrap_or("".to_string()) == robot_manager.self_peer_id || matches!(message.content, MessageContent::UpdateConfig { .. }) + { let role = robot_manager.get_role(signed_message.public_key); info!("role: {:?}", role); @@ -49,6 +50,9 @@ pub async fn main_normal( } } } + if matches!(message.content, MessageContent::Handshake { }){ + should_process = true; + } info!("should process {}", should_process); if should_process{ @@ -116,6 +120,17 @@ pub async fn main_normal( ))?); } }, + MessageContent::Handshake{}=>{ + let identity = + libp2p::identity::ed25519::PublicKey::try_from_bytes(&signed_message.public_key)?; + let public_key: libp2p::identity::PublicKey = identity.into(); + let peer_id = public_key.to_peer_id(); + let from = peer_id.to_base58(); + info!("Got handshake from {}", from); + let mut robot_manager = robots.lock().unwrap(); + robot_manager.network_manager.process_handshake(from); + + } _=>{} } } @@ -124,6 +139,20 @@ pub async fn main_normal( error!("error while socket receiving libp2p message"); } }, + _ = handshake_timer.tick()=>{ + + let message_content = MessageContent::Handshake { }; + let _ = from_message_tx.send(serde_json::to_string(&Message::new( + message_content, + "".to_string(), + None + ))?); + + let mut robot_manager = robots.lock().unwrap(); + robot_manager.network_manager.clean_old_handshakes(); + + + } } } } diff --git a/src/external_api/messages.rs b/src/external_api/messages.rs index 52f83b1..3325896 100644 --- a/src/external_api/messages.rs +++ b/src/external_api/messages.rs @@ -1,7 +1,5 @@ -use crate::store::Message; -use crate::store::MessageContent; -use crate::store::Robots; -use crate::store::SignedMessage; +use crate::store::messages::{Message, MessageContent, SignedMessage}; +use crate::store::robot_manager::Robots; use base64::{engine::general_purpose, Engine as _}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -95,6 +93,13 @@ pub fn process_command( answer = Some("{\"ok\":false}".to_string()); } } + "/network_info"=>{ + info!("/network_info request"); + let robots_manager = robots.lock().unwrap(); + let network_info_text = serde_json::to_string(&robots_manager.network_manager.peers_info)?; + info!("{}", network_info_text); + answer = Some(network_info_text) + } "/send_message" => { if let Some(message_content) = command.content { let _ = from_message_tx.send(serde_json::to_string(&Message::new( diff --git a/src/external_api/unix_socket.rs b/src/external_api/unix_socket.rs index 2178eee..ba35228 100644 --- a/src/external_api/unix_socket.rs +++ b/src/external_api/unix_socket.rs @@ -1,8 +1,7 @@ use super::messages::{process_command, MessageQueue, SocketCommand}; use crate::cli::Args; -use crate::store::Message; -use crate::store::Robots; -use crate::store::SignedMessage; +use crate::store::messages::{Message, SignedMessage}; +use crate::store::robot_manager::Robots; use std::error::Error; use std::sync::{Arc, Mutex}; use tokio::net::{UnixListener, UnixStream}; diff --git a/src/external_api/web_socket.rs b/src/external_api/web_socket.rs index bff8f8c..3e918f9 100644 --- a/src/external_api/web_socket.rs +++ b/src/external_api/web_socket.rs @@ -1,5 +1,5 @@ use crate::cli::Args; -use crate::store::Robots; +use crate::store::robot_manager::Robots; use futures::SinkExt; use std::error::Error; use std::sync::{Arc, Mutex}; diff --git a/src/main.rs b/src/main.rs index fa3303e..3cb6b72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,8 @@ use tracing_subscriber::FmtSubscriber; use std::sync::{Arc, Mutex}; use tokio::sync::broadcast; +use store::key_manager::KeyConfig; +use store::robot_manager::{RobotsManager, Robots}; pub mod cli; pub mod commands; @@ -20,8 +22,8 @@ pub mod node; pub mod store; pub mod utils; -pub fn generate_key_file(key_filename: String) -> store::Config { - let config = store::Config::generate(); +pub fn generate_key_file(key_filename: String) -> store::key_manager::KeyConfig { + let config = store::key_manager::KeyConfig::generate(); let _ = config.save_to_file(key_filename.clone()); info!("Generated new key and saved it to {}", key_filename); @@ -40,13 +42,13 @@ async fn main() -> Result<(), Box> { } } - let mut config: store::Config = store::Config::generate(); + let mut config: KeyConfig = KeyConfig::generate(); match args.clone().secret_key { Some(secret_key_string) => { - config = store::Config::load_from_sk_string(secret_key_string)?; + config = KeyConfig::load_from_sk_string(secret_key_string)?; } None => { - let config_load_res = store::Config::load_from_file(args.key_filename.clone()); + let config_load_res = KeyConfig::load_from_file(args.key_filename.clone()); match config_load_res { Ok(loaded_config) => { config = loaded_config; @@ -61,7 +63,7 @@ async fn main() -> Result<(), Box> { error!("Can't write key to file {:?}", err); } - let mut robot_manager = store::RobotsManager { + let mut robot_manager = RobotsManager { self_peer_id: config.get_peer_id(), ..Default::default() }; @@ -87,7 +89,7 @@ async fn main() -> Result<(), Box> { _ => {} } - let robots: store::Robots = Arc::new(Mutex::new(robot_manager)); + let robots: Robots = Arc::new(Mutex::new(robot_manager)); let libp2p_port = args.port_libp2p.parse::().unwrap(); config.set_libp2p_port(libp2p_port); diff --git a/src/node/node.rs b/src/node/node.rs index 88907d0..5adf566 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -1,11 +1,9 @@ use crate::cli::Args; -use crate::store::Config; -use crate::store::Message; -use crate::store::MessageContent; -use crate::store::RobotRequest; -use crate::store::RobotResponse; -use crate::store::Robots; -use crate::store::SignedMessage; + +use crate::store::key_manager::KeyConfig; +use crate::store::messages::{Message, MessageContent, RobotRequest, RobotResponse, SignedMessage}; +use crate::store::robot_manager::{Robots}; + use futures::stream::StreamExt; use libp2p::PeerId; @@ -370,7 +368,7 @@ pub async fn start_libp2p_thread( from_message_tx: &broadcast::Sender, to_message_tx: &broadcast::Sender, robots: &Robots, - config: &Config, + config: &KeyConfig, args: &Args, ) -> JoinHandle<()> { let from_message_tx = from_message_tx.clone(); diff --git a/src/store.rs b/src/store.rs deleted file mode 100644 index 3efea5a..0000000 --- a/src/store.rs +++ /dev/null @@ -1,633 +0,0 @@ -use libp2p::{Multiaddr, PeerId}; -use libp2p_identity::ed25519; -use serde::{Deserialize, Serialize}; -use std::any::Any; -use std::collections::HashMap; -use std::error::Error; -use std::fs::{self}; -use std::hash::Hash; -use std::sync::{Arc, Mutex}; -use std::time::SystemTime; -use tokio::sync::broadcast; -use tracing::info; - -use base64::{engine::general_purpose, Engine as _}; - -use crate::commands::{RobotJob, RobotJobResult}; - -/// Represents a tunnel for job communication -#[derive(Debug, Clone, Serialize)] -pub struct Tunnel { - pub client_id: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ChannelMessageToJob { - TerminalMessage(String), - ArchiveMessage { encoded_tar: String, path: String }, - ArchiveRequest { path: String }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ChannelMessageFromJob { - TerminalMessage(String), - ArchiveMessage { encoded_tar: String }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "request_type")] -pub enum RobotRequest { - GetConfigVersion { version: u32, owner: [u8; 32] }, - ShareConfigMessage { signed_message: SignedMessage }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "response_type")] -pub enum RobotResponse { - GetConfigVersion { version: u32, owner: [u8; 32] }, - ShareConfigMessage {}, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "request_type")] -pub enum MessageRequest { - ListJobs {}, - GetRobotsConfig {}, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "response_type")] -pub enum MessageResponse { - ListJobs { jobs: Vec }, - GetRobotsConfig { config: RobotsConfig }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(tag = "type")] -pub enum MessageContent { - CustomMessage(serde_json::Value), - MessageResponse(MessageResponse), - MessageRequest(MessageRequest), - JobMessage(serde_json::Value), - StartTunnelReq { - job_id: String, - peer_id: String, - }, - TunnelResponseMessage { - job_id: String, - message: ChannelMessageFromJob, - }, - StartJob(RobotJob), - UpdateConfig { - config: RobotsConfig, - }, -} - -#[derive(Debug, Clone)] -pub struct JobProcess { - pub job_id: String, - pub job_type: String, - pub status: String, - pub channel_tx: Option>, - pub channel_to_job_tx: broadcast::Sender, - pub tunnel: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct JobProcessData { - pub job_id: String, - pub job_type: String, - pub status: String, -} - -impl From for JobProcessData { - fn from(job_process: JobProcess) -> Self { - return Self { - job_id: job_process.job_id, - job_type: job_process.job_type, - status: job_process.status, - }; - } -} - -/// Manages job processes -#[derive(Default, Debug)] -pub struct JobManager { - pub data: HashMap, -} - -impl JobManager { - /// Creates a new job - pub fn new_job(&mut self, job_id: String, job_type: String, status: String) { - let (channel_to_job_tx, _channel_to_job_rx) = - broadcast::channel::(100); - - self.data.insert( - job_id.clone(), - JobProcess { - job_id, - job_type, - status, - channel_tx: None, - channel_to_job_tx: channel_to_job_tx.clone(), - tunnel: None, - }, - ); - } - /// Sets the result of a job - pub fn set_job_result(&mut self, reslut: RobotJobResult) { - let process = self.data.get_mut(&reslut.job_id); - match process { - Some(_process) => { - self.set_job_status(reslut.job_id, reslut.status); - } - None => {} - } - } - pub fn get_job_or_none(&self, job_id: &String) -> Option { - match self.data.get(job_id) { - Some(job) => Some(job.clone()), - None => None, - } - } - /// Retrieves job information - pub fn get_jobs_info(&self) -> Vec { - return self.data.clone().into_values().map(|x| x.into()).collect(); - } - pub fn set_job_status(&mut self, job_id: String, status: String) { - let process = self.data.get_mut(&job_id); - match process { - Some(process) => { - process.status = status; - } - None => {} - } - } - - /// Creates a tunnel for a job - pub fn create_job_tunnel(&mut self, job_id: &String, client_id: String) { - let process = self.data.get_mut(job_id); - let (tx, _rx) = broadcast::channel::(100); - match process { - Some(process) => { - process.tunnel = Some(Tunnel { client_id }); - process.channel_tx = Some(tx.clone()); - } - None => {} - } - } - pub fn get_channel_from_job( - &self, - job_id: &String, - ) -> Option> { - match self.data.get(job_id) { - Some(job) => match &job.channel_tx { - Some(channel) => Some(channel.clone()), - None => None, - }, - None => None, - } - } - pub fn get_channel_to_job( - &self, - job_id: &String, - ) -> Option> { - match self.data.get(job_id) { - Some(job) => Some(job.channel_to_job_tx.clone()), - None => None, - } - } -} - -/// Manages messages for the system -#[derive(Debug, Clone)] -pub struct MessageManager { - pub from_message_tx: broadcast::Sender, - pub to_message_tx: broadcast::Sender, -} - -impl MessageManager {} - -/// Represents a signed message -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SignedMessage { - pub message: String, - sign: Vec, - pub public_key: [u8; 32], -} - -impl SignedMessage { - /// Verifies the signature of the message - pub fn verify(&self) -> bool { - if let Ok(public_key) = ed25519::PublicKey::try_from_bytes(&self.public_key) { - return public_key.verify(&*self.message.as_bytes(), &self.sign); - } - return false; - } -} - -/// Represents a message in the system -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Message { - pub timestamp: String, - pub content: MessageContent, - pub from: String, - pub to: Option, -} -impl Message { - /// Creates a new message - pub fn new(content: MessageContent, from: String, to: Option) -> Self { - let duration_since_epoch = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); - let timestamp_nanos = duration_since_epoch.as_nanos().to_string(); - Self { - timestamp: timestamp_nanos, - content, - from, - to, - } - } - /// Signs the message using the provided keypair - pub fn signed(self, keypair: ed25519::Keypair) -> Result { - let message_str = serde_json::to_string(&self)?; - let sign = keypair.sign(&*message_str.as_bytes()); - Ok(SignedMessage { - message: message_str, - sign, - public_key: keypair.public().to_bytes(), - }) - } -} -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum RobotRole { - Current, - OrganizationRobot, - OrganizationUser, - Owner, - Unknown, -} -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Robot { - pub robot_id: String, - pub robot_peer_id: String, - pub robot_public_key: String, - pub name: String, - pub tags: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct User { - pub username: String, - pub public_key: String, - pub tags: Vec, -} - -impl User { - pub fn get_public_key_bytes(&self) -> Result<[u8; 32], Box> { - let decoded_key = general_purpose::STANDARD.decode(&self.public_key); - match decoded_key { - Ok(decoded_key) => { - let public_key_bytes: [u8; 32] = decoded_key.as_slice().try_into()?; - return Ok(public_key_bytes); - } - Err(_err) => { - return Err("can't decode user public key from base64")?; - } - } - } - pub fn get_peer_id(&self) -> Result> { - let identity = - libp2p::identity::ed25519::PublicKey::try_from_bytes(&self.get_public_key_bytes()?)?; - let public_key: libp2p::identity::PublicKey = identity.into(); - let peer_id = public_key.to_peer_id(); - return Ok(peer_id.to_base58()); - } -} - -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -pub struct RobotsConfig { - pub version: u32, - pub robots: Vec, - pub users: Vec, -} - -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -pub struct ConfigStorage { - configs: HashMap<[u8; 32], (RobotsConfig, SignedMessage)>, // owner public key -> config -} - -impl ConfigStorage { - pub fn update_config( - &mut self, - owner: [u8; 32], - config: RobotsConfig, - signed_message: SignedMessage, - ) { - match self.configs.get(&owner) { - Some((old_config, _)) => { - if old_config.version >= config.version { - return; - } - } - None => {} - } - - info!( - "Updating config in config storage owner {:?} {:?}", - owner, signed_message - ); - self.configs.insert(owner, (config, signed_message)); - } - pub fn get_config(&self, owner: &[u8; 32]) -> Option<(RobotsConfig, SignedMessage)> { - if let Some((config, message)) = self.configs.get(owner) { - return Some((config.clone(), message.clone())); - } - return None; - } - pub fn get_config_version(&self, owner: &[u8; 32]) -> u32 { - match self.get_config(owner) { - Some((config, _message)) => { - return config.version; - } - None => { - return 0; - } - } - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] -pub struct RobotInterface { - pub ip4: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RobotsManagerDump { - config: RobotsConfig, - config_message: SignedMessage, -} - -/// Manages robots in the system -#[derive(Default, Debug, Clone, Serialize, Deserialize)] -pub struct RobotsManager { - pub self_peer_id: String, - pub owner_peer_id: String, - pub owner_public_key: [u8; 32], - pub robots: HashMap, // peer id tp Robot - pub users: HashMap, // public key to User - pub config_version: u32, - pub config_message: Option, - pub peer_id_to_ip: HashMap, - pub peers: Vec, - pub config_storage: ConfigStorage, -} - -impl RobotsManager { - /// Adds a robot to the manager - pub fn add_robot(&mut self, robot: Robot) { - self.robots - .insert(robot.robot_peer_id.clone(), robot.clone()); - // if let Some(ip4) = self.peer_id_to_ip.get(&robot.robot_peer_id) { - // self.add_interface_to_robot(robot.robot_peer_id, ip4.to_string()); - // } - } - pub fn add_user(&mut self, user: User) { - // if let Ok(peer_id) = user.get_peer_id(){ - // self.users.insert(peer_id, user.clone()); - // }else{ - // error!("can't get peer_id for user {:?}", user); - // } - self.users.insert(user.public_key.clone(), user); - } - - /// Sets the robots configuration - pub fn set_robots_config(&mut self, config: RobotsConfig, signed_message: SignedMessage) { - if config.version > self.config_version { - self.config_version = config.version.clone(); - self.robots.clear(); - for robot in &config.robots { - self.add_robot(robot.clone()); - } - self.users.clear(); - for user in &config.users { - self.add_user(user.clone()); - } - self.config_message = Some(signed_message.clone()); - info!("OWNER_PUBLIC_KEY {:?}", self.owner_public_key); - self.config_storage.update_config( - self.owner_public_key, - config.clone(), - signed_message, - ); - } else { - info!("config version is too old"); - } - } - - /// Saves the current configuration to a file - pub fn save_to_file(&self, filepath: String) -> Result<(), Box> { - let dump = RobotsManagerDump { - config: self.get_robots_config(), - config_message: self - .config_message - .clone() - .ok_or("no config signed message")?, - }; - fs::write(filepath, serde_json::to_string(&dump)?)?; - Ok(()) - } - /// Reads the configuration from a file - pub fn read_config_from_file(&mut self, filepath: String) -> Result<(), Box> { - let dump_str = fs::read_to_string(filepath)?; - let dump: RobotsManagerDump = serde_json::from_str(&dump_str)?; - self.set_robots_config(dump.config, dump.config_message); - Ok(()) - } - - pub fn get_robots_config(&self) -> RobotsConfig { - return RobotsConfig { - version: self.config_version, - robots: self - .robots - .values() - .map(|robot| robot.clone()) - .collect::>(), - users: self - .users - .values() - .map(|user| user.clone()) - .collect::>(), - }; - } - - pub fn set_peers(&mut self, peers: Vec) { - self.peers = peers - } - - pub fn set_owner(&mut self, owner_b64: String) -> Result<(), Box> { - let decoded_key = general_purpose::STANDARD.decode(owner_b64); - match decoded_key { - Ok(owner_public_key) => { - match owner_public_key.as_slice().try_into() { - Ok(pk) => { - self.owner_public_key = pk; - } - _ => { - return Err("can't transform pk array".into()); - } - }; - } - _ => { - return Err("can't decode pk from b64".into()); - } - } - - Ok(()) - } - - pub fn read_robots_from_config(&mut self, config: String) { - let robots_config: RobotsConfig = - serde_json::from_str::(&config).expect("wrong JSON"); - for robot in robots_config.robots.iter() { - self.add_robot(robot.clone()); - } - } - - // pub fn add_interface_to_robot(&mut self, robot_peer_id: String, ip4: String) { - // info!("Adding interface {} = {}", robot_peer_id, ip4); - // match self.robots.get_mut(&robot_peer_id) { - // Some(robot) => { - // robot.interfaces.insert(RobotInterface { ip4 }); - // } - // None => { - // info!("No robot for peer id {}", robot_peer_id); - // self.peer_id_to_ip.insert(robot_peer_id, ip4); - // } - // } - // } - - /// Gets the role of an entity based on its ID - pub fn get_role(&self, id: T) -> RobotRole { - let value_any = &id as &dyn Any; - if let Some(_peer_id) = value_any.downcast_ref::() { - // for user_peer_id in self.users.keys() { - // if user_peer_id == peer_id { - // return RobotRole::OrganizationUser; - // } - // } - } else if let Some(public_key) = value_any.downcast_ref::<[u8; 32]>() { - if *public_key == self.owner_public_key { - return RobotRole::Owner; - } - - info!("public key: {:?}", public_key); - let pk_str = general_purpose::STANDARD.encode(&public_key.to_vec()); - info!("pk_str: {}", pk_str); - for robot_public_key in self - .robots - .values() - .map(|robot| robot.robot_public_key.clone()) - { - if robot_public_key == pk_str { - return RobotRole::OrganizationRobot; - } - } - info!("users: {:?}", self.users); - if let Some(_user) = self.users.get(&pk_str) { - return RobotRole::OrganizationUser; - } - } - - return RobotRole::Unknown; - } - - pub fn remove_interface_from_robot(&mut self, _robot_peer_id: String, _ip4: String) {} - - pub fn merge_update(&mut self, update_robots: RobotsConfig) { - for robot in update_robots.robots.iter() { - if !self.robots.contains_key(&robot.robot_peer_id) { - self.add_robot(robot.clone()); - } - } - } - - pub fn get_robots_json(self) -> String { - serde_json::to_string(&self).unwrap() - } -} - -/// Represents the configuration for the system -#[derive(Debug, Clone)] -pub struct Config { - pub identity: libp2p::identity::ed25519::Keypair, - pub bootstrap_addrs: Vec, - pub libp2p_port: u16, -} - -impl Config { - /// Generates a new configuration - pub fn generate() -> Self { - Self { - identity: libp2p::identity::ed25519::Keypair::generate(), - bootstrap_addrs: Vec::new(), - libp2p_port: 0, - } - } - - /// Saves the configuration to a file - pub fn save_to_file(self: &Self, filepath: String) -> Result<(), Box> { - let encoded_key = general_purpose::STANDARD.encode(&self.identity.to_bytes().to_vec()); - fs::write(filepath, encoded_key)?; - Ok(()) - } - - pub fn get_public_key_encoded(self: &Self) -> String { - let public_key_encoded = - general_purpose::STANDARD.encode(&self.identity.public().to_bytes().to_vec()); - public_key_encoded - } - - pub fn get_peer_id(self: &Self) -> String { - let public_key: libp2p::identity::PublicKey = self.identity.public().into(); - let peer_id = public_key.to_peer_id(); - return peer_id.to_string(); - } - - /// Loads the configuration from a file - pub fn load_from_file(filepath: String) -> Result> { - let key = fs::read(filepath)?; - let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(key)?; - let parsed_identity = libp2p::identity::ed25519::Keypair::try_from_bytes(decoded_key)?; - Ok(Self { - identity: parsed_identity, - bootstrap_addrs: Vec::new(), - libp2p_port: 0, - }) - } - - /// Loads the configuration from a secret key string - pub fn load_from_sk_string(sk_string: String) -> Result> { - let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(sk_string)?; - let parsed_secret = libp2p::identity::ed25519::SecretKey::try_from_bytes(decoded_key)?; - let identity = libp2p::identity::ed25519::Keypair::from(parsed_secret); - Ok(Self { - identity, - bootstrap_addrs: Vec::new(), - libp2p_port: 0, - }) - } - - pub fn add_bootstrap_addr(&mut self, addr: Multiaddr) { - self.bootstrap_addrs.push(addr); - } - - pub fn set_libp2p_port(&mut self, port: u16) { - self.libp2p_port = port; - } -} - -/// Type alias for thread-safe access to RobotsManager -pub type Robots = Arc>; -/// Type alias for thread-safe access to JobManager -pub type Jobs = Arc>; diff --git a/src/store/config.rs b/src/store/config.rs new file mode 100644 index 0000000..3a3bb29 --- /dev/null +++ b/src/store/config.rs @@ -0,0 +1,58 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tracing::info; + +use super::messages::{SignedMessage}; +use super::robot_manager::{Robot,User}; + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct RobotsConfig { + pub version: u32, + pub robots: Vec, + pub users: Vec, +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct ConfigStorage { + configs: HashMap<[u8; 32], (RobotsConfig, SignedMessage)>, // owner public key -> config +} + +impl ConfigStorage { + pub fn update_config( + &mut self, + owner: [u8; 32], + config: RobotsConfig, + signed_message: SignedMessage, + ) { + match self.configs.get(&owner) { + Some((old_config, _)) => { + if old_config.version >= config.version { + return; + } + } + None => {} + } + + info!( + "Updating config in config storage owner {:?} {:?}", + owner, signed_message + ); + self.configs.insert(owner, (config, signed_message)); + } + pub fn get_config(&self, owner: &[u8; 32]) -> Option<(RobotsConfig, SignedMessage)> { + if let Some((config, message)) = self.configs.get(owner) { + return Some((config.clone(), message.clone())); + } + return None; + } + pub fn get_config_version(&self, owner: &[u8; 32]) -> u32 { + match self.get_config(owner) { + Some((config, _message)) => { + return config.version; + } + None => { + return 0; + } + } + } +} \ No newline at end of file diff --git a/src/store/job_manager.rs b/src/store/job_manager.rs new file mode 100644 index 0000000..bd35bf8 --- /dev/null +++ b/src/store/job_manager.rs @@ -0,0 +1,131 @@ +use tokio::sync::broadcast; +use std::sync::{Arc, Mutex}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use super::messages::{ChannelMessageFromJob, ChannelMessageToJob}; +use crate::commands::RobotJobResult; + +/// Represents a tunnel for job communication +#[derive(Debug, Clone, Serialize)] +pub struct Tunnel { + pub client_id: String, +} + +#[derive(Debug, Clone)] +pub struct JobProcess { + pub job_id: String, + pub job_type: String, + pub status: String, + pub channel_tx: Option>, + pub channel_to_job_tx: broadcast::Sender, + pub tunnel: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobProcessData { + pub job_id: String, + pub job_type: String, + pub status: String, +} + +impl From for JobProcessData { + fn from(job_process: JobProcess) -> Self { + return Self { + job_id: job_process.job_id, + job_type: job_process.job_type, + status: job_process.status, + }; + } +} + +/// Manages job processes +#[derive(Default, Debug)] +pub struct JobManager { + pub data: HashMap, +} + +impl JobManager { + /// Creates a new job + pub fn new_job(&mut self, job_id: String, job_type: String, status: String) { + let (channel_to_job_tx, _channel_to_job_rx) = + broadcast::channel::(100); + + self.data.insert( + job_id.clone(), + JobProcess { + job_id, + job_type, + status, + channel_tx: None, + channel_to_job_tx: channel_to_job_tx.clone(), + tunnel: None, + }, + ); + } + /// Sets the result of a job + pub fn set_job_result(&mut self, reslut: RobotJobResult) { + let process = self.data.get_mut(&reslut.job_id); + match process { + Some(_process) => { + self.set_job_status(reslut.job_id, reslut.status); + } + None => {} + } + } + pub fn get_job_or_none(&self, job_id: &String) -> Option { + match self.data.get(job_id) { + Some(job) => Some(job.clone()), + None => None, + } + } + /// Retrieves job information + pub fn get_jobs_info(&self) -> Vec { + return self.data.clone().into_values().map(|x| x.into()).collect(); + } + pub fn set_job_status(&mut self, job_id: String, status: String) { + let process = self.data.get_mut(&job_id); + match process { + Some(process) => { + process.status = status; + } + None => {} + } + } + + /// Creates a tunnel for a job + pub fn create_job_tunnel(&mut self, job_id: &String, client_id: String) { + let process = self.data.get_mut(job_id); + let (tx, _rx) = broadcast::channel::(100); + match process { + Some(process) => { + process.tunnel = Some(Tunnel { client_id }); + process.channel_tx = Some(tx.clone()); + } + None => {} + } + } + pub fn get_channel_from_job( + &self, + job_id: &String, + ) -> Option> { + match self.data.get(job_id) { + Some(job) => match &job.channel_tx { + Some(channel) => Some(channel.clone()), + None => None, + }, + None => None, + } + } + pub fn get_channel_to_job( + &self, + job_id: &String, + ) -> Option> { + match self.data.get(job_id) { + Some(job) => Some(job.channel_to_job_tx.clone()), + None => None, + } + } +} + + +pub type Jobs = Arc>; diff --git a/src/store/key_manager.rs b/src/store/key_manager.rs new file mode 100644 index 0000000..63ba251 --- /dev/null +++ b/src/store/key_manager.rs @@ -0,0 +1,77 @@ +use libp2p::{Multiaddr, PeerId}; +use base64::{engine::general_purpose, Engine as _}; +use std::fs::{self}; +use std::error::Error; + + + +/// Manages robots in the system +/// Represents the configuration for the system +#[derive(Debug, Clone)] +pub struct KeyConfig { + pub identity: libp2p::identity::ed25519::Keypair, + pub bootstrap_addrs: Vec, + pub libp2p_port: u16, +} + +impl KeyConfig { + /// Generates a new configuration + pub fn generate() -> Self { + Self { + identity: libp2p::identity::ed25519::Keypair::generate(), + bootstrap_addrs: Vec::new(), + libp2p_port: 0, + } + } + + /// Saves the configuration to a file + pub fn save_to_file(self: &Self, filepath: String) -> Result<(), Box> { + let encoded_key = general_purpose::STANDARD.encode(&self.identity.to_bytes().to_vec()); + fs::write(filepath, encoded_key)?; + Ok(()) + } + + pub fn get_public_key_encoded(self: &Self) -> String { + let public_key_encoded = + general_purpose::STANDARD.encode(&self.identity.public().to_bytes().to_vec()); + public_key_encoded + } + + pub fn get_peer_id(self: &Self) -> String { + let public_key: libp2p::identity::PublicKey = self.identity.public().into(); + let peer_id = public_key.to_peer_id(); + return peer_id.to_string(); + } + + /// Loads the configuration from a file + pub fn load_from_file(filepath: String) -> Result> { + let key = fs::read(filepath)?; + let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(key)?; + let parsed_identity = libp2p::identity::ed25519::Keypair::try_from_bytes(decoded_key)?; + Ok(Self { + identity: parsed_identity, + bootstrap_addrs: Vec::new(), + libp2p_port: 0, + }) + } + + /// Loads the configuration from a secret key string + pub fn load_from_sk_string(sk_string: String) -> Result> { + let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(sk_string)?; + let parsed_secret = libp2p::identity::ed25519::SecretKey::try_from_bytes(decoded_key)?; + let identity = libp2p::identity::ed25519::Keypair::from(parsed_secret); + Ok(Self { + identity, + bootstrap_addrs: Vec::new(), + libp2p_port: 0, + }) + } + + pub fn add_bootstrap_addr(&mut self, addr: Multiaddr) { + self.bootstrap_addrs.push(addr); + } + + pub fn set_libp2p_port(&mut self, port: u16) { + self.libp2p_port = port; + } +} diff --git a/src/store/messages.rs b/src/store/messages.rs new file mode 100644 index 0000000..bec9446 --- /dev/null +++ b/src/store/messages.rs @@ -0,0 +1,126 @@ +use serde::{Deserialize, Serialize}; +use base64::{engine::general_purpose, Engine as _}; +use libp2p_identity::ed25519; +use std::time::SystemTime; + +use super::job_manager::{JobProcessData}; +use crate::commands::{RobotJob}; + +use super::robot_manager::{Robot}; +use super::config::{RobotsConfig}; + + + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChannelMessageToJob { + TerminalMessage(String), + ArchiveMessage { encoded_tar: String, path: String }, + ArchiveRequest { path: String }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ChannelMessageFromJob { + TerminalMessage(String), + ArchiveMessage { encoded_tar: String }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "request_type")] +pub enum RobotRequest { + GetConfigVersion { version: u32, owner: [u8; 32] }, + ShareConfigMessage { signed_message: SignedMessage }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "response_type")] +pub enum RobotResponse { + GetConfigVersion { version: u32, owner: [u8; 32] }, + ShareConfigMessage {}, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "request_type")] +pub enum MessageRequest { + ListJobs {}, + GetRobotsConfig {}, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "response_type")] +pub enum MessageResponse { + ListJobs { jobs: Vec }, + GetRobotsConfig { config: RobotsConfig }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum MessageContent { + CustomMessage(serde_json::Value), + MessageResponse(MessageResponse), + MessageRequest(MessageRequest), + JobMessage(serde_json::Value), + StartTunnelReq { + job_id: String, + peer_id: String, + }, + TunnelResponseMessage { + job_id: String, + message: ChannelMessageFromJob, + }, + StartJob(RobotJob), + UpdateConfig { + config: RobotsConfig, + }, + Handshake{} +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SignedMessage { + pub message: String, + sign: Vec, + pub public_key: [u8; 32], +} + +impl SignedMessage { + /// Verifies the signature of the message + pub fn verify(&self) -> bool { + if let Ok(public_key) = ed25519::PublicKey::try_from_bytes(&self.public_key) { + return public_key.verify(&*self.message.as_bytes(), &self.sign); + } + return false; + } +} + +/// Represents a message in the system +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Message { + pub timestamp: String, + pub content: MessageContent, + pub from: String, + pub to: Option, +} +impl Message { + /// Creates a new message + pub fn new(content: MessageContent, from: String, to: Option) -> Self { + let duration_since_epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); + let timestamp_nanos = duration_since_epoch.as_nanos().to_string(); + Self { + timestamp: timestamp_nanos, + content, + from, + to, + } + } + /// Signs the message using the provided keypair + pub fn signed(self, keypair: ed25519::Keypair) -> Result { + let message_str = serde_json::to_string(&self)?; + let sign = keypair.sign(&*message_str.as_bytes()); + Ok(SignedMessage { + message: message_str, + sign, + public_key: keypair.public().to_bytes(), + }) + } +} \ No newline at end of file diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 0000000..fadd37f --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1,6 @@ +pub mod messages; +pub mod job_manager; +pub mod network_manager; +pub mod robot_manager; +pub mod config; +pub mod key_manager; diff --git a/src/store/network_manager.rs b/src/store/network_manager.rs new file mode 100644 index 0000000..1d5ff8a --- /dev/null +++ b/src/store/network_manager.rs @@ -0,0 +1,44 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::SystemTime; + +/// Peer info contains status, last hanshake etc for each peer +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct PeerInfo{ + pub peer_id: String, + + pub last_handshake: u64, + pub is_online: bool +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct NetworkManager{ + pub peers_info: HashMap +} + +impl NetworkManager{ + pub fn process_handshake(&mut self, peer_id: String){ + let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + if let Some(peer_info) = self.peers_info.get_mut(&peer_id){ + peer_info.last_handshake = cur_time; + peer_info.is_online = true; + } else{ + let peer_info = PeerInfo{ + peer_id: peer_id.clone(), + last_handshake: cur_time, + is_online: true + }; + self.peers_info.insert(peer_id, peer_info); + } + } + pub fn clean_old_handshakes(&mut self){ + let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs(); + + for (_peer_id, peer_info) in self.peers_info.iter_mut(){ + if cur_time - peer_info.last_handshake > 60{ + peer_info.is_online = false + } + } + } + +} diff --git a/src/store/robot_manager.rs b/src/store/robot_manager.rs new file mode 100644 index 0000000..9c03f29 --- /dev/null +++ b/src/store/robot_manager.rs @@ -0,0 +1,267 @@ +use serde::{Deserialize, Serialize}; +use std::error::Error; +use base64::{engine::general_purpose, Engine as _}; +use std::collections::HashMap; +use tracing::info; +use std::fs::{self}; +use libp2p::{Multiaddr, PeerId}; +use std::any::Any; +use std::sync::{Arc, Mutex}; + +use super::messages::{SignedMessage}; +use super::network_manager::{NetworkManager}; +use super::config::{RobotsConfig, ConfigStorage}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum RobotRole { + Current, + OrganizationRobot, + OrganizationUser, + Owner, + Unknown, +} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Robot { + pub robot_id: String, + pub robot_peer_id: String, + pub robot_public_key: String, + pub name: String, + pub tags: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct User { + pub username: String, + pub public_key: String, + pub tags: Vec, +} + +impl User { + pub fn get_public_key_bytes(&self) -> Result<[u8; 32], Box> { + let decoded_key = general_purpose::STANDARD.decode(&self.public_key); + match decoded_key { + Ok(decoded_key) => { + let public_key_bytes: [u8; 32] = decoded_key.as_slice().try_into()?; + return Ok(public_key_bytes); + } + Err(_err) => { + return Err("can't decode user public key from base64")?; + } + } + } + pub fn get_peer_id(&self) -> Result> { + let identity = + libp2p::identity::ed25519::PublicKey::try_from_bytes(&self.get_public_key_bytes()?)?; + let public_key: libp2p::identity::PublicKey = identity.into(); + let peer_id = public_key.to_peer_id(); + return Ok(peer_id.to_base58()); + } +} + + +#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] +pub struct RobotInterface { + pub ip4: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RobotsManagerDump { + config: RobotsConfig, + config_message: SignedMessage, +} + + + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct RobotsManager { + pub self_peer_id: String, + pub owner_peer_id: String, + pub owner_public_key: [u8; 32], + pub robots: HashMap, // peer id tp Robot + pub users: HashMap, // public key to User + pub config_version: u32, + pub config_message: Option, + pub peer_id_to_ip: HashMap, + pub peers: Vec, + pub config_storage: ConfigStorage, + pub network_manager: NetworkManager +} + +impl RobotsManager { + /// Adds a robot to the manager + pub fn add_robot(&mut self, robot: Robot) { + self.robots + .insert(robot.robot_peer_id.clone(), robot.clone()); + // if let Some(ip4) = self.peer_id_to_ip.get(&robot.robot_peer_id) { + // self.add_interface_to_robot(robot.robot_peer_id, ip4.to_string()); + // } + } + pub fn add_user(&mut self, user: User) { + // if let Ok(peer_id) = user.get_peer_id(){ + // self.users.insert(peer_id, user.clone()); + // }else{ + // error!("can't get peer_id for user {:?}", user); + // } + self.users.insert(user.public_key.clone(), user); + } + + + /// Sets the robots configuration + pub fn set_robots_config(&mut self, config: RobotsConfig, signed_message: SignedMessage) { + if config.version > self.config_version { + self.config_version = config.version.clone(); + self.robots.clear(); + for robot in &config.robots { + self.add_robot(robot.clone()); + } + self.users.clear(); + for user in &config.users { + self.add_user(user.clone()); + } + self.config_message = Some(signed_message.clone()); + info!("OWNER_PUBLIC_KEY {:?}", self.owner_public_key); + self.config_storage.update_config( + self.owner_public_key, + config.clone(), + signed_message, + ); + } else { + info!("config version is too old"); + } + } + + /// Saves the current configuration to a file + pub fn save_to_file(&self, filepath: String) -> Result<(), Box> { + let dump = RobotsManagerDump { + config: self.get_robots_config(), + config_message: self + .config_message + .clone() + .ok_or("no config signed message")?, + }; + fs::write(filepath, serde_json::to_string(&dump)?)?; + Ok(()) + } + /// Reads the configuration from a file + pub fn read_config_from_file(&mut self, filepath: String) -> Result<(), Box> { + let dump_str = fs::read_to_string(filepath)?; + let dump: RobotsManagerDump = serde_json::from_str(&dump_str)?; + self.set_robots_config(dump.config, dump.config_message); + Ok(()) + } + + pub fn get_robots_config(&self) -> RobotsConfig { + return RobotsConfig { + version: self.config_version, + robots: self + .robots + .values() + .map(|robot| robot.clone()) + .collect::>(), + users: self + .users + .values() + .map(|user| user.clone()) + .collect::>(), + }; + } + + pub fn set_peers(&mut self, peers: Vec) { + self.peers = peers + } + + pub fn set_owner(&mut self, owner_b64: String) -> Result<(), Box> { + let decoded_key = general_purpose::STANDARD.decode(owner_b64); + match decoded_key { + Ok(owner_public_key) => { + match owner_public_key.as_slice().try_into() { + Ok(pk) => { + self.owner_public_key = pk; + } + _ => { + return Err("can't transform pk array".into()); + } + }; + } + _ => { + return Err("can't decode pk from b64".into()); + } + } + + Ok(()) + } + + pub fn read_robots_from_config(&mut self, config: String) { + let robots_config: RobotsConfig = + serde_json::from_str::(&config).expect("wrong JSON"); + for robot in robots_config.robots.iter() { + self.add_robot(robot.clone()); + } + } + + // pub fn add_interface_to_robot(&mut self, robot_peer_id: String, ip4: String) { + // info!("Adding interface {} = {}", robot_peer_id, ip4); + // match self.robots.get_mut(&robot_peer_id) { + // Some(robot) => { + // robot.interfaces.insert(RobotInterface { ip4 }); + // } + // None => { + // info!("No robot for peer id {}", robot_peer_id); + // self.peer_id_to_ip.insert(robot_peer_id, ip4); + // } + // } + // } + + /// Gets the role of an entity based on its ID + pub fn get_role(&self, id: T) -> RobotRole { + let value_any = &id as &dyn Any; + if let Some(_peer_id) = value_any.downcast_ref::() { + // for user_peer_id in self.users.keys() { + // if user_peer_id == peer_id { + // return RobotRole::OrganizationUser; + // } + // } + } else if let Some(public_key) = value_any.downcast_ref::<[u8; 32]>() { + if *public_key == self.owner_public_key { + return RobotRole::Owner; + } + + info!("public key: {:?}", public_key); + let pk_str = general_purpose::STANDARD.encode(&public_key.to_vec()); + info!("pk_str: {}", pk_str); + for robot_public_key in self + .robots + .values() + .map(|robot| robot.robot_public_key.clone()) + { + if robot_public_key == pk_str { + return RobotRole::OrganizationRobot; + } + } + info!("users: {:?}", self.users); + if let Some(_user) = self.users.get(&pk_str) { + return RobotRole::OrganizationUser; + } + } + + return RobotRole::Unknown; + } + + pub fn remove_interface_from_robot(&mut self, _robot_peer_id: String, _ip4: String) {} + + pub fn merge_update(&mut self, update_robots: RobotsConfig) { + for robot in update_robots.robots.iter() { + if !self.robots.contains_key(&robot.robot_peer_id) { + self.add_robot(robot.clone()); + } + } + } + + pub fn get_robots_json(self) -> String { + serde_json::to_string(&self).unwrap() + } +} + + +/// Type alias for thread-safe access to RobotsManager +pub type Robots = Arc>; \ No newline at end of file