Skip to content

Commit

Permalink
Preallocate the LruCache inside the TrieCache and use a Mutex for trie caches (near#10995)
Browse files Browse the repository at this point in the history

This achieves two distinct things:

1. With the Mutex-based lock around the hashmap of `TrieCache`s we now
only hold the lock strictly for as long as necessary, which *may* be as
long as it is necessary to create a `TrieCache`, but that's a one-time
occurrence outside of resharding scenarios as far as I can tell.
2. With the pre-allocation of the `LruCache` we get rid of the
occasionally-very-expensive reallocation of the backing store of this
data.

---

I haven't been running this code for very long, but these are what the
graphs look now (lowpass filter of 5ms for the bottom graph which shows
span durations, so for the new graph this means that most of the stuff
is under 5ms; the top graph shows the "current" size of the shard trie
caches.)

Old(?; I don't actually know what code this node is running):


![image](https://github.com/near/nearcore/assets/679122/6b101adf-f77f-49bc-a188-b11bb784645a)

New:


![image](https://github.com/near/nearcore/assets/679122/5b9a9251-9713-403a-9f50-64108f9c29cf)

Most notably gone are the occasional blips up-and-above 200ms range per
call.

Fixes  near#10992

---------

Co-authored-by: Bowen Wang <bowen@near.org>
  • Loading branch information
nagisa and bowenwang1996 authored Apr 9, 2024
1 parent 4fad665 commit 246f7cc
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 66 deletions.
10 changes: 9 additions & 1 deletion core/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,15 @@ impl StoreUpdate {
self.transaction.merge(other.transaction)
}

#[tracing::instrument(level = "trace", target = "store", "StoreUpdate::commit", skip_all)]
#[tracing::instrument(
level = "trace",
// FIXME: start moving things into tighter modules so that its easier to selectively trace
// specific things.
target = "store::update",
"StoreUpdate::commit",
skip_all,
fields(transaction.ops.len = self.transaction.ops.len())
)]
pub fn commit(self) -> io::Result<()> {
debug_assert!(
{
Expand Down
107 changes: 43 additions & 64 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@ use near_primitives::types::{
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use std::collections::HashMap;
use std::rc::Rc;
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use tracing::info;

struct ShardTriesInner {
store: Store,
trie_config: TrieConfig,
mem_tries: RwLock<HashMap<ShardUId, Arc<RwLock<MemTries>>>>,
/// Cache reserved for client actor to use
caches: RwLock<HashMap<ShardUId, TrieCache>>,
caches: Mutex<HashMap<ShardUId, TrieCache>>,
/// Cache for readers.
view_caches: RwLock<HashMap<ShardUId, TrieCache>>,
view_caches: Mutex<HashMap<ShardUId, TrieCache>>,
flat_storage_manager: FlatStorageManager,
/// Prefetcher state, such as IO threads, per shard.
prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, PrefetchingThreadsHandle)>>,
Expand Down Expand Up @@ -62,8 +62,8 @@ impl ShardTries {
store,
trie_config,
mem_tries: RwLock::new(HashMap::new()),
caches: RwLock::new(caches),
view_caches: RwLock::new(view_caches),
caches: Mutex::new(caches),
view_caches: Mutex::new(view_caches),
flat_storage_manager,
prefetchers: Default::default(),
state_snapshot: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -91,27 +91,30 @@ impl ShardTries {
TrieUpdate::new(self.get_view_trie_for_shard(shard_uid, state_root))
}

/// Look up the trie cache serving `shard_uid`, instantiating and registering
/// a fresh one on first use.
///
/// `is_view` selects between the caches reserved for view calls and the ones
/// used by the client; the chosen map's mutex is held only for the duration
/// of the lookup-or-insert.
#[tracing::instrument(
level = "trace",
target = "store::trie::shard_tries",
"ShardTries::get_trie_cache_for",
skip_all,
fields(is_view)
)]
fn get_trie_cache_for(&self, shard_uid: ShardUId, is_view: bool) -> TrieCache {
    // View calls and regular client operations maintain separate cache maps.
    let cache_map = match is_view {
        true => &self.0.view_caches,
        false => &self.0.caches,
    };
    let mut guard = cache_map.lock().expect(POISONED_LOCK_ERR);
    let entry = guard
        .entry(shard_uid)
        .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view));
    entry.clone()
}

fn get_trie_for_shard_internal(
&self,
shard_uid: ShardUId,
state_root: StateRoot,
is_view: bool,
block_hash: Option<CryptoHash>,
) -> Trie {
let caches_to_use = if is_view { &self.0.view_caches } else { &self.0.caches };
let cache = {
let shard_uid_exists =
caches_to_use.read().expect(POISONED_LOCK_ERR).contains_key(&shard_uid);
if shard_uid_exists {
caches_to_use.read().expect(POISONED_LOCK_ERR)[&shard_uid].clone()
} else {
let mut caches = caches_to_use.write().expect(POISONED_LOCK_ERR);
caches
.entry(shard_uid)
.or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view))
.clone()
}
};
let cache = self.get_trie_cache_for(shard_uid, is_view);
// Do not enable prefetching on view caches.
// 1) Performance of view calls is not crucial.
// 2) A lot of the prefetcher code assumes there is only one "main-thread" per shard active.
Expand Down Expand Up @@ -164,13 +167,7 @@ impl ShardTries {
block_hash: &CryptoHash,
) -> Result<Trie, StorageError> {
let (store, flat_storage_manager) = self.get_state_snapshot(block_hash)?;
let cache = {
let mut caches = self.0.view_caches.write().expect(POISONED_LOCK_ERR);
caches
.entry(shard_uid)
.or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, true))
.clone()
};
let cache = self.get_trie_cache_for(shard_uid, true);
let storage = Rc::new(TrieCachingStorage::new(store, cache, shard_uid, true, None));
let flat_storage_chunk_view = flat_storage_manager.chunk_view(shard_uid, *block_hash);

Expand Down Expand Up @@ -217,32 +214,14 @@ impl ShardTries {

#[tracing::instrument(
level = "trace",
target = "store::trie",
target = "store::trie::shard_tries",
"ShardTries::update_cache",
skip_all
skip_all,
fields(ops.len = ops.len()),
)]
pub fn update_cache(&self, ops: Vec<(&CryptoHash, Option<&[u8]>)>, shard_uid: ShardUId) {
// First acquire a read lock to see if the shard uid exists. The shard uid may not exist due to resharding
let shard_uid_exists = {
let caches = self.0.caches.read().expect(POISONED_LOCK_ERR);
caches.contains_key(&shard_uid)
};
// If the shard uid exists, we acquire a read lock to update the trie cache for the corresponding shard.
// If the shard uid does not exist, then we acquire a write lock to expand the cache with the new shard uid as a key.
if shard_uid_exists {
let caches = self.0.caches.read().expect(POISONED_LOCK_ERR);
match caches.get(&shard_uid) {
Some(cache) => cache.update_cache(ops),
None => debug_assert!(false, "key existence has been checked"),
}
} else {
let mut caches = self.0.caches.write().expect(POISONED_LOCK_ERR);
let cache = caches
.entry(shard_uid)
.or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, false))
.clone();
cache.update_cache(ops);
}
let cache = self.get_trie_cache_for(shard_uid, false);
cache.update_cache(ops);
}

