Skip to content

Commit

Permalink
feat: Change health connection to SSE
Browse files Browse the repository at this point in the history
  • Loading branch information
Threated committed Jan 22, 2025
1 parent ba6bbae commit 1b7d1e0
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 160 deletions.
1 change: 1 addition & 0 deletions broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bytes = { version = "1", optional = true }
axum-extra = { version = "0.10", features = ["typed-header"] }
hyper = { version = "1", default-features = false, optional = true}
hyper-util = { version = "0.1", default-features = false, features = ["tokio"], optional = true}
parking_lot = { version = "0.12", features = ["arc_lock", "send_guard"] }

[features]
sockets = ["dep:bytes", "shared/sockets", "dep:hyper", "dep:hyper-util"]
Expand Down
2 changes: 1 addition & 1 deletion broker/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;
use tokio::{sync::RwLock, time::timeout};
use tracing::{debug, error, warn, info};

use crate::health::{self, Health, VaultStatus};
use crate::serve_health::{Health, VaultStatus};

pub struct GetCertsFromPki {
pki_realm: String,
Expand Down
85 changes: 0 additions & 85 deletions broker/src/health.rs

This file was deleted.

7 changes: 3 additions & 4 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

mod banner;
mod crypto;
mod health;
mod serve;
mod serve_health;
mod serve_pki;
Expand All @@ -15,7 +14,7 @@ mod compare_client_server_version;
use std::{collections::HashMap, sync::Arc, time::Duration};

use crypto::GetCertsFromPki;
use health::{Health, InitStatus};
use serve_health::{Health, InitStatus};
use once_cell::sync::Lazy;
use shared::{config::CONFIG_CENTRAL, *, errors::SamplyBeamError};
use tokio::sync::RwLock;
Expand Down Expand Up @@ -45,8 +44,8 @@ pub async fn main() -> anyhow::Result<()> {

async fn init_broker_ca_chain(health: Arc<RwLock<Health>>) {
{
health.write().await.initstatus = health::InitStatus::FetchingIntermediateCert
health.write().await.initstatus = InitStatus::FetchingIntermediateCert
}
shared::crypto::init_ca_chain().await.expect("Failed to init broker ca chain");
health.write().await.initstatus = health::InitStatus::Done;
health.write().await.initstatus = InitStatus::Done;
}
2 changes: 1 addition & 1 deletion broker/src/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::{
};
use tracing::{debug, info, trace, warn};

use crate::{banner, crypto, health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};
use crate::{banner, crypto, serve_health::Health, serve_health, serve_pki, serve_tasks, compare_client_server_version};

pub(crate) async fn serve(health: Arc<RwLock<Health>>) -> anyhow::Result<()> {
let app = serve_tasks::router()
Expand Down
126 changes: 85 additions & 41 deletions broker/src/serve_health.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{sync::Arc, time::{Duration, SystemTime}};
use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync::Arc, time::{Duration, SystemTime}};

use axum::{extract::{State, Path}, http::StatusCode, routing::get, Json, Router, response::Response};
use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive}, Response, Sse}, routing::get, Json, Router};
use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader};
use beam_lib::ProxyId;
use futures_core::Stream;
use serde::{Serialize, Deserialize};
use shared::{crypto_jwt::Authorized, Msg, config::CONFIG_CENTRAL};
use tokio::sync::RwLock;

use crate::{health::{Health, VaultStatus, Verdict, ProxyStatus, InitStatus}, compare_client_server_version::log_version_mismatch};
use crate::compare_client_server_version::log_version_mismatch;

#[derive(Serialize)]
struct HealthOutput {
Expand All @@ -16,6 +17,58 @@ struct HealthOutput {
init_status: InitStatus
}

#[derive(Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Verdict {
Healthy,
Unhealthy,
Unknown,
}

impl Default for Verdict {
fn default() -> Self {
Verdict::Unknown
}
}

#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum VaultStatus {
Ok,
#[default]
Unknown,
OtherError,
LockedOrSealed,
Unreachable,
}

#[derive(Debug, Serialize, Clone, Copy, Default)]
#[serde(rename_all = "lowercase")]
pub enum InitStatus {
#[default]
Unknown,
FetchingIntermediateCert,
Done
}

#[derive(Debug, Default)]
pub struct Health {
pub vault: VaultStatus,
pub initstatus: InitStatus,
proxies: HashMap<ProxyId, ProxyStatus>
}

