Skip to content

Commit

Permalink
harness: keep track of client ids in emulated monoliths (#1128)
Browse files Browse the repository at this point in the history
* harness: keep track of client ids in emulated monoliths

* fix lints
  • Loading branch information
dyc3 authored Nov 6, 2023
1 parent 40909e8 commit d2c65e0
Showing 1 changed file with 46 additions and 3 deletions.
49 changes: 46 additions & 3 deletions crates/harness/src/monolith.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{HashMap, HashSet},
net::{IpAddr, Ipv6Addr, SocketAddr},
pin::Pin,
sync::{
Expand All @@ -14,7 +14,7 @@ use http_body_util::{BodyExt, Full};
use hyper::{body::Incoming as IncomingBody, Request};
use hyper::{service::Service, Response};

use ott_balancer_protocol::{monolith::*, RoomName};
use ott_balancer_protocol::{monolith::*, ClientId, RoomName};
use tokio::{net::TcpListener, sync::Notify};
use tracing::warn;
use tungstenite::Message;
Expand Down Expand Up @@ -52,6 +52,7 @@ pub(crate) struct MonolithState {
response_mocks: HashMap<String, (MockRespParts, Bytes)>,
rooms: HashMap<RoomName, RoomMetadata>,
room_load_epoch: Arc<AtomicU32>,
clients: HashSet<ClientId>,
}

impl Monolith {
Expand Down Expand Up @@ -103,7 +104,21 @@ impl Monolith {
match msg {
Ok(msg) => {
println!("monolith: incoming msg: {}", msg);
state.lock().unwrap().received_raw.push(msg);
let mut state = state.lock().unwrap();
state.received_raw.push(msg.clone());
// TODO: there's a better way to generalize this
if let Message::Text(m) = msg {
let msg: MsgB2M = serde_json::from_str(&m).unwrap();
match msg {
MsgB2M::Join(join) => {
state.clients.insert(join.client);
},
MsgB2M::Leave(leave) => {
state.clients.remove(&leave.client);
},
_ => {},
}
}
_notif_recv.notify_one();
},
Err(e) => {
Expand Down Expand Up @@ -178,6 +193,10 @@ impl Monolith {
}
}

pub fn clients(&self) -> HashSet<ClientId> {
self.state.lock().unwrap().clients.clone()
}

/// Tell the provider to add this monolith to the list of available monoliths.
pub async fn show(&mut self) {
println!("showing monolith");
Expand Down Expand Up @@ -371,3 +390,27 @@ pub struct MockRequest {
pub headers: hyper::HeaderMap,
pub body: Bytes,
}

#[cfg(test)]
mod tests {
use test_context::test_context;

use crate::{Client, Monolith, TestRunner};

#[test_context(TestRunner)]
#[tokio::test]
async fn should_track_clients(ctx: &mut TestRunner) {
let mut m = Monolith::new(ctx).await.unwrap();
m.show().await;

let mut c1 = Client::new(ctx).unwrap();
c1.join("foo").await;

m.wait_recv().await;
assert_eq!(m.clients().len(), 1);

c1.disconnect().await;
m.wait_recv().await;
assert_eq!(m.clients().len(), 0);
}
}

0 comments on commit d2c65e0

Please sign in to comment.