From 44f8add41ea2252769bb967864af95b3c13af8ca Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 6 Jan 2025 18:15:14 +1100 Subject: [PATCH] Fix data columns not persisting for PeerDAS due to a `getBlobs` race condition. --- beacon_node/beacon_chain/src/beacon_chain.rs | 48 ++++++++-------- .../beacon_chain/src/block_verification.rs | 1 + .../src/block_verification_types.rs | 11 +++- .../src/data_availability_checker.rs | 14 +++-- .../overflow_lru_cache.rs | 55 +++++++++++++++---- .../state_lru_cache.rs | 1 + beacon_node/beacon_chain/src/fetch_blobs.rs | 8 +-- .../beacon_chain/tests/block_verification.rs | 2 +- 8 files changed, 92 insertions(+), 48 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 80766d57b33..78f4a3ce814 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -120,7 +120,7 @@ use store::{ DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp, }; use task_executor::{ShutdownReason, TaskExecutor}; -use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot; use tokio_stream::Stream; use tree_hash::TreeHash; use types::blob_sidecar::FixedBlobSidecarList; @@ -3086,7 +3086,7 @@ impl BeaconChain { slot: Slot, block_root: Hash256, blobs: FixedBlobSidecarList, - data_column_recv: Option>>, + data_column_recv: Option>>, ) -> Result { // If this block has already been imported to forkchoice it must have been available, so // we don't need to process its blobs again. @@ -3214,7 +3214,7 @@ impl BeaconChain { }; let r = self - .process_availability(slot, availability, None, || Ok(())) + .process_availability(slot, availability, || Ok(())) .await; self.remove_notified(&block_root, r) .map(|availability_processing_status| { @@ -3342,7 +3342,7 @@ impl BeaconChain { match executed_block { ExecutedBlock::Available(block) => { - self.import_available_block(Box::new(block), None).await + self.import_available_block(Box::new(block)).await } ExecutedBlock::AvailabilityPending(block) => { self.check_block_availability_and_import(block).await @@ -3474,7 +3474,7 @@ impl BeaconChain { let availability = self .data_availability_checker .put_pending_executed_block(block)?; - self.process_availability(slot, availability, None, || Ok(())) + self.process_availability(slot, availability, || Ok(())) .await } @@ -3490,7 +3490,7 @@ impl BeaconChain { } let availability = self.data_availability_checker.put_gossip_blob(blob)?; - self.process_availability(slot, availability, None, || Ok(())) + self.process_availability(slot, availability, || Ok(())) .await } @@ -3513,7 +3513,7 @@ impl BeaconChain { .data_availability_checker .put_gossip_data_columns(block_root, data_columns)?; - self.process_availability(slot, availability, None, publish_fn) + self.process_availability(slot, availability, publish_fn) .await } @@ -3557,7 +3557,7 @@ impl BeaconChain { .data_availability_checker .put_rpc_blobs(block_root, blobs)?; - self.process_availability(slot, availability, None, || Ok(())) + self.process_availability(slot, availability, || Ok(())) .await } @@ -3566,14 +3566,14 @@ impl BeaconChain { slot: Slot, block_root: Hash256, blobs: FixedBlobSidecarList, - data_column_recv: Option>>, + data_column_recv: Option>>, ) -> Result { self.check_blobs_for_slashability(block_root, &blobs)?; - let availability = self - .data_availability_checker - .put_engine_blobs(block_root, blobs)?; + let availability = + self.data_availability_checker + .put_engine_blobs(block_root, blobs, data_column_recv)?; - self.process_availability(slot, availability, data_column_recv, || Ok(())) + self.process_availability(slot, availability, || Ok(())) .await } @@ -3613,7 +3613,7 @@ impl BeaconChain { .data_availability_checker .put_rpc_custody_columns(block_root, custody_columns)?; - self.process_availability(slot, availability, None, || Ok(())) + self.process_availability(slot, availability, || Ok(())) .await } @@ -3625,14 +3625,13 @@ impl BeaconChain { self: &Arc, slot: Slot, availability: Availability, - recv: Option>>, publish_fn: impl FnOnce() -> Result<(), BlockError>, ) -> Result { match availability { Availability::Available(block) => { publish_fn()?; // Block is fully available, import into fork choice - self.import_available_block(block, recv).await + self.import_available_block(block).await } Availability::MissingComponents(block_root) => Ok( AvailabilityProcessingStatus::MissingComponents(slot, block_root), @@ -3643,7 +3642,6 @@ impl BeaconChain { pub async fn import_available_block( self: &Arc, block: Box>, - data_column_recv: Option>>, ) -> Result { let AvailableExecutedBlock { block, @@ -3658,6 +3656,7 @@ impl BeaconChain { parent_eth1_finalization_data, confirmed_state_roots, consensus_context, + data_column_recv, } = import_data; // Record the time at which this block's blobs became available. @@ -3724,7 +3723,7 @@ impl BeaconChain { parent_block: SignedBlindedBeaconBlock, parent_eth1_finalization_data: Eth1FinalizationData, mut consensus_context: ConsensusContext, - data_column_recv: Option>>, + data_column_recv: Option>>, ) -> Result { // ----------------------------- BLOCK NOT YET ATTESTABLE ---------------------------------- // Everything in this initial section is on the hot path between processing the block and @@ -3895,22 +3894,23 @@ impl BeaconChain { // TODO(das) we currently store all subnet sampled columns. Tracking issue to exclude non // custody columns: https://github.com/sigp/lighthouse/issues/6465 let custody_columns_count = self.data_availability_checker.get_sampling_column_count(); - // if block is made available via blobs, dropped the data columns. - let data_columns = data_columns.filter(|columns| columns.len() == custody_columns_count); + // if block is made available via blobs and `data_columns` is either `None` or incomplete, dropped the data columns. + let maybe_all_data_columns = + data_columns.filter(|columns| columns.len() == custody_columns_count); - let data_columns = match (data_columns, data_column_recv) { + let data_columns_to_persist = match (maybe_all_data_columns, data_column_recv) { // If the block was made available via custody columns received from gossip / rpc, use them // since we already have them. (Some(columns), _) => Some(columns), // Otherwise, it means blobs were likely available via fetching from EL, in this case we // wait for the data columns to be computed (blocking). - (None, Some(mut data_column_recv)) => { + (None, Some(data_column_recv)) => { let _column_recv_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DATA_COLUMNS_WAIT); // Unable to receive data columns from sender, sender is either dropped or // failed to compute data columns from blobs. We restore fork choice here and // return to avoid inconsistency in database. - if let Some(columns) = data_column_recv.blocking_recv() { + if let Ok(columns) = data_column_recv.blocking_recv() { Some(columns) } else { let err_msg = "Did not receive data columns from sender"; @@ -3952,7 +3952,7 @@ impl BeaconChain { } } - if let Some(data_columns) = data_columns { + if let Some(data_columns) = data_columns_to_persist { // TODO(das): `available_block includes all sampled columns, but we only need to store // custody columns. To be clarified in spec. if !data_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index ddb7bb614a3..315105ac2b5 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -1677,6 +1677,7 @@ impl ExecutionPendingBlock { parent_eth1_finalization_data, confirmed_state_roots, consensus_context, + data_column_recv: None, }, payload_verification_handle, }) diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index 420c83081c7..1f3a52cae55 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -8,10 +8,11 @@ use ssz_types::VariableList; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; +use tokio::sync::oneshot; use types::blob_sidecar::{BlobIdentifier, FixedBlobSidecarList}; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, Epoch, EthSpec, - Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, + BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, ChainSpec, DataColumnSidecarList, + Epoch, EthSpec, Hash256, RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: @@ -355,7 +356,8 @@ impl AvailabilityPendingExecutedBlock { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, Derivative)] +#[derivative(PartialEq)] pub struct BlockImportData { pub block_root: Hash256, pub state: BeaconState, @@ -363,6 +365,8 @@ pub struct BlockImportData { pub parent_eth1_finalization_data: Eth1FinalizationData, pub confirmed_state_roots: Vec, pub consensus_context: ConsensusContext, + #[derivative(PartialEq = "ignore")] + pub data_column_recv: Option>>, } impl BlockImportData { @@ -381,6 +385,7 @@ impl BlockImportData { }, confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), + data_column_recv: None, } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 72806a74d27..c325010c5af 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -15,6 +15,7 @@ use std::num::NonZeroUsize; use std::sync::Arc; use std::time::Duration; use task_executor::TaskExecutor; +use tokio::sync::oneshot; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ BlobSidecarList, ChainSpec, DataColumnIdentifier, DataColumnSidecar, DataColumnSidecarList, @@ -220,7 +221,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; self.availability_cache - .put_kzg_verified_blobs(block_root, verified_blobs, &self.log) + .put_kzg_verified_blobs(block_root, verified_blobs, None, &self.log) } /// Put a list of custody columns received via RPC into the availability cache. This performs KZG @@ -260,6 +261,7 @@ impl DataAvailabilityChecker { &self, block_root: Hash256, blobs: FixedBlobSidecarList, + data_column_recv: Option>>, ) -> Result, AvailabilityCheckError> { let seen_timestamp = self .slot_clock @@ -269,8 +271,12 @@ impl DataAvailabilityChecker { let verified_blobs = KzgVerifiedBlobList::from_verified(blobs.iter().flatten().cloned(), seen_timestamp); - self.availability_cache - .put_kzg_verified_blobs(block_root, verified_blobs, &self.log) + self.availability_cache.put_kzg_verified_blobs( + block_root, + verified_blobs, + data_column_recv, + &self.log, + ) } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -285,6 +291,7 @@ impl DataAvailabilityChecker { self.availability_cache.put_kzg_verified_blobs( gossip_blob.block_root(), vec![gossip_blob.into_inner()], + None, &self.log, ) } @@ -801,7 +808,6 @@ impl AvailableBlock { block, blobs, data_columns, - blobs_available_timestamp: _, .. } = self; (block_root, block, blobs, data_columns) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 40361574aff..e619767b1da 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -13,28 +13,45 @@ use slog::{debug, Logger}; use ssz_types::FixedVector; use std::num::NonZeroUsize; use std::sync::Arc; +use tokio::sync::oneshot; use types::blob_sidecar::BlobIdentifier; use types::{ - BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, Epoch, EthSpec, - Hash256, SignedBeaconBlock, + BlobSidecar, ChainSpec, ColumnIndex, DataColumnIdentifier, DataColumnSidecar, + DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, }; /// This represents the components of a partially available block /// /// The blobs are all gossip and kzg verified. /// The block has completed all verifications except the availability check. -/// TODO(das): this struct can potentially be reafactored as blobs and data columns are mutually -/// exclusive and this could simplify `is_importable`. -#[derive(Clone)] pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, pub verified_data_columns: Vec>, pub executed_block: Option>, pub reconstruction_started: bool, + /// Receiver for data columns that are computed asynchronously; + /// + /// If `data_column_recv` is `Some`, it means data column computation or reconstruction has been + /// started. This can happen either via engine blobs fetching or data column reconstruction + /// (triggered when >= 50% columns are received via gossip). + pub data_column_recv: Option>>, } impl PendingComponents { + /// Clones the `PendingComponent` without cloning `data_column_recv`, as `Receiver` is not cloneable. + /// This should only be used when the receiver is no longer needed. + pub fn clone_without_column_recv(&self) -> Self { + PendingComponents { + block_root: self.block_root, + verified_blobs: self.verified_blobs.clone(), + verified_data_columns: self.verified_data_columns.clone(), + executed_block: self.executed_block.clone(), + reconstruction_started: self.reconstruction_started, + data_column_recv: None, + } + } + /// Returns an immutable reference to the cached block. pub fn get_cached_block(&self) -> &Option> { &self.executed_block @@ -247,6 +264,7 @@ impl PendingComponents { verified_data_columns: vec![], executed_block: None, reconstruction_started: false, + data_column_recv: None, } } @@ -271,6 +289,7 @@ impl PendingComponents { verified_blobs, verified_data_columns, executed_block, + data_column_recv, .. } = self; @@ -309,10 +328,12 @@ impl PendingComponents { let AvailabilityPendingExecutedBlock { block, - import_data, + mut import_data, payload_verification_outcome, } = executed_block; + import_data.data_column_recv = data_column_recv; + let available_block = AvailableBlock { block_root, block, @@ -458,6 +479,7 @@ impl DataAvailabilityCheckerInner { &self, block_root: Hash256, kzg_verified_blobs: I, + data_column_recv: Option>>, log: &Logger, ) -> Result, AvailabilityCheckError> { let mut fixed_blobs = FixedVector::default(); @@ -479,8 +501,15 @@ impl DataAvailabilityCheckerInner { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); + if data_column_recv.is_some() { + // If `data_column_recv` is `Some`, it means we have all the blobs from engine, and have + // started computing data columns. We store the receiver in `PendingComponents` for + // later use when importing the block. + pending_components.data_column_recv = data_column_recv; + } + if pending_components.is_available(self.sampling_column_count, log) { - write_lock.put(block_root, pending_components.clone()); + write_lock.put(block_root, pending_components.clone_without_column_recv()); // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(&self.spec, |diet_block| { @@ -513,7 +542,7 @@ impl DataAvailabilityCheckerInner { pending_components.merge_data_columns(kzg_verified_data_columns)?; if pending_components.is_available(self.sampling_column_count, log) { - write_lock.put(block_root, pending_components.clone()); + write_lock.put(block_root, pending_components.clone_without_column_recv()); // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(&self.spec, |diet_block| { @@ -563,7 +592,7 @@ impl DataAvailabilityCheckerInner { } pending_components.reconstruction_started = true; - ReconstructColumnsDecision::Yes(pending_components.clone()) + ReconstructColumnsDecision::Yes(pending_components.clone_without_column_recv()) } /// This could mean some invalid data columns made it through to the `DataAvailabilityChecker`. @@ -602,7 +631,7 @@ impl DataAvailabilityCheckerInner { // Check if we have all components and entire set is consistent. if pending_components.is_available(self.sampling_column_count, log) { - write_lock.put(block_root, pending_components.clone()); + write_lock.put(block_root, pending_components.clone_without_column_recv()); // No need to hold the write lock anymore drop(write_lock); pending_components.make_available(&self.spec, |diet_block| { @@ -837,6 +866,7 @@ mod test { parent_eth1_finalization_data, confirmed_state_roots: vec![], consensus_context, + data_column_recv: None, }; let payload_verification_outcome = PayloadVerificationOutcome { @@ -939,7 +969,7 @@ mod test { for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger()) .expect("should put blob"); if blob_index == blobs_expected - 1 { assert!(matches!(availability, Availability::Available(_))); @@ -965,7 +995,7 @@ mod test { for gossip_blob in blobs { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), harness.logger()) + .put_kzg_verified_blobs(root, kzg_verified_blobs.clone(), None, harness.logger()) .expect("should put blob"); assert_eq!( availability, @@ -1219,6 +1249,7 @@ mod pending_components_tests { }, confirmed_state_roots: vec![], consensus_context: ConsensusContext::new(Slot::new(0)), + data_column_recv: None, }, payload_verification_outcome: PayloadVerificationOutcome { payload_verification_status: PayloadVerificationStatus::Verified, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs index 5b9b7c70233..2a2a0431ccb 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/state_lru_cache.rs @@ -136,6 +136,7 @@ impl StateLRUCache { consensus_context: diet_executed_block .consensus_context .into_consensus_context(), + data_column_recv: None, }, payload_verification_outcome: diet_executed_block.payload_verification_outcome, }) diff --git a/beacon_node/beacon_chain/src/fetch_blobs.rs b/beacon_node/beacon_chain/src/fetch_blobs.rs index f740b693fbf..7cb49510981 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs.rs @@ -18,7 +18,7 @@ use slog::{debug, error, o, Logger}; use ssz_types::FixedVector; use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash; use std::sync::Arc; -use tokio::sync::mpsc::Receiver; +use tokio::sync::oneshot; use types::blob_sidecar::{BlobSidecarError, FixedBlobSidecarList}; use types::{ BeaconStateError, BlobSidecar, DataColumnSidecar, DataColumnSidecarList, EthSpec, FullPayload, @@ -212,9 +212,9 @@ fn spawn_compute_and_publish_data_columns_task( blobs: FixedBlobSidecarList, publish_fn: impl Fn(BlobsOrDataColumns) + Send + 'static, log: Logger, -) -> Receiver>>> { +) -> oneshot::Receiver>>> { let chain_cloned = chain.clone(); - let (data_columns_sender, data_columns_receiver) = tokio::sync::mpsc::channel(1); + let (data_columns_sender, data_columns_receiver) = oneshot::channel(); chain.task_executor.spawn_blocking( move || { @@ -247,7 +247,7 @@ fn spawn_compute_and_publish_data_columns_task( } }; - if let Err(e) = data_columns_sender.try_send(all_data_columns.clone()) { + if let Err(e) = data_columns_sender.send(all_data_columns.clone()) { error!(log, "Failed to send computed data columns"; "error" => ?e); }; diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index f094a173eec..8617cb2a037 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1726,7 +1726,7 @@ async fn import_execution_pending_block( .unwrap() { ExecutedBlock::Available(block) => chain - .import_available_block(Box::from(block), None) + .import_available_block(Box::from(block)) .await .map_err(|e| format!("{e:?}")), ExecutedBlock::AvailabilityPending(_) => {