Skip to content

Commit

Permalink
Priorities outgoing pings over normal traffic.
Browse files Browse the repository at this point in the history
(It doesn't help much though)
  • Loading branch information
vi committed Aug 16, 2024
1 parent 1ce372b commit 214de61
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions src/ws_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type WsSource<T> = futures::stream::SplitStream<
pub struct WsSinkWithOneBufferedMessage<T> {
sink: futures::stream::SplitSink<tokio_codec::Framed<T, websocket::r#async::MessageCodec<websocket::OwnedMessage>>>,
pong_debt: Option<websocket_base::OwnedMessage>,
ping_debt: Option<websocket_base::OwnedMessage>,
}

#[derive(Copy,Clone,PartialEq, Eq)]
Expand Down Expand Up @@ -378,7 +379,20 @@ impl<T: WsStream + 'static> Write for WsWriteWrapper<T> {
return wouldblock();
},
futures::AsyncSink::Ready => {
debug!("Finished sending Pong reply message");
debug!("Finished sending cached Pong reply message");
}
}
}
if self.sink.borrow().ping_debt.is_some() {
let mut sink = self.sink.borrow_mut();
let debt = sink.ping_debt.take().unwrap();
match sink.sink.start_send(debt).map_err(io_other_error)? {
futures::AsyncSink::NotReady(debt) => {
sink.ping_debt = Some(debt);
return wouldblock();
},
futures::AsyncSink::Ready => {
debug!("Finished sending cached Ping message");
}
}
}
Expand Down Expand Up @@ -575,10 +589,21 @@ impl<T: WsStream + 'static> ::futures::Future for WsPinger<T> {
ts[0..8].copy_from_slice(&ts1.to_be_bytes());
ts[8..12].copy_from_slice(&ts2.to_be_bytes());
let om = OwnedMessage::Ping(ts.to_vec());
match self.si.borrow_mut().sink.start_send(om) {
let mut sink = self.si.borrow_mut();
match sink.sink.start_send(om) {
Err(e) => info!("wsping: {}", e),
Ok(AsyncSink::NotReady(_om)) => {
return Ok(Async::NotReady);
Ok(AsyncSink::NotReady(om)) => {
if sink.ping_debt.is_some() {
warn!(
"dropped a ping request to websocket due to channel contention"
);
} else {
debug!("WebSocket write contenction: buffering ping instead of sending immediately");
sink.ping_debt = Some(om);
}

self.st = WaitingForTimer;
continue;
}
Ok(AsyncSink::Ready) => {
self.st = PollComplete;
Expand All @@ -589,7 +614,9 @@ impl<T: WsStream + 'static> ::futures::Future for WsPinger<T> {
PollComplete => match self.si.borrow_mut().sink.poll_complete() {
Err(e) => info!("wsping: {}", e),
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
self.st = WaitingForTimer;
continue;
//return Ok(Async::NotReady);
}
Ok(Async::Ready(())) => {
self.st = WaitingForTimer;
Expand All @@ -612,6 +639,7 @@ pub fn finish_building_ws_peer<S>(opts: &super::Options, duplex: Duplex<S>, clos
let wrappedsink = WsSinkWithOneBufferedMessage {
sink,
pong_debt: None,
ping_debt: None,
};
let mpsink = Rc::new(RefCell::new(wrappedsink));

Expand Down

0 comments on commit 214de61

Please sign in to comment.