Skip to content

Commit

Permalink
[Epoch Sync] Compress the EpochSyncProof over the network. (near#12058)
Browse files Browse the repository at this point in the history
Closes near#11933 

This compresses the EpochSyncProof similar to how we compressed
ChunkStateWitness. In fact, the code is refactored out to a
CompressedData trait so we don't duplicate this code.
  • Loading branch information
robin-near authored Sep 9, 2024
1 parent b3851ac commit 9244789
Show file tree
Hide file tree
Showing 14 changed files with 232 additions and 161 deletions.
1 change: 1 addition & 0 deletions chain/chain/src/stateless_validation/chunk_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use near_primitives::stateless_validation::state_witness::{
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::chunk_extra::ChunkExtra;
use near_primitives::types::{ProtocolVersion, ShardId};
use near_primitives::utils::compression::CompressedData;
use near_store::PartialStorage;
use std::collections::HashMap;
use std::num::NonZeroUsize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use near_primitives::reed_solomon::{
reed_solomon_decode, reed_solomon_encode, reed_solomon_part_length,
};
use near_primitives::stateless_validation::state_witness::EncodedChunkStateWitness;
use near_primitives::utils::compression::CompressedData;
use reed_solomon_erasure::galois_8::ReedSolomon;

/// Ratio of the number of data parts to total parts in the Reed Solomon encoding.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::stateless_validation::validate::validate_partial_encoded_state_witnes

use super::encoding::WitnessEncoderCache;
use super::partial_witness_tracker::PartialEncodedStateWitnessTracker;
use near_primitives::utils::compression::CompressedData;

pub struct PartialWitnessActor {
/// Adapter to send messages to the network.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::client_actor::ClientSenderForPartialWitness;
use crate::metrics;

use super::encoding::{WitnessEncoder, WitnessEncoderCache, WitnessPart};
use near_primitives::utils::compression::CompressedData;

/// Max number of chunks to keep in the witness tracker cache. We reach here only after validation
/// of the partial_witness so the LRU cache size need not be too large.
Expand Down
31 changes: 23 additions & 8 deletions chain/client/src/sync/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ use near_primitives::epoch_block_info::BlockInfo;
use near_primitives::epoch_info::EpochInfo;
use near_primitives::epoch_manager::AGGREGATOR_KEY;
use near_primitives::epoch_sync::{
EpochSyncProof, EpochSyncProofCurrentEpochData, EpochSyncProofLastEpochData,
EpochSyncProofPastEpochData,
CompressedEpochSyncProof, EpochSyncProof, EpochSyncProofCurrentEpochData,
EpochSyncProofLastEpochData, EpochSyncProofPastEpochData,
};
use near_primitives::merkle::PartialMerkleTree;
use near_primitives::network::PeerId;
use near_primitives::types::{BlockHeight, EpochId};
use near_primitives::utils::compression::CompressedData;
use near_store::{DBCol, Store, FINAL_HEAD_KEY};
use rand::seq::SliceRandom;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
Expand Down Expand Up @@ -487,13 +488,20 @@ impl Handler<EpochSyncRequestMessage> for ClientActorInner {
move || {
let proof = match EpochSync::derive_epoch_sync_proof(store) {
Ok(epoch_sync_proof) => epoch_sync_proof,
Err(e) => {
tracing::error!("Failed to derive epoch sync proof: {:?}", e);
Err(err) => {
tracing::error!(?err, "Failed to derive epoch sync proof");
return;
}
};
let (compressed_proof, _) = match CompressedEpochSyncProof::encode(&proof) {
Ok(compressed_proof) => compressed_proof,
Err(err) => {
tracing::error!(?err, "Failed to compress epoch sync proof");
return;
}
};
network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::EpochSyncResponse { route_back, proof },
NetworkRequests::EpochSyncResponse { route_back, proof: compressed_proof },
));
},
)
Expand All @@ -503,14 +511,21 @@ impl Handler<EpochSyncRequestMessage> for ClientActorInner {
impl Handler<EpochSyncResponseMessage> for ClientActorInner {
#[perf]
fn handle(&mut self, msg: EpochSyncResponseMessage) {
if let Err(e) = self.client.epoch_sync.apply_proof(
let (proof, _) = match msg.proof.decode() {
Ok(proof) => proof,
Err(err) => {
tracing::error!(?err, "Failed to uncompress epoch sync proof");
return;
}
};
if let Err(err) = self.client.epoch_sync.apply_proof(
&mut self.client.sync_status,
&mut self.client.chain,
msg.proof,
proof,
msg.from_peer,
self.client.epoch_manager.as_ref(),
) {
tracing::error!("Failed to apply epoch sync proof: {:?}", e);
tracing::error!(?err, "Failed to apply epoch sync proof");
}
}
}
4 changes: 2 additions & 2 deletions chain/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use near_async::messaging::{AsyncSender, Sender};
use near_async::{MultiSend, MultiSendMessage, MultiSenderFrom};
use near_primitives::block::{Approval, Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::epoch_sync::EpochSyncProof;
use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -128,7 +128,7 @@ pub struct EpochSyncRequestMessage {
#[rtype(result = "()")]
pub struct EpochSyncResponseMessage {
pub from_peer: PeerId,
pub proof: EpochSyncProof,
pub proof: CompressedEpochSyncProof,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
Expand Down
12 changes: 4 additions & 8 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use near_crypto::Signature;
use near_o11y::OpenTelemetrySpanExt;
use near_primitives::block::{Approval, Block, BlockHeader, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::epoch_sync::EpochSyncProof;
use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::combine_hash;
use near_primitives::network::{AnnounceAccount, PeerId};
Expand Down Expand Up @@ -551,7 +551,7 @@ pub enum RoutedMessageBody {
PartialEncodedStateWitnessForward(PartialEncodedStateWitness),
VersionedChunkEndorsement(ChunkEndorsement),
EpochSyncRequest,
EpochSyncResponse(EpochSyncProof),
EpochSyncResponse(CompressedEpochSyncProof),
}

impl RoutedMessageBody {
Expand Down Expand Up @@ -642,12 +642,8 @@ impl fmt::Debug for RoutedMessageBody {
write!(f, "VersionedChunkEndorsement")
}
RoutedMessageBody::EpochSyncRequest => write!(f, "EpochSyncRequest"),
RoutedMessageBody::EpochSyncResponse(proof) => {
write!(
f,
"EpochSyncResponse(epoch: {:?})",
proof.current_epoch.first_block_header_in_epoch.epoch_id(),
)
RoutedMessageBody::EpochSyncResponse(_) => {
write!(f, "EpochSyncResponse")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use near_async::time;
use near_crypto::PublicKey;
use near_primitives::block::{ApprovalMessage, Block, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::epoch_sync::EpochSyncProof;
use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::sharding::PartialEncodedChunkWithArcReceipts;
Expand Down Expand Up @@ -285,7 +285,7 @@ pub enum NetworkRequests {
/// Requests an epoch sync
EpochSyncRequest { peer_id: PeerId },
/// Response to an epoch sync request
EpochSyncResponse { route_back: CryptoHash, proof: EpochSyncProof },
EpochSyncResponse { route_back: CryptoHash, proof: CompressedEpochSyncProof },
}

/// Combines peer address info, chain.
Expand Down
36 changes: 36 additions & 0 deletions core/primitives/src/epoch_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ use crate::epoch_block_info::BlockInfo;
use crate::epoch_info::EpochInfo;
use crate::merkle::PartialMerkleTree;
use crate::types::validator_stake::ValidatorStake;
use crate::utils::compression::CompressedData;
use borsh::{BorshDeserialize, BorshSerialize};
use bytesize::ByteSize;
use near_crypto::Signature;
use near_schema_checker_lib::ProtocolSchema;
use std::fmt::Debug;

/// Proof that the blockchain history had progressed from the genesis (not included here) to the
/// current epoch indicated in the proof.
Expand All @@ -25,6 +29,38 @@ pub struct EpochSyncProof {
pub current_epoch: EpochSyncProofCurrentEpochData,
}

const MAX_UNCOMPRESSED_EPOCH_SYNC_PROOF_SIZE: u64 = ByteSize::mib(500).0;
const EPOCH_SYNC_COMPRESSION_LEVEL: i32 = 3;

#[derive(
Clone,
PartialEq,
Eq,
BorshSerialize,
BorshDeserialize,
derive_more::From,
derive_more::AsRef,
ProtocolSchema,
)]
pub struct CompressedEpochSyncProof(Box<[u8]>);
impl
CompressedData<
EpochSyncProof,
MAX_UNCOMPRESSED_EPOCH_SYNC_PROOF_SIZE,
EPOCH_SYNC_COMPRESSION_LEVEL,
> for CompressedEpochSyncProof
{
}

impl Debug for CompressedEpochSyncProof {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CompressedEpochSyncProof")
.field("len", &self.0.len())
.field("proof", &self.decode())
.finish()
}
}

/// Data needed for each epoch in the past.
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
pub struct EpochSyncProofPastEpochData {
Expand Down
Loading

0 comments on commit 9244789

Please sign in to comment.