Skip to content

Commit

Permalink
fix sse test
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraser999 committed Oct 20, 2023
1 parent 9d4a881 commit 65fa46f
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions node/src/components/event_stream_server/tests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::{
collections::HashMap,
error::Error,
fs, io, iter, str,
fs, io,
iter::{self, FromIterator},
str,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

use bytes::{Buf, Bytes};
use futures::{join, StreamExt};
use http::StatusCode;
use pretty_assertions::assert_eq;
Expand Down Expand Up @@ -466,6 +469,24 @@ async fn subscribe_no_sync(
handle_response(response, final_event_id, client_id).await
}

/// Converts some bytes to a `String`.
///
/// If `maybe_previous_bytes` is `Some`, these bytes are prepended to `new_bytes`. If a string
/// cannot be constructed from the resulting bytes, the bytes are returned as an `Err`.
fn bytes_to_string(
maybe_previous_bytes: &mut Option<Bytes>,
new_bytes: Bytes,
) -> Result<String, Bytes> {
let bytes = if let Some(previous_bytes) = maybe_previous_bytes.take() {
Bytes::from_iter(previous_bytes.chain(new_bytes))
} else {
new_bytes
};
str::from_utf8(bytes.as_ref())
.map(ToString::to_string)
.map_err(|_| bytes)
}

/// Handles a response from the server.
async fn handle_response(
response: Response,
Expand All @@ -487,12 +508,21 @@ async fn handle_response(
let mut stream = response.bytes_stream();
let final_id_line = format!("id:{}", final_event_id);
let keepalive = ":";
let mut temp_bytes: Option<Bytes> = None;
while let Some(item) = stream.next().await {
// If the server crashes or returns an error in the stream, it is caught here as `item` will
// be an `Err`.
let bytes = item?;
let chunk = str::from_utf8(bytes.as_ref()).unwrap();
response_text.push_str(chunk);
// If the server crashes or returns an error in the stream, it is caught here as `item`
// will be an `Err`.
let new_bytes = item?;
let chunk = match bytes_to_string(&mut temp_bytes, new_bytes) {
Ok(chunk) => chunk,
Err(bytes) => {
// We got a chunk splitting a unicode scalar value - dump the data to `temp_bytes`
// and get the next chunk from the stream.
temp_bytes = Some(bytes);
continue;
}
};
response_text.push_str(&chunk);
if let Some(line) = response_text
.lines()
.find(|&line| line == final_id_line || line == keepalive)
Expand Down Expand Up @@ -740,10 +770,17 @@ async fn lagging_clients_should_be_disconnected() {

let mut stream = response.bytes_stream();
let pause_between_events = Duration::from_secs(100) / MAX_EVENT_COUNT;
let mut temp_bytes: Option<Bytes> = None;
while let Some(item) = stream.next().await {
// The function is expected to exit here with an `UnexpectedEof` error.
let bytes = item?;
let chunk = str::from_utf8(bytes.as_ref()).unwrap();
let new_bytes = item?;
let chunk = match bytes_to_string(&mut temp_bytes, new_bytes) {
Ok(chunk) => chunk,
Err(bytes) => {
temp_bytes = Some(bytes);
continue;
}
};
if chunk.lines().any(|line| line == ":") {
debug!("{} received keepalive: exiting", client_id);
break;
Expand Down

0 comments on commit 65fa46f

Please sign in to comment.