diff --git a/sable_ircd/src/command/handlers/monitor.rs b/sable_ircd/src/command/handlers/monitor.rs new file mode 100644 index 00000000..47002ee4 --- /dev/null +++ b/sable_ircd/src/command/handlers/monitor.rs @@ -0,0 +1,129 @@ +//! Implementation of the UI of [IRCv3 MONITOR](https://ircv3.net/specs/extensions/monitor) + +use super::*; +use crate::monitor::MonitorInsertError; +use crate::utils::LineWrapper; + +const MAX_CONTENT_LENGTH: usize = 300; // Conservative limit to avoid hitting 512 bytes limit + +#[command_handler("MONITOR")] +fn handle_monitor( + server: &ClientServer, + cmd: &dyn Command, + subcommand: &str, + targets: Option<&str>, +) -> CommandResult { + match subcommand.to_ascii_uppercase().as_str() { + "+" => handle_monitor_add(server, cmd, targets), + "-" => handle_monitor_del(server, cmd, targets), + "C" => handle_monitor_clear(server, cmd), + "L" => handle_monitor_list(server, cmd), + "S" => handle_monitor_show(server, cmd), + _ => Ok(()), // The spec does not say what to do; existing implementations ignore it + } +} + +fn handle_monitor_add( + server: &ClientServer, + cmd: &dyn Command, + targets: Option<&str>, +) -> CommandResult { + let targets = targets + .ok_or(CommandError::NotEnoughParameters)? // technically we could just ignore + .split(',') + .map(|target| Nickname::parse_str(cmd, target)) + .collect::, _>>()?; // ditto + let mut monitors = server.monitors.write(); + let res = targets + .iter() + .map(|&target| monitors.insert(target, cmd.connection_id())) + .collect::>() + .map_err( + |MonitorInsertError::TooManyMonitorsPerConnection { max, current }| { + CommandError::Numeric(make_numeric!(MonListFull, max, current)) + }, + ); + drop(monitors); // Release lock + send_statuses(cmd, targets); + res +} + +fn handle_monitor_del( + server: &ClientServer, + cmd: &dyn Command, + targets: Option<&str>, +) -> CommandResult { + let targets = targets + .ok_or(CommandError::NotEnoughParameters)? // technically we could just ignore + .split(',') + .map(|target| Nickname::parse_str(cmd, target)) + .collect::, _>>()?; // ditto + + let mut monitors = server.monitors.write(); + for target in targets { + monitors.remove(target, cmd.connection_id()); + } + Ok(()) +} + +fn handle_monitor_clear(server: &ClientServer, cmd: &dyn Command) -> CommandResult { + server + .monitors + .write() + .remove_connection(cmd.connection_id()); + Ok(()) +} + +fn handle_monitor_list(server: &ClientServer, cmd: &dyn Command) -> CommandResult { + // Copying the set of monitors to release lock on `server.monitors` ASAP + let monitors: Option> = server + .monitors + .read() + .monitored_nicks(cmd.connection_id()) + .map(|monitors| monitors.iter().copied().collect()); + + if let Some(monitors) = monitors { + LineWrapper::<',', _, _>::new(MAX_CONTENT_LENGTH, monitors.into_iter()) + .for_each(|line| cmd.numeric(make_numeric!(MonList, &line))); + } + cmd.numeric(make_numeric!(EndOfMonList)); + + Ok(()) +} + +fn handle_monitor_show(server: &ClientServer, cmd: &dyn Command) -> CommandResult { + // Copying the set of monitors to release lock on `server.monitors` ASAP + let monitors: Option> = server + .monitors + .read() + .monitored_nicks(cmd.connection_id()) + .map(|monitors| monitors.iter().copied().collect()); + + if let Some(monitors) = monitors { + send_statuses(cmd, monitors); + } + Ok(()) +} + +fn send_statuses(cmd: &dyn Command, targets: Vec) { + let mut online = Vec::new(); + let mut offline = Vec::new(); + for target in targets { + match cmd.network().user_by_nick(&target) { + Ok(user) => online.push(user.nuh()), + Err(LookupError::NoSuchNick(_)) => offline.push(target), + Err(e) => { + tracing::error!( + "Unexpected error while computing online status of {}: {}", + target, + e + ); + } + } + } + + LineWrapper::<',', _, _>::new(MAX_CONTENT_LENGTH, online.into_iter()) + .for_each(|line| cmd.numeric(make_numeric!(MonOnline, &line))); + LineWrapper::<',', _, _>::new(MAX_CONTENT_LENGTH, offline.into_iter()) + .for_each(|line| cmd.numeric(make_numeric!(MonOffline, &line))); +} diff --git a/sable_ircd/src/command/mod.rs b/sable_ircd/src/command/mod.rs index 64e150c2..bb49b016 100644 --- a/sable_ircd/src/command/mod.rs +++ b/sable_ircd/src/command/mod.rs @@ -48,6 +48,7 @@ mod handlers { mod kill; mod kline; mod mode; + mod monitor; mod motd; mod names; mod nick; diff --git a/sable_ircd/src/lib.rs b/sable_ircd/src/lib.rs index ed9a0226..86d68918 100644 --- a/sable_ircd/src/lib.rs +++ b/sable_ircd/src/lib.rs @@ -70,6 +70,7 @@ use connection_collection::ConnectionCollectionLockHelper; mod isupport; use isupport::*; +mod monitor; mod movable; pub mod server; diff --git a/sable_ircd/src/messages/numeric.rs b/sable_ircd/src/messages/numeric.rs index 3f17f5e7..fe7a367b 100644 --- a/sable_ircd/src/messages/numeric.rs +++ b/sable_ircd/src/messages/numeric.rs @@ -114,6 +114,13 @@ define_messages! { 440(ServicesNotAvailable) => { () => ":Services are not available"}, + // https://ircv3.net/specs/extensions/monitor + 730(MonOnline) => { (content: &str ) => ":{content}" }, + 731(MonOffline) => { (content: &str ) => ":{content}" }, + 732(MonList) => { (targets: &str) => ":{targets}" }, + 733(EndOfMonList) => { () => ":End of MONITOR list" }, + 734(MonListFull) => { (limit: usize, targets: usize) => "{limit} {targets} :Monitor list is full." }, + 900(LoggedIn) => { (account: &Nickname) => "* {account} :You are now logged in as {account}" }, // TODO: !@ instead of * 903(SaslSuccess) => { () => ":SASL authentication successful" }, 904(SaslFail) => { () => ":SASL authentication failed" }, diff --git a/sable_ircd/src/messages/source_target.rs b/sable_ircd/src/messages/source_target.rs index 30dd37e7..6ae084f7 100644 --- a/sable_ircd/src/messages/source_target.rs +++ b/sable_ircd/src/messages/source_target.rs @@ -58,10 +58,7 @@ impl MessageSource for update::HistoricMessageSource { impl MessageSource for update::HistoricUser { fn format(&self) -> String { - format!( - "{}!{}@{}", - self.nickname, self.user.user, self.user.visible_host - ) + self.nuh() } } diff --git a/sable_ircd/src/monitor.rs b/sable_ircd/src/monitor.rs new file mode 100644 index 00000000..78a95d72 --- /dev/null +++ b/sable_ircd/src/monitor.rs @@ -0,0 +1,206 @@ +//! Implementation of [IRCv3 MONITOR](https://ircv3.net/specs/extensions/monitor) +//! +//! Monitors are connection-specific (not user-wide), and not propagated across the network. +//! Therefore, they are identified only by a `ConnectionId`. + +use std::collections::{HashMap, HashSet}; + +use anyhow::{anyhow, Context, Result}; +use thiserror::Error; + +use crate::make_numeric; +use crate::messages::MessageSink; +use crate::prelude::*; +use crate::ClientServer; +use client_listener::ConnectionId; +use sable_network::prelude::*; +use sable_network::validated::Nickname; + +#[derive(Error, Clone, Debug)] +pub enum MonitorInsertError { + #[error("this connection has too many monitors ({current}), maximum is {max}")] + /// `current` may be greater than `max` if server configuration was edited. + TooManyMonitorsPerConnection { max: usize, current: usize }, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct MonitorSet { + pub max_per_connection: usize, + monitors_by_connection: HashMap>, + monitors_by_nickname: HashMap>, +} + +impl MonitorSet { + pub fn new(max_per_connection: usize) -> MonitorSet { + MonitorSet { + max_per_connection, + monitors_by_connection: HashMap::new(), + monitors_by_nickname: HashMap::new(), + } + } + + /// Marks the `nick` as being monitored by the given connection + pub fn insert( + &mut self, + nick: Nickname, + monitor: ConnectionId, + ) -> Result<(), MonitorInsertError> { + let entry = self + .monitors_by_connection + .entry(monitor) + .or_insert_with(HashSet::new); + if entry.len() >= self.max_per_connection { + return Err(MonitorInsertError::TooManyMonitorsPerConnection { + max: self.max_per_connection, + current: entry.len(), + }); + } + entry.insert(nick); + self.monitors_by_nickname + .entry(nick) + .or_insert_with(HashSet::new) + .insert(monitor); + Ok(()) + } + + /// Marks the `nick` as no longer monitored by the given connection + /// + /// Returns whether the nick was indeed monitored by the connection. + pub fn remove(&mut self, nick: Nickname, monitor: ConnectionId) -> bool { + self.monitors_by_connection + .get_mut(&monitor) + .map(|set| set.remove(&nick)); + self.monitors_by_nickname + .get_mut(&nick) + .map(|set| set.remove(&monitor)) + .unwrap_or(false) + } + + /// Remove all monitors of a connection + /// + /// Returns the set of nicks the connection monitored, if any. + pub fn remove_connection(&mut self, monitor: ConnectionId) -> Option> { + let nicks = self.monitors_by_connection.remove(&monitor); + if let Some(nicks) = &nicks { + for nick in nicks { + self.monitors_by_nickname + .get_mut(nick) + .expect("monitors_by_nickname missing nick present in monitors_by_connection") + .remove(&monitor); + } + } + nicks + } + + /// Returns all connections monitoring the given nick + pub fn nick_monitors(&self, nick: &Nickname) -> Option<&HashSet> { + self.monitors_by_nickname.get(nick) + } + + /// Returns all nicks monitored by the given connection + pub fn monitored_nicks(&self, monitor: ConnectionId) -> Option<&HashSet> { + self.monitors_by_connection.get(&monitor) + } +} + +/// Trait of [`NetworkStateChange`] details that are relevant to connections using +/// [IRCv3 MONITOR](https://ircv3.net/specs/extensions/monitor) to monitor users. +pub(crate) trait MonitoredItem: std::fmt::Debug { + /// Same as [`try_notify_monitors`] but logs errors instead of returning `Result`. + fn notify_monitors(&self, server: &ClientServer) { + if let Err(e) = self.try_notify_monitors(server) { + tracing::error!("Error while notifying monitors of {:?}: {}", self, e); + } + } + + /// Send `RPL_MONONLINE`/`RPL_MONOFFLINE` to all connections monitoring nicks involved in this + /// event + fn try_notify_monitors(&self, server: &ClientServer) -> Result<()>; +} + +impl MonitoredItem for update::NewUser { + fn try_notify_monitors(&self, server: &ClientServer) -> Result<()> { + notify_monitors(server, &self.user.nickname, || { + make_numeric!(MonOnline, &self.user.nuh()) + }) + } +} + +impl MonitoredItem for update::UserNickChange { + fn try_notify_monitors(&self, server: &ClientServer) -> Result<()> { + if self.user.nickname != self.new_nick { + // Don't notify on case change + notify_monitors(server, &self.user.nickname, || { + make_numeric!(MonOffline, &self.user.nickname.to_string()) + })?; + notify_monitors(server, &self.new_nick, || { + make_numeric!( + MonOnline, + &update::HistoricUser { + nickname: self.new_nick, + ..self.user.clone() + } + .nuh() + ) + })?; + } + Ok(()) + } +} + +impl MonitoredItem for update::UserQuit { + fn try_notify_monitors(&self, server: &ClientServer) -> Result<()> { + notify_monitors(server, &self.user.nickname, || { + make_numeric!(MonOffline, &self.user.nickname.to_string()) + }) + } +} + +impl MonitoredItem for update::BulkUserQuit { + fn try_notify_monitors(&self, server: &ClientServer) -> Result<()> { + self.items + .iter() + .map(|item| item.try_notify_monitors(server)) + .collect::>() // Notify all monitors even if one of them fails halfway + .into_iter() + .collect() + } +} + +fn notify_monitors( + server: &ClientServer, + nick: &Nickname, + mut make_numeric: impl FnMut() -> UntargetedNumeric, +) -> Result<()> { + // Copying the set of monitors to release lock on `server.monitors` ASAP + let monitors: Option> = server + .monitors + .read() + .monitors_by_nickname + .get(nick) + .map(|monitors| monitors.iter().copied().collect()); + if let Some(monitors) = monitors { + let network = server.network(); + monitors + .into_iter() + .map(|monitor| -> Result<()> { + let Some(conn) = server.find_connection(monitor) else { + // TODO: Remove from monitors? + return Ok(()); + }; + let user_id = conn + .user_id() + .ok_or(anyhow!("Monitor by user with no user_id {:?}", conn.id()))?; + let monitor_user = network + .user(user_id) + .context("Could not find monitoring user")?; + conn.send(make_numeric().format_for(server, &monitor_user)); + Ok(()) + }) + .collect::>() // Notify all monitors even if one of them fails halfway + .into_iter() + .collect() + } else { + Ok(()) + } +} diff --git a/sable_ircd/src/server/config.rs b/sable_ircd/src/server/config.rs index 1d63c01e..3ab02985 100644 --- a/sable_ircd/src/server/config.rs +++ b/sable_ircd/src/server/config.rs @@ -22,9 +22,11 @@ pub struct RawClientServerConfig { pub listeners: Vec, #[serde(flatten)] pub info_paths: RawServerInfo, + #[serde(default)] + pub monitor: MonitorConfig, } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub struct ServerInfoStrings { pub motd: Option>, // Linewise to not repeatedly split pub admin_info: Option, @@ -66,9 +68,32 @@ pub struct AdminInfo { pub email: Option, } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct MonitorConfig { + /// Maximum number of active MONITORs per client connection. Default to 100 + #[serde(default = "default_max_monitors_per_connection")] + pub max_per_connection: u16, + // TODO: add a maximum per user and/or per account, specific limits for + // authenticated vs unauthenticated users, etc. +} + +impl Default for MonitorConfig { + fn default() -> MonitorConfig { + MonitorConfig { + max_per_connection: 64, + } + } +} + +fn default_max_monitors_per_connection() -> u16 { + MonitorConfig::default().max_per_connection +} + +#[derive(Debug)] pub struct ClientServerConfig { pub listeners: Vec, pub info_strings: ServerInfoStrings, + pub monitor: MonitorConfig, } #[derive(Debug, Error)] diff --git a/sable_ircd/src/server/mod.rs b/sable_ircd/src/server/mod.rs index ee30f7c7..d7b9425e 100644 --- a/sable_ircd/src/server/mod.rs +++ b/sable_ircd/src/server/mod.rs @@ -36,9 +36,10 @@ pub use async_handler_collection::*; mod upgrade; use self::{ - config::{RawClientServerConfig, ServerInfoStrings}, + config::{ClientServerConfig, RawClientServerConfig, ServerInfoStrings}, message_sink_repository::MessageSinkRepository, }; +use crate::monitor::MonitorSet; pub mod config; @@ -90,6 +91,8 @@ pub struct ClientServer { // Any general static info (responses for MOTD, ADMIN, and so on) pub info_strings: ServerInfoStrings, + + pub monitors: RwLock, } impl ClientServer { @@ -166,12 +169,17 @@ impl ClientServer { } #[tracing::instrument] - fn build_basic_isupport() -> ISupportBuilder { + fn build_basic_isupport(config: &ClientServerConfig) -> ISupportBuilder { let mut ret = ISupportBuilder::new(); ret.add(ISupportEntry::simple("EXCEPTS")); ret.add(ISupportEntry::simple("INVEX")); ret.add(ISupportEntry::simple("FNC")); + ret.add(ISupportEntry::int( + "MONITOR", + config.monitor.max_per_connection.into(), + )); + ret.add(ISupportEntry::string("CASEMAPPING", "ascii")); ret.add(ISupportEntry::int( diff --git a/sable_ircd/src/server/server_type.rs b/sable_ircd/src/server/server_type.rs index bb07b2b7..52559699 100644 --- a/sable_ircd/src/server/server_type.rs +++ b/sable_ircd/src/server/server_type.rs @@ -5,6 +5,8 @@ use async_trait::async_trait; use client_listener::SavedListenerCollection; use sable_server::ServerSaveError; +use crate::monitor::MonitorSet; + /// Saved state of a [`ClientServer`] for later resumption #[derive(serde::Serialize, serde::Deserialize)] pub struct ClientServerState { @@ -12,6 +14,7 @@ pub struct ClientServerState { auth_state: AuthClientState, client_caps: CapabilityRepository, listener_state: SavedListenerCollection, + monitors: MonitorSet, } #[async_trait] @@ -28,6 +31,7 @@ impl sable_server::ServerType for ClientServer { Ok(Self::ProcessedConfig { listeners: config.listeners.clone(), info_strings: ServerInfoStrings::load(&config.info_paths)?, + monitor: config.monitor.clone(), }) } @@ -81,11 +85,12 @@ impl sable_server::ServerType for ClientServer { connections: RwLock::new(ConnectionCollection::new()), prereg_connections: Mutex::new(VecDeque::new()), myinfo: Self::build_myinfo(), - isupport: Self::build_basic_isupport(), + isupport: Self::build_basic_isupport(&config), client_caps: CapabilityRepository::new(), node: node, listeners: Movable::new(client_listeners), info_strings: config.info_strings, + monitors: MonitorSet::new(config.monitor.max_per_connection.into()).into(), }) } @@ -106,12 +111,13 @@ impl sable_server::ServerType for ClientServer { .save() .await .map_err(ServerSaveError::IoError)?, + monitors: self.monitors.into_inner(), }) } /// Restore from a previously saved state. fn restore( - state: ClientServerState, + mut state: ClientServerState, node: Arc, history_receiver: UnboundedReceiver, config: &Self::ProcessedConfig, @@ -123,6 +129,8 @@ impl sable_server::ServerType for ClientServer { let listeners = ListenerCollection::resume(state.listener_state, client_send)?; let connections = ConnectionCollection::restore_from(state.connections, &listeners); + + state.monitors.max_per_connection = config.monitor.max_per_connection.into(); Ok(Self { node, action_receiver: Mutex::new(action_recv), @@ -143,11 +151,12 @@ impl sable_server::ServerType for ClientServer { auth_client: AuthClient::resume(state.auth_state, auth_send)?, auth_events: Mutex::new(auth_recv), myinfo: Self::build_myinfo(), - isupport: Self::build_basic_isupport(), + isupport: Self::build_basic_isupport(config), client_caps: state.client_caps, history_receiver: Mutex::new(history_receiver), listeners: Movable::new(listeners), info_strings: config.info_strings.clone(), + monitors: state.monitors.into(), }) } diff --git a/sable_ircd/src/server/update_handler.rs b/sable_ircd/src/server/update_handler.rs index 4828eb12..fcbf3efd 100644 --- a/sable_ircd/src/server/update_handler.rs +++ b/sable_ircd/src/server/update_handler.rs @@ -3,6 +3,7 @@ use sable_network::prelude::update::{HistoricMessageSource, HistoricMessageTarge use super::*; use crate::errors::HandleResult; +use crate::monitor::MonitoredItem; impl ClientServer { pub(super) fn handle_history_update(&self, update: NetworkHistoryUpdate) -> HandleResult { @@ -13,6 +14,18 @@ impl ClientServer { let history = self.node.history(); if let Some(entry) = history.get(entry_id) { match &entry.details { + NetworkStateChange::NewUser(detail) => { + detail.notify_monitors(&self); + } + NetworkStateChange::UserNickChange(detail) => { + detail.notify_monitors(&self); + } + NetworkStateChange::UserQuit(detail) => { + detail.notify_monitors(&self); + } + NetworkStateChange::BulkUserQuit(detail) => { + detail.notify_monitors(&self); + } NetworkStateChange::NewUserConnection(detail) => { let new_user_connection = detail.clone(); drop(history); diff --git a/sable_ircd/src/utils/line_wrapper.rs b/sable_ircd/src/utils/line_wrapper.rs new file mode 100644 index 00000000..03383eb2 --- /dev/null +++ b/sable_ircd/src/utils/line_wrapper.rs @@ -0,0 +1,105 @@ +/// Iterator of strings that concatenates input items into lines of a given maximum length +/// +/// The length is given in **bytes**. +pub struct LineWrapper, Iter: IntoIterator> { + line_length: usize, + iter: Iter::IntoIter, + buf: Option, +} + +impl, Iter: Iterator> + LineWrapper +{ + pub fn new(line_length: usize, iter: Iter) -> Self { + let mut iter = iter.into_iter(); + LineWrapper { + line_length, + buf: iter.next().map(|item| { + let mut buf = String::with_capacity(line_length); + buf.push_str(item.as_ref()); + buf + }), + iter: iter, + } + } +} + +impl, Iter: Iterator> Iterator + for LineWrapper +{ + type Item = String; + + fn next(&mut self) -> Option { + let Some(buf) = &mut self.buf else { + return None; + }; + + while let Some(item) = self.iter.next() { + let item = item.as_ref(); + if buf.as_bytes().len() + JOINER.len_utf8() + item.as_bytes().len() <= self.line_length + { + buf.push(JOINER); + buf.push_str(item); + } else { + // Line length exceeded; put the item aside for next call and return + // the content of the current buffer + let line = String::from(buf.as_str()); // Reallocate without the extra capacity + buf.clear(); + buf.push_str(item); + return Some(line); + } + } + + // No more items in the source iterator; return what remains in the buffer + + let mut buf = self.buf.take().unwrap(); // Can't panic, we already checked for None-ness + buf.shrink_to_fit(); + Some(buf) + } +} + +#[test] +fn test_linewrapper() { + let items = ["a", "ab", "cde", "f", "ghi", "jklm", "nopqr"]; + + assert_eq!( + LineWrapper::<' ', _, _>::new(3, items.into_iter()).collect::>(), + vec!["a", "ab", "cde", "f", "ghi", "jklm", "nopqr"] + ); + + assert_eq!( + LineWrapper::<' ', _, _>::new(4, items.into_iter()).collect::>(), + vec!["a ab", "cde", "f", "ghi", "jklm", "nopqr"] + ); + + assert_eq!( + LineWrapper::<' ', _, _>::new(5, items.into_iter()).collect::>(), + vec!["a ab", "cde f", "ghi", "jklm", "nopqr"] + ); + + assert_eq!( + LineWrapper::<' ', _, _>::new(9, items.into_iter()).collect::>(), + vec!["a ab cde", "f ghi", "jklm", "nopqr"] + ); +} + +#[test] +fn test_linewrapper_empty() { + assert_eq!( + LineWrapper::<' ', _, _>::new(3, Vec::<&str>::new().into_iter()).collect::>(), + Vec::::new() + ); +} + +#[test] +fn test_linewrapper_single() { + assert_eq!( + LineWrapper::<' ', _, _>::new(3, ["a"].into_iter()).collect::>(), + vec!["a"] + ); + + assert_eq!( + LineWrapper::<' ', _, _>::new(3, ["abcde"].into_iter()).collect::>(), + vec!["abcde"] + ); +} diff --git a/sable_ircd/src/utils/mod.rs b/sable_ircd/src/utils/mod.rs index 0ee58c61..c78e32e3 100644 --- a/sable_ircd/src/utils/mod.rs +++ b/sable_ircd/src/utils/mod.rs @@ -12,3 +12,6 @@ pub use time_utils::*; mod serde_once_cell; pub use serde_once_cell::*; + +mod line_wrapper; +pub use line_wrapper::*; diff --git a/sable_network/src/network/update.rs b/sable_network/src/network/update.rs index 287bd393..5cb74227 100644 --- a/sable_network/src/network/update.rs +++ b/sable_network/src/network/update.rs @@ -16,6 +16,15 @@ pub struct HistoricUser { pub account: Option, } +impl HistoricUser { + pub fn nuh(&self) -> String { + format!( + "{}!{}@{}", + self.nickname, self.user.user, self.user.visible_host + ) + } +} + /// Some state changes can originate from either users or servers; this enum is used in the /// [`NetworkStateChange`] for those changes to describe the source of the change. ///