Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update height_expiration_txs to not grow indefinitely #2579

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added
- [2551](https://github.com/FuelLabs/fuel-core/pull/2551): Enhanced the DA compressed block header to include block id.
- [2579](https://github.com/FuelLabs/fuel-core/pull/2579): Clear expiration txs cache in transaction pool based on inserted transactions

### Fixed
- [2609](https://github.com/FuelLabs/fuel-core/pull/2609): Check response before trying to deserialize, return error instead
Expand Down
7 changes: 5 additions & 2 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ where

/// Remove transaction but keep its dependents.
/// The dependents become executables.
pub fn remove_transaction(&mut self, tx_ids: Vec<TxId>) {
pub fn remove_transactions(&mut self, tx_ids: Vec<TxId>) -> Vec<ArcPoolTx> {
let mut removed_transactions = Vec::with_capacity(tx_ids.len());
for tx_id in tx_ids {
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
Expand Down Expand Up @@ -383,10 +384,12 @@ where
.new_executable_transaction(dependent, storage_data);
}
self.update_components_and_caches_on_removal(iter::once(&transaction));
removed_transactions.push(transaction.transaction);
}
}

self.update_stats();
removed_transactions
}

/// Check if the pool has enough space to store a transaction.
Expand Down Expand Up @@ -525,7 +528,7 @@ where
/// Remove transaction and its dependents.
pub fn remove_transaction_and_dependents(
&mut self,
tx_ids: Vec<TxId>,
tx_ids: impl Iterator<Item = TxId>,
) -> Vec<ArcPoolTx> {
let mut removed_transactions = vec![];
for tx_id in tx_ids {
Expand Down
47 changes: 36 additions & 11 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,29 @@ where
{
fn import_block(&mut self, result: SharedImportResult) {
let new_height = *result.sealed_block.entity.header().height();
let executed_transaction = result.tx_status.iter().map(|s| s.id).collect();
let executed_transactions = result.tx_status.iter().map(|s| s.id).collect();
// We don't want block importer way for us to process the result.
drop(result);

{
let removed_transactions = {
let mut tx_pool = self.pool.write();
tx_pool.remove_transaction(executed_transaction);
let removed_transactions = tx_pool.remove_transactions(executed_transactions);
if !tx_pool.is_empty() {
self.shared_state.new_txs_notifier.send_replace(());
}
removed_transactions
};
if !removed_transactions.is_empty() {
rafal-ch marked this conversation as resolved.
Show resolved Hide resolved
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
for tx in removed_transactions.into_iter() {
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
}
}

{
Expand All @@ -345,8 +358,10 @@ where
let expired_txs = height_expiration_txs.remove(&height);
if let Some(expired_txs) = expired_txs {
let mut tx_pool = self.pool.write();
removed_txs
.extend(tx_pool.remove_transaction_and_dependents(expired_txs));
removed_txs.extend(
tx_pool
.remove_transaction_and_dependents(expired_txs.into_iter()),
);
}
}
}
Expand Down Expand Up @@ -486,7 +501,7 @@ where
if expiration < u32::MAX.into() {
let mut lock = height_expiration_txs.write();
let block_height_expiration = lock.entry(expiration).or_default();
block_height_expiration.push(tx_id);
block_height_expiration.insert(tx_id);
}

let duration = submitted_time
Expand Down Expand Up @@ -662,13 +677,23 @@ where
let removed;
{
let mut pool = self.pool.write();
removed = pool.remove_transaction_and_dependents(txs_to_remove);
removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter());
}

for tx in removed {
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
if !removed.is_empty() {
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
for tx in removed {
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
}
}

{
Expand Down
3 changes: 2 additions & 1 deletion crates/services/txpool_v2/src/service/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use fuel_core_types::{
use std::{
collections::{
BTreeMap,
HashSet,
VecDeque,
},
time::SystemTime,
};

pub(super) struct TransactionPruner {
pub time_txs_submitted: Shared<VecDeque<(SystemTime, TxId)>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, Vec<TxId>>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, HashSet<TxId>>>,
pub ttl_timer: tokio::time::Interval,
pub txs_ttl: tokio::time::Duration,
}
Loading