Skip to content

Commit

Permalink
add a test for load epoch logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dyc3 committed Oct 16, 2023
1 parent 6f586e3 commit 0ac0b0f
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
39 changes: 38 additions & 1 deletion crates/harness-tests/src/routing.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::time::Duration;

use harness::{Client, MockRespParts, Monolith, TestRunner};
use harness::{Client, MockRespParts, Monolith, TestRunner, WebsocketSender};
use test_context::test_context;

#[test_context(TestRunner)]
Expand Down Expand Up @@ -147,3 +147,40 @@ async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

#[test_context(TestRunner)]
#[tokio::test]
async fn monolith_double_load_room(ctx: &mut TestRunner) {
println!("port: {}", ctx.port);
let mut m1 = Monolith::new(ctx).await.unwrap();
let mut m2 = Monolith::new(ctx).await.unwrap();
println!("m1: http {}", m1.http_port());
println!("m2: http {}", m2.http_port());
m1.mock_http_json(
"/api/room/foo",
MockRespParts::default(),
serde_json::json!({
"name": "foo",
}),
);

m1.show().await;
m2.show().await;

m1.load_room("foo").await;
m2.load_room("foo").await;

tokio::time::sleep(Duration::from_millis(100)).await;

let resp = reqwest::get(format!("http://[::1]:{}/api/room/foo", ctx.port))
.await
.expect("http request failed")
.error_for_status()
.expect("bad http status");

let reqs = m1.collect_mock_http();
assert_eq!(reqs.len(), 1);

let t = resp.text().await.expect("failed to read http response");
assert_eq!(t, "{\"name\":\"foo\"}");
}
17 changes: 12 additions & 5 deletions crates/ott-balancer-bin/src/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,10 @@ impl Balancer {
if let Some(msg) = msg {
let ctx = self.ctx.clone();
let _ = tokio::task::Builder::new().name("dispatch monolith message").spawn(async move {
let id = *msg.id();
match dispatch_monolith_message(ctx, msg).await {
Ok(_) => {},
Err(err) => error!("failed to dispatch monolith message: {:?}", err)
Err(err) => error!("failed to dispatch monolith message {}: {:?}", id, err)
}
});
} else {
Expand Down Expand Up @@ -291,6 +292,7 @@ impl BalancerContext {
}

pub fn add_room(&mut self, room: Room, locator: RoomLocator) -> anyhow::Result<()> {
debug!("add_room {} {:?}", room.name(), locator);
let monolith = self
.monoliths
.get_mut(&locator.monolith_id())
Expand All @@ -305,6 +307,7 @@ impl BalancerContext {
room: &RoomName,
monolith_id: MonolithId,
) -> anyhow::Result<()> {
debug!("remove_room {}, {:?}", room, monolith_id);
let monolith = self
.monoliths
.get_mut(&monolith_id)
Expand Down Expand Up @@ -332,12 +335,16 @@ impl BalancerContext {
monolith_id: MonolithId,
load_epoch: u32,
) -> anyhow::Result<()> {
debug!(
"add_or_sync_room {}, {:?} load_epoch {}",
metadata.name, monolith_id, load_epoch
);
if let Some(locator) = self.rooms_to_monoliths.get(&metadata.name) {
if locator.load_epoch() > load_epoch {
// we already have a newer version of this room
if locator.load_epoch() < load_epoch {
// we already have an older version of this room
return Err(anyhow::anyhow!("room already loaded"));
} else if locator.monolith_id() < monolith_id {
// we have an older version of this room, remove it
} else if locator.monolith_id() > monolith_id {
// we have an newer version of this room, remove it
self.remove_room(&metadata.name, locator.monolith_id())
.await?;
}
Expand Down
4 changes: 4 additions & 0 deletions server/roommanager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ async function addRoom(room: Room) {
rooms.push(room);
const epoch = await redisClient.incr(LOAD_EPOCH_KEY);
room.loadEpoch = epoch;
if (epoch >= 2147483647) {
// ensure we don't overflow the integer
await redisClient.set(LOAD_EPOCH_KEY, 0);
}
bus.emit("load", room.name);
}

Expand Down

0 comments on commit 0ac0b0f

Please sign in to comment.