From 4205bb41c2e68f641274c839edfe88a443d878e6 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 30 Apr 2024 23:24:36 +0900 Subject: [PATCH] Implement custody sync --- beacon_node/beacon_chain/src/beacon_chain.rs | 69 ++++- .../src/block_verification_types.rs | 73 ++++- .../src/data_availability_checker.rs | 110 +++++-- .../src/data_availability_checker/error.rs | 2 + .../overflow_lru_cache.rs | 77 +++-- .../src/data_column_verification.rs | 93 +++++- beacon_node/beacon_processor/src/lib.rs | 15 + beacon_node/beacon_processor/src/metrics.rs | 4 + .../lighthouse_network/src/rpc/methods.rs | 4 + .../lighthouse_network/src/types/globals.rs | 1 + .../src/network_beacon_processor/mod.rs | 30 +- .../network_beacon_processor/sync_methods.rs | 53 ++++ .../network/src/sync/block_lookups/common.rs | 57 +++- .../network/src/sync/block_lookups/mod.rs | 75 ++--- .../sync/block_lookups/single_block_lookup.rs | 123 ++++---- .../network/src/sync/block_lookups/tests.rs | 23 +- .../src/sync/block_sidecar_coupling.rs | 26 +- beacon_node/network/src/sync/manager.rs | 73 +++-- .../network/src/sync/network_context.rs | 232 +++++++++++++- .../src/sync/network_context/custody.rs | 283 ++++++++++++++++++ .../network/src/sync/range_sync/batch.rs | 1 + beacon_node/network/src/sync/sampling.rs | 4 +- 22 files changed, 1174 insertions(+), 254 deletions(-) create mode 100644 beacon_node/network/src/sync/network_context/custody.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e6028b80e67..b9c1156c478 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,7 +23,9 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; +use crate::data_column_verification::{ + CustodyDataColumn, GossipDataColumnError, GossipVerifiedDataColumn, +}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -3074,6 +3076,33 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_rpc_custody_columns( + self: &Arc, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result> { + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its blobs again. + if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown(block_root)); + } + + // TODO(das): custody column SSE event + // TODO(das): Why is the slot necessary here? + let slot = Slot::new(0); + + let r = self + .check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns) + .await; + self.remove_notified(&block_root, r) + } + /// Remove any block components from the *processing cache* if we no longer require them. If the /// block was imported full or erred, we no longer require them. fn remove_notified( @@ -3374,6 +3403,44 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided columns can make any cached blocks available, and imports immediately + /// if so, otherwise caches the columns in the data availability checker. + async fn check_rpc_custody_columns_availability_and_import( + self: &Arc, + slot: Slot, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result> { + // Need to scope this to ensure the lock is dropped before calling `process_availability` + // Even an explicit drop is not enough to convince the borrow checker. + { + let mut slashable_cache = self.observed_slashable.write(); + for header in custody_columns + .iter() + .map(|c| c.as_data_column().signed_block_header.clone()) + .unique() + { + if verify_header_signature::>(self, &header).is_ok() { + slashable_cache + .observe_slashable( + header.message.slot, + header.message.proposer_index, + block_root, + ) + .map_err(|e| BlockError::BeaconChainError(e.into()))?; + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(header); + } + } + } + } + let availability = self + .data_availability_checker + .put_rpc_custody_columns(block_root, custody_columns)?; + + self.process_availability(slot, availability).await + } + /// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents` /// /// An error is returned if the block was unable to be imported. It may be partially imported diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 68019712395..6101cca3823 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -2,7 +2,9 @@ use crate::blob_verification::{GossipBlobError, GossipVerifiedBlobList}; use crate::block_verification::BlockError; use crate::data_availability_checker::AvailabilityCheckError; pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; -use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumnList}; +use crate::data_column_verification::{ + CustodyDataColumn, CustodyDataColumnList, GossipDataColumnError, GossipVerifiedDataColumnList, +}; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::{get_block_root, GossipVerifiedBlock, PayloadVerificationOutcome}; use derivative::Derivative; @@ -11,7 +13,7 @@ use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::blob_sidecar::{self, BlobIdentifier, FixedBlobSidecarList}; -use types::data_column_sidecar::{self, DataColumnSidecarList}; +use types::data_column_sidecar::{self}; use types::{ BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, @@ -52,7 +54,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndDataColumns(block, _) => block, + RpcBlockInner::BlockAndCustodyColumns(block, _) => block, } } @@ -60,7 +62,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block.clone(), RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(), + RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), } } @@ -68,7 +70,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(_) => None, RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), - RpcBlockInner::BlockAndDataColumns(_, _) => None, + RpcBlockInner::BlockAndCustodyColumns(_, _) => None, } } } @@ -86,7 +88,10 @@ enum RpcBlockInner { BlockAndBlobs(Arc>, BlobSidecarList), /// This variant is used with parent lookups and by-range responses. It should have all /// requested data columns, all block roots matching for this block. - BlockAndDataColumns(Arc>, DataColumnSidecarList), + BlockAndCustodyColumns( + Arc>, + VariableList, ::DataColumnCount>, + ), } impl RpcBlock { @@ -144,6 +149,35 @@ impl RpcBlock { }) } + pub fn new_with_custody_columns( + block_root: Option, + block: Arc>, + custody_columns: Vec>, + ) -> Result { + let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); + + if let Ok(block_commitments) = block.message().body().blob_kzg_commitments() { + // The number of required custody columns is out of scope here. + if block_commitments.len() > 0 && custody_columns.len() == 0 { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + // Treat empty blob lists as if they are missing. + let inner = if custody_columns.is_empty() { + RpcBlockInner::BlockAndCustodyColumns( + block, + VariableList::new(custody_columns) + .expect("TODO(das): custody vec should never exceed len"), + ) + } else { + RpcBlockInner::Block(block) + }; + Ok(Self { + block_root, + block: inner, + }) + } + pub fn new_from_fixed( block_root: Hash256, block: Arc>, @@ -168,27 +202,27 @@ impl RpcBlock { Hash256, Arc>, Option>, - Option>, + Option>, ) { let block_root = self.block_root(); match self.block { RpcBlockInner::Block(block) => (block_root, block, None, None), RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), - RpcBlockInner::BlockAndDataColumns(block, data_columns) => { + RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { (block_root, block, None, Some(data_columns)) } } } pub fn n_blobs(&self) -> usize { match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0, + RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0, RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), } } pub fn n_data_columns(&self) -> usize { match &self.block { RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, - RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(), + RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), } } } @@ -545,7 +579,20 @@ impl AsBlock for AvailableBlock { let inner = match (blobs_opt, data_columns_opt) { (None, None) => RpcBlockInner::Block(block), (Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs), - (_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns), + (_, Some(data_columns)) => RpcBlockInner::BlockAndCustodyColumns( + block, + VariableList::new( + data_columns + .into_iter() + // TODO(das): This is an ugly hack that should be removed. After updating + // store types to handle custody data columns this should not be required. + // It's okay-ish because available blocks must have all the required custody + // columns. + .map(|d| CustodyDataColumn::from_asserted_custody(d)) + .collect(), + ) + .expect("data column list is within bounds"), + ), }; RpcBlock { block_root, @@ -577,14 +624,14 @@ impl AsBlock for RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndDataColumns(block, _) => block, + RpcBlockInner::BlockAndCustodyColumns(block, _) => block, } } fn block_cloned(&self) -> Arc> { match &self.block { RpcBlockInner::Block(block) => block.clone(), RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(), + RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 6c8afaa01a6..b63d6d90eac 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -7,6 +7,7 @@ use crate::{BeaconChain, BeaconChainTypes, BeaconStore}; use kzg::Kzg; use slog::{debug, error, o, Logger}; use slot_clock::SlotClock; +use ssz_types::VariableList; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -22,7 +23,10 @@ mod error; mod overflow_lru_cache; mod state_lru_cache; -use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn}; +use crate::data_column_verification::{ + verify_kzg_for_data_column_list, CustodyDataColumn, GossipVerifiedDataColumn, + KzgVerifiedCustodyDataColumn, +}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList}; use types::non_zero_usize::new_non_zero_usize; @@ -133,6 +137,15 @@ impl DataAvailabilityChecker { }) } + /// Return the set of imported custody column indexes for `block_root`. Returns None if there is + /// no block component for `block_root`. + pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option> { + self.availability_cache + .peek_pending_components(block_root, |components| { + components.map(|components| components.get_cached_data_columns_indices()) + }) + } + /// Get a blob from the availability cache. pub fn get_blob( &self, @@ -173,6 +186,27 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(block_root, verified_blobs) } + /// Put a list of custody columns received via RPC into the availability cache. This performs KZG + /// verification on the blobs in the list. + pub fn put_rpc_custody_columns( + &self, + block_root: Hash256, + custody_columns: Vec>, + ) -> Result, AvailabilityCheckError> { + let Some(kzg) = self.kzg.as_ref() else { + return Err(AvailabilityCheckError::KzgNotInitialized); + }; + + // TODO(das): report which column is invalid for proper peer scoring + let verified_custody_columns = custody_columns + .into_iter() + .map(|c| KzgVerifiedCustodyDataColumn::new(c, kzg).map_err(AvailabilityCheckError::Kzg)) + .collect::, _>>()?; + + self.availability_cache + .put_kzg_verified_data_columns(block_root, verified_custody_columns) + } + /// Check if we've cached other blobs for this block. If it completes a set and we also /// have a block cached, return the `Availability` variant triggering block import. /// Otherwise cache the blob sidecar. @@ -195,10 +229,14 @@ impl DataAvailabilityChecker { &self, gossip_data_column: GossipVerifiedDataColumn, ) -> Result, AvailabilityCheckError> { - self.availability_cache.put_kzg_verified_data_columns( - gossip_data_column.block_root(), - vec![gossip_data_column.into_inner()], - ) + let block_root = gossip_data_column.block_root(); + + // TODO(das): ensure that our custody requirements include this column + let custody_column = + KzgVerifiedCustodyDataColumn::from_asserted_custody(gossip_data_column.into_inner()); + + self.availability_cache + .put_kzg_verified_data_columns(block_root, vec![custody_column]) } /// Check if we have all the blobs for a block. Returns `Availability` which has information @@ -253,7 +291,16 @@ impl DataAvailabilityChecker { block, blobs: None, blobs_available_timestamp: None, - data_columns, + // TODO(das): update store type to prevent this conversion + data_columns: Some( + VariableList::new( + data_column_list + .into_iter() + .map(|d| d.clone_arc()) + .collect(), + ) + .expect("data column list is within bounds"), + ), })) } else { Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) @@ -322,7 +369,13 @@ impl DataAvailabilityChecker { block, blobs: None, blobs_available_timestamp: None, - data_columns, + // TODO(das): update store type to prevent this conversion + data_columns: data_columns.map(|data_columns| { + VariableList::new( + data_columns.into_iter().map(|d| d.into_inner()).collect(), + ) + .expect("data column list is within bounds") + }), }) } else { MaybeAvailableBlock::AvailabilityPending { block_root, block } @@ -371,16 +424,27 @@ impl DataAvailabilityChecker { /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { self.spec.deneb_fork_epoch.and_then(|fork_epoch| { - self.slot_clock - .now() - .map(|slot| slot.epoch(T::EthSpec::slots_per_epoch())) - .map(|current_epoch| { - std::cmp::max( - fork_epoch, - current_epoch - .saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests), - ) - }) + self.slot_clock.now().map(|current_slot| { + std::cmp::max( + fork_epoch, + current_slot + .epoch(T::EthSpec::slots_per_epoch()) + .saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests), + ) + }) + }) + } + + pub fn data_availability_boundary_peerdas(&self) -> Option { + self.spec.peer_das_epoch.and_then(|fork_epoch| { + self.slot_clock.now().map(|current_slot| { + std::cmp::max( + fork_epoch, + current_slot + .epoch(T::EthSpec::slots_per_epoch()) + .saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests), + ) + }) }) } @@ -390,18 +454,6 @@ impl DataAvailabilityChecker { .map_or(false, |da_epoch| block_epoch >= da_epoch) } - pub fn da_check_required_for_current_epoch(&self) -> bool { - let Some(current_slot) = self.slot_clock.now_or_genesis() else { - error!( - self.log, - "Failed to read slot clock when checking for missing blob ids" - ); - return false; - }; - - self.da_check_required_for_epoch(current_slot.epoch(T::EthSpec::slots_per_epoch())) - } - /// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch. pub fn is_deneb(&self) -> bool { self.slot_clock.now().map_or(false, |slot| { diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 1a61d28608c..79793d6dc29 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -14,6 +14,7 @@ pub enum Error { Unexpected, SszTypes(ssz_types::Error), MissingBlobs, + MissingCustodyColumns, BlobIndexInvalid(u64), DataColumnIndexInvalid(u64), StoreError(store::Error), @@ -38,6 +39,7 @@ impl Error { Error::KzgNotInitialized | Error::SszTypes(_) | Error::MissingBlobs + | Error::MissingCustodyColumns | Error::StoreError(_) | Error::DecodeError(_) | Error::Unexpected diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index c511d2c1cc2..9d62499711c 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -34,7 +34,7 @@ use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock, }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; -use crate::data_column_verification::KzgVerifiedDataColumn; +use crate::data_column_verification::{KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn}; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; @@ -47,7 +47,7 @@ use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; use types::data_column_sidecar::DataColumnIdentifier; -use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; +use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Epoch, EthSpec, Hash256}; /// This represents the components of a partially available block /// @@ -59,7 +59,7 @@ use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, - pub verified_data_columns: VariableList, E::DataColumnCount>, + pub verified_data_columns: VariableList, E::DataColumnCount>, pub executed_block: Option>, } @@ -85,17 +85,11 @@ impl PendingComponents { pub fn get_cached_data_column( &self, data_column_index: u64, - ) -> Option<&KzgVerifiedDataColumn> { + ) -> Option>> { self.verified_data_columns .iter() - .find(|d| d.data_column_index() == data_column_index) - } - - /// Returns an immutable reference to the list of cached data columns. - pub fn get_cached_data_columns( - &self, - ) -> &VariableList, E::DataColumnCount> { - &self.verified_data_columns + .find(|d| d.index() == data_column_index) + .map(|d| d.clone_arc()) } /// Returns a mutable reference to the cached block. @@ -110,13 +104,6 @@ impl PendingComponents { &mut self.verified_blobs } - /// Returns a mutable reference to the fixed vector of cached data columns. - pub fn get_cached_data_columns_mut( - &mut self, - ) -> &mut VariableList, E::DataColumnCount> { - &mut self.verified_data_columns - } - /// Checks if a blob exists at the given index in the cache. /// /// Returns: @@ -134,10 +121,8 @@ impl PendingComponents { /// Returns: /// - `true` if a data column for the given index exists. /// - `false` otherwise. - fn data_column_exists(&self, data_colum_index: u64) -> bool { - self.get_cached_data_columns() - .iter() - .any(|d| d.data_column_index() == data_colum_index) + fn data_column_exists(&self, data_column_index: u64) -> bool { + self.get_cached_data_column(data_column_index).is_some() } /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a @@ -157,7 +142,15 @@ impl PendingComponents { /// Returns the number of data columns that have been received and are stored in the cache. pub fn num_received_data_columns(&self) -> usize { - self.get_cached_data_columns().iter().count() + self.verified_data_columns.iter().count() + } + + /// Returns the indices of cached custody columns + pub fn get_cached_data_columns_indices(&self) -> Vec { + self.verified_data_columns + .iter() + .map(|d| d.index()) + .collect() } /// Inserts a block into the cache. @@ -208,13 +201,13 @@ impl PendingComponents { } /// Merges a given set of data columns into the cache. - fn merge_data_columns>>( + fn merge_data_columns>>( &mut self, kzg_verified_data_columns: I, ) -> Result<(), AvailabilityCheckError> { for data_column in kzg_verified_data_columns { // TODO(das): Add equivalent checks for data columns if necessary - if !self.data_column_exists(data_column.data_column_index()) { + if !self.data_column_exists(data_column.index()) { self.verified_data_columns.push(data_column)?; } } @@ -325,13 +318,6 @@ impl PendingComponents { }; let verified_blobs = VariableList::new(verified_blobs)?; - // TODO(das) Do we need a check here for number of expected custody columns? - let verified_data_columns = verified_data_columns - .into_iter() - .map(|d| d.to_data_column()) - .collect::>() - .into(); - let executed_block = recover(diet_executed_block)?; let AvailabilityPendingExecutedBlock { @@ -345,7 +331,17 @@ impl PendingComponents { block, blobs: Some(verified_blobs), blobs_available_timestamp, - data_columns: Some(verified_data_columns), + // TODO(das) Do we need a check here for number of expected custody columns? + // TODO(das): Update store types to prevent this conversion + data_columns: Some( + VariableList::new( + verified_data_columns + .into_iter() + .map(|d| d.into_inner()) + .collect(), + ) + .expect("data column list is within bounds"), + ), }; Ok(Availability::Available(Box::new( AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), @@ -468,7 +464,7 @@ impl OverflowStore { for data_column in pending_components.verified_data_columns.into_iter() { let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { block_root, - index: data_column.data_column_index(), + index: data_column.index(), })?; self.0.hot_db.put_bytes( @@ -513,11 +509,10 @@ impl OverflowStore { } OverflowKey::DataColumn(_, _index) => { let data_column = - KzgVerifiedDataColumn::from_ssz_bytes(value_bytes.as_slice())?; + KzgVerifiedCustodyDataColumn::from_ssz_bytes(value_bytes.as_slice())?; maybe_pending_components .get_or_insert_with(|| PendingComponents::empty(block_root)) - .get_cached_data_columns_mut() - .push(data_column)?; + .merge_data_columns(vec![data_column])?; } } } @@ -633,9 +628,7 @@ impl Critical { data_column_id: &DataColumnIdentifier, ) -> Result>>, AvailabilityCheckError> { if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { - Ok(pending_components - .get_cached_data_column(data_column_id.index) - .map(|data_column| data_column.clone_data_column())) + Ok(pending_components.get_cached_data_column(data_column_id.index)) } else { Ok(None) } @@ -811,7 +804,7 @@ impl OverflowLRUCache { } pub fn put_kzg_verified_data_columns< - I: IntoIterator>, + I: IntoIterator>, >( &self, block_root: Hash256, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 7f1338bdb58..136ca037d0b 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -131,15 +131,15 @@ impl GossipVerifiedDataColumn { } pub fn slot(&self) -> Slot { - self.data_column.data_column.slot() + self.data_column.data.slot() } pub fn index(&self) -> ColumnIndex { - self.data_column.data_column.index + self.data_column.data.index } pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { - self.data_column.data_column.signed_block_header.clone() + self.data_column.data.signed_block_header.clone() } pub fn into_inner(self) -> KzgVerifiedDataColumn { @@ -152,7 +152,7 @@ impl GossipVerifiedDataColumn { #[derivative(PartialEq, Eq)] #[ssz(struct_behaviour = "transparent")] pub struct KzgVerifiedDataColumn { - data_column: Arc>, + data: Arc>, } impl KzgVerifiedDataColumn { @@ -160,18 +160,91 @@ impl KzgVerifiedDataColumn { verify_kzg_for_data_column(data_column, kzg) } pub fn to_data_column(self) -> Arc> { - self.data_column + self.data } pub fn as_data_column(&self) -> &DataColumnSidecar { - &self.data_column + &self.data } /// This is cheap as we're calling clone on an Arc pub fn clone_data_column(&self) -> Arc> { - self.data_column.clone() + self.data.clone() } pub fn data_column_index(&self) -> u64 { - self.data_column.index + self.data.index + } +} + +pub type CustodyDataColumnList = + VariableList, ::DataColumnCount>; + +/// Data column that we must custody +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq, Hash(bound = "E: EthSpec"))] +#[ssz(struct_behaviour = "transparent")] +pub struct CustodyDataColumn { + data: Arc>, +} + +impl CustodyDataColumn { + /// Mark a column as custody column. Caller must ensure that our current custody requirements + /// include this column + pub fn from_asserted_custody(data: Arc>) -> Self { + Self { data } + } + + pub fn into_inner(self) -> Arc> { + self.data + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_arc(&self) -> Arc> { + self.data.clone() + } + pub fn index(&self) -> u64 { + self.data.index + } +} + +/// Data column that we must custody and has completed kzg verification +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct KzgVerifiedCustodyDataColumn { + data: Arc>, +} + +impl KzgVerifiedCustodyDataColumn { + /// Mark a column as custody column. Caller must ensure that our current custody requirements + /// include this column + pub fn from_asserted_custody(kzg_verified: KzgVerifiedDataColumn) -> Self { + Self { + data: kzg_verified.to_data_column(), + } + } + + /// Verify a column already marked as custody column + pub fn new(data_column: CustodyDataColumn, _kzg: &Kzg) -> Result { + // TODO(das): verify kzg proof + Ok(Self { + data: data_column.data, + }) + } + + pub fn into_inner(self) -> Arc> { + self.data + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_arc(&self) -> Arc> { + self.data.clone() + } + pub fn index(&self) -> u64 { + self.data.index } } @@ -189,7 +262,7 @@ pub fn verify_kzg_for_data_column( // data_column.kzg_commitment, // data_column.kzg_proof, // )?; - Ok(KzgVerifiedDataColumn { data_column }) + Ok(KzgVerifiedDataColumn { data: data_column }) } /// Complete kzg verification for a list of `DataColumnSidecar`s. @@ -202,7 +275,7 @@ pub fn verify_kzg_for_data_column_list<'a, E: EthSpec, I>( _kzg: &'a Kzg, ) -> Result<(), KzgError> where - I: Iterator>>, + I: Iterator>, { // TODO(das): implement kzg verification Ok(()) diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 3eebf5718a1..5d443d896a4 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -162,6 +162,7 @@ const MAX_RPC_BLOCK_QUEUE_LEN: usize = 1_024; const MAX_RPC_BLOB_QUEUE_LEN: usize = 1_024; /// TODO(das): Placeholder number +const MAX_RPC_CUSTODY_COLUMN_QUEUE_LEN: usize = 1000; const MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN: usize = 1000; const MAX_SAMPLING_RESULT_QUEUE_LEN: usize = 1000; @@ -260,6 +261,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic pub const RPC_BLOCK: &str = "rpc_block"; pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block"; pub const RPC_BLOBS: &str = "rpc_blob"; +pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column"; pub const RPC_VERIFY_DATA_COLUMNS: &str = "rpc_verify_data_columns"; pub const SAMPLING_RESULT: &str = "sampling_result"; pub const CHAIN_SEGMENT: &str = "chain_segment"; @@ -647,6 +649,7 @@ pub enum Work { RpcBlobs { process_fn: AsyncFn, }, + RpcCustodyColumn(AsyncFn), RpcVerifyDataColumn(AsyncFn), SamplingResult(AsyncFn), IgnoredRpcBlock { @@ -695,6 +698,7 @@ impl Work { Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE, Work::RpcBlock { .. } => RPC_BLOCK, Work::RpcBlobs { .. } => RPC_BLOBS, + Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN, Work::RpcVerifyDataColumn(_) => RPC_VERIFY_DATA_COLUMNS, Work::SamplingResult(_) => SAMPLING_RESULT, Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK, @@ -858,6 +862,7 @@ impl BeaconProcessor { // Using a FIFO queue since blocks need to be imported sequentially. let mut rpc_block_queue = FifoQueue::new(MAX_RPC_BLOCK_QUEUE_LEN); let mut rpc_blob_queue = FifoQueue::new(MAX_RPC_BLOB_QUEUE_LEN); + let mut rpc_custody_column_queue = FifoQueue::new(MAX_RPC_CUSTODY_COLUMN_QUEUE_LEN); let mut rpc_verify_data_column_queue = FifoQueue::new(MAX_RPC_VERIFY_DATA_COLUMN_QUEUE_LEN); let mut sampling_result_queue = FifoQueue::new(MAX_SAMPLING_RESULT_QUEUE_LEN); let mut chain_segment_queue = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); @@ -1016,6 +1021,8 @@ impl BeaconProcessor { } else if let Some(item) = rpc_blob_queue.pop() { self.spawn_worker(item, idle_tx); // TODO(das): decide proper priorization for sampling columns + } else if let Some(item) = rpc_custody_column_queue.pop() { + self.spawn_worker(item, idle_tx); } else if let Some(item) = rpc_verify_data_column_queue.pop() { self.spawn_worker(item, idle_tx); } else if let Some(item) = sampling_result_queue.pop() { @@ -1316,6 +1323,9 @@ impl BeaconProcessor { rpc_block_queue.push(work, work_id, &self.log) } Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log), + Work::RpcCustodyColumn { .. } => { + rpc_custody_column_queue.push(work, work_id, &self.log) + } Work::RpcVerifyDataColumn(_) => { rpc_verify_data_column_queue.push(work, work_id, &self.log) } @@ -1418,6 +1428,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_RPC_BLOB_QUEUE_TOTAL, rpc_blob_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL, + rpc_custody_column_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL, rpc_verify_data_column_queue.len() as i64, @@ -1567,6 +1581,7 @@ impl BeaconProcessor { } => task_spawner.spawn_async(process_fn), Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } + | Work::RpcCustodyColumn(process_fn) | Work::RpcVerifyDataColumn(process_fn) | Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn), Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index f4c9d61e7d9..472996b1c20 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -86,6 +86,10 @@ lazy_static::lazy_static! { "beacon_processor_rpc_blob_queue_total", "Count of blobs from the rpc waiting to be verified." ); + pub static ref BEACON_PROCESSOR_RPC_CUSTODY_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_rpc_custody_column_queue_total", + "Count of custody columns from the rpc waiting to be imported." + ); // Rpc verify data columns pub static ref BEACON_PROCESSOR_RPC_VERIFY_DATA_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_rpc_verify_data_column_queue_total", diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 18bf4e7dcfc..b030d8eff4e 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -388,6 +388,10 @@ impl DataColumnsByRootRequest { Self { data_column_ids } } + pub fn new_single(block_root: Hash256, index: ColumnIndex, spec: &ChainSpec) -> Self { + Self::new(vec![DataColumnIdentifier { block_root, index }], spec) + } + pub fn group_by_ordered_block_root(&self) -> Vec<(Hash256, Vec)> { let mut column_indexes_by_block = BTreeMap::>::new(); for request_id in self.data_column_ids.as_slice() { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index ff2cb97d057..5dd74f25726 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -115,6 +115,7 @@ impl NetworkGlobals { pub fn custody_columns(&self, _epoch: Epoch) -> Result, &'static str> { let enr = self.local_enr(); let node_id = enr.node_id().raw().into(); + // TODO(das): cache this number at start-up to not make this fallible let custody_subnet_count = enr.custody_subnet_count::()?; Ok( DataColumnSubnetId::compute_custody_columns::(node_id, custody_subnet_count) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 18965fcd6ef..13529bebae7 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -2,7 +2,9 @@ use crate::{ service::NetworkMessage, sync::{manager::BlockProcessType, SamplingId, SyncMessage}, }; -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::{ + block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, +}; use beacon_chain::{builder::Witness, eth1_chain::CachingEth1Backend, BeaconChain}; use beacon_chain::{BeaconChainTypes, NotifyExecutionLayer}; use beacon_processor::{ @@ -478,8 +480,32 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports + /// the result back to sync. + pub fn send_rpc_custody_columns( + self: &Arc, + block_root: Hash256, + custody_columns: Vec>, + seen_timestamp: Duration, + process_type: BlockProcessType, + ) -> Result<(), Error> { + let s = self.clone(); + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::RpcCustodyColumn(Box::pin(async move { + s.process_rpc_custody_columns( + block_root, + custody_columns, + seen_timestamp, + process_type, + ) + .await; + })), + }) + } + /// Create a new `Work` event for some data_columns from ReqResp - pub fn send_rpc_data_columns( + pub fn send_rpc_validate_data_columns( self: &Arc, block_root: Hash256, data_columns: Vec>>, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 03fd30abb79..7c432a4866b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,6 +8,7 @@ use crate::sync::{ use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; use beacon_chain::data_availability_checker::MaybeAvailableBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::{ validator_monitor::get_slot_delay_ms, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ChainSegmentResult, HistoricalBlockError, NotifyExecutionLayer, @@ -303,6 +304,58 @@ impl NetworkBeaconProcessor { }); } + pub async fn process_rpc_custody_columns( + self: Arc>, + block_root: Hash256, + custody_columns: Vec>, + _seen_timestamp: Duration, + process_type: BlockProcessType, + ) { + let result = self + .chain + .process_rpc_custody_columns(block_root, custody_columns) + .await; + + match &result { + Ok(AvailabilityProcessingStatus::Imported(hash)) => { + debug!( + self.log, + "Block components retrieved"; + "result" => "imported block and custody columns", + "block_hash" => %hash, + ); + self.chain.recompute_head_at_current_slot().await; + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + debug!( + self.log, + "Missing components over rpc"; + "block_hash" => %block_root, + ); + } + Err(BlockError::BlockIsAlreadyKnown(_)) => { + debug!( + self.log, + "Custody columns have already been imported"; + "block_hash" => %block_root, + ); + } + Err(e) => { + warn!( + self.log, + "Error when importing rpc custody columns"; + "error" => ?e, + "block_hash" => %block_root, + ); + } + } + + self.send_sync_message(SyncMessage::BlockComponentProcessed { + process_type, + result: result.into(), + }); + } + /// Validate a list of data columns received from RPC requests pub async fn validate_rpc_data_columns( self: Arc>, diff --git a/beacon_node/network/src/sync/block_lookups/common.rs b/beacon_node/network/src/sync/block_lookups/common.rs index dc82000ef1a..7fb58d3c2b1 100644 --- a/beacon_node/network/src/sync/block_lookups/common.rs +++ b/beacon_node/network/src/sync/block_lookups/common.rs @@ -9,6 +9,7 @@ use crate::sync::network_context::{ BlobsByRootSingleBlockRequest, BlocksByRootSingleRequest, SyncNetworkContext, }; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; @@ -17,10 +18,13 @@ use types::SignedBeaconBlock; use super::single_block_lookup::DownloadResult; use super::SingleLookupId; +use super::single_block_lookup::CustodyRequestState; + #[derive(Debug, Copy, Clone)] pub enum ResponseType { Block, Blob, + CustodyColumn, } /// The maximum depth we will search for a parent block. In principle we should have sync'd any @@ -153,7 +157,7 @@ impl RequestState for BlockRequestState { value, block_root, seen_timestamp, - peer_id: _, + .. } = download_result; cx.send_block_for_processing( block_root, @@ -207,7 +211,7 @@ impl RequestState for BlobRequestState { value, block_root, seen_timestamp, - peer_id: _, + .. } = download_result; cx.send_blobs_for_processing( block_root, @@ -231,3 +235,52 @@ impl RequestState for BlobRequestState { &mut self.state } } + +impl RequestState for CustodyRequestState { + type RequestType = (); + type VerifiedResponseType = Vec>; + + fn make_request( + &self, + id: Id, + _peer_id: PeerId, + downloaded_block_expected_blobs: Option, + cx: &mut SyncNetworkContext, + ) -> Result { + cx.custody_lookup_request(id, self.block_root, downloaded_block_expected_blobs) + .map_err(LookupRequestError::SendFailed) + } + + fn send_for_processing( + id: Id, + download_result: DownloadResult, + cx: &SyncNetworkContext, + ) -> Result<(), LookupRequestError> { + let DownloadResult { + value, + block_root, + seen_timestamp, + .. + } = download_result; + cx.send_custody_columns_for_processing( + block_root, + value, + seen_timestamp, + BlockProcessType::SingleCustodyColumn(id), + ) + .map_err(LookupRequestError::SendFailed) + } + + fn response_type() -> ResponseType { + ResponseType::CustodyColumn + } + fn request_state_mut(request: &mut SingleBlockLookup) -> &mut Self { + &mut request.custody_request_state + } + fn get_state(&self) -> &SingleLookupRequestState { + &self.state + } + fn get_state_mut(&mut self) -> &mut SingleLookupRequestState { + &mut self.state + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a0c7c33bb0f..73e54fb7870 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -2,12 +2,11 @@ use self::parent_chain::{compute_parent_chains, NodeChain}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, SingleBlockLookup}; use super::manager::{BlockProcessType, BlockProcessingResult}; -use super::network_context::{RpcProcessingResult, SyncNetworkContext}; +use super::network_context::{LookupFailure, PeerGroup, SyncNetworkContext}; use crate::metrics; use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE}; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use crate::sync::manager::Id; -use crate::sync::network_context::LookupFailure; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory; use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; @@ -15,7 +14,7 @@ pub use common::RequestState; use fnv::FnvHashMap; use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; -pub use single_block_lookup::{BlobRequestState, BlockRequestState}; +pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; use slog::{debug, error, trace, warn, Logger}; use std::sync::Arc; use std::time::Duration; @@ -187,7 +186,7 @@ impl BlockLookups { .iter() .find(|(_, l)| l.block_root() == block_to_drop) { - for &peer_id in lookup.all_used_peers() { + for &peer_id in lookup.all_peers() { cx.report_peer( peer_id, PeerAction::LowToleranceError, @@ -292,11 +291,10 @@ impl BlockLookups { pub fn on_download_response>( &mut self, id: SingleLookupId, - peer_id: PeerId, - response: RpcProcessingResult, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, cx: &mut SyncNetworkContext, ) { - if let Err(e) = self.on_download_response_inner::(id, peer_id, response, cx) { + if let Err(e) = self.on_download_response_inner::(id, response, cx) { self.on_lookup_request_error(id, e, "download_response"); } } @@ -305,16 +303,10 @@ impl BlockLookups { pub fn on_download_response_inner>( &mut self, id: SingleLookupId, - peer_id: PeerId, - response: RpcProcessingResult, + response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupFailure>, cx: &mut SyncNetworkContext, ) -> Result<(), LookupRequestError> { - // Downscore peer even if lookup is not known - // Only downscore lookup verify errors. RPC errors are downscored in the network handler. - if let Err(LookupFailure::LookupVerifyError(e)) = &response { - // Note: the error is displayed in full debug form on the match below - cx.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); - } + // Note: do not downscore peers here for requests errors, SyncNetworkContext does it. let response_type = R::response_type(); let Some(lookup) = self.single_block_lookups.get_mut(&id) else { @@ -328,11 +320,11 @@ impl BlockLookups { let request_state = R::request_state_mut(lookup).get_state_mut(); match response { - Ok((response, seen_timestamp)) => { + Ok((response, peer_group, seen_timestamp)) => { debug!(self.log, "Received lookup download success"; "block_root" => %block_root, - "peer_id" => %peer_id, + "peer_group" => ?peer_group, "response_type" => ?response_type, ); @@ -343,18 +335,19 @@ impl BlockLookups { value: response, block_root, seen_timestamp, - peer_id, + peer_group, })?; // continue_request will send for processing as the request state is AwaitingProcessing lookup.continue_request::(cx) } Err(e) => { + // TODO(das): is it okay to not log the peer source of request failures? Then we + // should log individual requests failures in the SyncNetworkContext debug!(self.log, "Received lookup download failure"; "block_root" => %block_root, - "peer_id" => %peer_id, "response_type" => ?response_type, - "error" => %e, + "error" => ?e, ); request_state.on_download_failure()?; @@ -395,6 +388,9 @@ impl BlockLookups { BlockProcessType::SingleBlob { id } => { self.on_processing_result_inner::>(id, result, cx) } + BlockProcessType::SingleCustodyColumn(id) => { + self.on_processing_result_inner::>(id, result, cx) + } } { self.on_lookup_request_error(process_type.id(), e, "processing_result"); } @@ -446,11 +442,14 @@ impl BlockLookups { .blob_request_state .state .on_post_process_validation_failure()?; - cx.report_peer( - blob_peer, - PeerAction::MidToleranceError, - "sent_incomplete_blobs", - ); + // TODO(das): downscore only the peer that served the request for all blobs + for peer in blob_peer.all() { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + "sent_incomplete_blobs", + ); + } } Action::Retry } @@ -507,15 +506,21 @@ impl BlockLookups { } other => { debug!(self.log, "Invalid lookup component"; "block_root" => %block_root, "component" => ?R::response_type(), "error" => ?other); - let peer_id = request_state.on_processing_failure()?; - cx.report_peer( - peer_id, - PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - }, - ); + let peer_group = request_state.on_processing_failure()?; + // TOOD(das): only downscore peer subgroup that provided the invalid proof + for peer in peer_group.all() { + cx.report_peer( + *peer, + PeerAction::MidToleranceError, + match R::response_type() { + ResponseType::Block => "lookup_block_processing_failure", + ResponseType::Blob => "lookup_blobs_processing_failure", + ResponseType::CustodyColumn => { + "lookup_custody_column_processing_failure" + } + }, + ); + } Action::Retry } @@ -530,7 +535,7 @@ impl BlockLookups { lookup.continue_requests(cx)?; } Action::ParentUnknown { parent_root } => { - let peers = lookup.all_available_peers().cloned().collect::>(); + let peers = lookup.all_peers().cloned().collect::>(); lookup.set_awaiting_parent(parent_root); debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root); self.search_parent_of_child(parent_root, block_root, &peers, cx); diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 76deb236742..90ab630c316 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -2,10 +2,9 @@ use super::common::{AwaitingParent, BlockIsProcessed}; use super::{BlockComponent, PeerId}; use crate::sync::block_lookups::common::RequestState; use crate::sync::block_lookups::Id; -use crate::sync::network_context::SyncNetworkContext; +use crate::sync::network_context::{PeerGroup, SyncNetworkContext}; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::BeaconChainTypes; -use itertools::Itertools; -use rand::seq::IteratorRandom; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -31,8 +30,11 @@ pub struct SingleBlockLookup { pub id: Id, pub block_request_state: BlockRequestState, pub blob_request_state: BlobRequestState, + pub custody_request_state: CustodyRequestState, block_root: Hash256, awaiting_parent: Option, + /// Peers that claim to have imported this block or blob. + peers: HashSet, } impl SingleBlockLookup { @@ -46,8 +48,10 @@ impl SingleBlockLookup { id, block_request_state: BlockRequestState::new(requested_block_root, peers), blob_request_state: BlobRequestState::new(requested_block_root, peers), + custody_request_state: CustodyRequestState::new(requested_block_root, peers), block_root: requested_block_root, awaiting_parent, + peers: peers.iter().copied().collect(), } } @@ -94,22 +98,9 @@ impl SingleBlockLookup { self.block_root() == block_root } - /// Get all unique used peers across block and blob requests. - pub fn all_used_peers(&self) -> impl Iterator + '_ { - self.block_request_state - .state - .get_used_peers() - .chain(self.blob_request_state.state.get_used_peers()) - .unique() - } - - /// Get all unique available peers across block and blob requests. - pub fn all_available_peers(&self) -> impl Iterator + '_ { - self.block_request_state - .state - .get_available_peers() - .chain(self.blob_request_state.state.get_available_peers()) - .unique() + /// Get all unique peers across block and blob requests. + pub fn all_peers(&self) -> impl Iterator + '_ { + self.peers.iter() } pub fn continue_requests( @@ -119,6 +110,7 @@ impl SingleBlockLookup { // TODO: Check what's necessary to download, specially for blobs self.continue_request::>(cx)?; self.continue_request::>(cx)?; + self.continue_request::>(cx)?; Ok(()) } @@ -145,8 +137,7 @@ impl SingleBlockLookup { /// Add all given peers to both block and blob request states. pub fn add_peer(&mut self, peer_id: PeerId) { - self.block_request_state.state.add_peer(&peer_id); - self.blob_request_state.state.add_peer(&peer_id); + self.peers.insert(peer_id); } /// Add all given peers to both block and blob request states. @@ -160,16 +151,17 @@ impl SingleBlockLookup { pub fn both_components_processed(&self) -> bool { self.block_request_state.state.is_processed() && self.blob_request_state.state.is_processed() + && self.custody_request_state.state.is_processed() } /// Checks both the block and blob request states to see if the peer is disconnected. /// /// Returns true if the lookup should be dropped. pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool { - self.block_request_state.state.remove_peer(peer_id); - self.blob_request_state.state.remove_peer(peer_id); - - if self.all_available_peers().count() == 0 { + // TODO(das): this condition is not correct. We may have a request with the block already + // processed but missing custody columns fetching. Custody peers are tracked somewhere else, + // so we should let a request retry fail this lookup + if self.peers.len() == 0 { return true; } @@ -195,6 +187,21 @@ impl BlobRequestState { } } +/// The state of the blob request component of a `SingleBlockLookup`. +pub struct CustodyRequestState { + pub block_root: Hash256, + pub state: SingleLookupRequestState>>, +} + +impl CustodyRequestState { + pub fn new(block_root: Hash256, peer_source: &[PeerId]) -> Self { + Self { + block_root, + state: SingleLookupRequestState::new(peer_source), + } + } +} + /// The state of the block request component of a `SingleBlockLookup`. pub struct BlockRequestState { pub requested_block_root: Hash256, @@ -210,32 +217,28 @@ impl BlockRequestState { } } -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] pub struct DownloadResult { pub value: T, pub block_root: Hash256, pub seen_timestamp: Duration, - pub peer_id: PeerId, + pub peer_group: PeerGroup, } -#[derive(Debug, PartialEq, Eq)] +#[derive(Debug)] pub enum State { AwaitingDownload, Downloading, AwaitingProcess(DownloadResult), Processing(DownloadResult), - Processed(PeerId), + Processed(PeerGroup), } /// Object representing the state of a single block or blob lookup request. -#[derive(PartialEq, Eq, Debug)] +#[derive(Debug)] pub struct SingleLookupRequestState { /// State of this request. state: State, - /// Peers that should have this block or blob. - available_peers: HashSet, - /// Peers from which we have requested this block. - used_peers: HashSet, /// How many times have we attempted to process this block or blob. failed_processing: u8, /// How many times have we attempted to download this block or blob. @@ -244,15 +247,8 @@ pub struct SingleLookupRequestState { impl SingleLookupRequestState { pub fn new(peers: &[PeerId]) -> Self { - let mut available_peers = HashSet::default(); - for peer in peers.iter().copied() { - available_peers.insert(peer); - } - Self { state: State::AwaitingDownload, - available_peers, - used_peers: HashSet::default(), failed_processing: 0, failed_downloading: 0, } @@ -370,13 +366,13 @@ impl SingleLookupRequestState { } /// Registers a failure in processing a block. - pub fn on_processing_failure(&mut self) -> Result { + pub fn on_processing_failure(&mut self) -> Result { match &self.state { State::Processing(result) => { - let peer_id = result.peer_id; + let peers_source = result.peer_group.clone(); self.failed_processing = self.failed_processing.saturating_add(1); self.state = State::AwaitingDownload; - Ok(peer_id) + Ok(peers_source) } other => Err(LookupRequestError::BadState(format!( "Bad state on_processing_failure expected Processing got {other}" @@ -384,12 +380,12 @@ impl SingleLookupRequestState { } } - pub fn on_processing_success(&mut self) -> Result { + pub fn on_processing_success(&mut self) -> Result { match &self.state { State::Processing(result) => { - let peer_id = result.peer_id; - self.state = State::Processed(peer_id); - Ok(peer_id) + let peer_group = result.peer_group.clone(); + self.state = State::Processed(peer_group.clone()); + Ok(peer_group) } other => Err(LookupRequestError::BadState(format!( "Bad state on_processing_success expected Processing got {other}" @@ -397,13 +393,13 @@ impl SingleLookupRequestState { } } - pub fn on_post_process_validation_failure(&mut self) -> Result { + pub fn on_post_process_validation_failure(&mut self) -> Result { match &self.state { - State::Processed(peer_id) => { - let peer_id = *peer_id; + State::Processed(peer_group) => { + let peer_group = peer_group.clone(); self.failed_processing = self.failed_processing.saturating_add(1); self.state = State::AwaitingDownload; - Ok(peer_id) + Ok(peer_group) } other => Err(LookupRequestError::BadState(format!( "Bad state on_post_process_validation_failure expected Processed got {other}" @@ -420,32 +416,13 @@ impl SingleLookupRequestState { self.failed_processing >= self.failed_downloading } - /// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`. - pub fn add_peer(&mut self, peer_id: &PeerId) { - self.available_peers.insert(*peer_id); - } - - /// If a peer disconnects, this request could be failed. If so, an error is returned - pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) { - self.available_peers.remove(disconnected_peer_id); - } - - pub fn get_used_peers(&self) -> impl Iterator { - self.used_peers.iter() - } - - pub fn get_available_peers(&self) -> impl Iterator { - self.available_peers.iter() - } - - /// Selects a random peer from available peers if any, inserts it in used peers and returns it. - pub fn use_rand_available_peer(&mut self) -> Option { + /// Selects a random peer from available peers if any. + pub fn use_rand_available_peer(&self) -> Option { let peer_id = self .available_peers .iter() .choose(&mut rand::thread_rng()) .copied()?; - self.used_peers.insert(peer_id); Some(peer_id) } } diff --git a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index bf92ab78796..e374c8bfd88 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1,7 +1,9 @@ use crate::network_beacon_processor::NetworkBeaconProcessor; use crate::service::RequestId; -use crate::sync::manager::{RequestId as SyncRequestId, SingleLookupReqId, SyncManager}; +use crate::sync::manager::{ + DataColumnsByRootRequestId, RequestId as SyncRequestId, SingleLookupReqId, SyncManager, +}; use crate::sync::sampling::{SamplingConfig, SamplingRequester}; use crate::sync::{SamplingId, SyncMessage}; use crate::NetworkMessage; @@ -82,7 +84,7 @@ const D: Duration = Duration::new(0, 0); const PARENT_FAIL_TOLERANCE: u8 = SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS; const SAMPLING_REQUIRED_SUCCESSES: usize = 2; -type SamplingIds = Vec<(Id, ColumnIndex)>; +type SamplingIds = Vec<(DataColumnsByRootRequestId, ColumnIndex)>; impl TestRig { fn test_setup() -> Self { @@ -495,7 +497,7 @@ impl TestRig { } } - fn return_empty_sampling_request(&mut self, id: Id) { + fn return_empty_sampling_request(&mut self, id: DataColumnsByRootRequestId) { let peer_id = PeerId::random(); // Send stream termination self.send_sync_message(SyncMessage::RpcDataColumn { @@ -522,7 +524,7 @@ impl TestRig { fn complete_valid_sampling_column_request( &mut self, - id: Id, + id: DataColumnsByRootRequestId, data_column: DataColumnSidecar, ) { let peer_id = PeerId::random(); @@ -752,6 +754,7 @@ impl TestRig { } other => panic!("Expected blob process, found {:?}", other), }, + ResponseType::CustodyColumn => todo!(), } } @@ -1468,6 +1471,18 @@ fn sampling_with_retries() { r.expect_clean_finished_sampling(); } +#[test] +fn custody_lookup_happy_path() { + let Some(mut r) = TestRig::test_setup_after_deneb() else { + return; + }; + r.new_connected_peers(100); // Add enough sampling peers + let (block, data_columns) = r.rand_block_and_data_columns(); + let block_root = block.canonical_root(); + let peer_id = r.new_connected_peer(); + r.trigger_unknown_block_from_attestation(block_root, peer_id); +} + // TODO(das): Test retries of DataColumnByRoot: // - Expect request for column_index // - Respond with bad data diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index d159733cbc7..80cfb4eb64b 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,4 +1,6 @@ -use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::{ + block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, +}; use ssz_types::VariableList; use std::{collections::VecDeque, sync::Arc}; use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; @@ -11,10 +13,12 @@ pub struct BlocksAndBlobsRequestInfo { accumulated_blocks: VecDeque>>, /// Sidecars we have received awaiting for their corresponding block. accumulated_sidecars: VecDeque>>, + accumulated_custody_columns: VecDeque>, /// Whether the individual RPC request for blocks is finished or not. is_blocks_stream_terminated: bool, /// Whether the individual RPC request for sidecars is finished or not. is_sidecars_stream_terminated: bool, + is_custody_columns_stream_terminated: bool, /// Used to determine if this accumulator should wait for a sidecars stream termination request_type: ByRangeRequestType, } @@ -24,8 +28,10 @@ impl BlocksAndBlobsRequestInfo { Self { accumulated_blocks: <_>::default(), accumulated_sidecars: <_>::default(), + accumulated_custody_columns: <_>::default(), is_blocks_stream_terminated: <_>::default(), is_sidecars_stream_terminated: <_>::default(), + is_custody_columns_stream_terminated: <_>::default(), request_type, } } @@ -48,6 +54,13 @@ impl BlocksAndBlobsRequestInfo { } } + pub fn add_custody_column(&mut self, column_opt: Option>) { + match column_opt { + Some(column) => self.accumulated_custody_columns.push_back(column), + None => self.is_custody_columns_stream_terminated = true, + } + } + pub fn into_responses(self) -> Result>, String> { let BlocksAndBlobsRequestInfo { accumulated_blocks, @@ -96,11 +109,12 @@ impl BlocksAndBlobsRequestInfo { } pub fn is_finished(&self) -> bool { - let blobs_requested = match self.request_type { - ByRangeRequestType::Blocks => false, - ByRangeRequestType::BlocksAndBlobs => true, - }; - self.is_blocks_stream_terminated && (!blobs_requested || self.is_sidecars_stream_terminated) + let blobs_requested = matches!(self.request_type, ByRangeRequestType::BlocksAndBlobs); + let custody_columns_requested = + matches!(self.request_type, ByRangeRequestType::BlocksAndColumns); + self.is_blocks_stream_terminated + && (!blobs_requested || self.is_sidecars_stream_terminated) + && (!custody_columns_requested || self.is_custody_columns_stream_terminated) } } diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index e1d052a4e11..ca55440954c 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -35,7 +35,9 @@ use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart}; use super::block_lookups::BlockLookups; -use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext}; +use super::network_context::{ + custody::CustodyRequester, BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext, +}; use super::peer_sync_info::{remote_sync_type, PeerSyncType}; use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH}; use super::sampling::{Sampling, SamplingConfig, SamplingId, SamplingRequester, SamplingResult}; @@ -43,9 +45,10 @@ use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProces use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ - BlobRequestState, BlockComponent, BlockRequestState, DownloadResult, + BlobRequestState, BlockComponent, BlockRequestState, CustodyRequestState, DownloadResult, }; use crate::sync::block_sidecar_coupling::BlocksAndBlobsRequestInfo; +use crate::sync::network_context::PeerGroup; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::validator_monitor::timestamp_now; @@ -89,15 +92,21 @@ pub enum RequestId { /// Request searching for a set of blobs given a hash. SingleBlob { id: SingleLookupReqId }, /// Request searching for a set of data columns given a hash and list of column indices. - DataColumnsByRoot(Id), + DataColumnsByRoot(DataColumnsByRootRequestId), /// Range request that is composed by both a block range request and a blob range request. RangeBlockAndBlobs { id: Id }, } +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct DataColumnsByRootRequestId { + pub requester: DataColumnsByRootRequester, + pub req_id: Id, +} + #[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] pub enum DataColumnsByRootRequester { Sampling(SamplingId), - Custody, + Custody(CustodyId), } #[derive(Debug)] @@ -178,12 +187,15 @@ pub enum SyncMessage { pub enum BlockProcessType { SingleBlock { id: Id }, SingleBlob { id: Id }, + SingleCustodyColumn(Id), } impl BlockProcessType { pub fn id(&self) -> Id { match self { - BlockProcessType::SingleBlock { id } | BlockProcessType::SingleBlob { id } => *id, + BlockProcessType::SingleBlock { id } + | BlockProcessType::SingleBlob { id } + | BlockProcessType::SingleCustodyColumn(id) => *id, } } } @@ -638,7 +650,7 @@ impl SyncManager { value: block.block_cloned(), block_root, seen_timestamp: timestamp_now(), - peer_id, + peer_group: PeerGroup::from_single(peer_id), }), ); } @@ -656,7 +668,7 @@ impl SyncManager { value: blob, block_root, seen_timestamp: timestamp_now(), - peer_id, + peer_group: PeerGroup::from_single(peer_id), }), ); } @@ -871,12 +883,13 @@ impl SyncManager { peer_id: PeerId, block: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_block_response(id, block) { + if let Some(resp) = self.network.on_single_block_response(id, peer_id, block) { self.block_lookups .on_download_response::>( id.lookup_id, - peer_id, - resp, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), &mut self.network, ) } @@ -940,12 +953,13 @@ impl SyncManager { peer_id: PeerId, blob: RpcEvent>>, ) { - if let Some(resp) = self.network.on_single_blob_response(id, blob) { + if let Some(resp) = self.network.on_single_blob_response(id, peer_id, blob) { self.block_lookups .on_download_response::>( id.lookup_id, - peer_id, - resp, + resp.map(|(value, seen_timestamp)| { + (value, PeerGroup::from_single(peer_id), seen_timestamp) + }), &mut self.network, ) } @@ -953,13 +967,13 @@ impl SyncManager { fn on_single_data_column_response( &mut self, - id: Id, + id: DataColumnsByRootRequestId, peer_id: PeerId, data_column: RpcEvent>>, ) { - if let Some((requester, resp)) = self - .network - .on_data_columns_by_root_response(id, data_column) + if let Some((requester, resp)) = + self.network + .on_data_columns_by_root_response(id, peer_id, data_column) { match requester { DataColumnsByRootRequester::Sampling(id) => { @@ -970,8 +984,29 @@ impl SyncManager { self.on_sampling_result(requester, result) } } - DataColumnsByRootRequester::Custody => { - todo!("TODO(das): handle custody requests"); + DataColumnsByRootRequester::Custody(id) => { + if let Some((requester, custody_columns)) = + self.network.on_custody_by_root_response(id, peer_id, resp) + { + // TODO(das): get proper timestamp + let seen_timestamp = timestamp_now(); + match requester { + CustodyRequester::Lookup(id) => self + .block_lookups + .on_download_response::>( + id.lookup_id, + custody_columns.map(|(columns, peer_group)| { + (columns, peer_group, seen_timestamp) + }), + &mut self.network, + ), + CustodyRequester::RangeSync(_) => { + // TODO(das): this line should be unreachable, no mechanism to make + // custody requests for sync yet + todo!("custody fetching for sync not implemented"); + } + } + } } } } diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 0c80945e6e1..313d94d4575 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -1,6 +1,8 @@ //! Provides network functionality for the Syncing thread. This fundamentally wraps a network //! channel and stores a global RPC ID to perform requests. +pub use self::custody::CustodyId; +use self::custody::{ActiveCustodyRequest, CustodyRequester, Error as CustodyRequestError}; use self::requests::{ ActiveBlobsByRootRequest, ActiveBlocksByRootRequest, ActiveDataColumnsByRootRequest, }; @@ -9,7 +11,8 @@ pub use self::requests::{ }; use super::block_sidecar_coupling::BlocksAndBlobsRequestInfo; use super::manager::{ - BlockProcessType, DataColumnsByRootRequester, Id, RequestId as SyncRequestId, + BlockProcessType, DataColumnsByRootRequestId, DataColumnsByRootRequester, Id, + RequestId as SyncRequestId, }; use super::range_sync::{BatchId, ByRangeRequestType, ChainId}; use crate::network_beacon_processor::NetworkBeaconProcessor; @@ -18,6 +21,7 @@ use crate::status::ToStatusMessage; use crate::sync::block_lookups::SingleLookupId; use crate::sync::manager::SingleLookupReqId; use beacon_chain::block_verification_types::RpcBlock; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::validator_monitor::timestamp_now; use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use fnv::FnvHashMap; @@ -38,6 +42,7 @@ use types::{ SignedBeaconBlock, }; +pub mod custody; mod requests; pub struct BlocksAndBlobsByRangeResponse { @@ -66,18 +71,11 @@ pub enum RpcEvent { pub type RpcProcessingResult = Result<(T, Duration), LookupFailure>; +#[derive(Debug)] pub enum LookupFailure { RpcError(RPCError), LookupVerifyError(LookupVerifyError), -} - -impl std::fmt::Display for LookupFailure { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - LookupFailure::RpcError(e) => write!(f, "RPC Error: {:?}", e), - LookupFailure::LookupVerifyError(e) => write!(f, "Lookup Verify Error: {:?}", e), - } - } + CustodyRequestError(CustodyRequestError), } impl From for LookupFailure { @@ -92,6 +90,23 @@ impl From for LookupFailure { } } +#[derive(Clone, Debug)] +pub struct PeerGroup { + peers: Vec, +} + +impl PeerGroup { + pub fn from_single(peer: PeerId) -> Self { + Self { peers: vec![peer] } + } + pub fn from_set(peers: Vec) -> Self { + Self { peers } + } + pub fn all(&self) -> &[PeerId] { + &self.peers + } +} + /// Wraps a Network channel to employ various RPC related network functionality for the Sync manager. This includes management of a global RPC request Id. pub struct SyncNetworkContext { /// The network channel to relay messages to the Network service. @@ -105,8 +120,12 @@ pub struct SyncNetworkContext { /// A mapping of active BlobsByRoot requests, including both current slot and parent lookups. blobs_by_root_requests: FnvHashMap>, - data_columns_by_root_requests: - FnvHashMap>, + data_columns_by_root_requests: FnvHashMap< + DataColumnsByRootRequestId, + ActiveDataColumnsByRootRequest, + >, + /// Mapping of active custody column requests for a block root + custody_by_root_requests: FnvHashMap>, /// BlocksByRange requests paired with BlobsByRange range_blocks_and_blobs_requests: @@ -129,6 +148,7 @@ pub struct SyncNetworkContext { pub enum BlockOrBlob { Block(Option>>), Blob(Option>>), + CustodyColumns(Option>), } impl From>>> for BlockOrBlob { @@ -157,6 +177,7 @@ impl SyncNetworkContext { blocks_by_root_requests: <_>::default(), blobs_by_root_requests: <_>::default(), data_columns_by_root_requests: <_>::default(), + custody_by_root_requests: <_>::default(), range_blocks_and_blobs_requests: FnvHashMap::default(), network_beacon_processor, chain, @@ -321,6 +342,7 @@ impl SyncNetworkContext { match block_or_blob { BlockOrBlob::Block(maybe_block) => info.add_block_response(maybe_block), BlockOrBlob::Blob(maybe_sidecar) => info.add_sidecar_response(maybe_sidecar), + BlockOrBlob::CustodyColumns(column) => info.add_custody_column(column), } if info.is_finished() { // If the request is finished, dequeue everything @@ -456,8 +478,7 @@ impl SyncNetworkContext { peer_id: PeerId, request: DataColumnsByRootSingleBlockRequest, ) -> Result<(), &'static str> { - let id = self.next_id(); - + let req_id = self.next_id(); debug!( self.log, "Sending DataColumnsByRoot Request"; @@ -466,8 +487,9 @@ impl SyncNetworkContext { "indices" => ?request.indices, "peer" => %peer_id, "requester" => ?requester, - "id" => id, + "id" => req_id, ); + let id = DataColumnsByRootRequestId { requester, req_id }; self.send_network_msg(NetworkMessage::SendRequest { peer_id, @@ -481,6 +503,80 @@ impl SyncNetworkContext { Ok(()) } + pub fn custody_lookup_request( + &mut self, + lookup_id: SingleLookupId, + block_root: Hash256, + downloaded_block_expected_data: Option, + ) -> Result { + let expects_data = downloaded_block_expected_data + .or_else(|| { + self.chain + .data_availability_checker + .num_expected_blobs(&block_root) + }) + .map(|n| n > 0) + .unwrap_or_else(|| { + // If we don't about the block being requested, assume block has data + self.chain + .data_availability_checker + .da_check_peerdas_required_for_current_epoch() + }); + + // No data required for this block + if !expects_data { + return Ok(false); + } + + let custody_indexes_imported = self + .chain + .data_availability_checker + .imported_custody_column_indexes(&block_root) + .unwrap_or_default(); + + // TODO(das): figure out how to pass block.slot if we end up doing rotation + let block_epoch = Epoch::new(0); + let custody_indexes_duty = self.network_globals().custody_columns(block_epoch)?; + + // Include only the blob indexes not yet imported (received through gossip) + let custody_indexes_to_fetch = custody_indexes_duty + .into_iter() + .filter(|index| !custody_indexes_imported.contains(index)) + .collect::>(); + + if custody_indexes_to_fetch.is_empty() { + // No indexes required, do not issue any request + return Ok(false); + } + + let id = SingleLookupReqId { + lookup_id, + req_id: self.next_id(), + }; + + debug!( + self.log, + "Starting custody columns request"; + "block_root" => ?block_root, + "indices" => ?custody_indexes_to_fetch, + "id" => ?id + ); + + let requester = CustodyRequester::Lookup(id); + let request = ActiveCustodyRequest::new( + block_root, + requester, + custody_indexes_to_fetch, + self.log.clone(), + ); + + // TODO(das): start request + // Note that you can only send, but not handle a response here + + self.custody_by_root_requests.insert(requester, request); + Ok(true) + } + pub fn is_execution_engine_online(&self) -> bool { self.execution_engine_state == EngineState::Online } @@ -589,6 +685,7 @@ impl SyncNetworkContext { if let Some(data_availability_boundary) = self.chain.data_availability_boundary() { if epoch >= data_availability_boundary { + // TODO(das): After peerdas fork, return `BlocksAndColumns` ByRangeRequestType::BlocksAndBlobs } else { ByRangeRequestType::Blocks @@ -613,6 +710,7 @@ impl SyncNetworkContext { pub fn on_single_block_response( &mut self, request_id: SingleLookupReqId, + peer_id: PeerId, block: RpcEvent>>, ) -> Option>>> { let Entry::Occupied(mut request) = self.blocks_by_root_requests.entry(request_id) else { @@ -639,12 +737,17 @@ impl SyncNetworkContext { Err(e.into()) } }; + + if let Err(ref e) = resp { + self.on_lookup_failure(peer_id, e); + } Some(resp) } pub fn on_single_blob_response( &mut self, request_id: SingleLookupReqId, + peer_id: PeerId, blob: RpcEvent>>, ) -> Option>> { let Entry::Occupied(mut request) = self.blobs_by_root_requests.entry(request_id) else { @@ -676,13 +779,18 @@ impl SyncNetworkContext { Err(e.into()) } }; + + if let Err(ref e) = resp { + self.on_lookup_failure(peer_id, e); + } Some(resp) } #[allow(clippy::type_complexity)] pub fn on_data_columns_by_root_response( &mut self, - id: Id, + id: DataColumnsByRootRequestId, + peer_id: PeerId, item: RpcEvent>>, ) -> Option<( DataColumnsByRootRequester, @@ -716,9 +824,54 @@ impl SyncNetworkContext { Err(e.into()) } }; + + if let Err(ref e) = resp { + self.on_lookup_failure(peer_id, e); + } Some((requester, resp)) } + /// Insert a downloaded column into an active custody request. Then make progress on the + /// entire request. + /// + /// ### Returns + /// + /// - `Some`: Request completed, won't make more progress. Expect requester to act on the result. + /// - `None`: Request still active, requester should do no action + #[allow(clippy::type_complexity)] + pub fn on_custody_by_root_response( + &mut self, + id: CustodyId, + peer_id: PeerId, + resp: RpcProcessingResult>>>, + ) -> Option<( + CustodyRequester, + Result<(Vec>, PeerGroup), LookupFailure>, + )> { + // Note: need to remove the request to borrow self again below. Otherwise we can't + // do nested requests + let Some(mut request) = self.custody_by_root_requests.remove(&id.id) else { + // TOOD(das): This log can happen if the request is error'ed early and dropped + debug!(self.log, "Custody column downloaded event for unknown request"; "id" => ?id); + return None; + }; + + let result = request + .on_data_column_downloaded(peer_id, id.column_index, resp, self) + .map_err(LookupFailure::CustodyRequestError) + .transpose(); + + // Convert a result from internal format of `ActiveCustodyRequest` (error first to use ?) to + // an Option first to use in an `if let Some() { act on result }` block. + if let Some(result) = result { + debug!(self.log, "Custody request completed, removing"; "id" => ?id, "result" => ?result); + Some((id.id, result)) + } else { + self.custody_by_root_requests.insert(id.id, request); + None + } + } + pub fn send_block_for_processing( &self, block_root: Hash256, @@ -781,6 +934,53 @@ impl SyncNetworkContext { } } } + + pub fn send_custody_columns_for_processing( + &self, + block_root: Hash256, + custody_columns: Vec>, + duration: Duration, + process_type: BlockProcessType, + ) -> Result<(), &'static str> { + match self.beacon_processor_if_enabled() { + Some(beacon_processor) => { + debug!(self.log, "Sending custody columns for processing"; "block" => ?block_root, "process_type" => ?process_type); + if let Err(e) = beacon_processor.send_rpc_custody_columns( + block_root, + custody_columns, + duration, + process_type, + ) { + error!( + self.log, + "Failed to send sync custody columns to processor"; + "error" => ?e + ); + Err("beacon processor send failure") + } else { + Ok(()) + } + } + None => { + trace!(self.log, "Dropping custody columns ready for processing. Beacon processor not available"; "block_root" => %block_root); + Err("beacon processor unavailable") + } + } + } + + /// Downscore peers for lookup errors that originate from sync + pub fn on_lookup_failure(&self, peer_id: PeerId, err: &LookupFailure) { + match err { + // RPCErros are downscored in the network handler + LookupFailure::RpcError(_) => {} + // Only downscore lookup verify errors. RPC errors are downscored in the network handler. + LookupFailure::LookupVerifyError(e) => { + self.report_peer(peer_id, PeerAction::LowToleranceError, e.into()); + } + // CustodyRequestError are downscored in the each data_columns_by_root request + LookupFailure::CustodyRequestError(_) => {} + } + } } fn to_fixed_blob_sidecar_list( diff --git a/beacon_node/network/src/sync/network_context/custody.rs b/beacon_node/network/src/sync/network_context/custody.rs new file mode 100644 index 00000000000..36c842ae227 --- /dev/null +++ b/beacon_node/network/src/sync/network_context/custody.rs @@ -0,0 +1,283 @@ +use crate::sync::manager::{Id, SingleLookupReqId}; + +use self::request::ActiveColumnSampleRequest; +use beacon_chain::data_column_verification::CustodyDataColumn; +use beacon_chain::BeaconChainTypes; +use fnv::FnvHashMap; +use lighthouse_network::PeerId; +use slog::{debug, warn}; +use std::{marker::PhantomData, sync::Arc}; +use types::{data_column_sidecar::ColumnIndex, DataColumnSidecar, Epoch, Hash256}; + +use super::{PeerGroup, RpcProcessingResult, SyncNetworkContext}; + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub struct CustodyId { + pub id: CustodyRequester, + pub column_index: ColumnIndex, +} + +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +pub enum CustodyRequester { + Lookup(SingleLookupReqId), + RangeSync(Id), +} + +type DataColumnSidecarList = Vec>>; + +pub struct ActiveCustodyRequest { + block_root: Hash256, + block_epoch: Epoch, + requester_id: CustodyRequester, + column_requests: FnvHashMap, + columns: Vec>, + /// Logger for the `SyncNetworkContext`. + pub log: slog::Logger, + _phantom: PhantomData, +} + +#[derive(Debug)] +pub enum Error { + SendFailed(&'static str), + TooManyFailures, + BadState(String), + NoPeers(ColumnIndex), +} + +impl ActiveCustodyRequest { + pub(crate) fn new( + block_root: Hash256, + requester_id: CustodyRequester, + column_indexes: Vec, + log: slog::Logger, + ) -> Self { + Self { + block_root, + // TODO(das): use actual epoch if there's rotation + block_epoch: Epoch::new(0), + requester_id, + column_requests: column_indexes + .into_iter() + .map(|index| (index, ActiveColumnSampleRequest::new(index))) + .collect(), + columns: vec![], + log, + _phantom: PhantomData, + } + } + + /// Insert a downloaded column into an active sampling request. Then make progress on the + /// entire request. + /// + /// ### Returns + /// + /// - `Err`: Sampling request has failed and will be dropped + /// - `Ok(Some)`: Sampling request has successfully completed and will be dropped + /// - `Ok(None)`: Sampling request still active + pub(crate) fn on_data_column_downloaded( + &mut self, + _peer_id: PeerId, + column_index: ColumnIndex, + resp: RpcProcessingResult>, + cx: &mut SyncNetworkContext, + ) -> Result>, PeerGroup)>, Error> { + // TODO(das): Should downscore peers for verify errors here + + let Some(request) = self.column_requests.get_mut(&column_index) else { + warn!( + self.log, + "Received sampling response for unrequested column index" + ); + return Ok(None); + }; + + match resp { + Ok((mut data_columns, _seen_timestamp)) => { + debug!(self.log, "Sample download success"; "block_root" => %self.block_root, "column_index" => column_index, "count" => data_columns.len()); + + // No need to check data_columns has len > 1, as the SyncNetworkContext ensure that + // only requested is returned (or none); + if let Some(data_column) = data_columns.pop() { + request.on_download_success()?; + + // If on_download_success is successful, we are expecting a columna for this + // custody requirement. + self.columns + .push(CustodyDataColumn::from_asserted_custody(data_column)); + } else { + // Peer does not have the requested data. + // TODO(das) what to do? + debug!(self.log, "Sampling peer claims to not have the data"; "block_root" => %self.block_root, "column_index" => column_index); + // TODO(das) tolerate this failure if you are not sure the block has data + request.on_download_success()?; + } + } + Err(err) => { + debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => ?err); + + // Error downloading, maybe penalize peer and retry again. + // TODO(das) with different peer or different peer? + request.on_download_error()?; + } + }; + + self.continue_requests(cx) + } + + pub(crate) fn continue_requests( + &mut self, + cx: &mut SyncNetworkContext, + ) -> Result>, PeerGroup)>, Error> { + // First check if sampling is completed, by computing `required_successes` + let mut successes = 0; + + for request in self.column_requests.values() { + if request.is_downloaded() { + successes += 1; + } + } + + // All requests have completed successfully. We may not have all the expected columns if the + // serving peers claim that this block has no data. + if successes == self.column_requests.len() { + let columns = std::mem::take(&mut self.columns); + + let peers = self + .column_requests + .values() + .filter_map(|r| r.peer()) + .collect::>(); + let peer_group = PeerGroup::from_set(peers); + + return Ok(Some((columns, peer_group))); + } + + for (_, request) in self.column_requests.iter_mut() { + request.request(self.block_root, self.block_epoch, self.requester_id, cx)?; + } + + Ok(None) + } +} + +mod request { + use super::{CustodyId, CustodyRequester, Error}; + use crate::sync::{ + manager::DataColumnsByRootRequester, + network_context::{DataColumnsByRootSingleBlockRequest, SyncNetworkContext}, + }; + use beacon_chain::BeaconChainTypes; + use lighthouse_network::PeerId; + use types::{data_column_sidecar::ColumnIndex, Epoch, Hash256}; + + /// TODO(das): this attempt count is nested into the existing lookup request count. + const MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS: usize = 3; + + pub(crate) struct ActiveColumnSampleRequest { + column_index: ColumnIndex, + status: Status, + download_failures: usize, + } + + #[derive(Debug, Clone)] + enum Status { + NotStarted, + Downloading(PeerId), + Downloaded(PeerId), + } + + impl ActiveColumnSampleRequest { + pub(crate) fn new(column_index: ColumnIndex) -> Self { + Self { + column_index, + status: Status::NotStarted, + download_failures: 0, + } + } + + pub(crate) fn is_downloaded(&self) -> bool { + match self.status { + Status::NotStarted | Status::Downloading(_) => false, + Status::Downloaded(_) => true, + } + } + + pub(crate) fn peer(&self) -> Option { + match self.status { + Status::NotStarted | Status::Downloading(_) => None, + Status::Downloaded(peer) => Some(peer), + } + } + + pub(crate) fn request( + &mut self, + block_root: Hash256, + block_epoch: Epoch, + requester: CustodyRequester, + cx: &mut SyncNetworkContext, + ) -> Result { + match &self.status { + Status::NotStarted => {} // Ok to continue + Status::Downloading(_) => return Ok(false), // Already downloading + Status::Downloaded(_) => return Ok(false), // Already completed + } + + if self.download_failures > MAX_CUSTODY_COLUMN_DOWNLOAD_ATTEMPTS { + return Err(Error::TooManyFailures); + } + + // TODO: When is a fork and only a subset of your peers know about a block, sampling should only + // be queried on the peers on that fork. Should this case be handled? How to handle it? + let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index); + + // TODO(das) randomize custodial peer and avoid failing peers + let Some(peer_id) = peer_ids.first().cloned() else { + // Do not tolerate not having custody peers, hard error. + // TODO(das): we might implement some grace period. The request will pause for X + // seconds expecting the peer manager to find peers before failing the request. + return Err(Error::NoPeers(self.column_index)); + }; + + cx.data_column_lookup_request( + DataColumnsByRootRequester::Custody(CustodyId { + id: requester, + column_index: self.column_index, + }), + peer_id, + DataColumnsByRootSingleBlockRequest { + block_root, + indices: vec![self.column_index], + }, + ) + .map_err(Error::SendFailed)?; + + self.status = Status::Downloading(peer_id); + Ok(true) + } + + pub(crate) fn on_download_error(&mut self) -> Result { + match self.status.clone() { + Status::Downloading(peer_id) => { + self.download_failures += 1; + self.status = Status::NotStarted; + Ok(peer_id) + } + other => Err(Error::BadState(format!( + "bad state on_sampling_error expected Sampling got {other:?}" + ))), + } + } + + pub(crate) fn on_download_success(&mut self) -> Result<(), Error> { + match &self.status { + Status::Downloading(peer) => { + self.status = Status::Downloaded(*peer); + Ok(()) + } + other => Err(Error::BadState(format!( + "bad state on_sampling_success expected Sampling got {other:?}" + ))), + } + } + } +} diff --git a/beacon_node/network/src/sync/range_sync/batch.rs b/beacon_node/network/src/sync/range_sync/batch.rs index 75cb49d176d..273d3248e16 100644 --- a/beacon_node/network/src/sync/range_sync/batch.rs +++ b/beacon_node/network/src/sync/range_sync/batch.rs @@ -19,6 +19,7 @@ const MAX_BATCH_PROCESSING_ATTEMPTS: u8 = 3; #[derive(Debug, Copy, Clone, Display)] #[strum(serialize_all = "snake_case")] pub enum ByRangeRequestType { + BlocksAndColumns, BlocksAndBlobs, Blocks, } diff --git a/beacon_node/network/src/sync/sampling.rs b/beacon_node/network/src/sync/sampling.rs index 9ee8616daac..84ebe69928a 100644 --- a/beacon_node/network/src/sync/sampling.rs +++ b/beacon_node/network/src/sync/sampling.rs @@ -267,7 +267,7 @@ impl ActiveSamplingRequest { }; debug!(self.log, "Sending data_column for verification"; "block" => ?self.block_root, "column_index" => column_index); - if let Err(e) = beacon_processor.send_rpc_data_columns( + if let Err(e) = beacon_processor.send_rpc_validate_data_columns( self.block_root, vec![data_column], seen_timestamp, @@ -288,7 +288,7 @@ impl ActiveSamplingRequest { } } Err(err) => { - debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => %err); + debug!(self.log, "Sample download error"; "block_root" => %self.block_root, "column_index" => column_index, "error" => ?err); metrics::inc_counter_vec(&metrics::SAMPLE_DOWNLOAD_RESULT, &[metrics::FAILURE]); // Error downloading, maybe penalize peer and retry again.