From 1f7b56ce91a9343b355083840aed9ca9c28151a3 Mon Sep 17 00:00:00 2001 From: smehnov Date: Mon, 26 Aug 2024 21:17:42 +0300 Subject: [PATCH] websocket rpc + users in config + small code structure changes --- .gitpod.yml | 11 - Cargo.lock | 68 +++++- Cargo.toml | 1 + config_example.json | 18 -- install.sh | 15 +- script.py | 56 ----- src/cli.rs | 7 +- src/commands/docker.rs | 15 +- src/commands/mod.rs | 10 +- src/core/core.rs | 152 ++++++++++++ src/core/mod.rs | 2 + src/develop.rs | 1 - src/external_api/messages.rs | 134 +++++++++++ src/external_api/mod.rs | 6 + src/{develop => external_api}/unix_socket.rs | 169 ++++++-------- src/external_api/web_socket.rs | 159 +++++++++++++ src/main.rs | 230 +++---------------- src/node/mod.rs | 3 + src/{develop/mdns.rs => node/node.rs} | 152 +++++++----- src/store.rs | 187 ++++++++++++--- 20 files changed, 894 insertions(+), 502 deletions(-) delete mode 100644 .gitpod.yml delete mode 100644 config_example.json delete mode 100644 script.py create mode 100644 src/core/core.rs create mode 100644 src/core/mod.rs create mode 100644 src/external_api/messages.rs create mode 100644 src/external_api/mod.rs rename src/{develop => external_api}/unix_socket.rs (53%) create mode 100644 src/external_api/web_socket.rs create mode 100644 src/node/mod.rs rename src/{develop/mdns.rs => node/node.rs} (76%) diff --git a/.gitpod.yml b/.gitpod.yml deleted file mode 100644 index de291ac..0000000 --- a/.gitpod.yml +++ /dev/null @@ -1,11 +0,0 @@ -# This configuration file was automatically generated by Gitpod. -# Please adjust to your needs (see https://www.gitpod.io/docs/introduction/learn-gitpod/gitpod-yaml) -# and commit this file to your remote git repository to share the goodness with others. - -# Learn more from ready-to-use templates: https://www.gitpod.io/docs/introduction/getting-started/quickstart - -tasks: - - init: cargo build - command: cargo watch -x run - - diff --git a/Cargo.lock b/Cargo.lock index 2e541da..4630a34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -510,7 +510,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ - "http", + "http 0.2.12", "log", "url", ] @@ -532,7 +532,7 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "hyper", "itoa", @@ -558,7 +558,7 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", + "http 0.2.12", "http-body", "mime", "rustversion", @@ -687,7 +687,7 @@ dependencies = [ "futures-core", "futures-util", "hex", - "http", + "http 0.2.12", "hyper", "hyperlocal", "log", @@ -1653,7 +1653,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.12", "indexmap 2.2.6", "slab", "tokio", @@ -1833,6 +1833,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -1840,7 +1851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.12", "pin-project-lite", ] @@ -1873,7 +1884,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "httparse", "httpdate", @@ -2007,7 +2018,7 @@ dependencies = [ "attohttpc", "bytes", "futures", - "http", + "http 0.2.12", "hyper", "log", "rand 0.8.5", @@ -3962,7 +3973,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "hyper", "hyper-tls", @@ -4061,6 +4072,7 @@ dependencies = [ "serde_json", "termion", "tokio", + "tokio-tungstenite", "tracing", "tracing-subscriber", "tracing-test", @@ -4843,6 +4855,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -4869,7 +4893,7 @@ dependencies = [ "futures-core", "futures-util", "h2", - "http", + "http 0.2.12", "http-body", "hyper", "hyper-timeout", @@ -5004,6 +5028,24 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.1.0", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -5107,6 +5149,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.2" diff --git a/Cargo.toml b/Cargo.toml index ddb5a68..6189693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,5 +32,6 @@ base64 = "0.22.0" libp2p-identity = { version="0.2.8", features=["peerid"]} openssl = { version = "0.10", features = ["vendored"] } libp2p-kad = "0.45.3" +tokio-tungstenite = "0.23.1" diff --git a/config_example.json b/config_example.json deleted file mode 100644 index 7f6b2b3..0000000 --- a/config_example.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "robots": [ - { - "robot_id": "device-0", - "robot_peer_id": "12D3KooWKnY2J5CFny3Ef8abndmg9U4gAkndUDPM4oMP7yVbftBK", - "name": "spot", - "tags": [], - "interfaces": [] - }, - { - "robot_id": "device-1", - "robot_peer_id": "12D3KooWK8ogtRq7DXD21ji9nwkTgCfuz6AYU9dys8GrH7weC2AC", - "name": "jetson-1", - "tags": [], - "interfaces": [] - } - ] -} diff --git a/install.sh b/install.sh index 4f44a20..2df7a71 100644 --- a/install.sh +++ b/install.sh @@ -1,4 +1,5 @@ -api_key=$1 +owner_public_key=$1 +secret_key=$2 echo $api_key os=$(uname) architecture=$(uname -m) @@ -9,7 +10,7 @@ if [[ "$os" == "Darwin" ]]; then fi username=$(whoami) -agent_download_url="$(curl -s https://api.github.com/repos/merklebot/robot-agent/releases/latest | grep /robot-agent-$platform | cut -d : -f 2,3 | tr -d \")" +agent_download_url="$(curl -s https://api.github.com/repos/otaberu/robot-agent/releases/latest | grep /robot-agent-$platform | cut -d : -f 2,3 | tr -d \")" @@ -31,15 +32,15 @@ SERVICE_NAME=robot-agent.service echo "Creating systemd service" -service_path=/etc/systemd/system/merklebot.service +service_path=/etc/systemd/system/robotagent.service sudo tee -a $service_path << EOF [Unit] -Description=Merklebot robot agent +Description=Robot agent After=network.target [Service] -ExecStart=$agent_binary_location -a $api_key +ExecStart=$agent_binary_location -o $owner_public_key -k $secret_key Restart=on-failure [Install] @@ -47,6 +48,6 @@ WantedBy=multi-user.target EOF -sudo systemctl enable merklebot -sudo systemctl start merklebot +sudo systemctl enable robotagent +sudo systemctl start robotagent echo "Service Started" diff --git a/script.py b/script.py deleted file mode 100644 index 09043fc..0000000 --- a/script.py +++ /dev/null @@ -1,56 +0,0 @@ -import socket -import os -import json -import pprint -import time -import sys - -socket_path = 'm1.socket' - -def send_request(action, content=None): - client = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - client.connect(os.path.realpath(socket_path)) - message = {"action": action} - if content: - message["content"] = content - client.sendall(json.dumps(message).encode()) - response = client.recv(2048).decode() - client.close() - print(response) - return json.loads(response) - -def subscribe_messages(): - with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: - client.connect(os.path.realpath(socket_path)) - print("Subscribed to messages") - client.sendall(json.dumps({"action": "/subscribe_messages"}).encode()) - while True: - time.sleep(0.05) - response = client.recv(1024).decode() - if response: - print(response) - - -def main(): - pass - #robot = send_request("/me") - #print('=== /me ===') - #pprint.pprint(robot) - local_devices = send_request("/local_robots") - print('=== /local_robots ===') - pprint.pprint(local_devices) - exit(0) - #subscribe_messages() - #while True: - # inp = sys.stdin.read(1) - # print(inp) - # #res = send_request("/send_message", "w") - # #print(res) -def press(key): - print(key) - res =send_request("/send_message", key) - print(res) - -if __name__=='__main__': - main() - subscribe_messages() diff --git a/src/cli.rs b/src/cli.rs index 3fe88a5..7aeff5b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -6,8 +6,8 @@ pub struct Args { #[arg(short = 'm', long, default_value = "normal")] pub mode: String, - #[arg(short = 's', long, default_value = "rn.socket")] - pub socket_filename: String, + #[arg(short = 's', long)] + pub socket_filename: Option, #[arg(short = 'f', long, default_value = "rn.key")] pub key_filename: String, @@ -18,6 +18,9 @@ pub struct Args { #[arg(short = 'l', long, default_value = "8765")] pub port_libp2p: String, + #[arg(short = 'r', long)] + pub rpc: Option, + #[arg(short = 'c', long, default_value = "rn.json")] pub config_path: String, diff --git a/src/commands/docker.rs b/src/commands/docker.rs index 8b9de2f..3b400b4 100644 --- a/src/commands/docker.rs +++ b/src/commands/docker.rs @@ -13,10 +13,7 @@ use base64::{engine::general_purpose, Engine as _}; use crate::{ commands::{RobotJob, RobotJobResult}, store::Jobs, - utils::files::{ - create_job_data_dir, get_files_in_directory_recursively, get_job_data_path, - get_data_path, upload_content, - }, + utils::files::create_job_data_dir, }; pub async fn execute_launch(robot_job: RobotJob, jobs: Jobs) { @@ -111,7 +108,7 @@ impl DockerLaunch { Ok(path) => { info!("Sharing dir {}", path); // 2. Share folder as volume - volumes.push(format!("{}:{}", path, "/merklebot/job_data/")); + volumes.push(format!("{}:{}", path, "/rn/job_data/")); } _ => { error!("Couldn't create shared dir for job {}", robot_job.id); @@ -230,7 +227,7 @@ impl DockerLaunch { } } crate::store::ChannelMessageToJob::ArchiveRequest { - path, + .. } => {} } } @@ -298,8 +295,8 @@ impl DockerLaunch { status: String::from("done"), logs: concatenated_logs, }; - let job_data_path = get_job_data_path(&robot_job.id); - + //let job_data_path = get_job_data_path(&robot_job.id); + // // if let Some(true) = &self.args.store_data { // match get_files_in_directory_recursively(&job_data_path) { // //TODO: change to path @@ -307,7 +304,7 @@ impl DockerLaunch { // info!("{:?}", paths); // for path in paths { // let path_str = path.as_path().display().to_string(); - // let key = path_str.replace(&get_merklebot_data_path(), ""); + // let key = path_str.replace(&get_rn_data_path(), ""); // upload_content( // agent.robot_server_url.clone(), // path, diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 05abe73..f6dad5d 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -3,7 +3,6 @@ use tokio::sync::broadcast; use serde::{Deserialize, Serialize}; -use serde_json::json; use tokio::sync::broadcast::Sender; use tracing::{error, info}; @@ -61,8 +60,9 @@ pub enum TunnnelClient { }, } -pub async fn launch_new_job(robot_job: RobotJob, jobs: store::Jobs) { +pub async fn launch_new_job(robot_job: RobotJob, jobs: &store::Jobs) { info!("{:?}", robot_job); + let jobs = Arc::clone(jobs); let mut job_manager = jobs.lock().unwrap(); job_manager.new_job( robot_job.id.clone(), @@ -125,8 +125,9 @@ pub async fn start_tunnel_messanger( } } -pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: store::Jobs) { +pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &store::Jobs) { info!("Start tunnel request"); + let jobs = Arc::clone(jobs); let mut job_manager = jobs.lock().unwrap(); match job_manager.get_job_or_none(&job_id) { @@ -157,10 +158,11 @@ pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: st } } -pub async fn message_to_robot(message: MessageToRobot, jobs: store::Jobs) { +pub async fn message_to_robot(message: MessageToRobot, jobs: &store::Jobs) { info!("Message to robot request"); info!("Message to robot: {:?}", message); + let jobs = Arc::clone(jobs); let job_manager = jobs.lock().unwrap(); if let Some(_job) = job_manager.get_job_or_none(&message.job_id) { diff --git a/src/core/core.rs b/src/core/core.rs new file mode 100644 index 0000000..b0fd481 --- /dev/null +++ b/src/core/core.rs @@ -0,0 +1,152 @@ +use crate::store::Message; +use crate::store::MessageContent; +use crate::store::MessageRequest; +use crate::store::MessageResponse; +use crate::store::RobotRole; +use crate::store::SignedMessage; + +use crate::commands; + +use crate::cli::Args; +use crate::store::{JobManager, Jobs, Robots}; +use std::sync::{Arc, Mutex}; +use tokio::select; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tracing::{error, info}; + +use std::error::Error; + +pub async fn main_normal( + args: Args, + robots: Robots, + to_message_tx: broadcast::Sender, + from_message_tx: broadcast::Sender, +) -> Result<(), Box> { + let jobs: Jobs = Arc::new(Mutex::new(JobManager::default())); + + let mut to_message_rx = to_message_tx.subscribe(); + + loop { + select! { + msg = to_message_rx.recv()=>match msg{ + Ok(msg)=>{ + let signed_message = serde_json::from_str::(&msg)?; + let message = serde_json::from_str::(&signed_message.message)?; + + let mut should_process = false; + { + let robot_manager = robots.lock().unwrap(); + if message.to.unwrap_or("".to_string()) == robot_manager.self_peer_id + || matches!(message.content, MessageContent::UpdateConfig { .. }) + { + let role = robot_manager.get_role(signed_message.public_key); + info!("role: {:?}", role); + if matches!(role, RobotRole::Owner) + || matches!(role, RobotRole::OrganizationUser) + { + should_process = true + } + } + } + info!("should process {}", should_process); + + if should_process{ + match message.content{ + MessageContent::JobMessage(message_content) =>{ + info!("main got job message: {:?}", message_content); + if let Ok(message) = serde_json::from_value::(message_content){ + commands::message_to_robot(message, &jobs).await + }else{ + error!("Can't deserialize MessageToRobot"); + } + }, + MessageContent::StartTunnelReq { job_id, peer_id }=>{ + + commands::start_tunnel(commands::TunnnelClient::RobotClient { peer_id: peer_id, from_robot_tx: from_message_tx.clone(), job_id: job_id.clone() }, job_id, &jobs).await + }, + MessageContent::StartJob(robot_job)=>{ + info!("new job {:?}", robot_job); + commands::launch_new_job(robot_job, &jobs).await; + }, + MessageContent::UpdateConfig{config}=>{ + + info!("UpdateConfig: {:?}", config); + let shared_robots = Arc::clone(&robots); + let mut robot_manager = shared_robots.lock().unwrap(); + let signed_message = signed_message.clone(); + if signed_message.verify() && signed_message.public_key == robot_manager.owner_public_key{ + robot_manager.set_robots_config(config, signed_message); + info!("Config updated"); + match robot_manager.save_to_file(args.config_path.clone()){ + Ok(_)=>{ + info!("Config saved to file"); + }, + Err(_)=>{ + error!("Can't save config to file"); + } + } + } + + } + MessageContent::MessageRequest(request)=>{ + let response_content:Option; + match request{ + MessageRequest::ListJobs{}=>{ + info!("ListJobs request"); + let job_manager = jobs.lock().unwrap(); + let jobs = job_manager.get_jobs_info(); + info!("jobs: {:?}", jobs); + response_content = Some(MessageResponse::ListJobs { jobs: jobs }); + }, + MessageRequest::GetRobotsConfig{}=>{ + info!("GetRobotsConfig request"); + let robot_manager = robots.lock().unwrap(); + let robots_config = robot_manager.get_robots_config(); + info!("config: {:?}", robots_config); + response_content = Some(MessageResponse::GetRobotsConfig { config:robots_config }) + } + } + if let Some(message_response) =response_content{ + let message_content = MessageContent::MessageResponse(message_response); + let _ = from_message_tx.send(serde_json::to_string(&Message::new( + message_content, + "".to_string(), + Some(message.from), + ))?); + } + }, + _=>{} + } + } + }, + Err(_)=>{ + error!("error while socket receiving libp2p message"); + } + }, + } + } +} + +pub async fn start_core_thread( + from_message_tx: &broadcast::Sender, + to_message_tx: &broadcast::Sender, + robots: &Robots, + args: &Args, +) -> JoinHandle<()> { + let robots = Arc::clone(&robots); + let args = args.clone(); + let to_message_tx = to_message_tx.clone(); + let from_message_tx = from_message_tx.clone(); + + let main_thread = tokio::spawn(async move { + match main_normal(args, robots, to_message_tx, from_message_tx).await { + Ok(_) => {} + Err(err) => { + error!("CORE MODULE PANIC: {:?}", err); + } + }; + }); + + return main_thread; +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..8d5a9f8 --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,2 @@ +mod core; +pub use core::start_core_thread; diff --git a/src/develop.rs b/src/develop.rs index c9af4f6..e06d341 100644 --- a/src/develop.rs +++ b/src/develop.rs @@ -1,2 +1 @@ -pub mod mdns; pub mod unix_socket; diff --git a/src/external_api/messages.rs b/src/external_api/messages.rs new file mode 100644 index 0000000..52f83b1 --- /dev/null +++ b/src/external_api/messages.rs @@ -0,0 +1,134 @@ +use crate::store::Message; +use crate::store::MessageContent; +use crate::store::Robots; +use crate::store::SignedMessage; +use base64::{engine::general_purpose, Engine as _}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::error::Error; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; +use tracing::{error, info}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Content { + pub to: String, + pub content: MessageContent, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SocketCommand { + pub action: String, + pub content: Option, + pub signed_message: Option, + pub action_param: Option, +} + +#[derive(Default)] +pub struct MessageQueue { + messages: Vec, + subscribers: HashMap>, // PeerId -> tx to socket thread +} +impl MessageQueue { + pub fn add_subscriber(&mut self, peer_id: String, tx: broadcast::Sender) { + self.subscribers.insert(peer_id, tx); + } + pub fn add_message(&mut self, message: String) { + self.messages.push(message); + } + + pub fn broadcast_messages(&mut self) -> Result<(), broadcast::error::SendError> { + if let Some(last_message) = self.messages.last() { + let last_message = match serde_json::from_str::(&last_message) { + Ok(signed_message) => signed_message.message, + _ => last_message.clone(), + }; + if let Ok(message) = serde_json::from_str::(&last_message) { + info!("socket received libp2p message: {:?}", message); + if let Ok(_content_serialized) = serde_json::to_string(&message.content) { + if let Some(peer_id) = message.to { + if let Some(tx) = self.subscribers.get(&peer_id) { + if let Err(err) = tx.send(last_message.clone()) { + error!("Error while sending message to ws tx: {:?}", err); + } + } + } + } + } + } + Ok(()) + } +} + +pub fn process_command( + command: SocketCommand, + sender: broadcast::Sender, + robots: Robots, + from_message_tx: &broadcast::Sender, + message_queue: Arc>, +) -> Result> { + info!("command: {:?}", command); + let mut answer: Option = None; + match command.action.as_str() { + "/me" => { + info!("/me request"); + } + "/config" => { + info!("/config request"); + let robots_manager = robots.lock().unwrap(); + if let Some(owner_b64) = command.action_param { + let decoded_owner = general_purpose::STANDARD.decode(owner_b64)?; + info!("decoded_owner {:?}", decoded_owner); + if let Some((config, _)) = robots_manager + .config_storage + .get_config(decoded_owner.as_slice().try_into()?) + { + let config_text = serde_json::to_string(&config)?; + info!("config: {}", config_text); + answer = Some(config_text); + } else { + info!("{:?}", robots_manager.config_storage); + info!("no config found"); + answer = Some("{\"ok\":false}".to_string()); + } + } else { + answer = Some("{\"ok\":false}".to_string()); + } + } + "/send_message" => { + if let Some(message_content) = command.content { + let _ = from_message_tx.send(serde_json::to_string(&Message::new( + message_content.content, + "".to_string(), + Some(message_content.to), + ))?); + info!("Sent from unix socket to libp2p"); + answer = Some("{\"ok\":true}".to_string()); + } + } + "/send_signed_message" => { + if command.action == "/send_signed_message" { + if let Some(signed_message) = command.signed_message { + let _ = from_message_tx.send(serde_json::to_string(&signed_message)?); + info!("Sent from unix socket to libp2p"); + answer = Some("{\"ok\":true}".to_string()); + } + } + } + "/subscribe_messages" => { + info!("/subscribe_messages request"); + if let Some(client_peer_id) = command.action_param { + message_queue + .lock() + .unwrap() + .add_subscriber(client_peer_id, sender); + answer = Some("{\"ok\":true}".to_string()); + } else { + answer = Some("{\"ok\":false}".to_string()); + } + } + _ => {} + } + + return Ok(answer.ok_or("no answer")?); +} diff --git a/src/external_api/mod.rs b/src/external_api/mod.rs new file mode 100644 index 0000000..81c7008 --- /dev/null +++ b/src/external_api/mod.rs @@ -0,0 +1,6 @@ +mod messages; +mod unix_socket; +mod web_socket; + +pub use unix_socket::start_unix_socket_thread; +pub use web_socket::start_web_socket_thread; diff --git a/src/develop/unix_socket.rs b/src/external_api/unix_socket.rs similarity index 53% rename from src/develop/unix_socket.rs rename to src/external_api/unix_socket.rs index a521bc8..2178eee 100644 --- a/src/develop/unix_socket.rs +++ b/src/external_api/unix_socket.rs @@ -1,61 +1,22 @@ +use super::messages::{process_command, MessageQueue, SocketCommand}; +use crate::cli::Args; use crate::store::Message; -use crate::store::MessageContent; use crate::store::Robots; use crate::store::SignedMessage; -use serde::{Deserialize, Serialize}; use std::error::Error; use std::sync::{Arc, Mutex}; use tokio::net::{UnixListener, UnixStream}; use tokio::select; use tokio::sync::broadcast; +use tokio::task::JoinHandle; use tracing::{error, info}; -#[derive(Default)] -pub struct MessageQueue { - messages: Vec, - subscribers: Vec, -} -impl MessageQueue { - pub fn add_subscriber(&mut self, stream: UnixStream) { - self.subscribers.push(stream); - } - pub fn add_message(&mut self, message: String) { - self.messages.push(message); - } - - pub fn broadcast_messages(&mut self) -> Result<(), Box> { - if let Some(last_message) = self.messages.last() { - let message_bytes = last_message.as_bytes(); - - for stream in &mut self.subscribers { - if let Err(err) = stream.try_write(message_bytes) { - error!("Can't write message to socket {:?}", err); - } - } - } - Ok(()) - } -} - #[derive(Default)] pub struct SocketServer { listener: Option, message_queue: Arc>, } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Content { - to: String, - content: MessageContent, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -struct SocketCommand { - action: String, - content: Option, - signed_message: Option, -} - impl SocketServer { async fn create_listener(&mut self, socket_filename: String) -> Result<(), Box> { info!("creating listener"); @@ -77,8 +38,10 @@ impl SocketServer { socket_filename: String, ) -> Result<(), Box> { info!("Starting server"); - if (self.create_listener(socket_filename).await).is_ok() {}; + let from_message_tx = from_message_tx.clone(); + let to_message_tx = to_message_tx.clone(); + if (self.create_listener(socket_filename).await).is_ok() {}; match &self.listener { Some(listener) => { info!("Server started"); @@ -93,7 +56,7 @@ impl SocketServer { let stream_robots = Arc::clone(&robots); let message_queue= Arc::clone(&self.message_queue); - let _ = handle_stream(stream, message_queue, from_message_tx.clone(), stream_robots).await; + let _ = handle_stream(stream, message_queue, from_message_tx.clone(), stream_robots).await?; }, Err(_)=>{ error!("Error while accepting connection"); @@ -106,7 +69,7 @@ impl SocketServer { let mut msg_to_socket: Option = None; if let Ok(message) = serde_json::from_str::(&msg){ info!("socket received libp2p message: {:?}", message.content); - let content_serialized = message.content.serialize(serde_json::value::Serializer)?; + let content_serialized = serde_json::to_string(&message.content)?; msg_to_socket = Some(content_serialized.to_string()); }else if let Ok(signed_message) = serde_json::from_str::(&msg){ info!("socket received libp2p signed message: {:?}", signed_message); @@ -116,7 +79,9 @@ impl SocketServer { if let Some(msg_to_socket) = msg_to_socket{ let mut message_queue = message_queue_clone.lock().unwrap(); message_queue.add_message(msg_to_socket); - message_queue.broadcast_messages()?; + if let Err(err) = message_queue.broadcast_messages(){ + error!("Error while broadcasting messages: {:?}", err); + } } } Err(_) => { @@ -128,7 +93,7 @@ impl SocketServer { } } None => { - error!("Listener not initialized") + error!("Listener not initialized"); } } Ok(()) @@ -140,67 +105,27 @@ async fn handle_stream( message_queue: Arc>, from_message_tx: broadcast::Sender, robots: Robots, -) -> Result<(), Box> { +) -> Result<(), Box> { stream.readable().await?; let mut data = vec![0; 65536]; + let (subscriber_tx, mut subscriber_rx) = broadcast::channel::(16); + if let Ok(n) = stream.try_read(&mut data) { info!("read {} bytes", n); let message = std::str::from_utf8(&data[..n])?; info!("message: {}", message); match serde_json::from_str::(message) { Ok(command) => { - info!("command: {:?}", command); - match command.action.as_str() { - "/me" => { - info!("/me request"); - } - "/local_robots" => { - info!("/local_robots request"); - stream.writable().await?; - let robots_manager = robots.lock().unwrap(); - let robots_text = robots_manager.clone().get_robots_json(); - info!("robots: {}", robots_text); - match stream.try_write(&robots_text.into_bytes()) { - Ok(_) => {} - Err(err) => { - error!("can't write /robots result to unix socket: {:?}", err) - } - } - } - "/send_message" => { - if let Some(message_content) = command.content { - let _ = from_message_tx.send(serde_json::to_string(&Message::new( - message_content.content, - "".to_string(), - Some(message_content.to), - ))?); - info!("Sent from unix socket to libp2p"); - stream.writable().await?; - stream.try_write(b"{\"ok\":true}")?; - } - } - "/send_signed_message" => { - if command.action == "/send_signed_message" { - if let Some(signed_message) = command.signed_message { - let _ = - from_message_tx.send(serde_json::to_string(&signed_message)?); - info!("Sent from unix socket to libp2p"); - stream.writable().await?; - if let Err(_err) = stream.try_write(b"{\"ok\":true}") { - error!( - "Can't write /send_signed_message result to unix socket" - ); - } - } - } - } - "/subscribe_messages" => { - info!("/subscribe_messages request"); - stream.writable().await?; - stream.try_write(b"{\"ok\":true}")?; - message_queue.lock().unwrap().add_subscriber(stream); - } - _ => {} + let answer = process_command( + command, + subscriber_tx, + robots, + &from_message_tx, + message_queue, + )?; + stream.writable().await?; + if let Err(err) = stream.try_write(&answer.into_bytes()) { + error!("Can't write command answer to unix socket: {:?}", err); } } Err(err) => { @@ -209,5 +134,47 @@ async fn handle_stream( } } } - Ok(()) + loop { + select! { + msg = subscriber_rx.recv()=>{ + if let Ok(msg) = msg{ + info!("Got subscriber msg: {}", msg); + stream.writable().await?; + if let Err(err) = stream.try_write(&msg.into_bytes()){ + error!("Error while sending message to subscriber socket: {:?}", err); + } + } + + + } + } + } +} + +pub async fn start_unix_socket_thread( + from_message_tx: &broadcast::Sender, + to_message_tx: &broadcast::Sender, + robots: &Robots, + args: &Args, +) -> JoinHandle<()> { + let mut socket_server = SocketServer::default(); + let from_message_tx = from_message_tx.clone(); + let to_message_tx = to_message_tx.clone(); + let robots = Arc::clone(&robots); + + let unix_socket_filename = args.socket_filename.clone().unwrap(); + let unix_socket_thread = tokio::spawn(async move { + info!("Start unix socket server"); + match socket_server + .start(from_message_tx, to_message_tx, robots, unix_socket_filename) + .await + { + Ok(_) => {} + Err(err) => { + error!("UNIX SOCKET MODULE PANIC: {:?}", err); + } + } + }); + + return unix_socket_thread; } diff --git a/src/external_api/web_socket.rs b/src/external_api/web_socket.rs new file mode 100644 index 0000000..bff8f8c --- /dev/null +++ b/src/external_api/web_socket.rs @@ -0,0 +1,159 @@ +use crate::cli::Args; +use crate::store::Robots; +use futures::SinkExt; +use std::error::Error; +use std::sync::{Arc, Mutex}; +use tokio::select; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tokio_tungstenite::tungstenite; +use tracing::{error, info}; + +use tokio::net::{TcpListener, TcpStream}; + +use futures_util::StreamExt; + +use super::messages::{process_command, MessageQueue, SocketCommand}; + +pub async fn start( + from_message_tx: broadcast::Sender, + to_message_tx: broadcast::Sender, + robots: Robots, + rpc_addr: String, +) -> Result<(), Box> { + let listener = TcpListener::bind(&rpc_addr) + .await + .expect("can't bind web socket address"); + info!("Websocket listening on {}", rpc_addr); + + let message_queue = Arc::new(Mutex::new(MessageQueue::default())); + let mut to_message_rx = to_message_tx.subscribe(); + + loop { + select! { + listener_res = listener.accept()=>{ + if let Ok((stream, _)) = listener_res{ + + let robots = Arc::clone(&robots); + let message_queue= Arc::clone(&message_queue); + let from_message_tx =from_message_tx.clone(); + + tokio::spawn(async { + let _ = accept_connection(stream, message_queue, from_message_tx, robots).await; + }); + } + }, + msg = to_message_rx.recv()=>{ + info!("Got msg: {:?}", msg); + match msg{ + Ok(msg) => { + let message_queue_clone = Arc::clone(&message_queue); + let mut message_queue = message_queue_clone.lock().unwrap(); + message_queue.add_message(msg); + if let Err(err) = message_queue.broadcast_messages(){ + error!("Error while broadcasting messages: {:?}", err); + } + } + Err(_) => { + error!("error while socket receiving libp2p message"); + } + } + } + + } + } +} + +pub async fn accept_connection( + stream: TcpStream, + message_queue: Arc>, + from_message_tx: broadcast::Sender, + robots: Robots, +) -> Result<(), Box> { + let addr = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", addr); + + let ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("Error during the websocket handshake occurred"); + + info!("New WebSocket connection: {}", addr); + + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + let (subscriber_tx, mut subscriber_rx) = broadcast::channel::(16); + loop { + select! { + msg = ws_receiver.next() =>{ + match msg{ + Some(msg)=>{ + let msg = msg?; + info!("New message: {:?}", msg); + if msg.is_text(){ + let message = msg.to_text()?; + if let Ok(command) = serde_json::from_str::(&message){ + info!("Got command: {:?}", command); + let robots = Arc::clone(&robots); + let message_queue = Arc::clone(&message_queue); + + if let Ok(answer) = process_command(command, subscriber_tx.clone(), robots, &from_message_tx, message_queue){ + match ws_sender.send(tungstenite::Message::Text(answer)).await{ + Ok(_)=>{ + info!("Answer sent"); + }, + Err(err)=>{ + error!("Error while sending answer {:?}", err); + } + } + } + } + }else{ + break; + } + + }, + None => {break;} + } + }, + msg = subscriber_rx.recv()=>{ + info!("got msg for subscriber: {:?}", msg); + if let Ok(msg) = msg{ + match ws_sender.send(tungstenite::Message::Text(msg)).await{ + Ok(_)=>{ + info!("Msg sent to subscriber"); + }, + Err(err)=>{ + error!("Couldn't send msg to subscriber: {:?}", err); + } + } + } + } + } + } + Ok(()) +} + +pub async fn start_web_socket_thread( + from_message_tx: &broadcast::Sender, + to_message_tx: &broadcast::Sender, + robots: &Robots, + args: &Args, +) -> JoinHandle<()> { + let from_message_tx = from_message_tx.clone(); + let to_message_tx = to_message_tx.clone(); + let robots = Arc::clone(&robots); + + let rpc_addr = args.clone().rpc.unwrap_or("127.0.0.1:8888".to_string()); + let web_socket_thread = tokio::spawn(async move { + info!("Start unix socket server"); + match start(from_message_tx, to_message_tx, robots, rpc_addr).await { + Ok(_) => {} + Err(err) => { + error!("WEB SOCKET MODULE PANIC: {:?}", err); + } + } + }); + + return web_socket_thread; +} diff --git a/src/main.rs b/src/main.rs index 1650983..fa3303e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,138 +1,29 @@ use std::error::Error; -use commands::MessageToRobot; -use futures_util::FutureExt; +use core::start_core_thread; +use external_api::{start_unix_socket_thread, start_web_socket_thread}; +use node::start_libp2p_thread; use libp2p::Multiaddr; -use cli::Args; - use tracing::{error, info}; use tracing_subscriber::FmtSubscriber; -use crate::store::Message; -use crate::store::MessageContent; -use crate::store::MessageRequest; -use crate::store::MessageResponse; -use crate::store::SignedMessage; use std::sync::{Arc, Mutex}; -use tokio::select; use tokio::sync::broadcast; -use develop::unix_socket::SocketServer; - -mod cli; -mod commands; -mod develop; -mod store; -mod utils; - -async fn main_normal( - args: Args, - config: store::Config, - robots: store::Robots, - to_message_tx: broadcast::Sender, - from_message_tx: broadcast::Sender, -) -> Result<(), Box> { - let jobs: store::Jobs = Arc::new(Mutex::new(store::JobManager::default())); - - let mut to_message_rx = to_message_tx.subscribe(); - - let shared_jobs: Arc> = Arc::clone(&jobs); - - loop { - select! { - msg = to_message_rx.recv()=>match msg{ - Ok(msg)=>{ - let signed_message = serde_json::from_str::(&msg)?; - let message = serde_json::from_str::(&signed_message.message)?; - match message.content{ - MessageContent::JobMessage(message_content) =>{ - info!("main got job message: {:?}", message_content); - if let Ok(message) = serde_json::from_value::(message_content){ - let shared_jobs = Arc::clone(&shared_jobs); - commands::message_to_robot(message, shared_jobs).await - }else{ - error!("Can't deserialize MessageToRobot"); - } - }, - MessageContent::StartTunnelReq { job_id, peer_id }=>{ - let shared_jobs = Arc::clone(&shared_jobs); - - commands::start_tunnel(commands::TunnnelClient::RobotClient { peer_id: peer_id, from_robot_tx: from_message_tx.clone(), job_id: job_id.clone() }, job_id, shared_jobs).await - }, - MessageContent::StartJob(robot_job)=>{ - info!("new job {:?}", robot_job); - let shared_jobs = Arc::clone(&shared_jobs); - commands::launch_new_job(robot_job, shared_jobs).await; - }, - MessageContent::UpdateConfig{config}=>{ - - info!("UpdateConfig: {:?}", config); - let shared_robots = Arc::clone(&robots); - let mut robot_manager = shared_robots.lock().unwrap(); - let signed_message = signed_message.clone(); - if signed_message.verify() && signed_message.public_key == robot_manager.owner_public_key{ - robot_manager.set_robots_config(config, Some(signed_message)); - info!("Config updated"); - match robot_manager.save_to_file(args.config_path.clone()){ - Ok(_)=>{ - info!("Config saved to file"); - }, - Err(_)=>{ - error!("Can't save config to file"); - } - } - } - - } - MessageContent::MessageRequest(request)=>{ - let mut response_content:Option = None; - match request{ - MessageRequest::ListJobs{}=>{ - info!("ListJobs request"); - let shared_jobs = Arc::clone(&shared_jobs); - let job_manager = shared_jobs.lock().unwrap(); - let jobs = job_manager.get_jobs_info(); - info!("jobs: {:?}", jobs); - response_content = Some(MessageResponse::ListJobs { jobs: jobs }); - }, - MessageRequest::GetRobotsConfig{}=>{ - info!("GetRobotsConfig request"); - let shared_robots = Arc::clone(&robots); - let robot_manager = shared_robots.lock().unwrap(); - let robots_config = robot_manager.get_robots_config(); - info!("config: {:?}", robots_config); - response_content = Some(MessageResponse::GetRobotsConfig { config:robots_config }) - } - _=>{ - - } - } - if let Some(message_response) =response_content{ - let message_content = MessageContent::MessageResponse(message_response); - let _ = from_message_tx.send(serde_json::to_string(&Message::new( - message_content, - "".to_string(), - Some(message.from), - ))?); - } - }, - _=>{} - } - }, - Err(_)=>{ - error!("error while socket receiving libp2p message"); - } - }, - } - } -} +pub mod cli; +pub mod commands; +pub mod core; +pub mod external_api; +pub mod node; +pub mod store; +pub mod utils; pub fn generate_key_file(key_filename: String) -> store::Config { let config = store::Config::generate(); - let _ = config.save_to_file(key_filename); - info!("Generated new key and saved it to merklebot.key"); + let _ = config.save_to_file(key_filename.clone()); + info!("Generated new key and saved it to {}", key_filename); config } @@ -140,7 +31,6 @@ pub fn generate_key_file(key_filename: String) -> store::Config { #[tokio::main] async fn main() -> Result<(), Box> { let _ = tracing::subscriber::set_global_default(FmtSubscriber::default()); - //console_subscriber::init(); let args = cli::get_args(); match args.mode.as_str() { @@ -175,19 +65,13 @@ async fn main() -> Result<(), Box> { self_peer_id: config.get_peer_id(), ..Default::default() }; - robot_manager.read_config_from_file(args.config_path.clone()); - let robots: store::Robots = Arc::new(Mutex::new(robot_manager)); - - { - let mut robots_manager = robots.lock().unwrap(); - robots_manager.set_owner(args.clone().owner)?; - } + robot_manager.set_owner(args.clone().owner)?; + let _ = robot_manager.read_config_from_file(args.config_path.clone()); { match std::fs::read_to_string("config.json") { Ok(contents) => { - let mut robots_manager = robots.lock().unwrap(); - robots_manager.read_robots_from_config(contents); + robot_manager.read_robots_from_config(contents); } Err(_err) => { error!("Error while reading robots list"); @@ -195,81 +79,41 @@ async fn main() -> Result<(), Box> { } } - let (to_message_tx, _to_message_rx) = broadcast::channel::(16); - let (from_message_tx, _from_message_rx) = broadcast::channel::(16); - - let mut socket_server = SocketServer::default(); - - let to_message_tx_socket = to_message_tx.clone(); - let from_message_tx_socket = from_message_tx.clone(); - let unix_socket_robots = Arc::clone(&robots); - let unix_socket_filename = args.socket_filename.clone(); - let _unix_socket_thread = tokio::spawn(async move { - info!("Start unix socket server"); - match socket_server - .start( - from_message_tx_socket, - to_message_tx_socket, - unix_socket_robots, - unix_socket_filename, - ) - .await - { - Ok(_) => {} - Err(err) => { - error!("UNIX SOCKET MODULE PANIC: {:?}", err); - } - } - }); - - let to_message_tx_libp2p = to_message_tx.clone(); - let from_message_tx_libp2p = from_message_tx.clone(); - - let libp2p_robots = Arc::clone(&robots); - let mut libp2p_config = config.clone(); - match args.bootstrap_addr.clone() { Some(addr) => { let addr: Multiaddr = addr.parse().unwrap(); - libp2p_config.add_bootstrap_addr(addr); + config.add_bootstrap_addr(addr); } _ => {} } + let robots: store::Robots = Arc::new(Mutex::new(robot_manager)); + let libp2p_port = args.port_libp2p.parse::().unwrap(); - libp2p_config.set_libp2p_port(libp2p_port); + config.set_libp2p_port(libp2p_port); - let _libp2p_thread = tokio::spawn(async move { - info!("Start libp2p node"); - develop::mdns::main_libp2p( - libp2p_config, - to_message_tx_libp2p, - from_message_tx_libp2p, - libp2p_robots, - ) - .await - }); + let (to_message_tx, _to_message_rx) = broadcast::channel::(16); + let (from_message_tx, _from_message_rx) = broadcast::channel::(16); - let main_args = args.clone(); - let mut main_config = config.clone(); - let main_robots = Arc::clone(&robots); + if args.socket_filename.is_some() { + // Starting unix socket server + let _unix_socket_thread = + start_unix_socket_thread(&from_message_tx, &to_message_tx, &robots, &args).await; + } - //main_config.add_bootstrap_addr() + if args.rpc.is_some() { + // Starting websocket server + let _web_socket_thread = + start_web_socket_thread(&from_message_tx, &to_message_tx, &robots, &args).await; + } - let to_message_tx_libp2p = to_message_tx.clone(); - let from_message_tx_libp2p = from_message_tx.clone(); + // Starting libp2p node + let _libp2p_thread = + start_libp2p_thread(&from_message_tx, &to_message_tx, &robots, &config, &args).await; - let _main_thread = tokio::spawn(async move { - main_normal( - main_args, - main_config, - main_robots, - to_message_tx_libp2p, - from_message_tx_libp2p, - ) - .await; - }); - _main_thread.await; + // Starting core logic + let core_thread = start_core_thread(&from_message_tx, &to_message_tx, &robots, &args).await; + let _ = core_thread.await; Ok(()) } diff --git a/src/node/mod.rs b/src/node/mod.rs new file mode 100644 index 0000000..2522fe2 --- /dev/null +++ b/src/node/mod.rs @@ -0,0 +1,3 @@ +mod node; + +pub use node::start_libp2p_thread; diff --git a/src/develop/mdns.rs b/src/node/node.rs similarity index 76% rename from src/develop/mdns.rs rename to src/node/node.rs index 5f4411c..88907d0 100644 --- a/src/develop/mdns.rs +++ b/src/node/node.rs @@ -1,14 +1,13 @@ +use crate::cli::Args; use crate::store::Config; use crate::store::Message; use crate::store::MessageContent; use crate::store::RobotRequest; use crate::store::RobotResponse; -use crate::store::RobotRole; use crate::store::Robots; use crate::store::SignedMessage; use futures::stream::StreamExt; -use libp2p::relay::client::new; use libp2p::PeerId; use libp2p::StreamProtocol; use libp2p::{ @@ -27,6 +26,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast; +use tokio::task::JoinHandle; use tokio::{io, select}; use tracing::{error, info}; @@ -44,27 +44,31 @@ fn process_signed_message( message_string: String, robots: Robots, to_message_tx: broadcast::Sender, + args: &Args, ) { if let Ok(signed_message) = serde_json::from_str::(&message_string) { info!("signed_message: {:?}", signed_message); - let robot_manager = robots.lock().unwrap(); + let mut robot_manager = robots.lock().unwrap(); if signed_message.verify() { info!("Verified"); if let Ok(message) = serde_json::from_str::(&signed_message.message) { info!("message: {:?}", message); - - if message.to.unwrap_or("".to_string()) == robot_manager.self_peer_id - || matches!(message.content, MessageContent::UpdateConfig { .. }) - { - let role = robot_manager.get_role(signed_message.public_key); - info!("role: {:?}", role); - if matches!(role, RobotRole::Owner) - || matches!(role, RobotRole::OrganizationRobot) - { - let _ = to_message_tx.send(message_string); + let _ = to_message_tx.send(message_string); + + if args.rpc.is_some() { + if let MessageContent::UpdateConfig { config } = message.content { + info!("UPDATE CONFIG"); + info!("{:?}", signed_message.public_key); + robot_manager.config_storage.update_config( + signed_message.public_key, + config, + signed_message, + ); } } } + } else { + info!("not verified"); } } } @@ -76,6 +80,7 @@ async fn start( to_message_tx: broadcast::Sender, from_message_tx: broadcast::Sender, robots: Robots, + args: Args, ) -> Result<(), Box> { let public_key: libp2p::identity::PublicKey = identity.public().into(); info!("PeerId: {:?}", public_key.to_peer_id()); @@ -183,7 +188,7 @@ async fn start( } } else if let Ok(signed_message) = serde_json::from_str::(&msg){ info!("libp2p received socket signed_message: {:?}", signed_message); - process_signed_message(msg.clone(), Arc::clone(&robots), to_message_tx.clone()); + process_signed_message(msg.clone(), Arc::clone(&robots), to_message_tx.clone(), &args); if let Err(e) = swarm .behaviour_mut().gossipsub .publish(topic.clone(), serde_json::to_string(&signed_message)?.as_bytes()) { @@ -233,7 +238,7 @@ async fn start( String::from_utf8_lossy(&message.data), ); let message_string = String::from_utf8_lossy(&message.data).to_string(); - process_signed_message(message_string, Arc::clone(&robots), to_message_tx.clone()); + process_signed_message(message_string, Arc::clone(&robots), to_message_tx.clone(), &args); }, MyBehaviourEvent::Kademlia(event)=>{ @@ -262,6 +267,7 @@ async fn start( } }, MyBehaviourEvent::RequestResponse(event)=>{ + info!("req resp {:?}", event); match event{ request_response::Event::Message{ peer, @@ -272,59 +278,60 @@ async fn start( }, }=>{ - let robot_manager = robots.lock().unwrap(); + info!("request: {:?}", request ); - if let RobotRole::OrganizationRobot = robot_manager.get_role(peer.to_string()){ match request{ - RobotRequest::GetConfigVersion{}=>{ + RobotRequest::GetConfigVersion{version, owner}=>{ + let robot_manager = robots.lock().unwrap(); info!("get config verison request from peer {}", peer); - let version = robot_manager.config_version.clone(); - let _ =swarm.behaviour_mut().request_response.send_response(channel, RobotResponse::GetConfigVersion { version }); - }, - RobotRequest::GetSignedConfigMessage{}=>{ - info!("get signed config request from peer {}", peer); - if let Some(config_message) = &robot_manager.config_message{ - let _ = swarm.behaviour_mut().request_response.send_response(channel, RobotResponse::GetSignedConfigMessage { - signed_message: config_message.clone() - }); + let mut stored_version = 0; + + if let Some((config, signed_message)) = robot_manager.config_storage.get_config(&owner){ + if version{ + RobotRequest::ShareConfigMessage{signed_message}=>{ + info!("got config from peer {}", peer); + let robots = Arc::clone(&robots); + process_signed_message(serde_json::to_string(&signed_message)?, robots, to_message_tx.clone(), &args); } } - } - }, request_response::Event::Message{ peer, message: request_response::Message::Response { - request_id, response, + .. }, }=>{ match response { - RobotResponse::GetConfigVersion{version}=>{ + RobotResponse::GetConfigVersion{version, owner}=>{ info!("got config version {} from peer {}", version, peer); let robot_manager = robots.lock().unwrap(); - if version>robot_manager.config_version{ - info!("asking for new config"); - swarm.behaviour_mut().request_response.send_request(&peer, RobotRequest::GetSignedConfigMessage { } ); + + if let Some((config, signed_message)) = robot_manager.config_storage.get_config(&owner){ + info!("config version {}", config.version); + if version{ - info!("got config from peer {}", peer); - let robots = Arc::clone(&robots); - process_signed_message(serde_json::to_string(&signed_message)?, robots, to_message_tx.clone()) - } - _=>{ + _=>{} - } } - - }, _=>{} @@ -346,7 +353,11 @@ async fn start( established_in }=>{ info!("ConnectionEstablished: {peer_id} | {connection_id} | {endpoint:?} | {num_established} | {concurrent_dial_errors:?} | {established_in:?}"); - let get_config_version_req = RobotRequest::GetConfigVersion{}; + let robot_manager = robots.lock().unwrap(); + let version = robot_manager.config_version.clone(); + let owner = robot_manager.owner_public_key.clone(); + let get_config_version_req = RobotRequest::GetConfigVersion{version, owner}; + info!("Sending config version req {:?}", get_config_version_req); swarm.behaviour_mut().request_response.send_request(&peer_id, get_config_version_req); }, _ => {} @@ -355,21 +366,38 @@ async fn start( } } -pub async fn main_libp2p( - config: Config, - to_message_tx: broadcast::Sender, - from_message_tx: broadcast::Sender, - robots: Robots, -) -> Result<(), Box> { - let _ = start( - config.identity, - config.libp2p_port, - config.bootstrap_addrs, - to_message_tx, - from_message_tx, - robots, - ) - .await; - - Ok(()) +pub async fn start_libp2p_thread( + from_message_tx: &broadcast::Sender, + to_message_tx: &broadcast::Sender, + robots: &Robots, + config: &Config, + args: &Args, +) -> JoinHandle<()> { + let from_message_tx = from_message_tx.clone(); + let to_message_tx = to_message_tx.clone(); + let robots = Arc::clone(robots); + let config = config.clone(); + let args = args.clone(); + + let libp2p_thread = tokio::spawn(async move { + info!("Start libp2p node"); + match start( + config.identity, + config.libp2p_port, + config.bootstrap_addrs, + to_message_tx, + from_message_tx, + robots, + args, + ) + .await + { + Ok(_) => {} + Err(err) => { + error!("LIBP2P MODULE PANIC: {:?}", err); + } + } + }); + + return libp2p_thread; } diff --git a/src/store.rs b/src/store.rs index 979a0c4..3efea5a 100644 --- a/src/store.rs +++ b/src/store.rs @@ -1,10 +1,10 @@ use libp2p::{Multiaddr, PeerId}; use libp2p_identity::ed25519; use serde::{Deserialize, Serialize}; -use std::any::{Any, TypeId}; -use std::collections::{HashMap, HashSet}; +use std::any::Any; +use std::collections::HashMap; use std::error::Error; -use std::fs; +use std::fs::{self}; use std::hash::Hash; use std::sync::{Arc, Mutex}; use std::time::SystemTime; @@ -15,6 +15,7 @@ use base64::{engine::general_purpose, Engine as _}; use crate::commands::{RobotJob, RobotJobResult}; +/// Represents a tunnel for job communication #[derive(Debug, Clone, Serialize)] pub struct Tunnel { pub client_id: String, @@ -36,15 +37,15 @@ pub enum ChannelMessageFromJob { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "request_type")] pub enum RobotRequest { - GetConfigVersion {}, - GetSignedConfigMessage {}, + GetConfigVersion { version: u32, owner: [u8; 32] }, + ShareConfigMessage { signed_message: SignedMessage }, } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "response_type")] pub enum RobotResponse { - GetConfigVersion { version: u32 }, - GetSignedConfigMessage { signed_message: SignedMessage }, + GetConfigVersion { version: u32, owner: [u8; 32] }, + ShareConfigMessage {}, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -109,12 +110,14 @@ impl From for JobProcessData { } } +/// Manages job processes #[derive(Default, Debug)] pub struct JobManager { pub data: HashMap, } impl JobManager { + /// Creates a new job pub fn new_job(&mut self, job_id: String, job_type: String, status: String) { let (channel_to_job_tx, _channel_to_job_rx) = broadcast::channel::(100); @@ -131,10 +134,11 @@ impl JobManager { }, ); } + /// Sets the result of a job pub fn set_job_result(&mut self, reslut: RobotJobResult) { let process = self.data.get_mut(&reslut.job_id); match process { - Some(process) => { + Some(_process) => { self.set_job_status(reslut.job_id, reslut.status); } None => {} @@ -146,6 +150,7 @@ impl JobManager { None => None, } } + /// Retrieves job information pub fn get_jobs_info(&self) -> Vec { return self.data.clone().into_values().map(|x| x.into()).collect(); } @@ -159,6 +164,7 @@ impl JobManager { } } + /// Creates a tunnel for a job pub fn create_job_tunnel(&mut self, job_id: &String, client_id: String) { let process = self.data.get_mut(job_id); let (tx, _rx) = broadcast::channel::(100); @@ -193,6 +199,7 @@ impl JobManager { } } +/// Manages messages for the system #[derive(Debug, Clone)] pub struct MessageManager { pub from_message_tx: broadcast::Sender, @@ -201,6 +208,7 @@ pub struct MessageManager { impl MessageManager {} +/// Represents a signed message #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SignedMessage { pub message: String, @@ -209,6 +217,7 @@ pub struct SignedMessage { } impl SignedMessage { + /// Verifies the signature of the message pub fn verify(&self) -> bool { if let Ok(public_key) = ed25519::PublicKey::try_from_bytes(&self.public_key) { return public_key.verify(&*self.message.as_bytes(), &self.sign); @@ -217,6 +226,7 @@ impl SignedMessage { } } +/// Represents a message in the system #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Message { pub timestamp: String, @@ -225,6 +235,7 @@ pub struct Message { pub to: Option, } impl Message { + /// Creates a new message pub fn new(content: MessageContent, from: String, to: Option) -> Self { let duration_since_epoch = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) @@ -237,6 +248,7 @@ impl Message { to, } } + /// Signs the message using the provided keypair pub fn signed(self, keypair: ed25519::Keypair) -> Result { let message_str = serde_json::to_string(&self)?; let sign = keypair.sign(&*message_str.as_bytes()); @@ -251,6 +263,7 @@ impl Message { pub enum RobotRole { Current, OrganizationRobot, + OrganizationUser, Owner, Unknown, } @@ -263,10 +276,85 @@ pub struct Robot { pub tags: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct User { + pub username: String, + pub public_key: String, + pub tags: Vec, +} + +impl User { + pub fn get_public_key_bytes(&self) -> Result<[u8; 32], Box> { + let decoded_key = general_purpose::STANDARD.decode(&self.public_key); + match decoded_key { + Ok(decoded_key) => { + let public_key_bytes: [u8; 32] = decoded_key.as_slice().try_into()?; + return Ok(public_key_bytes); + } + Err(_err) => { + return Err("can't decode user public key from base64")?; + } + } + } + pub fn get_peer_id(&self) -> Result> { + let identity = + libp2p::identity::ed25519::PublicKey::try_from_bytes(&self.get_public_key_bytes()?)?; + let public_key: libp2p::identity::PublicKey = identity.into(); + let peer_id = public_key.to_peer_id(); + return Ok(peer_id.to_base58()); + } +} + #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct RobotsConfig { pub version: u32, pub robots: Vec, + pub users: Vec, +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize)] +pub struct ConfigStorage { + configs: HashMap<[u8; 32], (RobotsConfig, SignedMessage)>, // owner public key -> config +} + +impl ConfigStorage { + pub fn update_config( + &mut self, + owner: [u8; 32], + config: RobotsConfig, + signed_message: SignedMessage, + ) { + match self.configs.get(&owner) { + Some((old_config, _)) => { + if old_config.version >= config.version { + return; + } + } + None => {} + } + + info!( + "Updating config in config storage owner {:?} {:?}", + owner, signed_message + ); + self.configs.insert(owner, (config, signed_message)); + } + pub fn get_config(&self, owner: &[u8; 32]) -> Option<(RobotsConfig, SignedMessage)> { + if let Some((config, message)) = self.configs.get(owner) { + return Some((config.clone(), message.clone())); + } + return None; + } + pub fn get_config_version(&self, owner: &[u8; 32]) -> u32 { + match self.get_config(owner) { + Some((config, _message)) => { + return config.version; + } + None => { + return 0; + } + } + } } #[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)] @@ -274,25 +362,29 @@ pub struct RobotInterface { pub ip4: String, } -#[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RobotsManagerDump { config: RobotsConfig, - config_message: Option, + config_message: SignedMessage, } +/// Manages robots in the system #[derive(Default, Debug, Clone, Serialize, Deserialize)] pub struct RobotsManager { pub self_peer_id: String, pub owner_peer_id: String, pub owner_public_key: [u8; 32], - pub robots: HashMap, + pub robots: HashMap, // peer id tp Robot + pub users: HashMap, // public key to User pub config_version: u32, pub config_message: Option, pub peer_id_to_ip: HashMap, pub peers: Vec, + pub config_storage: ConfigStorage, } impl RobotsManager { + /// Adds a robot to the manager pub fn add_robot(&mut self, robot: Robot) { self.robots .insert(robot.robot_peer_id.clone(), robot.clone()); @@ -300,32 +392,52 @@ impl RobotsManager { // self.add_interface_to_robot(robot.robot_peer_id, ip4.to_string()); // } } + pub fn add_user(&mut self, user: User) { + // if let Ok(peer_id) = user.get_peer_id(){ + // self.users.insert(peer_id, user.clone()); + // }else{ + // error!("can't get peer_id for user {:?}", user); + // } + self.users.insert(user.public_key.clone(), user); + } - pub fn set_robots_config( - &mut self, - config: RobotsConfig, - signed_message: Option, - ) { + /// Sets the robots configuration + pub fn set_robots_config(&mut self, config: RobotsConfig, signed_message: SignedMessage) { if config.version > self.config_version { + self.config_version = config.version.clone(); self.robots.clear(); - self.config_version = config.version; - for robot in config.robots { - self.add_robot(robot); + for robot in &config.robots { + self.add_robot(robot.clone()); } - self.config_message = signed_message + self.users.clear(); + for user in &config.users { + self.add_user(user.clone()); + } + self.config_message = Some(signed_message.clone()); + info!("OWNER_PUBLIC_KEY {:?}", self.owner_public_key); + self.config_storage.update_config( + self.owner_public_key, + config.clone(), + signed_message, + ); } else { info!("config version is too old"); } } + /// Saves the current configuration to a file pub fn save_to_file(&self, filepath: String) -> Result<(), Box> { let dump = RobotsManagerDump { config: self.get_robots_config(), - config_message: self.config_message.clone(), + config_message: self + .config_message + .clone() + .ok_or("no config signed message")?, }; fs::write(filepath, serde_json::to_string(&dump)?)?; Ok(()) } + /// Reads the configuration from a file pub fn read_config_from_file(&mut self, filepath: String) -> Result<(), Box> { let dump_str = fs::read_to_string(filepath)?; let dump: RobotsManagerDump = serde_json::from_str(&dump_str)?; @@ -341,6 +453,11 @@ impl RobotsManager { .values() .map(|robot| robot.clone()) .collect::>(), + users: self + .users + .values() + .map(|user| user.clone()) + .collect::>(), }; } @@ -390,20 +507,23 @@ impl RobotsManager { // } // } + /// Gets the role of an entity based on its ID pub fn get_role(&self, id: T) -> RobotRole { let value_any = &id as &dyn Any; - if let Some(peer_id) = value_any.downcast_ref::() { - for robot_peer_id in self.robots.keys() { - if robot_peer_id == peer_id { - return RobotRole::OrganizationRobot; - } - } + if let Some(_peer_id) = value_any.downcast_ref::() { + // for user_peer_id in self.users.keys() { + // if user_peer_id == peer_id { + // return RobotRole::OrganizationUser; + // } + // } } else if let Some(public_key) = value_any.downcast_ref::<[u8; 32]>() { if *public_key == self.owner_public_key { return RobotRole::Owner; } + info!("public key: {:?}", public_key); let pk_str = general_purpose::STANDARD.encode(&public_key.to_vec()); + info!("pk_str: {}", pk_str); for robot_public_key in self .robots .values() @@ -413,12 +533,16 @@ impl RobotsManager { return RobotRole::OrganizationRobot; } } + info!("users: {:?}", self.users); + if let Some(_user) = self.users.get(&pk_str) { + return RobotRole::OrganizationUser; + } } return RobotRole::Unknown; } - pub fn remove_interface_from_robot(&mut self, robot_peer_id: String, ip4: String) {} + pub fn remove_interface_from_robot(&mut self, _robot_peer_id: String, _ip4: String) {} pub fn merge_update(&mut self, update_robots: RobotsConfig) { for robot in update_robots.robots.iter() { @@ -433,6 +557,7 @@ impl RobotsManager { } } +/// Represents the configuration for the system #[derive(Debug, Clone)] pub struct Config { pub identity: libp2p::identity::ed25519::Keypair, @@ -441,6 +566,7 @@ pub struct Config { } impl Config { + /// Generates a new configuration pub fn generate() -> Self { Self { identity: libp2p::identity::ed25519::Keypair::generate(), @@ -449,6 +575,7 @@ impl Config { } } + /// Saves the configuration to a file pub fn save_to_file(self: &Self, filepath: String) -> Result<(), Box> { let encoded_key = general_purpose::STANDARD.encode(&self.identity.to_bytes().to_vec()); fs::write(filepath, encoded_key)?; @@ -467,6 +594,7 @@ impl Config { return peer_id.to_string(); } + /// Loads the configuration from a file pub fn load_from_file(filepath: String) -> Result> { let key = fs::read(filepath)?; let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(key)?; @@ -478,6 +606,7 @@ impl Config { }) } + /// Loads the configuration from a secret key string pub fn load_from_sk_string(sk_string: String) -> Result> { let decoded_key: &mut [u8] = &mut general_purpose::STANDARD.decode(sk_string)?; let parsed_secret = libp2p::identity::ed25519::SecretKey::try_from_bytes(decoded_key)?; @@ -498,5 +627,7 @@ impl Config { } } +/// Type alias for thread-safe access to RobotsManager pub type Robots = Arc>; +/// Type alias for thread-safe access to JobManager pub type Jobs = Arc>;