diff --git a/core/store/src/cold_storage.rs b/core/store/src/cold_storage.rs index 3898308fae1..f3d8066d1b6 100644 --- a/core/store/src/cold_storage.rs +++ b/core/store/src/cold_storage.rs @@ -1,6 +1,6 @@ use crate::columns::DBKeyType; use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY}; -use crate::trie::TrieRefcountChange; +use crate::trie::TrieRefcountAddition; use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges}; use borsh::BorshDeserialize; @@ -475,7 +475,11 @@ impl StoreWithCache<'_> { option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key)) } - pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) { + pub fn insert_state_to_cache_from_op( + &mut self, + op: &TrieRefcountAddition, + shard_uid_key: &[u8], + ) { debug_assert_eq!( DBCol::State.key_type(), &[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash] diff --git a/core/store/src/trie/insert_delete.rs b/core/store/src/trie/insert_delete.rs index 0d7a5ebd776..050d467ac3d 100644 --- a/core/store/src/trie/insert_delete.rs +++ b/core/store/src/trie/insert_delete.rs @@ -1,21 +1,18 @@ -use std::collections::HashMap; - -use borsh::BorshSerialize; - -use near_primitives::hash::{hash, CryptoHash}; -use near_primitives::state::ValueRef; - +use super::TrieRefcountDeltaMap; use crate::trie::nibble_slice::NibbleSlice; use crate::trie::{ Children, NodeHandle, RawTrieNode, RawTrieNodeWithSize, StorageHandle, StorageValueHandle, TrieNode, TrieNodeWithSize, ValueHandle, }; use crate::{StorageError, Trie, TrieChanges}; +use borsh::BorshSerialize; +use near_primitives::hash::{hash, CryptoHash}; +use near_primitives::state::ValueRef; pub(crate) struct NodesStorage { nodes: Vec>, values: Vec>>, - pub(crate) refcount_changes: HashMap, i32)>, + pub(crate) refcount_changes: TrieRefcountDeltaMap, } const INVALID_STORAGE_HANDLE: &str = "invalid storage handle"; @@ -23,7 +20,11 @@ const INVALID_STORAGE_HANDLE: &str = "invalid storage handle"; 
/// Local mutable storage that owns node objects. impl NodesStorage { pub fn new() -> NodesStorage { - NodesStorage { nodes: Vec::new(), refcount_changes: HashMap::new(), values: Vec::new() } + NodesStorage { + nodes: Vec::new(), + refcount_changes: TrieRefcountDeltaMap::new(), + values: Vec::new(), + } } fn destroy(&mut self, handle: StorageHandle) -> TrieNodeWithSize { @@ -604,14 +605,11 @@ impl Trie { raw_node_with_size.serialize(&mut buffer).unwrap(); let key = hash(&buffer); - let (_value, rc) = - memory.refcount_changes.entry(key).or_insert_with(|| (buffer.clone(), 0)); - *rc += 1; + memory.refcount_changes.add(key, buffer.clone(), 1); buffer.clear(); last_hash = key; } - let (insertions, deletions) = - Trie::convert_to_insertions_and_deletions(memory.refcount_changes); + let (insertions, deletions) = memory.refcount_changes.into_changes(); Ok(TrieChanges { old_root: *old_root, new_root: last_hash, insertions, deletions }) } @@ -621,9 +619,7 @@ impl Trie { let value = memory.value_ref(value_handle).to_vec(); let value_length = value.len() as u32; let value_hash = hash(&value); - let (_value, rc) = - memory.refcount_changes.entry(value_hash).or_insert_with(|| (value, 0)); - *rc += 1; + memory.refcount_changes.add(value_hash, value, 1); ValueRef { length: value_length, hash: value_hash } } ValueHandle::HashAndSize(value) => value, diff --git a/core/store/src/trie/mod.rs b/core/store/src/trie/mod.rs index 652caab2d97..548d5236606 100644 --- a/core/store/src/trie/mod.rs +++ b/core/store/src/trie/mod.rs @@ -26,7 +26,7 @@ pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize}; use std::cell::RefCell; use std::collections::HashMap; use std::fmt::Write; -use std::hash::{Hash, Hasher}; +use std::hash::Hash; use std::rc::Rc; use std::str; use std::sync::Arc; @@ -364,21 +364,59 @@ pub trait TrieAccess { fn get(&self, key: &TrieKey) -> Result>, StorageError>; } -/// Stores reference count change for some key-value pair in DB. 
-#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)] -pub struct TrieRefcountChange { +/// Stores reference count addition for some key-value pair in DB. +#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] +pub struct TrieRefcountAddition { /// Hash of trie_node_or_value and part of the DB key. /// Used for uniting with shard id to get actual DB key. trie_node_or_value_hash: CryptoHash, /// DB value. Can be either serialized RawTrieNodeWithSize or value corresponding to /// some TrieKey. trie_node_or_value: Vec, - /// Reference count difference which will be added to the total refcount if it corresponds to - /// insertion and subtracted from it in the case of deletion. + /// Reference count difference which will be added to the total refcount. rc: std::num::NonZeroU32, } -impl TrieRefcountChange { +/// Stores reference count subtraction for some key in DB. +#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)] +pub struct TrieRefcountSubtraction { + /// Hash of trie_node_or_value and part of the DB key. + /// Used for uniting with shard id to get actual DB key. + trie_node_or_value_hash: CryptoHash, + /// Obsolete field which we cannot remove because this data is persisted + /// to the database. + _ignored: IgnoredVecU8, + /// Reference count difference which will be subtracted from the total refcount. + rc: std::num::NonZeroU32, +} + +/// Struct that is borsh compatible with Vec<u8> but which is logically the unit type. 
+#[derive(Default, BorshSerialize, BorshDeserialize, Clone, Debug)] +struct IgnoredVecU8 { + _ignored: Vec, +} + +impl PartialEq for IgnoredVecU8 { + fn eq(&self, _other: &Self) -> bool { + true + } +} +impl Eq for IgnoredVecU8 {} +impl Hash for IgnoredVecU8 { + fn hash(&self, _state: &mut H) {} +} +impl PartialOrd for IgnoredVecU8 { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl Ord for IgnoredVecU8 { + fn cmp(&self, _other: &Self) -> std::cmp::Ordering { + std::cmp::Ordering::Equal + } +} + +impl TrieRefcountAddition { pub fn hash(&self) -> &CryptoHash { &self.trie_node_or_value_hash } @@ -386,14 +424,64 @@ impl TrieRefcountChange { pub fn payload(&self) -> &[u8] { self.trie_node_or_value.as_slice() } + + pub fn revert(&self) -> TrieRefcountSubtraction { + TrieRefcountSubtraction::new(self.trie_node_or_value_hash, self.rc) + } } -impl Hash for TrieRefcountChange { - fn hash(&self, state: &mut H) { - state.write(&self.trie_node_or_value_hash.0); - state.write_u32(self.rc.into()); +impl TrieRefcountSubtraction { + pub fn new(trie_node_or_value_hash: CryptoHash, rc: std::num::NonZeroU32) -> Self { + Self { trie_node_or_value_hash, _ignored: Default::default(), rc } } } + +/// Helps produce a list of additions and subtractions to the trie, +/// especially in the case where deletions don't carry the full value. 
+pub struct TrieRefcountDeltaMap { + map: HashMap>, i32)>, +} + +impl TrieRefcountDeltaMap { + pub fn new() -> Self { + Self { map: HashMap::new() } + } + + pub fn add(&mut self, hash: CryptoHash, data: Vec, refcount: u32) { + let (old_value, old_rc) = self.map.entry(hash).or_insert((None, 0)); + *old_value = Some(data); + *old_rc += refcount as i32; + } + + pub fn subtract(&mut self, hash: CryptoHash, refcount: u32) { + let (_, old_rc) = self.map.entry(hash).or_insert((None, 0)); + *old_rc -= refcount as i32; + } + + pub fn into_changes(self) -> (Vec, Vec) { + let mut insertions = Vec::new(); + let mut deletions = Vec::new(); + for (hash, (value, rc)) in self.map.into_iter() { + if rc > 0 { + insertions.push(TrieRefcountAddition { + trie_node_or_value_hash: hash, + trie_node_or_value: value.expect("value must be present"), + rc: std::num::NonZeroU32::new(rc as u32).unwrap(), + }); + } else if rc < 0 { + deletions.push(TrieRefcountSubtraction::new( + hash, + std::num::NonZeroU32::new((-rc) as u32).unwrap(), + )); + } + } + // Sort so that trie changes have unique representation. + insertions.sort(); + deletions.sort(); + (insertions, deletions) + } +} + /// /// TrieChanges stores delta for refcount. 
/// Multiple versions of the state work the following way: @@ -421,8 +509,8 @@ impl Hash for TrieRefcountChange { pub struct TrieChanges { pub old_root: StateRoot, pub new_root: StateRoot, - insertions: Vec, - deletions: Vec, + insertions: Vec, + deletions: Vec, } impl TrieChanges { @@ -430,7 +518,7 @@ impl TrieChanges { TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] } } - pub fn insertions(&self) -> &[TrieRefcountChange] { + pub fn insertions(&self) -> &[TrieRefcountAddition] { self.insertions.as_slice() } } @@ -598,12 +686,8 @@ impl Trie { ) -> Result<(), StorageError> { match value { ValueHandle::HashAndSize(value) => { - let bytes = self.internal_retrieve_trie_node(&value.hash, true)?; - memory - .refcount_changes - .entry(value.hash) - .or_insert_with(|| (bytes.to_vec(), 0)) - .1 -= 1; + self.internal_retrieve_trie_node(&value.hash, true)?; + memory.refcount_changes.subtract(value.hash, 1); } ValueHandle::InMemory(_) => { // do nothing @@ -941,9 +1025,9 @@ impl Trie { ) -> Result { match self.retrieve_raw_node(hash, true)? 
{ None => Ok(memory.store(TrieNodeWithSize::empty())), - Some((bytes, node)) => { + Some((_, node)) => { let result = memory.store(TrieNodeWithSize::from_raw(node)); - memory.refcount_changes.entry(*hash).or_insert_with(|| (bytes.to_vec(), 0)).1 -= 1; + memory.refcount_changes.subtract(*hash, 1); Ok(result) } } @@ -1213,32 +1297,6 @@ impl Trie { } } - pub(crate) fn convert_to_insertions_and_deletions( - changes: HashMap, i32)>, - ) -> (Vec, Vec) { - let mut deletions = Vec::new(); - let mut insertions = Vec::new(); - for (trie_node_or_value_hash, (trie_node_or_value, rc)) in changes.into_iter() { - if rc > 0 { - insertions.push(TrieRefcountChange { - trie_node_or_value_hash, - trie_node_or_value, - rc: std::num::NonZeroU32::new(rc as u32).unwrap(), - }); - } else if rc < 0 { - deletions.push(TrieRefcountChange { - trie_node_or_value_hash, - trie_node_or_value, - rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(), - }); - } - } - // Sort so that trie changes have unique representation - insertions.sort(); - deletions.sort(); - (insertions, deletions) - } - pub fn update(&self, changes: I) -> Result where I: IntoIterator, Option>)>, @@ -1724,3 +1782,65 @@ mod tests { assert_eq!(trie2.get(b"doge").unwrap().unwrap(), b"coin"); } } + +#[cfg(test)] +mod borsh_compatibility_test { + use borsh::{BorshDeserialize, BorshSerialize}; + use near_primitives::hash::{hash, CryptoHash}; + use near_primitives::types::StateRoot; + + use crate::trie::{TrieRefcountAddition, TrieRefcountSubtraction}; + use crate::TrieChanges; + + #[test] + fn test_trie_changes_compatibility() { + #[derive(BorshSerialize)] + struct LegacyTrieRefcountChange { + trie_node_or_value_hash: CryptoHash, + trie_node_or_value: Vec, + rc: std::num::NonZeroU32, + } + + #[derive(BorshSerialize)] + struct LegacyTrieChanges { + old_root: StateRoot, + new_root: StateRoot, + insertions: Vec, + deletions: Vec, + } + + let changes = LegacyTrieChanges { + old_root: hash(b"a"), + new_root: hash(b"b"), + insertions: 
vec![LegacyTrieRefcountChange { + trie_node_or_value_hash: hash(b"c"), + trie_node_or_value: b"d".to_vec(), + rc: std::num::NonZeroU32::new(1).unwrap(), + }], + deletions: vec![LegacyTrieRefcountChange { + trie_node_or_value_hash: hash(b"e"), + trie_node_or_value: b"f".to_vec(), + rc: std::num::NonZeroU32::new(2).unwrap(), + }], + }; + + let serialized = borsh::to_vec(&changes).unwrap(); + let deserialized = TrieChanges::try_from_slice(&serialized).unwrap(); + assert_eq!( + deserialized, + TrieChanges { + old_root: hash(b"a"), + new_root: hash(b"b"), + insertions: vec![TrieRefcountAddition { + trie_node_or_value_hash: hash(b"c"), + trie_node_or_value: b"d".to_vec(), + rc: std::num::NonZeroU32::new(1).unwrap(), + }], + deletions: vec![TrieRefcountSubtraction::new( + hash(b"e"), + std::num::NonZeroU32::new(2).unwrap(), + )], + } + ); + } +} diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index b1336e8603f..48741217cfd 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -2,7 +2,7 @@ use crate::flat::{FlatStorageManager, FlatStorageStatus}; use crate::trie::config::TrieConfig; use crate::trie::prefetching_trie_storage::PrefetchingThreadsHandle; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage}; -use crate::trie::{TrieRefcountChange, POISONED_LOCK_ERR}; +use crate::trie::{TrieRefcountAddition, POISONED_LOCK_ERR}; use crate::{metrics, DBCol, PrefetchApi}; use crate::{Store, StoreUpdate, Trie, TrieChanges, TrieUpdate}; @@ -18,6 +18,7 @@ use std::rc::Rc; use std::sync::{Arc, RwLock}; use super::state_snapshot::{StateSnapshot, StateSnapshotConfig}; +use super::TrieRefcountSubtraction; struct ShardTriesInner { store: Store, @@ -235,12 +236,12 @@ impl ShardTries { fn apply_deletions_inner( &self, - deletions: &[TrieRefcountChange], + deletions: &[TrieRefcountSubtraction], shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { let mut ops = Vec::new(); - for TrieRefcountChange { 
trie_node_or_value_hash, rc, .. } in deletions.iter() { + for TrieRefcountSubtraction { trie_node_or_value_hash, rc, .. } in deletions.iter() { let key = TrieCachingStorage::get_key_from_shard_uid_and_hash( shard_uid, trie_node_or_value_hash, @@ -254,12 +255,12 @@ impl ShardTries { fn apply_insertions_inner( &self, - insertions: &[TrieRefcountChange], + insertions: &[TrieRefcountAddition], shard_uid: ShardUId, store_update: &mut StoreUpdate, ) { let mut ops = Vec::new(); - for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in + for TrieRefcountAddition { trie_node_or_value_hash, trie_node_or_value, rc } in insertions.iter() { let key = TrieCachingStorage::get_key_from_shard_uid_and_hash( @@ -331,7 +332,11 @@ impl ShardTries { metrics::REVERTED_TRIE_INSERTIONS .with_label_values(&[&shard_id]) .inc_by(trie_changes.insertions.len() as u64); - self.apply_deletions_inner(&trie_changes.insertions, shard_uid, store_update) + self.apply_deletions_inner( + &trie_changes.insertions.iter().map(|insertion| insertion.revert()).collect::>(), + shard_uid, + store_update, + ) } pub fn apply_all( diff --git a/core/store/src/trie/state_parts.rs b/core/store/src/trie/state_parts.rs index 986d8801090..ef7c22c58b1 100644 --- a/core/store/src/trie/state_parts.rs +++ b/core/store/src/trie/state_parts.rs @@ -36,6 +36,8 @@ use std::collections::{HashMap, HashSet}; use std::rc::Rc; use std::sync::Arc; +use super::TrieRefcountDeltaMap; + /// Trie key in nibbles corresponding to the right boundary for the last state part. /// Guaranteed to be bigger than any existing trie key. 
const LAST_STATE_PART_BOUNDARY: &[u8; 1] = &[16]; @@ -457,12 +459,12 @@ impl Trie { let path_end = trie.find_state_part_boundary(part_id.idx + 1, part_id.total)?; let mut iterator = trie.iter()?; let trie_traversal_items = iterator.visit_nodes_interval(&path_begin, &path_end)?; - let mut map = HashMap::new(); + let mut refcount_changes = TrieRefcountDeltaMap::new(); let mut flat_state_delta = FlatStateChanges::default(); let mut contract_codes = Vec::new(); for TrieTraversalItem { hash, key } in trie_traversal_items { let value = trie.retrieve_value(&hash)?; - map.entry(hash).or_insert_with(|| (value.to_vec(), 0)).1 += 1; + refcount_changes.add(hash, value.to_vec(), 1); if let Some(trie_key) = key { let flat_state_value = FlatStateValue::on_disk(&value); flat_state_delta.insert(trie_key.clone(), Some(flat_state_value)); @@ -471,7 +473,7 @@ impl Trie { } } } - let (insertions, deletions) = Trie::convert_to_insertions_and_deletions(map); + let (insertions, deletions) = refcount_changes.into_changes(); Ok(ApplyStatePartResult { trie_changes: TrieChanges { old_root: Trie::EMPTY_ROOT, @@ -506,6 +508,8 @@ impl Trie { mod tests { use assert_matches::assert_matches; use std::collections::{HashMap, HashSet}; + use std::fmt::Debug; + use std::hash::Hash; use std::sync::Arc; use rand::prelude::ThreadRng; @@ -518,7 +522,9 @@ mod tests { create_tries, create_tries_with_flat_storage, gen_changes, test_populate_trie, }; use crate::trie::iterator::CrumbStatus; - use crate::trie::{TrieRefcountChange, ValueHandle}; + use crate::trie::{ + TrieRefcountAddition, TrieRefcountDeltaMap, TrieRefcountSubtraction, ValueHandle, + }; use super::*; use crate::{DBCol, MissingTrieValueContext, TrieCachingStorage}; @@ -629,7 +635,7 @@ mod tests { })?; let mut insertions = insertions .into_iter() - .map(|(k, (v, rc))| TrieRefcountChange { + .map(|(k, (v, rc))| TrieRefcountAddition { trie_node_or_value_hash: k, trie_node_or_value: v, rc: std::num::NonZeroU32::new(rc).unwrap(), @@ -881,23 +887,20 @@ 
mod tests { return TrieChanges::empty(Trie::EMPTY_ROOT); } let new_root = changes[0].new_root; - let mut map = HashMap::new(); + let mut map = TrieRefcountDeltaMap::new(); for changes_set in changes { assert!(changes_set.deletions.is_empty(), "state parts only have insertions"); - for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in + for TrieRefcountAddition { trie_node_or_value_hash, trie_node_or_value, rc } in changes_set.insertions { - map.entry(trie_node_or_value_hash).or_insert_with(|| (trie_node_or_value, 0)).1 += - rc.get() as i32; + map.add(trie_node_or_value_hash, trie_node_or_value, rc.get()); } - for TrieRefcountChange { trie_node_or_value_hash, trie_node_or_value, rc } in - changes_set.deletions + for TrieRefcountSubtraction { trie_node_or_value_hash, rc, .. } in changes_set.deletions { - map.entry(trie_node_or_value_hash).or_insert_with(|| (trie_node_or_value, 0)).1 -= - rc.get() as i32; + map.subtract(trie_node_or_value_hash, rc.get()); } } - let (insertions, deletions) = Trie::convert_to_insertions_and_deletions(map); + let (insertions, deletions) = map.into_changes(); TrieChanges { old_root: Default::default(), new_root, insertions, deletions } } @@ -971,10 +974,7 @@ mod tests { } } - fn format_simple_trie_refcount_diff( - left: &[TrieRefcountChange], - right: &[TrieRefcountChange], - ) -> String { + fn format_simple_trie_refcount_diff(left: &[T], right: &[T]) -> String { let left_set: HashSet<_> = HashSet::from_iter(left.iter()); let right_set: HashSet<_> = HashSet::from_iter(right.iter()); format!( diff --git a/core/store/src/trie/trie_tests.rs b/core/store/src/trie/trie_tests.rs index eebdb8d31b1..ff2afc9df27 100644 --- a/core/store/src/trie/trie_tests.rs +++ b/core/store/src/trie/trie_tests.rs @@ -208,7 +208,7 @@ mod trie_storage_tests { use crate::test_utils::{create_test_store, create_tries}; use crate::trie::accounting_cache::TrieAccountingCache; use crate::trie::trie_storage::{TrieCache, TrieCachingStorage, 
TrieDBStorage}; - use crate::trie::TrieRefcountChange; + use crate::trie::TrieRefcountAddition; use crate::{Store, TrieChanges, TrieConfig}; use assert_matches::assert_matches; use near_primitives::hash::hash; @@ -218,7 +218,7 @@ mod trie_storage_tests { let mut trie_changes = TrieChanges::empty(Trie::EMPTY_ROOT); trie_changes.insertions = values .iter() - .map(|value| TrieRefcountChange { + .map(|value| TrieRefcountAddition { trie_node_or_value_hash: hash(value), trie_node_or_value: value.clone(), rc: std::num::NonZeroU32::new(1).unwrap(),