From d35de36f726c663473fa07947976782a1fda7df6 Mon Sep 17 00:00:00 2001 From: smehnov Date: Mon, 5 Aug 2024 23:20:23 +0300 Subject: [PATCH 1/2] update job status --- src/commands/docker.rs | 49 +++++++++++++++++++++++++----------------- src/commands/mod.rs | 13 +++++------ src/store.rs | 11 +++++++++- 3 files changed, 46 insertions(+), 27 deletions(-) diff --git a/src/commands/docker.rs b/src/commands/docker.rs index 44465e3..eda18fd 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -30,28 +30,37 @@ pub async fn execute_launch( let args = serde_json::from_str::(&robot_job.args).unwrap(); info!("launching docker job {:?}", args); let docker_launch = DockerLaunch { args }; - let robot_job_result = match docker_launch.execute(robot_job.clone(), agent, jobs).await { - Ok(result) => { - info!("job successfully executed"); - result - } - Err(error) => { - error!("error {:?}", error); - RobotJobResult { - job_id: robot_job.id, - status: String::from("error"), - logs: error.to_string(), + { + let exec_jobs = Arc::clone(&jobs); + let robot_job_result = match docker_launch + .execute(robot_job.clone(), agent, exec_jobs) + .await + { + Ok(result) => { + info!("job successfully executed"); + result } - } - }; - match socket { - Some(socket) => { - let _ = socket - .emit("job_done", serde_json::json!(robot_job_result)) - .await; - } - None => {} + Err(error) => { + error!("error {:?}", error); + RobotJobResult { + job_id: robot_job.id, + status: String::from("error"), + logs: error.to_string(), + } + } + }; + let mut job_manager = jobs.lock().unwrap(); + job_manager.set_job_result(robot_job_result); } + + // match socket { + // Some(socket) => { + // let _ = socket + // .emit("job_done", serde_json::json!(robot_job_result)) + // .await; + // } + // None => {} + // } } #[derive(Clone, Serialize, Deserialize, Debug)] diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 1003a26..9144259 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -76,16 +76,17 @@ pub async fn launch_new_job( jobs: store::Jobs, ) { info!("{:?}", robot_job); + let mut job_manager = jobs.lock().unwrap(); + job_manager.new_job( + robot_job.id.clone(), + robot_job.job_type.clone(), + robot_job.status.clone(), + ); + job_manager.set_job_status(robot_job.id.clone(), "processing".to_string()); match robot_job.job_type.as_str() { "docker-container-launch" => { info!("container launch"); - let mut job_manager = jobs.lock().unwrap(); - job_manager.new_job( - robot_job.id.clone(), - robot_job.job_type.clone(), - robot_job.status.clone(), - ); let shared_jobs = Arc::clone(&jobs); tokio::spawn(docker::execute_launch( socket, diff --git a/src/store.rs b/src/store.rs index 2a36d75..8b87cf4 100644 --- a/src/store.rs +++ b/src/store.rs @@ -11,7 +11,7 @@ use tracing::info; use base64::{engine::general_purpose, Engine as _}; -use crate::commands::RobotJob; +use crate::commands::{RobotJob, RobotJobResult}; #[derive(Debug, Clone, Serialize)] pub struct Tunnel { @@ -115,6 +115,15 @@ impl JobManager { }, ); } + pub fn set_job_result(&mut self, reslut: RobotJobResult) { + let process = self.data.get_mut(&reslut.job_id); + match process { + Some(process) => { + self.set_job_status(reslut.job_id, reslut.status); + } + None => {} + } + } pub fn get_job_or_none(&self, job_id: &String) -> Option { match self.data.get(job_id) { Some(job) => Some(job.clone()), From 6b1efa5b98b8cab9ea61dd42cc3d41876f39490d Mon Sep 17 00:00:00 2001 From: smehnov Date: Wed, 14 Aug 2024 08:30:41 +0300 Subject: [PATCH 2/2] config updates + signed messages --- src/agent.rs | 36 ------- src/cli.rs | 26 ++--- src/commands/docker.rs | 60 +++++------ src/commands/mod.rs | 15 +-- src/develop/mdns.rs | 194 ++++++++++++++++++++++++++++++------ src/develop/unix_socket.rs | 100 +++++++++++-------- src/main.rs | 190 +++++++++++++---------------------- src/store.rs | 199 ++++++++++++++++++++++++++++++++----- 8 files changed, 505 insertions(+), 315 deletions(-) delete mode 100644 src/agent.rs diff --git a/src/agent.rs b/src/agent.rs deleted file mode 100644 index 94a42d2..0000000 --- a/src/agent.rs +++ /dev/null @@ -1,36 +0,0 @@ -#[derive(Debug, Clone)] -pub struct Agent { - pub api_key: String, - pub robot_server_url: String, -} - -pub struct AgentBuilder { - api_key: Option, - robot_server_url: Option, -} - -impl Default for AgentBuilder { - fn default() -> Self { - Self { - api_key: None, - robot_server_url: None, - } - } -} - -impl AgentBuilder { - pub fn api_key(mut self, api_key: String) -> Self { - self.api_key = Some(api_key); - self - } - pub fn robot_server_url(mut self, robot_server_url: String) -> Self { - self.robot_server_url = Some(robot_server_url); - self - } - pub fn build(self) -> Agent { - Agent { - api_key: self.api_key.unwrap(), - robot_server_url: self.robot_server_url.unwrap(), - } - } -} diff --git a/src/cli.rs b/src/cli.rs index 9a17509..3fe88a5 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -3,29 +3,29 @@ use clap::Parser; #[derive(Parser, Debug, Clone)] #[command(author, version, about, long_about = None)] pub struct Args { - #[arg(short, long)] - pub api_key: String, - - #[arg(short, long, default_value = "https://robots.merklebot.com")] - pub robot_server_url: String, - - #[arg(short, long, default_value = "normal")] + #[arg(short = 'm', long, default_value = "normal")] pub mode: String, - #[arg(short, long, default_value = "merklebot.socket")] + #[arg(short = 's', long, default_value = "rn.socket")] pub socket_filename: String, - #[arg(short, long, default_value = "merklebot.key")] + #[arg(short = 'f', long, default_value = "rn.key")] pub key_filename: String, - #[arg(short, long)] + #[arg(short = 'b', long)] pub bootstrap_addr: Option, - #[arg(short, long, default_value = "8765")] + #[arg(short = 'l', long, default_value = "8765")] pub port_libp2p: String, - #[arg(short, long)] - pub owner: Option, + #[arg(short = 'c', long, default_value = "rn.json")] + pub config_path: String, + + #[arg(short = 'k', long)] + pub secret_key: Option, + + #[arg(short = 'o', long)] + pub owner: String, } pub fn get_args() -> Args { diff --git a/src/commands/docker.rs b/src/commands/docker.rs index eda18fd..a8b4f67 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -11,7 +11,6 @@ use serde::{Deserialize, Serialize}; use base64::{engine::general_purpose, Engine as _}; -use crate::agent; use crate::{ commands::{RobotJob, RobotJobResult}, store::Jobs, @@ -21,21 +20,13 @@ use crate::{ }, }; -pub async fn execute_launch( - socket: Option, - robot_job: RobotJob, - agent: agent::Agent, - jobs: Jobs, -) { +pub async fn execute_launch(socket: Option, robot_job: RobotJob, jobs: Jobs) { let args = serde_json::from_str::(&robot_job.args).unwrap(); info!("launching docker job {:?}", args); let docker_launch = DockerLaunch { args }; { let exec_jobs = Arc::clone(&jobs); - let robot_job_result = match docker_launch - .execute(robot_job.clone(), agent, exec_jobs) - .await - { + let robot_job_result = match docker_launch.execute(robot_job.clone(), exec_jobs).await { Ok(result) => { info!("job successfully executed"); result @@ -90,7 +81,6 @@ impl DockerLaunch { pub async fn execute( &self, robot_job: RobotJob, - agent: agent::Agent, jobs: Jobs, ) -> Result { info!("launching docker with image {}", self.args.image); @@ -311,29 +301,29 @@ impl DockerLaunch { }; let job_data_path = get_job_data_path(&robot_job.id); - if let Some(true) = &self.args.store_data { - match get_files_in_directory_recursively(&job_data_path) { - //TODO: change to path - Ok(paths) => { - info!("{:?}", paths); - for path in paths { - let path_str = path.as_path().display().to_string(); - let key = path_str.replace(&get_merklebot_data_path(), ""); - upload_content( - agent.robot_server_url.clone(), - path, - key, - robot_job.id.clone(), - agent.api_key.clone(), - ) - .await; - } - } - _ => { - error!("Can't get resulting paths"); - } - } - } + // if let Some(true) = &self.args.store_data { + // match get_files_in_directory_recursively(&job_data_path) { + // //TODO: change to path + // Ok(paths) => { + // info!("{:?}", paths); + // for path in paths { + // let path_str = path.as_path().display().to_string(); + // let key = path_str.replace(&get_merklebot_data_path(), ""); + // upload_content( + // agent.robot_server_url.clone(), + // path, + // key, + // robot_job.id.clone(), + // agent.api_key.clone(), + // ) + // .await; + // } + // } + // _ => { + // error!("Can't get resulting paths"); + // } + // } + // } Ok(robot_job_result) } } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 9144259..f805617 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -9,7 +9,6 @@ use serde_json::json; use tokio::sync::broadcast::Sender; use tracing::{error, info}; -use crate::agent; use crate::store; use crate::store::ChannelMessageFromJob; use crate::store::Message; @@ -69,12 +68,7 @@ pub enum TunnnelClient { }, } -pub async fn launch_new_job( - robot_job: RobotJob, - socket: Option, - agent: agent::Agent, - jobs: store::Jobs, -) { +pub async fn launch_new_job(robot_job: RobotJob, socket: Option, jobs: store::Jobs) { info!("{:?}", robot_job); let mut job_manager = jobs.lock().unwrap(); job_manager.new_job( @@ -88,12 +82,7 @@ pub async fn launch_new_job( "docker-container-launch" => { info!("container launch"); let shared_jobs = Arc::clone(&jobs); - tokio::spawn(docker::execute_launch( - socket, - robot_job, - agent, - shared_jobs, - )); + tokio::spawn(docker::execute_launch(socket, robot_job, shared_jobs)); } _ => {} } diff --git a/src/develop/mdns.rs b/src/develop/mdns.rs index 7a81b25..5f4411c 100644 --- a/src/develop/mdns.rs +++ b/src/develop/mdns.rs @@ -1,13 +1,20 @@ use crate::store::Config; use crate::store::Message; +use crate::store::MessageContent; +use crate::store::RobotRequest; +use crate::store::RobotResponse; use crate::store::RobotRole; use crate::store::Robots; +use crate::store::SignedMessage; use futures::stream::StreamExt; +use libp2p::relay::client::new; +use libp2p::PeerId; +use libp2p::StreamProtocol; use libp2p::{ gossipsub, identify, kad, kad::store::MemoryStore, - mdns, noise, + mdns, noise, ping, request_response, swarm::{NetworkBehaviour, SwarmEvent}, tcp, yamux, }; @@ -16,6 +23,8 @@ use libp2p::Multiaddr; use std::collections::hash_map::DefaultHasher; use std::error::Error; use std::hash::{Hash, Hasher}; +use std::str::FromStr; +use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; use tokio::{io, select}; @@ -24,9 +33,40 @@ use tracing::{error, info}; #[derive(NetworkBehaviour)] struct MyBehaviour { identify: identify::Behaviour, + request_response: request_response::json::Behaviour, gossipsub: gossipsub::Behaviour, mdns: mdns::tokio::Behaviour, kademlia: kad::Behaviour, + ping: ping::Behaviour, +} + +fn process_signed_message( + message_string: String, + robots: Robots, + to_message_tx: broadcast::Sender, +) { + if let Ok(signed_message) = serde_json::from_str::(&message_string) { + info!("signed_message: {:?}", signed_message); + let robot_manager = robots.lock().unwrap(); + if signed_message.verify() { + info!("Verified"); + if let Ok(message) = serde_json::from_str::(&signed_message.message) { + info!("message: {:?}", message); + + if message.to.unwrap_or("".to_string()) == robot_manager.self_peer_id + || matches!(message.content, MessageContent::UpdateConfig { .. }) + { + let role = robot_manager.get_role(signed_message.public_key); + info!("role: {:?}", role); + if matches!(role, RobotRole::Owner) + || matches!(role, RobotRole::OrganizationRobot) + { + let _ = to_message_tx.send(message_string); + } + } + } + } + } } async fn start( @@ -81,18 +121,30 @@ async fn start( identify::Config::new("/agent/connection/1.0.0".to_string(), key.clone().public()); let identify = identify::Behaviour::new(identify_config); + let request_response = + request_response::json::Behaviour::::new( + [( + StreamProtocol::new("/rn/1"), + request_response::ProtocolSupport::Full, + )], + request_response::Config::default(), + ); + + let ping = ping::Behaviour::default(); Ok(MyBehaviour { gossipsub, mdns, kademlia, identify, + ping, + request_response, }) })? .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) .build(); // Create a Gossipsub topic - let topic = gossipsub::IdentTopic::new("merklebot"); + let topic = gossipsub::IdentTopic::new("rn"); // subscribes to our topic swarm.behaviour_mut().gossipsub.subscribe(&topic)?; @@ -102,6 +154,10 @@ async fn start( if bootstrap_addrs.len() > 0 { let bootaddr = bootstrap_addrs.first().unwrap(); swarm.dial(bootaddr.clone())?; + swarm.behaviour_mut().kademlia.add_address( + &PeerId::from_str("12D3KooWB19yrtJ8ed9YGaDFhbExo27UdVGkqNzfG1dJfTqEXVFX").unwrap(), + bootaddr.clone(), + ); } let mut from_message_rx = from_message_tx.subscribe(); @@ -110,21 +166,31 @@ async fn start( select! { msg = from_message_rx.recv()=>match msg{ Ok(msg)=>{ - let mut message = serde_json::from_str::(&msg)?; - { - let robots_manager = robots.lock().unwrap(); - message.from = robots_manager.self_peer_id.clone(); - } - // message.from = Some(std::str::from_utf8(&identity.public().to_bytes())?.to_string()); + if let Ok(mut message) = serde_json::from_str::(&msg){ + { + let robots_manager = robots.lock().unwrap(); + message.from = robots_manager.self_peer_id.clone(); + } + // message.from = Some(std::str::from_utf8(&identity.public().to_bytes())?.to_string()); - info!("libp2p received socket message: {:?}", message); + info!("libp2p received socket message: {:?}", message); + if let Ok(signed_message) = message.signed(identity.clone()){ + if let Err(e) = swarm + .behaviour_mut().gossipsub + .publish(topic.clone(), serde_json::to_string(&signed_message)?.as_bytes()) { + println!("Publish error: {e:?}"); + } + } + } else if let Ok(signed_message) = serde_json::from_str::(&msg){ + info!("libp2p received socket signed_message: {:?}", signed_message); + process_signed_message(msg.clone(), Arc::clone(&robots), to_message_tx.clone()); + if let Err(e) = swarm + .behaviour_mut().gossipsub + .publish(topic.clone(), serde_json::to_string(&signed_message)?.as_bytes()) { + println!("Publish error: {e:?}"); + } - if let Err(e) = swarm - .behaviour_mut().gossipsub - .publish(topic.clone(), serde_json::to_string(&message)?.as_bytes()) { - println!("Publish error: {e:?}"); } - } Err(_)=>{ error!("error while socket receiving libp2p message"); @@ -132,16 +198,21 @@ async fn start( }, event = swarm.select_next_some() => match event { SwarmEvent::Behaviour(behaviour)=> { + { + let mut robots_manager = robots.lock().unwrap(); + robots_manager.set_peers(swarm.connected_peers().map(|peer_id| peer_id.clone()).collect::>()) + } + info!("Event: {:?}", behaviour); match behaviour { MyBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => { for (peer_id, multiaddr) in list { let ip4: String = (&multiaddr.to_string().split("/").collect::>()[2]).to_string(); info!("{:?}", ip4); - { - let mut robots_manager = robots.lock().unwrap(); - info!("Adding interface"); - robots_manager.add_interface_to_robot(peer_id.to_string(), ip4); - } + // { + // let mut robots_manager = robots.lock().unwrap(); + // info!("Adding interface"); + // robots_manager.add_interface_to_robot(peer_id.to_string(), ip4); + // } println!("mDNS discovered a new peer: {peer_id}, {multiaddr}"); swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id); } @@ -162,18 +233,8 @@ async fn start( String::from_utf8_lossy(&message.data), ); let message_string = String::from_utf8_lossy(&message.data).to_string(); - let message_data = serde_json::from_str::(&message_string)?; - - { - let robots_manager = robots.lock().unwrap(); - if message_data.to.unwrap_or("".to_string())==robots_manager.self_peer_id{ - let role = robots_manager.get_role(message_data.from); - info!("role: {:?}", role); - if matches!(role, RobotRole::OrganizationRobot){ - let _ = to_message_tx.send(message_string); - } - } - } + process_signed_message(message_string, Arc::clone(&robots), to_message_tx.clone()); + }, MyBehaviourEvent::Kademlia(event)=>{ match event{ @@ -200,6 +261,75 @@ async fn start( _=>{} } }, + MyBehaviourEvent::RequestResponse(event)=>{ + match event{ + request_response::Event::Message{ + peer, + message: + request_response::Message::Request{ + request, + channel, .. + }, + }=>{ + + let robot_manager = robots.lock().unwrap(); + + if let RobotRole::OrganizationRobot = robot_manager.get_role(peer.to_string()){ + match request{ + RobotRequest::GetConfigVersion{}=>{ + info!("get config verison request from peer {}", peer); + let version = robot_manager.config_version.clone(); + let _ =swarm.behaviour_mut().request_response.send_response(channel, RobotResponse::GetConfigVersion { version }); + + }, + RobotRequest::GetSignedConfigMessage{}=>{ + info!("get signed config request from peer {}", peer); + if let Some(config_message) = &robot_manager.config_message{ + let _ = swarm.behaviour_mut().request_response.send_response(channel, RobotResponse::GetSignedConfigMessage { + signed_message: config_message.clone() + }); + } + }, + _=>{ + + } + } + } + + }, + request_response::Event::Message{ + peer, + message: + request_response::Message::Response { + request_id, + response, + }, + }=>{ + match response { + RobotResponse::GetConfigVersion{version}=>{ + info!("got config version {} from peer {}", version, peer); + let robot_manager = robots.lock().unwrap(); + if version>robot_manager.config_version{ + info!("asking for new config"); + swarm.behaviour_mut().request_response.send_request(&peer, RobotRequest::GetSignedConfigMessage { } ); + } + }, + RobotResponse::GetSignedConfigMessage{signed_message}=>{ + info!("got config from peer {}", peer); + let robots = Arc::clone(&robots); + process_signed_message(serde_json::to_string(&signed_message)?, robots, to_message_tx.clone()) + } + _=>{ + + } + } + + + }, + + _=>{} + } + } _=>{} } }, @@ -216,6 +346,8 @@ async fn start( established_in }=>{ info!("ConnectionEstablished: {peer_id} | {connection_id} | {endpoint:?} | {num_established} | {concurrent_dial_errors:?} | {established_in:?}"); + let get_config_version_req = RobotRequest::GetConfigVersion{}; + swarm.behaviour_mut().request_response.send_request(&peer_id, get_config_version_req); }, _ => {} } diff --git a/src/develop/unix_socket.rs b/src/develop/unix_socket.rs index 615ad0f..a521bc8 100644 --- a/src/develop/unix_socket.rs +++ b/src/develop/unix_socket.rs @@ -1,6 +1,7 @@ use crate::store::Message; use crate::store::MessageContent; use crate::store::Robots; +use crate::store::SignedMessage; use serde::{Deserialize, Serialize}; use std::error::Error; use std::sync::{Arc, Mutex}; @@ -52,6 +53,7 @@ pub struct Content { struct SocketCommand { action: String, content: Option, + signed_message: Option, } impl SocketServer { @@ -101,13 +103,19 @@ impl SocketServer { msg = to_message_rx.recv()=>{ match msg{ Ok(msg) => { - let message = serde_json::from_str::(&msg)?; - info!("socket received libp2p message: {:?}", message.content); - let content_serialized = message.content.serialize(serde_json::value::Serializer)?; + let mut msg_to_socket: Option = None; + if let Ok(message) = serde_json::from_str::(&msg){ + info!("socket received libp2p message: {:?}", message.content); + let content_serialized = message.content.serialize(serde_json::value::Serializer)?; + msg_to_socket = Some(content_serialized.to_string()); + }else if let Ok(signed_message) = serde_json::from_str::(&msg){ + info!("socket received libp2p signed message: {:?}", signed_message); + msg_to_socket = Some(serde_json::to_string(&signed_message)?); + } let message_queue_clone = Arc::clone(&self.message_queue); - { + if let Some(msg_to_socket) = msg_to_socket{ let mut message_queue = message_queue_clone.lock().unwrap(); - message_queue.add_message(content_serialized.to_string()); + message_queue.add_message(msg_to_socket); message_queue.broadcast_messages()?; } } @@ -134,7 +142,7 @@ async fn handle_stream( robots: Robots, ) -> Result<(), Box> { stream.readable().await?; - let mut data = vec![0; 2048]; + let mut data = vec![0; 65536]; if let Ok(n) = stream.try_read(&mut data) { info!("read {} bytes", n); let message = std::str::from_utf8(&data[..n])?; @@ -142,45 +150,57 @@ async fn handle_stream( match serde_json::from_str::(message) { Ok(command) => { info!("command: {:?}", command); - if command.action == "/me" { - info!("/me request"); - //stream.writable().await?; - //match stream.try_write(b"{\"name\":\"\"}") { - // Ok(_) => {} - // Err(_) => {} - //} - } - if command.action == "/local_robots" { - info!("/local_robots request"); - stream.writable().await?; - let robots_manager = robots.lock().unwrap(); - let robots_text = robots_manager.clone().get_robots_json(); - info!("robots: {}", robots_text); - match stream.try_write(&robots_text.into_bytes()) { - Ok(_) => {} - Err(_) => { - error!("can't write /robots result to unix socket") + match command.action.as_str() { + "/me" => { + info!("/me request"); + } + "/local_robots" => { + info!("/local_robots request"); + stream.writable().await?; + let robots_manager = robots.lock().unwrap(); + let robots_text = robots_manager.clone().get_robots_json(); + info!("robots: {}", robots_text); + match stream.try_write(&robots_text.into_bytes()) { + Ok(_) => {} + Err(err) => { + error!("can't write /robots result to unix socket: {:?}", err) + } } } - } - if command.action == "/send_message" { - if let Some(message_content) = command.content { - let _ = from_message_tx.send(serde_json::to_string(&Message::new( - message_content.content, - "".to_string(), - Some(message_content.to), - ))?); - info!("Sent from unix socket to libp2p"); + "/send_message" => { + if let Some(message_content) = command.content { + let _ = from_message_tx.send(serde_json::to_string(&Message::new( + message_content.content, + "".to_string(), + Some(message_content.to), + ))?); + info!("Sent from unix socket to libp2p"); + stream.writable().await?; + stream.try_write(b"{\"ok\":true}")?; + } + } + "/send_signed_message" => { + if command.action == "/send_signed_message" { + if let Some(signed_message) = command.signed_message { + let _ = + from_message_tx.send(serde_json::to_string(&signed_message)?); + info!("Sent from unix socket to libp2p"); + stream.writable().await?; + if let Err(_err) = stream.try_write(b"{\"ok\":true}") { + error!( + "Can't write /send_signed_message result to unix socket" + ); + } + } + } + } + "/subscribe_messages" => { + info!("/subscribe_messages request"); stream.writable().await?; stream.try_write(b"{\"ok\":true}")?; + message_queue.lock().unwrap().add_subscriber(stream); } - } - - if command.action == "/subscribe_messages" { - info!("/subscribe_messages request"); - stream.writable().await?; - stream.try_write(b"{\"ok\":true}")?; - message_queue.lock().unwrap().add_subscriber(stream); + _ => {} } } Err(err) => { diff --git a/src/main.rs b/src/main.rs index d2e78fb..169025f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,15 +20,13 @@ use crate::store::Message; use crate::store::MessageContent; use crate::store::MessageRequest; use crate::store::MessageResponse; +use crate::store::SignedMessage; use std::sync::{Arc, Mutex}; use tokio::select; use tokio::sync::broadcast; -use agent::{Agent, AgentBuilder}; - use develop::unix_socket::SocketServer; -mod agent; mod cli; mod commands; mod develop; @@ -42,115 +40,18 @@ async fn main_normal( to_message_tx: broadcast::Sender, from_message_tx: broadcast::Sender, ) -> Result<(), Box> { - let agent: Agent = AgentBuilder::default() - .api_key(args.api_key) - .robot_server_url(args.robot_server_url.clone()) - .build(); - info!("Starting agent: {:?}", agent); let jobs: store::Jobs = Arc::new(Mutex::new(store::JobManager::default())); - let mut socket = ClientBuilder::new(args.robot_server_url) - .auth(json!({"api_key": agent.clone().api_key, "public_key": config.get_public_key_encoded(), "session_type": "ROBOT"})) - .on("error", |err, _| { - async move { eprintln!("Error: {:#?}", err) }.boxed() - }); - - { - let shared_jobs: Arc> = Arc::clone(&jobs); - let shared_agent = agent.clone(); - socket = socket.on("new_job", move |payload: Payload, socket: Client| { - let shared_jobs = Arc::clone(&shared_jobs); - let agent = shared_agent.clone(); - async move { - if let Payload::Text(str) = payload { - let robot_job: RobotJob = - serde_json::from_value(str.first().unwrap().clone()).unwrap(); //serde_json::from_str(&str).unwrap(); - commands::launch_new_job(robot_job, Some(socket), agent, shared_jobs).await - } - } - .boxed() - }) - } - - { - let shared_jobs: Arc> = Arc::clone(&jobs); - socket = socket.on("start_tunnel", move |payload: Payload, socket: Client| { - info!("Start tunnel request"); - let shared_jobs = Arc::clone(&shared_jobs); - async move { - if let Payload::Text(str) = payload { - let start_tunnel_request: commands::StartTunnelReq = - serde_json::from_value(str.first().unwrap().clone()).unwrap(); - commands::start_tunnel( - commands::TunnnelClient::SocketClient { - socket: socket, - client_id: start_tunnel_request.client_id, - job_id: start_tunnel_request.job_id.clone(), - }, - start_tunnel_request.job_id, - shared_jobs, - ) - .await - } - } - .boxed() - }) - } - { - let shared_jobs: Arc> = Arc::clone(&jobs); - socket = socket.on( - "message_to_robot", - move |payload: Payload, _socket: Client| { - info!("Message to robot request"); - let shared_jobs = Arc::clone(&shared_jobs); - async move { - match payload { - Payload::Text(payload) => { - let message: MessageToRobot = - serde_json::from_value(payload.first().unwrap().clone()).unwrap(); - commands::message_to_robot(message, shared_jobs).await - } - _ => {} - } - } - .boxed() - }, - ) - } - - { - let shared_robots: store::Robots = Arc::clone(&robots); - socket = socket.on("update_robots", move |payload: Payload, _socket: Client| { - match payload { - Payload::Text(value) => { - let robots_update: store::RobotsConfig = - serde_json::from_value(value.first().expect("no value got").clone()) - .expect("can't parse value"); - { - let mut robots_manager = shared_robots.lock().unwrap(); - robots_manager.merge_update(robots_update); - } - } - _ => {} - } - - async move {}.boxed() - }) - } - let _socket = socket.connect().await?; let mut to_message_rx = to_message_tx.subscribe(); - let _res = _socket.emit("me", json!({})).await; - - let sleep = tokio::time::sleep(Duration::from_secs(5)); - tokio::pin!(sleep); let shared_jobs: Arc> = Arc::clone(&jobs); loop { select! { msg = to_message_rx.recv()=>match msg{ Ok(msg)=>{ - let message = serde_json::from_str::(&msg)?; + let signed_message = serde_json::from_str::(&msg)?; + let message = serde_json::from_str::(&signed_message.message)?; match message.content{ MessageContent::JobMessage(message_content) =>{ info!("main got job message: {:?}", message_content); @@ -169,10 +70,28 @@ async fn main_normal( MessageContent::StartJob(robot_job)=>{ info!("new job {:?}", robot_job); let shared_jobs = Arc::clone(&shared_jobs); - let agent = agent.clone(); - commands::launch_new_job(robot_job, None, agent, shared_jobs).await; - + commands::launch_new_job(robot_job, None, shared_jobs).await; }, + MessageContent::UpdateConfig{config}=>{ + + info!("UpdateConfig: {:?}", config); + let shared_robots = Arc::clone(&robots); + let mut robot_manager = shared_robots.lock().unwrap(); + let signed_message = signed_message.clone(); + if signed_message.verify() && signed_message.public_key == robot_manager.owner_public_key{ + robot_manager.set_robots_config(config, Some(signed_message)); + info!("Config updated"); + match robot_manager.save_to_file(args.config_path.clone()){ + Ok(_)=>{ + info!("Config saved to file"); + }, + Err(_)=>{ + error!("Can't save config to file"); + } + } + } + + } MessageContent::MessageRequest(request)=>{ let mut response_content:Option = None; match request{ @@ -184,6 +103,14 @@ async fn main_normal( info!("jobs: {:?}", jobs); response_content = Some(MessageResponse::ListJobs { jobs: jobs }); }, + MessageRequest::GetRobotsConfig{}=>{ + info!("GetRobotsConfig request"); + let shared_robots = Arc::clone(&robots); + let robot_manager = shared_robots.lock().unwrap(); + let robots_config = robot_manager.get_robots_config(); + info!("config: {:?}", robots_config); + response_content = Some(MessageResponse::GetRobotsConfig { config:robots_config }) + } _=>{ } @@ -204,13 +131,6 @@ async fn main_normal( error!("error while socket receiving libp2p message"); } }, - () = &mut sleep => { - info!("me request"); - let _res = _socket - .emit("me", json!({})) - .await; - sleep.as_mut().reset(tokio::time::Instant::now() + Duration::from_secs(60)); - }, } } } @@ -237,18 +157,38 @@ async fn main() -> Result<(), Box> { } let mut config: store::Config = store::Config::generate(); - let config_load_res = store::Config::load_from_file(args.key_filename.clone()); - match config_load_res { - Ok(loaded_config) => config = loaded_config, - Err(..) => { - info!("Can't load private key from file"); + match args.clone().secret_key { + Some(secret_key_string) => { + config = store::Config::load_from_sk_string(secret_key_string)?; } + None => { + let config_load_res = store::Config::load_from_file(args.key_filename.clone()); + match config_load_res { + Ok(loaded_config) => { + config = loaded_config; + } + Err(err) => { + info!("Can't load private key from file {:?}", err); + } + } + } + } + if let Err(err) = config.save_to_file(args.key_filename.clone()) { + error!("Can't write key to file {:?}", err); } - let robots: store::Robots = Arc::new(Mutex::new(store::RobotsManager { + let mut robot_manager = store::RobotsManager { self_peer_id: config.get_peer_id(), ..Default::default() - })); + }; + robot_manager.read_config_from_file(args.config_path.clone()); + let robots: store::Robots = Arc::new(Mutex::new(robot_manager)); + + { + let mut robots_manager = robots.lock().unwrap(); + robots_manager.set_owner(args.clone().owner)?; + } + { match std::fs::read_to_string("config.json") { Ok(contents) => { @@ -272,7 +212,7 @@ async fn main() -> Result<(), Box> { let unix_socket_filename = args.socket_filename.clone(); let _unix_socket_thread = tokio::spawn(async move { info!("Start unix socket server"); - socket_server + match socket_server .start( from_message_tx_socket, to_message_tx_socket, @@ -280,6 +220,12 @@ async fn main() -> Result<(), Box> { unix_socket_filename, ) .await + { + Ok(_) => {} + Err(err) => { + error!("UNIX SOCKET MODULE PANIC: {:?}", err); + } + } }); let to_message_tx_libp2p = to_message_tx.clone(); @@ -329,9 +275,7 @@ async fn main() -> Result<(), Box> { ) .await; }); - loop { - tokio::time::sleep(Duration::from_secs(1)); - } + _main_thread.await; Ok(()) } diff --git a/src/store.rs b/src/store.rs index 8b87cf4..979a0c4 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,5 +1,7 @@ -use libp2p::Multiaddr; +use libp2p::{Multiaddr, PeerId}; +use libp2p_identity::ed25519; use serde::{Deserialize, Serialize}; +use std::any::{Any, TypeId}; use std::collections::{HashMap, HashSet}; use std::error::Error; use std::fs; @@ -31,16 +33,32 @@ pub enum ChannelMessageFromJob { ArchiveMessage { encoded_tar: String }, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "request_type")] +pub enum RobotRequest { + GetConfigVersion {}, + GetSignedConfigMessage {}, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "response_type")] +pub enum RobotResponse { + GetConfigVersion { version: u32 }, + GetSignedConfigMessage { signed_message: SignedMessage }, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "request_type")] pub enum MessageRequest { ListJobs {}, + GetRobotsConfig {}, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "response_type")] pub enum MessageResponse { ListJobs { jobs: Vec }, + GetRobotsConfig { config: RobotsConfig }, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -60,9 +78,7 @@ pub enum MessageContent { }, StartJob(RobotJob), UpdateConfig { - config: serde_json::Value, - signer: String, - sign: String, + config: RobotsConfig, }, } @@ -185,9 +201,25 @@ pub struct MessageManager { impl MessageManager {} +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SignedMessage { + pub message: String, + sign: Vec, + pub public_key: [u8; 32], +} + +impl SignedMessage { + pub fn verify(&self) -> bool { + if let Ok(public_key) = ed25519::PublicKey::try_from_bytes(&self.public_key) { + return public_key.verify(&*self.message.as_bytes(), &self.sign); + } + return false; + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { - pub timestamp: u128, + pub timestamp: String, pub content: MessageContent, pub from: String, pub to: Option, @@ -197,7 +229,7 @@ impl Message { let duration_since_epoch = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); - let timestamp_nanos = duration_since_epoch.as_nanos(); + let timestamp_nanos = duration_since_epoch.as_nanos().to_string(); Self { timestamp: timestamp_nanos, content, @@ -205,25 +237,35 @@ impl Message { to, } } + pub fn signed(self, keypair: ed25519::Keypair) -> Result { + let message_str = serde_json::to_string(&self)?; + let sign = keypair.sign(&*message_str.as_bytes()); + Ok(SignedMessage { + message: message_str, + sign, + public_key: keypair.public().to_bytes(), + }) + } } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum RobotRole { Current, OrganizationRobot, - OrganizationAdmin, + Owner, Unknown, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Robot { pub robot_id: String, pub robot_peer_id: String, + pub robot_public_key: String, pub name: String, pub tags: Vec, - pub interfaces: HashSet, } #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct RobotsConfig { + pub version: u32, pub robots: Vec, } @@ -232,20 +274,99 @@ pub struct RobotInterface { pub ip4: String, } +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct RobotsManagerDump { + config: RobotsConfig, + config_message: Option, +} + #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct RobotsManager { pub self_peer_id: String, + pub owner_peer_id: String, + pub owner_public_key: [u8; 32], pub robots: HashMap, + pub config_version: u32, + pub config_message: Option, pub peer_id_to_ip: HashMap, + pub peers: Vec, } impl RobotsManager { pub fn add_robot(&mut self, robot: Robot) { self.robots .insert(robot.robot_peer_id.clone(), robot.clone()); - if let Some(ip4) = self.peer_id_to_ip.get(&robot.robot_peer_id) { - self.add_interface_to_robot(robot.robot_peer_id, ip4.to_string()); + // if let Some(ip4) = self.peer_id_to_ip.get(&robot.robot_peer_id) { + // self.add_interface_to_robot(robot.robot_peer_id, ip4.to_string()); + // } + } + + pub fn set_robots_config( + &mut self, + config: RobotsConfig, + signed_message: Option, + ) { + if config.version > self.config_version { + self.robots.clear(); + self.config_version = config.version; + for robot in config.robots { + self.add_robot(robot); + } + self.config_message = signed_message + } else { + info!("config version is too old"); + } + } + + pub fn save_to_file(&self, filepath: String) -> Result<(), Box> { + let dump = RobotsManagerDump { + config: self.get_robots_config(), + config_message: self.config_message.clone(), + }; + fs::write(filepath, serde_json::to_string(&dump)?)?; + Ok(()) + } + pub fn read_config_from_file(&mut self, filepath: String) -> Result<(), Box> { + let dump_str = fs::read_to_string(filepath)?; + let dump: RobotsManagerDump = serde_json::from_str(&dump_str)?; + self.set_robots_config(dump.config, dump.config_message); + Ok(()) + } + + pub fn get_robots_config(&self) -> RobotsConfig { + return RobotsConfig { + version: self.config_version, + robots: self + .robots + .values() + .map(|robot| robot.clone()) + .collect::>(), + }; + } + + pub fn set_peers(&mut self, peers: Vec) { + self.peers = peers + } + + pub fn set_owner(&mut self, owner_b64: String) -> Result<(), Box> { + let decoded_key = general_purpose::STANDARD.decode(owner_b64); + match decoded_key { + Ok(owner_public_key) => { + match owner_public_key.as_slice().try_into() { + Ok(pk) => { + self.owner_public_key = pk; + } + _ => { + return Err("can't transform pk array".into()); + } + }; + } + _ => { + return Err("can't decode pk from b64".into()); + } } + + Ok(()) } pub fn read_robots_from_config(&mut self, config: String) { @@ -256,25 +377,44 @@ impl RobotsManager { } } - pub fn add_interface_to_robot(&mut self, robot_peer_id: String, ip4: String) { - info!("Adding interface {} = {}", robot_peer_id, ip4); - match self.robots.get_mut(&robot_peer_id) { - Some(robot) => { - robot.interfaces.insert(RobotInterface { ip4 }); + // pub fn add_interface_to_robot(&mut self, robot_peer_id: String, ip4: String) { + // info!("Adding interface {} = {}", robot_peer_id, ip4); + // match self.robots.get_mut(&robot_peer_id) { + // Some(robot) => { + // robot.interfaces.insert(RobotInterface { ip4 }); + // } + // None => { + // info!("No robot for peer id {}", robot_peer_id); + // self.peer_id_to_ip.insert(robot_peer_id, ip4); + // } + // } + // } + + pub fn get_role(&self, id: T) -> RobotRole { + let value_any = &id as &dyn Any; + if let Some(peer_id) = value_any.downcast_ref::() { + for robot_peer_id in self.robots.keys() { + if robot_peer_id == peer_id { + return RobotRole::OrganizationRobot; + } } - None => { - info!("No robot for peer id {}", robot_peer_id); - self.peer_id_to_ip.insert(robot_peer_id, ip4); + } else if let Some(public_key) = value_any.downcast_ref::<[u8; 32]>() { + if *public_key == self.owner_public_key { + return RobotRole::Owner; } - } - } - pub fn get_role(&self, peer_id: String) -> RobotRole { - for robot_peer_id in self.robots.keys() { - if robot_peer_id == &peer_id { - return RobotRole::OrganizationRobot; + let pk_str = general_purpose::STANDARD.encode(&public_key.to_vec()); + for robot_public_key in self + .robots + .values() + .map(|robot| robot.robot_public_key.clone()) + { + if robot_public_key == pk_str { + return RobotRole::OrganizationRobot; + } } } + return RobotRole::Unknown; } @@ -338,6 +478,17 @@ impl Config { }) } + pub fn load_from_sk_string(sk_string: String) -> Result> { + let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(sk_string)?; + let parsed_secret = libp2p::identity::ed25519::SecretKey::try_from_bytes(decoded_key)?; + let identity = libp2p::identity::ed25519::Keypair::from(parsed_secret); + Ok(Self { + identity, + bootstrap_addrs: Vec::new(), + libp2p_port: 0, + }) + } + pub fn add_bootstrap_addr(&mut self, addr: Multiaddr) { self.bootstrap_addrs.push(addr); }