From ac54aad05bcbab82931d1431eca308d1f81cba6d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sat, 25 Nov 2023 20:01:54 +0300 Subject: [PATCH 1/3] Oh no this might be dumb --- Cargo.toml | 3 ++ src/leaf.rs | 5 +- src/list.rs | 2 +- src/packed_leaf.rs | 30 +++++++++-- src/tests/proptest/operations.rs | 4 +- src/tests/repeat.rs | 2 +- src/tests/size_of.rs | 2 +- src/tree.rs | 85 ++++++++++++++++++++++++++++---- src/utils.rs | 2 +- src/vector.rs | 2 +- 10 files changed, 114 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0a21a9..17bc19c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ vec_map = "0.8.2" smallvec = "1.8.0" arbitrary = { version = "1.2.3", features = ["derive"] } ethereum-types = { version = "0.14.1", features = ["arbitrary"] } +tokio = { version = "1", features = ["sync", "rt"] } +futures = "0.3.29" +async-recursion = "1.0.0" [dev-dependencies] ssz_types = "0.5.0" diff --git a/src/leaf.rs b/src/leaf.rs index bd64bfe..771666a 100644 --- a/src/leaf.rs +++ b/src/leaf.rs @@ -4,7 +4,7 @@ use crate::{ }; use arbitrary::Arbitrary; use derivative::Derivative; -use parking_lot::RwLock; +use tokio::sync::RwLock; use tree_hash::Hash256; #[derive(Debug, Derivative, Arbitrary)] @@ -23,7 +23,8 @@ where { fn clone(&self) -> Self { Self { - hash: RwLock::new(*self.hash.read()), + // FIXME(sproul): this is really annoying + hash: RwLock::new(Hash256::zero()), value: self.value.clone(), } } diff --git a/src/list.rs b/src/list.rs index 1d449a0..f7cdf49 100644 --- a/src/list.rs +++ b/src/list.rs @@ -271,7 +271,7 @@ impl Default for List { } } -impl TreeHash for List { +impl TreeHash for List { fn tree_hash_type() -> tree_hash::TreeHashType { tree_hash::TreeHashType::List } diff --git a/src/packed_leaf.rs b/src/packed_leaf.rs index b0f3770..8cd29fd 100644 --- a/src/packed_leaf.rs +++ b/src/packed_leaf.rs @@ -1,8 +1,8 @@ use crate::{utils::arb_rwlock, Error, UpdateMap}; use arbitrary::Arbitrary; use derivative::Derivative; -use parking_lot::RwLock; use std::ops::ControlFlow; +use tokio::sync::RwLock; use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK}; #[derive(Debug, Derivative, Arbitrary)] @@ -20,7 +20,8 @@ where { fn clone(&self) -> Self { Self { - hash: RwLock::new(*self.hash.read()), + // FIXME(sproul): perf implications of this might be bad + hash: RwLock::new(Hash256::zero()), values: self.values.clone(), } } @@ -28,7 +29,28 @@ where impl PackedLeaf { pub fn tree_hash(&self) -> Hash256 { - let read_lock = self.hash.read(); + let read_lock = self.hash.blocking_read(); + let mut hash = *read_lock; + drop(read_lock); + + if !hash.is_zero() { + return hash; + } + + let hash_bytes = hash.as_bytes_mut(); + + let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor(); + for (i, value) in self.values.iter().enumerate() { + hash_bytes[i * value_len..(i + 1) * value_len] + .copy_from_slice(&value.tree_hash_packed_encoding()); + } + + *self.hash.blocking_write() = hash; + hash + } + + pub async fn async_tree_hash(&self) -> Hash256 { + let read_lock = self.hash.read().await; let mut hash = *read_lock; drop(read_lock); @@ -44,7 +66,7 @@ impl PackedLeaf { .copy_from_slice(&value.tree_hash_packed_encoding()); } - *self.hash.write() = hash; + *self.hash.write().await = hash; hash } diff --git a/src/tests/proptest/operations.rs b/src/tests/proptest/operations.rs index f882ddd..1779030 100644 --- a/src/tests/proptest/operations.rs +++ b/src/tests/proptest/operations.rs @@ -157,7 +157,7 @@ where fn apply_ops_list(list: &mut List, spec: &mut Spec, ops: Vec>) where - T: Value + Debug + Send + Sync, + T: Value + Debug + Send + Sync + 'static, N: Unsigned + Debug, { let mut checkpoint = list.clone(); @@ -235,7 +235,7 @@ where fn apply_ops_vect(vect: &mut Vector, spec: &mut Spec, ops: Vec>) where - T: Value + Debug + Send + Sync, + T: Value + Debug + Send + Sync + 'static, N: Unsigned + Debug, { let mut checkpoint = vect.clone(); diff --git a/src/tests/repeat.rs b/src/tests/repeat.rs index 552041f..07cbc06 100644 --- a/src/tests/repeat.rs +++ b/src/tests/repeat.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use tree_hash::TreeHash; use typenum::{Unsigned, U1024, U64, U8}; -fn list_test(val: T) { +fn list_test(val: T) { for n in 96..=N::to_usize() { let fast = List::::repeat(val.clone(), n).unwrap(); let slow = List::::repeat_slow(val.clone(), n).unwrap(); diff --git a/src/tests/size_of.rs b/src/tests/size_of.rs index 00e0b9d..9c0f60f 100644 --- a/src/tests/size_of.rs +++ b/src/tests/size_of.rs @@ -1,6 +1,6 @@ use crate::{Arc, Leaf, PackedLeaf, Tree}; -use parking_lot::RwLock; use std::mem::size_of; +use tokio::sync::RwLock; use tree_hash::Hash256; /// It's important that the Tree nodes have a predictable size. diff --git a/src/tree.rs b/src/tree.rs index 77bb09e..ed807eb 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -1,11 +1,12 @@ use crate::utils::{arb_arc, arb_rwlock, opt_hash, opt_packing_depth, opt_packing_factor, Length}; use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value}; use arbitrary::Arbitrary; +use async_recursion::async_recursion; use derivative::Derivative; use ethereum_hashing::{hash32_concat, ZERO_HASHES}; -use parking_lot::RwLock; use std::collections::BTreeMap; use std::ops::ControlFlow; +use tokio::sync::RwLock; use tree_hash::Hash256; #[derive(Debug, Derivative, Arbitrary)] @@ -28,8 +29,13 @@ pub enum Tree { impl Clone for Tree { fn clone(&self) -> Self { match self { - Self::Node { hash, left, right } => Self::Node { - hash: RwLock::new(*hash.read()), + // FIXME(sproul): noooooooo + Self::Node { + hash: _, + left, + right, + } => Self::Node { + hash: RwLock::new(Hash256::zero()), left: left.clone(), right: right.clone(), }, @@ -287,8 +293,8 @@ impl Tree { ) if full_depth > 0 => { use RebaseAction::*; - let orig_hash = *orig_hash_lock.read(); - let base_hash = *base_hash_lock.read(); + let orig_hash = *orig_hash_lock.blocking_read(); + let base_hash = *base_hash_lock.blocking_read(); // If hashes *and* lengths are equal then we can short-cut the recursion // and immediately replace `orig` by the `base` node. If `lengths` are `None` @@ -391,12 +397,12 @@ impl Tree { } } -impl Tree { +impl Tree { pub fn tree_hash(&self) -> Hash256 { match self { Self::Leaf(Leaf { hash, value }) => { // FIXME(sproul): upgradeable RwLock? - let read_lock = hash.read(); + let read_lock = hash.blocking_read(); let existing_hash = *read_lock; drop(read_lock); @@ -411,14 +417,14 @@ impl Tree { existing_hash } else { let tree_hash = value.tree_hash_root(); - *hash.write() = tree_hash; + *hash.blocking_write() = tree_hash; tree_hash } } Self::PackedLeaf(leaf) => leaf.tree_hash(), Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]), Self::Node { hash, left, right } => { - let read_lock = hash.read(); + let read_lock = hash.blocking_read(); let existing_hash = *read_lock; drop(read_lock); @@ -430,7 +436,66 @@ impl Tree { rayon::join(|| left.tree_hash(), || right.tree_hash()); let tree_hash = Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes())); - *hash.write() = tree_hash; + *hash.blocking_write() = tree_hash; + tree_hash + } + } + } + } + + #[async_recursion] + pub async fn async_tree_hash(&self) -> Hash256 { + match self { + Self::Leaf(Leaf { hash, value }) => { + // FIXME(sproul): upgradeable RwLock? + let read_lock = hash.read().await; + let existing_hash = *read_lock; + drop(read_lock); + + // NOTE: We re-compute the hash whenever it is non-zero. Computed hashes may + // legitimately be zero, but this only occurs at the leaf level when the value is + // entirely zeroes (e.g. [0u64, 0, 0, 0]). In order to avoid storing an + // `Option` we choose to re-compute the hash in this case. In practice + // this is unlikely to provide any performance penalty except at very small list + // lengths (<= 32), because a node higher in the tree will cache a non-zero hash + // preventing its children from being visited more than once. + if !existing_hash.is_zero() { + existing_hash + } else { + let tree_hash = value.tree_hash_root(); + *hash.write().await = tree_hash; + tree_hash + } + } + Self::PackedLeaf(leaf) => leaf.tree_hash(), + Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]), + Self::Node { hash, left, right } => { + let read_lock = hash.read().await; + let existing_hash = *read_lock; + drop(read_lock); + + if !existing_hash.is_zero() { + existing_hash + } else { + // Parallelism goes brrrr. + let left_clone = left.clone(); + let right_clone = right.clone(); + let (left_res, right_res) = futures::future::join( + async move { + tokio::task::spawn(async move { left_clone.async_tree_hash().await }) + .await + }, + async move { + tokio::task::spawn(async move { right_clone.async_tree_hash().await }) + .await + }, + ) + .await; + let left_hash = left_res.unwrap(); + let right_hash = right_res.unwrap(); + let tree_hash = + Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes())); + *hash.write().await = tree_hash; tree_hash } } diff --git a/src/utils.rs b/src/utils.rs index e0aa78a..2b34041 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,7 @@ use crate::{Arc, UpdateMap}; use arbitrary::Arbitrary; -use parking_lot::RwLock; use std::collections::BTreeMap; +use tokio::sync::RwLock; use tree_hash::{Hash256, TreeHash, TreeHashType}; /// Length type, to avoid confusion with depth and other `usize` parameters. diff --git a/src/vector.rs b/src/vector.rs index 7316bff..5263d06 100644 --- a/src/vector.rs +++ b/src/vector.rs @@ -245,7 +245,7 @@ impl Default for Vector { } } -impl tree_hash::TreeHash for Vector { +impl tree_hash::TreeHash for Vector { fn tree_hash_type() -> tree_hash::TreeHashType { tree_hash::TreeHashType::Vector } From a169794162a230a11eaf290617abf6adabb86c37 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sun, 26 Nov 2023 14:17:12 +1100 Subject: [PATCH 2/3] Benchmark + optimise --- Cargo.toml | 2 +- benches/tree_hash_root.rs | 13 +++++++++++++ src/list.rs | 10 ++++++++++ src/tree.rs | 37 +++++++++++++++++++++++++++++++------ 4 files changed, 55 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 17bc19c..2fe9687 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ vec_map = "0.8.2" smallvec = "1.8.0" arbitrary = { version = "1.2.3", features = ["derive"] } ethereum-types = { version = "0.14.1", features = ["arbitrary"] } -tokio = { version = "1", features = ["sync", "rt"] } +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread"] } futures = "0.3.29" async-recursion = "1.0.0" diff --git a/benches/tree_hash_root.rs b/benches/tree_hash_root.rs index 940f249..2ac7ab2 100644 --- a/benches/tree_hash_root.rs +++ b/benches/tree_hash_root.rs @@ -1,6 +1,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use milhouse::{List, Vector}; use ssz_types::VariableList; +use tokio::runtime::Runtime; use tree_hash::TreeHash; type C = typenum::U1099511627776; @@ -21,6 +22,18 @@ pub fn tree_hash_root(c: &mut Criterion) { }, ); + let rt = Runtime::new().unwrap(); + c.bench_with_input( + BenchmarkId::new("async_tree_hash_root_list", size), + &(&rt, size), + |b, &(rt, size)| { + b.iter(|| { + let l1 = List::::try_from_iter(0..size).unwrap(); + rt.block_on(l1.async_tree_hash_root()) + }); + }, + ); + c.bench_with_input( BenchmarkId::new("tree_hash_root_vector", size), &size, diff --git a/src/list.rs b/src/list.rs index f7cdf49..747903c 100644 --- a/src/list.rs +++ b/src/list.rs @@ -293,6 +293,16 @@ impl TreeHash for List { } } +impl List { + pub async fn async_tree_hash_root(&self) -> Hash256 { + // FIXME(sproul): remove assert + assert!(!self.interface.has_pending_updates()); + + let root = self.interface.backing.tree.async_tree_hash().await; + tree_hash::mix_in_length(&root, self.len()) + } +} + impl<'a, T: Value, N: Unsigned, U: UpdateMap> IntoIterator for &'a List { type Item = &'a T; type IntoIter = InterfaceIter<'a, T, U>; diff --git a/src/tree.rs b/src/tree.rs index ed807eb..bd6d1cb 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -6,7 +6,7 @@ use derivative::Derivative; use ethereum_hashing::{hash32_concat, ZERO_HASHES}; use std::collections::BTreeMap; use std::ops::ControlFlow; -use tokio::sync::RwLock; +use tokio::{runtime::Handle, sync::RwLock}; use tree_hash::Hash256; #[derive(Debug, Derivative, Arbitrary)] @@ -467,9 +467,11 @@ impl Tree { tree_hash } } - Self::PackedLeaf(leaf) => leaf.tree_hash(), + Self::PackedLeaf(leaf) => leaf.async_tree_hash().await, Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]), Self::Node { hash, left, right } => { + const MAX_QUEUE_DEPTH: usize = 4; + let read_lock = hash.read().await; let existing_hash = *read_lock; drop(read_lock); @@ -478,16 +480,39 @@ impl Tree { existing_hash } else { // Parallelism goes brrrr. + let rt_metrics = Handle::current().metrics(); + let num_workers = rt_metrics.num_workers(); + let max_queue_depth = (0..num_workers) + .map(|i| rt_metrics.worker_local_queue_depth(i)) + .max() + .unwrap(); + let left_clone = left.clone(); let right_clone = right.clone(); let (left_res, right_res) = futures::future::join( - async move { - tokio::task::spawn(async move { left_clone.async_tree_hash().await }) + async { + if max_queue_depth >= MAX_QUEUE_DEPTH { + // Runtime is busy, use the current thread. + Ok(left_clone.async_tree_hash().await) + } else { + // Runtime has some spare capacity, use new task. + tokio::task::spawn( + async move { left_clone.async_tree_hash().await }, + ) .await + } }, - async move { - tokio::task::spawn(async move { right_clone.async_tree_hash().await }) + async { + if max_queue_depth >= MAX_QUEUE_DEPTH { + // Runtime is busy, use the current thread. + Ok(right_clone.async_tree_hash().await) + } else { + // Runtime has some spare capacity, use new task. + tokio::task::spawn( + async move { right_clone.async_tree_hash().await }, + ) .await + } }, ) .await; From ccaed7bb2902d28d85f7ddeec4999b65473ad4a8 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Sun, 26 Nov 2023 17:57:40 +1100 Subject: [PATCH 3/3] Delete unnecessary Clone impls, tweak benches --- benches/tree_hash_root.rs | 9 +++++++-- src/leaf.rs | 13 ------------- src/packed_leaf.rs | 13 ------------- src/tree.rs | 28 ++++------------------------ 4 files changed, 11 insertions(+), 52 deletions(-) diff --git a/benches/tree_hash_root.rs b/benches/tree_hash_root.rs index 2ac7ab2..b06a0a8 100644 --- a/benches/tree_hash_root.rs +++ b/benches/tree_hash_root.rs @@ -1,7 +1,7 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; use milhouse::{List, Vector}; use ssz_types::VariableList; -use tokio::runtime::Runtime; +use tokio::runtime::Builder; use tree_hash::TreeHash; type C = typenum::U1099511627776; @@ -22,7 +22,12 @@ pub fn tree_hash_root(c: &mut Criterion) { }, ); - let rt = Runtime::new().unwrap(); + // Tweaking the intervals doesn't really make much difference (a few percent). + let rt = Builder::new_multi_thread() + .global_queue_interval(1_000_000) + .event_interval(1_000_000) + .build() + .unwrap(); c.bench_with_input( BenchmarkId::new("async_tree_hash_root_list", size), &(&rt, size), diff --git a/src/leaf.rs b/src/leaf.rs index 771666a..88cb784 100644 --- a/src/leaf.rs +++ b/src/leaf.rs @@ -17,19 +17,6 @@ pub struct Leaf { pub value: Arc, } -impl Clone for Leaf -where - T: Clone, -{ - fn clone(&self) -> Self { - Self { - // FIXME(sproul): this is really annoying - hash: RwLock::new(Hash256::zero()), - value: self.value.clone(), - } - } -} - impl Leaf { pub fn new(value: T) -> Self { Self::with_hash(value, Hash256::zero()) diff --git a/src/packed_leaf.rs b/src/packed_leaf.rs index 8cd29fd..43a0cb9 100644 --- a/src/packed_leaf.rs +++ b/src/packed_leaf.rs @@ -14,19 +14,6 @@ pub struct PackedLeaf { pub(crate) values: Vec, } -impl Clone for PackedLeaf -where - T: TreeHash + Clone, -{ - fn clone(&self) -> Self { - Self { - // FIXME(sproul): perf implications of this might be bad - hash: RwLock::new(Hash256::zero()), - values: self.values.clone(), - } - } -} - impl PackedLeaf { pub fn tree_hash(&self) -> Hash256 { let read_lock = self.hash.blocking_read(); diff --git a/src/tree.rs b/src/tree.rs index bd6d1cb..27fa04c 100644 --- a/src/tree.rs +++ b/src/tree.rs @@ -26,26 +26,6 @@ pub enum Tree { Zero(usize), } -impl Clone for Tree { - fn clone(&self) -> Self { - match self { - // FIXME(sproul): noooooooo - Self::Node { - hash: _, - left, - right, - } => Self::Node { - hash: RwLock::new(Hash256::zero()), - left: left.clone(), - right: right.clone(), - }, - Self::Leaf(leaf) => Self::Leaf(leaf.clone()), - Self::PackedLeaf(leaf) => Self::PackedLeaf(leaf.clone()), - Self::Zero(depth) => Self::Zero(*depth), - } - } -} - impl Tree { pub fn empty(depth: usize) -> Arc { Self::zero(depth) @@ -487,15 +467,14 @@ impl Tree { .max() .unwrap(); - let left_clone = left.clone(); - let right_clone = right.clone(); let (left_res, right_res) = futures::future::join( async { if max_queue_depth >= MAX_QUEUE_DEPTH { // Runtime is busy, use the current thread. - Ok(left_clone.async_tree_hash().await) + Ok(left.async_tree_hash().await) } else { // Runtime has some spare capacity, use new task. + let left_clone = left.clone(); tokio::task::spawn( async move { left_clone.async_tree_hash().await }, ) @@ -505,9 +484,10 @@ impl Tree { async { if max_queue_depth >= MAX_QUEUE_DEPTH { // Runtime is busy, use the current thread. - Ok(right_clone.async_tree_hash().await) + Ok(right.async_tree_hash().await) } else { // Runtime has some spare capacity, use new task. + let right_clone = right.clone(); tokio::task::spawn( async move { right_clone.async_tree_hash().await }, )