Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 8, 2024
1 parent e57222b commit 8705eda
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 169 deletions.
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3403,8 +3403,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.process_availability(slot, availability).await
}

/// Checks if the provided blobs can make any cached blocks available, and imports immediately
/// if so, otherwise caches the blob in the data availability checker.
/// Checks if the provided columns can make any cached blocks available, and imports immediately
/// if so, otherwise caches the columns in the data availability checker.
async fn check_rpc_custody_columns_availability_and_import(
self: &Arc<Self>,
slot: Slot,
Expand Down
5 changes: 3 additions & 2 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ impl<E: EthSpec> RpcBlock<E> {
let block_root = block_root.unwrap_or_else(|| get_block_root(&block));

if let Ok(block_commitments) = block.message().body().blob_kzg_commitments() {
if custody_columns.len() != block_commitments.len() {
return Err(AvailabilityCheckError::MissingBlobs);
// The number of required custody columns is out of scope here.
if block_commitments.len() > 0 && custody_columns.len() == 0 {
return Err(AvailabilityCheckError::MissingCustodyColumns);
}
}
// Treat empty blob lists as if they are missing.
Expand Down
43 changes: 21 additions & 22 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,16 +424,27 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
/// `None` if the `Deneb` fork is disabled.
pub fn data_availability_boundary(&self) -> Option<Epoch> {
self.spec.deneb_fork_epoch.and_then(|fork_epoch| {
self.slot_clock
.now()
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.map(|current_epoch| {
std::cmp::max(
fork_epoch,
current_epoch
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
)
})
self.slot_clock.now().map(|current_slot| {
std::cmp::max(
fork_epoch,
current_slot
.epoch(T::EthSpec::slots_per_epoch())
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
)
})
})
}

pub fn data_availability_boundary_peerdas(&self) -> Option<Epoch> {
self.spec.peer_das_epoch.and_then(|fork_epoch| {
self.slot_clock.now().map(|current_slot| {
std::cmp::max(
fork_epoch,
current_slot
.epoch(T::EthSpec::slots_per_epoch())
.saturating_sub(self.spec.min_epochs_for_blob_sidecars_requests),
)
})
})
}

Expand All @@ -443,18 +454,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_or(false, |da_epoch| block_epoch >= da_epoch)
}

pub fn da_check_required_for_current_epoch(&self) -> bool {
let Some(current_slot) = self.slot_clock.now_or_genesis() else {
error!(
self.log,
"Failed to read slot clock when checking for missing blob ids"
);
return false;
};

self.da_check_required_for_epoch(current_slot.epoch(T::EthSpec::slots_per_epoch()))
}

/// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch.
pub fn is_deneb(&self) -> bool {
self.slot_clock.now().map_or(false, |slot| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub enum Error {
Unexpected,
SszTypes(ssz_types::Error),
MissingBlobs,
MissingCustodyColumns,
BlobIndexInvalid(u64),
DataColumnIndexInvalid(u64),
StoreError(store::Error),
Expand All @@ -38,6 +39,7 @@ impl Error {
Error::KzgNotInitialized
| Error::SszTypes(_)
| Error::MissingBlobs
| Error::MissingCustodyColumns
| Error::StoreError(_)
| Error::DecodeError(_)
| Error::Unexpected
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
})
}

/// Create a new `Work` event for some blobs, where the result from computation (if any) is
/// sent to the other side of `result_tx`.
/// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports
/// the result back to sync.
pub fn send_rpc_custody_columns(
self: &Arc<Self>,
block_root: Hash256,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
debug!(
self.log,
"Block components retrieved";
"result" => "imported block and blobs",
"result" => "imported block and custody columns",
"block_hash" => %hash,
);
self.chain.recompute_head_at_current_slot().await;
Expand All @@ -336,14 +336,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
Err(BlockError::BlockIsAlreadyKnown(_)) => {
debug!(
self.log,
"Blobs have already been imported";
"Custody columns have already been imported";
"block_hash" => %block_root,
);
}
Err(e) => {
warn!(
self.log,
"Error when importing rpc blobs";
"Error when importing rpc custody columns";
"error" => ?e,
"block_hash" => %block_root,
);
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
fn make_request(
&self,
id: Id,
peer_id: PeerId,
_peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<bool, LookupRequestError> {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
.iter()
.find(|(_, l)| l.block_root() == block_to_drop)
{
for &peer_id in lookup.all_used_peers() {
for &peer_id in lookup.all_peers() {
cx.report_peer(
peer_id,
PeerAction::LowToleranceError,
Expand Down Expand Up @@ -535,7 +535,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
lookup.continue_requests(cx)?;
}
Action::ParentUnknown { parent_root } => {
let peers = lookup.all_available_peers().cloned().collect::<Vec<_>>();
let peers = lookup.all_peers().cloned().collect::<Vec<_>>();
lookup.set_awaiting_parent(parent_root);
debug!(self.log, "Marking lookup as awaiting parent"; "lookup" => %block_root, "parent_root" => %parent_root);
self.search_parent_of_child(parent_root, block_root, &peers, cx);
Expand Down
71 changes: 15 additions & 56 deletions beacon_node/network/src/sync/block_lookups/single_block_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use crate::sync::block_lookups::Id;
use crate::sync::network_context::{PeerGroup, SyncNetworkContext};
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::BeaconChainTypes;
use itertools::Itertools;
use rand::seq::IteratorRandom;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -35,6 +33,8 @@ pub struct SingleBlockLookup<T: BeaconChainTypes> {
pub custody_request_state: CustodyRequestState<T::EthSpec>,
block_root: Hash256,
awaiting_parent: Option<Hash256>,
/// Peers that claim to have imported this block or blob.
peers: HashSet<PeerId>,
}

impl<T: BeaconChainTypes> SingleBlockLookup<T> {
Expand All @@ -51,6 +51,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
custody_request_state: CustodyRequestState::new(requested_block_root, peers),
block_root: requested_block_root,
awaiting_parent,
peers: peers.iter().copied().collect(),
}
}

Expand Down Expand Up @@ -97,22 +98,9 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
self.block_root() == block_root
}

/// Get all unique used peers across block and blob requests.
pub fn all_used_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_used_peers()
.chain(self.blob_request_state.state.get_used_peers())
.unique()
}

/// Get all unique available peers across block and blob requests.
pub fn all_available_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.block_request_state
.state
.get_available_peers()
.chain(self.blob_request_state.state.get_available_peers())
.unique()
/// Get all unique peers across block and blob requests.
pub fn all_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
self.peers.iter()
}

pub fn continue_requests(
Expand All @@ -122,6 +110,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
// TODO: Check what's necessary to download, specially for blobs
self.continue_request::<BlockRequestState<T::EthSpec>>(cx)?;
self.continue_request::<BlobRequestState<T::EthSpec>>(cx)?;
self.continue_request::<CustodyRequestState<T::EthSpec>>(cx)?;
Ok(())
}

Expand All @@ -148,8 +137,7 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {

/// Add all given peers to both block and blob request states.
pub fn add_peer(&mut self, peer_id: PeerId) {
self.block_request_state.state.add_peer(&peer_id);
self.blob_request_state.state.add_peer(&peer_id);
self.peers.insert(peer_id);
}

/// Add all given peers to both block and blob request states.
Expand All @@ -163,16 +151,17 @@ impl<T: BeaconChainTypes> SingleBlockLookup<T> {
pub fn both_components_processed(&self) -> bool {
self.block_request_state.state.is_processed()
&& self.blob_request_state.state.is_processed()
&& self.custody_request_state.state.is_processed()
}

/// Checks both the block and blob request states to see if the peer is disconnected.
///
/// Returns true if the lookup should be dropped.
pub fn should_drop_lookup_on_disconnected_peer(&mut self, peer_id: &PeerId) -> bool {
self.block_request_state.state.remove_peer(peer_id);
self.blob_request_state.state.remove_peer(peer_id);

if self.all_available_peers().count() == 0 {
// TODO(das): this condition is not correct. We may have a request with the block already
// processed but missing custody columns fetching. Custody peers are tracked somewhere else,
// so we should let a request retry fail this lookup
if self.peers.len() == 0 {
return true;
}

Expand Down Expand Up @@ -250,10 +239,6 @@ pub enum State<T: Clone> {
pub struct SingleLookupRequestState<T: Clone> {
/// State of this request.
state: State<T>,
/// Peers that should have this block or blob.
available_peers: HashSet<PeerId>,
/// Peers from which we have requested this block.
used_peers: HashSet<PeerId>,
/// How many times have we attempted to process this block or blob.
failed_processing: u8,
/// How many times have we attempted to download this block or blob.
Expand All @@ -262,15 +247,8 @@ pub struct SingleLookupRequestState<T: Clone> {

impl<T: Clone> SingleLookupRequestState<T> {
pub fn new(peers: &[PeerId]) -> Self {
let mut available_peers = HashSet::default();
for peer in peers.iter().copied() {
available_peers.insert(peer);
}

Self {
state: State::AwaitingDownload,
available_peers,
used_peers: HashSet::default(),
failed_processing: 0,
failed_downloading: 0,
}
Expand Down Expand Up @@ -438,32 +416,13 @@ impl<T: Clone> SingleLookupRequestState<T> {
self.failed_processing >= self.failed_downloading
}

/// This method should be used for peers wrapped in `PeerId::BlockAndBlobs`.
pub fn add_peer(&mut self, peer_id: &PeerId) {
self.available_peers.insert(*peer_id);
}

/// If a peer disconnects, this request could be failed. If so, an error is returned
pub fn remove_peer(&mut self, disconnected_peer_id: &PeerId) {
self.available_peers.remove(disconnected_peer_id);
}

pub fn get_used_peers(&self) -> impl Iterator<Item = &PeerId> {
self.used_peers.iter()
}

pub fn get_available_peers(&self) -> impl Iterator<Item = &PeerId> {
self.available_peers.iter()
}

/// Selects a random peer from available peers if any, inserts it in used peers and returns it.
pub fn use_rand_available_peer(&mut self) -> Option<PeerId> {
/// Selects a random peer from available peers if any.
pub fn use_rand_available_peer(&self) -> Option<PeerId> {
let peer_id = self
.available_peers
.iter()
.choose(&mut rand::thread_rng())
.copied()?;
self.used_peers.insert(peer_id);
Some(peer_id)
}
}
Expand Down
12 changes: 12 additions & 0 deletions beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,18 @@ fn sampling_with_retries() {
r.expect_clean_finished_sampling();
}

#[test]
fn custody_lookup_happy_path() {
let Some(mut r) = TestRig::test_setup_after_deneb() else {
return;
};
r.new_connected_peers(100); // Add enough sampling peers
let (block, data_columns) = r.rand_block_and_data_columns();
let block_root = block.canonical_root();
let peer_id = r.new_connected_peer();
r.trigger_unknown_block_from_attestation(block_root, peer_id);
}

// TODO(das): Test retries of DataColumnByRoot:
// - Expect request for column_index
// - Respond with bad data
Expand Down
21 changes: 7 additions & 14 deletions beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::LookupFailure;
use super::network_context::{
custody::{Custody, CustodyRequester},
BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext,
custody::CustodyRequester, BlockOrBlob, CustodyId, RangeRequestId, RpcEvent, SyncNetworkContext,
};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
Expand Down Expand Up @@ -247,7 +245,6 @@ pub struct SyncManager<T: BeaconChainTypes> {
block_lookups: BlockLookups<T>,

sampling: Sampling<T>,
custody: Custody<T>,

/// The logger for the import manager.
log: Logger,
Expand Down Expand Up @@ -307,7 +304,6 @@ impl<T: BeaconChainTypes> SyncManager<T> {
backfill_sync: BackFillSync::new(beacon_chain.clone(), network_globals, log.clone()),
block_lookups: BlockLookups::new(log.clone()),
sampling: Sampling::new(sampling_config, log.clone()),
custody: Custody::new(log.clone()),
log: log.clone(),
}
}
Expand Down Expand Up @@ -989,9 +985,8 @@ impl<T: BeaconChainTypes> SyncManager<T> {
}
}
DataColumnsByRootRequester::Custody(id) => {
if let Some((requester, columns)) =
self.custody
.on_data_column_downloaded(id, peer_id, resp, &mut self.network)
if let Some((requester, custody_columns)) =
self.network.on_custody_by_root_response(id, peer_id, resp)
{
// TODO(das): get proper timestamp
let seen_timestamp = timestamp_now();
Expand All @@ -1000,14 +995,12 @@ impl<T: BeaconChainTypes> SyncManager<T> {
.block_lookups
.on_download_response::<CustodyRequestState<T::EthSpec>>(
id.lookup_id,
columns
.map(|(columns, peer_group)| {
(columns, peer_group, seen_timestamp)
})
.map_err(LookupFailure::CustodyRequestError),
custody_columns.map(|(columns, peer_group)| {
(columns, peer_group, seen_timestamp)
}),
&mut self.network,
),
CustodyRequester::RangeSync(id) => {
CustodyRequester::RangeSync(_) => {
// TODO(das): this line should be unreachable, no mechanism to make
// custody requests for sync yet
todo!("custody fetching for sync not implemented");
Expand Down
Loading

0 comments on commit 8705eda

Please sign in to comment.