Skip to content

Commit

Permalink
fix balancers not recognizing client disconnects (#1368)
Browse files Browse the repository at this point in the history
* fix balancers not recognizing client disconnects

* Revert "fix balancers not recognizing client disconnects"

This reverts commit 237f73d.

* turns out it this is all that's needed

* remove flaky harness test because its easier to test with a load test
  • Loading branch information
dyc3 authored Feb 22, 2024
1 parent 326f451 commit 7e09ad8
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 59 deletions.
59 changes: 0 additions & 59 deletions crates/harness-tests/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,65 +124,6 @@ async fn route_ws_to_correct_monolith(ctx: &mut TestRunner) {
assert_eq!(recvd.len(), 1);
}

#[test_context(TestRunner)]
#[tokio::test]
async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) {
// smoke test for the possible race condition where a room is loaded and a client joins at the same time

let mut dummy = Monolith::new(ctx).await.unwrap();
dummy.show().await;

let mut m = Monolith::new(ctx).await.unwrap();
m.show().await;
tokio::time::sleep(Duration::from_millis(200)).await; // ensure that the monoliths are fully connected before sending the room load message

for i in 0..100 {
let room_name = format!("foo{}", i);
println!("iteration: {}", room_name);
m.load_room(room_name.clone()).await;

let mut client = Client::new(ctx).unwrap();
client.join(room_name.clone()).await;

println!("waiting for monolith to receive join message");
// this more accurately emulates what the client would actually do
loop {
tokio::select! {
result = tokio::time::timeout(Duration::from_secs(1), m.wait_recv()) => {
result.expect("msg recv timeout");
break;
},
result = tokio::time::timeout(Duration::from_secs(1), dummy.wait_recv()) => {
result.expect("msg recv timeout");
println!("dummy received message");
tokio::time::timeout(Duration::from_millis(100), dummy.wait_recv()).await.expect("dummy never received unload message"); // wait for unload message
continue; // because we are waiting for the client to reconnect
},
_ = client.wait_for_disconnect() => {
println!("client disconnected, retrying =====================================");
client.join(room_name.clone()).await;
continue;
}
};
}

let recvd = m.collect_recv();
assert_eq!(
recvd.len(),
1,
"expected exactly one message, got {:?}",
recvd
);
if let MsgB2M::Join(m) = &recvd[0] {
assert_eq!(m.room, room_name.into());
} else {
panic!("expected join message, got {:?}", recvd[0])
}
m.clear_recv();
dummy.clear_recv();
}
}

#[test_context(TestRunner)]
#[tokio::test]
async fn monolith_double_load_room(ctx: &mut TestRunner) {
Expand Down
10 changes: 10 additions & 0 deletions crates/ott-balancer/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ pub async fn client_entry<'r>(
}

info!("ending client connection");
client_link
.room_tx
.send(Context::new(
client_id,
SocketMessage::Message(Message::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: "client connection ended".into(),
}))),
))
.await?;

Ok(())
}

0 comments on commit 7e09ad8

Please sign in to comment.