Skip to content

Commit

Permalink
Remove panics
Browse files Browse the repository at this point in the history
  • Loading branch information
progval committed Nov 2, 2024
1 parent 0b245bf commit 61f78eb
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 36 deletions.
109 changes: 73 additions & 36 deletions sable_history/src/pg_history_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;

use anyhow::{bail, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
use diesel::dsl::sql;
use diesel::prelude::*;
Expand Down Expand Up @@ -49,27 +50,27 @@ impl<'a> HistoryService for PgHistoryService<'a> {
tracing::error!("Could not get history channels: {e}");
return HashMap::new();
}
Ok(rows) => {
rows.map(|row| row.expect("Could not deserialize row"))
.map(
|(channel_id, max_message_id): (i64, Uuid)| -> (TargetId, i64) {
let (seconds, _) = max_message_id
.get_timestamp()
.expect("messages.id is not a UUID7")
.to_unix();
(
TargetId::Channel(ChannelId::from(Snowflake::from(
u64::try_from(channel_id).expect("channel id is negative"),
))),
seconds
.try_into()
.expect("message's UNIX timestamp is negative"),
)
},
)
.collect()
.await
}
Ok(rows) => rows
.map(|row| -> Result<(TargetId, i64)> {
let (channel_id, max_message_id): (i64, Uuid) = row?;
let channel =
TargetId::Channel(ChannelId::from(Snowflake::from(channel_id as u64)));

let Some(ts) = max_message_id.get_timestamp() else {
bail!("messages.id should be a UUID7, not {max_message_id}");
};
let (seconds, _) = ts.to_unix();
let Ok(seconds) = seconds.try_into() else {
bail!("message {max_message_id}'s UNIX timestamp is negative");
};
Ok((channel, seconds))
})
.try_collect()
.await
.unwrap_or_else(|e| {
tracing::error!("Could not read rows: {e}");
HashMap::new()
}),
}
}

Expand All @@ -87,16 +88,22 @@ impl<'a> HistoryService for PgHistoryService<'a> {

let mut connection_lock = self.database_connection.lock().await;

let db_channel_id = i64::try_from(channel_id.as_u64()).expect("channel id overflows u64");
let Some(channel) = channels::dsl::channels
let db_channel_id = channel_id.as_u64() as i64;
let channel = match channels::dsl::channels
.find(db_channel_id)
.select(crate::models::Channel::as_select())
.first(&mut *connection_lock)
.await
.optional()
.expect("Could not check if channel exists")
else {
return Err(HistoryError::InvalidTarget(target));
{
Ok(Some(channel)) => channel,
Ok(None) => return Err(HistoryError::InvalidTarget(target)),
Err(e) => {
tracing::error!("Could not check if channel exists: {e}");
return Err(HistoryError::InternalError(
"Could not check if channel exists".to_string(),
));
}
};

let base_query = messages::dsl::messages
Expand Down Expand Up @@ -138,11 +145,17 @@ impl<'a> HistoryService for PgHistoryService<'a> {
),
}
.await
.expect("could not query messages")
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>()
Expand All @@ -161,11 +174,17 @@ impl<'a> HistoryService for PgHistoryService<'a> {
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>()
Expand All @@ -184,11 +203,17 @@ impl<'a> HistoryService for PgHistoryService<'a> {
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?
}
HistoryRequest::Around { around_ts, limit } => {

Check warning on line 218 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `around_ts`

Check warning on line 218 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `limit`

Check warning on line 218 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `around_ts`

Check warning on line 218 in sable_history/src/pg_history_service.rs

View workflow job for this annotation

GitHub Actions / Test (nightly)

unused variable: `limit`
todo!("around")
Expand Down Expand Up @@ -216,11 +241,17 @@ impl<'a> HistoryService for PgHistoryService<'a> {
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?
} else {
let start_ts = DateTime::from_timestamp(start_ts, 0)
.unwrap_or(DateTime::<Utc>::MAX_UTC)
Expand All @@ -239,11 +270,17 @@ impl<'a> HistoryService for PgHistoryService<'a> {
.load_stream(&mut *connection_lock),
)
.await
.expect("could not query messages")
.map_err(|e| {
tracing::error!("Could not query messages: {e}");
HistoryError::InternalError("Could not query messages".to_string())
})?
.map_ok(|row| make_historical_event(&channel, row))
.try_collect::<Vec<_>>()
.await
.expect("could not parse all records")
.map_err(|e| {
tracing::error!("Could not parse messages: {e}");
HistoryError::InternalError("Could not parse message".to_string())
})?
.into_iter()
.rev() // need to reverse *after* applying the SQL LIMIT
.collect::<Vec<_>>()
Expand Down
6 changes: 6 additions & 0 deletions sable_ircd/src/command/handlers/chathistory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ async fn handle_chathistory(
{
Ok(entries) => send_history_entries(server, response, target, entries)?,
Err(HistoryError::InvalidTarget(_)) => Err(invalid_target_error())?,
Err(HistoryError::InternalError(e)) => Err(CommandError::Fail {
command: "CHATHISTORY",
code: "MESSAGE_ERROR",
context: format!("{} {}", subcommand, target),
description: e,
})?,
};
}
}
Expand Down
2 changes: 2 additions & 0 deletions sable_network/src/history/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ pub enum HistoryRequest {
/// Errors a [`HistoryService`] backend can return when serving an
/// IRCv3 CHATHISTORY request.
pub enum HistoryError {
/// The requested history target does not exist (e.g. the channel id
/// was not found in the backend), so no history can be returned for it.
#[error("invalid target: {0:?}")]
InvalidTarget(TargetId),
/// A backend failure occurred (e.g. a database query or row
/// deserialization error). The payload is a short generic description;
/// detailed causes are logged server-side rather than exposed here.
// NOTE(review): `{0:?}` debug-formats the String, so the rendered
// message will include surrounding quotes/escapes — `{0}` may be
// what was intended. Confirm before changing, as callers may match
// on the rendered text.
#[error("internal server error: {0:?}")]
InternalError(String),
}

/// A backend implementation of [IRCv3 CHATHISTORY](https://ircv3.net/specs/extensions/chathistory)
Expand Down

0 comments on commit 61f78eb

Please sign in to comment.