From bbae2b9217e006a6aff2733e259f90a32339d9e9 Mon Sep 17 00:00:00 2001 From: Patrick Owen Date: Fri, 27 Oct 2023 19:52:48 -0400 Subject: [PATCH] Update quinn and rustls rustls: 0.20.6 -> 0.21.7 quinn: 0.8.3 -> 0.10.2 --- Cargo.toml | 2 +- client/Cargo.toml | 2 +- client/src/net.rs | 48 +++++++++++----------- common/src/codec.rs | 2 +- server/Cargo.toml | 2 +- server/src/lib.rs | 98 ++++++++++++++++++++++++++++----------------- 6 files changed, 89 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5e0f35f9..ef9797b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ members = ["client", "server", "common", "save", "save/gen-protos"] [workspace.dependencies] hecs = "0.10.0" nalgebra = { version = "0.32.1", features = ["libm-force"] } -quinn = "0.8.3" +quinn = "0.10.2" toml = { version = "0.8.0", default-features = false, features = ["parse"] } [profile.dev] diff --git a/client/Cargo.toml b/client/Cargo.toml index 28a516de..d9abb76b 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -31,7 +31,7 @@ fxhash = "0.2.1" downcast-rs = "1.1.1" quinn = { workspace = true } futures-util = "0.3.1" -rustls = { version = "0.20.6", features = ["dangerous_configuration"] } +rustls = { version = "0.21.7", features = ["dangerous_configuration"] } webpki = "0.22.0" hecs = { workspace = true } rcgen = { version = "0.11.0", default-features = false } diff --git a/client/src/net.rs b/client/src/net.rs index 25e458bf..5832aed5 100644 --- a/client/src/net.rs +++ b/client/src/net.rs @@ -1,7 +1,6 @@ use std::{sync::Arc, thread}; use anyhow::{anyhow, Error, Result}; -use futures_util::{StreamExt, TryStreamExt}; use tokio::sync::mpsc; use common::{codec, proto}; @@ -63,16 +62,12 @@ async fn inner( endpoint: quinn::Endpoint, ) -> Result<()> { let server = cfg.server.unwrap(); - let quinn::NewConnection { - connection, - mut uni_streams, - .. - } = endpoint.connect(server, "localhost").unwrap().await?; + let connection = endpoint.connect(server, "localhost").unwrap().await?; // Open the first stream for our hello message let clienthello_stream = connection.open_uni().await?; // Start sending commands asynchronously - tokio::spawn(handle_outgoing(outgoing, connection)); + tokio::spawn(handle_outgoing(outgoing, connection.clone())); // Actually send the hello message codec::send_whole( clienthello_stream, @@ -82,9 +77,9 @@ async fn inner( ) .await?; - let mut ordered = uni_streams.next().await.unwrap()?; + let mut ordered = connection.accept_uni().await?; // Handle unordered messages - tokio::spawn(handle_unordered(incoming.clone(), uni_streams)); + tokio::spawn(handle_unordered(incoming.clone(), connection)); // Receive the server's hello message let hello = codec::recv::(&mut ordered) @@ -116,23 +111,26 @@ async fn handle_outgoing( } /// Receive unordered messages from the server -async fn handle_unordered( - incoming: mpsc::UnboundedSender, - uni_streams: quinn::IncomingUniStreams, -) -> Result<()> { - let mut msgs = uni_streams - .map(|stream| async { - let stream = stream?; - codec::recv_whole::(2usize.pow(16), stream).await - }) - .buffer_unordered(128); - // TODO: Don't silently die on parse errors - while let Some(msg) = msgs.try_next().await? { - // Ignore errors so we don't panic if the simulation thread goes away between checking - // `msgs` and here. - let _ = incoming.send(Message::StateDelta(msg)); +async fn handle_unordered(incoming: mpsc::UnboundedSender, connection: quinn::Connection) { + loop { + let Ok(stream) = connection.accept_uni().await else { + // accept_uni should only fail if the connection is closed, which is already handled elsewhere. + return; + }; + let incoming = incoming.clone(); + let connection = connection.clone(); + tokio::spawn(async move { + match codec::recv_whole::(2usize.pow(16), stream).await { + Err(e) => { + tracing::error!("Error when parsing unordered stream from server: {e}"); + connection.close(1u32.into(), b"could not process stream"); + } + Ok(msg) => { + let _ = incoming.send(Message::StateDelta(msg)); + } + } + }); } - Ok(()) } struct AcceptAnyCert; diff --git a/common/src/codec.rs b/common/src/codec.rs index d21f1cbc..3535d769 100644 --- a/common/src/codec.rs +++ b/common/src/codec.rs @@ -47,7 +47,7 @@ pub async fn send_whole( /// Receive the entirety of `stream` as a `T` pub async fn recv_whole( size_limit: usize, - stream: quinn::RecvStream, + mut stream: quinn::RecvStream, ) -> Result { let buf = stream.read_to_end(size_limit).await?; Ok(bincode::deserialize(&buf)?) diff --git a/server/Cargo.toml b/server/Cargo.toml index 15531c38..702c1cfd 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -27,6 +27,6 @@ fxhash = "0.2.1" nalgebra = { workspace = true } libm = "0.2.6" slotmap = "1.0.6" -rustls = "0.20.6" +rustls = "0.21.7" rustls-pemfile = "1.0.0" save = { path = "../save" } diff --git a/server/src/lib.rs b/server/src/lib.rs index 8e411a3a..dc58e12a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -8,7 +8,7 @@ mod sim; use std::{net::UdpSocket, sync::Arc, time::Instant}; use anyhow::{Context, Error, Result}; -use futures::{select, StreamExt, TryStreamExt}; +use futures::{select, StreamExt}; use hecs::Entity; use slotmap::DenseSlotMap; use tokio::sync::mpsc; @@ -32,15 +32,16 @@ pub async fn run(net: NetParams, mut sim: SimConfig, save: Save) -> Result<()> { let server_config = quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key) .context("parsing certificate")?; - let (endpoint, incoming) = quinn::Endpoint::new( + let endpoint = quinn::Endpoint::new( quinn::EndpointConfig::default(), Some(server_config), net.socket, + quinn::default_runtime().unwrap(), )?; info!(address = %endpoint.local_addr().unwrap(), "listening"); let server = Server::new(sim, save); - server.run(incoming).await; + server.run(endpoint).await; Ok(()) } @@ -62,11 +63,9 @@ impl Server { } } - async fn run(mut self, incoming: quinn::Incoming) { + async fn run(mut self, endpoint: quinn::Endpoint) { let mut ticks = IntervalStream::new(tokio::time::interval(self.cfg.step_interval)).fuse(); - let mut incoming = incoming - .inspect(|x| trace!(address = %x.remote_address(), "connection incoming")) - .buffer_unordered(16); + let mut incoming = ReceiverStream::new(self.handle_incoming(endpoint)).fuse(); let (client_events_send, client_events) = mpsc::channel(128); let mut client_events = ReceiverStream::new(client_events).fuse(); loop { @@ -78,6 +77,27 @@ impl Server { } } + fn handle_incoming(&self, endpoint: quinn::Endpoint) -> mpsc::Receiver { + let (incoming_send, incoming_recv) = mpsc::channel(16); + tokio::spawn(async move { + while let Some(conn) = endpoint.accept().await { + trace!(address = %conn.remote_address(), "connection incoming"); + let incoming_send = incoming_send.clone(); + tokio::spawn(async move { + match conn.await { + Err(e) => { + error!("incoming connection failed: {}", e.to_string()); + } + Ok(connection) => { + let _ = incoming_send.send(connection).await; + } + } + }); + } + }); + incoming_recv + } + fn on_step(&mut self) { let now = Instant::now(); // Apply queued inputs @@ -135,7 +155,10 @@ impl Server { fn on_client_event(&mut self, client_id: ClientId, event: ClientEvent) { let span = error_span!("client", id = ?client_id.0); let _guard = span.enter(); - let client = &mut self.clients[client_id]; + let Some(client) = self.clients.get_mut(client_id) else { + // Skip messages from cleaned-up clients + return; + }; match event { ClientEvent::Hello(hello) => { assert!(client.handles.is_none()); @@ -186,25 +209,19 @@ impl Server { fn on_connect( &mut self, - conn: Result, + connection: quinn::Connection, mut send: mpsc::Sender<(ClientId, ClientEvent)>, ) { - let quinn::NewConnection { - connection, - uni_streams, - .. - } = match conn { - Ok(x) => x, - Err(e) => { - error!("incoming connection failed: {}", e); - return; - } - }; let id = self.clients.insert(Client::new(connection.clone())); info!(id = ?id.0, address = %connection.remote_address(), "connection established"); tokio::spawn(async move { - if let Err(e) = drive_recv(id, uni_streams, &mut send).await { + if let Err(e) = drive_recv(id, connection, &mut send).await { + // drive_recv returns an error when any connection-terminating issue occurs, so we + // send a `Lost` message to ensure the client is cleaned up. Note that this message may + // be redundant, as dropping a slow client also sends a `Lost` message. let _ = send.send((id, ClientEvent::Lost(e))).await; + } else { + unreachable!("Graceful disconnects are not implemented.") } }); } @@ -214,26 +231,35 @@ const MAX_CLIENT_MSG_SIZE: usize = 1 << 16; async fn drive_recv( id: ClientId, - mut streams: quinn::IncomingUniStreams, + connection: quinn::Connection, send: &mut mpsc::Sender<(ClientId, ClientEvent)>, ) -> Result<()> { - let hello = match streams.next().await { - None => return Ok(()), - Some(stream) => { - codec::recv_whole::(MAX_CLIENT_MSG_SIZE, stream?).await? - } - }; + let stream = connection.accept_uni().await.map_err(Error::msg)?; + let hello = codec::recv_whole::(MAX_CLIENT_MSG_SIZE, stream).await?; let _ = send.send((id, ClientEvent::Hello(hello))).await; - let mut cmds = streams - .map(|stream| async { - codec::recv_whole::(MAX_CLIENT_MSG_SIZE, stream?).await - }) - .buffer_unordered(16); // Allow a modest amount of out-of-order completion - while let Some(msg) = cmds.try_next().await? { - let _ = send.send((id, ClientEvent::Command(msg))).await; + loop { + let stream = connection.accept_uni().await.map_err(Error::msg)?; + let send = send.clone(); + + // We spawn a separate task to allow messages to be processed in a different order from when they were + // initiated. + let connection = connection.clone(); + tokio::spawn(async move { + match codec::recv_whole::(MAX_CLIENT_MSG_SIZE, stream).await { + Err(e) => { + // This error can occur if the client sends a badly-formatted command. In this case, + // we want to drop the client. We close the connection, which will cause `drive_recv` to + // return eventually. + tracing::error!("Error when parsing unordered stream from client: {e}"); + connection.close(2u32.into(), b"could not process stream"); + } + Ok(msg) => { + let _ = send.send((id, ClientEvent::Command(msg))).await; + } + } + }); } - Ok(()) } async fn drive_send(