From 7e09ad853c277f108cd04c30f0226c6e9e0b8e72 Mon Sep 17 00:00:00 2001 From: Carson McManus Date: Wed, 21 Feb 2024 22:48:24 -0500 Subject: [PATCH] fix balancers not recognizing client disconnects (#1368) * fix balancers not recognizing client disconnects * Revert "fix balancers not recognizing client disconnects" This reverts commit 237f73d898b6dd1bd3feb7a16d01c1036c889c00. * turns out this is all that's needed * remove flaky harness test because it's easier to test with a load test --- crates/harness-tests/src/routing.rs | 59 ----------------------------- crates/ott-balancer/src/client.rs | 10 +++++ 2 files changed, 10 insertions(+), 59 deletions(-) diff --git a/crates/harness-tests/src/routing.rs b/crates/harness-tests/src/routing.rs index 00d8f3fc5..5fd0e60cb 100644 --- a/crates/harness-tests/src/routing.rs +++ b/crates/harness-tests/src/routing.rs @@ -124,65 +124,6 @@ async fn route_ws_to_correct_monolith(ctx: &mut TestRunner) { assert_eq!(recvd.len(), 1); } -#[test_context(TestRunner)] -#[tokio::test] -async fn route_ws_to_correct_monolith_race(ctx: &mut TestRunner) { - // smoke test for the possible race condition where a room is loaded and a client joins at the same time - - let mut dummy = Monolith::new(ctx).await.unwrap(); - dummy.show().await; - - let mut m = Monolith::new(ctx).await.unwrap(); - m.show().await; - tokio::time::sleep(Duration::from_millis(200)).await; // ensure that the monoliths are fully connected before sending the room load message - - for i in 0..100 { - let room_name = format!("foo{}", i); - println!("iteration: {}", room_name); - m.load_room(room_name.clone()).await; - - let mut client = Client::new(ctx).unwrap(); - client.join(room_name.clone()).await; - - println!("waiting for monolith to receive join message"); - // this more accurately emulates what the client would actually do - loop { - tokio::select! 
{ - result = tokio::time::timeout(Duration::from_secs(1), m.wait_recv()) => { - result.expect("msg recv timeout"); - break; - }, - result = tokio::time::timeout(Duration::from_secs(1), dummy.wait_recv()) => { - result.expect("msg recv timeout"); - println!("dummy received message"); - tokio::time::timeout(Duration::from_millis(100), dummy.wait_recv()).await.expect("dummy never received unload message"); // wait for unload message - continue; // because we are waiting for the client to reconnect - }, - _ = client.wait_for_disconnect() => { - println!("client disconnected, retrying ====================================="); - client.join(room_name.clone()).await; - continue; - } - }; - } - - let recvd = m.collect_recv(); - assert_eq!( - recvd.len(), - 1, - "expected exactly one message, got {:?}", - recvd - ); - if let MsgB2M::Join(m) = &recvd[0] { - assert_eq!(m.room, room_name.into()); - } else { - panic!("expected join message, got {:?}", recvd[0]) - } - m.clear_recv(); - dummy.clear_recv(); - } -} - #[test_context(TestRunner)] #[tokio::test] async fn monolith_double_load_room(ctx: &mut TestRunner) { diff --git a/crates/ott-balancer/src/client.rs b/crates/ott-balancer/src/client.rs index 351b250eb..ced9645ed 100644 --- a/crates/ott-balancer/src/client.rs +++ b/crates/ott-balancer/src/client.rs @@ -237,6 +237,16 @@ pub async fn client_entry<'r>( } info!("ending client connection"); + client_link + .room_tx + .send(Context::new( + client_id, + SocketMessage::Message(Message::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: "client connection ended".into(), + }))), + )) + .await?; Ok(()) }