Skip to content

Commit

Permalink
Reintroduce refactoring to split TrieRefcountChange to TrieRefcountAd…
Browse files Browse the repository at this point in the history
…dition/Subtraction (near#10080)

Previous change near#10006 caused an issue because it made the borsh
encoding of TrieRefcountSubtraction incompatible. This change makes it
compatible by ignoring the value field when deserializing, and when
serializing outputs an empty vector. Also adds a test to check that the
previous encoding is compatible with the new one.
  • Loading branch information
robin-near authored Nov 2, 2023
1 parent 6fb0c39 commit 3fcd356
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 93 deletions.
8 changes: 6 additions & 2 deletions core/store/src/cold_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::columns::DBKeyType;
use crate::db::{ColdDB, COLD_HEAD_KEY, HEAD_KEY};
use crate::trie::TrieRefcountChange;
use crate::trie::TrieRefcountAddition;
use crate::{metrics, DBCol, DBTransaction, Database, Store, TrieChanges};

use borsh::BorshDeserialize;
Expand Down Expand Up @@ -475,7 +475,11 @@ impl StoreWithCache<'_> {
option_to_not_found(self.get_ser(column, key), format_args!("{:?}: {:?}", column, key))
}

pub fn insert_state_to_cache_from_op(&mut self, op: &TrieRefcountChange, shard_uid_key: &[u8]) {
pub fn insert_state_to_cache_from_op(
&mut self,
op: &TrieRefcountAddition,
shard_uid_key: &[u8],
) {
debug_assert_eq!(
DBCol::State.key_type(),
&[DBKeyType::ShardUId, DBKeyType::TrieNodeOrValueHash]
Expand Down
30 changes: 13 additions & 17 deletions core/store/src/trie/insert_delete.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
use std::collections::HashMap;

use borsh::BorshSerialize;

use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;

use super::TrieRefcountDeltaMap;
use crate::trie::nibble_slice::NibbleSlice;
use crate::trie::{
Children, NodeHandle, RawTrieNode, RawTrieNodeWithSize, StorageHandle, StorageValueHandle,
TrieNode, TrieNodeWithSize, ValueHandle,
};
use crate::{StorageError, Trie, TrieChanges};
use borsh::BorshSerialize;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::ValueRef;

pub(crate) struct NodesStorage {
nodes: Vec<Option<TrieNodeWithSize>>,
values: Vec<Option<Vec<u8>>>,
pub(crate) refcount_changes: HashMap<CryptoHash, (Vec<u8>, i32)>,
pub(crate) refcount_changes: TrieRefcountDeltaMap,
}

const INVALID_STORAGE_HANDLE: &str = "invalid storage handle";

/// Local mutable storage that owns node objects.
impl NodesStorage {
pub fn new() -> NodesStorage {
NodesStorage { nodes: Vec::new(), refcount_changes: HashMap::new(), values: Vec::new() }
NodesStorage {
nodes: Vec::new(),
refcount_changes: TrieRefcountDeltaMap::new(),
values: Vec::new(),
}
}

fn destroy(&mut self, handle: StorageHandle) -> TrieNodeWithSize {
Expand Down Expand Up @@ -604,14 +605,11 @@ impl Trie {
raw_node_with_size.serialize(&mut buffer).unwrap();
let key = hash(&buffer);

let (_value, rc) =
memory.refcount_changes.entry(key).or_insert_with(|| (buffer.clone(), 0));
*rc += 1;
memory.refcount_changes.add(key, buffer.clone(), 1);
buffer.clear();
last_hash = key;
}
let (insertions, deletions) =
Trie::convert_to_insertions_and_deletions(memory.refcount_changes);
let (insertions, deletions) = memory.refcount_changes.into_changes();
Ok(TrieChanges { old_root: *old_root, new_root: last_hash, insertions, deletions })
}

Expand All @@ -621,9 +619,7 @@ impl Trie {
let value = memory.value_ref(value_handle).to_vec();
let value_length = value.len() as u32;
let value_hash = hash(&value);
let (_value, rc) =
memory.refcount_changes.entry(value_hash).or_insert_with(|| (value, 0));
*rc += 1;
memory.refcount_changes.add(value_hash, value, 1);
ValueRef { length: value_length, hash: value_hash }
}
ValueHandle::HashAndSize(value) => value,
Expand Down
216 changes: 168 additions & 48 deletions core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use raw_node::{Children, RawTrieNode, RawTrieNodeWithSize};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fmt::Write;
use std::hash::{Hash, Hasher};
use std::hash::Hash;
use std::rc::Rc;
use std::str;
use std::sync::Arc;
Expand Down Expand Up @@ -364,36 +364,124 @@ pub trait TrieAccess {
fn get(&self, key: &TrieKey) -> Result<Option<Vec<u8>>, StorageError>;
}

/// Stores reference count change for some key-value pair in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct TrieRefcountChange {
/// Stores reference count addition for some key-value pair in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct TrieRefcountAddition {
/// Hash of trie_node_or_value and part of the DB key.
/// Used for uniting with shard id to get actual DB key.
trie_node_or_value_hash: CryptoHash,
/// DB value. Can be either serialized RawTrieNodeWithSize or value corresponding to
/// some TrieKey.
trie_node_or_value: Vec<u8>,
/// Reference count difference which will be added to the total refcount if it corresponds to
/// insertion and subtracted from it in the case of deletion.
/// Reference count difference which will be added to the total refcount.
rc: std::num::NonZeroU32,
}

impl TrieRefcountChange {
/// Stores reference count subtraction for some key in DB.
#[derive(BorshSerialize, BorshDeserialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash)]
pub struct TrieRefcountSubtraction {
/// Hash of trie_node_or_value and part of the DB key.
/// Used for uniting with shard id to get actual DB key.
trie_node_or_value_hash: CryptoHash,
/// Obsolete field but which we cannot remove because this data is persisted
/// to the database.
_ignored: IgnoredVecU8,
/// Reference count difference which will be subtracted to the total refcount.
rc: std::num::NonZeroU32,
}

/// Struct that is borsh compatible with Vec<u8> but which is logically the unit type.
#[derive(Default, BorshSerialize, BorshDeserialize, Clone, Debug)]
struct IgnoredVecU8 {
_ignored: Vec<u8>,
}

impl PartialEq for IgnoredVecU8 {
fn eq(&self, _other: &Self) -> bool {
true
}
}
impl Eq for IgnoredVecU8 {}
impl Hash for IgnoredVecU8 {
fn hash<H: std::hash::Hasher>(&self, _state: &mut H) {}
}
impl PartialOrd for IgnoredVecU8 {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for IgnoredVecU8 {
fn cmp(&self, _other: &Self) -> std::cmp::Ordering {
std::cmp::Ordering::Equal
}
}

impl TrieRefcountAddition {
pub fn hash(&self) -> &CryptoHash {
&self.trie_node_or_value_hash
}

pub fn payload(&self) -> &[u8] {
self.trie_node_or_value.as_slice()
}

pub fn revert(&self) -> TrieRefcountSubtraction {
TrieRefcountSubtraction::new(self.trie_node_or_value_hash, self.rc)
}
}

impl Hash for TrieRefcountChange {
fn hash<H: Hasher>(&self, state: &mut H) {
state.write(&self.trie_node_or_value_hash.0);
state.write_u32(self.rc.into());
impl TrieRefcountSubtraction {
pub fn new(trie_node_or_value_hash: CryptoHash, rc: std::num::NonZeroU32) -> Self {
Self { trie_node_or_value_hash, _ignored: Default::default(), rc }
}
}

/// Helps produce a list of additions and subtractions to the trie,
/// especially in the case where deletions don't carry the full value.
pub struct TrieRefcountDeltaMap {
map: HashMap<CryptoHash, (Option<Vec<u8>>, i32)>,
}

impl TrieRefcountDeltaMap {
pub fn new() -> Self {
Self { map: HashMap::new() }
}

pub fn add(&mut self, hash: CryptoHash, data: Vec<u8>, refcount: u32) {
let (old_value, old_rc) = self.map.entry(hash).or_insert((None, 0));
*old_value = Some(data);
*old_rc += refcount as i32;
}

pub fn subtract(&mut self, hash: CryptoHash, refcount: u32) {
let (_, old_rc) = self.map.entry(hash).or_insert((None, 0));
*old_rc -= refcount as i32;
}

pub fn into_changes(self) -> (Vec<TrieRefcountAddition>, Vec<TrieRefcountSubtraction>) {
let mut insertions = Vec::new();
let mut deletions = Vec::new();
for (hash, (value, rc)) in self.map.into_iter() {
if rc > 0 {
insertions.push(TrieRefcountAddition {
trie_node_or_value_hash: hash,
trie_node_or_value: value.expect("value must be present"),
rc: std::num::NonZeroU32::new(rc as u32).unwrap(),
});
} else if rc < 0 {
deletions.push(TrieRefcountSubtraction::new(
hash,
std::num::NonZeroU32::new((-rc) as u32).unwrap(),
));
}
}
// Sort so that trie changes have unique representation.
insertions.sort();
deletions.sort();
(insertions, deletions)
}
}

///
/// TrieChanges stores delta for refcount.
/// Multiple versions of the state work the following way:
Expand Down Expand Up @@ -421,16 +509,16 @@ impl Hash for TrieRefcountChange {
pub struct TrieChanges {
pub old_root: StateRoot,
pub new_root: StateRoot,
insertions: Vec<TrieRefcountChange>,
deletions: Vec<TrieRefcountChange>,
insertions: Vec<TrieRefcountAddition>,
deletions: Vec<TrieRefcountSubtraction>,
}

impl TrieChanges {
pub fn empty(old_root: StateRoot) -> Self {
TrieChanges { old_root, new_root: old_root, insertions: vec![], deletions: vec![] }
}

pub fn insertions(&self) -> &[TrieRefcountChange] {
pub fn insertions(&self) -> &[TrieRefcountAddition] {
self.insertions.as_slice()
}
}
Expand Down Expand Up @@ -598,12 +686,8 @@ impl Trie {
) -> Result<(), StorageError> {
match value {
ValueHandle::HashAndSize(value) => {
let bytes = self.internal_retrieve_trie_node(&value.hash, true)?;
memory
.refcount_changes
.entry(value.hash)
.or_insert_with(|| (bytes.to_vec(), 0))
.1 -= 1;
self.internal_retrieve_trie_node(&value.hash, true)?;
memory.refcount_changes.subtract(value.hash, 1);
}
ValueHandle::InMemory(_) => {
// do nothing
Expand Down Expand Up @@ -941,9 +1025,9 @@ impl Trie {
) -> Result<StorageHandle, StorageError> {
match self.retrieve_raw_node(hash, true)? {
None => Ok(memory.store(TrieNodeWithSize::empty())),
Some((bytes, node)) => {
Some((_, node)) => {
let result = memory.store(TrieNodeWithSize::from_raw(node));
memory.refcount_changes.entry(*hash).or_insert_with(|| (bytes.to_vec(), 0)).1 -= 1;
memory.refcount_changes.subtract(*hash, 1);
Ok(result)
}
}
Expand Down Expand Up @@ -1213,32 +1297,6 @@ impl Trie {
}
}

pub(crate) fn convert_to_insertions_and_deletions(
changes: HashMap<CryptoHash, (Vec<u8>, i32)>,
) -> (Vec<TrieRefcountChange>, Vec<TrieRefcountChange>) {
let mut deletions = Vec::new();
let mut insertions = Vec::new();
for (trie_node_or_value_hash, (trie_node_or_value, rc)) in changes.into_iter() {
if rc > 0 {
insertions.push(TrieRefcountChange {
trie_node_or_value_hash,
trie_node_or_value,
rc: std::num::NonZeroU32::new(rc as u32).unwrap(),
});
} else if rc < 0 {
deletions.push(TrieRefcountChange {
trie_node_or_value_hash,
trie_node_or_value,
rc: std::num::NonZeroU32::new((-rc) as u32).unwrap(),
});
}
}
// Sort so that trie changes have unique representation
insertions.sort();
deletions.sort();
(insertions, deletions)
}

pub fn update<I>(&self, changes: I) -> Result<TrieChanges, StorageError>
where
I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
Expand Down Expand Up @@ -1724,3 +1782,65 @@ mod tests {
assert_eq!(trie2.get(b"doge").unwrap().unwrap(), b"coin");
}
}

#[cfg(test)]
mod borsh_compatibility_test {
use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::types::StateRoot;

use crate::trie::{TrieRefcountAddition, TrieRefcountSubtraction};
use crate::TrieChanges;

#[test]
fn test_trie_changes_compatibility() {
#[derive(BorshSerialize)]
struct LegacyTrieRefcountChange {
trie_node_or_value_hash: CryptoHash,
trie_node_or_value: Vec<u8>,
rc: std::num::NonZeroU32,
}

#[derive(BorshSerialize)]
struct LegacyTrieChanges {
old_root: StateRoot,
new_root: StateRoot,
insertions: Vec<LegacyTrieRefcountChange>,
deletions: Vec<LegacyTrieRefcountChange>,
}

let changes = LegacyTrieChanges {
old_root: hash(b"a"),
new_root: hash(b"b"),
insertions: vec![LegacyTrieRefcountChange {
trie_node_or_value_hash: hash(b"c"),
trie_node_or_value: b"d".to_vec(),
rc: std::num::NonZeroU32::new(1).unwrap(),
}],
deletions: vec![LegacyTrieRefcountChange {
trie_node_or_value_hash: hash(b"e"),
trie_node_or_value: b"f".to_vec(),
rc: std::num::NonZeroU32::new(2).unwrap(),
}],
};

let serialized = borsh::to_vec(&changes).unwrap();
let deserialized = TrieChanges::try_from_slice(&serialized).unwrap();
assert_eq!(
deserialized,
TrieChanges {
old_root: hash(b"a"),
new_root: hash(b"b"),
insertions: vec![TrieRefcountAddition {
trie_node_or_value_hash: hash(b"c"),
trie_node_or_value: b"d".to_vec(),
rc: std::num::NonZeroU32::new(1).unwrap(),
}],
deletions: vec![TrieRefcountSubtraction::new(
hash(b"e"),
std::num::NonZeroU32::new(2).unwrap(),
)],
}
);
}
}
Loading

0 comments on commit 3fcd356

Please sign in to comment.