diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs
index e3eadf2a79d..e0db078c5e7 100644
--- a/core/store/src/lib.rs
+++ b/core/store/src/lib.rs
@@ -574,7 +574,15 @@ impl StoreUpdate {
         self.transaction.merge(other.transaction)
     }
 
-    #[tracing::instrument(level = "trace", target = "store", "StoreUpdate::commit", skip_all)]
+    #[tracing::instrument(
+        level = "trace",
+        // FIXME: start moving things into tighter modules so that it's easier to selectively trace
+        // specific things.
+        target = "store::update",
+        "StoreUpdate::commit",
+        skip_all,
+        fields(transaction.ops.len = self.transaction.ops.len())
+    )]
     pub fn commit(self) -> io::Result<()> {
         debug_assert!(
             {
diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs
index 4a557bbfded..39e87a5261d 100644
--- a/core/store/src/trie/shard_tries.rs
+++ b/core/store/src/trie/shard_tries.rs
@@ -21,7 +21,7 @@ use near_primitives::types::{
 use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
 use std::collections::HashMap;
 use std::rc::Rc;
-use std::sync::{Arc, RwLock};
+use std::sync::{Arc, Mutex, RwLock};
 use tracing::info;
 
 struct ShardTriesInner {
@@ -29,9 +29,9 @@ struct ShardTriesInner {
     trie_config: TrieConfig,
     mem_tries: RwLock<HashMap<ShardUId, Arc<RwLock<MemTries>>>>,
     /// Cache reserved for client actor to use
-    caches: RwLock<HashMap<ShardUId, TrieCache>>,
+    caches: Mutex<HashMap<ShardUId, TrieCache>>,
     /// Cache for readers.
-    view_caches: RwLock<HashMap<ShardUId, TrieCache>>,
+    view_caches: Mutex<HashMap<ShardUId, TrieCache>>,
     flat_storage_manager: FlatStorageManager,
     /// Prefetcher state, such as IO threads, per shard.
     prefetchers: RwLock<HashMap<ShardUId, (PrefetchApi, PrefetchingThreadsHandle)>>,
@@ -62,8 +62,8 @@ impl ShardTries {
             store,
             trie_config,
             mem_tries: RwLock::new(HashMap::new()),
-            caches: RwLock::new(caches),
-            view_caches: RwLock::new(view_caches),
+            caches: Mutex::new(caches),
+            view_caches: Mutex::new(view_caches),
             flat_storage_manager,
             prefetchers: Default::default(),
             state_snapshot: Arc::new(RwLock::new(None)),
@@ -91,6 +91,22 @@ impl ShardTries {
         TrieUpdate::new(self.get_view_trie_for_shard(shard_uid, state_root))
     }
 
+    #[tracing::instrument(
+        level = "trace",
+        target = "store::trie::shard_tries",
+        "ShardTries::get_trie_cache_for",
+        skip_all,
+        fields(is_view)
+    )]
+    fn get_trie_cache_for(&self, shard_uid: ShardUId, is_view: bool) -> TrieCache {
+        let caches_to_use = if is_view { &self.0.view_caches } else { &self.0.caches };
+        let mut caches = caches_to_use.lock().expect(POISONED_LOCK_ERR);
+        caches
+            .entry(shard_uid)
+            .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view))
+            .clone()
+    }
+
     fn get_trie_for_shard_internal(
         &self,
         shard_uid: ShardUId,
@@ -98,20 +114,7 @@ impl ShardTries {
         is_view: bool,
         block_hash: Option<CryptoHash>,
     ) -> Trie {
-        let caches_to_use = if is_view { &self.0.view_caches } else { &self.0.caches };
-        let cache = {
-            let shard_uid_exists =
-                caches_to_use.read().expect(POISONED_LOCK_ERR).contains_key(&shard_uid);
-            if shard_uid_exists {
-                caches_to_use.read().expect(POISONED_LOCK_ERR)[&shard_uid].clone()
-            } else {
-                let mut caches = caches_to_use.write().expect(POISONED_LOCK_ERR);
-                caches
-                    .entry(shard_uid)
-                    .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, is_view))
-                    .clone()
-            }
-        };
+        let cache = self.get_trie_cache_for(shard_uid, is_view);
         // Do not enable prefetching on view caches.
         // 1) Performance of view calls is not crucial.
         // 2) A lot of the prefetcher code assumes there is only one "main-thread" per shard active.
@@ -164,13 +167,7 @@ impl ShardTries {
         block_hash: &CryptoHash,
     ) -> Result<Trie, StorageError> {
         let (store, flat_storage_manager) = self.get_state_snapshot(block_hash)?;
-        let cache = {
-            let mut caches = self.0.view_caches.write().expect(POISONED_LOCK_ERR);
-            caches
-                .entry(shard_uid)
-                .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, true))
-                .clone()
-        };
+        let cache = self.get_trie_cache_for(shard_uid, true);
         let storage = Rc::new(TrieCachingStorage::new(store, cache, shard_uid, true, None));
         let flat_storage_chunk_view = flat_storage_manager.chunk_view(shard_uid, *block_hash);
 
@@ -217,32 +214,14 @@ impl ShardTries {
 
     #[tracing::instrument(
         level = "trace",
-        target = "store::trie",
+        target = "store::trie::shard_tries",
         "ShardTries::update_cache",
-        skip_all
+        skip_all,
+        fields(ops.len = ops.len()),
     )]
     pub fn update_cache(&self, ops: Vec<(&CryptoHash, Option<&[u8]>)>, shard_uid: ShardUId) {
-        // First acquire a read lock to see if the shard uid exists. The shard uid may not exist due to resharding
-        let shard_uid_exists = {
-            let caches = self.0.caches.read().expect(POISONED_LOCK_ERR);
-            caches.contains_key(&shard_uid)
-        };
-        // If the shard uid exists, we acquire a read lock to update the trie cache for the corresponding shard.
-        // If the shard uid does not exist, then we acquire a write lock to expand the cache with the new shard uid as a key.
-        if shard_uid_exists {
-            let caches = self.0.caches.read().expect(POISONED_LOCK_ERR);
-            match caches.get(&shard_uid) {
-                Some(cache) => cache.update_cache(ops),
-                None => debug_assert!(false, "key existence has been checked"),
-            }
-        } else {
-            let mut caches = self.0.caches.write().expect(POISONED_LOCK_ERR);
-            let cache = caches
-                .entry(shard_uid)
-                .or_insert_with(|| TrieCache::new(&self.0.trie_config, shard_uid, false))
-                .clone();
-            cache.update_cache(ops);
-        }
+        let cache = self.get_trie_cache_for(shard_uid, false);
+        cache.update_cache(ops);
     }
 
     fn apply_deletions_inner(
@@ -300,7 +279,7 @@ impl ShardTries {
 
     #[tracing::instrument(
         level = "trace",
-        target = "store::trie",
+        target = "store::trie::shard_tries",
         "ShardTries::apply_insertions",
         fields(num_insertions = trie_changes.insertions().len(), shard_id = shard_uid.shard_id()),
         skip_all,
@@ -323,7 +302,7 @@ impl ShardTries {
 
     #[tracing::instrument(
         level = "trace",
-        target = "store::trie",
+        target = "store::trie::shard_tries",
         "ShardTries::apply_deletions",
         fields(num_deletions = trie_changes.deletions().len(), shard_id = shard_uid.shard_id()),
         skip_all,
@@ -412,8 +391,8 @@ impl ShardTries {
     /// Note that flat storage needs to be handled separately
     pub fn delete_trie_for_shard(&self, shard_uid: ShardUId, store_update: &mut StoreUpdate) {
         // Clear both caches and remove state values from store
-        self.0.caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
-        self.0.view_caches.write().expect(POISONED_LOCK_ERR).remove(&shard_uid);
+        let _cache = self.0.caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid);
+        let _view_cache = self.0.view_caches.lock().expect(POISONED_LOCK_ERR).remove(&shard_uid);
         remove_all_state_values(store_update, shard_uid);
     }
 
@@ -572,7 +551,7 @@ impl WrappedTrieChanges {
     /// NOTE: the changes are drained from `self`.
     #[tracing::instrument(
         level = "debug",
-        target = "trie",
+        target = "store::trie::shard_tries",
         "ShardTries::state_changes_into",
         fields(num_state_changes = self.state_changes.len(), shard_id = self.shard_uid.shard_id()),
         skip_all,
@@ -620,7 +599,7 @@ impl WrappedTrieChanges {
 
     #[tracing::instrument(
         level = "debug",
-        target = "trie",
+        target = "store::trie::shard_tries",
         "ShardTries::trie_changes_into",
         skip_all
     )]
@@ -849,26 +828,26 @@ mod test {
         let tries = create_trie();
         let trie_caches = &tries.0.caches;
         // Assert only one cache for one shard exists
-        assert_eq!(trie_caches.read().unwrap().len(), 1);
+        assert_eq!(trie_caches.lock().unwrap().len(), 1);
         // Assert the shard uid is correct
-        assert!(trie_caches.read().unwrap().get(&shard_uid).is_some());
+        assert!(trie_caches.lock().unwrap().get(&shard_uid).is_some());
 
         // Read from cache
         let key = CryptoHash::hash_borsh("alice");
         let val: Vec<u8> = Vec::from([0, 1, 2, 3, 4]);
 
-        assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
+        assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
 
         let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
         tries.update_cache(insert_ops, shard_uid);
         assert_eq!(
-            trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
+            trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
             val
         );
 
         let deletions_ops = Vec::from([(&key, None)]);
         tries.update_cache(deletions_ops, shard_uid);
-        assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
+        assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
     }
 
     #[test]
@@ -903,7 +882,7 @@ mod test {
         let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
         trie.update_cache(insert_ops, shard_uid);
         assert_eq!(
-            trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
+            trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).unwrap().to_vec(),
             val
         );
 
@@ -912,7 +891,7 @@ mod test {
         let val: Vec<u8> = vec![0; TrieConfig::max_cached_value_size()];
         let insert_ops = Vec::from([(&key, Some(val.as_slice()))]);
         trie.update_cache(insert_ops, shard_uid);
-        assert!(trie_caches.read().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
+        assert!(trie_caches.lock().unwrap().get(&shard_uid).unwrap().get(&key).is_none());
     }
 
     #[test]
@@ -936,8 +915,8 @@ mod test {
         store_update.commit().unwrap();
 
         // verify if data and caches are deleted
-        assert!(tries.0.caches.read().unwrap().get(&shard_uid).is_none());
-        assert!(tries.0.view_caches.read().unwrap().get(&shard_uid).is_none());
+        assert!(tries.0.caches.lock().unwrap().get(&shard_uid).is_none());
+        assert!(tries.0.view_caches.lock().unwrap().get(&shard_uid).is_none());
         let store = tries.get_store();
         let key_prefix = shard_uid.to_bytes();
         let mut iter = store.iter_prefix(DBCol::State, &key_prefix);
diff --git a/core/store/src/trie/trie_storage.rs b/core/store/src/trie/trie_storage.rs
index ae2154ccc28..3e177c5dc59 100644
--- a/core/store/src/trie/trie_storage.rs
+++ b/core/store/src/trie/trie_storage.rs
@@ -117,8 +117,11 @@ impl TrieCacheInner {
             shard_cache_deletions_size: metrics::SHARD_CACHE_DELETIONS_SIZE
                 .with_label_values(&metrics_labels),
         };
+        // Assuming the values are actually all empty and we store a full hashmap of overhead.
+        let max_elements = total_size_limit.div_ceil(Self::PER_ENTRY_OVERHEAD);
+        let max_elements = usize::try_from(max_elements).unwrap();
         Self {
-            cache: LruCache::unbounded(),
+            cache: LruCache::new(max_elements),
             deletions: BoundedQueue::new(deletions_queue_capacity),
             total_size: 0,
             total_size_limit,
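
Note on the span retargeting: moving these spans off the blanket `store` and `trie` targets onto `store::update` and `store::trie::shard_tries` (the direction the FIXME in the first hunk points at) makes them individually selectable with ordinary target filters. A minimal sketch of a subscriber that exercises this, assuming `tracing-subscriber` with its `env-filter` feature enabled; nearcore wires up its own subscriber elsewhere, so only the directive syntax carries over:

```rust
use tracing_subscriber::{fmt, EnvFilter};

fn main() {
    // Keep everything at `info`, but enable the two retargeted modules at `trace`
    // so that their new spans (and only theirs) are emitted.
    let filter = EnvFilter::new("info,store::update=trace,store::trie::shard_tries=trace");
    fmt().with_env_filter(filter).init();

    tracing::trace!(target: "store::trie::shard_tries", "emitted");
    tracing::trace!(target: "store::trie", "suppressed: not matched by any trace directive");
}
```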
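Note on the locking change: the patch collapses three hand-rolled check-then-insert sequences over `RwLock` into the single `Mutex`-guarded `entry().or_insert_with()` in `get_trie_cache_for`. The old fast path in `get_trie_for_shard_internal` dropped the read lock between the `contains_key` check and the `[&shard_uid]` indexing, so a concurrent `delete_trie_for_shard` could in principle make the indexing panic; holding one mutex across the whole get-or-insert closes that window. A `Mutex` is also the better fit because even the "read" path may insert into the map, so the read/write distinction bought nothing. A minimal sketch of the resulting pattern, with placeholder types standing in for `TrieCache` and `ShardUId`:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Placeholders: `Cache` stands in for `TrieCache`, `ShardId` for `ShardUId`.
#[derive(Clone, Default)]
struct Cache;
type ShardId = u64;

/// One lock acquisition covers both the lookup and the (possible) insertion,
/// so no other thread can add or remove the entry in between.
fn get_or_create(caches: &Mutex<HashMap<ShardId, Cache>>, shard: ShardId) -> Cache {
    caches.lock().unwrap().entry(shard).or_insert_with(Cache::default).clone()
}

fn main() {
    let caches = Mutex::new(HashMap::new());
    let _cache = get_or_create(&caches, 0);
    assert_eq!(caches.lock().unwrap().len(), 1);
}
```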
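Note on the LRU bound: `LruCache::unbounded()` becomes a cache capped at `total_size_limit.div_ceil(PER_ENTRY_OVERHEAD)` elements. Per the new comment, if every cached value were empty, the per-entry bookkeeping alone would reach the size limit at exactly that element count, so the element cap can never evict before the existing size accounting does. A worked example with made-up numbers; the real `PER_ENTRY_OVERHEAD` is a `TrieCacheInner` constant and almost certainly differs:

```rust
fn main() {
    // Hypothetical figures, for illustration only.
    const PER_ENTRY_OVERHEAD: u64 = 72; // assumed per-entry bookkeeping cost, in bytes
    let total_size_limit: u64 = 50_000_000; // assumed 50 MB cache size limit

    // ceil(50_000_000 / 72) = 694_445: that many all-empty entries hit the limit.
    let max_elements = total_size_limit.div_ceil(PER_ENTRY_OVERHEAD);
    assert_eq!(max_elements, 694_445);
    let max_elements = usize::try_from(max_elements).unwrap();
    println!("LRU capped at {max_elements} elements");
}
```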