Skip to content

Commit

Permalink
Merge pull request #55 from paritytech/altonen-fix-memory-leaks
Browse files Browse the repository at this point in the history
Fix memory leaks
  • Loading branch information
altonen authored Mar 11, 2024
2 parents b7deb51 + ea483f9 commit cc043b6
Show file tree
Hide file tree
Showing 33 changed files with 3,793 additions and 161 deletions.
20 changes: 3 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@ url = "2.4.0"
webpki = "0.22.2"
x25519-dalek = "2.0.0"
x509-parser = "0.15.0"
yamux = "0.11.0"
yasna = "0.5.0"
zeroize = "1.5.7"
nohash-hasher = "0.2.0"
static_assertions = "1.1.0"

# Exposed dependencies. Breaking changes to these are breaking changes to us.
[dependencies.rustls]
Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ pub enum Role {
Listener,
}

impl From<Role> for yamux::Mode {
impl From<Role> for crate::yamux::Mode {
fn from(value: Role) -> Self {
match value {
Role::Dialer => yamux::Mode::Client,
Role::Listener => yamux::Mode::Server,
Role::Dialer => crate::yamux::Mode::Client,
Role::Listener => crate::yamux::Mode::Server,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub enum Error {
#[error("Transport not supported")]
TransportNotSupported(Multiaddr),
#[error("Yamux error for substream `{0:?}`: `{1}`")]
YamuxError(Direction, yamux::ConnectionError),
YamuxError(Direction, crate::yamux::ConnectionError),
#[error("Operation not supported: `{0}`")]
NotSupported(String),
#[error("Other error occurred: `{0}`")]
Expand Down Expand Up @@ -142,7 +142,7 @@ pub enum SubstreamError {
#[error("Connection closed")]
ConnectionClosed,
#[error("yamux error: `{0}`")]
YamuxError(yamux::ConnectionError),
YamuxError(crate::yamux::ConnectionError),
#[error("Failed to read from substream, substream id `{0:?}`")]
ReadFailure(Option<SubstreamId>),
#[error("Failed to write to substream, substream id `{0:?}`")]
Expand Down Expand Up @@ -245,7 +245,7 @@ mod tests {
tracing::trace!("{:?}", DialError::AlreadyConnected);
tracing::trace!(
"{:?}",
SubstreamError::YamuxError(yamux::ConnectionError::Closed)
SubstreamError::YamuxError(crate::yamux::ConnectionError::Closed)
);
tracing::trace!("{:?}", AddressError::PeerIdMissing);
tracing::trace!(
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub use error::Error;
pub use peer_id::PeerId;
pub use types::protocol::ProtocolName;

pub use yamux;
// pub use yamux;

pub(crate) mod peer_id;

Expand All @@ -61,6 +61,7 @@ pub mod protocol;
pub mod substream;
pub mod transport;
pub mod types;
pub mod yamux;

mod bandwidth;
mod mock;
Expand Down
41 changes: 35 additions & 6 deletions src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use prost::Message;
use tokio::sync::mpsc::{channel, Sender};
use tokio_stream::wrappers::ReceiverStream;

use std::collections::{HashMap, HashSet};
use std::{
collections::{HashMap, HashSet},
time::Duration,
};

/// Log target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::identify";
Expand Down Expand Up @@ -247,8 +250,26 @@ impl Identify {
identify.encode(&mut msg).expect("`msg` to have enough capacity");

self.pending_inbound.push(Box::pin(async move {
if let Err(error) = substream.send_framed(msg.into()).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to send ipfs identify response");
match tokio::time::timeout(Duration::from_secs(10), substream.send_framed(msg.into()))
.await
{
Err(error) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"timed out while sending ipfs identify response",
);
}
Ok(Err(error)) => {
tracing::debug!(
target: LOG_TARGET,
?peer,
?error,
"failed to send ipfs identify response",
);
}
Ok(_) => {}
}
}))
}
Expand All @@ -270,9 +291,17 @@ impl Identify {
);

self.pending_outbound.push(Box::pin(async move {
let payload = substream.next().await.ok_or(Error::SubstreamError(
SubstreamError::ReadFailure(Some(substream_id)),
))??;
let payload =
match tokio::time::timeout(Duration::from_secs(10), substream.next()).await {
Err(_) => return Err(Error::Timeout),
Ok(None) =>
return Err(Error::SubstreamError(SubstreamError::ReadFailure(Some(
substream_id,
)))),
Ok(Some(Err(error))) => return Err(error),
Ok(Some(Ok(payload))) => payload,
};

let info = identify_schema::Identify::decode(payload.to_vec().as_slice())?;

tracing::trace!(target: LOG_TARGET, ?peer, ?info, "peer identified");
Expand Down
50 changes: 33 additions & 17 deletions src/protocol/libp2p/ping/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,23 @@ impl Ping {
tracing::trace!(target: LOG_TARGET, ?peer, "handle outbound substream");

self.pending_outbound.push(Box::pin(async move {
// TODO: generate random payload and verify it
let _ = substream.send_framed(vec![0u8; 32].into()).await?;
let now = Instant::now();
let _ = substream.next().await.ok_or(Error::SubstreamError(
SubstreamError::ReadFailure(Some(substream_id)),
))?;
let _ = substream.close().await;

Ok((peer, now.elapsed()))
let future = async move {
// TODO: generate random payload and verify it
let _ = substream.send_framed(vec![0u8; 32].into()).await?;
let now = Instant::now();
let _ = substream.next().await.ok_or(Error::SubstreamError(
SubstreamError::ReadFailure(Some(substream_id)),
))?;
let _ = substream.close().await;

Ok(now.elapsed())
};

match tokio::time::timeout(Duration::from_secs(10), future).await {
Err(_) => return Err(Error::Timeout),
Ok(Err(error)) => return Err(error),
Ok(Ok(elapsed)) => Ok((peer, elapsed)),
}
}));
}

Expand All @@ -141,14 +149,22 @@ impl Ping {
tracing::trace!(target: LOG_TARGET, ?peer, "handle inbound substream");

self.pending_inbound.push(Box::pin(async move {
let payload = substream
.next()
.await
.ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
substream.send_framed(payload.freeze()).await?;
let _ = substream.next().await.map(|_| ());

Ok(())
let future = async move {
let payload = substream
.next()
.await
.ok_or(Error::SubstreamError(SubstreamError::ReadFailure(None)))??;
substream.send_framed(payload.freeze()).await?;
let _ = substream.next().await.map(|_| ());

Ok(())
};

match tokio::time::timeout(Duration::from_secs(10), future).await {
Err(_) => return Err(Error::Timeout),
Ok(Err(error)) => return Err(error),
Ok(Ok(())) => Ok(()),
}
}));
}

Expand Down
Loading

0 comments on commit cc043b6

Please sign in to comment.