From e7b5da3b7283732a689a70363d43167e7d37060f Mon Sep 17 00:00:00 2001 From: David Teller Date: Tue, 28 Feb 2023 18:48:08 +0100 Subject: [PATCH] Draft: Global process registry - resolves #127 --- crates/lunatic-control-axum/src/routes.rs | 52 ++++++++++++++++++- crates/lunatic-control-axum/src/server.rs | 14 +++++ crates/lunatic-distributed/src/control/api.rs | 5 ++ 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/crates/lunatic-control-axum/src/routes.rs b/crates/lunatic-control-axum/src/routes.rs index 943071aac..4b88c51bc 100644 --- a/crates/lunatic-control-axum/src/routes.rs +++ b/crates/lunatic-control-axum/src/routes.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use axum::{ body::Bytes, extract::DefaultBodyLimit, - routing::{get, post}, + routing::{get, post, delete}, Extension, Json, Router, }; use lunatic_distributed::{ @@ -15,7 +15,7 @@ use tower_http::limit::RequestBodyLimitLayer; use crate::{ api::{ok, ApiError, ApiResponse, HostExtractor, JsonExtractor, NodeAuth, PathExtractor}, - server::ControlServer, + server::{ControlServer, ProcessId}, }; pub async fn register( @@ -50,6 +50,9 @@ pub async fn register( get_module: format!("http://{host}/module/{{id}}"), add_module: format!("http://{host}/module"), get_nodes: format!("http://{host}/nodes"), + get_process: format!("http://{host}/process/get/{{id}}"), + add_process: format!("http://{host}/process/add"), + remove_process: format!("http://{host}/process/remove/{{id}}"), }, }) } @@ -141,6 +144,48 @@ pub async fn get_module( ok(ModuleBytes { bytes }) } +pub async fn get_process( + node_auth: NodeAuth, + PathExtractor(id): PathExtractor, + control: Extension>, +) -> ApiResponse { + log::info!("Node {} get_process {}", node_auth.node_name, id); + + let process = control + .processes + .get(&id) + .ok_or_else(|| ApiError::custom_code("error_reading_process_name"))?; + + ok(process.value().to_string()) +} + +pub async fn remove_process( + node_auth: NodeAuth, + PathExtractor(id): PathExtractor, + control: Extension>, +) -> ApiResponse { + log::info!("Node {} remove_process {}", node_auth.node_name, id); + + let was_removed = control.processes + .remove(&id) + .is_some(); + + ok(was_removed) +} + +pub async fn add_process( + node_auth: NodeAuth, + control: Extension>, + PathExtractor(id): PathExtractor, + JsonExtractor(name): JsonExtractor, +) -> ApiResponse { + log::info!("Node {} add_process {}", node_auth.node_name, id); + + let was_replaced = control.processes.insert(id, name).is_some(); + + ok(was_replaced) +} + pub fn init_routes() -> Router { Router::new() .route("/", post(register)) @@ -149,6 +194,9 @@ pub fn init_routes() -> Router { .route("/nodes", get(list_nodes)) .route("/module", post(add_module)) .route("/module/:id", get(get_module)) + .route("/process/:id", get(get_process)) + .route("/process/:id", post(add_process)) + .route("/process/:id", delete(remove_process)) .layer(DefaultBodyLimit::disable()) .layer(RequestBodyLimitLayer::new(50 * 1024 * 1024)) // 50 mb } diff --git a/crates/lunatic-control-axum/src/server.rs b/crates/lunatic-control-axum/src/server.rs index 9ba342fb5..d0210f462 100644 --- a/crates/lunatic-control-axum/src/server.rs +++ b/crates/lunatic-control-axum/src/server.rs @@ -12,6 +12,7 @@ use chrono::{DateTime, Utc}; use dashmap::DashMap; use lunatic_distributed::control::api::{NodeStart, Register}; use rcgen::Certificate; +use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::routes; @@ -33,12 +34,24 @@ pub struct NodeDetails { pub attributes: serde_json::Value, } +/// The id of a process. +/// +/// FIXME: Is this a global id or a local id? +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, Deserialize, Serialize)] +pub struct ProcessId(u64); +impl std::fmt::Display for ProcessId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + pub struct ControlServer { pub ca_cert: Certificate, pub quic_client: lunatic_distributed::quic::Client, pub registrations: DashMap, pub nodes: DashMap, pub modules: DashMap>, + pub(crate) processes: DashMap, next_registration_id: AtomicU64, next_node_id: AtomicU64, next_module_id: AtomicU64, @@ -52,6 +65,7 @@ impl ControlServer { registrations: DashMap::new(), nodes: DashMap::new(), modules: DashMap::new(), + processes: DashMap::new(), next_registration_id: AtomicU64::new(1), next_node_id: AtomicU64::new(1), next_module_id: AtomicU64::new(1), diff --git a/crates/lunatic-distributed/src/control/api.rs b/crates/lunatic-distributed/src/control/api.rs index af290b4f4..74b475797 100644 --- a/crates/lunatic-distributed/src/control/api.rs +++ b/crates/lunatic-distributed/src/control/api.rs @@ -27,6 +27,11 @@ pub struct ControlUrls { pub get_module: String, pub add_module: String, pub get_nodes: String, + + /// Get a process + pub get_process: String, + pub add_process: String, + pub remove_process: String, } #[derive(Clone, Debug, Serialize, Deserialize)]