diff --git a/Cargo.lock b/Cargo.lock index 90891223..2f9df6fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2412,6 +2412,7 @@ dependencies = [ "diesel", "diesel-async", "diesel_migrations", + "futures", "itertools", "parking_lot 0.12.3", "sable_network", diff --git a/sable_history/Cargo.toml b/sable_history/Cargo.toml index 1ed39cfb..c65abbd5 100644 --- a/sable_history/Cargo.toml +++ b/sable_history/Cargo.toml @@ -11,6 +11,7 @@ built = { version = "0.5", features = [ "git2" ] } sable_network = { path = "../sable_network" } sable_server = { path = "../sable_server" } +futures = "0.3" tokio = { version = "1.14", features = [ "full" ] } serde = { version = "1", features = [ "derive" ] } serde_with = "1.11" diff --git a/sable_history/diesel.toml b/sable_history/diesel.toml index a0d61bf4..06043aa5 100644 --- a/sable_history/diesel.toml +++ b/sable_history/diesel.toml @@ -4,6 +4,7 @@ [print_schema] file = "src/schema.rs" custom_type_derives = ["diesel::query_builder::QueryId", "Clone"] +import_types = ["crate::types::*"] [migrations_directory] dir = "migrations" diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql new file mode 100644 index 00000000..12136b7f --- /dev/null +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/down.sql @@ -0,0 +1,7 @@ +DROP INDEX messages_by_timestamp; + +ALTER TABLE messages + DROP COLUMN message_type, + DROP COLUMN timestamp; + +DROP TYPE "Message_Type"; diff --git a/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql new file mode 100644 index 00000000..990a57c3 --- /dev/null +++ b/sable_history/migrations/2024-10-27-125826_reproducible_messages/up.sql @@ -0,0 +1,8 @@ +CREATE TYPE "Message_Type" AS ENUM ('privmsg', 'notice'); + +ALTER TABLE messages + ADD COLUMN message_type "Message_Type" NOT NULL, + ADD COLUMN timestamp TIMESTAMP NOT NULL; + +CREATE INDEX messages_by_timestamp ON messages USING BRIN (timestamp, id); +COMMENT ON INDEX messages_by_timestamp IS 'Includes the id in order to be a consistent total order across requests'; diff --git a/sable_history/src/lib.rs b/sable_history/src/lib.rs index 440bcb46..c89921ea 100644 --- a/sable_history/src/lib.rs +++ b/sable_history/src/lib.rs @@ -1,5 +1,8 @@ +mod pg_history_service; +pub use pg_history_service::PgHistoryService; mod server; pub use server::*; mod models; mod schema; +mod types; diff --git a/sable_history/src/models/message.rs b/sable_history/src/models/message.rs index 7934b523..d1f2d01f 100644 --- a/sable_history/src/models/message.rs +++ b/sable_history/src/models/message.rs @@ -5,9 +5,19 @@ use super::*; #[diesel(check_for_backend(diesel::pg::Pg))] #[diesel(belongs_to(Channel, foreign_key = target_channel))] #[diesel(belongs_to(HistoricUser, foreign_key = source_user))] +#[derive(Debug)] pub struct Message { pub id: Uuid, pub source_user: i32, pub target_channel: i64, pub text: String, + pub message_type: crate::types::MessageType, + /// Timestamp of the *update* introducing the message. + /// + /// This is usually the same second as the one in [`id`] (a UUIDv7), but is + /// occasionally 1 second later, because the message id is created before being + /// pushed to the log. + /// It can also be significantly different, because both are based on the + /// system clock, which can change arbitrarily.
+ pub timestamp: chrono::NaiveDateTime, } diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs new file mode 100644 index 00000000..b151ad27 --- /dev/null +++ b/sable_history/src/pg_history_service.rs @@ -0,0 +1,326 @@ +use std::collections::HashMap; + +use anyhow::{bail, Result}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use diesel::dsl::sql; +use diesel::prelude::*; +use diesel_async::{AsyncPgConnection, RunQueryDsl}; +use futures::stream::{StreamExt, TryStreamExt}; +use tokio::sync::Mutex; +use uuid::Uuid; + +use sable_network::prelude::*; + +use crate::schema::{channels, historic_users, messages}; + +/// Implementation of [`HistoryService`] backed PostgreSQL +pub struct PgHistoryService<'a> { + database_connection: &'a Mutex, +} + +impl<'a> PgHistoryService<'a> { + pub fn new(database_connection: &'a Mutex) -> Self { + Self { + database_connection, + } + } +} + +impl<'a> HistoryService for PgHistoryService<'a> { + async fn list_targets( + &self, + _user: UserId, + _after_ts: Option, + _before_ts: Option, + _limit: Option, + ) -> HashMap { + // TODO: access control + // TODO: after_ts, before_ts, limit + match channels::dsl::channels + .select(( + channels::dsl::id, + sql::( + "SELECT MAX(id) FROM messages WHERE target_channel=channels.id", + ), + )) + .load_stream(&mut *self.database_connection.lock().await) + .await + { + Err(e) => { + tracing::error!("Could not get history channels: {e}"); + return HashMap::new(); + } + Ok(rows) => rows + .map(|row| -> Result<(TargetId, i64)> { + let (channel_id, max_message_id): (i64, Uuid) = row?; + let channel = + TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64))); + + let Some(ts) = max_message_id.get_timestamp() else { + bail!("messages.id should be a UUID7, not {max_message_id}"); + }; + let (seconds, _) = ts.to_unix(); + let Ok(seconds) = seconds.try_into() else { + bail!("message {max_message_id}'s UNIX timestamp is negative"); + }; + Ok((channel, seconds)) + }) + .try_collect() + .await + .unwrap_or_else(|e| { + tracing::error!("Could not read rows: {e}"); + HashMap::new() + }), + } + } + + async fn get_entries( + &self, + _user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + // TODO: access control + let TargetId::Channel(channel_id) = target else { + // TODO: PMs + return Err(HistoryError::InvalidTarget(target)); + }; + + let mut connection_lock = self.database_connection.lock().await; + + let db_channel_id = channel_id.as_u64() as i64; + let channel = match channels::dsl::channels + .find(db_channel_id) + .select(crate::models::Channel::as_select()) + .first(&mut *connection_lock) + .await + .optional() + { + Ok(Some(channel)) => channel, + Ok(None) => return Err(HistoryError::InvalidTarget(target)), + Err(e) => { + tracing::error!("Could not check if channel exists: {e}"); + return Err(HistoryError::InternalError( + "Could not check if channel exists".to_string(), + )); + } + }; + + let base_query = messages::dsl::messages + .inner_join(historic_users::dsl::historic_users) + .select(( + messages::dsl::id, + messages::dsl::timestamp, + messages::dsl::message_type, + messages::dsl::text, + historic_users::dsl::nick, + historic_users::dsl::ident, + historic_users::dsl::vhost, + historic_users::dsl::account_name, + )) + .filter(messages::dsl::target_channel.eq(db_channel_id)); + match request { + HistoryRequest::Latest { to_ts, limit } => { + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + match to_ts { + 
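+ // LATEST request: return the newest `limit` messages, optionally restricted to those strictly after `to_ts`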
Some(to_ts) => { + let to_ts = DateTime::from_timestamp(to_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + collect_query( + connection_lock, + &channel, + true, // reverse + base_query + .filter(messages::dsl::timestamp.gt(to_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + ) + .await + } + None => { + collect_query( + connection_lock, + &channel, + true, // reverse + base_query + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + ) + .await + } + } + } + HistoryRequest::Before { from_ts, limit } => { + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let from_ts = DateTime::from_timestamp(from_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + collect_query( + connection_lock, + &channel, + true, // reverse + base_query + .filter(messages::dsl::timestamp.lt(from_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + ) + .await + } + HistoryRequest::After { start_ts, limit } => { + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let start_ts = DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + collect_query( + connection_lock, + &channel, + false, // don't reverse + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit), + ) + .await + } + HistoryRequest::Around { around_ts, limit } => { + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + let around_ts = DateTime::from_timestamp(around_ts, 0) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + collect_query( + connection_lock, + &channel, + false, // don't reverse + CombineDsl::union( + base_query + .filter(messages::dsl::timestamp.le(around_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + base_query + .filter(messages::dsl::timestamp.gt(around_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit), + ), + ) + .await + .map(|mut events| { + // TODO: make postgresql sort it, it may be able to do it directly from + // the index scan instead of sorting after the union + events.sort_unstable_by_key(|event| match event { + HistoricalEvent::Message { id, timestamp, .. 
} => (*timestamp, *id), + }); + events + }) + } + HistoryRequest::Between { + start_ts, + end_ts, + limit, + } => { + if start_ts <= end_ts { + let start_ts = DateTime::from_timestamp(start_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + collect_query( + connection_lock, + &channel, + false, // don't reverse + base_query + .filter(messages::dsl::timestamp.gt(start_ts)) + .filter(messages::dsl::timestamp.lt(end_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp, messages::dsl::id)) + .limit(limit), + ) + .await + } else { + let start_ts = DateTime::from_timestamp(start_ts, 0) + .unwrap_or(DateTime::::MAX_UTC) + .naive_utc(); + let end_ts = DateTime::from_timestamp(end_ts, 999_999) + .unwrap_or(DateTime::::MIN_UTC) + .naive_utc(); + let limit = i64::min(10000, i64::try_from(limit).unwrap_or(i64::MAX)); + collect_query( + connection_lock, + &channel, + true, // reverse + base_query + .filter(messages::dsl::timestamp.gt(end_ts)) + .filter(messages::dsl::timestamp.lt(start_ts)) + // total order, consistent across requests + .order((messages::dsl::timestamp.desc(), messages::dsl::id.desc())) + .limit(limit), + ) + .await + } + } + } + } +} + +type JoinedMessageRow = ( + uuid::Uuid, + NaiveDateTime, + crate::types::MessageType, + String, + String, + String, + String, + Option, +); + +async fn collect_query<'query>( + mut connection: tokio::sync::MutexGuard<'_, AsyncPgConnection>, + channel: &crate::models::Channel, + reverse: bool, + query: impl diesel_async::RunQueryDsl + + diesel_async::methods::LoadQuery<'query, AsyncPgConnection, JoinedMessageRow> + + 'query, +) -> Result, HistoryError> { + let events = query + .load_stream(&mut *connection) + .await + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? + .map_ok(|row| make_historical_event(channel, row)) + .try_collect::>() + .await + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })?; + Ok(if reverse { + events.into_iter().rev().collect() + } else { + events + }) +} + +fn make_historical_event( + channel: &crate::models::Channel, + (id, timestamp, message_type, text, source_nick, source_ident, source_vhost, source_account): JoinedMessageRow, +) -> HistoricalEvent { + HistoricalEvent::Message { + id: MessageId::new(id.try_into().expect("Message id is a non-v7 UUID")), + timestamp: timestamp.and_utc().timestamp(), + source: format!("{}!{}@{}", source_nick, source_ident, source_vhost), + source_account, + message_type: message_type.into(), + target: channel.name.clone(), // assume it's the same + text, + } +} diff --git a/sable_history/src/schema.rs b/sable_history/src/schema.rs index 2823831b..1f34b6ce 100644 --- a/sable_history/src/schema.rs +++ b/sable_history/src/schema.rs @@ -1,5 +1,11 @@ // @generated automatically by Diesel CLI. +pub mod sql_types { + #[derive(diesel::query_builder::QueryId, Clone, diesel::sql_types::SqlType)] + #[diesel(postgres_type(name = "Message_Type"))] + pub struct MessageType; +} + diesel::table! { channels (id) { id -> Int8, @@ -21,11 +27,16 @@ diesel::table! { } diesel::table! 
{ + use diesel::sql_types::*; + use super::sql_types::MessageType; + messages (id) { id -> Uuid, source_user -> Int4, target_channel -> Int8, text -> Varchar, + message_type -> MessageType, + timestamp -> Timestamp, } } diff --git a/sable_history/src/server/mod.rs b/sable_history/src/server/mod.rs index 2e7577cd..0e039a8d 100644 --- a/sable_history/src/server/mod.rs +++ b/sable_history/src/server/mod.rs @@ -10,10 +10,12 @@ use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness}; use itertools::Itertools; use serde::Deserialize; use tokio::sync::{mpsc::UnboundedReceiver, Mutex}; +use tracing::instrument; use sable_network::prelude::*; use sable_server::ServerType; +mod sync; mod update_handler; pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); @@ -27,7 +29,7 @@ pub struct HistoryServerConfig { pub struct HistoryServer { node: Arc, history_receiver: Mutex>, - database_connection: Mutex, + database_connection: Mutex, // TODO: use a connection pool } impl ServerType for HistoryServer { @@ -108,6 +110,14 @@ impl ServerType for HistoryServer { { let Some(update) = update else { break; }; + if let NetworkStateChange::NewServer(new_server) = &update.change + { + if new_server.server == self.node.id() + { + self.burst_to_network().await; + } + } + if let Err(error) = self.handle_history_update(update).await { tracing::error!(?error, "Error return handling history update"); } @@ -133,10 +143,56 @@ impl ServerType for HistoryServer { unimplemented!("history servers can't hot-upgrade"); } - fn handle_remote_command( + #[instrument(skip_all)] + async fn handle_remote_command( &self, - _request: sable_network::rpc::RemoteServerRequestType, + req: sable_network::rpc::RemoteServerRequestType, ) -> sable_network::rpc::RemoteServerResponse { - todo!() + tracing::debug!(?req, "Got remote request"); + + use crate::server::rpc::RemoteServerRequestType::*; + use sable_network::rpc::RemoteServerResponse; + + match req { + History(req) => { + use crate::server::rpc::RemoteHistoryServerRequestType::*; + use crate::server::rpc::RemoteHistoryServerResponse::*; + + let history_service = crate::PgHistoryService::new(&self.database_connection); + match req { + ListTargets { + user, + after_ts, + before_ts, + limit, + } => TargetList( + history_service + .list_targets(user, after_ts, before_ts, limit) + .await + .into_iter() + .collect(), + ), + GetEntries { + user, + target, + request, + } => Entries( + history_service + .get_entries(user, target, request) + .await + .map(|entries| entries.into_iter().collect()), + ), + } + .into() + } + Services(_) => { + tracing::warn!(?req, "Got unsupported request (services)"); + RemoteServerResponse::NotSupported + } + Ping => { + tracing::warn!(?req, "Got unsupported request (ping)"); + RemoteServerResponse::NotSupported + } + } } } diff --git a/sable_history/src/server/sync.rs b/sable_history/src/server/sync.rs new file mode 100644 index 00000000..c293e2f4 --- /dev/null +++ b/sable_history/src/server/sync.rs @@ -0,0 +1,10 @@ +use super::*; +use crate::server::event::IntroduceHistoryServer; + +impl HistoryServer { + pub(super) async fn burst_to_network(&self) { + // Set ourselves as the active history node + self.node + .submit_event(self.node.id(), IntroduceHistoryServer {}); + } +} diff --git a/sable_history/src/server/update_handler.rs b/sable_history/src/server/update_handler.rs index 23f5480f..f7f7dd44 100644 --- a/sable_history/src/server/update_handler.rs +++ b/sable_history/src/server/update_handler.rs @@ -1,3 +1,6 @@ 
+use chrono::DateTime; +use diesel_async::RunQueryDsl; + use super::*; use crate::models::HistoricUser; @@ -5,13 +8,13 @@ use rpc::NetworkHistoryUpdate; use state::HistoricMessageSourceId; use wrapper::HistoricMessageTarget; -use diesel::prelude::*; -use diesel_async::RunQueryDsl; - impl HistoryServer { pub async fn handle_history_update(&self, update: NetworkHistoryUpdate) -> anyhow::Result<()> { + let update_timestamp = update.timestamp(); match update.change { - NetworkStateChange::NewMessage(detail) => self.handle_new_message(detail).await, + NetworkStateChange::NewMessage(detail) => { + self.handle_new_message(detail, update_timestamp).await + } NetworkStateChange::NewUser(_) | NetworkStateChange::UserNickChange(_) @@ -34,6 +37,7 @@ impl HistoryServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } @@ -131,7 +135,11 @@ impl HistoryServer { } } - async fn handle_new_message(&self, new_message: update::NewMessage) -> anyhow::Result<()> { + async fn handle_new_message( + &self, + new_message: update::NewMessage, + update_timestamp: i64, + ) -> anyhow::Result<()> { use crate::schema::messages::dsl::*; let net = self.node.network(); @@ -153,8 +161,12 @@ impl HistoryServer { let db_message = crate::models::Message { id: **net_message.id(), + timestamp: DateTime::from_timestamp(update_timestamp, 0) + .context("Timestamp overflowed")? + .naive_utc(), // may differ from the message's timestamp source_user: db_source.id, target_channel: db_channel.id, + message_type: net_message.message_type().into(), text: net_message.text().to_string(), }; @@ -164,6 +176,8 @@ impl HistoryServer { .execute(&mut *connection_lock) .await?; + tracing::trace!("Persisted message: {db_message:?}"); + Ok(()) } } diff --git a/sable_history/src/types.rs b/sable_history/src/types.rs new file mode 100644 index 00000000..c941720a --- /dev/null +++ b/sable_history/src/types.rs @@ -0,0 +1,54 @@ +use std::io::Write; + +use diesel::pg::{Pg, PgValue}; +use diesel::{deserialize, serialize}; +use diesel::{AsExpression, FromSqlRow}; + +use crate::schema::sql_types::MessageType as SqlMessageType; + +#[derive(Debug, PartialEq, FromSqlRow, AsExpression, Eq)] +#[diesel(sql_type = SqlMessageType)] +pub enum MessageType { + Privmsg, + Notice, +} + +impl serialize::ToSql for MessageType { + fn to_sql<'b>(&'b self, out: &mut serialize::Output<'b, '_, Pg>) -> serialize::Result { + match *self { + MessageType::Privmsg => out.write_all(b"privmsg")?, + MessageType::Notice => out.write_all(b"notice")?, + } + Ok(serialize::IsNull::No) + } +} + +impl deserialize::FromSql for MessageType { + fn from_sql(bytes: PgValue<'_>) -> deserialize::Result { + match bytes.as_bytes() { + b"privmsg" => Ok(MessageType::Privmsg), + b"notice" => Ok(MessageType::Notice), + _ => Err("Unrecognized enum variant for MessageType".into()), + } + } +} + +impl From for MessageType { + fn from(value: sable_network::network::state::MessageType) -> Self { + use sable_network::network::state::MessageType::*; + match value { + Privmsg => MessageType::Privmsg, + Notice => MessageType::Notice, + } + } +} + +impl From for sable_network::network::state::MessageType { + fn from(value: MessageType) -> Self { + use sable_network::network::state::MessageType::*; + match value { + MessageType::Privmsg => Privmsg, + MessageType::Notice => Notice, + } + } +} diff --git 
a/sable_ircd/src/capability/account_tag.rs b/sable_ircd/src/capability/account_tag.rs index 30af9595..f182dfac 100644 --- a/sable_ircd/src/capability/account_tag.rs +++ b/sable_ircd/src/capability/account_tag.rs @@ -27,6 +27,7 @@ fn account_for_tag(update: &NetworkStateChange, net: &Network) -> Option NetworkStateChange::NewServer(_) => None, NetworkStateChange::ServerQuit(_) => None, NetworkStateChange::NewAuditLogEntry(_) => None, + NetworkStateChange::HistoryServerUpdate(_) => None, NetworkStateChange::ServicesUpdate(_) => None, NetworkStateChange::EventComplete(_) => None, }?; diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index bf46a4ee..0d33d92f 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -1,9 +1,10 @@ -use super::*; -use crate::{capability::ClientCapability, utils}; -use messages::send_history::SendHistoryItem; +use std::cmp::{max, min}; + use sable_network::history::{HistoryError, HistoryRequest, HistoryService, TargetId}; -use std::cmp::{max, min}; +use super::*; +use crate::capability::server_time; +use crate::{capability::ClientCapability, utils}; fn parse_msgref(subcommand: &str, target: Option<&str>, msgref: &str) -> Result { match msgref.split_once('=') { @@ -136,13 +137,19 @@ async fn handle_chathistory( } }; - let history_service = LocalHistoryService::new(server.node()); + let history_service = server.node().history_service(); match history_service .get_entries(source.id(), target_id, request) .await { Ok(entries) => send_history_entries(server, response, target, entries)?, Err(HistoryError::InvalidTarget(_)) => Err(invalid_target_error())?, + Err(HistoryError::InternalError(e)) => Err(CommandError::Fail { + command: "CHATHISTORY", + code: "MESSAGE_ERROR", + context: format!("{} {}", subcommand, target), + description: e, + })?, }; } } @@ -160,7 +167,7 @@ async fn list_targets<'a>( to_ts: Option, limit: Option, ) { - let history_service = LocalHistoryService::new(server.node()); + let history_service = server.node().history_service(); let found_targets = history_service .list_targets(source.id(), to_ts, from_ts, limit) @@ -197,20 +204,45 @@ async fn list_targets<'a>( } fn send_history_entries<'a>( - server: &ClientServer, - into: impl MessageSink, + _server: &ClientServer, + conn: impl MessageSink, target: &str, - entries: impl IntoIterator, + entries: impl IntoIterator, ) -> CommandResult { - let batch = into + let batch = conn .batch("chathistory", ClientCapability::Batch) .with_arguments(&[target]) .start(); for entry in entries { - // Ignore errors here; it's possible that a message has been expired out of network state - // but a reference to it still exists in the history log - let _ = server.send_item(&entry, &batch, &entry); + match entry { + HistoricalEvent::Message { + id, + timestamp, + source, + source_account, + target: _, // assume it's the same as the one we got as parameter + message_type, + text, + } => { + let msg = message::Message::new(&source, &target, message_type, &text) + .with_tag(server_time::server_time_tag( + i64::try_from(timestamp).unwrap_or(i64::MAX), + )) + .with_tag(OutboundMessageTag::new( + "msgid", + Some(id.to_string()), + ClientCapability::MessageTags, + )) + .with_tag(OutboundMessageTag::new( + "account", + source_account, + ClientCapability::AccountTag, + )); + + batch.send(msg); + } + } } Ok(()) diff --git a/sable_ircd/src/command/handlers/register.rs b/sable_ircd/src/command/handlers/register.rs 
index 06ab5304..d7b1f3a2 100644 --- a/sable_ircd/src/command/handlers/register.rs +++ b/sable_ircd/src/command/handlers/register.rs @@ -35,7 +35,7 @@ async fn do_register_user( _email: &str, password: &str, ) -> CommandResult { - let Some(services_name) = network.current_services_name() else { + let Some(services_name) = network.current_services_server_name() else { response_to.send(message::Fail::new( "REGISTER", "TEMPORARILY_UNAVAILABLE", diff --git a/sable_ircd/src/command/plumbing/argument_wrappers.rs b/sable_ircd/src/command/plumbing/argument_wrappers.rs index 73d93603..3adf34b9 100644 --- a/sable_ircd/src/command/plumbing/argument_wrappers.rs +++ b/sable_ircd/src/command/plumbing/argument_wrappers.rs @@ -18,7 +18,7 @@ impl<'a> AmbientArgument<'a> for ServicesTarget<'a> { Ok(Self { name: ctx .network() - .current_services_name() + .current_services_server_name() .ok_or(CommandError::ServicesNotAvailable)?, server: ctx.server(), }) diff --git a/sable_ircd/src/messages/send_history.rs b/sable_ircd/src/messages/send_history.rs index 282ca9c1..10c245c9 100644 --- a/sable_ircd/src/messages/send_history.rs +++ b/sable_ircd/src/messages/send_history.rs @@ -50,6 +50,7 @@ impl SendHistoryItem for ClientServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } @@ -86,6 +87,7 @@ impl SendHistoryItem for ClientServer { | NetworkStateChange::ServerQuit(_) | NetworkStateChange::NewAuditLogEntry(_) | NetworkStateChange::UserLoginChange(_) + | NetworkStateChange::HistoryServerUpdate(_) | NetworkStateChange::ServicesUpdate(_) | NetworkStateChange::EventComplete(_) => Ok(()), } diff --git a/sable_ircd/src/messages/source_target.rs b/sable_ircd/src/messages/source_target.rs index 51da5e06..22d1cfca 100644 --- a/sable_ircd/src/messages/source_target.rs +++ b/sable_ircd/src/messages/source_target.rs @@ -103,6 +103,12 @@ impl MessageTarget for String { } } +impl MessageTarget for &str { + fn format(&self) -> String { + self.to_string() + } +} + impl MessageTarget for state::HistoricUser { fn format(&self) -> String { self.nickname.to_string() diff --git a/sable_ircd/src/server/server_type.rs b/sable_ircd/src/server/server_type.rs index 7114773b..1ef7a65d 100644 --- a/sable_ircd/src/server/server_type.rs +++ b/sable_ircd/src/server/server_type.rs @@ -1,9 +1,11 @@ -use super::*; -use crate::connection_collection::ConnectionCollectionState; use anyhow::Context; +use tracing::instrument; + use client_listener::SavedListenerCollection; use sable_server::ServerSaveError; +use super::*; +use crate::connection_collection::ConnectionCollectionState; use crate::monitor::MonitorSet; /// Saved state of a [`ClientServer`] for later resumption @@ -168,7 +170,8 @@ impl sable_server::ServerType for ClientServer { } } - fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse { + #[instrument(skip_all)] + async fn handle_remote_command(&self, cmd: RemoteServerRequestType) -> RemoteServerResponse { match cmd { RemoteServerRequestType::Ping => RemoteServerResponse::Success, _ => RemoteServerResponse::NotSupported, diff --git a/sable_network/Cargo.toml b/sable_network/Cargo.toml index 5df64d42..d25615a2 100644 --- a/sable_network/Cargo.toml +++ b/sable_network/Cargo.toml @@ -42,7 +42,7 @@ rand = "0.8" arrayvec = { version = "0.7", features = [ "serde" ] } hashers = "1" serde_with = "1.11" 
-parking_lot = { version = "0.12", features = [ "serde" ] } +parking_lot = { version = "0.12.2", features = [ "serde", "arc_lock" ] } wildmatch = "2.1" concurrent_log = { version = "0.2.4", features = [ "serde" ] } ipnet = { version = "2", features = [ "serde" ] } diff --git a/sable_network/src/history/local_service.rs b/sable_network/src/history/local_service.rs index 7d91f3ef..a769e866 100644 --- a/sable_network/src/history/local_service.rs +++ b/sable_network/src/history/local_service.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use tracing::instrument; + use crate::network::state::HistoricMessageTargetId; use crate::prelude::*; @@ -20,12 +22,12 @@ fn target_id_for_entry(for_user: UserId, entry: &HistoryLogEntry) -> Option { - node: &'a NetworkNode, +pub struct LocalHistoryService<'a, NetworkPolicy: policy::PolicyService> { + node: &'a NetworkNode, } -impl<'a> LocalHistoryService<'a> { - pub fn new(node: &'a NetworkNode) -> Self { +impl<'a, NetworkPolicy: policy::PolicyService> LocalHistoryService<'a, NetworkPolicy> { + pub fn new(node: &'a NetworkNode) -> Self { LocalHistoryService { node } } @@ -37,7 +39,7 @@ impl<'a> LocalHistoryService<'a> { to_ts: Option, backward_limit: usize, forward_limit: usize, - ) -> Result, HistoryError> { + ) -> Result, HistoryError> { let mut backward_entries = Vec::new(); let mut forward_entries = Vec::new(); let mut target_exists = false; @@ -45,6 +47,7 @@ impl<'a> LocalHistoryService<'a> { // Keep the lock on the NetworkHistoryLog between the backward and the forward // search to make sure both have a consistent state let log = self.node.history(); + let net = self.node.network(); if backward_limit != 0 { let from_ts = if forward_limit == 0 { @@ -109,14 +112,43 @@ impl<'a> LocalHistoryService<'a> { Ok(backward_entries .into_iter() .rev() - .chain(forward_entries.into_iter())) + .chain(forward_entries.into_iter()) + .flat_map(move |entry| Self::translate_log_entry(entry, &net))) } else { Err(HistoryError::InvalidTarget(target)) } } + + fn translate_log_entry(entry: HistoryLogEntry, net: &Network) -> Option { + match entry.details { + NetworkStateChange::NewMessage(update::NewMessage { + message, + source: _, + target: _, + }) => { + let message = net.message(message).ok()?; + let source = message.source().ok()?; + let target = message.target().ok()?; + + Some(HistoricalEvent::Message { + id: message.id(), + timestamp: entry.timestamp(), // update's timestamp, may differ from the message's timestamp + message_type: message.message_type(), + source: source.nuh(), + source_account: source.account_name().map(|n| n.to_string()), + target: target.to_string(), + text: message.text().to_string(), + }) + } + _ => None, + } + } } -impl<'a> HistoryService for LocalHistoryService<'a> { +impl<'a, NetworkPolicy: policy::PolicyService> HistoryService + for LocalHistoryService<'a, NetworkPolicy> +{ + #[instrument(skip(self))] async fn list_targets( &self, user: UserId, @@ -147,16 +179,19 @@ impl<'a> HistoryService for LocalHistoryService<'a> { } } + tracing::trace!("list_targets local response: {found_targets:?}"); + found_targets } + #[instrument(skip(self))] async fn get_entries( &self, user: UserId, target: TargetId, request: HistoryRequest, - ) -> Result, HistoryError> { - match request { + ) -> Result, HistoryError> { + let res = match request { #[rustfmt::skip] HistoryRequest::Latest { to_ts, limit } => self.get_history_for_target( user, @@ -222,6 +257,8 @@ impl<'a> HistoryService for LocalHistoryService<'a> { ) } } - } + }; + tracing::trace!("get_entries 
local response: {}", res.is_ok()); + res } } diff --git a/sable_network/src/history/log.rs b/sable_network/src/history/log.rs index 9d714bf1..1d6668c6 100644 --- a/sable_network/src/history/log.rs +++ b/sable_network/src/history/log.rs @@ -165,6 +165,7 @@ impl NetworkHistoryLog { | NewAuditLogEntry(_) | UserLoginChange(_) | ServicesUpdate(_) + | HistoryServerUpdate(_) | EventComplete(_) => None, UserNickChange(_) diff --git a/sable_network/src/history/mod.rs b/sable_network/src/history/mod.rs index 74aa1716..2d5dc38d 100644 --- a/sable_network/src/history/mod.rs +++ b/sable_network/src/history/mod.rs @@ -3,9 +3,13 @@ pub use log::*; mod service; pub use service::*; mod local_service; +pub use local_service::LocalHistoryService; +mod remote_service; +pub use remote_service::RemoteHistoryService; +mod tiered_service; +pub use tiered_service::TieredHistoryService; use crate::network::NetworkStateChange; -pub use local_service::LocalHistoryService; /// Implemented by types that provide metadata for a historic state change pub trait HistoryItem { diff --git a/sable_network/src/history/remote_service.rs b/sable_network/src/history/remote_service.rs new file mode 100644 index 00000000..ed9dd860 --- /dev/null +++ b/sable_network/src/history/remote_service.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; + +use tracing::instrument; + +use crate::prelude::*; +use crate::rpc::*; + +/// Implementation of [`HistoryService`] that forwards requests to a [`HistoryServer`] +/// through the RPC. +pub struct RemoteHistoryService<'a, NetworkPolicy: policy::PolicyService> { + node: &'a NetworkNode, + remote_server_name: ServerName, +} + +impl<'a, NetworkPolicy: policy::PolicyService> RemoteHistoryService<'a, NetworkPolicy> { + pub fn new(node: &'a NetworkNode, remote_server_name: ServerName) -> Self { + RemoteHistoryService { + node, + remote_server_name, + } + } +} + +impl<'a, NetworkPolicy: policy::PolicyService> HistoryService + for RemoteHistoryService<'a, NetworkPolicy> +{ + #[instrument(skip(self))] + async fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + let res = self + .node + .sync_log() + .send_remote_request( + self.remote_server_name, + RemoteHistoryServerRequestType::ListTargets { + user, + after_ts, + before_ts, + limit, + } + .into(), + ) + .await; + tracing::trace!("list_targets RPC response: {res:?}"); + match res { + Ok(RemoteServerResponse::History(RemoteHistoryServerResponse::TargetList( + targets, + ))) => targets.into_iter().collect(), + Ok(RemoteServerResponse::History(_)) + | Ok(RemoteServerResponse::Services(_)) + // This request should never error + | Ok(RemoteServerResponse::Error(_)) + | Ok(RemoteServerResponse::NotSupported) + | Ok(RemoteServerResponse::Success) => { + tracing::error!("Got unexpected response to ListTargets request: {res:?}"); + HashMap::new() + }, + Err(e) => { + tracing::error!("ListTargets request failed: {e:?}"); + HashMap::new() + } + } + } + + #[instrument(skip(self))] + async fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + let res = self + .node + .sync_log() + .send_remote_request( + self.remote_server_name, + rpc::RemoteHistoryServerRequestType::GetEntries { + user, + target, + request, + } + .into(), + ) + .await; + match res { + Ok(RemoteServerResponse::History(RemoteHistoryServerResponse::Entries( + entries, + ))) => { + tracing::trace!("get_entries RPC response: {}", entries.is_ok()); + entries + }, + 
Ok(RemoteServerResponse::History(_)) + | Ok(RemoteServerResponse::Services(_)) + // Errors while processing this request would return Entries(Err(_)) + | Ok(RemoteServerResponse::Error(_)) + | Ok(RemoteServerResponse::NotSupported) + | Ok(RemoteServerResponse::Success) => { + tracing::error!("Got unexpected response to GetEntries request: {res:?}"); + Ok(Vec::new()) + }, + Err(e) => { + tracing::error!("GetEntries request failed: {e:?}"); + Ok(Vec::new()) + } + } + } +} diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index accc9551..4af67f04 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -5,11 +5,10 @@ use std::future::Future; use thiserror::Error; -use crate::history::HistoryLogEntry; -use crate::network::state::{HistoricMessageSourceId, HistoricMessageTargetId}; +use crate::network::state::{HistoricMessageSourceId, HistoricMessageTargetId, MessageType}; use crate::prelude::*; -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum TargetId { User(UserId), Channel(ChannelId), @@ -50,6 +49,7 @@ impl TryFrom<&HistoricMessageTargetId> for TargetId { } } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoryRequest { Latest { to_ts: Option, @@ -74,10 +74,12 @@ pub enum HistoryRequest { }, } -#[derive(Error, Debug)] +#[derive(Error, Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum HistoryError { #[error("invalid target: {0:?}")] InvalidTarget(TargetId), + #[error("internal server error: {0:?}")] + InternalError(String), } /// A backend implementation of [IRCv3 CHATHISTORY](https://ircv3.net/specs/extensions/chathistory) @@ -98,5 +100,21 @@ pub trait HistoryService { user: UserId, target: TargetId, request: HistoryRequest, - ) -> impl Future, HistoryError>> + Send; + ) -> impl Future + Send, HistoryError>> + + Send; +} + +/// A more concrete representation of `sable_ircd`'s `HistoryItem`, with all its fields +/// inflated to strings that will be sent to the client +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum HistoricalEvent { + Message { + id: MessageId, + timestamp: i64, + source: String, + source_account: Option, + target: String, + message_type: MessageType, + text: String, + }, } diff --git a/sable_network/src/history/tiered_service.rs b/sable_network/src/history/tiered_service.rs new file mode 100644 index 00000000..a7a5fb01 --- /dev/null +++ b/sable_network/src/history/tiered_service.rs @@ -0,0 +1,172 @@ +use std::collections::hash_map::{Entry, HashMap}; + +use futures::TryFutureExt; +use tracing::instrument; + +use crate::prelude::*; + +/// Implementation of [`HistoryService`] backed by two other services, one fast and short-lived and +/// the other slower and longer-lived. +/// +/// This is used to query [`HistoryService`] when possible, and fall-back to a remote server when +/// events expired locally. 
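+/// For example, `NetworkNode::history_service` combines the in-memory `LocalHistoryService` (fast, but entries eventually expire from the log) with a `RemoteHistoryService` backed by the current history server (slower, but longer-lived).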
+pub struct TieredHistoryService< + FastService: HistoryService + Send + Sync, + SlowService: HistoryService + Send + Sync, +> { + fast_service: Option, + slow_service: Option, +} + +impl + TieredHistoryService +{ + pub fn new(fast_service: Option, slow_service: Option) -> Self { + Self { + fast_service, + slow_service, + } + } +} + +impl + HistoryService for TieredHistoryService +{ + #[instrument(skip(self))] + async fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + match (&self.fast_service, &self.slow_service) { + (Some(fast_service), Some(slow_service)) => { + let (mut targets1, mut targets2) = futures::join!( + slow_service.list_targets(user, after_ts, before_ts, limit), + fast_service.list_targets(user, after_ts, before_ts, limit) + ); + + // merge targets, taking the most recent timestamp for those present + // in both backends + if targets1.len() < targets2.len() { + (targets1, targets2) = (targets2, targets1); + } + for (target, ts) in targets2.drain() { + match targets1.entry(target) { + Entry::Occupied(mut entry) => { + if *entry.get() < ts { + entry.insert(ts); + } + } + Entry::Vacant(entry) => { + entry.insert(ts); + } + } + } + targets1 + } + (None, Some(slow_service)) => { + slow_service + .list_targets(user, after_ts, before_ts, limit) + .await + } + (Some(fast_service), None) => { + fast_service + .list_targets(user, after_ts, before_ts, limit) + .await + } + (None, None) => HashMap::new(), + } + } + + #[instrument(skip(self))] + async fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> Result, HistoryError> { + // It's tempting to return Box here instead of collecting into a + // temporary Vec, but we can't because IntoIterator::IntoIter potentially differs + + macro_rules! get_entries { + ($service:expr, $user:expr, $target:expr, $request:expr) => { + $service + .get_entries($user, $target, $request) + .map_ok(|entries| -> Vec<_> { entries.into_iter().collect() }) + .await + }; + } + + match (&self.fast_service, &self.slow_service) { + (Some(fast_service), Some(slow_service)) => { + match request { + HistoryRequest::Latest { limit, .. } | HistoryRequest::Before { limit, .. } => { + let mut entries = get_entries!(fast_service, user, target, request.clone()) + .unwrap_or_else(|e| { + tracing::error!("Could not get history from fast service: {e}"); + vec![] + }); + if entries.len() < limit { + // TODO: send a BEFORE request, and merge lists together + entries = get_entries!(slow_service, user, target, request)?; + } + Ok(entries) + } + HistoryRequest::After { start_ts, .. } => { + // Check if the fast-but-shortlived backend still has messages up to that + // timestamp + match fast_service + .get_entries( + user, + target, + HistoryRequest::Before { + from_ts: start_ts, + limit: 1, + }, + ) + .await + { + Ok(entries) => { + if entries.into_iter().count() > 0 { + // Yes, it does, so we don't need the slow_service to fulfill + // the request + match get_entries!(fast_service, user, target, request.clone()) + { + Ok(entries) => Ok(entries), + Err(e) => { + tracing::error!( + "Could not get history from fast service: {e}" + ); + get_entries!(slow_service, user, target, request) + } + } + } else { + get_entries!(slow_service, user, target, request) + } + } + Err(e) => { + tracing::error!("Could not get history from fast service: {e}"); + get_entries!(slow_service, user, target, request) + } + } + } + HistoryRequest::Around { .. } | HistoryRequest::Between { .. 
} => { + // TODO: try to use the fast_service when possible + get_entries!(slow_service, user, target, request) + } + } + } + (None, Some(slow_service)) => { + let entries = slow_service.get_entries(user, target, request).await?; + Ok(entries.into_iter().collect()) + } + (Some(fast_service), None) => { + let entries = fast_service.get_entries(user, target, request).await?; + Ok(entries.into_iter().collect()) + } + (None, None) => Ok(Vec::new()), + } + } +} diff --git a/sable_network/src/id.rs b/sable_network/src/id.rs index 2ed2de2b..a91e5861 100644 --- a/sable_network/src/id.rs +++ b/sable_network/src/id.rs @@ -25,6 +25,18 @@ impl Uuid7 { } } +impl TryFrom for Uuid7 { + type Error = (); + + fn try_from(uuid: Uuid) -> Result { + if uuid.get_version() == Some(uuid::Version::SortRand) { + Ok(Self(uuid)) + } else { + Err(()) + } + } +} + #[derive(Debug, Error)] #[error("Mismatched object ID type for event")] pub struct WrongIdTypeError; diff --git a/sable_network/src/network/event/details.rs b/sable_network/src/network/event/details.rs index 342c8f5a..24ea911b 100644 --- a/sable_network/src/network/event/details.rs +++ b/sable_network/src/network/event/details.rs @@ -190,10 +190,14 @@ EventDetails => { } #[target_type(ServerId)] - struct IntroduceServices { + struct IntroduceServicesServer { pub sasl_mechanisms: Vec, } + #[target_type(ServerId)] + struct IntroduceHistoryServer { + } + #[target_type(AccountId)] struct AccountUpdate { pub data: Option, diff --git a/sable_network/src/network/network/accessors.rs b/sable_network/src/network/network/accessors.rs index cb9db3d0..f7b36bef 100644 --- a/sable_network/src/network/network/accessors.rs +++ b/sable_network/src/network/network/accessors.rs @@ -286,11 +286,17 @@ impl Network { } /// Retrieve the server name of the current active services - pub fn current_services_name(&self) -> Option { - self.current_services - .as_ref() - .and_then(|state| self.servers.get(&state.server_id)) - .map(|s| s.name) + pub fn current_history_server_name(&self) -> Option { + Some(self.servers.get(&self.current_history_server_id?)?.name) + } + + /// Retrieve the server name of the current active services + pub fn current_services_server_name(&self) -> Option { + Some( + self.servers + .get(&self.current_services.as_ref()?.server_id)? 
+ .name, + ) } /// Retrieve the current services data diff --git a/sable_network/src/network/network/account_state.rs b/sable_network/src/network/network/account_state.rs index 6acd79ac..f0e3cb06 100644 --- a/sable_network/src/network/network/account_state.rs +++ b/sable_network/src/network/network/account_state.rs @@ -4,11 +4,11 @@ use crate::network::update::*; use crate::prelude::*; impl Network { - pub(super) fn introduce_services( + pub(super) fn introduce_services_server( &mut self, target: ServerId, event: &Event, - update: &IntroduceServices, + update: &IntroduceServicesServer, updates: &dyn NetworkUpdateReceiver, ) { self.current_services = Some(state::ServicesData { diff --git a/sable_network/src/network/network/history_state.rs b/sable_network/src/network/network/history_state.rs new file mode 100644 index 00000000..475e39e9 --- /dev/null +++ b/sable_network/src/network/network/history_state.rs @@ -0,0 +1,16 @@ +use super::Network; +use crate::network::event::*; +use crate::network::update::*; +use crate::prelude::*; + +impl Network { + pub(super) fn introduce_history_server( + &mut self, + target: ServerId, + _event: &Event, + _update: &IntroduceHistoryServer, + _updates: &dyn NetworkUpdateReceiver, + ) { + self.current_history_server_id = Some(target); + } +} diff --git a/sable_network/src/network/network/mod.rs b/sable_network/src/network/network/mod.rs index 8c2e3e07..0b3f5a87 100644 --- a/sable_network/src/network/network/mod.rs +++ b/sable_network/src/network/network/mod.rs @@ -89,6 +89,7 @@ pub struct Network { channel_roles: HashMap, current_services: Option, + current_history_server_id: Option, config: config::NetworkConfig, clock: EventClock, @@ -130,6 +131,7 @@ impl Network { channel_roles: HashMap::new(), current_services: None, + current_history_server_id: None, config, clock: EventClock::new(), @@ -207,7 +209,8 @@ impl Network { NewAuditLogEntry => self.new_audit_log, EnablePersistentSession => self.enable_persistent_session, DisablePersistentSession => self.disable_persistent_session, - IntroduceServices => self.introduce_services, + IntroduceServicesServer => self.introduce_services_server, + IntroduceHistoryServer => self.introduce_history_server, AccountUpdate => self.update_account, NickRegistrationUpdate => self.update_nick_registration, ChannelRegistrationUpdate => self.update_channel_registration, @@ -286,6 +289,7 @@ mod audit_log; mod ban_state; mod channel_state; mod config_state; +mod history_state; mod message_state; mod oper_state; mod server_state; diff --git a/sable_network/src/network/network/server_state.rs b/sable_network/src/network/network/server_state.rs index 4b84d117..3e48fa33 100644 --- a/sable_network/src/network/network/server_state.rs +++ b/sable_network/src/network/network/server_state.rs @@ -71,6 +71,14 @@ impl Network { } } + // ditto for the history server + if let Some(history_server_id) = self.current_history_server_id { + if removed.id == history_server_id { + self.current_history_server_id = None; + updates.notify(update::HistoryServerUpdate {}, event); + } + } + // Collect all the user connections that were on the departing server let removed_connections: Vec<_> = self .user_connections diff --git a/sable_network/src/network/update.rs b/sable_network/src/network/update.rs index fe527ded..ff1b931d 100644 --- a/sable_network/src/network/update.rs +++ b/sable_network/src/network/update.rs @@ -184,6 +184,10 @@ NetworkStateChange => { struct ServicesUpdate { } + /// The current history node has changed + struct HistoryServerUpdate { + 
} + /// A delimiter event to denote that an Event has been completely processed struct EventComplete { } }); diff --git a/sable_network/src/network/wrapper/message.rs b/sable_network/src/network/wrapper/message.rs index 8922187c..411bd439 100644 --- a/sable_network/src/network/wrapper/message.rs +++ b/sable_network/src/network/wrapper/message.rs @@ -56,6 +56,15 @@ impl MessageTarget<'_> { } } +impl ToString for MessageTarget<'_> { + fn to_string(&self) -> String { + match self { + Self::User(u) => u.nuh(), + Self::Channel(c) => c.name().to_string(), + } + } +} + /// A wrapper around a [`state::Message`] pub struct Message<'a> { network: &'a Network, diff --git a/sable_network/src/node/mod.rs b/sable_network/src/node/mod.rs index acb75474..554517d9 100644 --- a/sable_network/src/node/mod.rs +++ b/sable_network/src/node/mod.rs @@ -148,6 +148,17 @@ impl NetworkNode { self.history_log.read() } + /// Access the long-term history, potentially backed by RPC calls to the + /// history server + pub fn history_service(&self) -> impl HistoryService + use<'_, Policy> { + let local_service = LocalHistoryService::new(self); + let remote_service = self + .network() + .current_history_server_name() + .map(|history_server_name| RemoteHistoryService::new(self, history_server_name)); + TieredHistoryService::new(Some(local_service), remote_service) + } + /// Access the event log. pub fn event_log(&self) -> std::sync::RwLockReadGuard { self.event_log.event_log() diff --git a/sable_network/src/node/update_receiver.rs b/sable_network/src/node/update_receiver.rs index 91b908b7..7c29c470 100644 --- a/sable_network/src/node/update_receiver.rs +++ b/sable_network/src/node/update_receiver.rs @@ -247,6 +247,10 @@ impl NetworkNode { Ok(Vec::new()) } + fn handle_history_server_update(&self, _detail: &update::HistoryServerUpdate) -> HandleResult { + Ok(Vec::new()) + } + fn handle_services_update(&self, _detail: &update::ServicesUpdate) -> HandleResult { Ok(Vec::new()) } @@ -284,6 +288,7 @@ impl NetworkUpdateReceiver for NetworkNode ServerQuit(detail) => self.handle_server_quit(detail), NewAuditLogEntry(detail) => self.report_audit_entry(detail), UserLoginChange(detail) => self.handle_user_login(detail), + HistoryServerUpdate(detail) => self.handle_history_server_update(detail), ServicesUpdate(detail) => self.handle_services_update(detail), // We don't need to do anything with EventComplete, just pass it along to the subscriber EventComplete(_) => Ok(Vec::new()), diff --git a/sable_network/src/policy/mod.rs b/sable_network/src/policy/mod.rs index 55a1ead6..881f2fe9 100644 --- a/sable_network/src/policy/mod.rs +++ b/sable_network/src/policy/mod.rs @@ -50,6 +50,8 @@ pub trait PolicyService: + OperAuthenticationService + OperPolicyService + RegistrationPolicyService + + Sync + + Send { } diff --git a/sable_network/src/rpc/network_message.rs b/sable_network/src/rpc/network_message.rs index d45b765f..c8beafd5 100644 --- a/sable_network/src/rpc/network_message.rs +++ b/sable_network/src/rpc/network_message.rs @@ -1,4 +1,5 @@ use crate::{ + history::{HistoricalEvent, HistoryError, HistoryRequest}, id::*, network::{event::*, state::ChannelAccessSet, Network}, validated::*, @@ -38,7 +39,10 @@ pub struct RemoteServerRequest { pub enum RemoteServerRequestType { /// Simple ping for communication tests Ping, + /// A message to be handled by a services node Services(RemoteServicesServerRequestType), + /// A message to be handled by a history node + History(RemoteHistoryServerRequestType), } impl From for RemoteServerRequestType { @@ 
-47,6 +51,12 @@ impl From for RemoteServerRequestType { } } +impl From for RemoteServerRequestType { + fn from(req: RemoteHistoryServerRequestType) -> Self { + RemoteServerRequestType::History(req) + } +} + /// A message to be handled by a services node #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum RemoteServicesServerRequestType { @@ -89,6 +99,23 @@ pub enum RemoteServicesServerRequestType { RemoveAccountFingerprint(AccountId, String), } +/// A message to be handled by a services node +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RemoteHistoryServerRequestType { + ListTargets { + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + }, + + GetEntries { + user: UserId, + target: crate::history::TargetId, + request: HistoryRequest, + }, +} + /// A SASL authentication response #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AuthenticateStatus { @@ -113,6 +140,8 @@ pub enum RemoteServerResponse { Error(String), /// Response type specific to services servers Services(RemoteServicesServerResponse), + /// Response type specific to history servers + History(RemoteHistoryServerResponse), } impl From for RemoteServerResponse { @@ -121,6 +150,12 @@ impl From for RemoteServerResponse { } } +impl From for RemoteServerResponse { + fn from(resp: RemoteHistoryServerResponse) -> Self { + RemoteServerResponse::History(resp) + } +} + /// Remote services server response type #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum RemoteServicesServerResponse { @@ -139,3 +174,12 @@ pub enum RemoteServicesServerResponse { /// Channel isn't registered ChannelNotRegistered, } + +/// Remote history server response type +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum RemoteHistoryServerResponse { + /// TODO: switch to HashMap when we move away from JSON as the wire format, + /// to be consistent with [`HistoryService`] + TargetList(Vec<(crate::history::TargetId, i64)>), + Entries(Result, HistoryError>), +} diff --git a/sable_server/src/server.rs b/sable_server/src/server.rs index 0a6aa6b9..5b284046 100644 --- a/sable_server/src/server.rs +++ b/sable_server/src/server.rs @@ -236,7 +236,7 @@ where { if let Some(request) = request { - let response = server.handle_remote_command(request.req); + let response = server.handle_remote_command(request.req).await; if let Err(e) = request.response.send(response) { tracing::error!(?e, "Couldn't send response to remote command"); diff --git a/sable_server/src/server_type.rs b/sable_server/src/server_type.rs index 5077e669..003852c1 100644 --- a/sable_server/src/server_type.rs +++ b/sable_server/src/server_type.rs @@ -64,5 +64,8 @@ pub trait ServerType: Send + Sync + Sized + 'static { ) -> std::io::Result; /// Handle a request originating from a remote server - fn handle_remote_command(&self, request: RemoteServerRequestType) -> RemoteServerResponse; + fn handle_remote_command( + &self, + request: RemoteServerRequestType, + ) -> impl Future + Send; } diff --git a/sable_services/src/server/mod.rs b/sable_services/src/server/mod.rs index 524ada23..cbbfa253 100644 --- a/sable_services/src/server/mod.rs +++ b/sable_services/src/server/mod.rs @@ -22,11 +22,10 @@ use sable_server::ServerType; use std::{collections::HashMap, convert::Infallible, sync::Arc}; use anyhow::Context; +use dashmap::DashMap; use serde::Deserialize; - use tokio::sync::{broadcast, mpsc::UnboundedReceiver, Mutex}; - -use dashmap::DashMap; +use tracing::instrument; mod command; mod 
roles; @@ -136,7 +135,8 @@ where unimplemented!("services can't hot-upgrade"); } - fn handle_remote_command(&self, req: RemoteServerRequestType) -> RemoteServerResponse { + #[instrument(skip_all)] + async fn handle_remote_command(&self, req: RemoteServerRequestType) -> RemoteServerResponse { tracing::debug!(?req, "Got remote request"); use RemoteServerRequestType::*; @@ -205,8 +205,12 @@ where self.user_del_fp(acc, fp) } }, + History(_) => { + tracing::warn!(?req, "Got unsupported request (history)"); + Ok(RemoteServerResponse::NotSupported) + } Ping => { - tracing::warn!(?req, "Got unsupported request"); + tracing::warn!(?req, "Got unsupported request (ping)"); Ok(RemoteServerResponse::NotSupported) } }; diff --git a/sable_services/src/server/sync.rs b/sable_services/src/server/sync.rs index 0822fe85..73d45c1b 100644 --- a/sable_services/src/server/sync.rs +++ b/sable_services/src/server/sync.rs @@ -160,7 +160,7 @@ where // Finally, set ourselves as the active services node self.node.submit_event( self.node.id(), - IntroduceServices { + IntroduceServicesServer { sasl_mechanisms: vec!["PLAIN".to_owned()], }, );