From 074d9bb63dcc79f1a7582c58cdc677988de9792f Mon Sep 17 00:00:00 2001 From: Carson McManus Date: Fri, 13 Oct 2023 08:31:31 -0400 Subject: [PATCH] add typeshare (#1103) * add typeshare to deps, add codegen script * ci: add check for codegen * run lint after codegen * refactor how M2B and B2M messages are defined for easier typeshare * more refactors * more refactors * rename B2M message types * remove enum_derive * rebase, more refactoring * minor refactor --- .github/workflows/main.yml | 8 + .prettierignore | 3 +- Cargo.lock | 84 +++++++++ Cargo.toml | 3 +- crates/harness-tests/src/routing.rs | 5 +- crates/harness/src/monolith.rs | 16 +- crates/ott-balancer-bin/src/balancer.rs | 53 +++--- crates/ott-balancer-bin/src/monolith.rs | 17 +- crates/ott-balancer-protocol/Cargo.toml | 1 + crates/ott-balancer-protocol/src/monolith.rs | 186 ++++++++++++++----- crates/ott-balancer-protocol/src/wrappers.rs | 16 ++ scripts/codegen.sh | 11 ++ server/balancer.ts | 90 +-------- server/generated.ts | 94 ++++++++++ tools/balancer-tester/src/monolith.rs | 57 +++--- 15 files changed, 435 insertions(+), 209 deletions(-) create mode 100755 scripts/codegen.sh create mode 100644 server/generated.ts diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c0c61398b..6a4181668 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -42,6 +42,14 @@ jobs: env: NODE_ENV: production + - name: cargo-install + uses: baptiste0928/cargo-install@v1 + with: + crate: typeshare-cli + version: "1.7.0" + - name: Ensure generated code is up to date + run: ./scripts/codegen.sh && git diff --exit-code + test: runs-on: ubuntu-latest strategy: diff --git a/.prettierignore b/.prettierignore index fe5ceef35..b6f9684b5 100644 --- a/.prettierignore +++ b/.prettierignore @@ -6,4 +6,5 @@ *.lock *.log *.env -**/tests/unit/fixtures/** \ No newline at end of file +**/tests/unit/fixtures/** +server/generated.ts diff --git a/Cargo.lock b/Cargo.lock index 6a6131567..9fc05055e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,21 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "anstream" version = "0.3.1" @@ -299,6 +314,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.48.0", +] + [[package]] name = "clap" version = "4.3.12" @@ -1062,6 +1091,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "idna" version = "0.1.5" @@ -1565,6 +1617,7 @@ version = "0.1.0" dependencies = [ "serde", "serde_json", + "typeshare", "uuid", ] @@ -2899,6 +2952,28 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "typeshare" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f44d1a2f454cb35fbe05b218c410792697e76bd868f48d3a418f2cd1a7d527d6" +dependencies = [ + "chrono", + "serde", + "serde_json", + "typeshare-annotation", +] + +[[package]] +name = "typeshare-annotation" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc670d0e358428857cc3b4bf504c691e572fccaec9542ff09212d3f13d74b7a9" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "uncased" version = "0.9.9" @@ -3223,6 +3298,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-targets 0.48.0", +] + [[package]] name = "windows-sys" version = "0.42.0" diff --git a/Cargo.toml b/Cargo.toml index 1028dc49e..87eaba1f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ futures-util = "0.3.28" harness = { path = "crates/harness" } harness_macros = { path = "crates/harness_macros" } hyper = { version = "1.0.0-rc.4", features = ["full"] } -hyper-util = { git ="https://github.com/hyperium/hyper-util.git", rev = "f898015" } +hyper-util = { git = "https://github.com/hyperium/hyper-util.git", rev = "f898015" } http-body-util = "0.1.0-rc.2" once_cell = "1.17.1" ott-common = { path = "crates/ott-common" } @@ -30,5 +30,6 @@ tokio-util = "0.7.8" tracing = "0.1.37" tracing-subscriber = "0.3.17" tungstenite = "0.19.0" +typeshare = "1.0.0" url = "2.3.1" uuid = { version = "1.3.0", features = ["serde", "v4"] } diff --git a/crates/harness-tests/src/routing.rs b/crates/harness-tests/src/routing.rs index 5f1497906..450b4a760 100644 --- a/crates/harness-tests/src/routing.rs +++ b/crates/harness-tests/src/routing.rs @@ -1,7 +1,6 @@ use std::time::Duration; use harness::{Client, MockRespParts, Monolith, TestRunner}; - use test_context::test_context; #[test_context(TestRunner)] @@ -20,7 +19,7 @@ async fn route_http_to_correct_monolith(ctx: &mut TestRunner) { ); m.show().await; - m.load_room("foo".to_string()).await; + m.load_room("foo").await; // Without this sleep, this test can trigger a race condition where the client connects to the balancer before the monolith has the room loaded. // This will cause the other monolith to get the room loaded, and the client will connect to that monolith instead. @@ -100,7 +99,7 @@ async fn route_ws_to_correct_monolith(ctx: &mut TestRunner) { let mut m = Monolith::new(ctx).await.unwrap(); m.show().await; - m.load_room("foo".to_string()).await; + m.load_room("foo").await; // Without this sleep, this test can trigger a race condition where the client connects to the balancer before the monolith has the room loaded. // This will cause the other monolith to get the room loaded, and the client will connect to that monolith instead. diff --git a/crates/harness/src/monolith.rs b/crates/harness/src/monolith.rs index 8bfb4852f..4ec2cc3de 100644 --- a/crates/harness/src/monolith.rs +++ b/crates/harness/src/monolith.rs @@ -241,18 +241,10 @@ impl Monolith { pub async fn load_room(&mut self, room: impl Into + Clone) { let room = room.into(); - let meta = RoomMetadata::default(); - self.state - .lock() - .unwrap() - .rooms - .insert(room.clone(), meta.clone()); + let meta = RoomMetadata::default_with_name(room.clone()); + self.state.lock().unwrap().rooms.insert(room, meta.clone()); if self.connected() { - self.send(MsgM2B::Loaded { - name: room, - metadata: meta, - }) - .await; + self.send(M2BLoaded { room: meta }).await; } } @@ -264,7 +256,7 @@ impl Monolith { .rooms .remove(&room.clone().clone()); if self.connected() { - self.send(MsgM2B::Unloaded { room }).await; + self.send(M2BUnloaded { name: room }).await; } } } diff --git a/crates/ott-balancer-bin/src/balancer.rs b/crates/ott-balancer-bin/src/balancer.rs index fb9435db9..c5681d504 100644 --- a/crates/ott-balancer-bin/src/balancer.rs +++ b/crates/ott-balancer-bin/src/balancer.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc}; -use ott_balancer_protocol::monolith::{MsgB2M, MsgM2B, RoomMetadata}; +use ott_balancer_protocol::monolith::{B2MClientMsg, B2MJoin, B2MLeave, MsgM2B, RoomMetadata}; use ott_balancer_protocol::*; use rand::seq::IteratorRandom; use serde_json::value::RawValue; @@ -239,7 +239,7 @@ impl BalancerContext { }; monolith.add_client(&client.room, client.id); monolith - .send(&MsgB2M::Join { + .send(B2MJoin { room: client.room.clone(), client: client.id, token: client.token.clone(), @@ -253,7 +253,7 @@ impl BalancerContext { pub async fn remove_client(&mut self, client_id: ClientId) -> anyhow::Result<()> { let monolith = self.find_monolith_mut(client_id)?; monolith.remove_client(client_id); - monolith.send(&MsgB2M::Leave { client: client_id }).await?; + monolith.send(B2MLeave { client: client_id }).await?; Ok(()) } @@ -311,13 +311,13 @@ impl BalancerContext { pub fn add_or_sync_room( &mut self, - room: &RoomName, metadata: RoomMetadata, monolith_id: MonolithId, ) -> anyhow::Result<()> { let monolith = self.monoliths.get_mut(&monolith_id).unwrap(); - self.rooms_to_monoliths.insert(room.clone(), monolith_id); - monolith.add_or_sync_room(room, metadata); + self.rooms_to_monoliths + .insert(metadata.name.clone(), monolith_id); + monolith.add_or_sync_room(metadata); Ok(()) } @@ -437,7 +437,7 @@ pub async fn dispatch_client_message( }; monolith - .send(&MsgB2M::ClientMsg { + .send(B2MClientMsg { client_id: *msg.id(), payload: raw_value, }) @@ -521,19 +521,16 @@ pub async fn dispatch_monolith_message( monolith_id ); } - MsgM2B::Loaded { - name: room, - metadata, - } => { - debug!("room loaded on {}: {:?}", monolith_id, room); + MsgM2B::Loaded(msg) => { + debug!("room loaded on {}: {:?}", monolith_id, msg.room.name); let mut ctx_write = ctx.write().await; - ctx_write.add_or_sync_room(&room, metadata, *monolith_id)?; + ctx_write.add_or_sync_room(msg.room, *monolith_id)?; } - MsgM2B::Unloaded { room } => { + MsgM2B::Unloaded(msg) => { let mut ctx_write = ctx.write().await; - ctx_write.remove_room(&room, *monolith_id)?; + ctx_write.remove_room(&msg.name, *monolith_id)?; } - MsgM2B::Gossip { rooms } => { + MsgM2B::Gossip(msg) => { let mut ctx_write = ctx.write_owned().await; let to_remove = ctx_write .monoliths @@ -541,18 +538,18 @@ pub async fn dispatch_monolith_message( .unwrap() .rooms() .keys() - .filter(|room| !rooms.iter().any(|r| r.name == **room)) + .filter(|room| !msg.rooms.iter().any(|r| r.name == **room)) .cloned() .collect::>(); debug!("to_remove: {:?}", to_remove); - for gossip_room in rooms.iter() { + for gossip_room in msg.rooms.iter() { ctx_write .rooms_to_monoliths .insert(gossip_room.name.clone(), *monolith_id); } let monolith = ctx_write.monoliths.get_mut(monolith_id).unwrap(); - for gossip_room in rooms { - monolith.add_or_sync_room(&gossip_room.name, gossip_room.metadata); + for gossip_room in msg.rooms { + monolith.add_or_sync_room(gossip_room); } let monolith = ctx_write.monoliths.get_mut(monolith_id).unwrap(); @@ -560,11 +557,7 @@ pub async fn dispatch_monolith_message( monolith.remove_room(&room) } } - MsgM2B::RoomMsg { - room, - client_id: _, - payload, - } => { + MsgM2B::RoomMsg(msg) => { let ctx_read = ctx.read().await; let Some(room) = ctx_read @@ -572,7 +565,7 @@ pub async fn dispatch_monolith_message( .get(monolith_id) .unwrap() .rooms() - .get(&room) + .get(&msg.room) else { anyhow::bail!("room not found on monolith"); }; @@ -582,7 +575,7 @@ pub async fn dispatch_monolith_message( // broadcast to all clients debug!("broadcasting to clients in room: {:?}", room.name()); // TODO: optimize this using a broadcast channel - let built_msg = Message::text(payload.to_string()); + let built_msg = Message::text(msg.payload.to_string()); for client in room.clients() { let Some(client) = ctx_read.clients.get(client) else { anyhow::bail!("client not found"); @@ -591,14 +584,14 @@ pub async fn dispatch_monolith_message( client.send(built_msg.clone()).await?; } } - MsgM2B::Kick { client_id, reason } => { + MsgM2B::Kick(msg) => { let ctx_read = ctx.read().await; - let Some(client) = ctx_read.clients.get(&client_id) else { + let Some(client) = ctx_read.clients.get(&msg.client_id) else { anyhow::bail!("client not found"); }; client .send(Message::Close(Some(CloseFrame { - code: CloseCode::Library(reason), + code: CloseCode::Library(msg.reason), reason: "".into(), }))) .await?; diff --git a/crates/ott-balancer-bin/src/monolith.rs b/crates/ott-balancer-bin/src/monolith.rs index e2855993d..647fdb8e6 100644 --- a/crates/ott-balancer-bin/src/monolith.rs +++ b/crates/ott-balancer-bin/src/monolith.rs @@ -85,16 +85,17 @@ impl BalancerMonolith { } } - pub async fn send(&self, msg: &MsgB2M) -> anyhow::Result<()> { - let text = serde_json::to_string(&msg)?; + pub async fn send(&self, msg: impl Into) -> anyhow::Result<()> { + let text = serde_json::to_string(&msg.into())?; let socket_msg = Message::Text(text).into(); self.socket_tx.send(socket_msg).await?; Ok(()) } - pub fn set_room_metadata(&mut self, room: &RoomName, metadata: RoomMetadata) { - let Some(room) = self.rooms.get_mut(room) else { + pub fn set_room_metadata(&mut self, metadata: RoomMetadata) { + let room = metadata.name.clone(); + let Some(room) = self.rooms.get_mut(&room) else { error!( "Error setting metadata, Monolith {} does not have room {}", self.id, room @@ -104,11 +105,11 @@ impl BalancerMonolith { room.set_metadata(metadata); } - pub fn add_or_sync_room(&mut self, name: &RoomName, metadata: RoomMetadata) { - if self.has_room(name) { - self.set_room_metadata(name, metadata); + pub fn add_or_sync_room(&mut self, metadata: RoomMetadata) { + if self.has_room(&metadata.name) { + self.set_room_metadata(metadata); } else { - let mut room = Room::new(name.clone()); + let mut room = Room::new(metadata.name.clone()); room.set_metadata(metadata); self.add_room(room); } diff --git a/crates/ott-balancer-protocol/Cargo.toml b/crates/ott-balancer-protocol/Cargo.toml index dbe771f5e..33682f33b 100644 --- a/crates/ott-balancer-protocol/Cargo.toml +++ b/crates/ott-balancer-protocol/Cargo.toml @@ -8,4 +8,5 @@ edition = "2021" [dependencies] serde.workspace = true serde_json.workspace = true +typeshare.workspace = true uuid.workspace = true diff --git a/crates/ott-balancer-protocol/src/monolith.rs b/crates/ott-balancer-protocol/src/monolith.rs index f4c1c65a8..d36c5ee86 100644 --- a/crates/ott-balancer-protocol/src/monolith.rs +++ b/crates/ott-balancer-protocol/src/monolith.rs @@ -2,75 +2,163 @@ use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; +use typeshare::typeshare; use crate::{ClientId, RoomName}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] +#[typeshare] pub enum MsgB2M { - Load { - room: RoomName, - }, - Join { - room: RoomName, - client: ClientId, - token: String, - }, - Leave { - client: ClientId, - }, - ClientMsg { - /// The client that sent the message. - client_id: ClientId, - /// The message that was received from the client, verbatim. - payload: Box, - }, + Load(B2MLoad), + Join(B2MJoin), + Leave(B2MLeave), + ClientMsg(B2MClientMsg), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct B2MLoad { + pub room: RoomName, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct B2MJoin { + pub room: RoomName, + pub client: ClientId, + pub token: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct B2MLeave { + pub client: ClientId, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct B2MClientMsg> { + /// The client that sent the message. + pub client_id: ClientId, + /// The message that was received from the client, verbatim. + pub payload: T, +} + +impl From for MsgB2M { + fn from(val: B2MLoad) -> Self { + Self::Load(val) + } +} + +impl From for MsgB2M { + fn from(val: B2MJoin) -> Self { + Self::Join(val) + } +} + +impl From for MsgB2M { + fn from(val: B2MLeave) -> Self { + Self::Leave(val) + } +} + +impl From for MsgB2M { + fn from(val: B2MClientMsg) -> Self { + Self::ClientMsg(val) + } } #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type", content = "payload", rename_all = "snake_case")] +#[typeshare] pub enum MsgM2B { Init(M2BInit), - Loaded { - name: RoomName, - #[serde(flatten)] - metadata: RoomMetadata, - }, - Unloaded { - room: RoomName, - }, - Gossip { - rooms: Vec, - }, - RoomMsg { - /// The room to send the message to. - room: RoomName, - /// The client to send the message to. If `None`, send to all clients in the room. - client_id: Option, - /// The message to send, verbatim. - payload: Box, - }, - Kick { - client_id: ClientId, - reason: u16, - }, + Loaded(M2BLoaded), + Unloaded(M2BUnloaded), + Gossip(M2BGossip), + RoomMsg(M2BRoomMsg), + Kick(M2BKick), } #[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] pub struct M2BInit { + /// The port that the monolith is listening for HTTP requests on. pub port: u16, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GossipRoom { +#[typeshare] +pub struct M2BLoaded { + pub room: RoomMetadata, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct M2BUnloaded { pub name: RoomName, - #[serde(flatten)] - pub metadata: RoomMetadata, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct M2BGossip { + pub rooms: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct M2BRoomMsg> { + /// The room to send the message to. + pub room: RoomName, + /// The client to send the message to. If `None`, send to all clients in the room. + pub client_id: Option, + /// The message to send, verbatim. + pub payload: T, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[typeshare] +pub struct M2BKick { + pub client_id: ClientId, + pub reason: u16, +} + +impl From for MsgM2B { + fn from(val: M2BInit) -> Self { + Self::Init(val) + } +} + +impl From for MsgM2B { + fn from(val: M2BLoaded) -> Self { + Self::Loaded(val) + } +} + +impl From for MsgM2B { + fn from(val: M2BUnloaded) -> Self { + Self::Unloaded(val) + } +} + +impl From for MsgM2B { + fn from(val: M2BGossip) -> Self { + Self::Gossip(val) + } +} + +impl From for MsgM2B { + fn from(val: M2BRoomMsg) -> Self { + Self::RoomMsg(val) + } } #[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[typeshare] /// Metadata about a room, according to the Monolith. pub struct RoomMetadata { + pub name: RoomName, pub title: String, pub description: String, #[serde(rename = "isTemporary")] @@ -81,11 +169,21 @@ pub struct RoomMetadata { #[serde(rename = "currentSource")] pub current_source: serde_json::Value, /// The number of clients in this room. - pub users: usize, + pub users: u32, +} + +impl RoomMetadata { + pub fn default_with_name(name: impl Into) -> Self { + Self { + name: name.into(), + ..Default::default() + } + } } #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] +#[typeshare] pub enum Visibility { Public, #[default] diff --git a/crates/ott-balancer-protocol/src/wrappers.rs b/crates/ott-balancer-protocol/src/wrappers.rs index 6e1f98fd4..47940c092 100644 --- a/crates/ott-balancer-protocol/src/wrappers.rs +++ b/crates/ott-balancer-protocol/src/wrappers.rs @@ -1,13 +1,17 @@ use std::{fmt::Display, sync::Arc}; use serde::{Deserialize, Serialize}; +use typeshare::typeshare; use uuid::Uuid; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[typeshare(serialized_as = "String")] pub struct ClientId(Uuid); #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[typeshare(serialized_as = "String")] pub struct RoomName(Arc); #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] +#[typeshare(serialized_as = "String")] pub struct MonolithId(Uuid); impl From for Uuid { @@ -34,6 +38,18 @@ impl From for RoomName { } } +impl From<&str> for RoomName { + fn from(val: &str) -> Self { + Self(val.into()) + } +} + +impl Default for RoomName { + fn default() -> Self { + Self("".into()) + } +} + impl From for Uuid { fn from(val: MonolithId) -> Self { val.0 diff --git a/scripts/codegen.sh b/scripts/codegen.sh new file mode 100755 index 000000000..70471f965 --- /dev/null +++ b/scripts/codegen.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -xeo pipefail + +cd "$(dirname "$0")/.." || exit 1 + +typeshare "crates/" --lang=typescript --output-file="server/generated.ts" +sed -i 's/M2BRoomMsg/M2BRoomMsg/g' server/generated.ts +sed -i 's/B2MClientMsg/B2MClientMsg/g' server/generated.ts +sed -i 's/currentSource: Value/currentSource: unknown/g' server/generated.ts +yarn run lint diff --git a/server/balancer.ts b/server/balancer.ts index f9c6b4a46..11def2f4e 100644 --- a/server/balancer.ts +++ b/server/balancer.ts @@ -11,6 +11,8 @@ import { OttWebsocketError } from "ott-common/models/types"; import roommanager from "./roommanager"; import type { RoomListItem } from "./api/room"; import _ from "lodash"; +import type { MsgB2M, MsgM2B } from "./generated"; +export type { MsgB2M, MsgM2B }; const log = getLogger("balancer"); @@ -69,7 +71,7 @@ class BalancerManager { log.info(`Connected to balancer ${conn.id}`); this.emit("connect", conn); - const init: MsgM2BInit = { + const init: MsgM2B = { type: "init", payload: { port: conf.get("port"), @@ -164,7 +166,7 @@ export class BalancerConnection { } private onSocketConnect(event: WebSocket.OpenEvent) { - const init: MsgM2BInit = { + const init: MsgM2B = { type: "init", payload: { port: conf.get("port"), @@ -274,7 +276,9 @@ async function onRoomLoad(roomName: string) { broadcastToBalancers({ type: "loaded", - payload: obj, + payload: { + room: obj, + }, }); gossipDebounced(); } @@ -283,7 +287,7 @@ function onRoomUnload(roomName: string) { broadcastToBalancers({ type: "unloaded", payload: { - room: roomName, + name: roomName, }, }); gossipDebounced(); @@ -311,82 +315,4 @@ function gossip() { const gossipDebounced = _.debounce(gossip, 1000 * 20, { trailing: true, maxWait: 1000 * 20 }); -// TODO: use typeshare? -export type MsgB2M = MsgB2MJoin | MsgB2MLeave | MsgB2MClientMsg; - -interface MsgB2MJoin { - type: "join"; - payload: { - room: string; - client: ClientId; - token: AuthToken; - }; -} - -interface MsgB2MLeave { - type: "leave"; - payload: { - client: ClientId; - }; -} - -interface MsgB2MClientMsg { - type: "client_msg"; - payload: { - client_id: ClientId; - payload: T; - }; -} - -export type MsgM2B = - | MsgM2BInit - | MsgM2BLoaded - | MsgM2BUnloaded - | MsgM2BGossip - | MsgM2BRoomMsg - | MsgM2BKick; - -interface MsgM2BInit { - type: "init"; - payload: { - port: number; - }; -} - -interface MsgM2BLoaded { - type: "loaded"; - payload: GossipRoom; -} - -interface MsgM2BUnloaded { - type: "unloaded"; - payload: { - room: string; - }; -} - -interface MsgM2BGossip { - type: "gossip"; - payload: { - rooms: GossipRoom[]; - }; -} - -interface MsgM2BRoomMsg { - type: "room_msg"; - payload: { - room: string; - client_id?: ClientId; - payload: T; - }; -} - -interface MsgM2BKick { - type: "kick"; - payload: { - client_id: ClientId; - reason: OttWebsocketError; - }; -} - interface GossipRoom extends RoomListItem {} diff --git a/server/generated.ts b/server/generated.ts new file mode 100644 index 000000000..e965dd9bd --- /dev/null +++ b/server/generated.ts @@ -0,0 +1,94 @@ +/* + Generated by typeshare 1.7.0 +*/ + +export type ClientId = string; + +export type RoomName = string; + +export type MonolithId = string; + +export interface B2MLoad { + room: RoomName; +} + +export interface B2MJoin { + room: RoomName; + client: ClientId; + token: string; +} + +export interface B2MLeave { + client: ClientId; +} + +export interface B2MClientMsg { + /** The client that sent the message. */ + client_id: ClientId; + /** The message that was received from the client, verbatim. */ + payload: T; +} + +export interface M2BInit { + /** The port that the monolith is listening for HTTP requests on. */ + port: number; +} + +export enum Visibility { + Public = "public", + Unlisted = "unlisted", + Private = "private", +} + +/** Metadata about a room, according to the Monolith. */ +export interface RoomMetadata { + name: RoomName; + title: string; + description: string; + isTemporary: boolean; + visibility: Visibility; + queueMode: string; + currentSource: unknown; + /** The number of clients in this room. */ + users: number; +} + +export interface M2BLoaded { + room: RoomMetadata; +} + +export interface M2BUnloaded { + name: RoomName; +} + +export interface M2BGossip { + rooms: RoomMetadata[]; +} + +export interface M2BRoomMsg { + /** The room to send the message to. */ + room: RoomName; + /** The client to send the message to. If `None`, send to all clients in the room. */ + client_id?: ClientId; + /** The message to send, verbatim. */ + payload: T; +} + +export interface M2BKick { + client_id: ClientId; + reason: number; +} + +export type MsgB2M = + | { type: "load"; payload: B2MLoad } + | { type: "join"; payload: B2MJoin } + | { type: "leave"; payload: B2MLeave } + | { type: "client_msg"; payload: B2MClientMsg }; + +export type MsgM2B = + | { type: "init"; payload: M2BInit } + | { type: "loaded"; payload: M2BLoaded } + | { type: "unloaded"; payload: M2BUnloaded } + | { type: "gossip"; payload: M2BGossip } + | { type: "room_msg"; payload: M2BRoomMsg } + | { type: "kick"; payload: M2BKick }; diff --git a/tools/balancer-tester/src/monolith.rs b/tools/balancer-tester/src/monolith.rs index e03d259dd..0de065a74 100644 --- a/tools/balancer-tester/src/monolith.rs +++ b/tools/balancer-tester/src/monolith.rs @@ -6,7 +6,7 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; use tokio_tungstenite::tungstenite::protocol::Message; -use ott_balancer_protocol::monolith::{MsgB2M, MsgM2B, RoomMetadata, Visibility}; +use ott_balancer_protocol::monolith::*; use tracing::{info, warn}; pub struct SimMonolith { @@ -40,8 +40,8 @@ impl SimMonolith { (handle, inbound_tx, outbound_rx) } - fn build_message(&self, msg: MsgM2B) -> Message { - Message::text(serde_json::to_string(&msg).unwrap()) + fn build_message(&self, msg: impl Into) -> Message { + Message::text(serde_json::to_string(&msg.into()).unwrap()) } async fn handle_msg(&mut self, msg: Message, outbound_tx: &Sender) { @@ -49,44 +49,42 @@ impl SimMonolith { let req: MsgB2M = serde_json::from_str(text).unwrap(); match req { - MsgB2M::Load { room } => { - self.load_room(room.clone()); - let msg = MsgM2B::Loaded { - name: room, - metadata: generate_random_metadata(), - }; + MsgB2M::Load(msg) => { + self.load_room(msg.room.clone()); + let room = generate_random_metadata(&msg.room); + let msg = M2BLoaded { room }; outbound_tx.send(self.build_message(msg)).await.unwrap(); } - MsgB2M::Join { room, client, .. } => { - let room = match self.rooms.get_mut(&room) { + MsgB2M::Join(msg) => { + let room = match self.rooms.get_mut(&msg.room) { Some(room) => room, None => { - warn!("room {} not found, loading", room); - self.load_room(room.clone()); - self.rooms.get_mut(&room).unwrap() + warn!("room {} not found, loading", msg.room); + self.load_room(msg.room.clone()); + self.rooms.get_mut(&msg.room).unwrap() } }; - room.add_client(client); + room.add_client(msg.client); } - MsgB2M::Leave { client } => { - let room = self.find_client_room(client).unwrap(); + MsgB2M::Leave(msg) => { + let room = self.find_client_room(msg.client).unwrap(); let room = self.rooms.get_mut(&room).unwrap(); - room.remove_client(client); + room.remove_client(msg.client); } - MsgB2M::ClientMsg { client_id, payload } => { + MsgB2M::ClientMsg(msg) => { let room = self - .find_client_room(client_id) + .find_client_room(msg.client_id) .expect("client not found in any rooms"); info!( "{}: got message from client {}: {:?}", - room, client_id, payload + room, msg.client_id, msg.payload ); - if serde_json::from_str::(payload.get()).is_ok() { - let msg = MsgM2B::RoomMsg { + if serde_json::from_str::(msg.payload.get()).is_ok() { + let msg = M2BRoomMsg { room, - client_id: Some(client_id), - payload, + client_id: Some(msg.client_id), + payload: msg.payload, }; outbound_tx.send(self.build_message(msg)).await.unwrap(); } @@ -124,10 +122,12 @@ pub struct SimRoom { impl SimRoom { pub fn new(name: impl Into) -> Self { + let name = name.into(); + let metadata = generate_random_metadata(&name); Self { - name: name.into(), + name, clients: Vec::new(), - metadata: generate_random_metadata(), + metadata, } } @@ -147,8 +147,9 @@ pub struct SimpleEcho { pub echo: String, } -fn generate_random_metadata() -> RoomMetadata { +fn generate_random_metadata(room: &RoomName) -> RoomMetadata { RoomMetadata { + name: room.clone(), title: "foo".to_string(), description: "foo".to_string(), is_temporary: false,