Skip to content

Commit

Permalink
add naive online status + change store code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Smehnov committed Oct 4, 2024
1 parent 2ce7b0f commit 2b9606b
Show file tree
Hide file tree
Showing 17 changed files with 786 additions and 676 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ debug/
.DS_Store
*.key
script.py
scripts

10 changes: 5 additions & 5 deletions src/commands/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use base64::{engine::general_purpose, Engine as _};

use crate::{
commands::{RobotJob, RobotJobResult},
store::Jobs,
store::job_manager::Jobs,
utils::files::create_job_data_dir,
};

Expand Down Expand Up @@ -185,14 +185,14 @@ impl DockerLaunch {
loop {
let channel_message = channel_to_job_rx.recv().await.unwrap();
match channel_message {
crate::store::ChannelMessageToJob::TerminalMessage(
crate::store::messages::ChannelMessageToJob::TerminalMessage(
data,
) => {
for byte in data.as_bytes().iter() {
input.write_all(&[*byte]).await.ok();
}
}
crate::store::ChannelMessageToJob::ArchiveMessage {
crate::store::messages::ChannelMessageToJob::ArchiveMessage {
encoded_tar,
path,
} => {
Expand Down Expand Up @@ -226,7 +226,7 @@ impl DockerLaunch {
info!("Error while decoded tar");
}
}
crate::store::ChannelMessageToJob::ArchiveRequest {
crate::store::messages::ChannelMessageToJob::ArchiveRequest {
..
} => {}
}
Expand All @@ -249,7 +249,7 @@ impl DockerLaunch {
while let Some(Ok(output)) = output.next().await {
let job_manager = shared_jobs.lock().unwrap();
if let Some(tx) = job_manager.get_channel_from_job(&robot_job.id) {
tx.send(crate::store::ChannelMessageFromJob::TerminalMessage(
tx.send(crate::store::messages::ChannelMessageFromJob::TerminalMessage(
output.to_string(),
))
.unwrap();
Expand Down
16 changes: 8 additions & 8 deletions src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use tokio::sync::broadcast::Sender;
use tracing::{error, info};

use crate::store;
use crate::store::ChannelMessageFromJob;
use crate::store::Message;
use crate::store::messages::{Message, ChannelMessageFromJob};
use crate::store::job_manager::Jobs;

mod docker;

Expand Down Expand Up @@ -60,7 +60,7 @@ pub enum TunnnelClient {
},
}

pub async fn launch_new_job(robot_job: RobotJob, jobs: &store::Jobs) {
pub async fn launch_new_job(robot_job: RobotJob, jobs: &Jobs) {
info!("{:?}", robot_job);
let jobs = Arc::clone(jobs);
let mut job_manager = jobs.lock().unwrap();
Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn start_tunnel_messanger(
info!("sending stdout: {:?}", stdout);
let _ = from_robot_tx.send(
serde_json::to_string(&Message::new(
store::MessageContent::TunnelResponseMessage {
store::messages::MessageContent::TunnelResponseMessage {
job_id: job_id.clone(),
message: ChannelMessageFromJob::TerminalMessage(stdout),
},
Expand All @@ -125,7 +125,7 @@ pub async fn start_tunnel_messanger(
}
}

pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &store::Jobs) {
pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &Jobs) {
info!("Start tunnel request");
let jobs = Arc::clone(jobs);
let mut job_manager = jobs.lock().unwrap();
Expand Down Expand Up @@ -158,7 +158,7 @@ pub async fn start_tunnel(tunnel_client: TunnnelClient, job_id: String, jobs: &s
}
}

pub async fn message_to_robot(message: MessageToRobot, jobs: &store::Jobs) {
pub async fn message_to_robot(message: MessageToRobot, jobs: &Jobs) {
info!("Message to robot request");

info!("Message to robot: {:?}", message);
Expand All @@ -171,12 +171,12 @@ pub async fn message_to_robot(message: MessageToRobot, jobs: &store::Jobs) {
match content {
MessageContent::Terminal { stdin } => {
channel
.send(store::ChannelMessageToJob::TerminalMessage(stdin))
.send(store::messages::ChannelMessageToJob::TerminalMessage(stdin))
.unwrap();
}
MessageContent::Archive { dest_path, data } => {
channel
.send(store::ChannelMessageToJob::ArchiveMessage {
.send(store::messages::ChannelMessageToJob::ArchiveMessage {
encoded_tar: data,
path: dest_path,
})
Expand Down
43 changes: 36 additions & 7 deletions src/core/core.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::store::Message;
use crate::store::MessageContent;
use crate::store::MessageRequest;
use crate::store::MessageResponse;
use crate::store::RobotRole;
use crate::store::SignedMessage;
use crate::store::messages::{Message, MessageContent, MessageRequest, MessageResponse, SignedMessage};
use crate::store::robot_manager::{RobotRole, Robots};
use crate::store::job_manager::{JobManager, Jobs};

use crate::commands;

use crate::cli::Args;
use crate::store::{JobManager, Jobs, Robots};
use std::sync::{Arc, Mutex};
use tokio::select;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
use tracing::{error, info};
use tokio::time::{interval, Duration};


use std::error::Error;

Expand All @@ -27,6 +25,8 @@ pub async fn main_normal(

let mut to_message_rx = to_message_tx.subscribe();

let mut handshake_timer = interval(Duration::from_secs(30));

loop {
select! {
msg = to_message_rx.recv()=>match msg{
Expand All @@ -39,6 +39,7 @@ pub async fn main_normal(
let robot_manager = robots.lock().unwrap();
if message.to.unwrap_or("".to_string()) == robot_manager.self_peer_id
|| matches!(message.content, MessageContent::UpdateConfig { .. })

{
let role = robot_manager.get_role(signed_message.public_key);
info!("role: {:?}", role);
Expand All @@ -49,6 +50,9 @@ pub async fn main_normal(
}
}
}
if matches!(message.content, MessageContent::Handshake { }){
should_process = true;
}
info!("should process {}", should_process);

if should_process{
Expand Down Expand Up @@ -116,6 +120,17 @@ pub async fn main_normal(
))?);
}
},
MessageContent::Handshake{}=>{
let identity =
libp2p::identity::ed25519::PublicKey::try_from_bytes(&signed_message.public_key)?;
let public_key: libp2p::identity::PublicKey = identity.into();
let peer_id = public_key.to_peer_id();
let from = peer_id.to_base58();
info!("Got handshake from {}", from);
let mut robot_manager = robots.lock().unwrap();
robot_manager.network_manager.process_handshake(from);

}
_=>{}
}
}
Expand All @@ -124,6 +139,20 @@ pub async fn main_normal(
error!("error while socket receiving libp2p message");
}
},
_ = handshake_timer.tick()=>{

let message_content = MessageContent::Handshake { };
let _ = from_message_tx.send(serde_json::to_string(&Message::new(
message_content,
"".to_string(),
None
))?);

let mut robot_manager = robots.lock().unwrap();
robot_manager.network_manager.clean_old_handshakes();


}
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/external_api/messages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::store::Message;
use crate::store::MessageContent;
use crate::store::Robots;
use crate::store::SignedMessage;
use crate::store::messages::{Message, MessageContent, SignedMessage};
use crate::store::robot_manager::Robots;
use base64::{engine::general_purpose, Engine as _};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -95,6 +93,13 @@ pub fn process_command(
answer = Some("{\"ok\":false}".to_string());
}
}
"/network_info"=>{
info!("/network_info request");
let robots_manager = robots.lock().unwrap();
let network_info_text = serde_json::to_string(&robots_manager.network_manager.peers_info)?;
info!("{}", network_info_text);
answer = Some(network_info_text)
}
"/send_message" => {
if let Some(message_content) = command.content {
let _ = from_message_tx.send(serde_json::to_string(&Message::new(
Expand Down
5 changes: 2 additions & 3 deletions src/external_api/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::messages::{process_command, MessageQueue, SocketCommand};
use crate::cli::Args;
use crate::store::Message;
use crate::store::Robots;
use crate::store::SignedMessage;
use crate::store::messages::{Message, SignedMessage};
use crate::store::robot_manager::Robots;
use std::error::Error;
use std::sync::{Arc, Mutex};
use tokio::net::{UnixListener, UnixStream};
Expand Down
2 changes: 1 addition & 1 deletion src/external_api/web_socket.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::cli::Args;
use crate::store::Robots;
use crate::store::robot_manager::Robots;
use futures::SinkExt;
use std::error::Error;
use std::sync::{Arc, Mutex};
Expand Down
16 changes: 9 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use tracing_subscriber::FmtSubscriber;

use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use store::key_manager::KeyConfig;
use store::robot_manager::{RobotsManager, Robots};

pub mod cli;
pub mod commands;
Expand All @@ -20,8 +22,8 @@ pub mod node;
pub mod store;
pub mod utils;

pub fn generate_key_file(key_filename: String) -> store::Config {
let config = store::Config::generate();
pub fn generate_key_file(key_filename: String) -> store::key_manager::KeyConfig {
let config = store::key_manager::KeyConfig::generate();
let _ = config.save_to_file(key_filename.clone());
info!("Generated new key and saved it to {}", key_filename);

Expand All @@ -40,13 +42,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}

let mut config: store::Config = store::Config::generate();
let mut config: KeyConfig = KeyConfig::generate();
match args.clone().secret_key {
Some(secret_key_string) => {
config = store::Config::load_from_sk_string(secret_key_string)?;
config = KeyConfig::load_from_sk_string(secret_key_string)?;
}
None => {
let config_load_res = store::Config::load_from_file(args.key_filename.clone());
let config_load_res = KeyConfig::load_from_file(args.key_filename.clone());
match config_load_res {
Ok(loaded_config) => {
config = loaded_config;
Expand All @@ -61,7 +63,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
error!("Can't write key to file {:?}", err);
}

let mut robot_manager = store::RobotsManager {
let mut robot_manager = RobotsManager {
self_peer_id: config.get_peer_id(),
..Default::default()
};
Expand All @@ -87,7 +89,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
_ => {}
}

let robots: store::Robots = Arc::new(Mutex::new(robot_manager));
let robots: Robots = Arc::new(Mutex::new(robot_manager));

let libp2p_port = args.port_libp2p.parse::<u16>().unwrap();
config.set_libp2p_port(libp2p_port);
Expand Down
14 changes: 6 additions & 8 deletions src/node/node.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use crate::cli::Args;
use crate::store::Config;
use crate::store::Message;
use crate::store::MessageContent;
use crate::store::RobotRequest;
use crate::store::RobotResponse;
use crate::store::Robots;
use crate::store::SignedMessage;

use crate::store::key_manager::KeyConfig;
use crate::store::messages::{Message, MessageContent, RobotRequest, RobotResponse, SignedMessage};
use crate::store::robot_manager::{Robots};


use futures::stream::StreamExt;
use libp2p::PeerId;
Expand Down Expand Up @@ -370,7 +368,7 @@ pub async fn start_libp2p_thread(
from_message_tx: &broadcast::Sender<String>,
to_message_tx: &broadcast::Sender<String>,
robots: &Robots,
config: &Config,
config: &KeyConfig,
args: &Args,
) -> JoinHandle<()> {
let from_message_tx = from_message_tx.clone();
Expand Down
Loading

0 comments on commit 2b9606b

Please sign in to comment.