From 26c11fc542310ef6841d8382418a319502de78a3 Mon Sep 17 00:00:00 2001
From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com>
Date: Tue, 15 Oct 2024 16:04:50 +0200
Subject: [PATCH] fork-aware transaction pool added (#4639)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
### Fork-Aware Transaction Pool Implementation
This PR introduces a fork-aware transaction pool (fatxpool), enhancing
transaction management by maintaining a valid txpool state for
different forks.
### High-level overview
A high-level overview was added to the
[`sc_transaction_pool::fork_aware_txpool`](https://github.com/paritytech/polkadot-sdk/blob/3ad0a1b7c08e63a2581595cb2cd55f11ccd60f4f/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs#L21)
module. Use:
```
cargo doc --document-private-items -p sc-transaction-pool --open
```
to build the docs. They should give a good overview and a nice entry point
into the new pool's mechanics.
Quick overview (documentation excerpt)
#### View
For every fork, a view is created. The view is a persisted state of the
transaction pool computed and updated at the tip of the fork. The view
is built around the existing `ValidatedPool` structure.
A view is created on every new best block notification. To create a
view, one of the existing views is chosen and cloned.
When the chain progresses, the view is kept in the cache
(`retracted_views`) to allow building blocks upon intermediary blocks in
the fork.
The views are deleted on finalization: views lower than the finalized
block are removed.
The views are updated with the transactions from the mempool—all
transactions are sent to the newly created views.
A maintain process is also executed for the newly created
views—basically resubmitting and pruning transactions from the
appropriate tree route.
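Below is a minimal, self-contained sketch of this lifecycle. All names and types are simplified stand-ins (the real `View` wraps `ValidatedPool` and is keyed by block hash), so treat it as an illustration rather than the actual implementation:
```
use std::collections::HashMap;

// Simplified model of a view; transactions are just names here.
#[derive(Clone, Default)]
struct View {
    number: u64,
    ready: Vec<&'static str>,
}

#[derive(Default)]
struct ViewStore {
    active: HashMap<&'static str, View>,    // views at the fork tips
    retracted: HashMap<&'static str, View>, // views kept for intermediary blocks
}

impl ViewStore {
    // On every new best block an existing view is chosen and cloned; the
    // parent view is kept in `retracted` so blocks can still be built upon
    // intermediary blocks of the fork.
    fn on_new_best_block(&mut self, parent: &'static str, hash: &'static str, number: u64) {
        let base = self
            .active
            .remove(parent)
            .or_else(|| self.retracted.get(parent).cloned())
            .unwrap_or_default();
        self.retracted.insert(parent, base.clone());
        self.active.insert(hash, View { number, ..base });
    }

    // On finalization, views lower than the finalized block are removed.
    fn on_finalized(&mut self, finalized_number: u64) {
        self.active.retain(|_, v| v.number >= finalized_number);
        self.retracted.retain(|_, v| v.number >= finalized_number);
    }
}

fn main() {
    let mut store = ViewStore::default();
    store.on_new_best_block("genesis", "a1", 1);
    store.active.get_mut("a1").unwrap().ready.push("tx0");
    store.on_new_best_block("a1", "a2", 2); // the chain progresses on fork A
    store.on_new_best_block("a1", "b2", 2); // a competing fork B appears
    // Both fork tips have a view, and cloning carried `tx0` over.
    assert_eq!(store.active.len(), 2);
    assert!(store.active.values().all(|v| v.ready.contains(&"tx0")));
    store.on_finalized(2); // all views below block #2 are dropped
    assert!(store.retracted.is_empty());
}
```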
##### View store
The view store is a helper structure that acts as a container for all the
views and provides some convenience methods.
##### Submitting transactions
Every transaction is submitted to every view at the tips of the forks.
Retracted views are not updated.
Every transaction also goes into the mempool.
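A small sketch of this fan-out, again with hypothetical toy types rather than the actual API:
```
use std::collections::{HashMap, HashSet};

// Simplified model: a submitted transaction fans out to every view at a
// fork tip and is always recorded in the internal mempool; views kept for
// retracted blocks are intentionally left untouched.
struct Pool {
    active_views: HashMap<&'static str, HashSet<&'static str>>, // tip -> ready set
    retracted_views: HashMap<&'static str, HashSet<&'static str>>,
    mempool: HashSet<&'static str>,
}

impl Pool {
    fn submit(&mut self, tx: &'static str) {
        self.mempool.insert(tx); // kept even if no view accepts it yet
        for ready in self.active_views.values_mut() {
            ready.insert(tx); // the real pool validates against each view first
        }
    }
}

fn main() {
    let mut pool = Pool {
        active_views: HashMap::from([("a2", HashSet::new()), ("b2", HashSet::new())]),
        retracted_views: HashMap::from([("a1", HashSet::new())]),
        mempool: HashSet::new(),
    };
    pool.submit("tx1");
    assert!(pool.active_views.values().all(|v| v.contains("tx1")));
    assert!(pool.mempool.contains("tx1"));
    assert!(pool.retracted_views.values().all(|v| v.is_empty()));
}
```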
##### Internal mempool
In short, the main purpose of the internal mempool is to prevent a
transaction from being lost, which could happen when a transaction is
invalid on one fork but valid on another. It also allows the txpool to
accept transactions before any blocks have been reported.
The mempool removes its transactions when they get finalized.
Transactions are also periodically verified on every finalized event and
removed from the mempool if no longer valid.
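A sketch of the pruning performed on finalization, with validity reduced to a boolean for illustration (the real pool revalidates against the runtime):
```
use std::collections::HashMap;

// Simplified mempool model: transactions leave the mempool once they are
// finalized; on every finalized event the remainder is revalidated and
// anything no longer valid is dropped.
struct MemPool {
    txs: HashMap<&'static str, bool>, // tx -> "still valid" (stand-in for validation)
}

impl MemPool {
    fn on_finalized(&mut self, finalized_txs: &[&'static str]) {
        for tx in finalized_txs {
            self.txs.remove(tx); // included in a finalized block -> done
        }
        self.txs.retain(|_, valid| *valid); // drop what is no longer valid
    }
}

fn main() {
    let mut mempool = MemPool {
        txs: HashMap::from([("tx1", true), ("tx2", true), ("tx3", false)]),
    };
    mempool.on_finalized(&["tx1"]);
    // `tx1` was finalized, `tx3` failed revalidation, only `tx2` remains.
    assert_eq!(mempool.txs.len(), 1);
    assert!(mempool.txs.contains_key("tx2"));
}
```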
#### Events
Transaction events from multiple views are merged and filtered to avoid
duplicated events.
`Ready` / `Future` / `InBlock` events originate in the views and are
de-duplicated before being forwarded to external listeners.
`Finalized` events originate in the fork-aware-txpool logic.
`Invalid` events require special care and can originate in both the
view and the fork-aware-txpool logic.
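The following is a minimal, hypothetical sketch of that merge-and-dedup step; the actual logic lives in the `multi_view_listener` and works on asynchronous event streams rather than vectors:
```
use std::collections::HashSet;

// Simplified subset of the per-transaction status events.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Event {
    Ready,
    InBlock(&'static str), // block that included the transaction
}

// Merge the events reported by all views for one transaction and forward
// each distinct event to the external listener only once.
fn merge_and_dedup(per_view_events: Vec<Vec<Event>>) -> Vec<Event> {
    let mut seen = HashSet::new();
    per_view_events
        .into_iter()
        .flatten()
        .filter(|event| seen.insert(*event))
        .collect()
}

fn main() {
    // Two views report `Ready` for the same transaction; externally it is
    // observed once, followed by the `InBlock` event of the including block.
    let merged = merge_and_dedup(vec![
        vec![Event::Ready, Event::InBlock("a2")],
        vec![Event::Ready],
    ]);
    assert_eq!(merged, vec![Event::Ready, Event::InBlock("a2")]);
}
```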
#### Light maintain
Sometimes the transaction pool does not have enough time to prepare a fully
maintained view with all retracted transactions revalidated. To
avoid providing an empty ready-transaction set to the block builder (which
would result in an empty block), light maintain was implemented. It simply
removes the imported transactions from the ready iterator.
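A sketch of that filtering step, under the assumption that transactions are plain string names (`light_maintain` here is illustrative, not the actual API):
```
use std::collections::HashSet;

// Instead of a full maintain (revalidating retracted transactions), just
// drop the already-imported transactions from the ready iterator so the
// block builder gets a non-empty, non-stale set.
fn light_maintain<'a>(
    ready: impl Iterator<Item = &'a str> + 'a,
    imported_in_new_block: &'a HashSet<&'a str>,
) -> impl Iterator<Item = &'a str> + 'a {
    ready.filter(move |tx| !imported_in_new_block.contains(*tx))
}

fn main() {
    let imported: HashSet<&str> = HashSet::from(["tx1"]);
    let ready = ["tx1", "tx2", "tx3"].into_iter();
    let still_ready: Vec<_> = light_maintain(ready, &imported).collect();
    assert_eq!(still_ready, vec!["tx2", "tx3"]);
}
```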
#### Revalidation
Revalidation is performed for every view. The revalidation process starts
after a trigger fires, and the revalidation work is terminated as soon as a
new best block / finalized event is notified to the transaction pool.
The revalidation result is applied to the newly created view which is
built upon the revalidated view.
Additionally, parts of the mempool are also revalidated to make sure
that no transactions are stuck in the mempool.
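A simplified, hypothetical model of one revalidation round and its early termination:
```
// Work proceeds in order but is terminated as soon as a new best block /
// finalized notification arrives; the partial result is then applied to
// the view built on top of the one being revalidated.
fn revalidate(
    txs: &[&'static str],
    is_valid: impl Fn(&str) -> bool,
    interrupted: impl Fn() -> bool,
) -> Vec<&'static str> {
    let mut still_valid = Vec::new();
    for tx in txs {
        if interrupted() {
            break; // a new best block / finalized event stops the round early
        }
        if is_valid(*tx) {
            still_valid.push(*tx);
        }
    }
    still_valid
}

fn main() {
    // No interruption: the invalid `tx2` is filtered out.
    let result = revalidate(&["tx1", "tx2", "tx3"], |tx| tx != "tx2", || false);
    assert_eq!(result, vec!["tx1", "tx3"]);
}
```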
#### Logs
The most important log line for understanding the state of the txpool
is:
```
maintain: txs:(0, 92) views:[2;[(327, 76, 0), (326, 68, 0)]] event:Finalized { hash: 0x8...f, tree_route: [] } took:3.463522ms
^ ^ ^ ^ ^ ^ ^ ^ ^ ^ ^
unwatched txs in mempool ────┘ │ │ │ │ │ │ │ │ │ │
watched txs in mempool ───────┘ │ │ │ │ │ │ │ │ │
views ───────────────┘ │ │ │ │ │ │ │ │
1st view block # ──────────┘ │ │ │ │ │ │ │
number of ready tx ───────┘ │ │ │ │ │ │
number of future tx ────┘ │ │ │ │ │
2nd view block # ──────┘ │ │ │ │
number of ready tx ──────────┘ │ │ │
number of future tx ───────┘ │ │
event ────────┘ │
duration ──────────────────────────────────────────────────┘
```
It is logged after the maintenance is done.
The `debug` level enables per-transaction logging, making it possible to
keep track of all transaction-related actions that happened in the txpool.
### Integration notes
For teams with a custom node, the new txpool needs to be instantiated,
typically in the `service.rs` file. Here is an example:
https://github.com/paritytech/polkadot-sdk/blob/9c547ff3e36cf3b52c99f4ed7849a8e9f722d97d/cumulus/polkadot-omni-node/lib/src/common/spec.rs#L152-L161
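For reference, the instantiation boils down to replacing `BasicPool::new_full` with the new builder, as in this excerpt from the `spec.rs` hunk further down in the diff:
```
let transaction_pool = Arc::from(
    sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build(),
);
```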
To enable the new transaction pool, the following CLI argument shall be
specified: `--pool-type=fork-aware`. If it works, the following information
is printed in the log:
```
2024-09-20 21:28:17.528 INFO main txpool: [Parachain] creating ForkAware txpool.
```
For debugging, the following debug log targets shall be enabled:
```
"-lbasic-authorship=debug",
"-ltxpool=debug",
```
*note:* `trace` level for `txpool` enables per-transaction logging.
### Future work
The current implementation seems to be stable; however, further
improvements are required.
Here is the umbrella issue for future work:
- https://github.com/paritytech/polkadot-sdk/issues/5472
Partially fixes: #1202
---------
Co-authored-by: Bastian Köcher
Co-authored-by: Sebastian Kunert
Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
---
Cargo.lock | 6 +
cumulus/client/service/src/lib.rs | 2 +-
.../polkadot-omni-node/lib/src/common/rpc.rs | 6 +-
.../polkadot-omni-node/lib/src/common/spec.rs | 21 +-
.../lib/src/common/types.rs | 4 +-
.../polkadot-omni-node/lib/src/nodes/aura.rs | 6 +-
cumulus/test/service/src/lib.rs | 19 +-
cumulus/zombienet/tests/0008-main.js | 18 +
...08-parachain_extrinsic_gets_finalized.toml | 25 +
...8-parachain_extrinsic_gets_finalized.zndsl | 20 +
.../tests/0008-transaction_gets_finalized.js | 69 +
polkadot/node/service/src/lib.rs | 17 +-
prdoc/pr_4639.prdoc | 69 +
substrate/bin/node/bench/src/construct.rs | 25 +-
.../bin/node/cli/benches/transaction_pool.rs | 19 +-
substrate/bin/node/cli/src/service.rs | 26 +-
.../basic-authorship/src/basic_authorship.rs | 94 +-
substrate/client/basic-authorship/src/lib.rs | 4 +-
substrate/client/cli/Cargo.toml | 1 +
.../cli/src/params/transaction_pool_params.rs | 55 +-
.../client/network/transactions/src/lib.rs | 2 +
substrate/client/offchain/src/lib.rs | 9 +-
.../src/transaction/tests/middleware_pool.rs | 19 +-
substrate/client/rpc/src/author/tests.rs | 9 +-
substrate/client/service/src/config.rs | 2 +-
substrate/client/service/src/lib.rs | 30 +-
substrate/client/transaction-pool/Cargo.toml | 4 +
.../client/transaction-pool/api/src/error.rs | 2 +-
.../client/transaction-pool/api/src/lib.rs | 36 +-
.../client/transaction-pool/benches/basics.rs | 28 +-
.../client/transaction-pool/src/builder.rs | 245 ++
.../transaction-pool/src/{ => common}/api.rs | 59 +-
.../src/{ => common}/enactment_state.rs | 54 +-
.../src/{ => common}/error.rs | 0
.../transaction-pool/src/common/log_xt.rs | 54 +
.../src/{ => common}/metrics.rs | 80 +-
.../client/transaction-pool/src/common/mod.rs | 48 +
.../src/{ => common}/tests.rs | 11 +-
.../src/fork_aware_txpool/dropped_watcher.rs | 533 ++++
.../fork_aware_txpool/fork_aware_txpool.rs | 1563 ++++++++++
.../import_notification_sink.rs | 396 +++
.../src/fork_aware_txpool/metrics.rs | 176 ++
.../src/fork_aware_txpool/mod.rs | 376 +++
.../fork_aware_txpool/multi_view_listener.rs | 736 +++++
.../fork_aware_txpool/revalidation_worker.rs | 240 ++
.../src/fork_aware_txpool/tx_mem_pool.rs | 535 ++++
.../src/fork_aware_txpool/view.rs | 415 +++
.../src/fork_aware_txpool/view_store.rs | 468 +++
.../transaction-pool/src/graph/base_pool.rs | 269 +-
.../transaction-pool/src/graph/future.rs | 34 +-
.../transaction-pool/src/graph/listener.rs | 74 +-
.../client/transaction-pool/src/graph/mod.rs | 8 +-
.../client/transaction-pool/src/graph/pool.rs | 602 ++--
.../transaction-pool/src/graph/ready.rs | 14 +-
.../transaction-pool/src/graph/tracked_map.rs | 14 +
.../src/graph/validated_pool.rs | 106 +-
.../transaction-pool/src/graph/watcher.rs | 2 +-
substrate/client/transaction-pool/src/lib.rs | 794 +----
.../src/single_state_txpool/metrics.rs | 67 +
.../src/single_state_txpool/mod.rs | 26 +
.../{ => single_state_txpool}/revalidation.rs | 56 +-
.../single_state_txpool.rs | 790 +++++
.../src/transaction_pool_wrapper.rs | 198 ++
.../client/transaction-pool/tests/fatp.rs | 2617 +++++++++++++++++
.../transaction-pool/tests/fatp_common/mod.rs | 285 ++
.../transaction-pool/tests/fatp_limits.rs | 353 +++
.../client/transaction-pool/tests/pool.rs | 101 +-
.../runtime/src/transaction_validity.rs | 2 +-
.../runtime/transaction-pool/Cargo.toml | 1 +
.../runtime/transaction-pool/src/lib.rs | 141 +-
substrate/utils/frame/rpc/system/src/lib.rs | 36 +-
substrate/utils/prometheus/src/lib.rs | 4 +-
templates/minimal/node/src/service.rs | 17 +-
templates/parachain/node/src/service.rs | 19 +-
templates/solochain/node/src/service.rs | 17 +-
75 files changed, 11719 insertions(+), 1564 deletions(-)
create mode 100644 cumulus/zombienet/tests/0008-main.js
create mode 100644 cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.toml
create mode 100644 cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.zndsl
create mode 100644 cumulus/zombienet/tests/0008-transaction_gets_finalized.js
create mode 100644 prdoc/pr_4639.prdoc
create mode 100644 substrate/client/transaction-pool/src/builder.rs
rename substrate/client/transaction-pool/src/{ => common}/api.rs (87%)
rename substrate/client/transaction-pool/src/{ => common}/enactment_state.rs (94%)
rename substrate/client/transaction-pool/src/{ => common}/error.rs (100%)
create mode 100644 substrate/client/transaction-pool/src/common/log_xt.rs
rename substrate/client/transaction-pool/src/{ => common}/metrics.rs (58%)
create mode 100644 substrate/client/transaction-pool/src/common/mod.rs
rename substrate/client/transaction-pool/src/{ => common}/tests.rs (94%)
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/metrics.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/view.rs
create mode 100644 substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs
create mode 100644 substrate/client/transaction-pool/src/single_state_txpool/metrics.rs
create mode 100644 substrate/client/transaction-pool/src/single_state_txpool/mod.rs
rename substrate/client/transaction-pool/src/{ => single_state_txpool}/revalidation.rs (91%)
create mode 100644 substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs
create mode 100644 substrate/client/transaction-pool/src/transaction_pool_wrapper.rs
create mode 100644 substrate/client/transaction-pool/tests/fatp.rs
create mode 100644 substrate/client/transaction-pool/tests/fatp_common/mod.rs
create mode 100644 substrate/client/transaction-pool/tests/fatp_limits.rs
diff --git a/Cargo.lock b/Cargo.lock
index 360e89d8ad89..40b36e2602fd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -18394,6 +18394,7 @@ dependencies = [
"sc-service",
"sc-telemetry",
"sc-tracing",
+ "sc-transaction-pool",
"sc-utils",
"serde",
"serde_json",
@@ -19738,6 +19739,8 @@ dependencies = [
"criterion",
"futures",
"futures-timer",
+ "indexmap 2.2.3",
+ "itertools 0.11.0",
"linked-hash-map",
"log",
"parity-scale-codec",
@@ -19760,6 +19763,8 @@ dependencies = [
"substrate-test-runtime-client",
"substrate-test-runtime-transaction-pool",
"thiserror",
+ "tokio",
+ "tokio-stream",
]
[[package]]
@@ -23862,6 +23867,7 @@ name = "substrate-test-runtime-transaction-pool"
version = "2.0.0"
dependencies = [
"futures",
+ "log",
"parity-scale-codec",
"parking_lot 0.12.3",
"sc-transaction-pool",
diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs
index 92dc64371f34..25b8ee10a931 100644
--- a/cumulus/client/service/src/lib.rs
+++ b/cumulus/client/service/src/lib.rs
@@ -417,7 +417,7 @@ pub struct BuildNetworkParams<
pub net_config:
sc_network::config::FullNetworkConfiguration::Hash, Network>,
pub client: Arc,
- pub transaction_pool: Arc>,
+ pub transaction_pool: Arc>,
pub para_id: ParaId,
pub relay_chain_interface: RCInterface,
pub spawn_handle: SpawnTaskHandle,
diff --git a/cumulus/polkadot-omni-node/lib/src/common/rpc.rs b/cumulus/polkadot-omni-node/lib/src/common/rpc.rs
index 85665c9b220f..4879bd1eb7f4 100644
--- a/cumulus/polkadot-omni-node/lib/src/common/rpc.rs
+++ b/cumulus/polkadot-omni-node/lib/src/common/rpc.rs
@@ -46,7 +46,7 @@ impl
BuildRpcExtensions<
ParachainClient,
ParachainBackend,
- sc_transaction_pool::FullPool>,
+ sc_transaction_pool::TransactionPoolHandle>,
> for BuildParachainRpcExtensions
where
RuntimeApi:
@@ -57,7 +57,9 @@ where
fn build_rpc_extensions(
client: Arc>,
backend: Arc>,
- pool: Arc>>,
+ pool: Arc<
+ sc_transaction_pool::TransactionPoolHandle>,
+ >,
) -> sc_service::error::Result {
let build = || -> Result> {
let mut module = RpcExtension::new(());
diff --git a/cumulus/polkadot-omni-node/lib/src/common/spec.rs b/cumulus/polkadot-omni-node/lib/src/common/spec.rs
index dca28b3c28f7..8397cb778dcf 100644
--- a/cumulus/polkadot-omni-node/lib/src/common/spec.rs
+++ b/cumulus/polkadot-omni-node/lib/src/common/spec.rs
@@ -40,7 +40,7 @@ use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
use sc_sysinfo::HwBench;
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
use sc_tracing::tracing::Instrument;
-use sc_transaction_pool::FullPool;
+use sc_transaction_pool::TransactionPoolHandle;
use sp_keystore::KeystorePtr;
use std::{future::Future, pin::Pin, sync::Arc, time::Duration};
@@ -65,7 +65,7 @@ where
telemetry: Option,
task_manager: &TaskManager,
relay_chain_interface: Arc,
- transaction_pool: Arc>>,
+ transaction_pool: Arc>>,
keystore: KeystorePtr,
relay_chain_slot_duration: Duration,
para_id: ParaId,
@@ -149,12 +149,15 @@ pub(crate) trait BaseNodeSpec {
telemetry
});
- let transaction_pool = sc_transaction_pool::BasicPool::new_full(
- config.transaction_pool.clone(),
- config.role.is_authority().into(),
- config.prometheus_registry(),
- task_manager.spawn_essential_handle(),
- client.clone(),
+ let transaction_pool = Arc::from(
+ sc_transaction_pool::Builder::new(
+ task_manager.spawn_essential_handle(),
+ client.clone(),
+ config.role.is_authority().into(),
+ )
+ .with_options(config.transaction_pool.clone())
+ .with_prometheus(config.prometheus_registry())
+ .build(),
);
let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
@@ -184,7 +187,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
type BuildRpcExtensions: BuildRpcExtensions<
ParachainClient,
ParachainBackend,
- FullPool>,
+ TransactionPoolHandle>,
>;
type StartConsensus: StartConsensus;
diff --git a/cumulus/polkadot-omni-node/lib/src/common/types.rs b/cumulus/polkadot-omni-node/lib/src/common/types.rs
index 9cfdcb22451c..4bc58dc9db7e 100644
--- a/cumulus/polkadot-omni-node/lib/src/common/types.rs
+++ b/cumulus/polkadot-omni-node/lib/src/common/types.rs
@@ -20,7 +20,7 @@ use sc_consensus::DefaultImportQueue;
use sc_executor::WasmExecutor;
use sc_service::{PartialComponents, TFullBackend, TFullClient};
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
-use sc_transaction_pool::FullPool;
+use sc_transaction_pool::TransactionPoolHandle;
use sp_runtime::{generic, traits::BlakeTwo256};
use std::sync::Arc;
@@ -51,6 +51,6 @@ pub type ParachainService = PartialComponents<
ParachainBackend,
(),
DefaultImportQueue,
- FullPool>,
+ TransactionPoolHandle>,
(ParachainBlockImport, Option, Option),
>;
diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
index cf1ee91cbab5..ec5d0a439ec4 100644
--- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
+++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
@@ -52,7 +52,7 @@ use sc_consensus::{
};
use sc_service::{Configuration, Error, TaskManager};
use sc_telemetry::TelemetryHandle;
-use sc_transaction_pool::FullPool;
+use sc_transaction_pool::TransactionPoolHandle;
use sp_api::ProvideRuntimeApi;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
@@ -291,7 +291,7 @@ where
telemetry: Option,
task_manager: &TaskManager,
relay_chain_interface: Arc,
- transaction_pool: Arc>>,
+ transaction_pool: Arc>>,
keystore: KeystorePtr,
_relay_chain_slot_duration: Duration,
para_id: ParaId,
@@ -387,7 +387,7 @@ where
telemetry: Option,
task_manager: &TaskManager,
relay_chain_interface: Arc,
- transaction_pool: Arc>>,
+ transaction_pool: Arc>>,
keystore: KeystorePtr,
relay_chain_slot_duration: Duration,
para_id: ParaId,
diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs
index 9f93572e9cea..a13399d3a40e 100644
--- a/cumulus/test/service/src/lib.rs
+++ b/cumulus/test/service/src/lib.rs
@@ -134,7 +134,7 @@ pub type Backend = TFullBackend;
pub type ParachainBlockImport = TParachainBlockImport, Backend>;
/// Transaction pool type used by the test service
-pub type TransactionPool = Arc>;
+pub type TransactionPool = Arc>;
/// Recovery handle that fails regularly to simulate unavailable povs.
pub struct FailingRecoveryHandle {
@@ -183,7 +183,7 @@ pub type Service = PartialComponents<
Backend,
(),
sc_consensus::import_queue::BasicQueue,
- sc_transaction_pool::FullPool,
+ sc_transaction_pool::TransactionPoolHandle,
ParachainBlockImport,
>;
@@ -219,12 +219,15 @@ pub fn new_partial(
let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
- let transaction_pool = sc_transaction_pool::BasicPool::new_full(
- config.transaction_pool.clone(),
- config.role.is_authority().into(),
- config.prometheus_registry(),
- task_manager.spawn_essential_handle(),
- client.clone(),
+ let transaction_pool = Arc::from(
+ sc_transaction_pool::Builder::new(
+ task_manager.spawn_essential_handle(),
+ client.clone(),
+ config.role.is_authority().into(),
+ )
+ .with_options(config.transaction_pool.clone())
+ .with_prometheus(config.prometheus_registry())
+ .build(),
);
let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
diff --git a/cumulus/zombienet/tests/0008-main.js b/cumulus/zombienet/tests/0008-main.js
new file mode 100644
index 000000000000..31c01324a77e
--- /dev/null
+++ b/cumulus/zombienet/tests/0008-main.js
@@ -0,0 +1,18 @@
+// Allows manually submitting an extrinsic to the collator.
+// Usage:
+// zombienet-linux -p native spawn 0008-parachain_extrinsic_gets_finalized.toml
+// node 0008-main.js
+
+global.zombie = null
+
+const fs = require('fs');
+const test = require('./0008-transaction_gets_finalized.js');
+
+if (process.argv.length == 2) {
+ console.error('Path to zombie.json (generated by zombienet-linux spawn command) shall be given!');
+ process.exit(1);
+}
+
+let networkInfo = JSON.parse(fs.readFileSync(process.argv[2]));
+
+test.run("charlie", networkInfo).then(process.exit)
diff --git a/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.toml b/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.toml
new file mode 100644
index 000000000000..a295d3960bfe
--- /dev/null
+++ b/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.toml
@@ -0,0 +1,25 @@
+[relaychain]
+default_image = "{{RELAY_IMAGE}}"
+default_command = "polkadot"
+chain = "rococo-local"
+
+ [[relaychain.nodes]]
+ name = "alice"
+ validator = true
+
+ [[relaychain.nodes]]
+ name = "bob"
+ validator = true
+
+[[parachains]]
+id = 2000
+cumulus_based = true
+chain = "asset-hub-rococo-local"
+
+ # run charlie as parachain collator
+ [[parachains.collators]]
+ name = "charlie"
+ validator = true
+ image = "{{POLKADOT_PARACHAIN_IMAGE}}"
+ command = "polkadot-parachain"
+ args = ["--force-authoring", "-ltxpool=trace", "--pool-type=fork-aware"]
diff --git a/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.zndsl b/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.zndsl
new file mode 100644
index 000000000000..5aab1bd923a5
--- /dev/null
+++ b/cumulus/zombienet/tests/0008-parachain_extrinsic_gets_finalized.zndsl
@@ -0,0 +1,20 @@
+Description: Block building
+Network: ./0008-parachain_extrinsic_gets_finalized.toml
+Creds: config
+
+alice: reports node_roles is 4
+bob: reports node_roles is 4
+charlie: reports node_roles is 4
+
+alice: reports peers count is at least 1
+bob: reports peers count is at least 1
+
+alice: reports block height is at least 5 within 60 seconds
+bob: reports block height is at least 5 within 60 seconds
+charlie: reports block height is at least 2 within 120 seconds
+
+alice: count of log lines containing "error" is 0 within 2 seconds
+bob: count of log lines containing "error" is 0 within 2 seconds
+charlie: count of log lines containing "error" is 0 within 2 seconds
+
+charlie: js-script ./0008-transaction_gets_finalized.js within 600 seconds
diff --git a/cumulus/zombienet/tests/0008-transaction_gets_finalized.js b/cumulus/zombienet/tests/0008-transaction_gets_finalized.js
new file mode 100644
index 000000000000..3031c45e3a4b
--- /dev/null
+++ b/cumulus/zombienet/tests/0008-transaction_gets_finalized.js
@@ -0,0 +1,69 @@
+//based on: https://polkadot.js.org/docs/api/examples/promise/transfer-events
+
+const assert = require("assert");
+
+async function run(nodeName, networkInfo, args) {
+ const {wsUri, userDefinedTypes} = networkInfo.nodesByName[nodeName];
+ // Create the API and wait until ready
+ var api = null;
+ var keyring = null;
+ if (zombie == null) {
+ const testKeyring = require('@polkadot/keyring/testing');
+ const { WsProvider, ApiPromise } = require('@polkadot/api');
+ const provider = new WsProvider(wsUri);
+ api = await ApiPromise.create({provider});
+ // Construct the keyring after the API (crypto has an async init)
+ keyring = testKeyring.createTestKeyring({ type: "sr25519" });
+ } else {
+ keyring = new zombie.Keyring({ type: "sr25519" });
+ api = await zombie.connect(wsUri, userDefinedTypes);
+ }
+
+
+ // Add Alice to our keyring with a hard-derivation path (empty phrase, so uses dev)
+ const alice = keyring.addFromUri('//Alice');
+
+ // Create an extrinsic:
+ const extrinsic = api.tx.system.remark("xxx");
+
+ let extrinsic_success_event = false;
+ try {
+ await new Promise( async (resolve, reject) => {
+ const unsubscribe = await extrinsic
+ .signAndSend(alice, { nonce: -1 }, ({ events = [], status }) => {
+ console.log('Extrinsic status:', status.type);
+
+ if (status.isInBlock) {
+ console.log('Included at block hash', status.asInBlock.toHex());
+ console.log('Events:');
+
+ events.forEach(({ event: { data, method, section }, phase }) => {
+ console.log('\t', phase.toString(), `: ${section}.${method}`, data.toString());
+
+ if (section=="system" && method =="ExtrinsicSuccess") {
+ extrinsic_success_event = true;
+ }
+ });
+ } else if (status.isFinalized) {
+ console.log('Finalized block hash', status.asFinalized.toHex());
+ unsubscribe();
+ if (extrinsic_success_event) {
+ resolve();
+ } else {
+ reject("ExtrinsicSuccess has not been seen");
+ }
+ } else if (status.isError) {
+ unsubscribe();
+ reject("Extrinsic status.isError");
+ }
+
+ });
+ });
+ } catch (error) {
+ assert.fail("Transfer promise failed, error: " + error);
+ }
+
+ assert.ok("test passed");
+}
+
+module.exports = { run }
diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs
index 9515dd231138..da3ab760ed22 100644
--- a/polkadot/node/service/src/lib.rs
+++ b/polkadot/node/service/src/lib.rs
@@ -450,7 +450,7 @@ fn new_partial(
FullBackend,
ChainSelection,
sc_consensus::DefaultImportQueue,
- sc_transaction_pool::FullPool,
+ sc_transaction_pool::TransactionPoolHandle,
(
impl Fn(
polkadot_rpc::SubscriptionTaskExecutor,
@@ -478,12 +478,15 @@ fn new_partial(
where
ChainSelection: 'static + SelectChain,
{
- let transaction_pool = sc_transaction_pool::BasicPool::new_full(
- config.transaction_pool.clone(),
- config.role.is_authority().into(),
- config.prometheus_registry(),
- task_manager.spawn_essential_handle(),
- client.clone(),
+ let transaction_pool = Arc::from(
+ sc_transaction_pool::Builder::new(
+ task_manager.spawn_essential_handle(),
+ client.clone(),
+ config.role.is_authority().into(),
+ )
+ .with_options(config.transaction_pool.clone())
+ .with_prometheus(config.prometheus_registry())
+ .build(),
);
let grandpa_hard_forks = if config.chain_spec.is_kusama() {
diff --git a/prdoc/pr_4639.prdoc b/prdoc/pr_4639.prdoc
new file mode 100644
index 000000000000..dfdd60f2bdb2
--- /dev/null
+++ b/prdoc/pr_4639.prdoc
@@ -0,0 +1,69 @@
+title: "Added the fork-aware transaction pool implementation"
+
+doc:
+ - audience: Node Dev
+ description: |
+ Most important changes introduced by this PR:
+ - The transaction pool references spread across the codebase are now wrappers around a transaction pool trait object,
+ - The fork-aware pool implementation was added.
+ - The `sc-transaction-pool` refactored,
+ - Transaction pool builder was introduced to allow instantiation of either the old or the new transaction pool. Refer to the PR description for
+ more details on how to enable the fork-aware pool in a custom node.
+ - audience: Node Operator
+ description: |
+ - A new command line option was added, allowing selection of the transaction pool implementation:
+ - `--pool-type=fork-aware` - the new fork-aware transaction pool,
+ - `--pool-type=single-state` - the old transaction pool implementation, which is still the default,
+
+crates:
+ - name: sc-basic-authorship
+ bump: patch
+ - name: sc-cli
+ bump: major
+ - name: sc-consensus-manual-seal
+ bump: patch
+ - name: sc-network-transactions
+ bump: none
+ - name: sc-rpc
+ bump: patch
+ - name: sc-rpc-spec-v2
+ bump: patch
+ - name: sc-offchain
+ bump: patch
+ - name: sc-service
+ bump: patch
+ - name: sc-service-test
+ bump: minor
+ - name: sc-transaction-pool
+ bump: major
+ - name: sc-transaction-pool-api
+ bump: major
+ validate: false
+ - name: sp-runtime
+ bump: patch
+ - name: substrate-test-runtime-transaction-pool
+ bump: minor
+ - name: staging-node-cli
+ bump: minor
+ - name: node-bench
+ bump: patch
+ - name: node-rpc
+ bump: minor
+ - name: substrate-prometheus-endpoint
+ bump: patch
+ - name: substrate-frame-rpc-system
+ bump: patch
+ - name: minimal-template-node
+ bump: minor
+ - name: parachain-template-node
+ bump: minor
+ - name: solochain-template-node
+ bump: minor
+ - name: polkadot-service
+ bump: patch
+ - name: cumulus-client-service
+ bump: patch
+ - name: cumulus-test-service
+ bump: major
+ - name: polkadot-omni-node-lib
+ bump: patch
diff --git a/substrate/bin/node/bench/src/construct.rs b/substrate/bin/node/bench/src/construct.rs
index 23d0a0cc1ee5..bed6e3d914c2 100644
--- a/substrate/bin/node/bench/src/construct.rs
+++ b/substrate/bin/node/bench/src/construct.rs
@@ -35,7 +35,7 @@ use sc_transaction_pool_api::{
};
use sp_consensus::{Environment, Proposer};
use sp_inherents::InherentDataProvider;
-use sp_runtime::{traits::NumberFor, OpaqueExtrinsic};
+use sp_runtime::OpaqueExtrinsic;
use crate::{
common::SizeType,
@@ -165,18 +165,18 @@ impl core::Benchmark for ConstructionBenchmark {
#[derive(Clone, Debug)]
pub struct PoolTransaction {
- data: OpaqueExtrinsic,
+ data: Arc,
hash: node_primitives::Hash,
}
impl From for PoolTransaction {
fn from(e: OpaqueExtrinsic) -> Self {
- PoolTransaction { data: e, hash: node_primitives::Hash::zero() }
+ PoolTransaction { data: Arc::from(e), hash: node_primitives::Hash::zero() }
}
}
impl sc_transaction_pool_api::InPoolTransaction for PoolTransaction {
- type Transaction = OpaqueExtrinsic;
+ type Transaction = Arc;
type Hash = node_primitives::Hash;
fn data(&self) -> &Self::Transaction {
@@ -261,7 +261,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
fn ready_at(
&self,
- _at: NumberFor,
+ _at: Self::Hash,
) -> Pin<
Box<
dyn Future<
@@ -305,4 +305,19 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
fn ready_transaction(&self, _hash: &TxHash) -> Option> {
unimplemented!()
}
+
+ fn ready_at_with_timeout(
+ &self,
+ _at: Self::Hash,
+ _timeout: std::time::Duration,
+ ) -> Pin<
+ Box<
+ dyn Future<
+ Output = Box> + Send>,
+ > + Send
+ + '_,
+ >,
+ > {
+ unimplemented!()
+ }
}
diff --git a/substrate/bin/node/cli/benches/transaction_pool.rs b/substrate/bin/node/cli/benches/transaction_pool.rs
index efec081427f4..c07cb3ec0d13 100644
--- a/substrate/bin/node/cli/benches/transaction_pool.rs
+++ b/substrate/bin/node/cli/benches/transaction_pool.rs
@@ -16,15 +16,16 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
-use polkadot_sdk::*;
-use std::time::Duration;
-
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use futures::{future, StreamExt};
use kitchensink_runtime::{constants::currency::*, BalancesCall, SudoCall};
use node_cli::service::{create_extrinsic, fetch_nonce, FullClient, TransactionPool};
use node_primitives::AccountId;
-use polkadot_sdk::sc_service::config::{ExecutorConfiguration, RpcConfiguration};
+use polkadot_sdk::{
+ sc_service::config::{ExecutorConfiguration, RpcConfiguration},
+ sc_transaction_pool_api::TransactionPool as _,
+ *,
+};
use sc_service::{
config::{
BlocksPruning, DatabaseSource, KeystoreConfig, NetworkConfiguration, OffchainWorkerConfig,
@@ -32,8 +33,7 @@ use sc_service::{
},
BasePath, Configuration, Role,
};
-use sc_transaction_pool::PoolLimit;
-use sc_transaction_pool_api::{TransactionPool as _, TransactionSource, TransactionStatus};
+use sc_transaction_pool_api::{TransactionSource, TransactionStatus};
use sp_core::{crypto::Pair, sr25519};
use sp_keyring::Sr25519Keyring;
use sp_runtime::OpaqueExtrinsic;
@@ -58,12 +58,7 @@ fn new_node(tokio_handle: Handle) -> node_cli::service::NewFullBase {
impl_version: "1.0".into(),
role: Role::Authority,
tokio_handle: tokio_handle.clone(),
- transaction_pool: TransactionPoolOptions {
- ready: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 },
- future: PoolLimit { count: 100_000, total_bytes: 100 * 1024 * 1024 },
- reject_future_transactions: false,
- ban_time: Duration::from_secs(30 * 60),
- },
+ transaction_pool: TransactionPoolOptions::new_for_benchmarks(),
network: network_config,
keystore: KeystoreConfig::InMemory,
database: DatabaseSource::RocksDb { path: root.join("db"), cache_size: 128 },
diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs
index 69e953f54e42..4eb1db185e9b 100644
--- a/substrate/bin/node/cli/src/service.rs
+++ b/substrate/bin/node/cli/src/service.rs
@@ -42,6 +42,7 @@ use sc_network_sync::{strategy::warp::WarpSyncConfig, SyncingService};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_statement_store::Store as StatementStore;
use sc_telemetry::{Telemetry, TelemetryWorker};
+use sc_transaction_pool::TransactionPoolHandle;
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::ProvideRuntimeApi;
use sp_core::crypto::Pair;
@@ -80,7 +81,7 @@ type FullBeefyBlockImport = beefy::import::BeefyBlockImport<
>;
/// The transaction pool type definition.
-pub type TransactionPool = sc_transaction_pool::FullPool;
+pub type TransactionPool = sc_transaction_pool::TransactionPoolHandle;
/// The minimum period of blocks on which justifications will be
/// imported and generated.
@@ -175,7 +176,7 @@ pub fn new_partial(
FullBackend,
FullSelectChain,
sc_consensus::DefaultImportQueue,
- sc_transaction_pool::FullPool,
+ sc_transaction_pool::TransactionPoolHandle,
(
impl Fn(
sc_rpc::SubscriptionTaskExecutor,
@@ -226,12 +227,15 @@ pub fn new_partial(
let select_chain = sc_consensus::LongestChain::new(backend.clone());
- let transaction_pool = sc_transaction_pool::BasicPool::new_full(
- config.transaction_pool.clone(),
- config.role.is_authority().into(),
- config.prometheus_registry(),
- task_manager.spawn_essential_handle(),
- client.clone(),
+ let transaction_pool = Arc::from(
+ sc_transaction_pool::Builder::new(
+ task_manager.spawn_essential_handle(),
+ client.clone(),
+ config.role.is_authority().into(),
+ )
+ .with_options(config.transaction_pool.clone())
+ .with_prometheus(config.prometheus_registry())
+ .build(),
);
let (grandpa_block_import, grandpa_link) = grandpa::block_import(
@@ -385,7 +389,7 @@ pub struct NewFullBase {
/// The syncing service of the node.
pub sync: Arc>,
/// The transaction pool of the node.
- pub transaction_pool: Arc,
+ pub transaction_pool: Arc>,
/// The rpc handlers of the node.
pub rpc_handlers: RpcHandlers,
}
@@ -865,14 +869,14 @@ mod tests {
Address, BalancesCall, RuntimeCall, UncheckedExtrinsic,
};
use node_primitives::{Block, DigestItem, Signature};
- use polkadot_sdk::*;
+ use polkadot_sdk::{sc_transaction_pool_api::MaintainedTransactionPool, *};
use sc_client_api::BlockBackend;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
use sc_consensus_babe::{BabeIntermediate, CompatibleDigestItem, INTERMEDIATE_KEY};
use sc_consensus_epochs::descendent_query;
use sc_keystore::LocalKeystore;
use sc_service_test::TestNetNode;
- use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool};
+ use sc_transaction_pool_api::ChainEvent;
use sp_consensus::{BlockOrigin, Environment, Proposer};
use sp_core::crypto::Pair;
use sp_inherents::InherentDataProvider;
diff --git a/substrate/client/basic-authorship/src/basic_authorship.rs b/substrate/client/basic-authorship/src/basic_authorship.rs
index 527a3d12d9e7..79e6fddae99f 100644
--- a/substrate/client/basic-authorship/src/basic_authorship.rs
+++ b/substrate/client/basic-authorship/src/basic_authorship.rs
@@ -25,7 +25,6 @@ use futures::{
channel::oneshot,
future,
future::{Future, FutureExt},
- select,
};
use log::{debug, error, info, trace, warn};
use sc_block_builder::{BlockBuilderApi, BlockBuilderBuilder};
@@ -416,26 +415,13 @@ where
let mut skipped = 0;
let mut unqueue_invalid = Vec::new();
- let mut t1 = self.transaction_pool.ready_at(self.parent_number).fuse();
- let mut t2 =
- futures_timer::Delay::new(deadline.saturating_duration_since((self.now)()) / 8).fuse();
-
- let mut pending_iterator = select! {
- res = t1 => res,
- _ = t2 => {
- warn!(target: LOG_TARGET,
- "Timeout fired waiting for transaction pool at block #{}. \
- Proceeding with production.",
- self.parent_number,
- );
- self.transaction_pool.ready()
- },
- };
+ let delay = deadline.saturating_duration_since((self.now)()) / 8;
+ let mut pending_iterator =
+ self.transaction_pool.ready_at_with_timeout(self.parent_hash, delay).await;
let block_size_limit = block_size_limit.unwrap_or(self.default_block_size_limit);
- debug!(target: LOG_TARGET, "Attempting to push transactions from the pool.");
- debug!(target: LOG_TARGET, "Pool status: {:?}", self.transaction_pool.status());
+ debug!(target: LOG_TARGET, "Attempting to push transactions from the pool at {:?}.", self.parent_hash);
let mut transaction_pushed = false;
let end_reason = loop {
@@ -460,7 +446,7 @@ where
break EndProposingReason::HitDeadline
}
- let pending_tx_data = pending_tx.data().clone();
+ let pending_tx_data = (**pending_tx.data()).clone();
let pending_tx_hash = pending_tx.hash().clone();
let block_size =
@@ -524,7 +510,7 @@ where
pending_iterator.report_invalid(&pending_tx);
debug!(
target: LOG_TARGET,
- "[{:?}] Invalid transaction: {}", pending_tx_hash, e
+ "[{:?}] Invalid transaction: {} at: {}", pending_tx_hash, e, self.parent_hash
);
unqueue_invalid.push(pending_tx_hash);
},
@@ -577,13 +563,25 @@ where
)
};
- info!(
- "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; {extrinsics_summary}",
- block.header().number(),
- block_took.as_millis(),
- ::Hash::from(block.header().hash()),
- block.header().parent_hash(),
- );
+ if log::log_enabled!(log::Level::Info) {
+ info!(
+ "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; extrinsics_count: {}",
+ block.header().number(),
+ block_took.as_millis(),
+ ::Hash::from(block.header().hash()),
+ block.header().parent_hash(),
+ extrinsics.len()
+ )
+ } else if log::log_enabled!(log::Level::Debug) {
+ debug!(
+ "🎁 Prepared block for proposing at {} ({} ms) [hash: {:?}; parent_hash: {}; {extrinsics_summary}",
+ block.header().number(),
+ block_took.as_millis(),
+ ::Hash::from(block.header().hash()),
+ block.header().parent_hash(),
+ );
+ }
+
telemetry!(
self.telemetry;
CONSENSUS_INFO;
@@ -643,22 +641,20 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let hashof0 = client.info().genesis_hash;
block_on(txpool.submit_at(hashof0, SOURCE, vec![extrinsic(0), extrinsic(1)])).unwrap();
block_on(
txpool.maintain(chain_event(
- client
- .expect_header(client.info().genesis_hash)
- .expect("there should be header"),
+ client.expect_header(hashof0).expect("there should be header"),
)),
);
@@ -698,13 +694,13 @@ mod tests {
fn should_not_panic_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let mut proposer_factory =
ProposerFactory::new(spawner.clone(), client.clone(), txpool.clone(), None, None);
@@ -735,13 +731,13 @@ mod tests {
let (client, backend) = TestClientBuilder::new().build_with_backend();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let genesis_hash = client.info().best_hash;
@@ -791,13 +787,13 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let medium = |nonce| {
ExtrinsicBuilder::new_fill_block(Perbill::from_parts(MEDIUM))
@@ -871,27 +867,27 @@ mod tests {
// let's create one block and import it
let block = propose_block(&client, 0, 2, 7);
- import_and_maintain(client.clone(), block);
+ import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 5);
// now let's make sure that we can still make some progress
let block = propose_block(&client, 1, 1, 5);
- import_and_maintain(client.clone(), block);
+ import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 4);
// again let's make sure that we can still make some progress
let block = propose_block(&client, 2, 1, 4);
- import_and_maintain(client.clone(), block);
+ import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 3);
// again let's make sure that we can still make some progress
let block = propose_block(&client, 3, 1, 3);
- import_and_maintain(client.clone(), block);
+ import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 2);
// again let's make sure that we can still make some progress
let block = propose_block(&client, 4, 2, 2);
- import_and_maintain(client.clone(), block);
+ import_and_maintain(client.clone(), block.clone());
assert_eq!(txpool.ready().count(), 0);
}
@@ -899,13 +895,13 @@ mod tests {
fn should_cease_building_block_when_block_limit_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let genesis_hash = client.info().genesis_hash;
let genesis_header = client.expect_header(genesis_hash).expect("there should be header");
@@ -1004,13 +1000,13 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let genesis_hash = client.info().genesis_hash;
let tiny = |nonce| {
@@ -1073,13 +1069,13 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let txpool = BasicPool::new_full(
+ let txpool = Arc::from(BasicPool::new_full(
Default::default(),
true.into(),
None,
spawner.clone(),
client.clone(),
- );
+ ));
let genesis_hash = client.info().genesis_hash;
let tiny = |who| {
diff --git a/substrate/client/basic-authorship/src/lib.rs b/substrate/client/basic-authorship/src/lib.rs
index 8f47c2ea00e6..adea7a3571dd 100644
--- a/substrate/client/basic-authorship/src/lib.rs
+++ b/substrate/client/basic-authorship/src/lib.rs
@@ -32,13 +32,13 @@
//! # use sc_transaction_pool::{BasicPool, FullChainApi};
//! # let client = Arc::new(substrate_test_runtime_client::new());
//! # let spawner = sp_core::testing::TaskExecutor::new();
-//! # let txpool = BasicPool::new_full(
+//! # let txpool = Arc::from(BasicPool::new_full(
//! # Default::default(),
//! # true.into(),
//! # None,
//! # spawner.clone(),
//! # client.clone(),
-//! # );
+//! # ));
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory::new(
//! spawner,
diff --git a/substrate/client/cli/Cargo.toml b/substrate/client/cli/Cargo.toml
index b7d29aebc3d7..f0b9f8f9b905 100644
--- a/substrate/client/cli/Cargo.toml
+++ b/substrate/client/cli/Cargo.toml
@@ -43,6 +43,7 @@ sc-network = { workspace = true, default-features = true }
sc-service = { workspace = true }
sc-telemetry = { workspace = true, default-features = true }
sc-tracing = { workspace = true, default-features = true }
+sc-transaction-pool = { workspace = true, default-features = true }
sc-utils = { workspace = true, default-features = true }
sp-blockchain = { workspace = true, default-features = true }
sp-core = { workspace = true, default-features = true }
diff --git a/substrate/client/cli/src/params/transaction_pool_params.rs b/substrate/client/cli/src/params/transaction_pool_params.rs
index 48b2e5b1572b..9cf738f58b6b 100644
--- a/substrate/client/cli/src/params/transaction_pool_params.rs
+++ b/substrate/client/cli/src/params/transaction_pool_params.rs
@@ -16,8 +16,28 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
-use clap::Args;
-use sc_service::config::TransactionPoolOptions;
+use clap::{Args, ValueEnum};
+use sc_transaction_pool::TransactionPoolOptions;
+
+/// Type of transaction pool to be used
+#[derive(Debug, Clone, Copy, ValueEnum)]
+#[value(rename_all = "kebab-case")]
+pub enum TransactionPoolType {
+ /// Uses a legacy, single-state transaction pool.
+ SingleState,
+ /// Uses a fork-aware transaction pool.
+ ForkAware,
+}
+
+impl Into for TransactionPoolType {
+ fn into(self) -> sc_transaction_pool::TransactionPoolType {
+ match self {
+ TransactionPoolType::SingleState =>
+ sc_transaction_pool::TransactionPoolType::SingleState,
+ TransactionPoolType::ForkAware => sc_transaction_pool::TransactionPoolType::ForkAware,
+ }
+ }
+}
/// Parameters used to create the pool configuration.
#[derive(Debug, Clone, Args)]
@@ -35,30 +55,21 @@ pub struct TransactionPoolParams {
/// If it is considered invalid. Defaults to 1800s.
#[arg(long, value_name = "SECONDS")]
pub tx_ban_seconds: Option,
+
+ /// The type of transaction pool to be instantiated.
+ #[arg(long, value_enum, default_value_t = TransactionPoolType::SingleState)]
+ pub pool_type: TransactionPoolType,
}
impl TransactionPoolParams {
/// Fill the given `PoolConfiguration` by looking at the cli parameters.
pub fn transaction_pool(&self, is_dev: bool) -> TransactionPoolOptions {
- let mut opts = TransactionPoolOptions::default();
-
- // ready queue
- opts.ready.count = self.pool_limit;
- opts.ready.total_bytes = self.pool_kbytes * 1024;
-
- // future queue
- let factor = 10;
- opts.future.count = self.pool_limit / factor;
- opts.future.total_bytes = self.pool_kbytes * 1024 / factor;
-
- opts.ban_time = if let Some(ban_seconds) = self.tx_ban_seconds {
- std::time::Duration::from_secs(ban_seconds)
- } else if is_dev {
- std::time::Duration::from_secs(0)
- } else {
- std::time::Duration::from_secs(30 * 60)
- };
-
- opts
+ TransactionPoolOptions::new_with_params(
+ self.pool_limit,
+ self.pool_kbytes * 1024,
+ self.tx_ban_seconds,
+ self.pool_type.into(),
+ is_dev,
+ )
}
}
diff --git a/substrate/client/network/transactions/src/lib.rs b/substrate/client/network/transactions/src/lib.rs
index a241041968fd..2b5297fe0e13 100644
--- a/substrate/client/network/transactions/src/lib.rs
+++ b/substrate/client/network/transactions/src/lib.rs
@@ -462,6 +462,8 @@ where
if let Some(transaction) = self.transaction_pool.transaction(hash) {
let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]);
self.transaction_pool.on_broadcasted(propagated_to);
+ } else {
+ debug!(target: "sync", "Propagating transaction failure [{:?}]", hash);
}
}
diff --git a/substrate/client/offchain/src/lib.rs b/substrate/client/offchain/src/lib.rs
index 7cee64e6ce7e..3d5728aad17d 100644
--- a/substrate/client/offchain/src/lib.rs
+++ b/substrate/client/offchain/src/lib.rs
@@ -446,8 +446,13 @@ mod tests {
let client = Arc::new(substrate_test_runtime_client::new());
let spawner = sp_core::testing::TaskExecutor::new();
- let pool =
- BasicPool::new_full(Default::default(), true.into(), None, spawner, client.clone());
+ let pool = Arc::from(BasicPool::new_full(
+ Default::default(),
+ true.into(),
+ None,
+ spawner,
+ client.clone(),
+ ));
let network = Arc::new(TestNetwork());
let header = client.header(client.chain_info().genesis_hash).unwrap().unwrap();
diff --git a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
index aa8ac572dec9..adcc987f9c39 100644
--- a/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
+++ b/substrate/client/rpc-spec-v2/src/transaction/tests/middleware_pool.rs
@@ -27,7 +27,7 @@ use sc_transaction_pool_api::{
use crate::hex_string;
use futures::{FutureExt, StreamExt};
-use sp_runtime::traits::{Block as BlockT, NumberFor};
+use sp_runtime::traits::Block as BlockT;
use std::{collections::HashMap, pin::Pin, sync::Arc};
use substrate_test_runtime_transaction_pool::TestApi;
use tokio::sync::mpsc;
@@ -166,7 +166,7 @@ impl TransactionPool for MiddlewarePool {
fn ready_at(
&self,
- at: NumberFor,
+ at: ::Hash,
) -> Pin<
Box<
dyn Future<
@@ -184,4 +184,19 @@ impl TransactionPool for MiddlewarePool {
fn futures(&self) -> Vec {
self.inner_pool.futures()
}
+
+ fn ready_at_with_timeout(
+ &self,
+ at: ::Hash,
+ _timeout: std::time::Duration,
+ ) -> Pin<
+ Box<
+ dyn Future<
+ Output = Box> + Send>,
+ > + Send
+ + '_,
+ >,
+ > {
+ self.inner_pool.ready_at(at)
+ }
}
diff --git a/substrate/client/rpc/src/author/tests.rs b/substrate/client/rpc/src/author/tests.rs
index bde60960eaf4..ab0b8bdab699 100644
--- a/substrate/client/rpc/src/author/tests.rs
+++ b/substrate/client/rpc/src/author/tests.rs
@@ -66,8 +66,13 @@ impl Default for TestSetup {
let client = Arc::new(substrate_test_runtime_client::TestClientBuilder::new().build());
let spawner = sp_core::testing::TaskExecutor::new();
- let pool =
- BasicPool::new_full(Default::default(), true.into(), None, spawner, client.clone());
+ let pool = Arc::from(BasicPool::new_full(
+ Default::default(),
+ true.into(),
+ None,
+ spawner,
+ client.clone(),
+ ));
TestSetup { client, keystore, pool }
}
}
diff --git a/substrate/client/service/src/config.rs b/substrate/client/service/src/config.rs
index 6f65c2e2d81b..fb9e9264dfe7 100644
--- a/substrate/client/service/src/config.rs
+++ b/substrate/client/service/src/config.rs
@@ -37,7 +37,7 @@ pub use sc_rpc_server::{
IpNetwork, RpcEndpoint, RpcMethods, SubscriptionIdProvider as RpcSubscriptionIdProvider,
};
pub use sc_telemetry::TelemetryEndpoints;
-pub use sc_transaction_pool::Options as TransactionPoolOptions;
+pub use sc_transaction_pool::TransactionPoolOptions;
use sp_core::crypto::SecretString;
use std::{
io, iter,
diff --git a/substrate/client/service/src/lib.rs b/substrate/client/service/src/lib.rs
index b6acdb8ed002..54e847791cff 100644
--- a/substrate/client/service/src/lib.rs
+++ b/substrate/client/service/src/lib.rs
@@ -94,7 +94,7 @@ pub use sc_network_sync::WarpSyncConfig;
pub use sc_network_transactions::config::{TransactionImport, TransactionImportFuture};
pub use sc_rpc::{RandomIntegerSubscriptionId, RandomStringSubscriptionId};
pub use sc_tracing::TracingReceiver;
-pub use sc_transaction_pool::Options as TransactionPoolOptions;
+pub use sc_transaction_pool::TransactionPoolOptions;
pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool};
#[doc(hidden)]
pub use std::{ops::Deref, result::Result, sync::Arc};
@@ -484,7 +484,7 @@ where
.filter(|t| t.is_propagable())
.map(|t| {
let hash = t.hash().clone();
- let ex: B::Extrinsic = t.data().clone();
+ let ex: B::Extrinsic = (**t.data()).clone();
(hash, ex)
})
.collect()
@@ -523,6 +523,7 @@ where
},
};
+ let start = std::time::Instant::now();
let import_future = self.pool.submit_one(
self.client.info().best_hash,
sc_transaction_pool_api::TransactionSource::External,
@@ -530,16 +531,16 @@ where
);
Box::pin(async move {
match import_future.await {
- Ok(_) => TransactionImport::NewGood,
+ Ok(_) => {
+ let elapsed = start.elapsed();
+ debug!(target: sc_transaction_pool::LOG_TARGET, "import transaction: {elapsed:?}");
+ TransactionImport::NewGood
+ },
Err(e) => match e.into_pool_error() {
Ok(sc_transaction_pool_api::error::Error::AlreadyImported(_)) =>
TransactionImport::KnownGood,
- Ok(e) => {
- debug!("Error adding transaction to the pool: {:?}", e);
- TransactionImport::Bad
- },
- Err(e) => {
- debug!("Error converting pool error: {}", e);
+ Ok(_) => TransactionImport::Bad,
+ Err(_) => {
// it is not bad at least, just some internal node logic error, so peer is
// innocent.
TransactionImport::KnownGood
@@ -556,7 +557,7 @@ where
fn transaction(&self, hash: &H) -> Option {
self.pool.ready_transaction(hash).and_then(
// Only propagable transactions should be resolved for network service.
- |tx| if tx.is_propagable() { Some(tx.data().clone()) } else { None },
+ |tx| if tx.is_propagable() { Some((**tx.data()).clone()) } else { None },
)
}
}
@@ -578,8 +579,13 @@ mod tests {
let (client, longest_chain) = TestClientBuilder::new().build_with_longest_chain();
let client = Arc::new(client);
let spawner = sp_core::testing::TaskExecutor::new();
- let pool =
- BasicPool::new_full(Default::default(), true.into(), None, spawner, client.clone());
+ let pool = Arc::from(BasicPool::new_full(
+ Default::default(),
+ true.into(),
+ None,
+ spawner,
+ client.clone(),
+ ));
let source = sp_runtime::transaction_validity::TransactionSource::External;
let best = block_on(longest_chain.best_chain()).unwrap();
let transaction = Transfer {
diff --git a/substrate/client/transaction-pool/Cargo.toml b/substrate/client/transaction-pool/Cargo.toml
index 98994cc742ff..d346add93a64 100644
--- a/substrate/client/transaction-pool/Cargo.toml
+++ b/substrate/client/transaction-pool/Cargo.toml
@@ -20,6 +20,8 @@ async-trait = { workspace = true }
codec = { workspace = true, default-features = true }
futures = { workspace = true }
futures-timer = { workspace = true }
+indexmap = { workspace = true }
+itertools = { workspace = true }
linked-hash-map = { workspace = true }
log = { workspace = true, default-features = true }
parking_lot = { workspace = true, default-features = true }
@@ -36,6 +38,8 @@ sp-crypto-hashing = { workspace = true, default-features = true }
sp-runtime = { workspace = true, default-features = true }
sp-tracing = { workspace = true, default-features = true }
sp-transaction-pool = { workspace = true, default-features = true }
+tokio-stream = { workspace = true }
+tokio = { workspace = true, default-features = true, features = ["macros", "time"] }
[dev-dependencies]
array-bytes = { workspace = true, default-features = true }
diff --git a/substrate/client/transaction-pool/api/src/error.rs b/substrate/client/transaction-pool/api/src/error.rs
index d0744bfa3e19..e81955ebe54c 100644
--- a/substrate/client/transaction-pool/api/src/error.rs
+++ b/substrate/client/transaction-pool/api/src/error.rs
@@ -38,7 +38,7 @@ pub enum Error {
/// The transaction validity returned no "provides" tag.
///
/// Such transactions are not accepted to the pool, since we use those tags
- /// to define identity of transactions (occupance of the same "slot").
+ /// to define identity of transactions (occupancy of the same "slot").
#[error("Transaction does not provide any tags, so the pool can't identify it")]
NoTagsProvided,
diff --git a/substrate/client/transaction-pool/api/src/lib.rs b/substrate/client/transaction-pool/api/src/lib.rs
index 0a313c5b782d..3ac1a79a0c28 100644
--- a/substrate/client/transaction-pool/api/src/lib.rs
+++ b/substrate/client/transaction-pool/api/src/lib.rs
@@ -26,7 +26,7 @@ use codec::Codec;
use futures::{Future, Stream};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sp_core::offchain::TransactionPoolExt;
-use sp_runtime::traits::{Block as BlockT, Member, NumberFor};
+use sp_runtime::traits::{Block as BlockT, Member};
use std::{collections::HashMap, hash::Hash, marker::PhantomData, pin::Pin, sync::Arc};
const LOG_TARGET: &str = "txpool::api";
@@ -36,7 +36,7 @@ pub use sp_runtime::transaction_validity::{
};
/// Transaction pool status.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct PoolStatus {
/// Number of transactions in the ready queue.
pub ready: usize,
@@ -49,7 +49,7 @@ pub struct PoolStatus {
}
impl PoolStatus {
- /// Returns true if the are no transactions in the pool.
+ /// Returns true if there are no transactions in the pool.
pub fn is_empty(&self) -> bool {
self.ready == 0 && self.future == 0
}
@@ -57,7 +57,7 @@ impl PoolStatus {
/// Possible transaction status events.
///
-/// This events are being emitted by `TransactionPool` watchers,
+/// These events are being emitted by `TransactionPool` watchers,
/// which are also exposed over RPC.
///
/// The status events can be grouped based on their kinds as:
@@ -144,7 +144,7 @@ pub enum TransactionStatus {
/// Maximum number of finality watchers has been reached,
/// old watchers are being removed.
FinalityTimeout(BlockHash),
- /// Transaction has been finalized by a finality-gadget, e.g GRANDPA.
+ /// Transaction has been finalized by a finality-gadget, e.g. GRANDPA.
#[serde(with = "v1_compatible")]
Finalized((BlockHash, TxIndex)),
/// Transaction has been replaced in the pool, by another transaction
@@ -245,7 +245,7 @@ pub trait TransactionPool: Send + Sync {
type Hash: Hash + Eq + Member + Serialize + DeserializeOwned + Codec;
/// In-pool transaction type.
type InPoolTransaction: InPoolTransaction<
- Transaction = TransactionFor,
+ Transaction = Arc>,
Hash = TxHash,
>;
/// Error type.
@@ -269,7 +269,7 @@ pub trait TransactionPool: Send + Sync {
xt: TransactionFor,
) -> PoolFuture, Self::Error>;
- /// Returns a future that import a single transaction and starts to watch their progress in the
+ /// Returns a future that imports a single transaction and starts to watch their progress in the
/// pool.
fn submit_and_watch(
&self,
@@ -285,7 +285,7 @@ pub trait TransactionPool: Send + Sync {
/// Guarantees to return immediately when `None` is passed.
fn ready_at(
&self,
- at: NumberFor,
+ at: ::Hash,
) -> Pin<
Box<
dyn Future<
@@ -321,6 +321,23 @@ pub trait TransactionPool: Send + Sync {
/// Return specific ready transaction by hash, if there is one.
fn ready_transaction(&self, hash: &TxHash<Self>) -> Option<Arc<Self::InPoolTransaction>>;
+
+ /// Returns the set of ready transactions at the given block within the given timeout.
+ ///
+ /// If the timeout is hit during method execution, then a best-effort set of ready
+ /// transactions for the given block is returned, without executing the full maintain process.
+ fn ready_at_with_timeout(
+ &self,
+ at: <Self::Block as BlockT>::Hash,
+ timeout: std::time::Duration,
+ ) -> Pin<
+ Box<
+ dyn Future<
+ Output = Box<dyn ReadyTransactions<Item = Arc<Self::InPoolTransaction>> + Send>,
+ > + Send
+ + '_,
+ >,
+ >;
}
/// An iterator of ready transactions.
@@ -345,6 +362,7 @@ impl<T> ReadyTransactions for std::iter::Empty<T> {
}
/// Events that the transaction pool listens for.
+#[derive(Debug)]
pub enum ChainEvent {
/// New best block has been added to the chain.
NewBestBlock {
@@ -441,7 +459,7 @@ impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool
at: <TPool::Block as BlockT>::Hash,
extrinsic: <TPool::Block as BlockT>::Extrinsic,
) -> Result<(), ()> {
- log::debug!(
+ log::trace!(
target: LOG_TARGET,
"(offchain call) Submitting a transaction to the pool: {:?}",
extrinsic
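To illustrate the new trait method above, here is a minimal sketch of a caller (for instance a block builder) using `ready_at_with_timeout`; the function name, pool handle and the 250 ms budget are illustrative and not part of this patch:

```
use std::{sync::Arc, time::Duration};
use sc_transaction_pool_api::TransactionPool;
use sp_runtime::traits::Block as BlockT;

/// Collects the ready set at `best_hash`, falling back to the best-effort
/// (light-maintain) set if the full maintain process exceeds the timeout.
async fn collect_ready<P: TransactionPool>(
	pool: &P,
	best_hash: <P::Block as BlockT>::Hash,
) -> Vec<Arc<P::InPoolTransaction>> {
	pool.ready_at_with_timeout(best_hash, Duration::from_millis(250))
		.await
		.collect()
}
```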
diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs
index 65c83f090535..2db34bc3f32f 100644
--- a/substrate/client/transaction-pool/benches/basics.rs
+++ b/substrate/client/transaction-pool/benches/basics.rs
@@ -24,6 +24,7 @@ use futures::{
future::{ready, Ready},
};
use sc_transaction_pool::*;
+use sp_blockchain::HashAndNumber;
use sp_crypto_hashing::blake2_256;
use sp_runtime::{
generic::BlockId,
@@ -64,8 +65,9 @@ impl ChainApi for TestApi {
&self,
at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
- uxt: <Self::Block as BlockT>::Extrinsic,
+ uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> Self::ValidationFuture {
+ let uxt = (*uxt).clone();
let transfer = TransferData::try_from(&uxt)
.expect("uxt is expected to be bench_call (carrying TransferData)");
let nonce = transfer.nonce;
@@ -144,6 +146,10 @@ fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
let source = TransactionSource::External;
let mut futures = Vec::new();
let mut tags = Vec::new();
+ let at = HashAndNumber {
+ hash: api.block_id_to_hash(&BlockId::Number(1)).unwrap().unwrap(),
+ number: 1,
+ };
for nonce in 1..=number {
let xt = uxt(TransferData {
@@ -151,15 +157,12 @@ fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce,
- });
+ })
+ .into();
tags.push(to_tag(nonce, AccountId::from_h256(H256::from_low_u64_be(1))));
- futures.push(pool.submit_one(
- api.block_id_to_hash(&BlockId::Number(1)).unwrap().unwrap(),
- source,
- xt,
- ));
+ futures.push(pool.submit_one(&at, source, xt));
}
let res = block_on(futures::future::join_all(futures.into_iter()));
@@ -170,12 +173,11 @@ fn bench_configured(pool: Pool<TestApi>, number: u64, api: Arc<TestApi>) {
// Prune all transactions.
let block_num = 6;
- block_on(pool.prune_tags(
- api.block_id_to_hash(&BlockId::Number(block_num)).unwrap().unwrap(),
- tags,
- vec![],
- ))
- .expect("Prune failed");
+ let at = HashAndNumber {
+ hash: api.block_id_to_hash(&BlockId::Number(block_num)).unwrap().unwrap(),
+ number: block_num,
+ };
+ block_on(pool.prune_tags(&at, tags, vec![]));
// pool is empty
assert_eq!(pool.validated_pool().status().ready, 0);
diff --git a/substrate/client/transaction-pool/src/builder.rs b/substrate/client/transaction-pool/src/builder.rs
new file mode 100644
index 000000000000..e1fddcdd8952
--- /dev/null
+++ b/substrate/client/transaction-pool/src/builder.rs
@@ -0,0 +1,245 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Utility for building the substrate transaction pool trait object.
+
+use crate::{
+ common::api::FullChainApi,
+ fork_aware_txpool::ForkAwareTxPool as ForkAwareFullPool,
+ graph::{base_pool::Transaction, ChainApi, ExtrinsicFor, ExtrinsicHash, IsValidator, Options},
+ single_state_txpool::BasicPool as SingleStateFullPool,
+ TransactionPoolWrapper, LOG_TARGET,
+};
+use prometheus_endpoint::Registry as PrometheusRegistry;
+use sc_transaction_pool_api::{LocalTransactionPool, MaintainedTransactionPool};
+use sp_core::traits::SpawnEssentialNamed;
+use sp_runtime::traits::Block as BlockT;
+use std::{marker::PhantomData, sync::Arc, time::Duration};
+
+/// The type of transaction pool.
+#[derive(Debug, Clone)]
+pub enum TransactionPoolType {
+ /// Single-state transaction pool
+ SingleState,
+ /// Fork-aware transaction pool
+ ForkAware,
+}
+
+/// Transaction pool options.
+#[derive(Debug, Clone)]
+pub struct TransactionPoolOptions {
+ txpool_type: TransactionPoolType,
+ options: Options,
+}
+
+impl Default for TransactionPoolOptions {
+ fn default() -> Self {
+ Self { txpool_type: TransactionPoolType::SingleState, options: Default::default() }
+ }
+}
+
+impl TransactionPoolOptions {
+ /// Creates the options for the transaction pool using the given parameters.
+ pub fn new_with_params(
+ pool_limit: usize,
+ pool_bytes: usize,
+ tx_ban_seconds: Option<u64>,
+ txpool_type: TransactionPoolType,
+ is_dev: bool,
+ ) -> TransactionPoolOptions {
+ let mut options = Options::default();
+
+ // ready queue
+ options.ready.count = pool_limit;
+ options.ready.total_bytes = pool_bytes;
+
+ // future queue
+ let factor = 10;
+ options.future.count = pool_limit / factor;
+ options.future.total_bytes = pool_bytes / factor;
+
+ options.ban_time = if let Some(ban_seconds) = tx_ban_seconds {
+ Duration::from_secs(ban_seconds)
+ } else if is_dev {
+ Duration::from_secs(0)
+ } else {
+ Duration::from_secs(30 * 60)
+ };
+
+ TransactionPoolOptions { options, txpool_type }
+ }
+
+ /// Creates predefined options for benchmarking.
+ pub fn new_for_benchmarks() -> TransactionPoolOptions {
+ TransactionPoolOptions {
+ options: Options {
+ ready: crate::graph::base_pool::Limit {
+ count: 100_000,
+ total_bytes: 100 * 1024 * 1024,
+ },
+ future: crate::graph::base_pool::Limit {
+ count: 100_000,
+ total_bytes: 100 * 1024 * 1024,
+ },
+ reject_future_transactions: false,
+ ban_time: Duration::from_secs(30 * 60),
+ },
+ txpool_type: TransactionPoolType::SingleState,
+ }
+ }
+}
+
+/// `FullClientTransactionPool` is a trait that combines the functionality of
+/// `MaintainedTransactionPool` and `LocalTransactionPool` for a given `Client` and `Block`.
+///
+/// This trait defines the requirements for a full client transaction pool, ensuring
+/// that it can handle transaction submission and maintenance.
+pub trait FullClientTransactionPool<Block, Client>:
+ MaintainedTransactionPool<
+ Block = Block,
+ Hash = ExtrinsicHash<FullChainApi<Client, Block>>,
+ InPoolTransaction = Transaction<
+ ExtrinsicHash<FullChainApi<Client, Block>>,
+ ExtrinsicFor<FullChainApi<Client, Block>>,
+ >,
+ Error = <FullChainApi<Client, Block> as ChainApi>::Error,
+ > + LocalTransactionPool<
+ Block = Block,
+ Hash = ExtrinsicHash<FullChainApi<Client, Block>>,
+ Error = <FullChainApi<Client, Block> as ChainApi>::Error,
+ >
+where
+ Block: BlockT,
+ Client: sp_api::ProvideRuntimeApi<Block>
+ + sc_client_api::BlockBackend<Block>
+ + sc_client_api::blockchain::HeaderBackend<Block>
+ + sp_runtime::traits::BlockIdTo<Block>
+ + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
+ + 'static,
+ Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
+{
+}
+
+impl<Block, Client, P> FullClientTransactionPool<Block, Client> for P
+where
+ Block: BlockT,
+ Client: sp_api::ProvideRuntimeApi<Block>
+ + sc_client_api::BlockBackend<Block>
+ + sc_client_api::blockchain::HeaderBackend<Block>
+ + sp_runtime::traits::BlockIdTo<Block>
+ + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
+ + 'static,
+ Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
+ P: MaintainedTransactionPool<
+ Block = Block,
+ Hash = ExtrinsicHash<FullChainApi<Client, Block>>,
+ InPoolTransaction = Transaction<
+ ExtrinsicHash<FullChainApi<Client, Block>>,
+ ExtrinsicFor<FullChainApi<Client, Block>>,
+ >,
+ Error = <FullChainApi<Client, Block> as ChainApi>::Error,
+ > + LocalTransactionPool<
+ Block = Block,
+ Hash = ExtrinsicHash<FullChainApi<Client, Block>>,
+ Error = <FullChainApi<Client, Block> as ChainApi>::Error,
+ >,
+{
+}
+
+/// The public type alias for the actual type providing the implementation of
+/// `FullClientTransactionPool` with the given `Client` and `Block` types.
+///
+/// This handle abstracts away the specific type of the transaction pool. It should be used
+/// externally to keep a reference to the transaction pool.
+pub type TransactionPoolHandle<Block, Client> = TransactionPoolWrapper<Block, Client>;
+
+/// Builder allowing to create a specific instance of the transaction pool.
+pub struct Builder<'a, Block, Client> {
+ options: TransactionPoolOptions,
+ is_validator: IsValidator,
+ prometheus: Option<&'a PrometheusRegistry>,
+ client: Arc<Client>,
+ spawner: Box<dyn SpawnEssentialNamed>,
+ _phantom: PhantomData<(Client, Block)>,
+}
+
+impl<'a, Client, Block> Builder<'a, Block, Client>
+where
+ Block: BlockT,
+ Client: sp_api::ProvideRuntimeApi<Block>
+ + sc_client_api::BlockBackend<Block>
+ + sc_client_api::blockchain::HeaderBackend<Block>
+ + sp_runtime::traits::BlockIdTo<Block>
+ + sc_client_api::ExecutorProvider<Block>
+ + sc_client_api::UsageProvider<Block>
+ + sp_blockchain::HeaderMetadata<Block, Error = sp_blockchain::Error>
+ + Send
+ + Sync
+ + 'static,
+ <Block as BlockT>::Hash: std::marker::Unpin,
+ Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
+{
+ /// Creates a new instance of `Builder`
+ pub fn new(
+ spawner: impl SpawnEssentialNamed + 'static,
+ client: Arc<Client>,
+ is_validator: IsValidator,
+ ) -> Builder<'a, Block, Client> {
+ Builder {
+ options: Default::default(),
+ _phantom: Default::default(),
+ spawner: Box::new(spawner),
+ client,
+ is_validator,
+ prometheus: None,
+ }
+ }
+
+ /// Sets the options used for creating a transaction pool instance.
+ pub fn with_options(mut self, options: TransactionPoolOptions) -> Self {
+ self.options = options;
+ self
+ }
+
+ /// Sets the prometheus endpoint used in a transaction pool instance.
+ pub fn with_prometheus(mut self, prometheus: Option<&'a PrometheusRegistry>) -> Self {
+ self.prometheus = prometheus;
+ self
+ }
+
+ /// Creates an instance of the transaction pool.
+ pub fn build(self) -> TransactionPoolHandle {
+ log::info!(target:LOG_TARGET, " creating {:?} txpool {:?}/{:?}.", self.options.txpool_type, self.options.options.ready, self.options.options.future);
+ TransactionPoolWrapper::<Block, Client>(match self.options.txpool_type {
+ TransactionPoolType::SingleState => Box::new(SingleStateFullPool::new_full(
+ self.options.options,
+ self.is_validator,
+ self.prometheus,
+ self.spawner,
+ self.client,
+ )),
+ TransactionPoolType::ForkAware => Box::new(ForkAwareFullPool::new_full(
+ self.options.options,
+ self.is_validator,
+ self.prometheus,
+ self.spawner,
+ self.client,
+ )),
+ })
+ }
+}
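To show how the builder is meant to be driven from a node's service setup, a sketch follows; it assumes `Builder`, `TransactionPoolOptions` and `TransactionPoolType` are re-exported at the crate root, and that `spawner`, `client`, `is_validator` and `prometheus_registry` exist in the surrounding service code:

```
use std::sync::Arc;
use sc_transaction_pool::{Builder, TransactionPoolOptions, TransactionPoolType};

// Hypothetical service-setup fragment; all free variables are assumed to be
// provided by the caller.
let transaction_pool = Arc::new(
	Builder::new(spawner, client.clone(), is_validator)
		.with_options(TransactionPoolOptions::new_with_params(
			8192,                           // pool_limit: ready-queue size (tx count)
			20 * 1024 * 1024,               // pool_bytes: ready-queue size (bytes)
			None,                           // tx_ban_seconds: keep the 30 min default
			TransactionPoolType::ForkAware, // select the fork-aware implementation
			false,                          // is_dev
		))
		.with_prometheus(prometheus_registry)
		.build(),
);
```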
diff --git a/substrate/client/transaction-pool/src/api.rs b/substrate/client/transaction-pool/src/common/api.rs
similarity index 87%
rename from substrate/client/transaction-pool/src/api.rs
rename to substrate/client/transaction-pool/src/common/api.rs
index cccaad7c8994..a5185ba606ef 100644
--- a/substrate/client/transaction-pool/src/api.rs
+++ b/substrate/client/transaction-pool/src/common/api.rs
@@ -40,18 +40,18 @@ use sp_runtime::{
};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
-use crate::{
+use super::{
error::{self, Error},
- graph,
metrics::{ApiMetrics, ApiMetricsExt},
};
+use crate::graph;
/// The transaction pool logic for full client.
pub struct FullChainApi<Client, Block> {
client: Arc<Client>,
_marker: PhantomData<Block>,
metrics: Option<Arc<ApiMetrics>>,
- validation_pool: Arc<Mutex<mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>>>,
+ validation_pool: mpsc::Sender<Pin<Box<dyn Future<Output = ()> + Send>>>,
}
/// Spawn a validation task that will be used by the transaction pool to validate transactions.
@@ -101,12 +101,7 @@ impl<Client, Block> FullChainApi<Client, Block>
spawn_validation_pool_task("transaction-pool-task-0", receiver.clone(), spawner);
spawn_validation_pool_task("transaction-pool-task-1", receiver, spawner);
- FullChainApi {
- client,
- validation_pool: Arc::new(Mutex::new(sender)),
- _marker: Default::default(),
- metrics,
- }
+ FullChainApi { client, validation_pool: sender, _marker: Default::default(), metrics }
}
}
@@ -139,25 +134,25 @@ where
) -> Self::ValidationFuture {
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
- let validation_pool = self.validation_pool.clone();
+ let mut validation_pool = self.validation_pool.clone();
let metrics = self.metrics.clone();
async move {
metrics.report(|m| m.validations_scheduled.inc());
- validation_pool
- .lock()
- .await
- .send(
- async move {
- let res = validate_transaction_blocking(&*client, at, source, uxt);
- let _ = tx.send(res);
- metrics.report(|m| m.validations_finished.inc());
- }
- .boxed(),
- )
- .await
- .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
+ {
+ validation_pool
+ .send(
+ async move {
+ let res = validate_transaction_blocking(&*client, at, source, uxt);
+ let _ = tx.send(res);
+ metrics.report(|m| m.validations_finished.inc());
+ }
+ .boxed(),
+ )
+ .await
+ .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
+ }
match rx.await {
Ok(r) => r,
@@ -183,7 +178,7 @@ where
fn hash_and_length(
&self,
- ex: &graph::ExtrinsicFor<Self>,
+ ex: &graph::RawExtrinsicFor<Self>,
) -> (graph::ExtrinsicHash<Self>, usize) {
ex.using_encoded(|x| (<traits::HashingFor<Block> as traits::Hash>::hash(x), x.len()))
}
@@ -222,7 +217,10 @@ where
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
{
- sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
+ let s = std::time::Instant::now();
+ let h = uxt.using_encoded(|x| <traits::HashingFor<Block> as traits::Hash>::hash(x));
+
+ let result = sp_tracing::within_span!(sp_tracing::Level::TRACE, "validate_transaction";
{
let runtime_api = client.runtime_api();
let api_version = sp_tracing::within_span! { sp_tracing::Level::TRACE, "check_version";
@@ -240,7 +238,7 @@ where
sp_tracing::Level::TRACE, "runtime::validate_transaction";
{
if api_version >= 3 {
- runtime_api.validate_transaction(at, source, uxt, at)
+ runtime_api.validate_transaction(at, source, (*uxt).clone(), at)
.map_err(|e| Error::RuntimeApi(e.to_string()))
} else {
let block_number = client.to_number(&BlockId::Hash(at))
@@ -260,16 +258,19 @@ where
if api_version == 2 {
#[allow(deprecated)] // old validate_transaction
- runtime_api.validate_transaction_before_version_3(at, source, uxt)
+ runtime_api.validate_transaction_before_version_3(at, source, (*uxt).clone())
.map_err(|e| Error::RuntimeApi(e.to_string()))
} else {
#[allow(deprecated)] // old validate_transaction
- runtime_api.validate_transaction_before_version_2(at, uxt)
+ runtime_api.validate_transaction_before_version_2(at, (*uxt).clone())
.map_err(|e| Error::RuntimeApi(e.to_string()))
}
}
})
- })
+ });
+ log::trace!(target: LOG_TARGET, "[{h:?}] validate_transaction_blocking: at:{at:?} took:{:?}", s.elapsed());
+
+ result
}
impl<Client, Block> FullChainApi<Client, Block>
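The refactoring above removes the `Arc<Mutex<_>>` around the validation queue: `futures::channel::mpsc::Sender` is `Clone`, so every validation request can send through its own clone instead of serializing on a lock. A self-contained sketch of that pattern, independent of the pool types (all names illustrative):

```
use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};

fn main() {
	futures::executor::block_on(async {
		// Bounded queue of boxed futures, as used for validation tasks.
		let (sender, mut receiver) = mpsc::channel::<BoxFuture<'static, ()>>(8);

		// Worker: drains the queue and drives each queued future to completion.
		let worker = async move {
			while let Some(task) = receiver.next().await {
				task.await;
			}
		};

		// Submitter: clones the sender per request; `send` only needs
		// `&mut self` on the clone, so no mutex is required.
		let mut tx = sender.clone();
		drop(sender); // drop the original so the worker stops once `tx` is gone
		let submitter = async move {
			tx.send(async { /* validate transaction here */ }.boxed())
				.await
				.expect("worker is alive");
		};

		futures::join!(worker, submitter);
	});
}
```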
diff --git a/substrate/client/transaction-pool/src/enactment_state.rs b/substrate/client/transaction-pool/src/common/enactment_state.rs
similarity index 94%
rename from substrate/client/transaction-pool/src/enactment_state.rs
rename to substrate/client/transaction-pool/src/common/enactment_state.rs
index 85c572c127e8..a7eb6a3687c6 100644
--- a/substrate/client/transaction-pool/src/enactment_state.rs
+++ b/substrate/client/transaction-pool/src/common/enactment_state.rs
@@ -34,7 +34,7 @@ const SKIP_MAINTENANCE_THRESHOLD: u16 = 20;
/// is to figure out which phases (enactment / finalization) of transaction pool
/// maintenance are needed.
///
-/// Given the following chain:
+/// Example: given the following chain:
///
/// B1-C1-D1-E1
/// /
@@ -42,8 +42,8 @@ const SKIP_MAINTENANCE_THRESHOLD: u16 = 20;
/// \
/// B2-C2-D2-E2
///
-/// Some scenarios and expected behavior for sequence of `NewBestBlock` (`nbb`) and `Finalized`
-/// (`f`) events:
+/// the list below presents scenarios and the expected behavior for a sequence of `NewBestBlock`
+/// (`nbb`) and `Finalized` (`f`) events. true/false indicates whether enactment is required:
///
/// - `nbb(C1)`, `f(C1)` -> false (enactment was already performed in `nbb(C1))`
/// - `f(C1)`, `nbb(C1)` -> false (enactment was already performed in `f(C1))`
@@ -103,7 +103,7 @@ where
let new_hash = event.hash();
let finalized = event.is_finalized();
- // do not proceed with txpool maintain if block distance is to high
+ // do not proceed with txpool maintain if block distance is too high
let skip_maintenance =
match (hash_to_number(new_hash), hash_to_number(self.recent_best_block)) {
(Ok(Some(new)), Ok(Some(current))) =>
@@ -112,14 +112,14 @@ where
};
if skip_maintenance {
- log::debug!(target: LOG_TARGET, "skip maintain: tree_route would be too long");
+ log::trace!(target: LOG_TARGET, "skip maintain: tree_route would be too long");
self.force_update(event);
return Ok(EnactmentAction::Skip)
}
// block was already finalized
if self.recent_finalized_block == new_hash {
- log::debug!(target: LOG_TARGET, "handle_enactment: block already finalized");
+ log::trace!(target: LOG_TARGET, "handle_enactment: block already finalized");
return Ok(EnactmentAction::Skip)
}
@@ -127,7 +127,7 @@ where
// it instead of tree_route provided with event
let tree_route = tree_route(self.recent_best_block, new_hash)?;
- log::debug!(
+ log::trace!(
target: LOG_TARGET,
"resolve hash: {new_hash:?} finalized: {finalized:?} \
tree_route: (common {:?}, last {:?}) best_block: {:?} finalized_block:{:?}",
@@ -141,7 +141,7 @@ where
// happening if we first received a finalization event and then a new
// best event for some old stale best head.
if tree_route.retracted().iter().any(|x| x.hash == self.recent_finalized_block) {
- log::debug!(
+ log::trace!(
target: LOG_TARGET,
"Recently finalized block {} would be retracted by ChainEvent {}, skipping",
self.recent_finalized_block,
@@ -180,7 +180,7 @@ where
ChainEvent::NewBestBlock { hash, .. } => self.recent_best_block = *hash,
ChainEvent::Finalized { hash, .. } => self.recent_finalized_block = *hash,
};
- log::debug!(
+ log::trace!(
target: LOG_TARGET,
"forced update: {:?}, {:?}",
self.recent_best_block,
@@ -296,7 +296,7 @@ mod enactment_state_tests {
use super::*;
/// asserts that tree routes are equal
- fn assert_treeroute_eq(
+ fn assert_tree_route_eq(
expected: Result<TreeRoute<Block>, String>,
result: Result<TreeRoute<Block>, String>,
) {
@@ -323,56 +323,56 @@ mod enactment_state_tests {
fn tree_route_mock_test_01() {
let result = tree_route(b1().hash, a().hash);
let expected = TreeRoute::new(vec![b1(), a()], 1);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_02() {
let result = tree_route(a().hash, b1().hash);
let expected = TreeRoute::new(vec![a(), b1()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_03() {
let result = tree_route(a().hash, c2().hash);
let expected = TreeRoute::new(vec![a(), b2(), c2()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_04() {
let result = tree_route(e2().hash, a().hash);
let expected = TreeRoute::new(vec![e2(), d2(), c2(), b2(), a()], 4);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_05() {
let result = tree_route(d1().hash, b1().hash);
let expected = TreeRoute::new(vec![d1(), c1(), b1()], 2);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_06() {
let result = tree_route(d2().hash, b2().hash);
let expected = TreeRoute::new(vec![d2(), c2(), b2()], 2);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_07() {
let result = tree_route(b1().hash, d1().hash);
let expected = TreeRoute::new(vec![b1(), c1(), d1()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_08() {
let result = tree_route(b2().hash, d2().hash);
let expected = TreeRoute::new(vec![b2(), c2(), d2()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
@@ -380,7 +380,7 @@ mod enactment_state_tests {
let result = tree_route(e2().hash, e1().hash);
let expected =
TreeRoute::new(vec![e2(), d2(), c2(), b2(), a(), b1(), c1(), d1(), e1()], 4);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
@@ -388,55 +388,55 @@ mod enactment_state_tests {
let result = tree_route(e1().hash, e2().hash);
let expected =
TreeRoute::new(vec![e1(), d1(), c1(), b1(), a(), b2(), c2(), d2(), e2()], 4);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_11() {
let result = tree_route(b1().hash, c2().hash);
let expected = TreeRoute::new(vec![b1(), a(), b2(), c2()], 1);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_12() {
let result = tree_route(d2().hash, b1().hash);
let expected = TreeRoute::new(vec![d2(), c2(), b2(), a(), b1()], 3);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_13() {
let result = tree_route(c2().hash, e1().hash);
let expected = TreeRoute::new(vec![c2(), b2(), a(), b1(), c1(), d1(), e1()], 2);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_14() {
let result = tree_route(b1().hash, b1().hash);
let expected = TreeRoute::new(vec![b1()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_15() {
let result = tree_route(b2().hash, b2().hash);
let expected = TreeRoute::new(vec![b2()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_16() {
let result = tree_route(a().hash, a().hash);
let expected = TreeRoute::new(vec![a()], 0);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
#[test]
fn tree_route_mock_test_17() {
let result = tree_route(x2().hash, b1().hash);
let expected = TreeRoute::new(vec![x2(), e2(), d2(), c2(), b2(), a(), b1()], 5);
- assert_treeroute_eq(result, expected);
+ assert_tree_route_eq(result, expected);
}
}
diff --git a/substrate/client/transaction-pool/src/error.rs b/substrate/client/transaction-pool/src/common/error.rs
similarity index 100%
rename from substrate/client/transaction-pool/src/error.rs
rename to substrate/client/transaction-pool/src/common/error.rs
diff --git a/substrate/client/transaction-pool/src/common/log_xt.rs b/substrate/client/transaction-pool/src/common/log_xt.rs
new file mode 100644
index 000000000000..6c3752c1d50e
--- /dev/null
+++ b/substrate/client/transaction-pool/src/common/log_xt.rs
@@ -0,0 +1,54 @@
+// This file is part of Substrate.
+
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+//! Utility for logging transaction collections.
+
+/// Logs every transaction from the given `tx_collection` at the given level.
+macro_rules! log_xt {
+ (data: hash, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr) => {
+ if log::log_enabled!(target: $target, $level) {
+ for tx in $tx_collection {
+ log::log!(target: $target, $level, $text_with_format, tx);
+ }
+ }
+ };
+ (data: hash, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr, $($arg:expr),*) => {
+ if log::log_enabled!(target: $target, $level) {
+ for tx in $tx_collection {
+ log::log!(target: $target, $level, $text_with_format, tx, $($arg),*);
+ }
+ }
+ };
+ (data: tuple, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr) => {
+ if log::log_enabled!(target: $target, $level) {
+ for tx in $tx_collection {
+ log::log!(target: $target, $level, $text_with_format, tx.0, tx.1)
+ }
+ }
+ };
+}
+
+/// Logs every transaction from the given `tx_collection` at trace level.
+macro_rules! log_xt_trace {
+ (data: $datatype:ident, target: $target:expr, $($arg:tt)+) => ($crate::common::log_xt::log_xt!(data: $datatype, target: $target, log::Level::Trace, $($arg)+));
+ (target: $target:expr, $tx_collection:expr, $text_with_format:expr) => ($crate::common::log_xt::log_xt!(data: hash, target: $target, log::Level::Trace, $tx_collection, $text_with_format));
+ (target: $target:expr, $tx_collection:expr, $text_with_format:expr, $($arg:expr)*) => ($crate::common::log_xt::log_xt!(data: hash, target: $target, log::Level::Trace, $tx_collection, $text_with_format, $($arg)*));
+}
+
+pub(crate) use log_xt;
+pub(crate) use log_xt_trace;
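Since both macros are `pub(crate)`, they are only usable inside this crate. A sketch of the intended call site (the target string and hash collection are illustrative):

```
// e.g. inside the pool, with `use crate::common::log_xt::log_xt_trace;` in scope:
let removed = vec![0x01_u8, 0x02, 0x03];

// Emits one trace line per transaction; the `log_enabled!` guard inside the
// macro skips the whole iteration when trace level is disabled for the target.
log_xt_trace!(target: "txpool", removed.iter(), "[{:?}] dropped from the mempool");
```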
diff --git a/substrate/client/transaction-pool/src/metrics.rs b/substrate/client/transaction-pool/src/common/metrics.rs
similarity index 58%
rename from substrate/client/transaction-pool/src/metrics.rs
rename to substrate/client/transaction-pool/src/common/metrics.rs
index 170bface9647..0ec3b511fa0e 100644
--- a/substrate/client/transaction-pool/src/metrics.rs
+++ b/substrate/client/transaction-pool/src/common/metrics.rs
@@ -16,76 +16,52 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-//! Transaction pool Prometheus metrics.
+//! Transaction pool Prometheus metrics for the Chain API implementation.
+use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use std::sync::Arc;
-use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
+use crate::LOG_TARGET;
-#[derive(Clone, Default)]
-pub struct MetricsLink(Arc