Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merge] Benchmark Tokio executor vs Rayon #31

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ vec_map = "0.8.2"
smallvec = "1.8.0"
arbitrary = { version = "1.2.3", features = ["derive"] }
ethereum-types = { version = "0.14.1", features = ["arbitrary"] }
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread"] }
futures = "0.3.29"
async-recursion = "1.0.0"

[dev-dependencies]
ssz_types = "0.5.0"
Expand Down
18 changes: 18 additions & 0 deletions benches/tree_hash_root.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use milhouse::{List, Vector};
use ssz_types::VariableList;
use tokio::runtime::Builder;
use tree_hash::TreeHash;

type C = typenum::U1099511627776;
Expand All @@ -21,6 +22,23 @@ pub fn tree_hash_root(c: &mut Criterion) {
},
);

// Tweaking the intervals doesn't really make much difference (a few percent).
let rt = Builder::new_multi_thread()
.global_queue_interval(1_000_000)
.event_interval(1_000_000)
.build()
.unwrap();
c.bench_with_input(
BenchmarkId::new("async_tree_hash_root_list", size),
&(&rt, size),
|b, &(rt, size)| {
b.iter(|| {
let l1 = List::<u64, C>::try_from_iter(0..size).unwrap();
rt.block_on(l1.async_tree_hash_root())
});
},
);

c.bench_with_input(
BenchmarkId::new("tree_hash_root_vector", size),
&size,
Expand Down
14 changes: 1 addition & 13 deletions src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
};
use arbitrary::Arbitrary;
use derivative::Derivative;
use parking_lot::RwLock;
use tokio::sync::RwLock;
use tree_hash::Hash256;

#[derive(Debug, Derivative, Arbitrary)]
Expand All @@ -17,18 +17,6 @@ pub struct Leaf<T> {
pub value: Arc<T>,
}

impl<T> Clone for Leaf<T>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
hash: RwLock::new(*self.hash.read()),
value: self.value.clone(),
}
}
}

impl<T> Leaf<T> {
pub fn new(value: T) -> Self {
Self::with_hash(value, Hash256::zero())
Expand Down
12 changes: 11 additions & 1 deletion src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl<T: Value, N: Unsigned> Default for List<T, N> {
}
}

impl<T: Value + Send + Sync, N: Unsigned> TreeHash for List<T, N> {
impl<T: Value + Send + Sync + 'static, N: Unsigned> TreeHash for List<T, N> {
fn tree_hash_type() -> tree_hash::TreeHashType {
tree_hash::TreeHashType::List
}
Expand All @@ -293,6 +293,16 @@ impl<T: Value + Send + Sync, N: Unsigned> TreeHash for List<T, N> {
}
}

impl<T: Value + Send + Sync + 'static, N: Unsigned> List<T, N> {
pub async fn async_tree_hash_root(&self) -> Hash256 {
// FIXME(sproul): remove assert
assert!(!self.interface.has_pending_updates());

let root = self.interface.backing.tree.async_tree_hash().await;
tree_hash::mix_in_length(&root, self.len())
}
}

