diff --git a/sable_history/src/pg_history_service.rs b/sable_history/src/pg_history_service.rs index 009f8430..dc97a70b 100644 --- a/sable_history/src/pg_history_service.rs +++ b/sable_history/src/pg_history_service.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; +use anyhow::{bail, Result}; use chrono::{DateTime, NaiveDateTime, Utc}; use diesel::dsl::sql; use diesel::prelude::*; @@ -49,27 +50,27 @@ impl<'a> HistoryService for PgHistoryService<'a> { tracing::error!("Could not get history channels: {e}"); return HashMap::new(); } - Ok(rows) => { - rows.map(|row| row.expect("Could not deserialize row")) - .map( - |(channel_id, max_message_id): (i64, Uuid)| -> (TargetId, i64) { - let (seconds, _) = max_message_id - .get_timestamp() - .expect("messages.id is not a UUID7") - .to_unix(); - ( - TargetId::Channel(ChannelId::from(Snowflake::from( - u64::try_from(channel_id).expect("channel id is negative"), - ))), - seconds - .try_into() - .expect("message's UNIX timestamp is negative"), - ) - }, - ) - .collect() - .await - } + Ok(rows) => rows + .map(|row| -> Result<(TargetId, i64)> { + let (channel_id, max_message_id): (i64, Uuid) = row?; + let channel = + TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64))); + + let Some(ts) = max_message_id.get_timestamp() else { + bail!("messages.id should be a UUID7, not {max_message_id}"); + }; + let (seconds, _) = ts.to_unix(); + let Ok(seconds) = seconds.try_into() else { + bail!("message {max_message_id}'s UNIX timestamp is negative"); + }; + Ok((channel, seconds)) + }) + .try_collect() + .await + .unwrap_or_else(|e| { + tracing::error!("Could not read rows: {e}"); + HashMap::new() + }), } } @@ -87,16 +88,22 @@ impl<'a> HistoryService for PgHistoryService<'a> { let mut connection_lock = self.database_connection.lock().await; - let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64"); - let Some(channel) = channels::dsl::channels + let db_channel_id = channel_id.as_u64() as i64; + let channel = match channels::dsl::channels .find(db_channel_id) .select(crate::models::Channel::as_select()) .first(&mut *connection_lock) .await .optional() - .expect("Could not check if channel exists") - else { - return Err(HistoryError::InvalidTarget(target)); + { + Ok(Some(channel)) => channel, + Ok(None) => return Err(HistoryError::InvalidTarget(target)), + Err(e) => { + tracing::error!("Could not check if channel exists: {e}"); + return Err(HistoryError::InternalError( + "Could not check if channel exists".to_string(), + )); + } }; let base_query = messages::dsl::messages @@ -138,11 +145,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { ), } .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() @@ -161,11 +174,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() @@ -184,11 +203,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? } HistoryRequest::Around { around_ts, limit } => { todo!("around") @@ -216,11 +241,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? } else { let start_ts = DateTime::from_timestamp(start_ts, 0) .unwrap_or(DateTime::::MAX_UTC) @@ -239,11 +270,17 @@ impl<'a> HistoryService for PgHistoryService<'a> { .load_stream(&mut *connection_lock), ) .await - .expect("could not query messages") + .map_err(|e| { + tracing::error!("Could not query messages: {e}"); + HistoryError::InternalError("Could not query messages".to_string()) + })? .map_ok(|row| make_historical_event(&channel, row)) .try_collect::>() .await - .expect("could not parse all records") + .map_err(|e| { + tracing::error!("Could not parse messages: {e}"); + HistoryError::InternalError("Could not parse message".to_string()) + })? .into_iter() .rev() // need to reverse *after* applying the SQL LIMIT .collect::>() diff --git a/sable_ircd/src/command/handlers/chathistory.rs b/sable_ircd/src/command/handlers/chathistory.rs index 315539e8..0d33d92f 100644 --- a/sable_ircd/src/command/handlers/chathistory.rs +++ b/sable_ircd/src/command/handlers/chathistory.rs @@ -144,6 +144,12 @@ async fn handle_chathistory( { Ok(entries) => send_history_entries(server, response, target, entries)?, Err(HistoryError::InvalidTarget(_)) => Err(invalid_target_error())?, + Err(HistoryError::InternalError(e)) => Err(CommandError::Fail { + command: "CHATHISTORY", + code: "MESSAGE_ERROR", + context: format!("{} {}", subcommand, target), + description: e, + })?, }; } } diff --git a/sable_network/src/history/service.rs b/sable_network/src/history/service.rs index fcf3702c..4af67f04 100644 --- a/sable_network/src/history/service.rs +++ b/sable_network/src/history/service.rs @@ -78,6 +78,8 @@ pub enum HistoryRequest { pub enum HistoryError { #[error("invalid target: {0:?}")] InvalidTarget(TargetId), + #[error("internal server error: {0:?}")] + InternalError(String), } /// A backend implementation of [IRCv3 CHATHISTORY](https://ircv3.net/specs/extensions/chathistory)