Skip to content

Commit

Permalink
Write new hot state summaries in different column
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Dec 25, 2024
1 parent 889a968 commit ee13a1f
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 58 deletions.
15 changes: 5 additions & 10 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use crate::summaries_dag::{
};
use parking_lot::Mutex;
use slog::{debug, error, info, warn, Logger};
use ssz::Decode;
use std::collections::HashSet;
use std::mem;
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::{DBColumn, Error, HotStateSummary, ItemStore, StoreOp};
use store::{Error, ItemStore, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use types::{BeaconState, BeaconStateHash, Checkpoint, Epoch, EthSpec, Hash256, Slot};

Expand Down Expand Up @@ -491,14 +490,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

let (state_summaries_dag, block_summaries_dag) = {
let state_summaries = store
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (state_root, value) = res?;
let summary = HotStateSummary::from_ssz_bytes(&value)?;
Ok((state_root, summary.into()))
})
.collect::<Result<Vec<(Hash256, DAGStateSummary)>, Error>>()?;
.load_hot_state_summaries()?
.into_iter()
.map(|(state_root, summary)| (state_root, summary.into()))
.collect::<Vec<(Hash256, DAGStateSummary)>>();

// De-duplicate block roots to reduce block reads below
let summary_block_roots = HashSet::<Hash256>::from_iter(
Expand Down
89 changes: 54 additions & 35 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,52 +85,65 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(

// Store immediately so that future diffs can load and diff from it.
let mut ops = vec![];
// We must commit the hot state summary immediatelly, otherwise we can't diff
// against it and future writes will fail. That's why we write the new hot
// summaries in a different column to have both new and old data present at
// once. Otherwise if the process crashes during the migration the database will
// be broken.
db.store_hot_state_summary(&state_root, &state, &mut ops)?;
db.store_hot_state_diffs(&state_root, &state, &mut ops)?;
db.hot_db.do_atomically(ops)?;
diffs_written += 1;
}
StorageStrategy::ReplayFrom(_) => {
// Optimization: instead of having to load the state of each summary we load x32
// less states by manually computing the HotStateSummary roots using the
// computed state dag.
//
// No need to store diffs for states that will be reconstructed by replaying
// blocks.
}
}

// 2. Convert the summary to the new format.
let latest_block_root = old_summary.latest_block_root;
let previous_state_root = if state_root == split.state_root {
Hash256::ZERO
} else {
state_summaries_dag
.previous_state_root(state_root)
.map_err(|e| {
Error::MigrationError(format!("error computing previous_state_root {e:?}"))
})?
};

let diff_base_state_root =
if let Some(diff_base_slot) = storage_strategy.diff_base_slot() {
DiffBaseStateRoot::new(
diff_base_slot,
// 2. Convert the summary to the new format.
let latest_block_root = old_summary.latest_block_root;
let previous_state_root = if state_root == split.state_root {
Hash256::ZERO
} else {
state_summaries_dag
.ancestor_state_root_at_slot(state_root, diff_base_slot)
.previous_state_root(state_root)
.map_err(|e| {
Error::MigrationError(format!(
"error computing ancestor_state_root_at_slot {e:?}"
"error computing previous_state_root {e:?}"
))
})?,
)
} else {
DiffBaseStateRoot::zero()
};

let new_summary = HotStateSummary {
slot,
latest_block_root,
previous_state_root,
diff_base_state_root,
};
// Overwrite the summaries atomically as part of the migration.
migrate_ops.push(new_summary.as_kv_store_op(state_root));
})?
};

let diff_base_state_root =
if let Some(diff_base_slot) = storage_strategy.diff_base_slot() {
DiffBaseStateRoot::new(
diff_base_slot,
state_summaries_dag
.ancestor_state_root_at_slot(state_root, diff_base_slot)
.map_err(|e| {
Error::MigrationError(format!(
"error computing ancestor_state_root_at_slot {e:?}"
))
})?,
)
} else {
DiffBaseStateRoot::zero()
};

let new_summary = HotStateSummary {
slot,
latest_block_root,
previous_state_root,
diff_base_state_root,
};
let op = new_summary.as_kv_store_op(state_root);
// It's not ncessary to immediately commit the summaries of states that are
// ReplayFrom. However we do so for simplicity.
db.hot_db.do_atomically(vec![op])?;
}
}

// 3. Stage old data for deletion.
if slot % T::EthSpec::slots_per_epoch() == 0 {
Expand All @@ -139,6 +152,11 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}

// Delete previous summaries
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice());
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_summary_key));

