Skip to content

Commit

Permalink
Fix data columns not persisting for PeerDAS due to a getBlobs race …
Browse files Browse the repository at this point in the history
…condition.
  • Loading branch information
jimmygchen committed Jan 6, 2025
1 parent f51a292 commit 44f8add
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 48 deletions.
48 changes: 24 additions & 24 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
Expand Down Expand Up @@ -3086,7 +3086,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
Expand Down Expand Up @@ -3214,7 +3214,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

let r = self
.process_availability(slot, availability, None, || Ok(()))
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
Expand Down Expand Up @@ -3342,7 +3342,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), None).await
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
Expand Down Expand Up @@ -3474,7 +3474,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3490,7 +3490,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3513,7 +3513,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;

self.process_availability(slot, availability, None, publish_fn)
self.process_availability(slot, availability, publish_fn)
.await
}

Expand Down Expand Up @@ -3557,7 +3557,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3566,14 +3566,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_engine_blobs(block_root, blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;

self.process_availability(slot, availability, data_column_recv, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand Down Expand Up @@ -3613,7 +3613,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

Expand All @@ -3625,14 +3625,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block, recv).await
self.import_available_block(block).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
Expand All @@ -3643,7 +3642,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
Expand All @@ -3658,6 +3656,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
Expand Down Expand Up @@ -3724,7 +3723,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
Expand Down Expand Up @@ -3895,22 +3894,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// if block is made available via blobs, dropped the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);
// if block is made available via blobs and `data_columns` is either `None` or incomplete, dropped the data columns.
let maybe_all_data_columns =
data_columns.filter(|columns| columns.len() == custody_columns_count);

let data_columns = match (data_columns, data_column_recv) {
let data_columns_to_persist = match (maybe_all_data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
(None, Some(data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
if let Ok(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
Expand Down Expand Up @@ -3952,7 +3952,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

if let Some(data_columns) = data_columns {
if let Some(data_columns) = data_columns_to_persist {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1677,6 +1677,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
Expand Down
11 changes: 8 additions & 3 deletions beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use ssz_types::VariableList;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList};
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
Expand Down Expand Up @@ -355,14 +356,17 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}

impl<E: EthSpec> BlockImportData<E> {
Expand All @@ -381,6 +385,7 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use task_executor::TaskExecutor;
use tokio::sync::oneshot;
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
use types::{
BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList,
Expand Down Expand Up @@ -220,7 +221,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map_err(AvailabilityCheckError::InvalidBlobs)?;

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
.put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log)
}

/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
Expand Down Expand Up @@ -260,6 +261,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
&self,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
let seen_timestamp = self
.slot_clock
Expand All @@ -269,8 +271,12 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
let verified_blobs =
KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp);

self.availability_cache
.put_kzg_verified_blobs(block_root, verified_blobs, &self.log)
self.availability_cache.put_kzg_verified_blobs(
block_root,
verified_blobs,
data_column_recv,
&self.log,
)
}

/// Check if we've cached other blobs for this block. If it completes a set and we also
Expand All @@ -285,6 +291,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
self.availability_cache.put_kzg_verified_blobs(
gossip_blob.block_root(),
vec![gossip_blob.into_inner()],
None,
&self.log,
)
}
Expand Down Expand Up @@ -801,7 +808,6 @@ impl<E: EthSpec> AvailableBlock<E> {
block,
blobs,
data_columns,
blobs_available_timestamp: _,
..
} = self;
(block_root, block, blobs, data_columns)
Expand Down
Loading

0 comments on commit 44f8add

Please sign in to comment.