From 0ed3f04dcc431baf0467dca23e9587c29c6035a6 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 26 Jun 2024 10:12:42 +0300 Subject: [PATCH] Block import and verification refactoring (#4844) A few refactorings to block import and block verification that should not be controversial. Block verification before block import is stateless by design as described in https://substrate.stackexchange.com/a/1322/25 and the fact that it wasn't yet I consider to be a bug. Some code that requires it had to use `Mutex`, but I do not expect it to have a measurable performance impact. Similarly with block import checking whether block preconditions should not be an exclusive operation, there is nothing fundamentally wrong with checking a few competing blocks whose parent blocks exist at the same time (and even import them concurrently later, though IIRC this is not yet implemented either). They were originally a part of https://github.com/paritytech/polkadot-sdk/pull/4842 and upstreaming will help us to reduce the size of the patch we need to apply on top of upstream code at Subspace every time we upgrade. There are no new features introduced here, just refactoring to get rid of unnecessary requirements. --- Cargo.lock | 1 + cumulus/client/consensus/aura/Cargo.toml | 1 + .../aura/src/equivocation_import_queue.rs | 9 +- .../consensus/common/src/import_queue.rs | 2 +- cumulus/client/consensus/common/src/lib.rs | 4 +- .../consensus/relay-chain/src/import_queue.rs | 2 +- cumulus/polkadot-parachain/src/service.rs | 91 ++++------ prdoc/pr_4844.prdoc | 34 ++++ .../client/consensus/aura/src/import_queue.rs | 2 +- substrate/client/consensus/babe/src/lib.rs | 4 +- substrate/client/consensus/babe/src/tests.rs | 10 +- .../client/consensus/beefy/src/import.rs | 2 +- .../consensus/common/src/block_import.rs | 15 +- .../consensus/common/src/import_queue.rs | 160 ++++++++++++------ .../common/src/import_queue/basic_queue.rs | 41 +++-- .../client/consensus/grandpa/src/import.rs | 4 +- .../manual-seal/src/consensus/babe.rs | 2 +- .../client/consensus/manual-seal/src/lib.rs | 2 +- substrate/client/consensus/pow/src/lib.rs | 7 +- substrate/client/network/test/src/lib.rs | 11 +- substrate/client/network/test/src/service.rs | 2 +- substrate/client/service/src/client/client.rs | 6 +- 22 files changed, 244 insertions(+), 168 deletions(-) create mode 100644 prdoc/pr_4844.prdoc diff --git a/Cargo.lock b/Cargo.lock index 6f1e91f7b025..5935e7ba7301 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3719,6 +3719,7 @@ dependencies = [ "cumulus-relay-chain-interface", "futures", "parity-scale-codec", + "parking_lot 0.12.1", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-overseer", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index c473b2113dd3..5ab3e6f25129 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -13,6 +13,7 @@ workspace = true async-trait = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } futures = { workspace = true } +parking_lot = { workspace = true } tracing = { workspace = true, default-features = true } schnellru = { workspace = true } diff --git a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs index be554bdcfc79..68f2d37c8748 100644 --- a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs +++ b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs @@ -21,6 +21,7 @@ /// should be thrown out and which ones should be kept. use codec::Codec; use cumulus_client_consensus_common::ParachainBlockImportMarker; +use parking_lot::Mutex; use schnellru::{ByLength, LruMap}; use sc_consensus::{ @@ -70,7 +71,7 @@ impl NaiveEquivocationDefender { struct Verifier { client: Arc, create_inherent_data_providers: CIDP, - defender: NaiveEquivocationDefender, + defender: Mutex, telemetry: Option, _phantom: std::marker::PhantomData (Block, P)>, } @@ -88,7 +89,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so, or when importing only state. @@ -137,7 +138,7 @@ where block_params.post_hash = Some(post_hash); // Check for and reject egregious amounts of equivocations. - if self.defender.insert_and_check(slot) { + if self.defender.lock().insert_and_check(slot) { return Err(format!( "Rejecting block {:?} due to excessive equivocations at slot", post_hash, @@ -243,7 +244,7 @@ where let verifier = Verifier:: { client, create_inherent_data_providers, - defender: NaiveEquivocationDefender::default(), + defender: Mutex::new(NaiveEquivocationDefender::default()), telemetry, _phantom: std::marker::PhantomData, }; diff --git a/cumulus/client/consensus/common/src/import_queue.rs b/cumulus/client/consensus/common/src/import_queue.rs index 8024b7695a28..488693604fef 100644 --- a/cumulus/client/consensus/common/src/import_queue.rs +++ b/cumulus/client/consensus/common/src/import_queue.rs @@ -50,7 +50,7 @@ pub struct VerifyNothing; #[async_trait::async_trait] impl Verifier for VerifyNothing { async fn verify( - &mut self, + &self, params: BlockImportParams, ) -> Result, String> { Ok(params) diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index cebe34e7ea58..2b0d8290182a 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -172,13 +172,13 @@ impl Clone for ParachainBlockImport { impl BlockImport for ParachainBlockImport where Block: BlockT, - BI: BlockImport + Send, + BI: BlockImport + Send + Sync, BE: Backend, { type Error = BI::Error; async fn check_block( - &mut self, + &self, block: sc_consensus::BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index 1b521e79d482..1d6f039da4c1 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -52,7 +52,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { block_params.fork_choice = Some(sc_consensus::ForkChoiceStrategy::Custom( diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 9cd3a0037223..a22ae77a7759 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -498,43 +498,26 @@ pub async fn start_shell_node>( .await } -enum BuildOnAccess { - Uninitialized(Option R + Send + Sync>>), - Initialized(R), -} - -impl BuildOnAccess { - fn get_mut(&mut self) -> &mut R { - loop { - match self { - Self::Uninitialized(f) => { - *self = Self::Initialized((f.take().unwrap())()); - }, - Self::Initialized(ref mut r) => return r, - } - } - } -} - struct Verifier { client: Arc, - aura_verifier: BuildOnAccess>>, + aura_verifier: Box>, relay_chain_verifier: Box>, _phantom: PhantomData, } #[async_trait::async_trait] -impl VerifierT for Verifier +impl VerifierT for Verifier where - Client: sp_api::ProvideRuntimeApi + Send + Sync, + Client: ProvideRuntimeApi + Send + Sync, Client::Api: AuraRuntimeApi, + AuraId: AuraIdT + Sync, { async fn verify( - &mut self, + &self, block_import: BlockImportParams, ) -> Result, String> { if self.client.runtime_api().has_aura_api(*block_import.header.parent_hash()) { - self.aura_verifier.get_mut().verify(block_import).await + self.aura_verifier.verify(block_import).await } else { self.relay_chain_verifier.verify(block_import).await } @@ -543,7 +526,7 @@ where /// Build the import queue for parachain runtimes that started with relay chain consensus and /// switched to aura. -pub fn build_relay_to_aura_import_queue( +pub fn build_relay_to_aura_import_queue( client: Arc>, block_import: ParachainBlockImport, config: &Configuration, @@ -553,38 +536,35 @@ pub fn build_relay_to_aura_import_queue( where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, + AuraId: AuraIdT + Sync, { let verifier_client = client.clone(); - let aura_verifier = move || { - Box::new(cumulus_client_consensus_aura::build_verifier::< - ::Pair, - _, - _, - _, - >(cumulus_client_consensus_aura::BuildVerifierParams { - client: verifier_client.clone(), - create_inherent_data_providers: move |parent_hash, _| { - let cidp_client = verifier_client.clone(); - async move { - let slot_duration = cumulus_client_consensus_aura::slot_duration_at( - &*cidp_client, - parent_hash, - )?; - let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); - - let slot = - sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( - *timestamp, - slot_duration, - ); - - Ok((slot, timestamp)) - } - }, - telemetry: telemetry_handle, - })) as Box<_> - }; + let aura_verifier = cumulus_client_consensus_aura::build_verifier::< + ::Pair, + _, + _, + _, + >(cumulus_client_consensus_aura::BuildVerifierParams { + client: verifier_client.clone(), + create_inherent_data_providers: move |parent_hash, _| { + let cidp_client = verifier_client.clone(); + async move { + let slot_duration = + cumulus_client_consensus_aura::slot_duration_at(&*cidp_client, parent_hash)?; + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp)) + } + }, + telemetry: telemetry_handle, + }); let relay_chain_verifier = Box::new(RelayChainVerifier::new(client.clone(), |_, _| async { Ok(()) })) as Box<_>; @@ -592,7 +572,7 @@ where let verifier = Verifier { client, relay_chain_verifier, - aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))), + aura_verifier: Box::new(aura_verifier), _phantom: PhantomData, }; @@ -632,7 +612,7 @@ pub async fn start_generic_aura_lookahead_node> /// /// Uses the lookahead collator to support async backing. #[sc_tracing::logging::prefix_logs_with("Parachain")] -pub async fn start_asset_hub_lookahead_node( +pub async fn start_asset_hub_lookahead_node( parachain_config: Configuration, polkadot_config: Configuration, collator_options: CollatorOptions, @@ -644,6 +624,7 @@ where RuntimeApi::RuntimeApi: AuraRuntimeApi + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi + substrate_frame_rpc_system::AccountNonceApi, + AuraId: AuraIdT + Sync, Net: NetworkBackend, { start_node_impl::( diff --git a/prdoc/pr_4844.prdoc b/prdoc/pr_4844.prdoc new file mode 100644 index 000000000000..999e63c84ed9 --- /dev/null +++ b/prdoc/pr_4844.prdoc @@ -0,0 +1,34 @@ +title: Make `Verifier::verify` and `BlockImport::check_block` use `&self` instead of `&mut self` + +doc: + - audience: Node Dev + description: | + `Verifier::verify` and `BlockImport::check_block` were refactored to use `&self` instead of `&mut self` + because there is no fundamental requirement for those operations to be exclusive in nature. + +crates: +- name: sc-consensus + bump: major + validate: false +- name: sc-consensus-aura + bump: major +- name: sc-consensus-babe + bump: major +- name: sc-consensus-beefy + bump: major +- name: sc-consensus-grandpa + bump: major +- name: sc-consensus-manual-seal + bump: major +- name: sc-consensus-pow + bump: major +- name: sc-service + bump: major +- name: cumulus-client-consensus-common + bump: major +- name: cumulus-client-consensus-aura + bump: major +- name: cumulus-client-consensus-relay-chain + bump: major +- name: polkadot-parachain-bin + validate: false diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs index a8777ef8788c..79f4faa5ebf9 100644 --- a/substrate/client/consensus/aura/src/import_queue.rs +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -174,7 +174,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so or when importing only state. diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 0c85de240040..0c1eb8875864 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1128,7 +1128,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { trace!( @@ -1681,7 +1681,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await.map_err(Into::into) diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index 716067ae4000..6f805188b9a4 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -143,11 +143,11 @@ thread_local! { pub struct PanickingBlockImport(B); #[async_trait::async_trait] -impl> BlockImport for PanickingBlockImport +impl BlockImport for PanickingBlockImport where - B: Send, + BI: BlockImport + Send + Sync, { - type Error = B::Error; + type Error = BI::Error; async fn import_block( &mut self, @@ -157,7 +157,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { Ok(self.0.check_block(block).await.expect("checking block failed")) @@ -198,7 +198,7 @@ impl Verifier for TestVerifier { /// new set of validators to import. If not, err with an Error-Message /// presented to the User in the logs. async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // apply post-sealing mutations (i.e. stripping seal, if desired). diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index c01fb3db4845..848026852933 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -192,7 +192,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs index d91851aea62c..c5adbb5a5fca 100644 --- a/substrate/client/consensus/common/src/block_import.rs +++ b/substrate/client/consensus/common/src/block_import.rs @@ -307,10 +307,7 @@ pub trait BlockImport { type Error: std::error::Error + Send + 'static; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result; + async fn check_block(&self, block: BlockCheckParams) -> Result; /// Import a block. async fn import_block( @@ -324,10 +321,7 @@ impl BlockImport for crate::import_queue::BoxBlockImport { type Error = sp_consensus::error::Error; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { (**self).check_block(block).await } @@ -348,10 +342,7 @@ where { type Error = E; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { (&**self).check_block(block).await } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 1ddda04126a9..35fc8ad4a402 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -28,6 +28,10 @@ //! queues to be instantiated simply. use log::{debug, trace}; +use std::{ + fmt, + time::{Duration, Instant}, +}; use sp_consensus::{error::Error as ConsensusError, BlockOrigin}; use sp_runtime::{ @@ -93,11 +97,10 @@ pub struct IncomingBlock { /// Verify a justification of a block #[async_trait::async_trait] -pub trait Verifier: Send { +pub trait Verifier: Send + Sync { /// Verify the given block data and return the `BlockImportParams` to /// continue the block import process. - async fn verify(&mut self, block: BlockImportParams) - -> Result, String>; + async fn verify(&self, block: BlockImportParams) -> Result, String>; } /// Blocks import queue API. @@ -166,16 +169,16 @@ pub trait Link: Send { /// Block import successful result. #[derive(Debug, PartialEq)] -pub enum BlockImportStatus { +pub enum BlockImportStatus { /// Imported known block. - ImportedKnown(N, Option), + ImportedKnown(BlockNumber, Option), /// Imported unknown block. - ImportedUnknown(N, ImportedAux, Option), + ImportedUnknown(BlockNumber, ImportedAux, Option), } -impl BlockImportStatus { +impl BlockImportStatus { /// Returns the imported block number. - pub fn number(&self) -> &N { + pub fn number(&self) -> &BlockNumber { match self { BlockImportStatus::ImportedKnown(n, _) | BlockImportStatus::ImportedUnknown(n, _, _) => n, @@ -224,44 +227,30 @@ pub async fn import_single_block>( block: IncomingBlock, verifier: &mut V, ) -> BlockImportResult { - import_single_block_metered(import_handle, block_origin, block, verifier, None).await + match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { + SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), + SingleBlockVerificationOutcome::Verified(import_parameters) => + import_single_block_metered(import_handle, import_parameters, None).await, + } } -/// Single block import function with metering. -pub(crate) async fn import_single_block_metered>( - import_handle: &mut impl BlockImport, - block_origin: BlockOrigin, - block: IncomingBlock, - verifier: &mut V, - metrics: Option, -) -> BlockImportResult { - let peer = block.origin; - - let (header, justifications) = match (block.header, block.justifications) { - (Some(header), justifications) => (header, justifications), - (None, _) => { - if let Some(ref peer) = peer { - debug!(target: LOG_TARGET, "Header {} was not provided by {} ", block.hash, peer); - } else { - debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); - } - return Err(BlockImportError::IncompleteHeader(peer)) - }, - }; - - trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); - - let number = *header.number(); - let hash = block.hash; - let parent_hash = *header.parent_hash(); - - let import_handler = |import| match import { +fn import_handler( + number: NumberFor, + hash: Block::Hash, + parent_hash: Block::Hash, + block_origin: Option, + import: Result, +) -> Result>, BlockImportError> +where + Block: BlockT, +{ + match import { Ok(ImportResult::AlreadyInChain) => { trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportStatus::ImportedKnown(number, peer)) + Ok(BlockImportStatus::ImportedKnown(number, block_origin)) }, Ok(ImportResult::Imported(aux)) => - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)), + Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin)), Ok(ImportResult::MissingState) => { debug!( target: LOG_TARGET, @@ -278,15 +267,60 @@ pub(crate) async fn import_single_block_metered>( }, Ok(ImportResult::KnownBad) => { debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash); - Err(BlockImportError::BadBlock(peer)) + Err(BlockImportError::BadBlock(block_origin)) }, Err(e) => { debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e); Err(BlockImportError::Other(e)) }, + } +} + +pub(crate) enum SingleBlockVerificationOutcome { + /// Block is already imported. + Imported(BlockImportStatus>), + /// Block is verified, but needs to be imported. + Verified(SingleBlockImportParameters), +} + +pub(crate) struct SingleBlockImportParameters { + import_block: BlockImportParams, + hash: Block::Hash, + block_origin: Option, + verification_time: Duration, +} + +/// Single block import function with metering. +pub(crate) async fn verify_single_block_metered>( + import_handle: &impl BlockImport, + block_origin: BlockOrigin, + block: IncomingBlock, + verifier: &mut V, + metrics: Option<&Metrics>, +) -> Result, BlockImportError> { + let peer = block.origin; + let justifications = block.justifications; + + let Some(header) = block.header else { + if let Some(ref peer) = peer { + debug!(target: LOG_TARGET, "Header {} was not provided by {peer} ", block.hash); + } else { + debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash); + } + return Err(BlockImportError::IncompleteHeader(peer)) }; - match import_handler( + trace!(target: LOG_TARGET, "Header {} has {:?} logs", block.hash, header.digest().logs().len()); + + let number = *header.number(); + let hash = block.hash; + let parent_hash = *header.parent_hash(); + + match import_handler::( + number, + hash, + parent_hash, + peer, import_handle .check_block(BlockCheckParams { hash, @@ -299,10 +333,13 @@ pub(crate) async fn import_single_block_metered>( .await, )? { BlockImportStatus::ImportedUnknown { .. } => (), - r => return Ok(r), // Any other successful result means that the block is already imported. + r => { + // Any other successful result means that the block is already imported. + return Ok(SingleBlockVerificationOutcome::Imported(r)) + }, } - let started = std::time::Instant::now(); + let started = Instant::now(); let mut import_block = BlockImportParams::new(block_origin, header); import_block.body = block.body; @@ -333,19 +370,42 @@ pub(crate) async fn import_single_block_metered>( } else { trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg); } - if let Some(metrics) = metrics.as_ref() { + if let Some(metrics) = metrics { metrics.report_verification(false, started.elapsed()); } BlockImportError::VerificationFailed(peer, msg) })?; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(true, started.elapsed()); + let verification_time = started.elapsed(); + if let Some(metrics) = metrics { + metrics.report_verification(true, verification_time); } + Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { + import_block, + hash, + block_origin: peer, + verification_time, + })) +} + +pub(crate) async fn import_single_block_metered( + import_handle: &mut impl BlockImport, + import_parameters: SingleBlockImportParameters, + metrics: Option<&Metrics>, +) -> BlockImportResult { + let started = Instant::now(); + + let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } = + import_parameters; + + let number = *import_block.header.number(); + let parent_hash = *import_block.header.parent_hash(); + let imported = import_handle.import_block(import_block).await; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification_and_import(started.elapsed()); + if let Some(metrics) = metrics { + metrics.report_verification_and_import(started.elapsed() + verification_time); } - import_handler(imported) + + import_handler::(number, hash, parent_hash, block_origin, imported) } diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index e5eac3896cc8..05f2b2527961 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -32,9 +32,9 @@ use std::pin::Pin; use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, - import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link, - RuntimeOrigin, Verifier, LOG_TARGET, + import_single_block_metered, verify_single_block_metered, BlockImportError, + BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService, + IncomingBlock, Link, RuntimeOrigin, SingleBlockVerificationOutcome, Verifier, LOG_TARGET, }, metrics::Metrics, }; @@ -60,13 +60,16 @@ impl BasicQueue { /// Instantiate a new basic queue, with given verifier. /// /// This creates a background task, and calls `on_start` on the justification importer. - pub fn new>( + pub fn new( verifier: V, block_import: BoxBlockImport, justification_import: Option>, spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, - ) -> Self { + ) -> Self + where + V: Verifier + 'static, + { let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { @@ -252,7 +255,7 @@ struct BlockImportWorker { } impl BlockImportWorker { - fn new>( + fn new( result_sender: BufferedLinkSender, verifier: V, block_import: BoxBlockImport, @@ -262,7 +265,10 @@ impl BlockImportWorker { impl Future + Send, TracingUnboundedSender>, TracingUnboundedSender>, - ) { + ) + where + V: Verifier + 'static, + { use worker_messages::*; let (justification_sender, mut justification_port) = @@ -419,15 +425,22 @@ async fn import_many_blocks>( let import_result = if has_error { Err(BlockImportError::Cancelled) } else { - // The actual import. - import_single_block_metered( + let verification_fut = verify_single_block_metered( import_handle, blocks_origin, block, verifier, - metrics.clone(), - ) - .await + metrics.as_ref(), + ); + match verification_fut.await { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { + // The actual import. + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await + }, + Err(e) => Err(e), + } }; if let Some(metrics) = metrics.as_ref() { @@ -494,7 +507,7 @@ mod tests { #[async_trait::async_trait] impl Verifier for () { async fn verify( - &mut self, + &self, block: BlockImportParams, ) -> Result, String> { Ok(BlockImportParams::new(block.origin, block.header)) @@ -506,7 +519,7 @@ mod tests { type Error = sp_consensus::Error; async fn check_block( - &mut self, + &self, _block: BlockCheckParams, ) -> Result { Ok(ImportResult::imported(false)) diff --git a/substrate/client/consensus/grandpa/src/import.rs b/substrate/client/consensus/grandpa/src/import.rs index b594c0f678ce..8b7b02f180ec 100644 --- a/substrate/client/consensus/grandpa/src/import.rs +++ b/substrate/client/consensus/grandpa/src/import.rs @@ -518,7 +518,7 @@ where Client: ClientForGrandpa, Client::Api: GrandpaApi, for<'a> &'a Client: BlockImport, - SC: Send, + SC: Send + Sync, { type Error = ConsensusError; @@ -697,7 +697,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/manual-seal/src/consensus/babe.rs b/substrate/client/consensus/manual-seal/src/consensus/babe.rs index bc56ce022714..a68e46f0134d 100644 --- a/substrate/client/consensus/manual-seal/src/consensus/babe.rs +++ b/substrate/client/consensus/manual-seal/src/consensus/babe.rs @@ -96,7 +96,7 @@ where C: HeaderBackend + HeaderMetadata, { async fn verify( - &mut self, + &self, mut import_params: BlockImportParams, ) -> Result, String> { import_params.finalized = false; diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 8fc7e7ecab2f..39f8f8609d8d 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -65,7 +65,7 @@ struct ManualSealVerifier; #[async_trait::async_trait] impl Verifier for ManualSealVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { block.finalized = false; diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index ee5c1dfc6f11..50e9533abb36 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -312,10 +312,7 @@ where { type Error = ConsensusError; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { self.inner.check_block(block).await.map_err(Into::into) } @@ -442,7 +439,7 @@ where Algorithm::Difficulty: 'static + Send, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { let hash = block.header.hash(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 8a8f9608051a..221c8515d6d4 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -114,7 +114,7 @@ impl PassThroughVerifier { #[async_trait::async_trait] impl Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { if block.fork_choice.is_none() { @@ -210,7 +210,7 @@ impl BlockImport for PeersClient { type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.client.check_block(block).await @@ -600,7 +600,7 @@ where type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await @@ -622,10 +622,7 @@ struct VerifierAdapter { #[async_trait::async_trait] impl Verifier for VerifierAdapter { - async fn verify( - &mut self, - block: BlockImportParams, - ) -> Result, String> { + async fn verify(&self, block: BlockImportParams) -> Result, String> { let hash = block.header.hash(); self.verifier.lock().await.verify(block).await.map_err(|e| { self.failed_verifications.lock().insert(hash, e.clone()); diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index 150c1db7560e..c4a2b261081e 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -134,7 +134,7 @@ impl TestNetworkBuilder { #[async_trait::async_trait] impl sc_consensus::Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: sc_consensus::BlockImportParams, ) -> Result, String> { block.finalized = self.0; diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 2fbcc3ba4f75..a2c9212f7b9c 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -1780,7 +1780,7 @@ where /// Check block preconditions. async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { let BlockCheckParams { @@ -1862,10 +1862,10 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { - (&*self).check_block(block).await + (&self).check_block(block).await } }