Skip to content

Commit

Permalink
balancer: unload duplicate rooms (#1131)
Browse files Browse the repository at this point in the history
* balancer: unload duplicate rooms

* fix lints
  • Loading branch information
dyc3 authored Nov 6, 2023
1 parent d2c65e0 commit 32e8c2e
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions crates/harness-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use test_context::test_context;

mod connection;
mod routing;
mod state;

#[test_context(TestRunner)]
#[tokio::test]
Expand Down
50 changes: 50 additions & 0 deletions crates/harness-tests/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Tests for how the balancer handles room state in the context of managing rooms on monoliths.
use std::time::Duration;

use harness::{Client, Monolith, TestRunner};
use ott_balancer_protocol::monolith::{B2MUnload, MsgB2M};
use test_context::test_context;

#[test_context(TestRunner)]
#[tokio::test]
async fn should_unload_duplicate_rooms(ctx: &TestRunner) {
let mut m1 = Monolith::new(ctx).await.unwrap();
let mut m2 = Monolith::new(ctx).await.unwrap();

m1.show().await;
m2.show().await;

m1.load_room("foo").await;
m2.load_room("foo").await;

m2.wait_recv().await;

let recv = m2.collect_recv();
assert_eq!(recv.len(), 1);
assert!(matches!(recv[0], MsgB2M::Unload(B2MUnload { .. })));
}

#[test_context(TestRunner)]
#[tokio::test]
async fn should_unload_duplicate_rooms_and_route_correctly(ctx: &TestRunner) {
let mut m1 = Monolith::new(ctx).await.unwrap();
let mut m2 = Monolith::new(ctx).await.unwrap();

m1.show().await;
m2.show().await;

m1.load_room("foo").await;
m2.load_room("foo").await;

tokio::time::timeout(Duration::from_millis(100), m2.wait_recv())
.await
.expect("timed out waiting for unload");

let mut c = Client::new(ctx).unwrap();
c.join("foo").await;

tokio::time::timeout(Duration::from_millis(100), m1.wait_recv())
.await
.expect("timed out waiting for client join");
}
15 changes: 13 additions & 2 deletions crates/ott-balancer-bin/src/balancer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{collections::HashMap, sync::Arc};

use ott_balancer_protocol::monolith::{B2MClientMsg, B2MJoin, B2MLeave, MsgM2B, RoomMetadata};
use ott_balancer_protocol::monolith::{
B2MClientMsg, B2MJoin, B2MLeave, B2MUnload, MsgM2B, RoomMetadata,
};
use ott_balancer_protocol::*;
use rand::seq::IteratorRandom;
use serde_json::value::RawValue;
Expand Down Expand Up @@ -343,12 +345,15 @@ impl BalancerContext {
match locator.load_epoch().cmp(&load_epoch) {
std::cmp::Ordering::Less => {
// we already have an older version of this room
self.unload_room(monolith_id, metadata.name.clone()).await?;
return Err(anyhow::anyhow!("room already loaded"));
}
std::cmp::Ordering::Greater => {
// we have an newer version of this room, remove it
self.remove_room(&metadata.name, locator.monolith_id())
self.unload_room(locator.monolith_id(), metadata.name.clone())
.await?;
// self.remove_room(&metadata.name, locator.monolith_id())
// .await?;
}
_ => {}
}
Expand Down Expand Up @@ -414,6 +419,12 @@ impl BalancerContext {
.ok_or(anyhow::anyhow!("no monoliths available"))?;
Ok(selected)
}

pub async fn unload_room(&self, monolith: MonolithId, room: RoomName) -> anyhow::Result<()> {
let monolith = self.monoliths.get(&monolith).unwrap();
monolith.send(B2MUnload { room }).await?;
Ok(())
}
}

pub async fn join_client(
Expand Down
6 changes: 6 additions & 0 deletions crates/ott-balancer-protocol/src/monolith.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl From<B2MLoad> for MsgB2M {
}
}

impl From<B2MUnload> for MsgB2M {
fn from(val: B2MUnload) -> Self {
Self::Unload(val)
}
}

impl From<B2MJoin> for MsgB2M {
fn from(val: B2MJoin) -> Self {
Self::Join(val)
Expand Down

0 comments on commit 32e8c2e

Please sign in to comment.