Skip to content

Commit

Permalink
feat(server): Impelement subscribe_to_user, refactor room creation
Browse files Browse the repository at this point in the history
  • Loading branch information
mxxntype committed May 21, 2024
1 parent a6fd379 commit f3737ac
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 151 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion proto/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ message ServersideRoomEvent {
}

message ServersideUserEvent {
UUID room_uuid = 1;
UUID user_uuid = 1;

oneof event {
UUID added_to_room = 2;
Expand Down
1 change: 0 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ rand_core = "0.6.4"
redis = { version = "0.25.3", features = ["uuid", "tokio-comp", "aio"] }
thiserror = "1.0.61"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1.15", features = ["sync"] }
tokio-util = "0.7.11"
tonic = "0.11.0"
tracing = "0.1.40"
Expand Down
10 changes: 5 additions & 5 deletions server/src/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@ use std::{ops::Deref, pin::Pin};
use tokio::sync::{mpsc, oneshot};

pub struct DisconnectChannel<T> {
pub(crate) signal_sender: Option<oneshot::Sender<()>>,
pub(crate) inner_channel: mpsc::Receiver<T>,
pub(crate) disconnect_tx: Option<oneshot::Sender<()>>,
pub(crate) grpc_rx: mpsc::Receiver<T>,
}

impl<T> Stream for DisconnectChannel<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner_channel).poll_recv(cx)
Pin::new(&mut self.grpc_rx).poll_recv(cx)
}
}

impl<T> Deref for DisconnectChannel<T> {
type Target = mpsc::Receiver<T>;

fn deref(&self) -> &Self::Target {
&self.inner_channel
&self.grpc_rx
}
}

impl<T> Drop for DisconnectChannel<T> {
fn drop(&mut self) {
if let Some(drop_signal) = self.signal_sender.take() {
if let Some(drop_signal) = self.disconnect_tx.take() {
let _ = drop_signal.send(());
}
}
Expand Down
75 changes: 1 addition & 74 deletions server/src/entities/room.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use crate::entities::relations::RoomUser;
use crate::persistence::Connection;
use crate::proto::ServersideRoom;
use crate::{persistence::Connection, proto::ClientsideRoom};
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use redis::{aio::MultiplexedConnection, AsyncCommands};
use std::fmt;
use uuid::Uuid;

Expand All @@ -24,77 +22,6 @@ impl Room {
}
}

#[tracing::instrument(skip_all)]
pub async fn from_room_with_members(
clientside_room: ClientsideRoom,
db_connection: &mut PooledConnection<ConnectionManager<Connection>>,
cache_connection: &mut MultiplexedConnection,
) -> Result<Uuid, tonic::Status> {
let user_uuids = clientside_room
.members
.into_iter()
.map(Uuid::try_from)
.collect::<Result<Vec<_>, _>>()
.map_err(|err| {
let err = err.to_string();
let msg = format!("Some member UUIDs could not be converted: {err}");
tonic::Status::invalid_argument(msg)
})?;

let room = Room::new(clientside_room.name);
let room_uuid = room.uuid;
let members: Vec<RoomUser> = user_uuids
.iter()
.map(|user_uuid| RoomUser {
room_uuid,
user_uuid: *user_uuid,
})
.collect();

// Store the room and members in the database.
{
use crate::entities::schema::rooms::dsl::*;
use crate::entities::schema::rooms_users::dsl::*;
use diesel::{insert_into, RunQueryDsl};

let _ = insert_into(rooms)
.values(&room)
.execute(db_connection)
.map_err(|err| {
let err = err.to_string();
let msg = format!("Could not save the room in the database: {err}");
tonic::Status::internal(msg)
})?;

let _ = insert_into(rooms_users)
.values(&members)
.execute(db_connection)
.map_err(|err| {
let err = err.to_string();
let msg = format!("Could not save the room's members: {err}");
tonic::Status::internal(msg)
})?;

tracing::info!(message = "Created new room", members = ?user_uuids, uuid = ?room.uuid);
}

// Update the membership cache.
for user_uuid in user_uuids.into_iter() {
let _: () = cache_connection
.rpush(user_uuid, room.uuid)
.await
.map_err(|err| {
let msg = "Could not update membership cache";
tracing::error!(message = msg, ?err);
tonic::Status::internal(msg)
})?;
}

tracing::info!(message = "Updated membership cache", room = ?room.uuid);

Ok(room.uuid)
}

pub async fn get_members(
&self,
db_connection: &mut PooledConnection<ConnectionManager<Connection>>,
Expand Down
Loading

0 comments on commit f3737ac

Please sign in to comment.