summaries_written += 1;
if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();
Expand Down Expand Up @@ -179,6 +197,7 @@ fn new_dag<T: BeaconChainTypes>(
// Collect all sumaries for unfinalized states
let state_summaries_v22 = db
.hot_db
// Collect summaries from the legacy V22 column BeaconStateSummary
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (key, value) = res?;
Expand Down
33 changes: 22 additions & 11 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,8 +1241,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

StoreOp::DeleteState(state_root, slot) => {
// Delete the hot state summary.
let state_summary_key =
get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_slice());
let state_summary_key = get_key_for_col(
DBColumn::BeaconStateHotSummary.into(),
state_root.as_slice(),
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_summary_key));

// Delete the state temporary flag (if any). Temporary flags are commonly
Expand Down Expand Up @@ -1635,14 +1637,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}) = self.load_hot_state_summary(&state_root)?
else {
let mut existing_summaries = self
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (state_root, value) = res?;
let summary = HotStateSummary::from_ssz_bytes(&value)?;
Ok((state_root, summary.slot))
})
.collect::<Result<Vec<(Hash256, Slot)>, Error>>()?;
.load_hot_state_summaries()?
.into_iter()
.map(|(state_root, summary)| (state_root, summary.slot))
.collect::<Vec<(Hash256, Slot)>>();
existing_summaries.sort_by(|a, b| a.1.cmp(&b.1));
// Hot summaries should never be missing, dump the current list of summaries to ease debug
debug!(
Expand Down Expand Up @@ -2704,6 +2702,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map_err(|e| Error::LoadHotStateSummary(*state_root, e.into()))
}

/// Load all hot state summaries present in the hot DB
pub fn load_hot_state_summaries(&self) -> Result<Vec<(Hash256, HotStateSummary)>, Error> {
self.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateHotSummary)
.map(|res| {
let (state_root, value) = res?;
let summary = HotStateSummary::from_ssz_bytes(&value)?;
Ok((state_root, summary))
})
.collect()
}

/// Load the temporary flag for a state root, if one exists.
///
/// Returns `Some` if the state is temporary, or `None` if the state is permanent or does not
Expand Down Expand Up @@ -3120,6 +3130,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let mut cold_ops = vec![];

let current_schema_columns = vec![
DBColumn::BeaconStateHotSummary,
DBColumn::BeaconColdStateSummary,
DBColumn::BeaconStateSnapshot,
DBColumn::BeaconStateDiff,
Expand Down Expand Up @@ -3409,7 +3420,7 @@ impl DiffBaseStateRoot {

impl StoreItem for HotStateSummary {
fn db_column() -> DBColumn {
DBColumn::BeaconStateSummary
DBColumn::BeaconStateHotSummary
}

fn as_store_bytes(&self) -> Vec<u8> {
Expand Down
13 changes: 11 additions & 2 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
// i.e. entries being created and deleted.
for column in [
DBColumn::BeaconState,
DBColumn::BeaconStateSummary,
DBColumn::BeaconStateHotSummary,
DBColumn::BeaconBlock,
] {
self.compact_column(column)?;
Expand Down Expand Up @@ -285,12 +285,20 @@ pub enum DBColumn {
/// For compact `BeaconStateDiff`s in the freezer DB.
#[strum(serialize = "bsd")]
BeaconStateDiff,
/// Mapping from state root to `HotStateSummary` in the hot DB.
/// DEPRECATED
///
/// Mapping from state root to `HotStateSummaryV22` in the hot DB.
///
/// Previously this column also served a role in the freezer DB, mapping state roots to
/// `ColdStateSummary`. However that role is now filled by `BeaconColdStateSummary`.
#[strum(serialize = "bss")]
BeaconStateSummary,
/// Mapping from state root to `HotStateSummaryV23` in the hot DB.
///
/// This column is populated after DB schema version 23 superseding `BeaconStateSummary`. The
/// new column is necessary to have a safe migration without data loss.
#[strum(serialize = "bs3")]
BeaconStateHotSummary,
/// Mapping from state root to `ColdStateSummary` in the cold DB.
#[strum(serialize = "bcs")]
BeaconColdStateSummary,
Expand Down Expand Up @@ -394,6 +402,7 @@ impl DBColumn {
| Self::BeaconStateSummary
| Self::BeaconStateHotDiff
| Self::BeaconStateHotSnapshot
| Self::BeaconStateHotSummary
| Self::BeaconColdStateSummary
| Self::BeaconStateTemporary
| Self::ExecPayload
Expand Down

0 comments on commit ee13a1f

Please sign in to comment.