diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index e445263bd4..4c32b44f48 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -4,14 +4,13 @@ use crate::summaries_dag::{ }; use parking_lot::Mutex; use slog::{debug, error, info, warn, Logger}; -use ssz::Decode; use std::collections::HashSet; use std::mem; use std::sync::{mpsc, Arc}; use std::thread; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::{migrate_database, HotColdDBError}; -use store::{DBColumn, Error, HotStateSummary, ItemStore, StoreOp}; +use store::{Error, ItemStore, StoreOp}; pub use store::{HotColdDB, MemoryStore}; use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot}; @@ -491,14 +490,10 @@ impl, Cold: ItemStore> BackgroundMigrator(DBColumn::BeaconStateSummary) - .map(|res| { - let (state_root, value) = res?; - let summary = HotStateSummary::from_ssz_bytes(&value)?; - Ok((state_root, summary.into())) - }) - .collect::, Error>>()?; + .load_hot_state_summaries()? + .into_iter() + .map(|(state_root, summary)| (state_root, summary.into())) + .collect::>(); // De-duplicate block roots to reduce block reads below let summary_block_roots = HashSet::::from_iter( diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs index b82d0684af..32bc1b4702 100644 --- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs +++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs @@ -85,52 +85,65 @@ pub fn upgrade_to_v23( // Store immediately so that future diffs can load and diff from it. let mut ops = vec![]; + // We must commit the hot state summary immediatelly, otherwise we can't diff + // against it and future writes will fail. That's why we write the new hot + // summaries in a different column to have both new and old data present at + // once. Otherwise if the process crashes during the migration the database will + // be broken. + db.store_hot_state_summary(&state_root, &state, &mut ops)?; db.store_hot_state_diffs(&state_root, &state, &mut ops)?; db.hot_db.do_atomically(ops)?; diffs_written += 1; } StorageStrategy::ReplayFrom(_) => { + // Optimization: instead of having to load the state of each summary we load x32 + // less states by manually computing the HotStateSummary roots using the + // computed state dag. + // // No need to store diffs for states that will be reconstructed by replaying // blocks. - } - } - - // 2. Convert the summary to the new format. - let latest_block_root = old_summary.latest_block_root; - let previous_state_root = if state_root == split.state_root { - Hash256::ZERO - } else { - state_summaries_dag - .previous_state_root(state_root) - .map_err(|e| { - Error::MigrationError(format!("error computing previous_state_root {e:?}")) - })? - }; - - let diff_base_state_root = - if let Some(diff_base_slot) = storage_strategy.diff_base_slot() { - DiffBaseStateRoot::new( - diff_base_slot, + // 2. Convert the summary to the new format. + let latest_block_root = old_summary.latest_block_root; + let previous_state_root = if state_root == split.state_root { + Hash256::ZERO + } else { state_summaries_dag - .ancestor_state_root_at_slot(state_root, diff_base_slot) + .previous_state_root(state_root) .map_err(|e| { Error::MigrationError(format!( - "error computing ancestor_state_root_at_slot {e:?}" + "error computing previous_state_root {e:?}" )) - })?, - ) - } else { - DiffBaseStateRoot::zero() - }; - - let new_summary = HotStateSummary { - slot, - latest_block_root, - previous_state_root, - diff_base_state_root, - }; - // Overwrite the summaries atomically as part of the migration. - migrate_ops.push(new_summary.as_kv_store_op(state_root)); + })? + }; + + let diff_base_state_root = + if let Some(diff_base_slot) = storage_strategy.diff_base_slot() { + DiffBaseStateRoot::new( + diff_base_slot, + state_summaries_dag + .ancestor_state_root_at_slot(state_root, diff_base_slot) + .map_err(|e| { + Error::MigrationError(format!( + "error computing ancestor_state_root_at_slot {e:?}" + )) + })?, + ) + } else { + DiffBaseStateRoot::zero() + }; + + let new_summary = HotStateSummary { + slot, + latest_block_root, + previous_state_root, + diff_base_state_root, + }; + let op = new_summary.as_kv_store_op(state_root); + // It's not ncessary to immediately commit the summaries of states that are + // ReplayFrom. However we do so for simplicity. + db.hot_db.do_atomically(vec![op])?; + } + } // 3. Stage old data for deletion. if slot % T::EthSpec::slots_per_epoch() == 0 { @@ -139,6 +152,11 @@ pub fn upgrade_to_v23( migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key)); } + // Delete previous summaries + let state_summary_key = + get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice()); + migrate_ops.push(KeyValueStoreOp::DeleteKey(state_summary_key)); + summaries_written += 1; if last_log_time.elapsed() > Duration::from_secs(5) { last_log_time = Instant::now(); @@ -179,6 +197,7 @@ fn new_dag( // Collect all sumaries for unfinalized states let state_summaries_v22 = db .hot_db + // Collect summaries from the legacy V22 column BeaconStateSummary .iter_column::(DBColumn::BeaconStateSummary) .map(|res| { let (key, value) = res?; diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 9ab233c647..a6aecd5c64 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1241,8 +1241,10 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteState(state_root, slot) => { // Delete the hot state summary. - let state_summary_key = - get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice()); + let state_summary_key = get_key_for_col( + DBColumn::BeaconStateHotSummary.into(), + state_root.as_slice(), + ); key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key)); // Delete the state temporary flag (if any). Temporary flags are commonly @@ -1635,14 +1637,10 @@ impl, Cold: ItemStore> HotColdDB }) = self.load_hot_state_summary(&state_root)? else { let mut existing_summaries = self - .hot_db - .iter_column::(DBColumn::BeaconStateSummary) - .map(|res| { - let (state_root, value) = res?; - let summary = HotStateSummary::from_ssz_bytes(&value)?; - Ok((state_root, summary.slot)) - }) - .collect::, Error>>()?; + .load_hot_state_summaries()? + .into_iter() + .map(|(state_root, summary)| (state_root, summary.slot)) + .collect::>(); existing_summaries.sort_by(|a, b| a.1.cmp(&b.1)); // Hot summaries should never be missing, dump the current list of summaries to ease debug debug!( @@ -2704,6 +2702,18 @@ impl, Cold: ItemStore> HotColdDB .map_err(|e| Error::LoadHotStateSummary(*state_root, e.into())) } + /// Load all hot state summaries present in the hot DB + pub fn load_hot_state_summaries(&self) -> Result, Error> { + self.hot_db + .iter_column::(DBColumn::BeaconStateHotSummary) + .map(|res| { + let (state_root, value) = res?; + let summary = HotStateSummary::from_ssz_bytes(&value)?; + Ok((state_root, summary)) + }) + .collect() + } + /// Load the temporary flag for a state root, if one exists. /// /// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not @@ -3120,6 +3130,7 @@ impl, Cold: ItemStore> HotColdDB let mut cold_ops = vec![]; let current_schema_columns = vec![ + DBColumn::BeaconStateHotSummary, DBColumn::BeaconColdStateSummary, DBColumn::BeaconStateSnapshot, DBColumn::BeaconStateDiff, @@ -3409,7 +3420,7 @@ impl DiffBaseStateRoot { impl StoreItem for HotStateSummary { fn db_column() -> DBColumn { - DBColumn::BeaconStateSummary + DBColumn::BeaconStateHotSummary } fn as_store_bytes(&self) -> Vec { diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 7bba4c0f62..35489ec3f9 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -93,7 +93,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { // i.e. entries being created and deleted. for column in [ DBColumn::BeaconState, - DBColumn::BeaconStateSummary, + DBColumn::BeaconStateHotSummary, DBColumn::BeaconBlock, ] { self.compact_column(column)?; @@ -285,12 +285,20 @@ pub enum DBColumn { /// For compact `BeaconStateDiff`s in the freezer DB. #[strum(serialize = "bsd")] BeaconStateDiff, - /// Mapping from state root to `HotStateSummary` in the hot DB. + /// DEPRECATED + /// + /// Mapping from state root to `HotStateSummaryV22` in the hot DB. /// /// Previously this column also served a role in the freezer DB, mapping state roots to /// `ColdStateSummary`. However that role is now filled by `BeaconColdStateSummary`. #[strum(serialize = "bss")] BeaconStateSummary, + /// Mapping from state root to `HotStateSummaryV23` in the hot DB. + /// + /// This column is populated after DB schema version 23 superseding `BeaconStateSummary`. The + /// new column is necessary to have a safe migration without data loss. + #[strum(serialize = "bs3")] + BeaconStateHotSummary, /// Mapping from state root to `ColdStateSummary` in the cold DB. #[strum(serialize = "bcs")] BeaconColdStateSummary, @@ -394,6 +402,7 @@ impl DBColumn { | Self::BeaconStateSummary | Self::BeaconStateHotDiff | Self::BeaconStateHotSnapshot + | Self::BeaconStateHotSummary | Self::BeaconColdStateSummary | Self::BeaconStateTemporary | Self::ExecPayload