Skip to content

Commit

Permalink
Add tests for websocket_server
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Jan 24, 2025
1 parent eef4bf8 commit b8853a2
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,8 @@ sinks-statsd = ["sinks-utils-udp", "tokio-util/net"]
sinks-utils-udp = []
sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"]
sinks-websocket = ["dep:tokio-tungstenite"]
sinks-websocket-server = ["dep:tokio-tungstenite"]
sinks-websocket-server = ["dep:tokio-tungstenite", "sources-utils-http-auth",
"sources-utils-http-error", "sources-utils-http-prelude"]
sinks-webhdfs = ["dep:opendal"]

# Identifies that the build is a nightly build
Expand Down
168 changes: 166 additions & 2 deletions src/sinks/websocket_server/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ impl WebSocketListenerSink {
let header_callback = |req: &Request, response: Response| match auth.is_valid(
&req.headers()
.get(AUTHORIZATION)
.map(|h| h.to_str().ok())
.flatten()
.and_then(|h| h.to_str().ok())
.map(ToString::to_string),
) {
Ok(_) => Ok(response),
Expand Down Expand Up @@ -227,3 +226,168 @@ impl StreamSink<Event> for WebSocketListenerSink {
Ok(())
}
}

#[cfg(test)]
mod tests {
use futures::{channel::mpsc::UnboundedReceiver, SinkExt, Stream, StreamExt};
use futures_util::stream;
use std::future::ready;

use tokio::{task::JoinHandle, time};
use vector_lib::sink::VectorSink;

use super::*;

use crate::{
event::{Event, LogEvent},
test_util::{
components::{run_and_assert_sink_compliance, SINK_TAGS},
next_addr,
},
};

#[tokio::test]
async fn test_single_client() {
let event = Event::Log(LogEvent::from("foo"));

let (mut sender, input_events) = build_test_event_channel();
let address = next_addr();
let port = address.port();

let websocket_sink = start_websocket_server_sink(
WebSocketListenerSinkConfig {
address,
..Default::default()
},
input_events,
)
.await;

let client_handle = attach_websocket_client(port, vec![event.clone()]).await;
sender.send(event).await.expect("Failed to send.");

client_handle.await.unwrap();
drop(sender);
websocket_sink.await.unwrap();
}

#[tokio::test]
async fn test_single_client_late_connect() {
let event1 = Event::Log(LogEvent::from("foo1"));
let event2 = Event::Log(LogEvent::from("foo2"));

let (mut sender, input_events) = build_test_event_channel();
let address = next_addr();
let port = address.port();

let websocket_sink = start_websocket_server_sink(
WebSocketListenerSinkConfig {
address,
..Default::default()
},
input_events,
)
.await;

// Sending event 1 before client joined, the client should not received it
sender.send(event1).await.expect("Failed to send.");

// Now connect the client
let client_handle = attach_websocket_client(port, vec![event2.clone()]).await;

// Sending event 2, this one should be received by the client
sender.send(event2).await.expect("Failed to send.");

client_handle.await.unwrap();
drop(sender);
websocket_sink.await.unwrap();
}

#[tokio::test]
async fn test_multiple_clients() {
let event = Event::Log(LogEvent::from("foo"));

let (mut sender, input_events) = build_test_event_channel();
let address = next_addr();
let port = address.port();

let websocket_sink = start_websocket_server_sink(
WebSocketListenerSinkConfig {
address,
..Default::default()
},
input_events,
)
.await;

let client_handle_1 = attach_websocket_client(port, vec![event.clone()]).await;
let client_handle_2 = attach_websocket_client(port, vec![event.clone()]).await;
sender.send(event).await.expect("Failed to send.");

client_handle_1.await.unwrap();
client_handle_2.await.unwrap();
drop(sender);
websocket_sink.await.unwrap();
}

#[tokio::test]
async fn sink_spec_compliance() {
let event = Event::Log(LogEvent::from("foo"));

let sink = WebSocketListenerSink::new(WebSocketListenerSinkConfig {
address: next_addr(),
..Default::default()
})
.unwrap();

run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(sink),
stream::once(ready(event)),
&SINK_TAGS,
)
.await;
}

async fn start_websocket_server_sink<S>(
config: WebSocketListenerSinkConfig,
events: S,
) -> JoinHandle<()>
where
S: Stream<Item = Event> + Send + 'static,
{
let sink = WebSocketListenerSink::new(config).unwrap();

let compliance_assertion = tokio::spawn(run_and_assert_sink_compliance(
VectorSink::from_event_streamsink(sink),
events,
&SINK_TAGS,
));

time::sleep(time::Duration::from_millis(100)).await;

compliance_assertion
}

async fn attach_websocket_client(port: u16, expected_events: Vec<Event>) -> JoinHandle<()> {
let (ws_stream, _) = tokio_tungstenite::connect_async(format!("ws://localhost:{port}"))
.await
.expect("Client failed to connect.");
let (_, rx) = ws_stream.split();
tokio::spawn(async move {
let events = expected_events.clone();
rx.take(events.len())
.zip(stream::iter(events))
.for_each(|(msg, expected)| async {
let msg_text = msg.unwrap().into_text().unwrap();
let expected = serde_json::to_string(expected.into_log().value()).unwrap();
assert_eq!(expected, msg_text);
})
.await;
})
}

fn build_test_event_channel() -> (UnboundedSender<Event>, UnboundedReceiver<Event>) {
let (tx, rx) = futures::channel::mpsc::unbounded();
(tx, rx)
}
}

0 comments on commit b8853a2

Please sign in to comment.