Skip to content

Commit

Permalink
Reduce StatusCache contention.
Browse files Browse the repository at this point in the history
Remove the global RwLock around the status cache, and introduce more
granular RwLocks per-blockhash and per-slot. Additionally, change the
internal hash tables from std HashMap to Dashmap, so that operations at
the blockhash and slot level can be done only holding read locks.
  • Loading branch information
alessandrod committed Dec 9, 2024
1 parent 6c362f6 commit d9fdc54
Show file tree
Hide file tree
Showing 15 changed files with 528 additions and 230 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ bitflags = { version = "2.6.0" }
blake3 = "1.5.5"
borsh = { version = "1.5.3", features = ["derive", "unstable__schema"] }
borsh0-10 = { package = "borsh", version = "0.10.3" }
boxcar = "0.2.7"
bs58 = { version = "0.5.1", default-features = false }
bv = "0.11.1"
byte-unit = "4.0.19"
Expand All @@ -296,7 +297,7 @@ crossbeam-channel = "0.5.13"
csv = "1.3.1"
ctrlc = "3.4.5"
curve25519-dalek = { version = "4.1.3", features = ["digest", "rand_core"] }
dashmap = "5.5.3"
dashmap = { version = "5.5.3", features = ["serde"] }
derivation-path = { version = "0.2.0", default-features = false }
derive-where = "1.2.7"
dialoguer = "0.10.4"
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ solana-logger = { workspace = true }
solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }
solana-poh = { workspace = true, features = ["dev-context-only-utils"] }
solana-program-runtime = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-sdk = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
solana-unified-scheduler-pool = { workspace = true, features = [
Expand Down
4 changes: 0 additions & 4 deletions core/tests/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,7 @@ fn test_slots_to_snapshot(snapshot_version: SnapshotVersion, cluster_type: Clust
.unwrap()
.root_bank()
.status_cache
.read()
.unwrap()
.roots()
.iter()
.cloned()
.sorted();
assert!(slots_to_snapshot.into_iter().eq(expected_slots_to_snapshot));
}
Expand Down
9 changes: 9 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ arrayref = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
blake3 = { workspace = true }
boxcar = { workspace = true }
bv = { workspace = true, features = ["serde"] }
bytemuck = { workspace = true }
byteorder = { workspace = true }
Expand Down Expand Up @@ -48,6 +49,7 @@ serde = { workspace = true, features = ["rc"] }
serde_derive = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
smallvec = { workspace = true }
solana-accounts-db = { workspace = true }
solana-address-lookup-table-program = { workspace = true }
solana-bpf-loader-program = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions runtime/benches/status_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use {

#[bench]
fn bench_status_cache_serialize(bencher: &mut Bencher) {
let mut status_cache = BankStatusCache::default();
let status_cache = BankStatusCache::default();
status_cache.add_root(0);
status_cache.clear();
for hash_index in 0..100 {
Expand All @@ -30,15 +30,15 @@ fn bench_status_cache_serialize(bencher: &mut Bencher) {
status_cache.insert(&blockhash, sig, 0, Ok(()));
}
}
assert!(status_cache.roots().contains(&0));
assert!(status_cache.roots().collect::<Vec<_>>().contains(&0));
bencher.iter(|| {
let _ = serialize(&status_cache.root_slot_deltas()).unwrap();
});
}

#[bench]
fn bench_status_cache_root_slot_deltas(bencher: &mut Bencher) {
let mut status_cache = BankStatusCache::default();
let status_cache = BankStatusCache::default();

// fill the status cache
let slots: Vec<_> = (42..).take(MAX_CACHE_ENTRIES).collect();
Expand Down
29 changes: 14 additions & 15 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ struct RentMetrics {
pub type BankStatusCache = StatusCache<Result<()>>;
#[cfg_attr(
feature = "frozen-abi",
frozen_abi(digest = "BHg4qpwegtaJypLUqAdjQYzYeLfEGf6tA4U5cREbHMHi")
frozen_abi(digest = "CQE8Pab7YMwvUj6rjD95kqt9fgBE4mkG7GRUko2DyveD")
)]
pub type BankSlotDelta = SlotDelta<Result<()>>;

Expand Down Expand Up @@ -753,7 +753,7 @@ pub struct Bank {
pub rc: BankRc,

/// A cache of signature statuses
pub status_cache: Arc<RwLock<BankStatusCache>>,
pub status_cache: Arc<BankStatusCache>,

/// FIFO queue of `recent_blockhash` items
blockhash_queue: RwLock<BlockhashQueue>,
Expand Down Expand Up @@ -1102,7 +1102,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: BankRc::new(accounts),
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<BankStatusCache>::default(),
blockhash_queue: RwLock::<BlockhashQueue>::default(),
ancestors: Ancestors::default(),
hash: RwLock::<Hash>::default(),
Expand Down Expand Up @@ -1769,7 +1769,7 @@ impl Bank {
let mut bank = Self {
skipped_rewrites: Mutex::default(),
rc: bank_rc,
status_cache: Arc::<RwLock<BankStatusCache>>::default(),
status_cache: Arc::<BankStatusCache>::default(),
blockhash_queue: RwLock::new(fields.blockhash_queue),
ancestors,
hash: RwLock::new(fields.hash),
Expand Down Expand Up @@ -2046,7 +2046,7 @@ impl Bank {
}

pub fn status_cache_ancestors(&self) -> Vec<u64> {
let mut roots = self.status_cache.read().unwrap().roots().clone();
let mut roots = self.status_cache.roots().collect::<HashSet<_>>();
let min = roots.iter().min().cloned().unwrap_or(0);
for ancestor in self.ancestors.keys() {
if ancestor >= min {
Expand Down Expand Up @@ -3193,7 +3193,7 @@ impl Bank {
let mut squash_cache_time = Measure::start("squash_cache_time");
roots
.iter()
.for_each(|slot| self.status_cache.write().unwrap().add_root(*slot));
.for_each(|slot| self.status_cache.add_root(*slot));
squash_cache_time.stop();

SquashTiming {
Expand Down Expand Up @@ -3459,26 +3459,26 @@ impl Bank {
}

/// Forget all signatures. Useful for benchmarking.
#[cfg(feature = "dev-context-only-utils")]
pub fn clear_signatures(&self) {
self.status_cache.write().unwrap().clear();
self.status_cache.clear();
}

pub fn clear_slot_signatures(&self, slot: Slot) {
self.status_cache.write().unwrap().clear_slot_entries(slot);
self.status_cache.clear_slot_entries(slot);
}

fn update_transaction_statuses(
&self,
sanitized_txs: &[impl TransactionWithMeta],
processing_results: &[TransactionProcessingResult],
) {
let mut status_cache = self.status_cache.write().unwrap();
assert_eq!(sanitized_txs.len(), processing_results.len());
for (tx, processing_result) in sanitized_txs.iter().zip(processing_results) {
if let Ok(processed_tx) = &processing_result {
// Add the message hash to the status cache to ensure that this message
// won't be processed again with a different signature.
status_cache.insert(
self.status_cache.insert(
tx.recent_blockhash(),
tx.message_hash(),
self.slot(),
Expand All @@ -3487,7 +3487,7 @@ impl Bank {
// Add the transaction signature to the status cache so that transaction status
// can be queried by transaction signature over RPC. In the future, this should
// only be added for API nodes because voting validators don't need to do this.
status_cache.insert(
self.status_cache.insert(
tx.recent_blockhash(),
tx.signature(),
self.slot(),
Expand Down Expand Up @@ -5635,15 +5635,14 @@ impl Bank {
signature: &Signature,
blockhash: &Hash,
) -> Option<Result<()>> {
let rcache = self.status_cache.read().unwrap();
rcache
self.status_cache
.get_status(signature, blockhash, &self.ancestors)
.map(|v| v.1)
}

pub fn get_signature_status_slot(&self, signature: &Signature) -> Option<(Slot, Result<()>)> {
let rcache = self.status_cache.read().unwrap();
rcache.get_status_any_blockhash(signature, &self.ancestors)
self.status_cache
.get_status_any_blockhash(signature, &self.ancestors)
}

pub fn get_signature_status(&self, signature: &Signature) -> Option<Result<()>> {
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/bank/check_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,12 @@ impl Bank {
) -> Vec<TransactionCheckResult> {
// Do allocation before acquiring the lock on the status cache.
let mut check_results = Vec::with_capacity(sanitized_txs.len());
let rcache = self.status_cache.read().unwrap();

check_results.extend(sanitized_txs.iter().zip(lock_results).map(
|(sanitized_tx, lock_result)| {
let sanitized_tx = sanitized_tx.borrow();
if lock_result.is_ok()
&& self.is_transaction_already_processed(sanitized_tx, &rcache)
&& self.is_transaction_already_processed(sanitized_tx, &self.status_cache)
{
error_counters.already_processed += 1;
return Err(TransactionError::AlreadyProcessed);
Expand Down
3 changes: 1 addition & 2 deletions runtime/src/bank_forks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,7 @@ impl BankForks {
if bank.is_startup_verification_complete() {
// Save off the status cache because these may get pruned if another
// `set_root()` is called before the snapshots package can be generated
let status_cache_slot_deltas =
bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
if let Err(e) =
accounts_background_request_sender.send_snapshot_request(SnapshotRequest {
snapshot_root_bank: Arc::clone(bank),
Expand Down
8 changes: 4 additions & 4 deletions runtime/src/snapshot_bank_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ fn rebuild_bank_from_unarchived_snapshots(

verify_slot_deltas(slot_deltas.as_slice(), &bank)?;

bank.status_cache.write().unwrap().append(&slot_deltas);
bank.status_cache.append(&slot_deltas);

info!("Rebuilt bank for slot: {}", bank.slot());
Ok((
Expand Down Expand Up @@ -686,7 +686,7 @@ fn rebuild_bank_from_snapshot(

verify_slot_deltas(slot_deltas.as_slice(), &bank)?;

bank.status_cache.write().unwrap().append(&slot_deltas);
bank.status_cache.append(&slot_deltas);

info!("Rebuilt bank for slot: {}", bank.slot());
Ok((
Expand Down Expand Up @@ -912,7 +912,7 @@ fn bank_to_full_snapshot_archive_with(
bank.update_accounts_hash(CalcAccountsHashDataSource::Storages, false, false);

let snapshot_storages = bank.get_snapshot_storages(None);
let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageKind::Snapshot(SnapshotKind::FullSnapshot),
bank,
Expand Down Expand Up @@ -975,7 +975,7 @@ pub fn bank_to_incremental_snapshot_archive(
bank.update_incremental_accounts_hash(full_snapshot_slot);

let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot));
let status_cache_slot_deltas = bank.status_cache.read().unwrap().root_slot_deltas();
let status_cache_slot_deltas = bank.status_cache.root_slot_deltas();
let accounts_package = AccountsPackage::new_for_snapshot(
AccountsPackageKind::Snapshot(SnapshotKind::IncrementalSnapshot(full_snapshot_slot)),
bank,
Expand Down
Loading

0 comments on commit d9fdc54

Please sign in to comment.