Skip to content

Commit

Permalink
WIP multiple custody requests in lookup struct
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 12, 2024
1 parent 1514c17 commit ddda406
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 562 deletions.
85 changes: 61 additions & 24 deletions beacon_node/network/src/sync/block_lookups/common.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use crate::sync::block_lookups::single_block_lookup::{
LookupError, SingleBlockLookup, SingleLookupRequestState,
};
use crate::sync::block_lookups::{BlobRequestState, BlockRequestState, PeerId};
use crate::sync::manager::{BlockProcessType, Id, SLOT_IMPORT_TOLERANCE};
use crate::sync::network_context::{ReqId, SyncNetworkContext};
use crate::sync::block_lookups::{
BlobRequestState, BlockRequestState, CustodyColumnRequestState, PeerId,
};
use crate::sync::manager::{
BlockProcessType, DataColumnsByRootRequester, Id, SLOT_IMPORT_TOLERANCE,
};
use crate::sync::network_context::{
DataColumnsByRootSingleBlockRequest, ReqId, SyncNetworkContext,
};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::CustodyDataColumn;
use beacon_chain::BeaconChainTypes;
use std::sync::Arc;
use types::blob_sidecar::FixedBlobSidecarList;
use types::SignedBeaconBlock;
use types::{blob_sidecar::FixedBlobSidecarList, ColumnIndex, Epoch, SignedBeaconBlock};

use super::single_block_lookup::DownloadResult;
use super::SingleLookupId;

use super::single_block_lookup::CustodyRequestState;