impl<'a, T: Value, N: Unsigned, U: UpdateMap<T>> IntoIterator for &'a List<T, N, U> {
type Item = &'a T;
type IntoIter = InterfaceIter<'a, T, U>;
Expand Down
37 changes: 23 additions & 14 deletions src/packed_leaf.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{utils::arb_rwlock, Error, UpdateMap};
use arbitrary::Arbitrary;
use derivative::Derivative;
use parking_lot::RwLock;
use std::ops::ControlFlow;
use tokio::sync::RwLock;
use tree_hash::{Hash256, TreeHash, BYTES_PER_CHUNK};

#[derive(Debug, Derivative, Arbitrary)]
Expand All @@ -14,21 +14,30 @@ pub struct PackedLeaf<T: TreeHash + Clone> {
pub(crate) values: Vec<T>,
}

impl<T> Clone for PackedLeaf<T>
where
T: TreeHash + Clone,
{
fn clone(&self) -> Self {
Self {
hash: RwLock::new(*self.hash.read()),
values: self.values.clone(),
impl<T: TreeHash + Clone> PackedLeaf<T> {
pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.blocking_read();
let mut hash = *read_lock;
drop(read_lock);

if !hash.is_zero() {
return hash;
}

let hash_bytes = hash.as_bytes_mut();

let value_len = BYTES_PER_CHUNK / T::tree_hash_packing_factor();
for (i, value) in self.values.iter().enumerate() {
hash_bytes[i * value_len..(i + 1) * value_len]
.copy_from_slice(&value.tree_hash_packed_encoding());
}

*self.hash.blocking_write() = hash;
hash
}
}

impl<T: TreeHash + Clone> PackedLeaf<T> {
pub fn tree_hash(&self) -> Hash256 {
let read_lock = self.hash.read();
pub async fn async_tree_hash(&self) -> Hash256 {
let read_lock = self.hash.read().await;
let mut hash = *read_lock;
drop(read_lock);

Expand All @@ -44,7 +53,7 @@ impl<T: TreeHash + Clone> PackedLeaf<T> {
.copy_from_slice(&value.tree_hash_packed_encoding());
}

*self.hash.write() = hash;
*self.hash.write().await = hash;
hash
}

Expand Down
4 changes: 2 additions & 2 deletions src/tests/proptest/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ where

fn apply_ops_list<T, N>(list: &mut List<T, N>, spec: &mut Spec<T, N>, ops: Vec<Op<T>>)
where
T: Value + Debug + Send + Sync,
T: Value + Debug + Send + Sync + 'static,
N: Unsigned + Debug,
{
let mut checkpoint = list.clone();
Expand Down Expand Up @@ -235,7 +235,7 @@ where

fn apply_ops_vect<T, N>(vect: &mut Vector<T, N>, spec: &mut Spec<T, N>, ops: Vec<Op<T>>)
where
T: Value + Debug + Send + Sync,
T: Value + Debug + Send + Sync + 'static,
N: Unsigned + Debug,
{
let mut checkpoint = vect.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/tests/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Debug;
use tree_hash::TreeHash;
use typenum::{Unsigned, U1024, U64, U8};

fn list_test<T: Value + Send + Sync + Debug, N: Unsigned + Debug>(val: T) {
fn list_test<T: Value + Send + Sync + Debug + 'static, N: Unsigned + Debug>(val: T) {
for n in 96..=N::to_usize() {
let fast = List::<T, N>::repeat(val.clone(), n).unwrap();
let slow = List::<T, N>::repeat_slow(val.clone(), n).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/tests/size_of.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{Arc, Leaf, PackedLeaf, Tree};
use parking_lot::RwLock;
use std::mem::size_of;
use tokio::sync::RwLock;
use tree_hash::Hash256;

/// It's important that the Tree nodes have a predictable size.
Expand Down
116 changes: 93 additions & 23 deletions src/tree.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::utils::{arb_arc, arb_rwlock, opt_hash, opt_packing_depth, opt_packing_factor, Length};
use crate::{Arc, Error, Leaf, PackedLeaf, UpdateMap, Value};
use arbitrary::Arbitrary;
use async_recursion::async_recursion;
use derivative::Derivative;
use ethereum_hashing::{hash32_concat, ZERO_HASHES};
use parking_lot::RwLock;
use std::collections::BTreeMap;
use std::ops::ControlFlow;
use tokio::{runtime::Handle, sync::RwLock};
use tree_hash::Hash256;

#[derive(Debug, Derivative, Arbitrary)]
Expand All @@ -25,21 +26,6 @@ pub enum Tree<T: Value> {
Zero(usize),
}

impl<T: Value> Clone for Tree<T> {
fn clone(&self) -> Self {
match self {
Self::Node { hash, left, right } => Self::Node {
hash: RwLock::new(*hash.read()),
left: left.clone(),
right: right.clone(),
},
Self::Leaf(leaf) => Self::Leaf(leaf.clone()),
Self::PackedLeaf(leaf) => Self::PackedLeaf(leaf.clone()),
Self::Zero(depth) => Self::Zero(*depth),
}
}
}

impl<T: Value> Tree<T> {
pub fn empty(depth: usize) -> Arc<Self> {
Self::zero(depth)
Expand Down Expand Up @@ -287,8 +273,8 @@ impl<T: Value> Tree<T> {
) if full_depth > 0 => {
use RebaseAction::*;

let orig_hash = *orig_hash_lock.read();
let base_hash = *base_hash_lock.read();
let orig_hash = *orig_hash_lock.blocking_read();
let base_hash = *base_hash_lock.blocking_read();

// If hashes *and* lengths are equal then we can short-cut the recursion
// and immediately replace `orig` by the `base` node. If `lengths` are `None`
Expand Down Expand Up @@ -391,12 +377,12 @@ impl<T: Value> Tree<T> {
}
}

impl<T: Value + Send + Sync> Tree<T> {
impl<T: Value + Send + Sync + 'static> Tree<T> {
pub fn tree_hash(&self) -> Hash256 {
match self {
Self::Leaf(Leaf { hash, value }) => {
// FIXME(sproul): upgradeable RwLock?
let read_lock = hash.read();
let read_lock = hash.blocking_read();
let existing_hash = *read_lock;
drop(read_lock);

Expand All @@ -411,14 +397,14 @@ impl<T: Value + Send + Sync> Tree<T> {
existing_hash
} else {
let tree_hash = value.tree_hash_root();
*hash.write() = tree_hash;
*hash.blocking_write() = tree_hash;
tree_hash
}
}
Self::PackedLeaf(leaf) => leaf.tree_hash(),
Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]),
Self::Node { hash, left, right } => {
let read_lock = hash.read();
let read_lock = hash.blocking_read();
let existing_hash = *read_lock;
drop(read_lock);

Expand All @@ -430,7 +416,91 @@ impl<T: Value + Send + Sync> Tree<T> {
rayon::join(|| left.tree_hash(), || right.tree_hash());
let tree_hash =
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()));
*hash.write() = tree_hash;
*hash.blocking_write() = tree_hash;
tree_hash
}
}
}
}

#[async_recursion]
pub async fn async_tree_hash(&self) -> Hash256 {
match self {
Self::Leaf(Leaf { hash, value }) => {
// FIXME(sproul): upgradeable RwLock?
let read_lock = hash.read().await;
let existing_hash = *read_lock;
drop(read_lock);

// NOTE: We re-compute the hash whenever it is non-zero. Computed hashes may
// legitimately be zero, but this only occurs at the leaf level when the value is
// entirely zeroes (e.g. [0u64, 0, 0, 0]). In order to avoid storing an
// `Option<Hash256>` we choose to re-compute the hash in this case. In practice
// this is unlikely to provide any performance penalty except at very small list
// lengths (<= 32), because a node higher in the tree will cache a non-zero hash
// preventing its children from being visited more than once.
if !existing_hash.is_zero() {
existing_hash
} else {
let tree_hash = value.tree_hash_root();
*hash.write().await = tree_hash;
tree_hash
}
}
Self::PackedLeaf(leaf) => leaf.async_tree_hash().await,
Self::Zero(depth) => Hash256::from_slice(&ZERO_HASHES[*depth]),
Self::Node { hash, left, right } => {
const MAX_QUEUE_DEPTH: usize = 4;

let read_lock = hash.read().await;
let existing_hash = *read_lock;
drop(read_lock);

if !existing_hash.is_zero() {
existing_hash
} else {
// Parallelism goes brrrr.
let rt_metrics = Handle::current().metrics();
let num_workers = rt_metrics.num_workers();
let max_queue_depth = (0..num_workers)
.map(|i| rt_metrics.worker_local_queue_depth(i))
.max()
.unwrap();

let (left_res, right_res) = futures::future::join(
async {
if max_queue_depth >= MAX_QUEUE_DEPTH {
// Runtime is busy, use the current thread.
Ok(left.async_tree_hash().await)
} else {
// Runtime has some spare capacity, use new task.
let left_clone = left.clone();
tokio::task::spawn(
async move { left_clone.async_tree_hash().await },
)
.await
}
},
async {
if max_queue_depth >= MAX_QUEUE_DEPTH {
// Runtime is busy, use the current thread.
Ok(right.async_tree_hash().await)
} else {
// Runtime has some spare capacity, use new task.
let right_clone = right.clone();
tokio::task::spawn(
async move { right_clone.async_tree_hash().await },
)
.await
}
},
)
.await;
let left_hash = left_res.unwrap();
let right_hash = right_res.unwrap();
let tree_hash =
Hash256::from(hash32_concat(left_hash.as_bytes(), right_hash.as_bytes()));
*hash.write().await = tree_hash;
tree_hash
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{Arc, UpdateMap};
use arbitrary::Arbitrary;
use parking_lot::RwLock;
use std::collections::BTreeMap;
use tokio::sync::RwLock;
use tree_hash::{Hash256, TreeHash, TreeHashType};

/// Length type, to avoid confusion with depth and other `usize` parameters.
Expand Down
2 changes: 1 addition & 1 deletion src/vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ impl<T: Default + Value, N: Unsigned> Default for Vector<T, N> {
}
}

impl<T: Value + Send + Sync, N: Unsigned> tree_hash::TreeHash for Vector<T, N> {
impl<T: Value + Send + Sync + 'static, N: Unsigned> tree_hash::TreeHash for Vector<T, N> {
fn tree_hash_type() -> tree_hash::TreeHashType {
tree_hash::TreeHashType::Vector
}
Expand Down
Loading