Skip to content

Commit

Permalink
fork-aware transaction pool added (#4639)
Browse files Browse the repository at this point in the history
### Fork-Aware Transaction Pool Implementation

This PR introduces a fork-aware transaction pool (fatxpool) enhancing
transaction management by maintaining the valid state of txpool for
different forks.

### High-level overview
The high level overview was added to
[`sc_transaction_pool::fork_aware_txpool`](https://github.com/paritytech/polkadot-sdk/blob/3ad0a1b7c08e63a2581595cb2cd55f11ccd60f4f/substrate/client/transaction-pool/src/fork_aware_txpool/mod.rs#L21)
module. Use:
```
cargo  doc --document-private-items -p sc-transaction-pool --open
```
to build the doc. It should give a good overview and nice entry point
into the new pool's mechanics.

<details>
  <summary>Quick overview (documentation excerpt)</summary>

#### View
For every fork, a view is created. The view is a persisted state of the
transaction pool computed and updated at the tip of the fork. The view
is built around the existing `ValidatedPool` structure.

A view is created on every new best block notification. To create a
view, one of the existing views is chosen and cloned.

When the chain progresses, the view is kept in the cache
(`retracted_views`) to allow building blocks upon intermediary blocks in
the fork.

The views are deleted on finalization: views lower than the finalized
block are removed.

The views are updated with the transactions from the mempool—all
transactions are sent to the newly created views.
A maintain process is also executed for the newly created
views—basically resubmitting and pruning transactions from the
appropriate tree route.

##### View store
View store is the helper structure that acts as a container for all the
views. It provides some convenient methods.

##### Submitting transactions
Every transaction is submitted to every view at the tips of the forks.
Retracted views are not updated.
Every transaction also goes into the mempool.

##### Internal mempool
Shortly, the main purpose of an internal mempool is to prevent a
transaction from being lost. That could happen when a transaction is
invalid on one fork and could be valid on another. It also allows the
txpool to accept transactions when no blocks have been reported yet.

The mempool removes its transactions when they get finalized.
Transactions are also periodically verified on every finalized event and
removed from the mempool if no longer valid.

#### Events
Transaction events from multiple views are merged and filtered to avoid
duplicated events.
`Ready` / `Future` / `Inblock` events are originated in the Views and
are de-duplicated and forwarded to external listeners.
`Finalized` events are originated in fork-aware-txpool logic.
`Invalid` events requires special care and can be originated in both
view and fork-aware-txpool logic.

#### Light maintain
Sometime transaction pool does not have enough time to prepare fully
maintained view with all retracted transactions being revalidated. To
avoid providing empty ready transaction set to block builder (what would
result in empty block) the light maintain was implemented. It simply
removes the imported transactions from ready iterator.

#### Revalidation
Revalidation is performed for every view. The revalidation process is
started after a trigger is executed. The revalidation work is terminated
just after a new best block / finalized event is notified to the
transaction pool.
The revalidation result is applied to the newly created view which is
built upon the revalidated view.

Additionally, parts of the mempool are also revalidated to make sure
that no transactions are stuck in the mempool.


#### Logs
The most important log allowing to understand the state of the txpool
is:
```
              maintain: txs:(0, 92) views:[2;[(327, 76, 0), (326, 68, 0)]] event:Finalized { hash: 0x8...f, tree_route: [] }  took:3.463522ms
                             ^   ^         ^     ^   ^  ^      ^   ^  ^        ^                                                   ^
unwatched txs in mempool ────┘   │         │     │   │  │      │   │  │        │                                                   │
   watched txs in mempool ───────┘         │     │   │  │      │   │  │        │                                                   │
                     views  ───────────────┘     │   │  │      │   │  │        │                                                   │
                      1st view block # ──────────┘   │  │      │   │  │        │                                                   │
                           number of ready tx ───────┘  │      │   │  │        │                                                   │
                                numer of future tx ─────┘      │   │  │        │                                                   │
                                        2nd view block # ──────┘   │  │        │                                                   │
                                      number of ready tx ──────────┘  │        │                                                   │
                                           number of future tx ───────┘        │                                                   │
                                                                 event ────────┘                                                   │
                                                                       duration  ──────────────────────────────────────────────────┘
```
It is logged after the maintenance is done.

The `debug` level enables per-transaction logging, allowing to keep
track of all transaction-related actions that happened in txpool.
</details>


### Integration notes

For teams having a custom node, the new txpool needs to be instantiated,
typically in `service.rs` file, here is an example:

https://github.com/paritytech/polkadot-sdk/blob/9c547ff3e36cf3b52c99f4ed7849a8e9f722d97d/cumulus/polkadot-omni-node/lib/src/common/spec.rs#L152-L161

To enable new transaction pool the following cli arg shall be specified:
`--pool-type=fork-aware`. If it works, there shall be information
printed in the log:
```
2024-09-20 21:28:17.528  INFO main txpool: [Parachain]  creating ForkAware txpool.
````

For debugging the following debugs shall be enabled:
```
      "-lbasic-authorship=debug",
      "-ltxpool=debug",
```
*note:* trace for txpool enables per-transaction logging.

### Future work
The current implementation seems to be stable, however further
improvements are required.
Here is the umbrella issue for future work:
- #5472


Partially fixes: #1202

---------

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Sebastian Kunert <skunert49@gmail.com>
Co-authored-by: Iulian Barbu <14218860+iulianbarbu@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 15, 2024
1 parent 183b55a commit 26c11fc
Show file tree
Hide file tree
Showing 75 changed files with 11,719 additions and 1,564 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cumulus/client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ pub struct BuildNetworkParams<
pub net_config:
sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
pub client: Arc<Client>,
pub transaction_pool: Arc<sc_transaction_pool::FullPool<Block, Client>>,
pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
pub para_id: ParaId,
pub relay_chain_interface: RCInterface,
pub spawn_handle: SpawnTaskHandle,
Expand Down
6 changes: 4 additions & 2 deletions cumulus/polkadot-omni-node/lib/src/common/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<Block: BlockT, RuntimeApi>
BuildRpcExtensions<
ParachainClient<Block, RuntimeApi>,
ParachainBackend<Block>,
sc_transaction_pool::FullPool<Block, ParachainClient<Block, RuntimeApi>>,
sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>,
> for BuildParachainRpcExtensions<Block, RuntimeApi>
where
RuntimeApi:
Expand All @@ -57,7 +57,9 @@ where
fn build_rpc_extensions(
client: Arc<ParachainClient<Block, RuntimeApi>>,
backend: Arc<ParachainBackend<Block>>,
pool: Arc<sc_transaction_pool::FullPool<Block, ParachainClient<Block, RuntimeApi>>>,
pool: Arc<
sc_transaction_pool::TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>,
>,
) -> sc_service::error::Result<RpcExtension> {
let build = || -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>> {
let mut module = RpcExtension::new(());
Expand Down
21 changes: 12 additions & 9 deletions cumulus/polkadot-omni-node/lib/src/common/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use sc_service::{Configuration, ImportQueue, PartialComponents, TaskManager};
use sc_sysinfo::HwBench;
use sc_telemetry::{TelemetryHandle, TelemetryWorker};
use sc_tracing::tracing::Instrument;
use sc_transaction_pool::FullPool;
use sc_transaction_pool::TransactionPoolHandle;
use sp_keystore::KeystorePtr;
use std::{future::Future, pin::Pin, sync::Arc, time::Duration};

Expand All @@ -65,7 +65,7 @@ where
telemetry: Option<TelemetryHandle>,
task_manager: &TaskManager,
relay_chain_interface: Arc<dyn RelayChainInterface>,
transaction_pool: Arc<FullPool<Block, ParachainClient<Block, RuntimeApi>>>,
transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
keystore: KeystorePtr,
relay_chain_slot_duration: Duration,
para_id: ParaId,
Expand Down Expand Up @@ -149,12 +149,15 @@ pub(crate) trait BaseNodeSpec {
telemetry
});

let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
let transaction_pool = Arc::from(
sc_transaction_pool::Builder::new(
task_manager.spawn_essential_handle(),
client.clone(),
config.role.is_authority().into(),
)
.with_options(config.transaction_pool.clone())
.with_prometheus(config.prometheus_registry())
.build(),
);

let block_import = ParachainBlockImport::new(client.clone(), backend.clone());
Expand Down Expand Up @@ -184,7 +187,7 @@ pub(crate) trait NodeSpec: BaseNodeSpec {
type BuildRpcExtensions: BuildRpcExtensions<
ParachainClient<Self::Block, Self::RuntimeApi>,
ParachainBackend<Self::Block>,
FullPool<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
TransactionPoolHandle<Self::Block, ParachainClient<Self::Block, Self::RuntimeApi>>,
>;

type StartConsensus: StartConsensus<Self::Block, Self::RuntimeApi>;
Expand Down
4 changes: 2 additions & 2 deletions cumulus/polkadot-omni-node/lib/src/common/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use sc_consensus::DefaultImportQueue;
use sc_executor::WasmExecutor;
use sc_service::{PartialComponents, TFullBackend, TFullClient};
use sc_telemetry::{Telemetry, TelemetryWorkerHandle};
use sc_transaction_pool::FullPool;
use sc_transaction_pool::TransactionPoolHandle;
use sp_runtime::{generic, traits::BlakeTwo256};
use std::sync::Arc;

Expand Down Expand Up @@ -51,6 +51,6 @@ pub type ParachainService<Block, RuntimeApi> = PartialComponents<
ParachainBackend<Block>,
(),
DefaultImportQueue<Block>,
FullPool<Block, ParachainClient<Block, RuntimeApi>>,
TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>,
(ParachainBlockImport<Block, RuntimeApi>, Option<Telemetry>, Option<TelemetryWorkerHandle>),
>;
6 changes: 3 additions & 3 deletions cumulus/polkadot-omni-node/lib/src/nodes/aura.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use sc_consensus::{
};
use sc_service::{Configuration, Error, TaskManager};
use sc_telemetry::TelemetryHandle;
use sc_transaction_pool::FullPool;
use sc_transaction_pool::TransactionPoolHandle;
use sp_api::ProvideRuntimeApi;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
Expand Down Expand Up @@ -291,7 +291,7 @@ where
telemetry: Option<TelemetryHandle>,
task_manager: &TaskManager,
relay_chain_interface: Arc<dyn RelayChainInterface>,
transaction_pool: Arc<FullPool<Block, ParachainClient<Block, RuntimeApi>>>,
transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
keystore: KeystorePtr,
_relay_chain_slot_duration: Duration,
para_id: ParaId,
Expand Down Expand Up @@ -387,7 +387,7 @@ where
telemetry: Option<TelemetryHandle>,
task_manager: &TaskManager,
relay_chain_interface: Arc<dyn RelayChainInterface>,
transaction_pool: Arc<FullPool<Block, ParachainClient<Block, RuntimeApi>>>,
transaction_pool: Arc<TransactionPoolHandle<Block, ParachainClient<Block, RuntimeApi>>>,
keystore: KeystorePtr,
relay_chain_slot_duration: Duration,
para_id: ParaId,
Expand Down
19 changes: 11 additions & 8 deletions cumulus/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub type Backend = TFullBackend<Block>;
pub type ParachainBlockImport = TParachainBlockImport<Block, Arc<Client>, Backend>;

/// Transaction pool type used by the test service
pub type TransactionPool = Arc<sc_transaction_pool::FullPool<Block, Client>>;
pub type TransactionPool = Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>;

/// Recovery handle that fails regularly to simulate unavailable povs.
pub struct FailingRecoveryHandle {
Expand Down Expand Up @@ -183,7 +183,7 @@ pub type Service = PartialComponents<
Backend,
(),
sc_consensus::import_queue::BasicQueue<Block>,
sc_transaction_pool::FullPool<Block, Client>,
sc_transaction_pool::TransactionPoolHandle<Block, Client>,
ParachainBlockImport,
>;

Expand Down Expand Up @@ -219,12 +219,15 @@ pub fn new_partial(

let block_import = ParachainBlockImport::new(client.clone(), backend.clone());

let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
let transaction_pool = Arc::from(
sc_transaction_pool::Builder::new(
task_manager.spawn_essential_handle(),
client.clone(),
config.role.is_authority().into(),
)
.with_options(config.transaction_pool.clone())
.with_prometheus(config.prometheus_registry())
.build(),
);

let slot_duration = sc_consensus_aura::slot_duration(&*client)?;
Expand Down
18 changes: 18 additions & 0 deletions cumulus/zombienet/tests/0008-main.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Allows to manually submit extrinsic to collator.
// Usage:
// zombienet-linux -p native spwan 0008-parachain-extrinsic-gets-finalized.toml
// node 0008-main.js <path-to-generated-tmpdir/zombie.json>

global.zombie = null

const fs = require('fs');
const test = require('./0008-transaction_gets_finalized.js');

if (process.argv.length == 2) {
console.error('Path to zombie.json (generated by zombienet-linux spawn command shall be given)!');
process.exit(1);
}

let networkInfo = JSON.parse(fs.readFileSync(process.argv[2]));

test.run("charlie", networkInfo).then(process.exit)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[relaychain]
default_image = "{{RELAY_IMAGE}}"
default_command = "polkadot"
chain = "rococo-local"

[[relaychain.nodes]]
name = "alice"
validator = true

[[relaychain.nodes]]
name = "bob"
validator = true

[[parachains]]
id = 2000
cumulus_based = true
chain = "asset-hub-rococo-local"

# run charlie as parachain collator
[[parachains.collators]]
name = "charlie"
validator = true
image = "{{POLKADOT_PARACHAIN_IMAGE}}"
command = "polkadot-parachain"
args = ["--force-authoring", "-ltxpool=trace", "--pool-type=fork-aware"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Description: Block building
Network: ./0008-parachain_extrinsic_gets_finalized.toml
Creds: config

alice: reports node_roles is 4
bob: reports node_roles is 4
charlie: reports node_roles is 4

alice: reports peers count is at least 1
bob: reports peers count is at least 1

alice: reports block height is at least 5 within 60 seconds
bob: reports block height is at least 5 within 60 seconds
charlie: reports block height is at least 2 within 120 seconds

alice: count of log lines containing "error" is 0 within 2 seconds
bob: count of log lines containing "error" is 0 within 2 seconds
charlie: count of log lines containing "error" is 0 within 2 seconds

charlie: js-script ./0008-transaction_gets_finalized.js within 600 seconds
69 changes: 69 additions & 0 deletions cumulus/zombienet/tests/0008-transaction_gets_finalized.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//based on: https://polkadot.js.org/docs/api/examples/promise/transfer-events

const assert = require("assert");

async function run(nodeName, networkInfo, args) {
const {wsUri, userDefinedTypes} = networkInfo.nodesByName[nodeName];
// Create the API and wait until ready
var api = null;
var keyring = null;
if (zombie == null) {
const testKeyring = require('@polkadot/keyring/testing');
const { WsProvider, ApiPromise } = require('@polkadot/api');
const provider = new WsProvider(wsUri);
api = await ApiPromise.create({provider});
// Construct the keyring after the API (crypto has an async init)
keyring = testKeyring.createTestKeyring({ type: "sr25519" });
} else {
keyring = new zombie.Keyring({ type: "sr25519" });
api = await zombie.connect(wsUri, userDefinedTypes);
}


// Add Alice to our keyring with a hard-derivation path (empty phrase, so uses dev)
const alice = keyring.addFromUri('//Alice');

// Create an extrinsic:
const extrinsic = api.tx.system.remark("xxx");

let extrinsic_success_event = false;
try {
await new Promise( async (resolve, reject) => {
const unsubscribe = await extrinsic
.signAndSend(alice, { nonce: -1 }, ({ events = [], status }) => {
console.log('Extrinsic status:', status.type);

if (status.isInBlock) {
console.log('Included at block hash', status.asInBlock.toHex());
console.log('Events:');

events.forEach(({ event: { data, method, section }, phase }) => {
console.log('\t', phase.toString(), `: ${section}.${method}`, data.toString());

if (section=="system" && method =="ExtrinsicSuccess") {
extrinsic_success_event = true;
}
});
} else if (status.isFinalized) {
console.log('Finalized block hash', status.asFinalized.toHex());
unsubscribe();
if (extrinsic_success_event) {
resolve();
} else {
reject("ExtrinsicSuccess has not been seen");
}
} else if (status.isError) {
unsubscribe();
reject("Extrinsic status.isError");
}

});
});
} catch (error) {
assert.fail("Transfer promise failed, error: " + error);
}

assert.ok("test passed");
}

module.exports = { run }
17 changes: 10 additions & 7 deletions polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ fn new_partial<ChainSelection>(
FullBackend,
ChainSelection,
sc_consensus::DefaultImportQueue<Block>,
sc_transaction_pool::FullPool<Block, FullClient>,
sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
(
impl Fn(
polkadot_rpc::SubscriptionTaskExecutor,
Expand Down Expand Up @@ -478,12 +478,15 @@ fn new_partial<ChainSelection>(
where
ChainSelection: 'static + SelectChain<Block>,
{
let transaction_pool = sc_transaction_pool::BasicPool::new_full(
config.transaction_pool.clone(),
config.role.is_authority().into(),
config.prometheus_registry(),
task_manager.spawn_essential_handle(),
client.clone(),
let transaction_pool = Arc::from(
sc_transaction_pool::Builder::new(
task_manager.spawn_essential_handle(),
client.clone(),
config.role.is_authority().into(),
)
.with_options(config.transaction_pool.clone())
.with_prometheus(config.prometheus_registry())
.build(),
);

let grandpa_hard_forks = if config.chain_spec.is_kusama() {
Expand Down
Loading

0 comments on commit 26c11fc

Please sign in to comment.