From baf84b1bd096ff75d194642324434e15f20fb633 Mon Sep 17 00:00:00 2001 From: Val Lorentz Date: Sat, 4 May 2024 14:00:23 +0200 Subject: [PATCH] Make history backend modular The new `sable_history` crate defines a `HistoryService` trait, which could be implemented in various ways: 1. in-memory, by querying the `NetworkHistoryLog` that every node has 2. database-based, by querying a local or remote database (sqlite, postgresql, ...) 3. remote, by querying an hypothetical dedicated node with a RPC mechanism that remains to be defined For now, only item 1 is implemented, by moving the backend implementation of the `CHATHISTORY` command handler to `sable_history`. The "hypothetical dedicated node" will probably be implemented by `sable_history` itself, and it will also benefit from the `HistoryService`, which will allow it to support multiple database backends, so we can more easily prototype and compare them. --- Cargo.lock | 10 + Cargo.toml | 9 +- sable_history/Cargo.toml | 12 + sable_history/build.rs | 3 + sable_history/src/lib.rs | 99 +++++ sable_history/src/local_history.rs | 199 +++++++++ sable_ircd/Cargo.toml | 1 + .../src/command/handlers/chathistory.rs | 381 +++++------------- .../src/command/plumbing/target_types.rs | 15 + 9 files changed, 441 insertions(+), 288 deletions(-) create mode 100644 sable_history/Cargo.toml create mode 100644 sable_history/build.rs create mode 100644 sable_history/src/lib.rs create mode 100644 sable_history/src/local_history.rs diff --git a/Cargo.lock b/Cargo.lock index 32c303d6..1530aaa1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2047,6 +2047,15 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe232bdf6be8c8de797b22184ee71118d63780ea42ac85b61d1baa6d3b782ae9" +[[package]] +name = "sable_history" +version = "0.1.0" +dependencies = [ + "built", + "sable_macros", + "sable_network", +] + [[package]] name = "sable_ipc" version = "0.1.0" @@ -2078,6 +2087,7 @@ dependencies = [ "pwhash", "rand", "rustls", + "sable_history", "sable_macros", "sable_network", "sable_server", diff --git a/Cargo.toml b/Cargo.toml index 4d4f672e..717e1ade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,13 @@ [workspace] resolver = "2" members = [ - "sable_macros", - "sable_network", - "sable_ircd", - "client_listener", "auth_client", + "client_listener", + "sable_ircd", "sable_ipc", + "sable_history", + "sable_macros", + "sable_network", "sable_server", "sable_services", ] diff --git a/sable_history/Cargo.toml b/sable_history/Cargo.toml new file mode 100644 index 00000000..41ae381f --- /dev/null +++ b/sable_history/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "sable_history" +version = "0.1.0" +edition = "2021" +build = "build.rs" + +[build-dependencies] +built = { version = "0.5", features = [ "git2" ] } + +[dependencies] +sable_macros = { path = "../sable_macros" } +sable_network = { path = "../sable_network" } diff --git a/sable_history/build.rs b/sable_history/build.rs new file mode 100644 index 00000000..d8f91cb9 --- /dev/null +++ b/sable_history/build.rs @@ -0,0 +1,3 @@ +fn main() { + built::write_built_file().expect("Failed to acquire build-time information"); +} diff --git a/sable_history/src/lib.rs b/sable_history/src/lib.rs new file mode 100644 index 00000000..3ae51299 --- /dev/null +++ b/sable_history/src/lib.rs @@ -0,0 +1,99 @@ +//! History storage and retrieval + +use std::collections::HashMap; + +use sable_network::history::HistoryLogEntry; +use sable_network::prelude::update::{HistoricMessageSource, HistoricMessageTarget}; +use sable_network::prelude::*; + +#[derive(Clone, Copy, PartialEq, Eq, Hash)] +pub enum TargetId { + User(UserId), + Channel(ChannelId), +} + +impl From for TargetId { + fn from(value: UserId) -> Self { + TargetId::User(value) + } +} + +impl From for TargetId { + fn from(value: ChannelId) -> Self { + TargetId::Channel(value) + } +} + +impl TryFrom<&HistoricMessageSource> for TargetId { + type Error = (); + + fn try_from(value: &HistoricMessageSource) -> Result { + match value { + HistoricMessageSource::Server(_) => Err(()), // Is that okay? + HistoricMessageSource::User(user) => Ok(TargetId::User(user.user.id)), + HistoricMessageSource::Unknown => Err(()), + } + } +} +impl TryFrom<&HistoricMessageTarget> for TargetId { + type Error = (); + + fn try_from(value: &HistoricMessageTarget) -> Result { + match value { + HistoricMessageTarget::User(user) => Ok(TargetId::User(user.user.id)), + HistoricMessageTarget::Channel(channel) => Ok(TargetId::Channel(channel.id)), + HistoricMessageTarget::Unknown => Err(()), + } + } +} + +pub enum HistoryRequest { + Latest { + to_ts: Option, + limit: usize, + }, + Before { + from_ts: i64, + limit: usize, + }, + After { + start_ts: i64, + limit: usize, + }, + Around { + around_ts: i64, + limit: usize, + }, + Between { + start_ts: i64, + end_ts: i64, + limit: usize, + }, +} + +/// A backend implementation of [IRCv3 CHATHISTORY](https://ircv3.net/specs/extensions/chathistory) +pub trait HistoryService { + /// Returns a list of list of history logs the given user has access to + /// + /// And the timestamp of the last known message in that log. + fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap; + + fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> impl Iterator; +} + +pub mod local_history; + +mod build_data { + include!(concat!(env!("OUT_DIR"), "/built.rs")); +} diff --git a/sable_history/src/local_history.rs b/sable_history/src/local_history.rs new file mode 100644 index 00000000..ca37bd43 --- /dev/null +++ b/sable_history/src/local_history.rs @@ -0,0 +1,199 @@ +use std::collections::HashMap; + +use sable_network::prelude::*; + +use crate::*; + +/// Helper to extract the target name for chathistory purposes from a given event. +/// +/// This might be the source or target of the actual event, or might be None if it's +/// an event type that we don't include in history playback +fn target_id_for_entry(for_user: UserId, entry: &HistoryLogEntry) -> Option { + match &entry.details { + NetworkStateChange::NewMessage(message) => { + if matches!(&message.target, update::HistoricMessageTarget::User(user) if user.user.id == for_user) + { + (&message.source).try_into().ok() + } else { + (&message.target).try_into().ok() + } + } + _ => None, + } +} + +/// Implementation of [`HistoryService`] backed by [`NetworkNode`] +impl HistoryService for NetworkHistoryLog { + fn list_targets( + &self, + user: UserId, + after_ts: Option, + before_ts: Option, + limit: Option, + ) -> HashMap { + let mut found_targets = HashMap::new(); + + for entry in self.entries_for_user_reverse(user) { + if matches!(after_ts, Some(ts) if entry.timestamp >= ts) { + // Skip over until we hit the timestamp window we're interested in + continue; + } + if matches!(before_ts, Some(ts) if entry.timestamp <= ts) { + // We're iterating backwards through time; if we hit this then we've + // passed the requested window and should stop + break; + } + + if let Some(target_id) = target_id_for_entry(user, entry) { + found_targets.entry(target_id).or_insert(entry.timestamp); + } + + // If this pushes us past the the requested limit, stop + if matches!(limit, Some(limit) if limit <= found_targets.len()) { + break; + } + } + + found_targets + } + + fn get_entries( + &self, + user: UserId, + target: TargetId, + request: HistoryRequest, + ) -> impl Iterator { + match request { + #[rustfmt::skip] + HistoryRequest::Latest { to_ts, limit } => get_history_for_target( + self, + user, + target, + None, + to_ts, + limit, + 0, // Forward limit + ), + + HistoryRequest::Before { from_ts, limit } => { + get_history_for_target( + self, + user, + target, + Some(from_ts), + None, + limit, + 0, // Forward limit + ) + } + HistoryRequest::After { start_ts, limit } => get_history_for_target( + self, + user, + target, + Some(start_ts), + None, + 0, // Backward limit + limit, + ), + HistoryRequest::Around { around_ts, limit } => { + get_history_for_target( + self, + user, + target, + Some(around_ts), + None, + limit / 2, // Backward limit + limit / 2, // Forward limit + ) + } + HistoryRequest::Between { + start_ts, + end_ts, + limit, + } => get_history_for_target( + self, + user, + target, + Some(start_ts), + Some(end_ts), + 0, // Backward limit + limit, + ), + } + } +} + +fn get_history_for_target( + log: &NetworkHistoryLog, + source: UserId, + target: TargetId, + from_ts: Option, + to_ts: Option, + backward_limit: usize, + forward_limit: usize, +) -> impl Iterator { + let mut backward_entries = Vec::new(); + let mut forward_entries = Vec::new(); + + if backward_limit != 0 { + let from_ts = if forward_limit == 0 { + from_ts + } else { + // HACK: This is AROUND so we want to capture messages whose timestamp matches exactly + // (it's a message in the middle of the range) + from_ts.map(|from_ts| from_ts + 1) + }; + + for entry in log.entries_for_user_reverse(source) { + if matches!(from_ts, Some(ts) if entry.timestamp >= ts) { + // Skip over until we hit the timestamp window we're interested in + continue; + } + if matches!(to_ts, Some(ts) if entry.timestamp <= ts) { + // If we hit this then we've passed the requested window and should stop + break; + } + + if let Some(event_target) = target_id_for_entry(source, entry) { + if event_target == target { + backward_entries.push(entry); + } + } + + if backward_limit <= backward_entries.len() { + break; + } + } + } + + if forward_limit != 0 { + for entry in log.entries_for_user(source) { + if matches!(from_ts, Some(ts) if entry.timestamp <= ts) { + // Skip over until we hit the timestamp window we're interested in + continue; + } + if matches!(to_ts, Some(ts) if entry.timestamp >= ts) { + // If we hit this then we've passed the requested window and should stop + break; + } + + if let Some(event_target) = target_id_for_entry(source, entry) { + if event_target == target { + forward_entries.push(entry); + } + } + + if forward_limit <= forward_entries.len() { + break; + } + } + } + + // "The order of returned messages within the batch is implementation-defined, but SHOULD be + // ascending time order or some approximation thereof, regardless of the subcommand used." + // -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes + backward_entries + .into_iter() + .rev() + .chain(forward_entries.into_iter()) +} diff --git a/sable_ircd/Cargo.toml b/sable_ircd/Cargo.toml index adc78088..19b2deee 100644 --- a/sable_ircd/Cargo.toml +++ b/sable_ircd/Cargo.toml @@ -9,6 +9,7 @@ debug = [] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +sable_history = { path = "../sable_history" } sable_macros = { path = "../sable_macros" } sable_network = { path = "../sable_network" } sable_server = { path = "../sable_server" } diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index 59ffc108..90bd4404 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -1,7 +1,8 @@ +use sable_history::{HistoryRequest, HistoryService}; + use super::*; use crate::{capability::ClientCapability, utils}; use messages::send_history::SendHistoryItem; -use sable_network::network::update::HistoricMessageTarget; use std::cmp::{max, min}; @@ -34,9 +35,19 @@ fn parse_msgref(subcommand: &str, target: Option<&str>, msgref: &str) -> Result< } } +fn parse_limit(s: &str) -> Result { + s.parse().map_err(|_| CommandError::Fail { + command: "CHATHISTORY", + code: "INVALID_PARAMS", + context: "".to_string(), + description: "Invalid limit".to_string(), + }) +} + #[allow(clippy::too_many_arguments)] #[command_handler("CHATHISTORY")] fn handle_chathistory( + ctx: &dyn Command, source: UserSource, server: &ClientServer, response: &dyn CommandResponse, @@ -49,20 +60,10 @@ fn handle_chathistory( let source = source.deref(); match subcommand.to_ascii_uppercase().as_str() { - "TARGETS" => { + "TARGET" => { let from_ts = parse_msgref(subcommand, None, arg_1)?; let to_ts = parse_msgref(subcommand, None, arg_2)?; - let limit = arg_3.parse().ok(); - - if limit.is_none() { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - "", - "Invalid limit", - )); - return Ok(()); - } + let limit = parse_limit(arg_3)?; // The spec allows the from and to timestamps in either order; list_targets requires from < to list_targets( @@ -71,180 +72,78 @@ fn handle_chathistory( source, Some(min(from_ts, to_ts)), Some(max(from_ts, to_ts)), - limit, + Some(limit), ); } - "LATEST" => { + normalized_subcommand => { let target = arg_1; - let to_ts = match arg_2 { - "*" => None, - _ => Some(parse_msgref(subcommand, Some(target), arg_2)?), - }; - - let limit = arg_3.parse().ok(); - if limit.is_none() { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - "", - "Invalid limit", - )); - return Ok(()); - } - - send_history_for_target( - server, - response, - source, - subcommand, - target, - None, - to_ts, - limit, - Some(0), // forward limit - )?; - } - "BEFORE" => { - let target = arg_1; - let from_ts = parse_msgref(subcommand, Some(target), arg_2)?; - - let limit = arg_3.parse().ok(); - if limit.is_none() { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - "", - "Invalid limit", - )); - return Ok(()); - } - - send_history_for_target( - server, - response, - source, - subcommand, - target, - Some(from_ts), - None, - limit, - Some(0), // forward limit - )?; - } - "AFTER" => { - let target = arg_1; - let start_ts = parse_msgref(subcommand, Some(target), arg_2)?; + let target_id = TargetParameter::parse_str(ctx, target) + .map_err(|_| CommandError::Fail { + command: "CHATHISTORY", + code: "INVALID_TARGET", + context: format!("{} {}", subcommand, target), + description: format!("Cannot fetch history from {}", target), + })? + .into(); + let request = match normalized_subcommand { + "LATEST" => { + let to_ts = match arg_2 { + "*" => None, + _ => Some(parse_msgref(subcommand, Some(target), arg_2)?), + }; + let limit = parse_limit(arg_3)?; + + HistoryRequest::Latest { to_ts, limit } + } + "BEFORE" => { + let from_ts = parse_msgref(subcommand, Some(target), arg_2)?; + let limit = parse_limit(arg_3)?; - let limit = arg_3.parse().ok(); - if limit.is_none() { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - "", - "Invalid limit", - )); - return Ok(()); - } + HistoryRequest::Before { from_ts, limit } + } + "AFTER" => { + let start_ts = parse_msgref(subcommand, Some(target), arg_2)?; + let limit = parse_limit(arg_3)?; - send_history_for_target( - server, - response, - source, - subcommand, - target, - Some(start_ts), - None, - Some(0), // backward limit - limit, - )?; - } - "AROUND" => { - let target = arg_1; - let around_ts = parse_msgref(subcommand, Some(target), arg_2)?; + HistoryRequest::After { start_ts, limit } + } + "AROUND" => { + let around_ts = parse_msgref(subcommand, Some(target), arg_2)?; + let limit = parse_limit(arg_3)?; - let limit = match arg_3.parse::().ok() { - Some(limit) => limit, - None => { + HistoryRequest::Around { around_ts, limit } + } + "BETWEEN" => { + let start_ts = parse_msgref(subcommand, Some(target), arg_2)?; + let end_ts = parse_msgref(subcommand, Some(target), arg_3)?; + let limit = parse_limit(arg_4.unwrap_or(""))?; + + HistoryRequest::Between { + start_ts, + end_ts, + limit, + } + } + _ => { response.send(message::Fail::new( "CHATHISTORY", "INVALID_PARAMS", - "", - "Invalid limit", + subcommand, + "Invalid subcommand", )); return Ok(()); } }; - send_history_for_target( - server, - response, - source, - subcommand, - target, - Some(around_ts), - None, - Some(limit / 2), // backward limit - Some(limit / 2), // forward limit - )?; - } - "BETWEEN" => { - let target = arg_1; - let start_ts = parse_msgref(subcommand, Some(target), arg_2)?; - let end_ts = parse_msgref(subcommand, Some(target), arg_3)?; - - let limit = arg_4.and_then(|arg| arg.parse().ok()); - if limit.is_none() { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - "", - "Invalid limit", - )); - return Ok(()); - } - - send_history_for_target( - server, - response, - source, - subcommand, - target, - Some(start_ts), - Some(end_ts), - Some(0), // backward limit - limit, - )?; - } - _ => { - response.send(message::Fail::new( - "CHATHISTORY", - "INVALID_PARAMS", - subcommand, - "Invalid subcommand", - )); + let log = server.node().history(); + let entries = log.get_entries(source.id(), target_id, request); + send_history_entries(response, subcommand, target, entries)?; } } Ok(()) } -// Helper to extract the target name for chathistory purposes from a given event. -// This might be the source or target of the actual event, or might be None if it's -// an event type that we don't include in history playback -fn target_name_for_entry(for_user: UserId, entry: &HistoryLogEntry) -> Option { - match &entry.details { - NetworkStateChange::NewMessage(message) => { - if matches!(&message.target, HistoricMessageTarget::User(user) if user.user.id == for_user) - { - Some(messages::MessageTarget::format(&message.source)) - } else { - Some(message.target.format()) - } - } - _ => None, - } -} - // For listing targets, we iterate backwards through time; this allows us to just collect the // first timestamp we see for each target and know that it's the most recent one fn list_targets( @@ -256,28 +155,8 @@ fn list_targets( limit: Option, ) { let log = server.node().history(); - let mut found_targets = HashMap::new(); - - for entry in log.entries_for_user_reverse(source.id()) { - if matches!(to_ts, Some(ts) if entry.timestamp >= ts) { - // Skip over until we hit the timestamp window we're interested in - continue; - } - if matches!(from_ts, Some(ts) if entry.timestamp <= ts) { - // We're iterating backwards through time; if we hit this then we've - // passed the requested window and should stop - break; - } - - if let Some(target_name) = target_name_for_entry(source.id(), entry) { - found_targets.entry(target_name).or_insert(entry.timestamp); - } - // If this pushes us past the the requested limit, stop - if matches!(limit, Some(limit) if limit <= found_targets.len()) { - break; - } - } + let found_targets = log.list_targets(source.id(), to_ts, from_ts, limit); // The appropriate cap here is Batch - chathistory is enabled because we got here, // but can be used without batch support. @@ -286,6 +165,22 @@ fn list_targets( .start(); for (target, timestamp) in found_targets { + let target = match target { + sable_history::TargetId::User(user) => server + .node() + .network() + .user(user) + .expect("History service returned unknown user id") + .nick() + .format(), + sable_history::TargetId::Channel(channel) => server + .node() + .network() + .channel(channel) + .expect("History service returned unknown channel id") + .name() + .to_string(), + }; batch.send(message::ChatHistoryTarget::new( &target, &utils::format_timestamp(timestamp), @@ -293,108 +188,26 @@ fn list_targets( } } -fn send_history_for_target( - server: &ClientServer, - into: impl MessageSink, - source: &wrapper::User, - subcommand: &str, - target: &str, - from_ts: Option, - to_ts: Option, - backward_limit: Option, - forward_limit: Option, -) -> CommandResult { - let log = server.node().history(); - let mut backward_entries = Vec::new(); - let mut forward_entries = Vec::new(); - - if backward_limit != Some(0) { - let from_ts = if forward_limit == Some(0) { - from_ts - } else { - // HACK: This is AROUND so we want to capture messages whose timestamp matches exactly - // (it's a message in the middle of the range) - from_ts.map(|from_ts| from_ts + 1) - }; - - for entry in log.entries_for_user_reverse(source.id()) { - if matches!(from_ts, Some(ts) if entry.timestamp >= ts) { - // Skip over until we hit the timestamp window we're interested in - continue; - } - if matches!(to_ts, Some(ts) if entry.timestamp <= ts) { - // If we hit this then we've passed the requested window and should stop - break; - } - - if let Some(event_target) = target_name_for_entry(source.id(), entry) { - if event_target == target { - backward_entries.push(entry); - } - } - - if matches!(backward_limit, Some(limit) if limit <= backward_entries.len()) { - break; - } - } - } - - if forward_limit != Some(0) { - for entry in log.entries_for_user(source.id()) { - if matches!(from_ts, Some(ts) if entry.timestamp <= ts) { - // Skip over until we hit the timestamp window we're interested in - continue; - } - if matches!(to_ts, Some(ts) if entry.timestamp >= ts) { - // If we hit this then we've passed the requested window and should stop - break; - } - - if let Some(event_target) = target_name_for_entry(source.id(), entry) { - if event_target == target { - forward_entries.push(entry); - } - } - - if matches!(forward_limit, Some(limit) if limit <= forward_entries.len()) { - break; - } - } - } - - send_history_entries(into, subcommand, target, backward_entries, forward_entries) -} - fn send_history_entries<'a>( into: impl MessageSink, subcommand: &str, target: &str, - backward_entries: Vec<&'a HistoryLogEntry>, - forward_entries: Vec<&'a HistoryLogEntry>, + mut entries: impl Iterator, ) -> CommandResult { - if backward_entries.is_empty() && forward_entries.is_empty() { - into.send(message::Fail::new( - "CHATHISTORY", - "INVALID_TARGET", - &format!("{} {}", subcommand, target), - &format!("Cannot fetch history from {}", target), - )); - } else { - let batch = into - .batch("chathistory", ClientCapability::Batch) - .with_arguments(&[target]) - .start(); + let first_entry = entries.next().ok_or(CommandError::Fail { + command: "CHATHISTORY", + code: "INVALID_TARGET", + context: format!("{} {}", subcommand, target), + description: format!("Cannot fetch history from {}", target), + })?; + let batch = into + .batch("chathistory", ClientCapability::Batch) + .with_arguments(&[target]) + .start(); + first_entry.send_to(&batch, first_entry)?; - // "The order of returned messages within the batch is implementation-defined, but SHOULD be - // ascending time order or some approximation thereof, regardless of the subcommand used." - // -- https://ircv3.net/specs/extensions/chathistory#returned-message-notes - for entry in backward_entries - .into_iter() - .rev() - .chain(forward_entries.into_iter()) - { - entry.send_to(&batch, entry)?; - } + for entry in entries { + entry.send_to(&batch, entry)?; } Ok(()) diff --git a/sable_ircd/src/command/plumbing/target_types.rs b/sable_ircd/src/command/plumbing/target_types.rs index 705a5638..5302cc1e 100644 --- a/sable_ircd/src/command/plumbing/target_types.rs +++ b/sable_ircd/src/command/plumbing/target_types.rs @@ -16,6 +16,21 @@ impl TargetParameter<'_> { } } +impl<'a> From<&TargetParameter<'a>> for sable_history::TargetId { + fn from(value: &TargetParameter) -> Self { + match value { + TargetParameter::User(u) => u.id().into(), + TargetParameter::Channel(c) => c.id().into(), + } + } +} + +impl<'a> From> for sable_history::TargetId { + fn from(value: TargetParameter) -> Self { + (&value).into() + } +} + impl<'a> PositionalArgument<'a> for TargetParameter<'a> { fn parse_str(ctx: &'a dyn Command, value: &'a str) -> Result { if let Ok(chname) = ChannelName::from_str(value) {