Skip to content

Commit

Permalink
submit_local: prio/limits improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Jan 13, 2025
1 parent bdf5b2a commit 8ee3b7b
Showing 1 changed file with 63 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
TransactionStatusStreamFor, TxHash,
MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority,
TransactionSource, TransactionStatusStreamFor, TxHash,
};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::{
generic::BlockId,
traits::{Block as BlockT, NumberFor},
transaction_validity::{TransactionValidityError, ValidTransaction},
};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -885,22 +886,16 @@ where
}
}

impl<Block, Client> sc_transaction_pool_api::LocalTransactionPool
for ForkAwareTxPool<FullChainApi<Client, Block>, Block>
impl<ChainApi, Block> sc_transaction_pool_api::LocalTransactionPool
for ForkAwareTxPool<ChainApi, Block>
where
Block: BlockT,
ChainApi: 'static + graph::ChainApi<Block = Block>,
<Block as BlockT>::Hash: Unpin,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sc_client_api::blockchain::HeaderBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>
+ sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
type Block = Block;
type Hash = ExtrinsicHash<FullChainApi<Client, Block>>;
type Error = <FullChainApi<Client, Block> as graph::ChainApi>::Error;
type Hash = ExtrinsicHash<ChainApi>;
type Error = ChainApi::Error;

fn submit_local(
&self,
Expand All @@ -909,18 +904,29 @@ where
) -> Result<Self::Hash, Self::Error> {
log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count());
let xt = Arc::from(xt);
let InsertionInfo { hash: xt_hash, .. } = self
.mempool
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

let result =
self.mempool.extend_unwatched(TransactionSource::Local, &[xt.clone()]).remove(0);

let insertion = match result {
Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync(
TransactionSource::Local,
false,
xt.clone(),
),
_ => result,
}?;

self.view_store
.submit_local(xt)
.inspect_err(|_| {
self.mempool.remove_transaction(&insertion.hash);
})
.map(|outcome| {
self.mempool.update_transaction_priority(&outcome);
outcome.hash()
})
.or_else(|_| Ok(xt_hash))
.or_else(|_| Ok(insertion.hash))
}
}

Expand Down Expand Up @@ -1357,7 +1363,7 @@ where
.get_view_at(at, false)
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let (tx_hash, validated_tx) = best_view
let (xt_hash, validated_tx) = best_view
.pool
.verify_one(
best_view.at.hash,
Expand All @@ -1372,7 +1378,44 @@ where
return Err(TxPoolApiError::ImmediatelyDropped)
};

let insertion_info = self.mempool.try_replace_transaction(xt, priority, source, watched)?;
self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
}

/// Sync version of [`Self::attempt_transaction_replacement`].
fn attempt_transaction_replacement_sync(
&self,
source: TransactionSource,
watched: bool,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let at = self
.view_store
.most_recent_view
.read()
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let ValidTransaction { priority, .. } = self
.api
.validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone()))
.map_err(|_| TxPoolApiError::ImmediatelyDropped)?
.map_err(|e| match e {
TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i),
TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u),
})?;
let xt_hash = self.hash_of(&xt);
self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched)
}

fn attempt_transaction_replacement_inner(
&self,
xt: ExtrinsicFor<ChainApi>,
tx_hash: ExtrinsicHash<ChainApi>,
priority: TransactionPriority,
source: TransactionSource,
watched: bool,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let insertion_info =
self.mempool.try_insert_with_replacement(xt, priority, source, watched)?;

for worst_hash in &insertion_info.removed {
log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
Expand Down

0 comments on commit 8ee3b7b

Please sign in to comment.