Skip to content

Commit

Permalink
Update quinn and rustls
Browse files Browse the repository at this point in the history
rustls: 0.20.6 -> 0.21.7
quinn: 0.8.3 -> 0.10.2
  • Loading branch information
patowen committed Oct 29, 2023
1 parent 664f9d2 commit bbae2b9
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ members = ["client", "server", "common", "save", "save/gen-protos"]
[workspace.dependencies]
hecs = "0.10.0"
nalgebra = { version = "0.32.1", features = ["libm-force"] }
quinn = "0.8.3"
quinn = "0.10.2"
toml = { version = "0.8.0", default-features = false, features = ["parse"] }

[profile.dev]
Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fxhash = "0.2.1"
downcast-rs = "1.1.1"
quinn = { workspace = true }
futures-util = "0.3.1"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
rustls = { version = "0.21.7", features = ["dangerous_configuration"] }
webpki = "0.22.0"
hecs = { workspace = true }
rcgen = { version = "0.11.0", default-features = false }
Expand Down
48 changes: 23 additions & 25 deletions client/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{sync::Arc, thread};

use anyhow::{anyhow, Error, Result};
use futures_util::{StreamExt, TryStreamExt};
use tokio::sync::mpsc;

use common::{codec, proto};
Expand Down Expand Up @@ -63,16 +62,12 @@ async fn inner(
endpoint: quinn::Endpoint,
) -> Result<()> {
let server = cfg.server.unwrap();
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = endpoint.connect(server, "localhost").unwrap().await?;
let connection = endpoint.connect(server, "localhost").unwrap().await?;

// Open the first stream for our hello message
let clienthello_stream = connection.open_uni().await?;
// Start sending commands asynchronously
tokio::spawn(handle_outgoing(outgoing, connection));
tokio::spawn(handle_outgoing(outgoing, connection.clone()));
// Actually send the hello message
codec::send_whole(
clienthello_stream,
Expand All @@ -82,9 +77,9 @@ async fn inner(
)
.await?;

let mut ordered = uni_streams.next().await.unwrap()?;
let mut ordered = connection.accept_uni().await?;
// Handle unordered messages
tokio::spawn(handle_unordered(incoming.clone(), uni_streams));
tokio::spawn(handle_unordered(incoming.clone(), connection));

// Receive the server's hello message
let hello = codec::recv::<proto::ServerHello>(&mut ordered)
Expand Down Expand Up @@ -116,23 +111,26 @@ async fn handle_outgoing(
}

/// Receive unordered messages from the server
async fn handle_unordered(
incoming: mpsc::UnboundedSender<Message>,
uni_streams: quinn::IncomingUniStreams,
) -> Result<()> {
let mut msgs = uni_streams
.map(|stream| async {
let stream = stream?;
codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await
})
.buffer_unordered(128);
// TODO: Don't silently die on parse errors
while let Some(msg) = msgs.try_next().await? {
// Ignore errors so we don't panic if the simulation thread goes away between checking
// `msgs` and here.
let _ = incoming.send(Message::StateDelta(msg));
async fn handle_unordered(incoming: mpsc::UnboundedSender<Message>, connection: quinn::Connection) {
loop {
let Ok(stream) = connection.accept_uni().await else {
// accept_uni should only fail if the connection is closed, which is already handled elsewhere.
return;
};
let incoming = incoming.clone();
let connection = connection.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await {
Err(e) => {
tracing::error!("Error when parsing unordered stream from server: {e}");
connection.close(1u32.into(), b"could not process stream");
}
Ok(msg) => {
let _ = incoming.send(Message::StateDelta(msg));
}
}
});
}
Ok(())
}

struct AcceptAnyCert;
Expand Down
2 changes: 1 addition & 1 deletion common/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub async fn send_whole<T: Serialize + ?Sized>(
/// Receive the entirety of `stream` as a `T`
pub async fn recv_whole<T: DeserializeOwned>(
size_limit: usize,
stream: quinn::RecvStream,
mut stream: quinn::RecvStream,
) -> Result<T> {
let buf = stream.read_to_end(size_limit).await?;
Ok(bincode::deserialize(&buf)?)
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ fxhash = "0.2.1"
nalgebra = { workspace = true }
libm = "0.2.6"
slotmap = "1.0.6"
rustls = "0.20.6"
rustls = "0.21.7"
rustls-pemfile = "1.0.0"
save = { path = "../save" }
98 changes: 62 additions & 36 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod sim;
use std::{net::UdpSocket, sync::Arc, time::Instant};

use anyhow::{Context, Error, Result};
use futures::{select, StreamExt, TryStreamExt};
use futures::{select, StreamExt};
use hecs::Entity;
use slotmap::DenseSlotMap;
use tokio::sync::mpsc;
Expand All @@ -32,15 +32,16 @@ pub async fn run(net: NetParams, mut sim: SimConfig, save: Save) -> Result<()> {
let server_config =
quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key)
.context("parsing certificate")?;
let (endpoint, incoming) = quinn::Endpoint::new(
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
net.socket,
quinn::default_runtime().unwrap(),
)?;
info!(address = %endpoint.local_addr().unwrap(), "listening");

let server = Server::new(sim, save);
server.run(incoming).await;
server.run(endpoint).await;
Ok(())
}

Expand All @@ -62,11 +63,9 @@ impl Server {
}
}

async fn run(mut self, incoming: quinn::Incoming) {
async fn run(mut self, endpoint: quinn::Endpoint) {
let mut ticks = IntervalStream::new(tokio::time::interval(self.cfg.step_interval)).fuse();
let mut incoming = incoming
.inspect(|x| trace!(address = %x.remote_address(), "connection incoming"))
.buffer_unordered(16);
let mut incoming = ReceiverStream::new(self.handle_incoming(endpoint)).fuse();
let (client_events_send, client_events) = mpsc::channel(128);
let mut client_events = ReceiverStream::new(client_events).fuse();
loop {
Expand All @@ -78,6 +77,27 @@ impl Server {
}
}

fn handle_incoming(&self, endpoint: quinn::Endpoint) -> mpsc::Receiver<quinn::Connection> {
let (incoming_send, incoming_recv) = mpsc::channel(16);
tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
trace!(address = %conn.remote_address(), "connection incoming");
let incoming_send = incoming_send.clone();
tokio::spawn(async move {
match conn.await {
Err(e) => {
error!("incoming connection failed: {}", e.to_string());
}
Ok(connection) => {
let _ = incoming_send.send(connection).await;
}
}
});
}
});
incoming_recv
}

fn on_step(&mut self) {
let now = Instant::now();
// Apply queued inputs
Expand Down Expand Up @@ -135,7 +155,10 @@ impl Server {
fn on_client_event(&mut self, client_id: ClientId, event: ClientEvent) {
let span = error_span!("client", id = ?client_id.0);
let _guard = span.enter();
let client = &mut self.clients[client_id];
let Some(client) = self.clients.get_mut(client_id) else {
// Skip messages from cleaned-up clients
return;
};
match event {
ClientEvent::Hello(hello) => {
assert!(client.handles.is_none());
Expand Down Expand Up @@ -186,25 +209,19 @@ impl Server {

fn on_connect(
&mut self,
conn: Result<quinn::NewConnection, quinn::ConnectionError>,
connection: quinn::Connection,
mut send: mpsc::Sender<(ClientId, ClientEvent)>,
) {
let quinn::NewConnection {
connection,
uni_streams,
..
} = match conn {
Ok(x) => x,
Err(e) => {
error!("incoming connection failed: {}", e);
return;
}
};
let id = self.clients.insert(Client::new(connection.clone()));
info!(id = ?id.0, address = %connection.remote_address(), "connection established");
tokio::spawn(async move {
if let Err(e) = drive_recv(id, uni_streams, &mut send).await {
if let Err(e) = drive_recv(id, connection, &mut send).await {
// drive_recv returns an error when any connection-terminating issue occurs, so we
// send a `Lost` message to ensure the client is cleaned up. Note that this message may
// be redundant, as dropping a slow client also sends a `Lost` message.
let _ = send.send((id, ClientEvent::Lost(e))).await;
} else {
unreachable!("Graceful disconnects are not implemented.")
}
});
}
Expand All @@ -214,26 +231,35 @@ const MAX_CLIENT_MSG_SIZE: usize = 1 << 16;

async fn drive_recv(
id: ClientId,
mut streams: quinn::IncomingUniStreams,
connection: quinn::Connection,
send: &mut mpsc::Sender<(ClientId, ClientEvent)>,
) -> Result<()> {
let hello = match streams.next().await {
None => return Ok(()),
Some(stream) => {
codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream?).await?
}
};
let stream = connection.accept_uni().await.map_err(Error::msg)?;
let hello = codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream).await?;
let _ = send.send((id, ClientEvent::Hello(hello))).await;

let mut cmds = streams
.map(|stream| async {
codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream?).await
})
.buffer_unordered(16); // Allow a modest amount of out-of-order completion
while let Some(msg) = cmds.try_next().await? {
let _ = send.send((id, ClientEvent::Command(msg))).await;
loop {
let stream = connection.accept_uni().await.map_err(Error::msg)?;
let send = send.clone();

// We spawn a separate task to allow messages to be processed in a different order from when they were
// initiated.
let connection = connection.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream).await {
Err(e) => {
// This error can occur if the client sends a badly-formatted command. In this case,
// we want to drop the client. We close the connection, which will cause `drive_recv` to
// return eventually.
tracing::error!("Error when parsing unordered stream from client: {e}");
connection.close(2u32.into(), b"could not process stream");
}
Ok(msg) => {
let _ = send.send((id, ClientEvent::Command(msg))).await;
}
}
});
}
Ok(())
}

async fn drive_send(
Expand Down

0 comments on commit bbae2b9

Please sign in to comment.