From d856ce2121952617080c8ee2a02a70968e1606f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Thu, 5 Oct 2023 16:57:47 +0200 Subject: [PATCH 1/6] Add tests for Highway exponent switching --- node/src/components/consensus/cl_context.rs | 1 + .../consensus/highway_core/highway.rs | 10 + .../components/consensus/protocols/highway.rs | 2 +- .../protocols/highway/round_success_meter.rs | 120 +---- .../highway/round_success_meter/tests.rs | 109 +++++ .../consensus/protocols/highway/tests.rs | 139 +++++- .../highway/tests/consensus_environment.rs | 410 ++++++++++++++++++ node/src/components/consensus/tests/utils.rs | 8 + 8 files changed, 680 insertions(+), 119 deletions(-) create mode 100644 node/src/components/consensus/protocols/highway/round_success_meter/tests.rs create mode 100644 node/src/components/consensus/protocols/highway/tests/consensus_environment.rs diff --git a/node/src/components/consensus/cl_context.rs b/node/src/components/consensus/cl_context.rs index 251f1022dd..ff5764c698 100644 --- a/node/src/components/consensus/cl_context.rs +++ b/node/src/components/consensus/cl_context.rs @@ -13,6 +13,7 @@ use crate::{ }; #[derive(DataSize)] +#[cfg_attr(test, derive(Clone))] pub struct Keypair { secret_key: Arc, public_key: PublicKey, diff --git a/node/src/components/consensus/highway_core/highway.rs b/node/src/components/consensus/highway_core/highway.rs index 682485463d..00890def11 100644 --- a/node/src/components/consensus/highway_core/highway.rs +++ b/node/src/components/consensus/highway_core/highway.rs @@ -224,6 +224,16 @@ impl Highway { self.active_validator = None; } + /// Gets the next round exponent + #[cfg(test)] + #[allow(clippy::integer_arithmetic)] + pub(crate) fn get_round_exp(&self) -> Option { + self.active_validator.as_ref().map(|av| { + (av.next_round_length().millis() / self.state.params().min_round_length().millis()) + .trailing_zeros() as u8 + }) + } + /// Switches the active validator to a new round length. pub(crate) fn set_round_len(&mut self, new_round_len: TimeDiff) { if let Some(ref mut av) = self.active_validator { diff --git a/node/src/components/consensus/protocols/highway.rs b/node/src/components/consensus/protocols/highway.rs index 8626c9d41f..fbe28add0c 100644 --- a/node/src/components/consensus/protocols/highway.rs +++ b/node/src/components/consensus/protocols/highway.rs @@ -367,7 +367,7 @@ impl HighwayProtocol { fn calculate_round_length(&mut self, vv: &ValidVertex, now: Timestamp) { let new_round_len = self .round_success_meter - .calculate_new_length(self.highway.state()); + .calculate_new_length(self.highway.state(), now); // If the vertex contains a proposal, register it in the success meter. // It's important to do this _after_ the calculation above - otherwise we might try to // register the proposal before the meter is aware that a new round has started, and it diff --git a/node/src/components/consensus/protocols/highway/round_success_meter.rs b/node/src/components/consensus/protocols/highway/round_success_meter.rs index 938bf4dbeb..01d13f6e00 100644 --- a/node/src/components/consensus/protocols/highway/round_success_meter.rs +++ b/node/src/components/consensus/protocols/highway/round_success_meter.rs @@ -1,3 +1,6 @@ +#[cfg(test)] +mod tests; + use std::{cmp::max, collections::VecDeque, mem}; use datasize::DataSize; @@ -102,8 +105,7 @@ impl RoundSuccessMeter { /// successful, we return a higher round length for the future. /// If the length shouldn't grow, and the round ID is divisible by a certain number, a lower /// round length is returned. - pub fn calculate_new_length(&mut self, state: &State) -> TimeDiff { - let now = Timestamp::now(); + pub fn calculate_new_length(&mut self, state: &State, now: Timestamp) -> TimeDiff { // if the round hasn't finished, just return whatever we have now if state::round_id(now, self.current_round_len) <= self.current_round_id { return self.new_length(); @@ -212,117 +214,3 @@ fn round_index(r_id: Timestamp, round_len: TimeDiff) -> u64 { } r_id.millis() / round_len.millis() } - -#[cfg(test)] -mod tests { - use config::{Config, ACCELERATION_PARAMETER, MAX_FAILED_ROUNDS, NUM_ROUNDS_TO_CONSIDER}; - - use casper_types::{TimeDiff, Timestamp}; - - use crate::components::consensus::{ - cl_context::ClContext, - protocols::highway::round_success_meter::{config, round_index}, - }; - - const TEST_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 13); - const TEST_MIN_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 8); - const TEST_MAX_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 19); - - #[test] - fn new_length_steady() { - let round_success_meter: super::RoundSuccessMeter = - super::RoundSuccessMeter::new( - TEST_ROUND_LEN, - TEST_MIN_ROUND_LEN, - TEST_MAX_ROUND_LEN, - Timestamp::now(), - Config::default(), - ); - assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN); - } - - #[test] - fn new_length_slow_down() { - let mut round_success_meter: super::RoundSuccessMeter = - super::RoundSuccessMeter::new( - TEST_ROUND_LEN, - TEST_MIN_ROUND_LEN, - TEST_MAX_ROUND_LEN, - Timestamp::now(), - Config::default(), - ); - // If there have been more rounds of failure than MAX_FAILED_ROUNDS, slow down - round_success_meter.rounds = vec![false; MAX_FAILED_ROUNDS + 1].into(); - assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN * 2); - } - - #[test] - fn new_length_can_not_slow_down_because_max_round_len() { - // If the round length is the same as the maximum round length, can't go up - let mut round_success_meter: super::RoundSuccessMeter = - super::RoundSuccessMeter::new( - TEST_MAX_ROUND_LEN, - TEST_MIN_ROUND_LEN, - TEST_MAX_ROUND_LEN, - Timestamp::now(), - Config::default(), - ); - // If there have been more rounds of failure than MAX_FAILED_ROUNDS, slow down -- but can't - // slow down because of ceiling - round_success_meter.rounds = vec![false; MAX_FAILED_ROUNDS + 1].into(); - assert_eq!(round_success_meter.new_length(), TEST_MAX_ROUND_LEN); - } - - #[test] - fn new_length_speed_up() { - // If there's been enough successful rounds and it's an acceleration round, speed up - let mut round_success_meter: super::RoundSuccessMeter = - super::RoundSuccessMeter::new( - TEST_ROUND_LEN, - TEST_MIN_ROUND_LEN, - TEST_MAX_ROUND_LEN, - Timestamp::now(), - Config::default(), - ); - round_success_meter.rounds = vec![true; NUM_ROUNDS_TO_CONSIDER].into(); - // Increase our round index until we are at an acceleration round - loop { - let current_round_index = round_index( - round_success_meter.current_round_id, - round_success_meter.current_round_len, - ); - if current_round_index % ACCELERATION_PARAMETER == 0 { - break; - }; - round_success_meter.current_round_id += TimeDiff::from_millis(1); - } - assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN / 2); - } - - #[test] - fn new_length_can_not_speed_up_because_min_round_len() { - // If there's been enough successful rounds and it's an acceleration round, but we are - // already at the smallest round length possible, stay at the current round length - let mut round_success_meter: super::RoundSuccessMeter = - super::RoundSuccessMeter::new( - TEST_MIN_ROUND_LEN, - TEST_MIN_ROUND_LEN, - TEST_MAX_ROUND_LEN, - Timestamp::now(), - Config::default(), - ); - round_success_meter.rounds = vec![true; NUM_ROUNDS_TO_CONSIDER].into(); - // Increase our round index until we are at an acceleration round - loop { - let current_round_index = round_index( - round_success_meter.current_round_id, - round_success_meter.current_round_len, - ); - if current_round_index % ACCELERATION_PARAMETER == 0 { - break; - }; - round_success_meter.current_round_id += TimeDiff::from_millis(1); - } - assert_eq!(round_success_meter.new_length(), TEST_MIN_ROUND_LEN); - } -} diff --git a/node/src/components/consensus/protocols/highway/round_success_meter/tests.rs b/node/src/components/consensus/protocols/highway/round_success_meter/tests.rs new file mode 100644 index 0000000000..85babde4d3 --- /dev/null +++ b/node/src/components/consensus/protocols/highway/round_success_meter/tests.rs @@ -0,0 +1,109 @@ +use config::{Config, ACCELERATION_PARAMETER, MAX_FAILED_ROUNDS, NUM_ROUNDS_TO_CONSIDER}; + +use casper_types::{TimeDiff, Timestamp}; + +use crate::components::consensus::{ + cl_context::ClContext, + protocols::highway::round_success_meter::{config, round_index}, +}; + +const TEST_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 13); +const TEST_MIN_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 8); +const TEST_MAX_ROUND_LEN: TimeDiff = TimeDiff::from_millis(1 << 19); + +#[test] +fn new_length_steady() { + let round_success_meter: super::RoundSuccessMeter = super::RoundSuccessMeter::new( + TEST_ROUND_LEN, + TEST_MIN_ROUND_LEN, + TEST_MAX_ROUND_LEN, + Timestamp::now(), + Config::default(), + ); + assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN); +} + +#[test] +fn new_length_slow_down() { + let mut round_success_meter: super::RoundSuccessMeter = + super::RoundSuccessMeter::new( + TEST_ROUND_LEN, + TEST_MIN_ROUND_LEN, + TEST_MAX_ROUND_LEN, + Timestamp::now(), + Config::default(), + ); + // If there have been more rounds of failure than MAX_FAILED_ROUNDS, slow down + round_success_meter.rounds = vec![false; MAX_FAILED_ROUNDS + 1].into(); + assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN * 2); +} + +#[test] +fn new_length_can_not_slow_down_because_max_round_len() { + // If the round length is the same as the maximum round length, can't go up + let mut round_success_meter: super::RoundSuccessMeter = + super::RoundSuccessMeter::new( + TEST_MAX_ROUND_LEN, + TEST_MIN_ROUND_LEN, + TEST_MAX_ROUND_LEN, + Timestamp::now(), + Config::default(), + ); + // If there have been more rounds of failure than MAX_FAILED_ROUNDS, slow down -- but can't + // slow down because of ceiling + round_success_meter.rounds = vec![false; MAX_FAILED_ROUNDS + 1].into(); + assert_eq!(round_success_meter.new_length(), TEST_MAX_ROUND_LEN); +} + +#[test] +fn new_length_speed_up() { + // If there's been enough successful rounds and it's an acceleration round, speed up + let mut round_success_meter: super::RoundSuccessMeter = + super::RoundSuccessMeter::new( + TEST_ROUND_LEN, + TEST_MIN_ROUND_LEN, + TEST_MAX_ROUND_LEN, + Timestamp::now(), + Config::default(), + ); + round_success_meter.rounds = vec![true; NUM_ROUNDS_TO_CONSIDER].into(); + // Increase our round index until we are at an acceleration round + loop { + let current_round_index = round_index( + round_success_meter.current_round_id, + round_success_meter.current_round_len, + ); + if current_round_index % ACCELERATION_PARAMETER == 0 { + break; + }; + round_success_meter.current_round_id += TimeDiff::from_millis(1); + } + assert_eq!(round_success_meter.new_length(), TEST_ROUND_LEN / 2); +} + +#[test] +fn new_length_can_not_speed_up_because_min_round_len() { + // If there's been enough successful rounds and it's an acceleration round, but we are + // already at the smallest round length possible, stay at the current round length + let mut round_success_meter: super::RoundSuccessMeter = + super::RoundSuccessMeter::new( + TEST_MIN_ROUND_LEN, + TEST_MIN_ROUND_LEN, + TEST_MAX_ROUND_LEN, + Timestamp::now(), + Config::default(), + ); + round_success_meter.rounds = vec![true; NUM_ROUNDS_TO_CONSIDER].into(); + // Increase our round index until we are at an acceleration round + loop { + let current_round_index = round_index( + round_success_meter.current_round_id, + round_success_meter.current_round_len, + ); + if current_round_index % ACCELERATION_PARAMETER == 0 { + break; + }; + round_success_meter.current_round_id += TimeDiff::from_millis(1); + } + assert_eq!(round_success_meter.new_length(), TEST_MIN_ROUND_LEN); +} diff --git a/node/src/components/consensus/protocols/highway/tests.rs b/node/src/components/consensus/protocols/highway/tests.rs index cfeee9e653..3f03c59a51 100644 --- a/node/src/components/consensus/protocols/highway/tests.rs +++ b/node/src/components/consensus/protocols/highway/tests.rs @@ -1,4 +1,9 @@ -use std::{collections::BTreeSet, sync::Arc}; +mod consensus_environment; + +use std::{ + collections::{BTreeMap, BTreeSet}, + sync::Arc, +}; use casper_types::{testing::TestRng, PublicKey, TimeDiff, Timestamp, U512}; @@ -19,6 +24,8 @@ use crate::{ }, tests::utils::{ new_test_chainspec, ALICE_NODE_ID, ALICE_PUBLIC_KEY, ALICE_SECRET_KEY, BOB_PUBLIC_KEY, + BOB_SECRET_KEY, CAROL_PUBLIC_KEY, CAROL_SECRET_KEY, DAVE_PUBLIC_KEY, DAVE_SECRET_KEY, + ELLEN_PUBLIC_KEY, ELLEN_SECRET_KEY, }, traits::Context, utils::{ValidatorIndex, Weight}, @@ -27,6 +34,8 @@ use crate::{ types::BlockPayload, }; +use consensus_environment::ConsensusEnvironment; + /// Returns a new `State` with `ClContext` parameters suitable for tests. pub(crate) fn new_test_state(weights: I, seed: u64) -> State where @@ -56,6 +65,19 @@ pub(crate) fn new_test_highway_protocol( weights: I1, init_faulty: I2, ) -> Box> +where + I1: IntoIterator, + I2: IntoIterator, + T: Into, +{ + new_test_highway_protocol_with_era_height(weights, init_faulty, None) +} + +pub(crate) fn new_test_highway_protocol_with_era_height( + weights: I1, + init_faulty: I2, + era_height: Option, +) -> Box> where I1: IntoIterator, I2: IntoIterator, @@ -65,7 +87,10 @@ where .into_iter() .map(|(pk, w)| (pk, w.into())) .collect::>(); - let chainspec = new_test_chainspec(weights.clone()); + let mut chainspec = new_test_chainspec(weights.clone()); + if let Some(era_height) = era_height { + chainspec.core_config.minimum_era_height = era_height; + } let config = Config { max_execution_delay: 3, highway: HighwayConfig { @@ -233,3 +258,113 @@ fn max_rounds_per_era_returns_the_correct_value_for_prod_chainspec_value() { assert_eq!(219, max_rounds_per_era); } + +#[test] +fn no_slow_down_when_all_nodes_fast() { + let mut validators = BTreeMap::new(); + validators.insert( + ALICE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ALICE_SECRET_KEY)), 100), + ); + validators.insert( + BOB_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*BOB_SECRET_KEY)), 100), + ); + validators.insert( + CAROL_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*CAROL_SECRET_KEY)), 100), + ); + validators.insert( + DAVE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*DAVE_SECRET_KEY)), 100), + ); + validators.insert( + ELLEN_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ELLEN_SECRET_KEY)), 100), + ); + + let mut env = ConsensusEnvironment::new(validators, Default::default()); + for _ in 0..10 { + env.crank_round(); + } + + assert_eq!(env.our_round_exp(), 0); +} + +#[ignore] // TODO: unignore when exponent switching is improved +#[test] +fn slow_node_should_switch_own_round_exponent() { + let mut validators = BTreeMap::new(); + validators.insert( + ALICE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ALICE_SECRET_KEY)), 100), + ); + validators.insert( + BOB_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*BOB_SECRET_KEY)), 100), + ); + validators.insert( + CAROL_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*CAROL_SECRET_KEY)), 100), + ); + validators.insert( + DAVE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*DAVE_SECRET_KEY)), 100), + ); + validators.insert( + ELLEN_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ELLEN_SECRET_KEY)), 100), + ); + + // Alice is the tested node; it will be slow, and so it should switch to a higher round + // exponent + let mut slow_nodes = BTreeSet::new(); + slow_nodes.insert(ALICE_PUBLIC_KEY.clone()); + + let mut env = ConsensusEnvironment::new(validators, slow_nodes); + for _ in 0..10 { + env.crank_round(); + } + + assert!(env.our_round_exp() > 0); +} + +#[ignore] // TODO: unignore when exponent switching is improved +#[test] +fn slow_down_when_majority_slow() { + let mut validators = BTreeMap::new(); + validators.insert( + ALICE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ALICE_SECRET_KEY)), 100), + ); + validators.insert( + BOB_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*BOB_SECRET_KEY)), 100), + ); + validators.insert( + CAROL_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*CAROL_SECRET_KEY)), 100), + ); + validators.insert( + DAVE_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*DAVE_SECRET_KEY)), 100), + ); + validators.insert( + ELLEN_PUBLIC_KEY.clone(), + (Keypair::from(Arc::clone(&*ELLEN_SECRET_KEY)), 100), + ); + + // Alice is the tested node; it will be slow, and so it should switch to a higher round + // exponent + let mut slow_nodes = BTreeSet::new(); + slow_nodes.insert(CAROL_PUBLIC_KEY.clone()); + slow_nodes.insert(DAVE_PUBLIC_KEY.clone()); + slow_nodes.insert(ELLEN_PUBLIC_KEY.clone()); + + let mut env = ConsensusEnvironment::new(validators, slow_nodes); + for _ in 0..10 { + env.crank_round(); + } + + assert!(env.our_round_exp() > 0); +} diff --git a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs new file mode 100644 index 0000000000..aff3d2a55c --- /dev/null +++ b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs @@ -0,0 +1,410 @@ +// This is a test module, so we're not worried about integer arithmetic here. +#![allow(clippy::integer_arithmetic)] + +use std::{ + collections::{BTreeMap, BTreeSet}, + fs::File, + io::Write, +}; + +use casper_types::{PublicKey, Timestamp}; + +use crate::{ + components::consensus::{ + cl_context::{ClContext, Keypair}, + consensus_protocol::{ConsensusProtocol, FinalizedBlock, ProtocolOutcome}, + era_supervisor::debug::EraDump, + highway_core::{ + highway::{SignedWireUnit, Vertex, WireUnit}, + State, + }, + protocols::highway::{ + HighwayMessage, HighwayProtocol, ACTION_ID_VERTEX, TIMER_ID_ACTIVE_VALIDATOR, + }, + tests::utils::ALICE_NODE_ID, + traits::Context, + utils::ValidatorIndex, + LeaderSequence, ProposedBlock, SerializedMessage, + }, + NodeRng, +}; + +use super::new_test_highway_protocol_with_era_height; + +pub(super) struct ConsensusEnvironment { + highway: Box>, + leaders: LeaderSequence, + validators: BTreeMap, + current_round_start: u64, + slow_validators: BTreeSet, + rng: NodeRng, + finalized_blocks: Vec>, +} + +impl ConsensusEnvironment { + pub(super) fn new( + validators: BTreeMap, + slow_validators: BTreeSet, + ) -> Self { + let mut highway = new_test_highway_protocol_with_era_height( + validators + .iter() + .map(|(pub_key, value)| (pub_key.clone(), value.1)), + vec![], + Some(10), + ); + // our active validator will be the first in the map + let (pub_key, (keypair, _)) = validators.iter().next().unwrap(); + // this is necessary for the round exponent to be tracked - it only happens in the + // ActiveValidator + let _ = + highway.activate_validator(pub_key.clone(), keypair.clone(), Timestamp::zero(), None); + Self { + highway, + leaders: LeaderSequence::new( + 0, // used as the seed in `new_test_highway_protocol` + &validators + .values() + .map(|(_keypair, weight)| (*weight).into()) + .collect(), + vec![true; validators.len()].into(), + ), + validators, + current_round_start: 0, + slow_validators, + rng: NodeRng::new(), + finalized_blocks: vec![], + } + } + + fn highway(&self) -> &HighwayProtocol { + self.highway.as_any().downcast_ref().unwrap() + } + + fn our_pub_key(&self) -> &PublicKey { + self.validators.keys().next().unwrap() + } + + fn is_slow(&self) -> bool { + self.slow_validators.contains(self.our_pub_key()) + } + + fn round_len(&self) -> u64 { + self.highway() + .highway() + .state() + .params() + .min_round_length() + .millis() + } + + fn clone_state(&self) -> State { + self.highway().highway().state().clone() + } + + pub(super) fn crank_round(&mut self) { + let min_round_len = self.round_len(); + let round_id = Timestamp::from(self.current_round_start); + let leader = self.leaders.leader(round_id.millis()); + let leader_pub_key = self + .validators + .keys() + .nth(leader.0 as usize) + .unwrap() + .clone(); + let leader_is_slow = self.slow_validators.contains(&leader_pub_key); + + let pre_proposal_state = self.clone_state(); + + let (mut post_proposal_state, maybe_proposal_msg) = if leader.0 == 0 { + // our active validator is the proposer + self.this_node_propose(); + (self.clone_state(), None) + } else { + // another validator is the proposer + self.other_node_propose(leader) + }; + + // if we're slow, we're going to create a witness unit before receiving any units from + // other nodes, effectively not citing any of them + // if we're the leader, our proposal and confirmation are already in the state at this + // point + if self.is_slow() { + let timestamp = (self.current_round_start + min_round_len * 2 / 3).into(); + let outcomes = self.highway.handle_timer( + timestamp, + timestamp, + TIMER_ID_ACTIVE_VALIDATOR, + &mut self.rng, + ); + self.finalize_blocks(&outcomes); + } + + if leader_is_slow { + // all validators will just send witness units, as they won't receive a proposal before + // witness timeout + let witness_units: Vec<_> = self + .validators + .iter() + .enumerate() + .skip(1) + .map(|(vid, (_, (keypair, _)))| { + self.create_swunit_from( + (vid as u32).into(), + keypair, + &pre_proposal_state, + min_round_len * 2 / 3, + None, + ) + }) + .collect(); + // we create a witness, too + if !self.is_slow() { + let timestamp = (self.current_round_start + min_round_len * 2 / 3).into(); + let outcomes = self.highway.handle_timer( + timestamp, + timestamp, + TIMER_ID_ACTIVE_VALIDATOR, + &mut self.rng, + ); + self.finalize_blocks(&outcomes); + } + if let Some(proposal_msg) = maybe_proposal_msg { + self.handle_message(proposal_msg, min_round_len * 3 / 4); + } + for unit in witness_units { + let highway_msg = HighwayMessage::NewVertex(Vertex::Unit(unit)); + let msg = SerializedMessage::from_message(&highway_msg); + self.handle_message(msg, min_round_len * 3 / 4); + } + + self.add_vertices((self.current_round_start + min_round_len * 4 / 5).into()); + } else { + // every fast validator creates a confirmation + let fast_confirmation_units: Vec<_> = self + .validators + .iter() + .enumerate() + .skip(1) + .filter(|(vid, (pub_key, _))| { + !self.slow_validators.contains(pub_key) && *vid != leader.0 as usize + }) + .map(|(vid, (_, (keypair, _)))| { + self.create_swunit_from( + (vid as u32).into(), + keypair, + &post_proposal_state, + min_round_len / 3, + None, + ) + }) + .collect(); + + // add proposal and confirmations to our state + if let Some(proposal_msg) = maybe_proposal_msg { + self.handle_message(proposal_msg, min_round_len / 4); + } + for unit in fast_confirmation_units.clone() { + let highway_msg = HighwayMessage::NewVertex(Vertex::Unit(unit)); + let msg = SerializedMessage::from_message(&highway_msg); + self.handle_message(msg, min_round_len / 3 + 1); + } + self.add_vertices((self.current_round_start + min_round_len / 3 + 2).into()); + + let post_confirmation_state = if self.is_slow() { + // if we're slow, the post confirmation state should not contain our own confirmation + for unit in fast_confirmation_units { + post_proposal_state.add_valid_unit(unit); + } + post_proposal_state + } else { + self.clone_state() + }; + + // we create a witness at this point, if we aren't slow + if !self.is_slow() { + let timestamp = (self.current_round_start + min_round_len * 2 / 3).into(); + self.highway.handle_timer( + timestamp, + timestamp, + TIMER_ID_ACTIVE_VALIDATOR, + &mut self.rng, + ); + } + + let fast_witness_units: Vec<_> = self + .validators + .iter() + .enumerate() + .skip(1) + .filter(|(_, (pub_key, _))| !self.slow_validators.contains(pub_key)) + .map(|(vid, (_, (keypair, _)))| { + self.create_swunit_from( + (vid as u32).into(), + keypair, + &post_confirmation_state, + min_round_len * 2 / 3, + None, + ) + }) + .collect(); + for unit in fast_witness_units { + let highway_msg = HighwayMessage::NewVertex(Vertex::Unit(unit)); + let msg = SerializedMessage::from_message(&highway_msg); + self.handle_message(msg, min_round_len * 3 / 4); + } + self.add_vertices((self.current_round_start + min_round_len * 3 / 4 + 1).into()); + + // Slow nodes create witnesses before they can receive the proposal + let slow_witness_units: Vec<_> = self + .validators + .iter() + .enumerate() + .skip(1) + .filter(|(_, (pub_key, _))| self.slow_validators.contains(pub_key)) + .map(|(vid, (_, (keypair, _)))| { + self.create_swunit_from( + (vid as u32).into(), + keypair, + &pre_proposal_state, + min_round_len * 2 / 3, + None, + ) + }) + .collect(); + for unit in slow_witness_units { + let highway_msg = HighwayMessage::NewVertex(Vertex::Unit(unit)); + let msg = SerializedMessage::from_message(&highway_msg); + self.handle_message(msg, min_round_len * 3 / 4); + } + self.add_vertices((self.current_round_start + min_round_len * 3 / 4 + 1).into()); + }; + + self.current_round_start = self.current_round_start.saturating_add(min_round_len); + } + + fn this_node_propose(&mut self) { + let now: Timestamp = self.current_round_start.into(); + // the timer triggers a request for block content + let outcomes = + self.highway + .handle_timer(now, now, TIMER_ID_ACTIVE_VALIDATOR, &mut self.rng); + self.finalize_blocks(&outcomes); + // the request contains necessary block context - extract it + let block_context = outcomes + .iter() + .find_map(|outcome| match outcome { + ProtocolOutcome::CreateNewBlock(context) => Some(context.clone()), + _ => None, + }) + .unwrap_or_else(|| panic!("outcomes didn't contain CreateNewBlock: {:?}", outcomes)); + // this should create the proposal unit, add it to the state and create a message to be + // broadcast - we can ignore the message, because we don't keep other consensus + // instances + let outcomes = self + .highway + .propose(ProposedBlock::new(Default::default(), block_context), now); + self.finalize_blocks(&outcomes); + } + + fn create_swunit_from( + &self, + creator: ValidatorIndex, + keypair: &Keypair, + state: &State, + delay: u64, + value: Option<::ConsensusValue>, + ) -> SignedWireUnit { + let seq_number = { + let prev_unit_hash = state.panorama().get(creator).unwrap().correct(); + prev_unit_hash.map_or(0, |hash| state.unit(hash).seq_number.saturating_add(1)) + }; + let wunit: WireUnit = WireUnit { + panorama: state.panorama().clone(), + creator, + instance_id: *self.highway().instance_id(), + value, + seq_number, + timestamp: (self.current_round_start + delay).into(), + round_exp: 0, + endorsed: BTreeSet::new(), + }; + SignedWireUnit::new(wunit.into_hashed(), keypair) + } + + fn other_node_propose( + &mut self, + leader: ValidatorIndex, + ) -> (State, Option) { + let (_pub_key, (keypair, _weight)) = self.validators.iter().nth(leader.0 as usize).unwrap(); + let state = self.highway().highway().state(); + let swunit = self.create_swunit_from(leader, keypair, state, 0, Some(Default::default())); + + let mut state_clone = state.clone(); + state_clone.add_valid_unit(swunit.clone()); + + let highway_message: HighwayMessage = + HighwayMessage::NewVertex(Vertex::Unit(swunit)); + let msg = SerializedMessage::from_message(&highway_message); + + (state_clone, Some(msg)) + } + + fn add_vertices(&mut self, timestamp: Timestamp) { + loop { + let outcomes = self.highway.handle_action(ACTION_ID_VERTEX, timestamp); + self.finalize_blocks(&outcomes); + if !outcomes + .iter() + .any(|outcome| matches!(outcome, ProtocolOutcome::QueueAction(_))) + { + break; + } + } + } + + fn handle_message(&mut self, msg: SerializedMessage, delay: u64) { + let outcomes = self.highway.handle_message( + &mut self.rng, + *ALICE_NODE_ID, + msg, + (self.current_round_start + delay).into(), + ); + self.finalize_blocks(&outcomes); + } + + fn finalize_blocks(&mut self, outcomes: &[ProtocolOutcome]) { + for outcome in outcomes { + if let ProtocolOutcome::FinalizedBlock(block) = outcome { + self.finalized_blocks.push(block.clone()); + } + } + } + + pub(super) fn our_round_exp(&self) -> u8 { + self.highway().highway().get_round_exp().unwrap() + } + + /// For test debugging purposes + #[allow(unused)] + pub(super) fn dump(&self) { + let dump = EraDump { + id: 0.into(), + start_time: 0.into(), + accusations: &Default::default(), + cannot_propose: &Default::default(), + faulty: &Default::default(), + start_height: 0, + validators: &self + .validators + .iter() + .map(|(pub_key, (_, weight))| (pub_key.clone(), (*weight).into())) + .collect(), + highway_state: self.highway().highway().state(), + }; + + let mut file = File::create("/tmp/consensus.dump").unwrap(); + let data = bincode::serialize(&dump).unwrap(); + let _ = file.write_all(&data); + } +} diff --git a/node/src/components/consensus/tests/utils.rs b/node/src/components/consensus/tests/utils.rs index a791f14d76..ff3aafd1c1 100644 --- a/node/src/components/consensus/tests/utils.rs +++ b/node/src/components/consensus/tests/utils.rs @@ -40,6 +40,14 @@ pub static CAROL_SECRET_KEY: Lazy> = Lazy::new(|| Arc::new(SecretKey::ed25519_from_bytes([2; SecretKey::ED25519_LENGTH]).unwrap())); pub static CAROL_PUBLIC_KEY: Lazy = Lazy::new(|| PublicKey::from(&**CAROL_SECRET_KEY)); +pub static DAVE_SECRET_KEY: Lazy> = + Lazy::new(|| Arc::new(SecretKey::ed25519_from_bytes([3; SecretKey::ED25519_LENGTH]).unwrap())); +pub static DAVE_PUBLIC_KEY: Lazy = Lazy::new(|| PublicKey::from(&**DAVE_SECRET_KEY)); + +pub static ELLEN_SECRET_KEY: Lazy> = + Lazy::new(|| Arc::new(SecretKey::ed25519_from_bytes([4; SecretKey::ED25519_LENGTH]).unwrap())); +pub static ELLEN_PUBLIC_KEY: Lazy = Lazy::new(|| PublicKey::from(&**ELLEN_SECRET_KEY)); + /// Loads the local chainspec and overrides timestamp and genesis account with the given stakes. /// The test `Chainspec` returned has eras with exactly two blocks. pub fn new_test_chainspec(stakes: I) -> Chainspec From 8539e10aedf4247eadf262bb1480daa57d647ba9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Fri, 20 Oct 2023 15:42:34 +0200 Subject: [PATCH 2/6] Fix formatting --- .../consensus/protocols/highway/tests/consensus_environment.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs index aff3d2a55c..6d7246dd9d 100644 --- a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs +++ b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs @@ -212,7 +212,8 @@ impl ConsensusEnvironment { self.add_vertices((self.current_round_start + min_round_len / 3 + 2).into()); let post_confirmation_state = if self.is_slow() { - // if we're slow, the post confirmation state should not contain our own confirmation + // if we're slow, the post confirmation state should not contain our own + // confirmation for unit in fast_confirmation_units { post_proposal_state.add_valid_unit(unit); } From a9a16b1465904a30b0c4f8988ed2ad94cf07fc73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Thu, 26 Oct 2023 15:51:33 +0200 Subject: [PATCH 3/6] Fix the slow leader case --- .../highway/tests/consensus_environment.rs | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs index 6d7246dd9d..d2bb800ca5 100644 --- a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs +++ b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs @@ -89,7 +89,7 @@ impl ConsensusEnvironment { self.slow_validators.contains(self.our_pub_key()) } - fn round_len(&self) -> u64 { + fn min_round_len(&self) -> u64 { self.highway() .highway() .state() @@ -103,7 +103,7 @@ impl ConsensusEnvironment { } pub(super) fn crank_round(&mut self) { - let min_round_len = self.round_len(); + let min_round_len = self.min_round_len(); let round_id = Timestamp::from(self.current_round_start); let leader = self.leaders.leader(round_id.millis()); let leader_pub_key = self @@ -143,11 +143,12 @@ impl ConsensusEnvironment { if leader_is_slow { // all validators will just send witness units, as they won't receive a proposal before // witness timeout - let witness_units: Vec<_> = self + let mut witness_units: Vec<_> = self .validators .iter() .enumerate() .skip(1) + .filter(|(vid, _)| *vid != leader.0 as usize) .map(|(vid, (_, (keypair, _)))| { self.create_swunit_from( (vid as u32).into(), @@ -158,6 +159,18 @@ impl ConsensusEnvironment { ) }) .collect(); + if leader.0 != 0 { + // the leader has to create a witness from the post-proposal state, or it will + // equivocate + let keypair = &self.validators.values().nth(leader.0 as usize).unwrap().0; + witness_units.push(self.create_swunit_from( + leader.0.into(), + keypair, + &post_proposal_state, + min_round_len * 2 / 3, + None, + )); + } // we create a witness, too if !self.is_slow() { let timestamp = (self.current_round_start + min_round_len * 2 / 3).into(); @@ -178,7 +191,7 @@ impl ConsensusEnvironment { self.handle_message(msg, min_round_len * 3 / 4); } - self.add_vertices((self.current_round_start + min_round_len * 4 / 5).into()); + self.add_vertices((self.current_round_start + min_round_len * 3 / 4 + 1).into()); } else { // every fast validator creates a confirmation let fast_confirmation_units: Vec<_> = self @@ -285,6 +298,13 @@ impl ConsensusEnvironment { } fn this_node_propose(&mut self) { + if (self.current_round_start / self.min_round_len()).trailing_zeros() + < self.our_round_exp() as u32 + { + // we're not proposing if we shouldn't even be participating in this round + return; + } + let now: Timestamp = self.current_round_start.into(); // the timer triggers a request for block content let outcomes = @@ -298,7 +318,10 @@ impl ConsensusEnvironment { ProtocolOutcome::CreateNewBlock(context) => Some(context.clone()), _ => None, }) - .unwrap_or_else(|| panic!("outcomes didn't contain CreateNewBlock: {:?}", outcomes)); + .unwrap_or_else(|| { + self.dump(); + panic!("outcomes didn't contain CreateNewBlock: {:?}", outcomes) + }); // this should create the proposal unit, add it to the state and create a message to be // broadcast - we can ignore the message, because we don't keep other consensus // instances From 29d4e773fd6260e35207a3ec43522fff07d98aef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Thu, 26 Oct 2023 15:51:51 +0200 Subject: [PATCH 4/6] Address review comments --- node/src/components/consensus/highway_core/highway.rs | 2 +- node/src/components/consensus/protocols/highway/tests.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/node/src/components/consensus/highway_core/highway.rs b/node/src/components/consensus/highway_core/highway.rs index 00890def11..544121cc17 100644 --- a/node/src/components/consensus/highway_core/highway.rs +++ b/node/src/components/consensus/highway_core/highway.rs @@ -224,7 +224,7 @@ impl Highway { self.active_validator = None; } - /// Gets the next round exponent + /// Gets the round exponent for the next message this instance will create. #[cfg(test)] #[allow(clippy::integer_arithmetic)] pub(crate) fn get_round_exp(&self) -> Option { diff --git a/node/src/components/consensus/protocols/highway/tests.rs b/node/src/components/consensus/protocols/highway/tests.rs index 3f03c59a51..3bf778e5d1 100644 --- a/node/src/components/consensus/protocols/highway/tests.rs +++ b/node/src/components/consensus/protocols/highway/tests.rs @@ -291,7 +291,7 @@ fn no_slow_down_when_all_nodes_fast() { assert_eq!(env.our_round_exp(), 0); } -#[ignore] // TODO: unignore when exponent switching is improved +#[ignore = "TODO: unignore when exponent switching is improved"] #[test] fn slow_node_should_switch_own_round_exponent() { let mut validators = BTreeMap::new(); @@ -329,7 +329,7 @@ fn slow_node_should_switch_own_round_exponent() { assert!(env.our_round_exp() > 0); } -#[ignore] // TODO: unignore when exponent switching is improved +#[ignore = "TODO: unignore when exponent switching is improved"] #[test] fn slow_down_when_majority_slow() { let mut validators = BTreeMap::new(); From ec07edf71b95a77ead20eef05cbdf06dd20b01f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Thu, 26 Oct 2023 17:56:01 +0200 Subject: [PATCH 5/6] Appease cargo fmt --- .../src/storage/trie_store/operations/mod.rs | 9 +++++---- node/src/components/network/outgoing.rs | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/execution_engine/src/storage/trie_store/operations/mod.rs b/execution_engine/src/storage/trie_store/operations/mod.rs index bbc9c83c90..a851bd695c 100644 --- a/execution_engine/src/storage/trie_store/operations/mod.rs +++ b/execution_engine/src/storage/trie_store/operations/mod.rs @@ -593,10 +593,11 @@ where let trie_key = new_extension.trie_hash()?; new_elements.push((trie_key, new_extension)) } - // The single sibling is an extension. We output an extension to replace - // the parent, prepending the sibling index to the sibling's affix. In - // the next loop iteration, we will handle the case where this extension - // might need to be combined with a grandparent extension. + // The single sibling is an extension. We output an extension to + // replace the parent, prepending the sibling index to the sibling's + // affix. In the next loop iteration, we will handle the case where + // this extension might need to be combined with a grandparent + // extension. Trie::Extension { affix: extension_affix, pointer, diff --git a/node/src/components/network/outgoing.rs b/node/src/components/network/outgoing.rs index ce5d87944c..871a968a48 100644 --- a/node/src/components/network/outgoing.rs +++ b/node/src/components/network/outgoing.rs @@ -27,8 +27,8 @@ //! * The `Dialer` is expected to produce `DialOutcomes` for every dial [`DialRequest::Dial`] //! eventually. These must be forwarded to the `OutgoingManager` via the `handle_dial_outcome` //! function. -//! * The `perform_housekeeping` method must be called periodically to give the -//! `OutgoingManager` a chance to initiate reconnections and collect garbage. +//! * The `perform_housekeeping` method must be called periodically to give the `OutgoingManager` a +//! chance to initiate reconnections and collect garbage. //! * When a connection is dropped, the connection manager must be notified via //! `handle_connection_drop`. //! From c25981559372c093947e4892f466f3af3997ea4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Fri, 27 Oct 2023 18:47:37 +0200 Subject: [PATCH 6/6] Improve code documentation --- .../highway/tests/consensus_environment.rs | 70 ++++++++++++++++--- 1 file changed, 60 insertions(+), 10 deletions(-) diff --git a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs index d2bb800ca5..9442f2d613 100644 --- a/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs +++ b/node/src/components/consensus/protocols/highway/tests/consensus_environment.rs @@ -31,6 +31,16 @@ use crate::{ use super::new_test_highway_protocol_with_era_height; +/// An environment that simulates a single instance of Highway and how it processes consensus +/// messages. +/// The consensus instance is assumed to be the one held by the validator at index 0. The other +/// validators exist only as keypairs, and their activity is simulated by means of passing +/// SignedWireUnits to the Highway instance. Some validators can be defined to be slow, which means +/// they generate their units before they can receive units from other nodes, but the states are +/// assumed to be consistent at the end of each consensus round. +/// Since the only protocol state in this simulation is the one in the Highway instance of node 0, +/// it is also used as the basis for generation of the simulated consensus messages sent from other +/// nodes. pub(super) struct ConsensusEnvironment { highway: Box>, leaders: LeaderSequence, @@ -42,6 +52,9 @@ pub(super) struct ConsensusEnvironment { } impl ConsensusEnvironment { + /// Creates a new `ConsensusEnvironment` with the given set of validators and slow validators. + /// The public keys in `slow_validators` should exist in the `validators` map and have their + /// associated keypairs and weights. pub(super) fn new( validators: BTreeMap, slow_validators: BTreeSet, @@ -77,18 +90,23 @@ impl ConsensusEnvironment { } } + /// Gets the node 0 Highway instance as a reference to the `HighwayProtocol` struct. fn highway(&self) -> &HighwayProtocol { self.highway.as_any().downcast_ref().unwrap() } + /// Gets the public key of node 0. fn our_pub_key(&self) -> &PublicKey { self.validators.keys().next().unwrap() } + /// Returns `true` if node 0 is a slow node. fn is_slow(&self) -> bool { self.slow_validators.contains(self.our_pub_key()) } + /// Returns the minimum round length setting from the test chainspec generated for the + /// simulation. fn min_round_len(&self) -> u64 { self.highway() .highway() @@ -98,10 +116,23 @@ impl ConsensusEnvironment { .millis() } + /// Clones the protocol state of the Highway instance of node 0. + /// This is used to save the state at some particular point, so that it can be used later for + /// creation of units sent by other nodes, while the simulated internal state of node 0 + /// progresses. fn clone_state(&self) -> State { self.highway().highway().state().clone() } + /// Simulates a round of consensus. + /// In each round, the leader creates a proposal and a witness unit. + /// If the leader is a slow node, other nodes only receive the proposal at the end of the round + /// and create witness units without a proposal. + /// Otherwise, the fast nodes will create a confirmation unit and a witness unit. The slow + /// nodes will not receive the proposal and fast nodes' confirmation units before the witness + /// timeout, so they will create witness units without citing a proposal also in this case. + /// This makes it so that slow nodes effectively don't participate in the finalization of + /// proposals, making their participation metrics very low. pub(super) fn crank_round(&mut self) { let min_round_len = self.min_round_len(); let round_id = Timestamp::from(self.current_round_start); @@ -297,6 +328,9 @@ impl ConsensusEnvironment { self.current_round_start = self.current_round_start.saturating_add(min_round_len); } + /// Simulates a proposal being sent by node 0. This is done by triggering the active validator + /// timer at the timestamp when node 0 should make a proposal, and then calling + /// `self.highway.propose()` with the context returned by the timer handler. fn this_node_propose(&mut self) { if (self.current_round_start / self.min_round_len()).trailing_zeros() < self.our_round_exp() as u32 @@ -331,6 +365,10 @@ impl ConsensusEnvironment { self.finalize_blocks(&outcomes); } + /// Creates a simulated `SignedWireUnit` sent by `creator`. This is used to simulate other + /// nodes participating in consensus. The created unit uses `state.panorama()` as the cited + /// panorama, which enables simulating the creation of units at a particular point in time + /// (by passing a clone of the state from that point in time). fn create_swunit_from( &self, creator: ValidatorIndex, @@ -356,6 +394,9 @@ impl ConsensusEnvironment { SignedWireUnit::new(wunit.into_hashed(), keypair) } + /// Simulates a proposal being sent by a node other than node 0. This is just a message + /// containing a `SignedWireUnit` with a default consensus value, with the timestamp at the + /// start of the current round. fn other_node_propose( &mut self, leader: ValidatorIndex, @@ -374,6 +415,22 @@ impl ConsensusEnvironment { (state_clone, Some(msg)) } + /// Handles a consensus message received from another node. + /// If the message is a vertex (which is the only simulated case in this environment), it will + /// become queued in the synchronizer, and will only be added to the protocol state once + /// `add_vertices` is called. + fn handle_message(&mut self, msg: SerializedMessage, delay: u64) { + let outcomes = self.highway.handle_message( + &mut self.rng, + *ALICE_NODE_ID, + msg, + (self.current_round_start + delay).into(), + ); + self.finalize_blocks(&outcomes); + } + + /// Processes the vertices queued in the synchronizer of the Highway instance and adds them to + /// the protocol state. fn add_vertices(&mut self, timestamp: Timestamp) { loop { let outcomes = self.highway.handle_action(ACTION_ID_VERTEX, timestamp); @@ -387,16 +444,7 @@ impl ConsensusEnvironment { } } - fn handle_message(&mut self, msg: SerializedMessage, delay: u64) { - let outcomes = self.highway.handle_message( - &mut self.rng, - *ALICE_NODE_ID, - msg, - (self.current_round_start + delay).into(), - ); - self.finalize_blocks(&outcomes); - } - + /// Saves the finalized blocks that were output by the Highway instance. fn finalize_blocks(&mut self, outcomes: &[ProtocolOutcome]) { for outcome in outcomes { if let ProtocolOutcome::FinalizedBlock(block) = outcome { @@ -405,6 +453,8 @@ impl ConsensusEnvironment { } } + /// Gets the current round exponent set in the Highway instance. It is being changed by the + /// instance itself internally in response to how the protocol state progresses. pub(super) fn our_round_exp(&self) -> u8 { self.highway().highway().get_round_exp().unwrap() }