Update quinn and rustls #320

Merged · 1 commit · Oct 29, 2023
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -5,7 +5,7 @@ members = ["client", "server", "common", "save", "save/gen-protos"]
[workspace.dependencies]
hecs = "0.10.0"
nalgebra = { version = "0.32.1", features = ["libm-force"] }
quinn = "0.8.3"
quinn = "0.10.2"
toml = { version = "0.8.0", default-features = false, features = ["parse"] }

[profile.dev]
2 changes: 1 addition & 1 deletion client/Cargo.toml
@@ -31,7 +31,7 @@ fxhash = "0.2.1"
downcast-rs = "1.1.1"
quinn = { workspace = true }
futures-util = "0.3.1"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }
rustls = { version = "0.21.7", features = ["dangerous_configuration"] }
webpki = "0.22.0"
hecs = { workspace = true }
rcgen = { version = "0.11.0", default-features = false }
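The `dangerous_configuration` feature kept here is what lets the client install a custom certificate verifier (the `AcceptAnyCert` type touched in `client/src/net.rs` below). A minimal sketch of what such a verifier looks like against rustls 0.21, assuming the same accept-everything behaviour as before; the repository's actual implementation is not shown in this diff:

```rust
use std::time::SystemTime;

use rustls::client::{ServerCertVerified, ServerCertVerifier};
use rustls::{Certificate, Error, ServerName};

struct AcceptAnyCert;

impl ServerCertVerifier for AcceptAnyCert {
    fn verify_server_cert(
        &self,
        _end_entity: &Certificate,
        _intermediates: &[Certificate],
        _server_name: &ServerName,
        _scts: &mut dyn Iterator<Item = &[u8]>,
        _ocsp_response: &[u8],
        _now: SystemTime,
    ) -> Result<ServerCertVerified, Error> {
        // Accept every certificate without checking it; only suitable for
        // development builds where the server uses a self-signed certificate.
        Ok(ServerCertVerified::assertion())
    }
}
```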
48 changes: 23 additions & 25 deletions client/src/net.rs
@@ -1,7 +1,6 @@
use std::{sync::Arc, thread};

use anyhow::{anyhow, Error, Result};
use futures_util::{StreamExt, TryStreamExt};
use tokio::sync::mpsc;

use common::{codec, proto};
@@ -63,16 +62,12 @@ async fn inner(
endpoint: quinn::Endpoint,
) -> Result<()> {
let server = cfg.server.unwrap();
let quinn::NewConnection {
connection,
mut uni_streams,
..
} = endpoint.connect(server, "localhost").unwrap().await?;
let connection = endpoint.connect(server, "localhost").unwrap().await?;

// Open the first stream for our hello message
let clienthello_stream = connection.open_uni().await?;
// Start sending commands asynchronously
tokio::spawn(handle_outgoing(outgoing, connection));
tokio::spawn(handle_outgoing(outgoing, connection.clone()));
// Actually send the hello message
codec::send_whole(
clienthello_stream,
@@ -82,9 +77,9 @@
)
.await?;

let mut ordered = uni_streams.next().await.unwrap()?;
let mut ordered = connection.accept_uni().await?;
// Handle unordered messages
tokio::spawn(handle_unordered(incoming.clone(), uni_streams));
tokio::spawn(handle_unordered(incoming.clone(), connection));

// Receive the server's hello message
let hello = codec::recv::<proto::ServerHello>(&mut ordered)
@@ -116,23 +111,26 @@ async fn handle_outgoing(
}

/// Receive unordered messages from the server
async fn handle_unordered(
incoming: mpsc::UnboundedSender<Message>,
uni_streams: quinn::IncomingUniStreams,
) -> Result<()> {
let mut msgs = uni_streams
.map(|stream| async {
let stream = stream?;
codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await
})
.buffer_unordered(128);
// TODO: Don't silently die on parse errors
while let Some(msg) = msgs.try_next().await? {
// Ignore errors so we don't panic if the simulation thread goes away between checking
// `msgs` and here.
let _ = incoming.send(Message::StateDelta(msg));
async fn handle_unordered(incoming: mpsc::UnboundedSender<Message>, connection: quinn::Connection) {
loop {
let Ok(stream) = connection.accept_uni().await else {
// accept_uni should only fail if the connection is closed, which is already handled elsewhere.
return;
};
let incoming = incoming.clone();
let connection = connection.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::StateDelta>(2usize.pow(16), stream).await {
Err(e) => {
tracing::error!("Error when parsing unordered stream from server: {e}");
connection.close(1u32.into(), b"could not process stream");
}
Ok(msg) => {
let _ = incoming.send(Message::StateDelta(msg));
}
}
});
}
Ok(())
}

struct AcceptAnyCert;
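The changes above are the core of the quinn 0.8 → 0.10 migration on the client: `connect()` now resolves directly to a `Connection`, and unidirectional streams are accepted from the connection itself rather than from a separate `IncomingUniStreams`. A condensed sketch of that shape, with illustrative names (`server_addr`, the size limit) that are not taken from this PR:

```rust
use std::net::SocketAddr;

use anyhow::Result;

// quinn 0.10: no NewConnection to destructure; the Connection is cheap to
// clone and hands out streams directly.
async fn connect_and_receive(endpoint: quinn::Endpoint, server_addr: SocketAddr) -> Result<()> {
    let connection = endpoint.connect(server_addr, "localhost")?.await?;

    // Streams used to arrive via `uni_streams.next()`; now they come from the
    // connection itself, which can be cloned into spawned tasks as needed.
    while let Ok(mut stream) = connection.accept_uni().await {
        // `read_to_end` takes `&mut self` in quinn 0.10, hence the `mut` binding.
        let bytes = stream.read_to_end(64 * 1024).await?;
        println!("received {} bytes", bytes.len());
    }
    Ok(())
}
```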
2 changes: 1 addition & 1 deletion common/src/codec.rs
@@ -47,7 +47,7 @@ pub async fn send_whole<T: Serialize + ?Sized>(
/// Receive the entirety of `stream` as a `T`
pub async fn recv_whole<T: DeserializeOwned>(
size_limit: usize,
stream: quinn::RecvStream,
mut stream: quinn::RecvStream,
) -> Result<T> {
let buf = stream.read_to_end(size_limit).await?;
Ok(bincode::deserialize(&buf)?)
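The only change needed here is the `mut` binding: in quinn 0.10, `RecvStream::read_to_end` borrows the stream mutably instead of consuming it. A standalone illustration (the helper name and size limit are illustrative, not from the PR):

```rust
use anyhow::Result;

// quinn 0.10's RecvStream::read_to_end takes &mut self rather than self,
// so callers bind the stream as `mut`.
async fn read_whole(mut stream: quinn::RecvStream) -> Result<Vec<u8>> {
    Ok(stream.read_to_end(1 << 16).await?)
}
```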
2 changes: 1 addition & 1 deletion server/Cargo.toml
@@ -27,6 +27,6 @@ fxhash = "0.2.1"
nalgebra = { workspace = true }
libm = "0.2.6"
slotmap = "1.0.6"
rustls = "0.20.6"
rustls = "0.21.7"
rustls-pemfile = "1.0.0"
save = { path = "../save" }
98 changes: 62 additions & 36 deletions server/src/lib.rs
@@ -8,7 +8,7 @@ mod sim;
use std::{net::UdpSocket, sync::Arc, time::Instant};

use anyhow::{Context, Error, Result};
use futures::{select, StreamExt, TryStreamExt};
use futures::{select, StreamExt};
use hecs::Entity;
use slotmap::DenseSlotMap;
use tokio::sync::mpsc;
@@ -32,15 +32,16 @@ pub async fn run(net: NetParams, mut sim: SimConfig, save: Save) -> Result<()> {
let server_config =
quinn::ServerConfig::with_single_cert(net.certificate_chain, net.private_key)
.context("parsing certificate")?;
let (endpoint, incoming) = quinn::Endpoint::new(
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config),
net.socket,
quinn::default_runtime().unwrap(),
)?;
info!(address = %endpoint.local_addr().unwrap(), "listening");

let server = Server::new(sim, save);
server.run(incoming).await;
server.run(endpoint).await;
Ok(())
}

@@ -62,11 +63,9 @@ impl Server {
}
}

async fn run(mut self, incoming: quinn::Incoming) {
async fn run(mut self, endpoint: quinn::Endpoint) {
let mut ticks = IntervalStream::new(tokio::time::interval(self.cfg.step_interval)).fuse();
let mut incoming = incoming
.inspect(|x| trace!(address = %x.remote_address(), "connection incoming"))
.buffer_unordered(16);
let mut incoming = ReceiverStream::new(self.handle_incoming(endpoint)).fuse();
let (client_events_send, client_events) = mpsc::channel(128);
let mut client_events = ReceiverStream::new(client_events).fuse();
loop {
@@ -78,6 +77,27 @@ impl Server {
}
}

fn handle_incoming(&self, endpoint: quinn::Endpoint) -> mpsc::Receiver<quinn::Connection> {
let (incoming_send, incoming_recv) = mpsc::channel(16);
tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
trace!(address = %conn.remote_address(), "connection incoming");
let incoming_send = incoming_send.clone();
tokio::spawn(async move {
match conn.await {
Err(e) => {
error!("incoming connection failed: {}", e.to_string());
}
Ok(connection) => {
let _ = incoming_send.send(connection).await;
}
}
});
}
});
incoming_recv
}

fn on_step(&mut self) {
let now = Instant::now();
// Apply queued inputs
@@ -135,7 +155,10 @@
fn on_client_event(&mut self, client_id: ClientId, event: ClientEvent) {
let span = error_span!("client", id = ?client_id.0);
let _guard = span.enter();
let client = &mut self.clients[client_id];
let Some(client) = self.clients.get_mut(client_id) else {
// Skip messages from cleaned-up clients
return;
};
match event {
ClientEvent::Hello(hello) => {
assert!(client.handles.is_none());
@@ -186,25 +209,19 @@

fn on_connect(
&mut self,
conn: Result<quinn::NewConnection, quinn::ConnectionError>,
connection: quinn::Connection,
mut send: mpsc::Sender<(ClientId, ClientEvent)>,
) {
let quinn::NewConnection {
connection,
uni_streams,
..
} = match conn {
Ok(x) => x,
Err(e) => {
error!("incoming connection failed: {}", e);
return;
}
};
let id = self.clients.insert(Client::new(connection.clone()));
info!(id = ?id.0, address = %connection.remote_address(), "connection established");
tokio::spawn(async move {
if let Err(e) = drive_recv(id, uni_streams, &mut send).await {
if let Err(e) = drive_recv(id, connection, &mut send).await {
// drive_recv returns an error when any connection-terminating issue occurs, so we
// send a `Lost` message to ensure the client is cleaned up. Note that this message may
// be redundant, as dropping a slow client also sends a `Lost` message.
let _ = send.send((id, ClientEvent::Lost(e))).await;
} else {
unreachable!("Graceful disconnects are not implemented.")
}
});
}
@@ -214,26 +231,35 @@ const MAX_CLIENT_MSG_SIZE: usize = 1 << 16;

async fn drive_recv(
id: ClientId,
mut streams: quinn::IncomingUniStreams,
connection: quinn::Connection,
send: &mut mpsc::Sender<(ClientId, ClientEvent)>,
) -> Result<()> {
let hello = match streams.next().await {
None => return Ok(()),
Some(stream) => {
codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream?).await?
}
};
let stream = connection.accept_uni().await.map_err(Error::msg)?;
let hello = codec::recv_whole::<proto::ClientHello>(MAX_CLIENT_MSG_SIZE, stream).await?;
let _ = send.send((id, ClientEvent::Hello(hello))).await;

let mut cmds = streams
.map(|stream| async {
codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream?).await
})
.buffer_unordered(16); // Allow a modest amount of out-of-order completion
while let Some(msg) = cmds.try_next().await? {
let _ = send.send((id, ClientEvent::Command(msg))).await;
loop {
let stream = connection.accept_uni().await.map_err(Error::msg)?;
let send = send.clone();

// We spawn a separate task to allow messages to be processed in a different order from when they were
// initiated.
let connection = connection.clone();
tokio::spawn(async move {
match codec::recv_whole::<proto::Command>(MAX_CLIENT_MSG_SIZE, stream).await {
Err(e) => {
// This error can occur if the client sends a badly-formatted command. In this case,
// we want to drop the client. We close the connection, which will cause `drive_recv` to
// return eventually.
tracing::error!("Error when parsing unordered stream from client: {e}");
connection.close(2u32.into(), b"could not process stream");
}
Ok(msg) => {
let _ = send.send((id, ClientEvent::Command(msg))).await;
}
}
});
}
Ok(())
}

async fn drive_send(
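On the server side the same migration applies: `Endpoint::new` no longer returns a separate `Incoming` stream, and connections are obtained by polling `Endpoint::accept`. A compact sketch of the accept loop under those assumptions; the per-connection handling is a placeholder, not this crate's actual logic:

```rust
use anyhow::Result;
use tracing::{error, info};

// quinn 0.10: Endpoint::accept() yields Connecting futures one at a time;
// each handshake is driven on its own task so a slow client cannot stall
// the accept loop.
async fn accept_loop(endpoint: quinn::Endpoint) -> Result<()> {
    while let Some(connecting) = endpoint.accept().await {
        tokio::spawn(async move {
            match connecting.await {
                Ok(connection) => {
                    info!(address = %connection.remote_address(), "connection established");
                    // hand the Connection off to per-client receive/send tasks here
                }
                Err(e) => error!("incoming connection failed: {e}"),
            }
        });
    }
    Ok(())
}
```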