From 937145e5d75d4d5c01f7dd6b4847cd91259f9485 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Thu, 24 Oct 2024 17:13:47 +0200 Subject: [PATCH 1/5] Add ReadyTransport and friends --- Cargo.lock | 2 + wgps/Cargo.toml | 4 + wgps/src/lib.rs | 3 + wgps/src/ready_transport.rs | 173 ++++++++++++++++++++++++++++++++++++ 4 files changed, 182 insertions(+) create mode 100644 wgps/src/ready_transport.rs diff --git a/Cargo.lock b/Cargo.lock index a13a7a8..004cac3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -531,6 +531,8 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" name = "wgps" version = "0.1.0" dependencies = [ + "either", + "smol", "ufotofu", "willow-data-model", ] diff --git a/wgps/Cargo.toml b/wgps/Cargo.toml index 030740b..068365c 100644 --- a/wgps/Cargo.toml +++ b/wgps/Cargo.toml @@ -6,6 +6,10 @@ edition = "2021" [dependencies] willow-data-model = { path = "../data-model", version = "0.1.0" } ufotofu = { version = "0.4.2", features = ["std"] } +either = "1.10.0" + +[dev-dependencies] +smol = "2.0.0" [lints] workspace = true diff --git a/wgps/src/lib.rs b/wgps/src/lib.rs index 949845f..c66bbff 100644 --- a/wgps/src/lib.rs +++ b/wgps/src/lib.rs @@ -7,6 +7,9 @@ use willow_data_model::{ ResumptionFailedError, Store, StoreEvent, SubspaceId, }; +mod ready_transport; +pub use ready_transport::*; + /// Options to specify how ranges should be partitioned. #[derive(Debug, Clone, Copy)] pub struct PartitionOpts { diff --git a/wgps/src/ready_transport.rs b/wgps/src/ready_transport.rs new file mode 100644 index 0000000..1a0a024 --- /dev/null +++ b/wgps/src/ready_transport.rs @@ -0,0 +1,173 @@ +use either::Either; +use ufotofu::local_nb::Producer; + +/** When things go wrong while trying to make a WGPS transport ready. */ +#[derive(Debug)] +pub enum ReadyTransportError { + /** The transport returned an error of its own. */ + Transport(E), + /** The received max payload power was invalid, i.e. greater than 64. */ + MaxPayloadInvalid, + /** The transport stopped producing bytes before it could be deemed ready. */ + FinishedTooSoon, +} + +impl core::fmt::Display for ReadyTransportError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ReadyTransportError::Transport(e) => write!(f, "{}", e), + ReadyTransportError::MaxPayloadInvalid => write!( + f, + "The received max payload power was invalid, i.e. greater than 64." + ), + ReadyTransportError::FinishedTooSoon => write!( + f, + "The transport stopped producing bytes before it could be deemed ready." + ), + } + } +} + +/** The result of intercepting the first few bytes of a WGPS transport. */ +#[derive(Debug)] +pub struct ReadyTransport< + const CHALLENGE_HASH_LENGTH: usize, + E: core::fmt::Display, + P: Producer, +> { + /** The maximum payload size which may be sent without being explicitly requested.*/ + pub maximum_payload_size: usize, + /** The challenge hash of a nonce */ + pub received_commitment: [u8; CHALLENGE_HASH_LENGTH], + /** A 'ready' transport set to immediately produce encoded WGPS messages. */ + pub transport: P, +} + +/** Given a producer of bytes which is to immediately produce the bytes corresponding to the WGPS' [maximum payload size](https://willowprotocol.org/specs/sync/index.html#peer_max_payload_size) and [received commitment](https://willowprotocol.org/specs/sync/index.html#received_commitment), returns the computed maximum payload size, received commitment, and a 'ready' transport set to produce encoded WGPS messages. +*/ +pub async fn ready_transport< + const CHALLENGE_HASH_LENGTH: usize, + E: core::fmt::Display, + P: Producer, +>( + mut transport: P, +) -> Result, ReadyTransportError> { + let maximum_payload_power = match transport.produce().await { + Ok(either) => match either { + Either::Left(byte) => Ok(byte), + Either::Right(_) => Err(ReadyTransportError::FinishedTooSoon), + }, + Err(transport_err) => Err(ReadyTransportError::Transport(transport_err)), + }?; + + if maximum_payload_power > 64 { + return Err(ReadyTransportError::MaxPayloadInvalid); + } + + let maximum_payload_size = 2_usize.pow(maximum_payload_power as u32); + + let mut received_commitment = [0_u8; CHALLENGE_HASH_LENGTH]; + + for commitment_byte in received_commitment.iter_mut() { + match transport.produce().await { + Ok(either) => match either { + Either::Left(byte) => { + *commitment_byte = byte; + } + Either::Right(_) => return Err(ReadyTransportError::FinishedTooSoon), + }, + Err(transport_err) => return Err(ReadyTransportError::Transport(transport_err)), + }; + } + + Ok(ReadyTransport { + maximum_payload_size, + received_commitment, + transport, + }) +} + +#[cfg(test)] +mod tests { + + use super::*; + + use ufotofu::local_nb::producer::FromSlice; + + #[test] + fn empty_producer() { + let empty_transport = FromSlice::new(&[]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(empty_transport).await; + + assert!(matches!(result, Err(ReadyTransportError::FinishedTooSoon))) + }); + } + + #[test] + fn only_power_producer() { + let only_power_transport = FromSlice::new(&[0_u8]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(only_power_transport).await; + + assert!(matches!(result, Err(ReadyTransportError::FinishedTooSoon))) + }); + } + + #[test] + fn invalid_power_producer() { + let only_power_transport = FromSlice::new(&[65_u8]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(only_power_transport).await; + + assert!(matches!( + result, + Err(ReadyTransportError::MaxPayloadInvalid) + )) + }); + } + + #[test] + fn invalid_power_producer_correct_length() { + let only_power_transport = FromSlice::new(&[65_u8, 0, 0, 0, 0]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(only_power_transport).await; + + assert!(matches!( + result, + Err(ReadyTransportError::MaxPayloadInvalid) + )) + }); + } + + #[test] + fn commitment_too_short() { + let only_power_transport = FromSlice::new(&[0_u8, 0]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(only_power_transport).await; + + assert!(matches!(result, Err(ReadyTransportError::FinishedTooSoon))) + }); + } + + #[test] + fn success() { + let only_power_transport = FromSlice::new(&[1_u8, 1, 2, 3, 4, 5]); + + smol::block_on(async { + let result = ready_transport::<4, _, _>(only_power_transport).await; + + if let Ok(ready) = result { + assert!(ready.maximum_payload_size == 2); + assert!(ready.received_commitment == [1, 2, 3, 4]); + } else { + panic!() + } + }); + } +} From 3e26f9e0b58519df008736de174a7f32a689fa7a Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Tue, 5 Nov 2024 10:57:04 +0100 Subject: [PATCH 2/5] Make ready transport pub(crate) --- wgps/src/ready_transport.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/wgps/src/ready_transport.rs b/wgps/src/ready_transport.rs index 1a0a024..46c795a 100644 --- a/wgps/src/ready_transport.rs +++ b/wgps/src/ready_transport.rs @@ -30,14 +30,14 @@ impl core::fmt::Display for ReadyTransportError { /** The result of intercepting the first few bytes of a WGPS transport. */ #[derive(Debug)] -pub struct ReadyTransport< +pub(crate) struct ReadyTransport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display, P: Producer, > { /** The maximum payload size which may be sent without being explicitly requested.*/ pub maximum_payload_size: usize, - /** The challenge hash of a nonce */ + /** The challenge hash of a nonce. */ pub received_commitment: [u8; CHALLENGE_HASH_LENGTH], /** A 'ready' transport set to immediately produce encoded WGPS messages. */ pub transport: P, @@ -45,7 +45,7 @@ pub struct ReadyTransport< /** Given a producer of bytes which is to immediately produce the bytes corresponding to the WGPS' [maximum payload size](https://willowprotocol.org/specs/sync/index.html#peer_max_payload_size) and [received commitment](https://willowprotocol.org/specs/sync/index.html#received_commitment), returns the computed maximum payload size, received commitment, and a 'ready' transport set to produce encoded WGPS messages. */ -pub async fn ready_transport< +pub(crate) async fn ready_transport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display, P: Producer, From 812010ada38915275be7c8cc8dcfe2a204ff74d5 Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Tue, 5 Nov 2024 11:04:22 +0100 Subject: [PATCH 3/5] Use bulk_overwrite_full_slice instead of custom logic --- wgps/src/ready_transport.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/wgps/src/ready_transport.rs b/wgps/src/ready_transport.rs index 46c795a..34c770b 100644 --- a/wgps/src/ready_transport.rs +++ b/wgps/src/ready_transport.rs @@ -1,5 +1,5 @@ use either::Either; -use ufotofu::local_nb::Producer; +use ufotofu::local_nb::BulkProducer; /** When things go wrong while trying to make a WGPS transport ready. */ #[derive(Debug)] @@ -33,7 +33,7 @@ impl core::fmt::Display for ReadyTransportError { pub(crate) struct ReadyTransport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display, - P: Producer, + P: BulkProducer, > { /** The maximum payload size which may be sent without being explicitly requested.*/ pub maximum_payload_size: usize, @@ -48,7 +48,7 @@ pub(crate) struct ReadyTransport< pub(crate) async fn ready_transport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display, - P: Producer, + P: BulkProducer, >( mut transport: P, ) -> Result, ReadyTransportError> { @@ -68,17 +68,15 @@ pub(crate) async fn ready_transport< let mut received_commitment = [0_u8; CHALLENGE_HASH_LENGTH]; - for commitment_byte in received_commitment.iter_mut() { - match transport.produce().await { - Ok(either) => match either { - Either::Left(byte) => { - *commitment_byte = byte; - } - Either::Right(_) => return Err(ReadyTransportError::FinishedTooSoon), - }, - Err(transport_err) => return Err(ReadyTransportError::Transport(transport_err)), - }; - } + if let Err(e) = transport + .bulk_overwrite_full_slice(&mut received_commitment) + .await + { + match e.reason { + Either::Left(_) => return Err(ReadyTransportError::FinishedTooSoon), + Either::Right(e) => return Err(ReadyTransportError::Transport(e)), + } + }; Ok(ReadyTransport { maximum_payload_size, From 5feebc02ef21742a305ec762fb42cf7f0d86b42a Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Tue, 5 Nov 2024 11:14:00 +0100 Subject: [PATCH 4/5] Make getting of maximum payload power more readable --- wgps/src/ready_transport.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/wgps/src/ready_transport.rs b/wgps/src/ready_transport.rs index 34c770b..41d6060 100644 --- a/wgps/src/ready_transport.rs +++ b/wgps/src/ready_transport.rs @@ -52,13 +52,10 @@ pub(crate) async fn ready_transport< >( mut transport: P, ) -> Result, ReadyTransportError> { - let maximum_payload_power = match transport.produce().await { - Ok(either) => match either { - Either::Left(byte) => Ok(byte), - Either::Right(_) => Err(ReadyTransportError::FinishedTooSoon), - }, - Err(transport_err) => Err(ReadyTransportError::Transport(transport_err)), - }?; + let maximum_payload_power = match transport.produce().await? { + Either::Left(byte) => byte, + Either::Right(_) => return Err(ReadyTransportError::FinishedTooSoon), + }; if maximum_payload_power > 64 { return Err(ReadyTransportError::MaxPayloadInvalid); @@ -85,6 +82,12 @@ pub(crate) async fn ready_transport< }) } +impl From for ReadyTransportError { + fn from(value: E) -> Self { + ReadyTransportError::Transport(value) + } +} + #[cfg(test)] mod tests { From 410639ecee94ac029bd13dc8be2498d20f9cdb3e Mon Sep 17 00:00:00 2001 From: Sam Gwilym Date: Tue, 5 Nov 2024 11:21:16 +0100 Subject: [PATCH 5/5] Allow dead code for now. --- wgps/src/ready_transport.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/wgps/src/ready_transport.rs b/wgps/src/ready_transport.rs index 41d6060..1170092 100644 --- a/wgps/src/ready_transport.rs +++ b/wgps/src/ready_transport.rs @@ -30,6 +30,7 @@ impl core::fmt::Display for ReadyTransportError { /** The result of intercepting the first few bytes of a WGPS transport. */ #[derive(Debug)] +#[allow(dead_code)] // TODO: Remove when this is used. pub(crate) struct ReadyTransport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display, @@ -43,8 +44,21 @@ pub(crate) struct ReadyTransport< pub transport: P, } +impl< + const CHALLENGE_HASH_LENGTH: usize, + E: core::fmt::Display, + P: BulkProducer, + > ReadyTransport +{ + #[allow(dead_code)] // TODO: Remove when this is used. + pub(crate) fn transport(&self) -> &P { + &self.transport + } +} + /** Given a producer of bytes which is to immediately produce the bytes corresponding to the WGPS' [maximum payload size](https://willowprotocol.org/specs/sync/index.html#peer_max_payload_size) and [received commitment](https://willowprotocol.org/specs/sync/index.html#received_commitment), returns the computed maximum payload size, received commitment, and a 'ready' transport set to produce encoded WGPS messages. */ +#[allow(dead_code)] // TODO: Remove when this is used. pub(crate) async fn ready_transport< const CHALLENGE_HASH_LENGTH: usize, E: core::fmt::Display,