Skip to content

Commit

Permalink
add typeshare (#1103)
Browse files Browse the repository at this point in the history
* add typeshare to deps, add codegen script

* ci: add check for codegen

* run lint after codegen

* refactor how M2B and B2M messages are defined for easier typeshare

* more refactors

* more refactors

* rename B2M message types

* remove enum_derive

* rebase, more refactoring

* minor refactor
  • Loading branch information
dyc3 authored Oct 13, 2023
1 parent 3f855ad commit 074d9bb
Show file tree
Hide file tree
Showing 15 changed files with 435 additions and 209 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,14 @@ jobs:
env:
NODE_ENV: production

- name: cargo-install
uses: baptiste0928/cargo-install@v1
with:
crate: typeshare-cli
version: "1.7.0"
- name: Ensure generated code is up to date
run: ./scripts/codegen.sh && git diff --exit-code

test:
runs-on: ubuntu-latest
strategy:
Expand Down
3 changes: 2 additions & 1 deletion .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@
*.lock
*.log
*.env
**/tests/unit/fixtures/**
**/tests/unit/fixtures/**
server/generated.ts
84 changes: 84 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ futures-util = "0.3.28"
harness = { path = "crates/harness" }
harness_macros = { path = "crates/harness_macros" }
hyper = { version = "1.0.0-rc.4", features = ["full"] }
hyper-util = { git ="https://github.com/hyperium/hyper-util.git", rev = "f898015" }
hyper-util = { git = "https://github.com/hyperium/hyper-util.git", rev = "f898015" }
http-body-util = "0.1.0-rc.2"
once_cell = "1.17.1"
ott-common = { path = "crates/ott-common" }
Expand All @@ -30,5 +30,6 @@ tokio-util = "0.7.8"
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
tungstenite = "0.19.0"
typeshare = "1.0.0"
url = "2.3.1"
uuid = { version = "1.3.0", features = ["serde", "v4"] }
5 changes: 2 additions & 3 deletions crates/harness-tests/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

use harness::{Client, MockRespParts, Monolith, TestRunner};

use test_context::test_context;

#[test_context(TestRunner)]
Expand All @@ -20,7 +19,7 @@ async fn route_http_to_correct_monolith(ctx: &mut TestRunner) {
);

m.show().await;
m.load_room("foo".to_string()).await;
m.load_room("foo").await;

// Without this sleep, this test can trigger a race condition where the client connects to the balancer before the monolith has the room loaded.
// This will cause the other monolith to get the room loaded, and the client will connect to that monolith instead.
Expand Down Expand Up @@ -100,7 +99,7 @@ async fn route_ws_to_correct_monolith(ctx: &mut TestRunner) {

let mut m = Monolith::new(ctx).await.unwrap();
m.show().await;
m.load_room("foo".to_string()).await;
m.load_room("foo").await;

// Without this sleep, this test can trigger a race condition where the client connects to the balancer before the monolith has the room loaded.
// This will cause the other monolith to get the room loaded, and the client will connect to that monolith instead.
Expand Down
16 changes: 4 additions & 12 deletions crates/harness/src/monolith.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,18 +241,10 @@ impl Monolith {

pub async fn load_room(&mut self, room: impl Into<RoomName> + Clone) {
let room = room.into();
let meta = RoomMetadata::default();
self.state
.lock()
.unwrap()
.rooms
.insert(room.clone(), meta.clone());
let meta = RoomMetadata::default_with_name(room.clone());
self.state.lock().unwrap().rooms.insert(room, meta.clone());
if self.connected() {
self.send(MsgM2B::Loaded {
name: room,
metadata: meta,
})
.await;
self.send(M2BLoaded { room: meta }).await;
}
}

Expand All @@ -264,7 +256,7 @@ impl Monolith {
.rooms
.remove(&room.clone().clone());
if self.connected() {
self.send(MsgM2B::Unloaded { room }).await;
self.send(M2BUnloaded { name: room }).await;
}
}
}
Expand Down
53 changes: 23 additions & 30 deletions crates/ott-balancer-bin/src/balancer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc};

use ott_balancer_protocol::monolith::{MsgB2M, MsgM2B, RoomMetadata};
use ott_balancer_protocol::monolith::{B2MClientMsg, B2MJoin, B2MLeave, MsgM2B, RoomMetadata};
use ott_balancer_protocol::*;
use rand::seq::IteratorRandom;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -239,7 +239,7 @@ impl BalancerContext {
};
monolith.add_client(&client.room, client.id);
monolith
.send(&MsgB2M::Join {
.send(B2MJoin {
room: client.room.clone(),
client: client.id,
token: client.token.clone(),
Expand All @@ -253,7 +253,7 @@ impl BalancerContext {
pub async fn remove_client(&mut self, client_id: ClientId) -> anyhow::Result<()> {
let monolith = self.find_monolith_mut(client_id)?;
monolith.remove_client(client_id);
monolith.send(&MsgB2M::Leave { client: client_id }).await?;
monolith.send(B2MLeave { client: client_id }).await?;

Ok(())
}
Expand Down Expand Up @@ -311,13 +311,13 @@ impl BalancerContext {

pub fn add_or_sync_room(
&mut self,
room: &RoomName,
metadata: RoomMetadata,
monolith_id: MonolithId,
) -> anyhow::Result<()> {
let monolith = self.monoliths.get_mut(&monolith_id).unwrap();
self.rooms_to_monoliths.insert(room.clone(), monolith_id);
monolith.add_or_sync_room(room, metadata);
self.rooms_to_monoliths
.insert(metadata.name.clone(), monolith_id);
monolith.add_or_sync_room(metadata);

Ok(())
}
Expand Down Expand Up @@ -437,7 +437,7 @@ pub async fn dispatch_client_message(
};

monolith
.send(&MsgB2M::ClientMsg {
.send(B2MClientMsg {
client_id: *msg.id(),
payload: raw_value,
})
Expand Down Expand Up @@ -521,58 +521,51 @@ pub async fn dispatch_monolith_message(
monolith_id
);
}
MsgM2B::Loaded {
name: room,
metadata,
} => {
debug!("room loaded on {}: {:?}", monolith_id, room);
MsgM2B::Loaded(msg) => {
debug!("room loaded on {}: {:?}", monolith_id, msg.room.name);
let mut ctx_write = ctx.write().await;
ctx_write.add_or_sync_room(&room, metadata, *monolith_id)?;
ctx_write.add_or_sync_room(msg.room, *monolith_id)?;
}
MsgM2B::Unloaded { room } => {
MsgM2B::Unloaded(msg) => {
let mut ctx_write = ctx.write().await;
ctx_write.remove_room(&room, *monolith_id)?;
ctx_write.remove_room(&msg.name, *monolith_id)?;
}
MsgM2B::Gossip { rooms } => {
MsgM2B::Gossip(msg) => {
let mut ctx_write = ctx.write_owned().await;
let to_remove = ctx_write
.monoliths
.get_mut(monolith_id)
.unwrap()
.rooms()
.keys()
.filter(|room| !rooms.iter().any(|r| r.name == **room))
.filter(|room| !msg.rooms.iter().any(|r| r.name == **room))
.cloned()
.collect::<Vec<_>>();
debug!("to_remove: {:?}", to_remove);
for gossip_room in rooms.iter() {
for gossip_room in msg.rooms.iter() {
ctx_write
.rooms_to_monoliths
.insert(gossip_room.name.clone(), *monolith_id);
}
let monolith = ctx_write.monoliths.get_mut(monolith_id).unwrap();
for gossip_room in rooms {
monolith.add_or_sync_room(&gossip_room.name, gossip_room.metadata);
for gossip_room in msg.rooms {
monolith.add_or_sync_room(gossip_room);
}

let monolith = ctx_write.monoliths.get_mut(monolith_id).unwrap();
for room in to_remove {
monolith.remove_room(&room)
}
}
MsgM2B::RoomMsg {
room,
client_id: _,
payload,
} => {
MsgM2B::RoomMsg(msg) => {
let ctx_read = ctx.read().await;

let Some(room) = ctx_read
.monoliths
.get(monolith_id)
.unwrap()
.rooms()
.get(&room)
.get(&msg.room)
else {
anyhow::bail!("room not found on monolith");
};
Expand All @@ -582,7 +575,7 @@ pub async fn dispatch_monolith_message(
// broadcast to all clients
debug!("broadcasting to clients in room: {:?}", room.name());
// TODO: optimize this using a broadcast channel
let built_msg = Message::text(payload.to_string());
let built_msg = Message::text(msg.payload.to_string());
for client in room.clients() {
let Some(client) = ctx_read.clients.get(client) else {
anyhow::bail!("client not found");
Expand All @@ -591,14 +584,14 @@ pub async fn dispatch_monolith_message(
client.send(built_msg.clone()).await?;
}
}
MsgM2B::Kick { client_id, reason } => {
MsgM2B::Kick(msg) => {
let ctx_read = ctx.read().await;
let Some(client) = ctx_read.clients.get(&client_id) else {
let Some(client) = ctx_read.clients.get(&msg.client_id) else {
anyhow::bail!("client not found");
};
client
.send(Message::Close(Some(CloseFrame {
code: CloseCode::Library(reason),
code: CloseCode::Library(msg.reason),
reason: "".into(),
})))
.await?;
Expand Down
Loading

0 comments on commit 074d9bb

Please sign in to comment.