From 4ce59557b56943eb8b5d7dc45f97eda3f0ae0104 Mon Sep 17 00:00:00 2001 From: dswij Date: Fri, 19 Jan 2024 20:15:16 +0800 Subject: [PATCH] feat: not returning UnexpectedEof when client drop without close_notify --- src/proto/connection.rs | 20 +++++++++-- src/proto/streams/buffer.rs | 4 +++ src/proto/streams/streams.rs | 13 +++++++ tests/h2-tests/tests/client_request.rs | 50 ++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 3 deletions(-) diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 5d6b9d2b1..8627375ae 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -461,13 +461,27 @@ where // active streams must be reset. // // TODO: Are I/O errors recoverable? - Err(Error::Io(e, inner)) => { - tracing::debug!(error = ?e, "Connection::poll; IO error"); - let e = Error::Io(e, inner); + Err(Error::Io(kind, inner)) => { + tracing::debug!(error = ?kind, "Connection::poll; IO error"); + let e = Error::Io(kind, inner); // Reset all active streams self.streams.handle_error(e.clone()); + // Some client implementations drop the connections without notifying its peer + // Attempting to read after the client dropped the connection results in UnexpectedEof + // If as a server, we don't have anything more to send, just close the connection + // without error + // + // See https://github.com/hyperium/hyper/issues/3427 + if self.streams.is_server() + && self.streams.is_buffer_empty() + && matches!(kind, io::ErrorKind::UnexpectedEof) + { + *self.state = State::Closed(Reason::NO_ERROR, Initiator::Library); + return Ok(()); + } + // Return the error Err(e) } diff --git a/src/proto/streams/buffer.rs b/src/proto/streams/buffer.rs index 2648a410e..02d265061 100644 --- a/src/proto/streams/buffer.rs +++ b/src/proto/streams/buffer.rs @@ -29,6 +29,10 @@ impl Buffer { pub fn new() -> Self { Buffer { slab: Slab::new() } } + + pub fn is_empty(&self) -> bool { + self.slab.is_empty() + } } impl Deque { diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index f4b12c7bb..fa8e6843b 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -323,6 +323,14 @@ where } impl DynStreams<'_, B> { + pub fn is_buffer_empty(&self) -> bool { + self.send_buffer.is_empty() + } + + pub fn is_server(&self) -> bool { + self.peer.is_server() + } + pub fn recv_headers(&mut self, frame: frame::Headers) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); @@ -1509,6 +1517,11 @@ impl SendBuffer { let inner = Mutex::new(Buffer::new()); SendBuffer { inner } } + + pub fn is_empty(&self) -> bool { + let buf = self.inner.lock().unwrap(); + buf.is_empty() + } } // ===== impl Actions ===== diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 3d285ce2c..7bd223e3c 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1773,6 +1773,56 @@ async fn receive_settings_frame_twice_with_second_one_empty() { join(srv, h2).await; } +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_non_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings().max_concurrent_streams(2024)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + h2.as_mut().await.unwrap(); + // The most-recently advertised value should be used + assert_eq!(h2.max_concurrent_send_streams(), 2024); + }; + + join(srv, h2).await; +} + #[tokio::test] async fn server_drop_connection_unexpectedly_return_unexpected_eof_err() { h2_support::trace_init!();