diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs index 9a37095d6ed..e81c5f42638 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs @@ -1,21 +1,18 @@ use crate::beacon_chain::BeaconChainTypes; -use safe_arith::SafeArith; use slog::{info, Logger}; use ssz::Decode; use ssz_derive::{Decode, Encode}; use std::{ + collections::{BTreeMap, HashMap}, sync::Arc, time::{Duration, Instant}, }; use store::{ - get_full_state, get_full_state_v22, get_key_for_col, - hot_cold_store::{DiffBaseStateRoot, HotColdDBError}, + get_full_state_v22, get_key_for_col, hdiff::StorageStrategy, hot_cold_store::DiffBaseStateRoot, DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem, }; use types::{EthSpec, Hash256, Slot}; -const PERSISTS_DIFFS_EVERY: usize = 5; - #[derive(Debug, Clone, Copy, Encode, Decode)] pub struct HotStateSummaryV22 { slot: Slot, @@ -36,40 +33,7 @@ pub fn upgrade_to_v23( // new state, re-using existing code. As states are likely to be sequential the diff cache // should kick in making the migration more efficient. If we just iterate the column of // summaries we may get distance state of each iteration. - let state_summaries_v22 = db - .hot_db - .iter_column::(DBColumn::BeaconStateSummary) - .map(|res| { - let (key, value) = res?; - let state_root: Hash256 = key; - let summary = HotStateSummaryV22::from_ssz_bytes(&value)?; - Ok((state_root, summary)) - }) - .collect::, Error>>()?; - - let mut state_summaries_by_block_root = HashMap::new(); - for (state_root, summary) in &state_summaries_v22 { - let summaries = state_summaries_by_block_root - .entry(summary.latest_block_root) - .or_default(); - - // TODO(hdiff): error if existing - summaries.insert(summary.slot, summary); - } - - // Construct block root to parent block root mapping. - let mut parent_block_roots = HashMap::new(); - for block_root in state_summaries_by_root.keys() { - let blinded_block = db - .get_blinded_block(block_root)? - .unwrap_or(Error::MissingBlock(block_root))?; - parent_block_roots.insert(block_root, blinded_block.parent_root()); - } - - let state_summaries_dag = StateSummariesDAG { - state_summaries_by_block_root, - parent_block_roots, - }; + let state_summaries_dag = StateSummariesDAG::new::(&db)?; // Sort summaries by slot so we have their ancestor diffs already stored when we store them. let summaries_by_slot = state_summaries_dag.summaries_by_slot_ascending(); @@ -84,6 +48,8 @@ pub fn upgrade_to_v23( // - Set all others are `Replay` pointing to `epoch_boundary_state_root` let mut migrate_ops = vec![]; + let mut diffs_written = 0; + let mut summaries_written = 0; let mut last_log_time = Instant::now(); for (slot, old_hot_state_summaries) in summaries_by_slot { @@ -91,16 +57,17 @@ pub fn upgrade_to_v23( // 1. Store snapshot or diff at this slot (if required). // TODO: make sure lowest hot hierarchy config is >= 5 to prevent having to // reconstruct states. - match db.hierarchy_hot.storage_strategy(slot) { + match db.hierarchy_hot.storage_strategy(slot)? { StorageStrategy::DiffFrom(_) | StorageStrategy::Snapshot => { // Load the full state and re-store it as a snapshot or diff. - let state = get_full_state(&db.hot_db, &state_root, db.spec)? - .unwrap_or(Error::MissingState(state_root))?; + let state = get_full_state_v22(&db.hot_db, &state_root, &db.spec)? + .ok_or_else(|| Error::MissingState(state_root))?; // Store immediately so that future diffs can load and diff from it. let mut ops = vec![]; - db.store_hot_state_diffs(state_root, &state, &mut ops)?; + db.store_hot_state_diffs(&state_root, &state, &mut ops)?; db.hot_db.do_atomically(ops)?; + diffs_written += 1; } StorageStrategy::ReplayFrom(_) => { // No need to store diffs for states that will be reconstructed by replaying @@ -111,14 +78,20 @@ pub fn upgrade_to_v23( // 2. Convert the summary to the new format. let latest_block_root = old_summary.latest_block_root; let previous_state_root = if state_root == split.state_root { - Hash256::zero() + Hash256::ZERO } else { - state_summaries_dag.previous_state_root(latest_block_root, slot)?; + state_summaries_dag.previous_state_root(latest_block_root, slot)? }; let diff_base_slot = db.hierarchy_hot.diff_base_slot(slot)?; - let diff_base_state_root = state_summaries_dag - .ancestor_state_root_at_slot(latest_block_root, diff_base_slot)?; + let diff_base_state_root = DiffBaseStateRoot::new( + diff_base_slot, + state_summaries_dag.ancestor_state_root_at_slot( + latest_block_root, + old_summary.slot, + diff_base_slot, + )?, + ); let new_summary = HotStateSummary { slot, @@ -127,71 +100,26 @@ pub fn upgrade_to_v23( diff_base_state_root, }; // Overwrite the summaries atomically as part of the migration. - migrate_ops.push(new_summary.as_kv_store_op(state_root))?; + migrate_ops.push(new_summary.as_kv_store_op(state_root)); // 3. Stage old data for deletion. - if slot % E::slots_per_epoch() == 0 { + if slot % T::EthSpec::slots_per_epoch() == 0 { let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key)); } - } - } - - for (i, (state_root, summary)) in entries.iter().enumerate() { - // States are only immediately available on the epoch boundary, else we need to replay - if summary.slot % T::EthSpec::slots_per_epoch() == 0 { - let state = get_full_state_v22(&db.hot_db, state_root, &db.spec)? - .ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?; - db.store_hot_state_separate_ops(state_root, &state, &mut new_diff_ops, &mut vec![])?; - } - - // TODO: What to do with summaries not at an epoch boundary? - - // Prune states written in previous format as part of the atomic version bump - let state_is_written = summary.slot % T::EthSpec::slots_per_epoch() == 0; - if state_is_written { - // Note: Do not delete the BeaconStateSummary as this will be over-written below - // TODO: Should also delete the temporary flag? - let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice()); - migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key)); - } - // Use the `state_summaries_dag` to compute previous roots to prevent having to load - // states in the epoch with block replay. - let diff_base_slot = db.hierarchy_hot.diff_base_slot(summary.slot)?; - let diff_base_state_root = DiffBaseStateRoot::new( - diff_base_slot, - state_summaries_dag.ancestor_at_slot(*state_root, diff_base_slot)?, - ); - - let new_summary = HotStateSummary { - slot: summary.slot, - latest_block_root: summary.latest_block_root, - diff_base_state_root, - previous_state_root: state_summaries_dag - .ancestor_at_slot(*state_root, summary.slot.safe_sub(1)?)?, - }; - migrate_ops.push(new_summary.as_kv_store_op(*state_root)); - - // Write diffs and snapshots frequently to not blow up the memory - if new_diff_ops.len() > PERSISTS_DIFFS_EVERY { - diffs_written += new_diff_ops.len(); - // Commit diff ops once in a while to not blow up memory - // Commit diff ops before deleting states to not lose data. - db.hot_db.do_atomically(new_diff_ops)?; - new_diff_ops = vec![]; - } - - if last_log_time.elapsed() > Duration::from_secs(5) { - last_log_time = Instant::now(); - // TODO: Display the slot distance between head and finalized, and head-tracker count - info!( - log, - "Hot state migration in progress"; - "summaries_migrated" => i, - "diff_written" => diffs_written - ); + summaries_written += 1; + if last_log_time.elapsed() > Duration::from_secs(5) { + last_log_time = Instant::now(); + // TODO: Display the slot distance between head and finalized, and head-tracker count + info!( + log, + "Hot state migration in progress"; + "diff_written" => diffs_written, + "summaries_written" => summaries_written, + ); + } } } @@ -209,24 +137,66 @@ pub fn downgrade_to_v22( panic!("downgrade not supported"); } -struct HotStateSummaryWithParentBlockRoot {} - struct StateSummariesDAG { - // block_root -> slot -> (state_root, state summary) + // block_root -> state slot -> (state_root, state summary) state_summaries_by_block_root: HashMap>, // block_root -> parent_block_root parent_block_roots: HashMap, } impl StateSummariesDAG { + fn new( + db: &HotColdDB, + ) -> Result { + // Collect all sumaries for unfinalized states + let state_summaries_v22 = db + .hot_db + .iter_column::(DBColumn::BeaconStateSummary) + .map(|res| { + let (key, value) = res?; + let state_root: Hash256 = key; + let summary = HotStateSummaryV22::from_ssz_bytes(&value)?; + Ok((state_root, summary)) + }) + .collect::, Error>>()?; + + // Group them by latest block root, and sorted state slot + let mut state_summaries_by_block_root = HashMap::>::new(); + for (state_root, summary) in state_summaries_v22.into_iter() { + let summaries = state_summaries_by_block_root + .entry(summary.latest_block_root) + .or_default(); + + // TODO(hdiff): error if existing + summaries.insert(summary.slot, (state_root, summary)); + } + + // Construct block root to parent block root mapping. + let mut parent_block_roots = HashMap::new(); + for block_root in state_summaries_by_block_root.keys() { + let blinded_block = db + .get_blinded_block(block_root)? + .ok_or_else(|| Error::MissingBlock(*block_root))?; + parent_block_roots.insert(*block_root, blinded_block.parent_root()); + } + + Ok(Self { + state_summaries_by_block_root, + parent_block_roots, + }) + } + fn summaries_by_slot_ascending(&self) -> BTreeMap> { - let mut summaries = BTreeMap::new(); + let mut summaries = BTreeMap::>::new(); for (slot, (state_root, summary)) in self .state_summaries_by_block_root .values() .flat_map(|slot_map| slot_map.iter()) { - summaries.insert(slot, (state_root, summary.clone())); + summaries + .entry(*slot) + .or_default() + .push((*state_root, *summary)); } summaries } @@ -240,7 +210,7 @@ impl StateSummariesDAG { let previous_slot = slot - 1; let parent_block_root = self.parent_block_roots - .get(latest_block_root) + .get(&latest_block_root) .ok_or_else(|| { Error::MigrationError(format!( "missing parent block root for {latest_block_root:?}" @@ -248,15 +218,15 @@ impl StateSummariesDAG { })?; let parent_block_summaries = self .state_summaries_by_block_root - .get(&parent_block_root) + .get(parent_block_root) .ok_or_else(|| { Error::MigrationError(format!( "missing state summaries for parent {parent_block_root:?}" )) })?; - if slot_summaries.contains_key(&previous_slot) { + if parent_block_summaries.contains_key(&previous_slot) { // Common case: not a skipped slot. - Ok(parent_block_root) + Ok(*parent_block_root) } else { // Skipped slot: block root at previous slot is the same as latest block root. Ok(latest_block_root) @@ -281,11 +251,12 @@ impl StateSummariesDAG { "missing state summaries for block {latest_block_root:?}" )) })?; - current_block_summaries.get(&slot).ok_or_else(|| { + let (state_root, _state_summary) = current_block_summaries.get(&slot).ok_or_else(|| { Error::MigrationError(format!( - "missing state summary for slot {previous_slot} and block {latest_block_root:?}", + "missing state summary for slot {slot} and block {latest_block_root:?}", )) - }) + })?; + Ok(*state_root) } fn ancestor_state_root_at_slot(