fn apply_deletions_inner(
Expand Down Expand Up @@ -300,7 +279,7 @@ impl ShardTries {

#[tracing::instrument(
level = "trace",
target = "store::trie",
target = "store::trie::shard_tries",
"ShardTries::apply_insertions",
fields(num_insertions = trie_changes.insertions().len(), shard_id = shard_uid.shard_id()),
skip_all,
Expand All @@ -323,7 +302,7 @@ impl ShardTries {

#[tracing::instrument(
level = "trace",
target = "store::trie",
target = "store::trie::shard_tries",
"ShardTries::apply_deletions",
fields(num_deletions = trie_changes.deletions().len(), shard_id = shard_uid.shard_id()),
skip_all,
Expand Down Expand Up @@ -412,8 +391,8 @@ impl ShardTries {
/// Note that flat storage needs to be handled separately
pub fn delete_trie_for_shard(&self, shard_uid: ShardUId, store_update: &mut StoreUpdate) {
// Clear both caches and remove state values from store
self.0.caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
self.0.view_caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
let _cache = self.0.caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid);
let _view_cache = self.0.view_caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid);
remove_all_state_values(store_update, shard_uid);
}

Expand Down Expand Up @@ -572,7 +551,7 @@ impl WrappedTrieChanges {
/// NOTE: the changes are drained from `self`.
#[tracing::instrument(
level = "debug",
target = "trie",
target = "store::trie::shard_tries",
"ShardTries::state_changes_into",
fields(num_state_changes = self.state_changes.len(), shard_id = self.shard_uid.shard_id()),
skip_all,
Expand Down Expand Up @@ -620,7 +599,7 @@ impl WrappedTrieChanges {

#[tracing::instrument(
level = "debug",
target = "trie",
target = "store::trie::shard_tries",
"ShardTries::trie_changes_into",
skip_all
)]
Expand Down Expand Up @@ -849,26 +828,26 @@ mod test {
let tries = create_trie();
let trie_caches = &tries.0.caches;
// Assert only one cache for one shard exists
assert_eq!(trie_caches.read().unwrap().len(), 1);
assert_eq!(trie_caches.lock().unwrap().len(), 1);
// Assert the shard uid is correct
assert!(trie_caches.read().unwrap().get(&shard_uid).is_some());
assert!(trie_caches.lock().unwrap().get(&shard_uid).is_some());

// Read from cache
let key = CryptoHash::hash_borsh("alice");
let val: Vec<u8> = Vec::from([0, 1, 2, 3, 4]);

assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());

let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
tries.update_cache(insert_ops, shard_uid);
assert_eq!(
trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
val
);

let deletions_ops = Vec::from([(&key, None)]);
tries.update_cache(deletions_ops, shard_uid);
assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
}

#[test]
Expand Down Expand Up @@ -903,7 +882,7 @@ mod test {
let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
trie.update_cache(insert_ops, shard_uid);
assert_eq!(
trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
val
);

Expand All @@ -912,7 +891,7 @@ mod test {
let val: Vec<u8> = vec![0; TrieConfig::max_cached_value_size()];
let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
trie.update_cache(insert_ops, shard_uid);
assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
}

#[test]
Expand All @@ -936,8 +915,8 @@ mod test {
store_update.commit().unwrap();

// verify if data and caches are deleted
assert!(tries.0.caches.read().unwrap().get(&shard_uid).is_none());
assert!(tries.0.view_caches.read().unwrap().get(&shard_uid).is_none());
assert!(tries.0.caches.lock().unwrap().get(&shard_uid).is_none());
assert!(tries.0.view_caches.lock().unwrap().get(&shard_uid).is_none());
let store = tries.get_store();
let key_prefix = shard_uid.to_bytes();
let mut iter = store.iter_prefix(DBCol::State, &key_prefix);
Expand Down
5 changes: 4 additions & 1 deletion core/store/src/trie/trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ impl TrieCacheInner {
shard_cache_deletions_size: metrics::SHARD_CACHE_DELETIONS_SIZE
.with_label_values(&metrics_labels),
};
// Assuming the values are all actually empty, so that we store a full hashmap's worth of overhead.
let max_elements = total_size_limit.div_ceil(Self::PER_ENTRY_OVERHEAD);
let max_elements = usize::try_from(max_elements).unwrap();
Self {
cache: LruCache::unbounded(),
cache: LruCache::new(max_elements),
deletions: BoundedQueue::new(deletions_queue_capacity),
total_size: 0,
total_size_limit,
Expand Down

0 comments on commit 246f7cc

Please sign in to comment.