#[derive(Debug, Clone, Default)]
struct ProxyStatus {
online_guard: Arc<parking_lot::Mutex<Option<SystemTime>>>
}

impl ProxyStatus {
pub fn is_online(&self) -> bool {
self.online_guard.is_locked()
}
}

pub(crate) fn router(health: Arc<RwLock<Health>>) -> Router {
Router::new()
.route("/v1/health", get(handler))
Expand Down Expand Up @@ -46,14 +99,14 @@ async fn handler(
}

async fn get_all_proxies(State(state): State<Arc<RwLock<Health>>>) -> Json<Vec<ProxyId>> {
Json(state.read().await.proxies.keys().cloned().collect())
Json(state.read().await.proxies.iter().filter(|(_, v)| v.is_online()).map(|(k, _)| k).cloned().collect())
}

async fn proxy_health(
State(state): State<Arc<RwLock<Health>>>,
Path(proxy): Path<ProxyId>,
auth: TypedHeader<Authorization<Basic>>
) -> Result<(StatusCode, Json<ProxyStatus>), StatusCode> {
) -> Result<(StatusCode, Json<serde_json::Value>), StatusCode> {
let Some(ref monitoring_key) = CONFIG_CENTRAL.monitoring_api_key else {
return Err(StatusCode::NOT_IMPLEMENTED);
};
Expand All @@ -63,10 +116,12 @@ async fn proxy_health(
}

if let Some(reported_back) = state.read().await.proxies.get(&proxy) {
if reported_back.online() {
Err(StatusCode::OK)
if let Some(last_disconnect) = reported_back.online_guard.try_lock().as_deref().copied() {
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(serde_json::json!({
"last_disconnect": last_disconnect
}))))
} else {
Ok((StatusCode::SERVICE_UNAVAILABLE, Json(reported_back.clone())))
Err(StatusCode::OK)
}
} else {
Err(StatusCode::NOT_FOUND)
Expand All @@ -76,48 +131,37 @@ async fn proxy_health(
async fn get_control_tasks(
State(state): State<Arc<RwLock<Health>>>,
proxy_auth: Authorized,
) -> StatusCode {
) -> Result<Sse<ForeverStream>, StatusCode> {
let proxy_id = proxy_auth.get_from().proxy_id();
// Once this is freed the connection will be removed from the map of connected proxies again
// This ensures that when the connection is dropped and therefore this response future the status of this proxy will be updated
let _connection_remover = ConnectedGuard::connect(&proxy_id, &state).await;

// In the future, this will wait for control tasks for the given proxy
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
let Some(connect_guard) = state
.write()
.await
.proxies
.entry(proxy_id)
.or_default()
.online_guard
.try_lock_arc_for(Duration::from_secs(60))
else {
return Err(StatusCode::CONFLICT);
};

StatusCode::OK
Ok(Sse::new(ForeverStream(connect_guard)).keep_alive(KeepAlive::new()))
}

struct ConnectedGuard<'a> {
proxy: &'a ProxyId,
state: &'a Arc<RwLock<Health>>
}
struct ForeverStream(#[allow(dead_code)] parking_lot::ArcMutexGuard<parking_lot::RawMutex, Option<SystemTime>>);

impl<'a> ConnectedGuard<'a> {
async fn connect(proxy: &'a ProxyId, state: &'a Arc<RwLock<Health>>) -> ConnectedGuard<'a> {
{
state.write().await.proxies
.entry(proxy.clone())
.and_modify(ProxyStatus::connect)
.or_insert(ProxyStatus::new());
}
Self { proxy, state }
impl Stream for ForeverStream {
type Item = Result<Event, Infallible>;

fn poll_next(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
std::task::Poll::Pending
}
}

impl<'a> Drop for ConnectedGuard<'a> {
impl Drop for ForeverStream {
fn drop(&mut self) {
let proxy_id = self.proxy.clone();
let map = self.state.clone();
tokio::spawn(async move {
// We wait here for one second to give the client a bit of time to reconnect incrementing the connection count so that it will be one again after the decrement
tokio::time::sleep(Duration::from_secs(1)).await;
map.write()
.await
.proxies
.get_mut(&proxy_id)
.expect("Has to exist as we don't remove items and the constructor of this type inserts the entry")
.disconnect();
});
*self.0 = Some(SystemTime::now());
}
}
}
Loading

0 comments on commit 1b7d1e0

Please sign in to comment.