#[derive(Debug, Copy, Clone)]
pub enum ResponseType {
Block,
Expand Down Expand Up @@ -56,6 +59,7 @@ pub trait RequestState<T: BeaconChainTypes> {
/// Send the response to the beacon processor.
fn send_for_processing(
id: Id,
component_index: usize,
result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupError>;
Expand All @@ -66,7 +70,10 @@ pub trait RequestState<T: BeaconChainTypes> {
fn response_type() -> ResponseType;

/// A getter for the `BlockRequestState` or `BlobRequestState` associated with this trait.
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self;
fn request_state_mut(
request: &mut SingleBlockLookup<T>,
component_index: usize,
) -> Option<&mut Self>;

/// A getter for a reference to the `SingleLookupRequestState` associated with this trait.
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType>;
Expand All @@ -82,7 +89,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
&self,
id: SingleLookupId,
peer_id: PeerId,
_: Option<usize>,
_downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<Option<ReqId>, LookupError> {
cx.block_lookup_request(id, peer_id, self.requested_block_root)
Expand All @@ -91,6 +98,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {

fn send_for_processing(
id: SingleLookupId,
_component_index: usize,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupError> {
Expand All @@ -112,8 +120,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlockRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Block
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.block_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>, _: usize) -> Option<&mut Self> {
Some(&mut request.block_request_state)
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand Down Expand Up @@ -144,6 +152,7 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {

fn send_for_processing(
id: Id,
_component_index: usize,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupError> {
Expand All @@ -165,8 +174,8 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
fn response_type() -> ResponseType {
ResponseType::Blob
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.blob_request_state
fn request_state_mut(request: &mut SingleBlockLookup<T>, _: usize) -> Option<&mut Self> {
Some(&mut request.blob_request_state)
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand All @@ -176,27 +185,49 @@ impl<T: BeaconChainTypes> RequestState<T> for BlobRequestState<T::EthSpec> {
}
}

impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
type VerifiedResponseType = Vec<CustodyDataColumn<T::EthSpec>>;
impl<T: BeaconChainTypes> RequestState<T> for CustodyColumnRequestState<T::EthSpec> {
type VerifiedResponseType = CustodyDataColumn<T::EthSpec>;

fn make_request(
&self,
id: Id,
// TODO(das): consider selecting peers that have custody but are in this set
_peer_id: PeerId,
downloaded_block_expected_blobs: Option<usize>,
_downloaded_block_expected_blobs: Option<usize>,
cx: &mut SyncNetworkContext<T>,
) -> Result<Option<ReqId>, LookupError> {
cx.custody_lookup_request(
id,
self.block_root,
downloaded_block_expected_blobs.map(|n| n > 0),
// TODO: Ensure that requests are not sent when the block is:
// - not yet in PeerDAS epoch
// - outside da_boundary (maybe not necessary to check)

// TODO(das): no rotation
let block_epoch = Epoch::new(0);
// TODO: When there is a fork and only a subset of your peers know about a block, sampling should only
// be queried on the peers on that fork. Should this case be handled? How to handle it?
let peer_ids = cx.get_custodial_peers(block_epoch, self.column_index);

// TODO(das) randomize custodial peer and avoid failing peers
let Some(peer_id) = peer_ids.first().cloned() else {
// Allow custody lookups to have zero peers. If no peers are found to have custody after
// some time the lookup is dropped.
return Ok(None);
};

cx.data_column_lookup_request(
DataColumnsByRootRequester::Custody(id, self.column_index),
peer_id,
DataColumnsByRootSingleBlockRequest {
block_root: self.block_root,
indices: vec![self.column_index],
},
)
.map(Some)
.map_err(LookupError::SendFailed)
}

fn send_for_processing(
id: Id,
component_index: usize,
download_result: DownloadResult<Self::VerifiedResponseType>,
cx: &SyncNetworkContext<T>,
) -> Result<(), LookupError> {
Expand All @@ -208,18 +239,24 @@ impl<T: BeaconChainTypes> RequestState<T> for CustodyRequestState<T::EthSpec> {
} = download_result;
cx.send_custody_columns_for_processing(
block_root,
value,
// TODO(das): might be inefficient to send columns one by one
vec![value],
seen_timestamp,
BlockProcessType::SingleCustodyColumn(id),
BlockProcessType::SingleCustodyColumn(id, component_index as ColumnIndex),
)
.map_err(LookupError::SendFailed)
}

fn response_type() -> ResponseType {
ResponseType::CustodyColumn
}
fn request_state_mut(request: &mut SingleBlockLookup<T>) -> &mut Self {
&mut request.custody_request_state
fn request_state_mut(
request: &mut SingleBlockLookup<T>,
component_index: usize,
) -> Option<&mut Self> {
request
.custody_columns_requests
.get_mut(&(component_index as u64))
}
fn get_state(&self) -> &SingleLookupRequestState<Self::VerifiedResponseType> {
&self.state
Expand Down
92 changes: 59 additions & 33 deletions beacon_node/network/src/sync/block_lookups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,20 @@ use crate::metrics;
use crate::sync::block_lookups::common::{ResponseType, PARENT_DEPTH_TOLERANCE};
use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor;
use crate::sync::manager::{Id, SingleLookupReqId};
use crate::sync::network_context::custody::CustodyRequestError;
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_availability_checker::AvailabilityCheckErrorCategory;
use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError};
pub use common::RequestState;
use fnv::FnvHashMap;
use lighthouse_network::{PeerAction, PeerId};
use lru_cache::LRUTimeCache;
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState};
pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyColumnRequestState};
use slog::{debug, error, warn, Logger};
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;
use store::Hash256;
use types::{BlobSidecar, EthSpec, SignedBeaconBlock};
use types::{BlobSidecar, Epoch, EthSpec, SignedBeaconBlock};

pub mod common;
pub mod parent_chain;
Expand All @@ -36,7 +35,7 @@ pub const SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS: u8 = 4;
pub enum LookupRequestError {
BlockRequestError(RpcByRootRequestError),
BlobRequestError(RpcByRootRequestError),
CustodyRequestError(CustodyRequestError),
CustodyRequestError(RpcByRootRequestError),
}

pub enum BlockComponent<E: EthSpec> {
Expand Down Expand Up @@ -268,9 +267,23 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
}
}

// TODO(das): If there's rotation we need to fetch the block before knowing which columns to
// custody.
let block_epoch = Epoch::new(0);
let custody_column_indexes = cx
.network_globals()
.custody_columns(block_epoch)
.expect("TODO: parse custody value on start-up");

// If we know that this lookup has unknown parent (is awaiting a parent lookup to resolve),
// signal here to hold processing downloaded data.
let mut lookup = SingleBlockLookup::new(block_root, peers, cx.next_id(), awaiting_parent);
let mut lookup = SingleBlockLookup::new(
block_root,
peers,
cx.next_id(),
awaiting_parent,
custody_column_indexes,
);

let msg = if block_component.is_some() {
"Searching for components of a block with unknown parent"
Expand Down Expand Up @@ -313,32 +326,33 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
pub fn on_download_response<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
component_index: usize,
response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>,
cx: &mut SyncNetworkContext<T>,
) {
let result = self.on_download_response_inner::<R>(id, response, cx);
let result = self.on_download_response_inner::<R>(id, component_index, response, cx);
self.on_lookup_result(id.lookup_id, result, "download_response", cx);
}

/// Process a block or blob response received from a single lookup request.
pub fn on_download_response_inner<R: RequestState<T>>(
&mut self,
id: SingleLookupReqId,
component_index: usize,
response: Result<(R::VerifiedResponseType, PeerGroup, Duration), LookupRequestError>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupError> {
// Note: do not downscore peers here for requests errors, SyncNetworkContext does it.

let response_type = R::response_type();
let Some(lookup) = self.single_block_lookups.get_mut(&id.lookup_id) else {
// We don't have the ability to cancel in-flight RPC requests. So this can happen
// if we started this RPC request, and later saw the block/blobs via gossip.
debug!(self.log, "Block returned for single block lookup not present"; "id" => id.lookup_id);
return Err(LookupError::UnknownLookup);
};
let lookup = self
.single_block_lookups
.get_mut(&id.lookup_id)
.ok_or(LookupError::UnknownLookup)?;

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request = R::request_state_mut(lookup, component_index)
.ok_or(LookupError::UnknownComponentIndex(component_index))?;
let request_state = request.get_state_mut();

match response {
Ok((response, peer_group, seen_timestamp)) => {
Expand All @@ -347,7 +361,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"block_root" => ?block_root,
"id" => ?id,
"peer_group" => ?peer_group,
"response_type" => ?response_type,
"component" => format!("{:?}({})", R::response_type(), component_index),
);

// Register the download peer here. Once we have received some data over the wire we
Expand All @@ -371,7 +385,7 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
"Received lookup download failure";
"block_root" => ?block_root,
"id" => ?id,
"response_type" => ?response_type,
"component" => format!("{:?}({})", R::response_type(), component_index),
"error" => ?e,
);

Expand Down Expand Up @@ -404,38 +418,45 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) {
let lookup_result = match process_type {
BlockProcessType::SingleBlock { id } => {
self.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleBlob { id } => {
self.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, result, cx)
}
BlockProcessType::SingleCustodyColumn(id) => {
self.on_processing_result_inner::<CustodyRequestState<T::EthSpec>>(id, result, cx)
}
};
let lookup_result =
match process_type {
BlockProcessType::SingleBlock { id } => self
.on_processing_result_inner::<BlockRequestState<T::EthSpec>>(id, 0, result, cx),
BlockProcessType::SingleBlob { id } => self
.on_processing_result_inner::<BlobRequestState<T::EthSpec>>(id, 0, result, cx),
BlockProcessType::SingleCustodyColumn(id, component_index) => {
self.on_processing_result_inner::<CustodyColumnRequestState<T::EthSpec>>(
id,
component_index as usize,
result,
cx,
)
}
};
self.on_lookup_result(process_type.id(), lookup_result, "processing_result", cx);
}

pub fn on_processing_result_inner<R: RequestState<T>>(
&mut self,
lookup_id: SingleLookupId,
component_index: usize,
result: BlockProcessingResult<T::EthSpec>,
cx: &mut SyncNetworkContext<T>,
) -> Result<LookupResult, LookupError> {
let Some(lookup) = self.single_block_lookups.get_mut(&lookup_id) else {
debug!(self.log, "Unknown single block lookup"; "id" => lookup_id);
return Err(LookupError::UnknownLookup);
};
let lookup = self
.single_block_lookups
.get_mut(&lookup_id)
.ok_or(LookupError::UnknownLookup)?;

let block_root = lookup.block_root();
let request_state = R::request_state_mut(lookup).get_state_mut();
let request = R::request_state_mut(lookup, component_index)
.ok_or(LookupError::UnknownComponentIndex(component_index))?;
let request_state = request.get_state_mut();

debug!(
self.log,
"Received lookup processing result";
"component" => ?R::response_type(),
"component" => format!("{:?}({})", R::response_type(), component_index),
"block_root" => ?block_root,
"id" => lookup_id,
"result" => ?result,
Expand Down Expand Up @@ -463,6 +484,11 @@ impl<T: BeaconChainTypes> BlockLookups<T> {
if lookup.all_components_processed() {
return Err(LookupError::MissingComponentsAfterAllProcessed);
} else {
// Trigger reconstruction if node has more than 50% of columns. Reconstruction
// is expensive so it should be handled as a separate work event. The
// result of the reconstruction must be imported and published to gossip.
// The result of a reconstruction will be an import of block components

Action::Retry
}
}
Expand Down
Loading

0 comments on commit ddda406

Please sign in to comment.