From 9244789d4eb130e18a57a2a6b9b21f1cf69d8e67 Mon Sep 17 00:00:00 2001 From: robin-near <111538878+robin-near@users.noreply.github.com> Date: Mon, 9 Sep 2024 12:27:36 -0700 Subject: [PATCH] [Epoch Sync] Compress the EpochSyncProof over the network. (#12058) Closes #11933 This compresses the EpochSyncProof similarly to how we compressed ChunkStateWitness. In fact, the code is refactored out into a CompressedData trait so we don't duplicate this code. --- .../stateless_validation/chunk_validation.rs | 1 + .../partial_witness/encoding.rs | 1 + .../partial_witness/partial_witness_actor.rs | 1 + .../partial_witness_tracker.rs | 1 + chain/client/src/sync/epoch.rs | 31 +++- chain/network/src/client.rs | 4 +- chain/network/src/network_protocol/mod.rs | 12 +- chain/network/src/types.rs | 4 +- core/primitives/src/epoch_sync.rs | 36 ++++ .../src/stateless_validation/state_witness.rs | 162 +++--------------- core/primitives/src/utils.rs | 1 + core/primitives/src/utils/compression.rs | 131 ++++++++++++++ core/primitives/src/validator_signer.rs | 1 + .../res/protocol_schema.toml | 7 +- 14 files changed, 232 insertions(+), 161 deletions(-) create mode 100644 core/primitives/src/utils/compression.rs diff --git a/chain/chain/src/stateless_validation/chunk_validation.rs b/chain/chain/src/stateless_validation/chunk_validation.rs index 6a1ab9e75e0..8100d159e16 100644 --- a/chain/chain/src/stateless_validation/chunk_validation.rs +++ b/chain/chain/src/stateless_validation/chunk_validation.rs @@ -29,6 +29,7 @@ use near_primitives::stateless_validation::state_witness::{ use near_primitives::transaction::SignedTransaction; use near_primitives::types::chunk_extra::ChunkExtra; use near_primitives::types::{ProtocolVersion, ShardId}; +use near_primitives::utils::compression::CompressedData; use near_store::PartialStorage; use std::collections::HashMap; use std::num::NonZeroUsize; diff --git a/chain/client/src/stateless_validation/partial_witness/encoding.rs b/chain/client/src/stateless_validation/partial_witness/encoding.rs index 7a8ec115c25..4e23453e419 100644 --- a/chain/client/src/stateless_validation/partial_witness/encoding.rs +++ b/chain/client/src/stateless_validation/partial_witness/encoding.rs @@ -5,6 +5,7 @@ use near_primitives::reed_solomon::{ reed_solomon_decode, reed_solomon_encode, reed_solomon_part_length, }; use near_primitives::stateless_validation::state_witness::EncodedChunkStateWitness; +use near_primitives::utils::compression::CompressedData; use reed_solomon_erasure::galois_8::ReedSolomon; /// Ratio of the number of data parts to total parts in the Reed Solomon encoding. diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index b11e9db8047..cb5fbf75ee0 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -30,6 +30,7 @@ use crate::stateless_validation::validate::validate_partial_encoded_state_witness; use super::encoding::WitnessEncoderCache; use super::partial_witness_tracker::PartialEncodedStateWitnessTracker; +use near_primitives::utils::compression::CompressedData; pub struct PartialWitnessActor { /// Adapter to send messages to the network.
diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs index c94d7b504b6..9b608f1a033 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_tracker.rs @@ -19,6 +19,7 @@ use crate::client_actor::ClientSenderForPartialWitness; use crate::metrics; use super::encoding::{WitnessEncoder, WitnessEncoderCache, WitnessPart}; +use near_primitives::utils::compression::CompressedData; /// Max number of chunks to keep in the witness tracker cache. We reach here only after validation /// of the partial_witness so the LRU cache size need not be too large. diff --git a/chain/client/src/sync/epoch.rs b/chain/client/src/sync/epoch.rs index 4a82bc3af6c..6f9dee93ae9 100644 --- a/chain/client/src/sync/epoch.rs +++ b/chain/client/src/sync/epoch.rs @@ -17,12 +17,13 @@ use near_primitives::epoch_block_info::BlockInfo; use near_primitives::epoch_info::EpochInfo; use near_primitives::epoch_manager::AGGREGATOR_KEY; use near_primitives::epoch_sync::{ - EpochSyncProof, EpochSyncProofCurrentEpochData, EpochSyncProofLastEpochData, - EpochSyncProofPastEpochData, + CompressedEpochSyncProof, EpochSyncProof, EpochSyncProofCurrentEpochData, + EpochSyncProofLastEpochData, EpochSyncProofPastEpochData, }; use near_primitives::merkle::PartialMerkleTree; use near_primitives::network::PeerId; use near_primitives::types::{BlockHeight, EpochId}; +use near_primitives::utils::compression::CompressedData; use near_store::{DBCol, Store, FINAL_HEAD_KEY}; use rand::seq::SliceRandom; use rayon::iter::{IntoParallelIterator, ParallelIterator}; @@ -487,13 +488,20 @@ impl Handler<EpochSyncRequestMessage> for ClientActorInner { move || { let proof = match EpochSync::derive_epoch_sync_proof(store) { Ok(epoch_sync_proof) => epoch_sync_proof, - Err(e) => { - tracing::error!("Failed to derive epoch sync proof: {:?}", e); + Err(err) => { + tracing::error!(?err, "Failed to derive epoch sync proof"); + return; + } + }; + let (compressed_proof, _) = match CompressedEpochSyncProof::encode(&proof) { + Ok(compressed_proof) => compressed_proof, + Err(err) => { + tracing::error!(?err, "Failed to compress epoch sync proof"); return; } }; network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::EpochSyncResponse { route_back, proof }, + NetworkRequests::EpochSyncResponse { route_back, proof: compressed_proof }, )); }, ) @@ -503,14 +511,21 @@ impl Handler<EpochSyncResponseMessage> for ClientActorInner { #[perf] fn handle(&mut self, msg: EpochSyncResponseMessage) { - if let Err(e) = self.client.epoch_sync.apply_proof( + let (proof, _) = match msg.proof.decode() { + Ok(proof) => proof, + Err(err) => { + tracing::error!(?err, "Failed to uncompress epoch sync proof"); + return; + } + }; + if let Err(err) = self.client.epoch_sync.apply_proof( &mut self.client.sync_status, &mut self.client.chain, - msg.proof, + proof, msg.from_peer, self.client.epoch_manager.as_ref(), ) { - tracing::error!("Failed to apply epoch sync proof: {:?}", e); + tracing::error!(?err, "Failed to apply epoch sync proof"); } } } diff --git a/chain/network/src/client.rs b/chain/network/src/client.rs index ac201ab3230..57dc5d48908 100644 --- a/chain/network/src/client.rs +++ b/chain/network/src/client.rs @@ -4,7 +4,7 @@ use near_async::messaging::{AsyncSender, Sender}; use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom}; use
near_primitives::block::{Approval, Block, BlockHeader}; use near_primitives::challenge::Challenge; -use near_primitives::epoch_sync::EpochSyncProof; +use near_primitives::epoch_sync::CompressedEpochSyncProof; use near_primitives::errors::InvalidTxError; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -128,7 +128,7 @@ pub struct EpochSyncRequestMessage { #[rtype(result = "()")] pub struct EpochSyncResponseMessage { pub from_peer: PeerId, - pub proof: EpochSyncProof, + pub proof: CompressedEpochSyncProof, } #[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)] diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 82d644bc1d5..6c584219c70 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -39,7 +39,7 @@ use near_crypto::Signature; use near_o11y::OpenTelemetrySpanExt; use near_primitives::block::{Approval, Block, BlockHeader, GenesisId}; use near_primitives::challenge::Challenge; -use near_primitives::epoch_sync::EpochSyncProof; +use near_primitives::epoch_sync::CompressedEpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::merkle::combine_hash; use near_primitives::network::{AnnounceAccount, PeerId}; @@ -551,7 +551,7 @@ pub enum RoutedMessageBody { PartialEncodedStateWitnessForward(PartialEncodedStateWitness), VersionedChunkEndorsement(ChunkEndorsement), EpochSyncRequest, - EpochSyncResponse(EpochSyncProof), + EpochSyncResponse(CompressedEpochSyncProof), } impl RoutedMessageBody { @@ -642,12 +642,8 @@ impl fmt::Debug for RoutedMessageBody { write!(f, "VersionedChunkEndorsement") } RoutedMessageBody::EpochSyncRequest => write!(f, "EpochSyncRequest"), - RoutedMessageBody::EpochSyncResponse(proof) => { - write!( - f, - "EpochSyncResponse(epoch: {:?})", - proof.current_epoch.first_block_header_in_epoch.epoch_id(), - ) + RoutedMessageBody::EpochSyncResponse(_) => { - write!(f, "EpochSyncResponse") + write!(f, "EpochSyncResponse") } } } diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index ccf46543316..3ddcbbcc067 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -16,7 +16,7 @@ use near_async::time; use near_crypto::PublicKey; use near_primitives::block::{ApprovalMessage, Block, GenesisId}; use near_primitives::challenge::Challenge; -use near_primitives::epoch_sync::EpochSyncProof; +use near_primitives::epoch_sync::CompressedEpochSyncProof; use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::PartialEncodedChunkWithArcReceipts; @@ -285,7 +285,7 @@ pub enum NetworkRequests { /// Requests an epoch sync EpochSyncRequest { peer_id: PeerId }, /// Response to an epoch sync request - EpochSyncResponse { route_back: CryptoHash, proof: EpochSyncProof }, + EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof }, } /// Combines peer address info, chain and edge information.
diff --git a/core/primitives/src/epoch_sync.rs b/core/primitives/src/epoch_sync.rs index d4a269e2a8d..d7299cebbbb 100644 --- a/core/primitives/src/epoch_sync.rs +++ b/core/primitives/src/epoch_sync.rs @@ -3,8 +3,12 @@ use crate::epoch_block_info::BlockInfo; use crate::epoch_info::EpochInfo; use crate::merkle::PartialMerkleTree; use crate::types::validator_stake::ValidatorStake; +use crate::utils::compression::CompressedData; use borsh::{BorshDeserialize, BorshSerialize}; +use bytesize::ByteSize; use near_crypto::Signature; +use near_schema_checker_lib::ProtocolSchema; +use std::fmt::Debug; /// Proof that the blockchain history had progressed from the genesis (not included here) to the /// current epoch indicated in the proof. @@ -25,6 +29,38 @@ pub struct EpochSyncProof { pub current_epoch: EpochSyncProofCurrentEpochData, } +const MAX_UNCOMPRESSED_EPOCH_SYNC_PROOF_SIZE: u64 = ByteSize::mib(500).0; +const EPOCH_SYNC_COMPRESSION_LEVEL: i32 = 3; + +#[derive( + Clone, + PartialEq, + Eq, + BorshSerialize, + BorshDeserialize, + derive_more::From, + derive_more::AsRef, + ProtocolSchema, +)] +pub struct CompressedEpochSyncProof(Box<[u8]>); +impl + CompressedData< + EpochSyncProof, + MAX_UNCOMPRESSED_EPOCH_SYNC_PROOF_SIZE, + EPOCH_SYNC_COMPRESSION_LEVEL, + > for CompressedEpochSyncProof +{ +} + +impl Debug for CompressedEpochSyncProof { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CompressedEpochSyncProof") + .field("len", &self.0.len()) + .field("proof", &self.decode()) + .finish() + } +} + /// Data needed for each epoch in the past. #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)] pub struct EpochSyncProofPastEpochData { diff --git a/core/primitives/src/stateless_validation/state_witness.rs b/core/primitives/src/stateless_validation/state_witness.rs index f3edcedb9df..cf82101f036 100644 --- a/core/primitives/src/stateless_validation/state_witness.rs +++ b/core/primitives/src/stateless_validation/state_witness.rs @@ -7,10 +7,9 @@ use crate::congestion_info::CongestionInfo; use crate::sharding::{ChunkHash, ReceiptProof, ShardChunkHeader, ShardChunkHeaderV3}; use crate::transaction::SignedTransaction; use crate::types::EpochId; -use crate::utils::io::{CountingRead, CountingWrite}; +use crate::utils::compression::CompressedData; use crate::validator_signer::EmptyValidatorSigner; use borsh::{BorshDeserialize, BorshSerialize}; -use bytes::{Buf, BufMut}; use bytesize::ByteSize; use near_primitives_core::hash::CryptoHash; use near_primitives_core::types::{AccountId, BlockHeight, ShardId}; @@ -19,87 +18,36 @@ use near_schema_checker_lib::ProtocolSchema; /// Represents max allowed size of the raw (not compressed) state witness, /// corresponds to the size of borsh-serialized ChunkStateWitness. -pub const MAX_UNCOMPRESSED_STATE_WITNESS_SIZE: ByteSize = - ByteSize::mib(if cfg!(feature = "test_features") { 512 } else { 64 }); +pub const MAX_UNCOMPRESSED_STATE_WITNESS_SIZE: u64 = + ByteSize::mib(if cfg!(feature = "test_features") { 512 } else { 64 }).0; +pub const STATE_WITNESS_COMPRESSION_LEVEL: i32 = 3; /// Represents bytes of encoded ChunkStateWitness. /// This is the compressed version of borsh-serialized state witness. 
-#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize, ProtocolSchema)] +#[derive( + Debug, + Clone, + PartialEq, + Eq, + BorshSerialize, + BorshDeserialize, + ProtocolSchema, + derive_more::From, + derive_more::AsRef, +)] pub struct EncodedChunkStateWitness(Box<[u8]>); -pub type ChunkStateWitnessSize = usize; - -impl EncodedChunkStateWitness { - /// Only use this if you are sure that the data is already encoded. - pub fn from_boxed_slice(data: Box<[u8]>) -> Self { - Self(data) - } - - /// Borsh-serialize and compress state witness. - /// Returns encoded witness along with the raw (uncompressed) witness size. - pub fn encode(witness: &ChunkStateWitness) -> std::io::Result<(Self, ChunkStateWitnessSize)> { - const STATE_WITNESS_COMPRESSION_LEVEL: i32 = 3; - - // Flow of data: State witness --> Borsh serialization --> Counting write --> zstd compression --> Bytes. - // CountingWrite will count the number of bytes for the Borsh-serialized witness, before compression. - let mut counting_write = CountingWrite::new(zstd::stream::Encoder::new( - Vec::new().writer(), - STATE_WITNESS_COMPRESSION_LEVEL, - )?); - borsh::to_writer(&mut counting_write, witness)?; - - let borsh_bytes_len = counting_write.bytes_written(); - let encoded_bytes = counting_write.into_inner().finish()?.into_inner(); - - Ok((Self(encoded_bytes.into()), borsh_bytes_len.as_u64() as usize)) - } - - /// Decompress and borsh-deserialize encoded witness bytes. - /// Returns decoded witness along with the raw (uncompressed) witness size. - pub fn decode(&self) -> std::io::Result<(ChunkStateWitness, ChunkStateWitnessSize)> { - // We want to limit the size of decompressed data to address "Zip bomb" attack. - self.decode_with_limit(MAX_UNCOMPRESSED_STATE_WITNESS_SIZE) - } - - /// Decompress and borsh-deserialize encoded witness bytes. - /// Returns decoded witness along with the raw (uncompressed) witness size. - pub fn decode_with_limit( - &self, - limit: ByteSize, - ) -> std::io::Result<(ChunkStateWitness, ChunkStateWitnessSize)> { - // Flow of data: Bytes --> zstd decompression --> Counting read --> Borsh deserialization --> State witness. - // CountingRead will count the number of bytes for the Borsh-deserialized witness, after decompression. - let mut counting_read = CountingRead::new_with_limit( - zstd::stream::Decoder::new(self.0.as_ref().reader())?, - limit, - ); - - match borsh::from_reader(&mut counting_read) { - Err(err) => { - // If decompressed data exceeds the limit then CountingRead will return a WriteZero error. - // Here we convert it to a more descriptive error to make debugging easier. - let err = if err.kind() == std::io::ErrorKind::WriteZero { - std::io::Error::other(format!( - "Decompressed data exceeded limit of {limit}: {err}" - )) - } else { - err - }; - Err(err) - } - Ok(witness) => Ok((witness, counting_read.bytes_read().as_u64().try_into().unwrap())), - } - } - - pub fn size_bytes(&self) -> ChunkStateWitnessSize { - self.0.len() - } - - pub fn as_slice(&self) -> &[u8] { - &self.0 - } +impl + CompressedData< + ChunkStateWitness, + MAX_UNCOMPRESSED_STATE_WITNESS_SIZE, + STATE_WITNESS_COMPRESSION_LEVEL, + > for EncodedChunkStateWitness +{ } +pub type ChunkStateWitnessSize = usize; + /// An acknowledgement sent from the chunk producer upon receiving the state witness to /// the originator of the witness (chunk producer). /// @@ -293,65 +241,3 @@ pub struct ChunkStateTransition { /// this makes it easier to debug why a state witness may fail to validate. 
pub post_state_root: CryptoHash, } - -#[cfg(test)] -mod tests { - use bytesize::ByteSize; - use near_primitives_core::hash::CryptoHash; - use std::io::ErrorKind; - - use crate::stateless_validation::state_witness::{ChunkStateWitness, EncodedChunkStateWitness}; - - #[test] - fn encode_decode_state_dummy_witness_default_limit() { - let original_witness = ChunkStateWitness::new_dummy(42, 0, CryptoHash::default()); - let (encoded_witness, borsh_bytes_from_encode) = - EncodedChunkStateWitness::encode(&original_witness).unwrap(); - let (decoded_witness, borsh_bytes_from_decode) = - EncodedChunkStateWitness::from_boxed_slice(encoded_witness.0).decode().unwrap(); - assert_eq!(decoded_witness, original_witness); - assert_eq!(borsh_bytes_from_encode, borsh_bytes_from_decode); - assert_eq!(borsh::to_vec(&original_witness).unwrap().len(), borsh_bytes_from_encode); - } - - #[test] - fn encode_decode_state_dummy_witness_within_limit() { - const LIMIT: ByteSize = ByteSize::mib(32); - let original_witness = ChunkStateWitness::new_dummy(42, 0, CryptoHash::default()); - let (encoded_witness, borsh_bytes_from_encode) = - EncodedChunkStateWitness::encode(&original_witness).unwrap(); - let (decoded_witness, borsh_bytes_from_decode) = - EncodedChunkStateWitness::from_boxed_slice(encoded_witness.0) - .decode_with_limit(LIMIT) - .unwrap(); - assert_eq!(decoded_witness, original_witness); - assert_eq!(borsh_bytes_from_encode, borsh_bytes_from_decode); - assert_eq!(borsh::to_vec(&original_witness).unwrap().len(), borsh_bytes_from_encode); - } - - #[test] - fn encode_decode_state_dummy_witness_exceeds_limit() { - const LIMIT: ByteSize = ByteSize::b(32); - let original_witness = ChunkStateWitness::new_dummy(42, 0, CryptoHash::default()); - let (encoded_witness, borsh_bytes_from_encode) = - EncodedChunkStateWitness::encode(&original_witness).unwrap(); - assert!(borsh_bytes_from_encode > LIMIT.as_u64() as usize); - let error = EncodedChunkStateWitness::from_boxed_slice(encoded_witness.0) - .decode_with_limit(LIMIT) - .unwrap_err(); - assert_eq!(error.kind(), ErrorKind::Other); - assert_eq!( - error.to_string(), - "Decompressed data exceeded limit of 32 B: Exceeded the limit of 32 bytes" - ); - } - - #[test] - fn decode_state_dummy_witness_invalid_data() { - let invalid_data = [0; 10]; - let error = EncodedChunkStateWitness::from_boxed_slice(Box::new(invalid_data)) - .decode() - .unwrap_err(); - assert_eq!(error.kind(), ErrorKind::Other); - } -} diff --git a/core/primitives/src/utils.rs b/core/primitives/src/utils.rs index 186a3b3bf34..1f91c8cc2a1 100644 --- a/core/primitives/src/utils.rs +++ b/core/primitives/src/utils.rs @@ -21,6 +21,7 @@ use near_primitives_core::account::id::{AccountId, AccountType}; use std::mem::size_of; use std::ops::Deref; +pub mod compression; pub mod io; pub mod min_heap; diff --git a/core/primitives/src/utils/compression.rs b/core/primitives/src/utils/compression.rs new file mode 100644 index 00000000000..0b6fcce1103 --- /dev/null +++ b/core/primitives/src/utils/compression.rs @@ -0,0 +1,131 @@ +use super::io::CountingRead; +use crate::utils::io::CountingWrite; +use borsh::{BorshDeserialize, BorshSerialize}; +use bytes::{Buf, BufMut}; +use bytesize::ByteSize; + +/// Helper trait for implementing a compressed structure for networking messages. +/// The reason this is not a struct is because some derives do not work well on +/// structs that have generics; e.g. ProtocolSchema. 
+pub trait CompressedData<T, const MAX_UNCOMPRESSED_SIZE: u64, const COMPRESSION_LEVEL: i32> +where + T: BorshSerialize + BorshDeserialize, + Self: From<Box<[u8]>> + AsRef<Box<[u8]>>, +{ + /// Only use this if you are sure that the data is already encoded. + fn from_boxed_slice(data: Box<[u8]>) -> Self { + Self::from(data) + } + + /// Borsh-serialize and compress the given data. + /// Returns compressed data along with the raw (uncompressed) serialized data size. + fn encode(uncompressed: &T) -> std::io::Result<(Self, usize)> { + // Flow of data: Original --> Borsh serialization --> Counting write --> zstd compression --> Bytes. + // CountingWrite will count the number of bytes for the Borsh-serialized data, before compression. + let mut counting_write = + CountingWrite::new(zstd::stream::Encoder::new(Vec::new().writer(), COMPRESSION_LEVEL)?); + borsh::to_writer(&mut counting_write, uncompressed)?; + + let borsh_bytes_len = counting_write.bytes_written(); + let encoded_bytes = counting_write.into_inner().finish()?.into_inner(); + + Ok((Self::from(encoded_bytes.into()), borsh_bytes_len.as_u64() as usize)) + } + + /// Decompress and borsh-deserialize the compressed data. + /// Returns decompressed and deserialized data along with the raw (uncompressed) serialized data size. + fn decode(&self) -> std::io::Result<(T, usize)> { + // We want to limit the size of decompressed data to address "Zip bomb" attack. + self.decode_with_limit(ByteSize(MAX_UNCOMPRESSED_SIZE)) + } + + /// Decompress and borsh-deserialize the compressed data. + /// Returns decompressed and deserialized data along with the raw (uncompressed) serialized data size. + fn decode_with_limit(&self, limit: ByteSize) -> std::io::Result<(T, usize)> { + // Flow of data: Bytes --> zstd decompression --> Counting read --> Borsh deserialization --> Original. + // CountingRead will count the number of bytes for the Borsh-deserialized data, after decompression. + let mut counting_read = CountingRead::new_with_limit( + zstd::stream::Decoder::new(self.as_ref().reader())?, + limit, + ); + + match borsh::from_reader(&mut counting_read) { + Err(err) => { + // If decompressed data exceeds the limit then CountingRead will return a WriteZero error. + // Here we convert it to a more descriptive error to make debugging easier. + let err = if err.kind() == std::io::ErrorKind::WriteZero { + std::io::Error::other(format!( + "Decompressed data exceeded limit of {limit}: {err}" + )) + } else { + err + }; + Err(err) + } + Ok(deserialized) => { + Ok((deserialized, counting_read.bytes_read().as_u64().try_into().unwrap())) + } + } + } + + fn size_bytes(&self) -> usize { + self.as_ref().len() + } + + fn as_slice(&self) -> &[u8] { + &self.as_ref() + } +} + +#[cfg(test)] +mod tests { + use crate::utils::compression::CompressedData; + use borsh::{BorshDeserialize, BorshSerialize}; + use std::io::ErrorKind; + + #[derive(BorshSerialize, BorshDeserialize, PartialEq, Debug)] + struct MyData(Vec<u8>); + + #[derive(derive_more::From, derive_more::AsRef)] + struct CompressedMyData(Box<[u8]>); + + impl super::CompressedData<MyData, 1000, 3> for CompressedMyData {} + + #[test] + fn encode_decode_within_limit() { + let data = MyData(vec![42; 100]); + let (compressed, uncompressed_size) = CompressedMyData::encode(&data).unwrap(); + let (decompressed, decompressed_size) = compressed.decode().unwrap(); + assert_eq!(&decompressed, &data); + assert_eq!(uncompressed_size, decompressed_size); + assert_eq!(borsh::to_vec(&data).unwrap().len(), uncompressed_size); + } + + #[test] + fn encode_exceeding_limit() { + // Encode exceeding limit is OK.
+ let data = MyData(vec![42; 2000]); + let (_, uncompressed_size) = CompressedMyData::encode(&data).unwrap(); + assert_eq!(borsh::to_vec(&data).unwrap().len(), uncompressed_size); + } + + #[test] + fn decode_exceeding_limit() { + let data = MyData(vec![42; 2000]); + let (compressed, _) = CompressedMyData::encode(&data).unwrap(); + let error = compressed.decode().unwrap_err(); + assert_eq!(error.kind(), ErrorKind::Other); + assert_eq!( + error.to_string(), + "Decompressed data exceeded limit of 1.0 KB: Exceeded the limit of 1000 bytes" + ); + } + + #[test] + fn decode_invalid_data() { + let invalid_data = [0; 10]; + let error = + CompressedMyData::from_boxed_slice(Box::new(invalid_data)).decode().unwrap_err(); + assert_eq!(error.kind(), ErrorKind::Other); + } +} diff --git a/core/primitives/src/validator_signer.rs b/core/primitives/src/validator_signer.rs index e640f0b7f25..d92a6ee1db6 100644 --- a/core/primitives/src/validator_signer.rs +++ b/core/primitives/src/validator_signer.rs @@ -16,6 +16,7 @@ use crate::stateless_validation::partial_witness::PartialEncodedStateWitnessInne use crate::stateless_validation::state_witness::EncodedChunkStateWitness; use crate::telemetry::TelemetryInfo; use crate::types::{AccountId, BlockHeight, EpochId}; +use crate::utils::compression::CompressedData; /// Enum for validator signer, that holds validator id and key used for signing data. #[derive(Clone, Debug, PartialEq)] diff --git a/tools/protocol-schema-check/res/protocol_schema.toml b/tools/protocol-schema-check/res/protocol_schema.toml index 21632483d7e..54b9fc41319 100644 --- a/tools/protocol-schema-check/res/protocol_schema.toml +++ b/tools/protocol-schema-check/res/protocol_schema.toml @@ -65,6 +65,7 @@ ChunkStateWitness = 1299024010 ChunkStateWitnessAck = 177881908 ChunkStats = 4176245277 CompilationError = 738158707 +CompressedEpochSyncProof = 1117061636 CongestionInfo = 2682682461 CongestionInfoV1 = 2571332168 ConnectionInfoRepr = 3621760869 @@ -152,7 +153,7 @@ PeerChainInfoV2 = 2686179044 PeerId = 2447445523 PeerIdOrHash = 4080492546 PeerInfo = 3831734408 -PeerMessage = 253071944 +PeerMessage = 1912504821 Ping = 2783493472 Pong = 3159638327 PrepareError = 4009037507 @@ -175,8 +176,8 @@ ReceiptV1 = 2994842769 ReceiptValidationError = 551721215 ReceivedData = 3601438283 RootProof = 3135729669 -RoutedMessage = 313421816 -RoutedMessageBody = 3925228741 +RoutedMessage = 334669112 +RoutedMessageBody = 237812276 RoutingTableUpdate = 2987752645 Secp256K1PublicKey = 4117078281 Secp256K1Signature = 3687154735
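For anyone adopting the new trait for another message type, the boilerplate mirrors what this patch does for EpochSyncProof: wrap Box<[u8]> in a newtype, derive From and AsRef via derive_more, and choose the two const parameters (the uncompressed-size cap and the zstd level). Below is a minimal sketch, assuming it lives inside near-primitives; the MyMessage/CompressedMyMessage names and the 1 MiB cap are illustrative, not part of the patch.

use borsh::{BorshDeserialize, BorshSerialize};
use bytesize::ByteSize;

use crate::utils::compression::CompressedData;

// Hypothetical payload; any BorshSerialize + BorshDeserialize type works.
#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
struct MyMessage {
    payload: Vec<u8>,
}

// Const parameters of the trait: the decompression cap guards against
// zip bombs, and 3 is the zstd level this patch uses elsewhere.
const MAX_UNCOMPRESSED_MY_MESSAGE_SIZE: u64 = ByteSize::mib(1).0;
const MY_MESSAGE_COMPRESSION_LEVEL: i32 = 3;

// The newtype must satisfy the trait's From<Box<[u8]>> + AsRef<Box<[u8]>>
// bounds; derive_more generates both for a single-field struct.
#[derive(Debug, derive_more::From, derive_more::AsRef)]
struct CompressedMyMessage(Box<[u8]>);

impl
    CompressedData<
        MyMessage,
        MAX_UNCOMPRESSED_MY_MESSAGE_SIZE,
        MY_MESSAGE_COMPRESSION_LEVEL,
    > for CompressedMyMessage
{
}

fn roundtrip(msg: &MyMessage) -> std::io::Result<()> {
    // encode() compresses and also returns the pre-compression Borsh size,
    // which callers can log or check against size limits.
    let (compressed, uncompressed_len) = CompressedMyMessage::encode(msg)?;
    // decode() decompresses under the size cap, then deserializes.
    let (decoded, decoded_len) = compressed.decode()?;
    assert_eq!(&decoded, msg);
    assert_eq!(uncompressed_len, decoded_len);
    Ok(())
}

This is the same shape used above by CompressedEpochSyncProof and by EncodedChunkStateWitness after the refactor; the only per-type decisions are the size cap and the compression level.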