Fix data columns not persisting for PeerDAS due to a getBlobs race condition #6756

Merged
merged 5 commits on Jan 15, 2025
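For context before the diff: this change carries the `oneshot::Receiver` for engine-computed data columns inside `BlockImportData` rather than passing it as a separate `Option` argument (which most call sites filled with `None`), and the new `get_blobs_or_columns_store_op` helper decides at import time whether to persist custody columns received over gossip/RPC, block on the receiver for columns computed from EL blobs, or fall back to persisting blobs before PeerDAS. Below is a minimal, self-contained sketch of that pattern; `ImportData`, `DataColumn`, and `columns_to_persist` are illustrative stand-ins, not Lighthouse's actual types or APIs.

```rust
// Sketch only; requires the `tokio` crate with the "sync" feature.
use tokio::sync::oneshot;

#[derive(Debug)]
struct DataColumn(u64); // stand-in for a DataColumnSidecar

// Stand-in for `BlockImportData`: the receiver is a field of the import data,
// not an extra argument that individual call sites can forget to pass.
struct ImportData {
    block_root: u64,
    data_column_recv: Option<oneshot::Receiver<Vec<DataColumn>>>,
}

// Rough analogue of `get_blobs_or_columns_store_op`: prefer columns we already
// hold from gossip/RPC; otherwise block on the receiver for columns computed
// from EL blobs; otherwise there is nothing to persist.
fn columns_to_persist(
    gossip_columns: Option<Vec<DataColumn>>,
    recv: Option<oneshot::Receiver<Vec<DataColumn>>>,
) -> Result<Option<Vec<DataColumn>>, String> {
    if let Some(columns) = gossip_columns {
        return Ok(Some(columns));
    }
    if let Some(recv) = recv {
        // `blocking_recv` fails if the sender was dropped before sending, i.e.
        // the column computation never completed; surface that as an error
        // rather than silently persisting nothing.
        let columns = recv
            .blocking_recv()
            .map_err(|e| format!("did not receive data columns: {e:?}"))?;
        return Ok(Some(columns));
    }
    Ok(None) // no blobs in the block, or PeerDAS not yet active
}

fn main() {
    let (tx, rx) = oneshot::channel();
    // Elsewhere, a task that fetched blobs from the EL computes the columns
    // and hands them over through the oneshot sender.
    std::thread::spawn(move || {
        let _ = tx.send(vec![DataColumn(1), DataColumn(2)]);
    });

    let import = ImportData { block_root: 42, data_column_recv: Some(rx) };
    let persisted = columns_to_persist(None, import.data_column_recv)
        .expect("column computation should succeed");
    println!("block {}: persisting {:?}", import.block_root, persisted);
}
```

Keeping the receiver on the import data means every import path sees the same field, which appears to be how the race named in the title is closed: previously several call sites passed `None` for the receiver, so columns computed from `getBlobs` responses could go unpersisted.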
183 changes: 104 additions & 79 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -121,7 +121,7 @@ use store::{
KeyValueStoreOp, StoreItem, StoreOp,
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::sync::mpsc::Receiver;
use tokio::sync::oneshot;
use tokio_stream::Stream;
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
@@ -3088,7 +3088,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
// If this block has already been imported to forkchoice it must have been available, so
// we don't need to process its blobs again.
@@ -3216,7 +3216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
};

let r = self
.process_availability(slot, availability, None, || Ok(()))
.process_availability(slot, availability, || Ok(()))
.await;
self.remove_notified(&block_root, r)
.map(|availability_processing_status| {
@@ -3344,7 +3344,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

match executed_block {
ExecutedBlock::Available(block) => {
self.import_available_block(Box::new(block), None).await
self.import_available_block(Box::new(block)).await
}
ExecutedBlock::AvailabilityPending(block) => {
self.check_block_availability_and_import(block).await
@@ -3476,7 +3476,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let availability = self
.data_availability_checker
.put_pending_executed_block(block)?;
self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

@@ -3492,7 +3492,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
let availability = self.data_availability_checker.put_gossip_blob(blob)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

@@ -3515,7 +3515,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_gossip_data_columns(block_root, data_columns)?;

self.process_availability(slot, availability, None, publish_fn)
self.process_availability(slot, availability, publish_fn)
.await
}

@@ -3559,7 +3559,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_blobs(block_root, blobs)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

@@ -3568,14 +3568,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
block_root: Hash256,
blobs: FixedBlobSidecarList<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
self.check_blobs_for_slashability(block_root, &blobs)?;
let availability = self
.data_availability_checker
.put_engine_blobs(block_root, blobs)?;
let availability =
self.data_availability_checker
.put_engine_blobs(block_root, blobs, data_column_recv)?;

self.process_availability(slot, availability, data_column_recv, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

@@ -3615,7 +3615,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.data_availability_checker
.put_rpc_custody_columns(block_root, custody_columns)?;

self.process_availability(slot, availability, None, || Ok(()))
self.process_availability(slot, availability, || Ok(()))
.await
}

@@ -3627,14 +3627,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self: &Arc<Self>,
slot: Slot,
availability: Availability<T::EthSpec>,
recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
publish_fn: impl FnOnce() -> Result<(), BlockError>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
match availability {
Availability::Available(block) => {
publish_fn()?;
// Block is fully available, import into fork choice
self.import_available_block(block, recv).await
self.import_available_block(block).await
}
Availability::MissingComponents(block_root) => Ok(
AvailabilityProcessingStatus::MissingComponents(slot, block_root),
@@ -3645,7 +3644,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<AvailabilityProcessingStatus, BlockError> {
let AvailableExecutedBlock {
block,
@@ -3660,6 +3658,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv,
} = import_data;

// Record the time at which this block's blobs became available.
@@ -3726,7 +3725,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
parent_block: SignedBlindedBeaconBlock<T::EthSpec>,
parent_eth1_finalization_data: Eth1FinalizationData,
mut consensus_context: ConsensusContext<T::EthSpec>,
data_column_recv: Option<Receiver<DataColumnSidecarList<T::EthSpec>>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Hash256, BlockError> {
// ----------------------------- BLOCK NOT YET ATTESTABLE ----------------------------------
// Everything in this initial section is on the hot path between processing the block and
@@ -3894,44 +3893,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// end up with blocks in fork choice that are missing from disk.
// See https://github.com/sigp/lighthouse/issues/2028
let (_, signed_block, blobs, data_columns) = signed_block.deconstruct();
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();
// if block is made available via blobs, dropped the data columns.
let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count);

let data_columns = match (data_columns, data_column_recv) {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
(Some(columns), _) => Some(columns),
// Otherwise, it means blobs were likely available via fetching from EL, in this case we
// wait for the data columns to be computed (blocking).
(None, Some(mut data_column_recv)) => {
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
if let Some(columns) = data_column_recv.blocking_recv() {
Some(columns)
} else {
let err_msg = "Did not receive data columns from sender";
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => err_msg,
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(err_msg.to_string())));
}

match self.get_blobs_or_columns_store_op(
block_root,
signed_block.epoch(),
blobs,
data_columns,
data_column_recv,
) {
Ok(Some(blobs_or_columns_store_op)) => {
ops.push(blobs_or_columns_store_op);
}
// No data columns present and compute data columns task was not spawned.
// Could either be no blobs in the block or before PeerDAS activation.
(None, None) => None,
};
Ok(None) => {}
Err(e) => {
error!(
self.log,
"Failed to store data columns into the database";
"msg" => "Restoring fork choice from disk",
"error" => &e,
"block_root" => ?block_root
);
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(e)));
}
}

let block = signed_block.message();
let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);
@@ -3943,30 +3930,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));

if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}

if let Some(data_columns) = data_columns {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
ops.push(StoreOp::PutDataColumns(block_root, data_columns));
}
}

let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
@@ -7184,6 +7147,68 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
reqresp_pre_import_cache_len: self.reqresp_pre_import_cache.read().len(),
}
}

fn get_blobs_or_columns_store_op(
&self,
block_root: Hash256,
block_epoch: Epoch,
blobs: Option<BlobSidecarList<T::EthSpec>>,
data_columns: Option<DataColumnSidecarList<T::EthSpec>>,
data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<T::EthSpec>>>,
) -> Result<Option<StoreOp<T::EthSpec>>, String> {
if self.spec.is_peer_das_enabled_for_epoch(block_epoch) {
// TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non
// custody columns: https://github.com/sigp/lighthouse/issues/6465
let custody_columns_count = self.data_availability_checker.get_sampling_column_count();

let custody_columns_available = data_columns
.as_ref()
.is_some_and(|columns| columns.len() == custody_columns_count);

let data_columns_to_persist = if custody_columns_available {
// If the block was made available via custody columns received from gossip / rpc, use them
// since we already have them.
data_columns
} else if let Some(data_column_recv) = data_column_recv {
// Blobs were available from the EL, in this case we wait for the data columns to be computed (blocking).
let _column_recv_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT);
// Unable to receive data columns from sender, sender is either dropped or
// failed to compute data columns from blobs. We restore fork choice here and
// return to avoid inconsistency in database.
let computed_data_columns = data_column_recv
.blocking_recv()
.map_err(|e| format!("Did not receive data columns from sender: {e:?}"))?;
Some(computed_data_columns)
} else {
// No blobs in the block.
None
};

if let Some(data_columns) = data_columns_to_persist {
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
"block_root" => %block_root,
"count" => data_columns.len(),
);
return Ok(Some(StoreOp::PutDataColumns(block_root, data_columns)));
}
}
} else if let Some(blobs) = blobs {
if !blobs.is_empty() {
debug!(
self.log, "Writing blobs to store";
"block_root" => %block_root,
"count" => blobs.len(),
);
return Ok(Some(StoreOp::PutBlobs(block_root, blobs)));
}
}

Ok(None)
}
}

impl<T: BeaconChainTypes> Drop for BeaconChain<T> {
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -1677,6 +1677,7 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
parent_eth1_finalization_data,
confirmed_state_roots,
consensus_context,
data_column_recv: None,
},
payload_verification_handle,
})
15 changes: 12 additions & 3 deletions beacon_node/beacon_chain/src/block_verification_types.rs
@@ -7,10 +7,11 @@ use derivative::Derivative;
use state_processing::ConsensusContext;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use tokio::sync::oneshot;
use types::blob_sidecar::BlobIdentifier;
use types::{
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec,
Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList,
Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot,
};

/// A block that has been received over RPC. It has 2 internal variants:
@@ -337,14 +338,21 @@ impl<E: EthSpec> AvailabilityPendingExecutedBlock<E> {
}
}

#[derive(Debug, PartialEq)]
#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
pub struct BlockImportData<E: EthSpec> {
pub block_root: Hash256,
pub state: BeaconState<E>,
pub parent_block: SignedBeaconBlock<E, BlindedPayload<E>>,
pub parent_eth1_finalization_data: Eth1FinalizationData,
pub confirmed_state_roots: Vec<Hash256>,
pub consensus_context: ConsensusContext<E>,
#[derivative(PartialEq = "ignore")]
/// An optional receiver for `DataColumnSidecarList`.
///
/// This field is `Some` when data columns are being computed asynchronously.
/// The resulting `DataColumnSidecarList` will be sent through this receiver.
pub data_column_recv: Option<oneshot::Receiver<DataColumnSidecarList<E>>>,
}

impl<E: EthSpec> BlockImportData<E> {
@@ -363,6 +371,7 @@ impl<E: EthSpec> BlockImportData<E> {
},
confirmed_state_roots: vec![],
consensus_context: ConsensusContext::new(Slot::new(0)),
data_column_recv: None,
}
}
}
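One detail of the `BlockImportData` hunk above: `oneshot::Receiver` does not implement `PartialEq`, so adding the `data_column_recv` field means the plain `#[derive(PartialEq)]` is replaced by a `derivative`-generated impl that ignores that field (presumably so existing equality checks keep compiling). A minimal sketch of that pattern, with illustrative field values rather than the real sidecar types:

```rust
// Sketch only; requires the `derivative` crate and `tokio` with the "sync" feature.
use derivative::Derivative;
use tokio::sync::oneshot;

#[derive(Debug, Derivative)]
#[derivative(PartialEq)]
struct ImportData {
    block_root: u64,
    // Excluded from the generated PartialEq impl, as in the diff above.
    #[derivative(PartialEq = "ignore")]
    data_column_recv: Option<oneshot::Receiver<Vec<u64>>>,
}

fn main() {
    let (_tx, rx) = oneshot::channel();
    let a = ImportData { block_root: 1, data_column_recv: Some(rx) };
    let b = ImportData { block_root: 1, data_column_recv: None };
    // Equal: equality only looks at `block_root`; the receiver is ignored.
    assert!(a == b);
    println!("equal despite differing receivers");
}
```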