Skip to content

Commit

Permalink
Redo migration
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Nov 26, 2024
1 parent 3ec4f08 commit 48bb349
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 178 deletions.
8 changes: 7 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1480,12 +1480,18 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
vec![]
} else {
vec![
// TODO: No longer relevant under HDiff hot
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
// Doing this computation here feels like spaghetti code
HotStateSummary::new(
&state_root,
&state,
&chain.store.hierarchy_hot,
)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.chain(abandoned_states.into_iter().flat_map(|(slot, state_hash)| {
[
StoreOp::DeleteState(state_hash.into(), Some(slot)),
StoreOp::BeaconStateHotDiff(state_hash.into()),
StoreOp::DeleteStateHotDiff(state_hash.into()),
]
}))
.collect();
Expand Down
15 changes: 8 additions & 7 deletions beacon_node/beacon_chain/src/schema_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ pub fn migrate_schema<T: BeaconChainTypes>(
// bumped inside the upgrade_to_v22 fn
migration_schema_v22::upgrade_to_v22::<T>(db.clone(), genesis_state_root, log)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
current_version: from,
}
.into()),
(SchemaVersion(22), SchemaVersion(23)) => {
let ops = migration_schema_v23::upgrade_to_v23::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
(SchemaVersion(23), SchemaVersion(22)) => {
todo!("downgrade");
let ops = migration_schema_v23::downgrade_to_v22::<T>(db.clone(), log)?;
db.store_schema_version_atomically(to, ops)
}
// Anything else is an error.
(_, _) => Err(HotColdDBError::UnsupportedSchemaVersion {
target_version: to,
current_version: from,
}
.into()),
}
}
170 changes: 87 additions & 83 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
use crate::beacon_chain::BeaconChainTypes;
use crate::validator_pubkey_cache::DatabasePubkey;
use safe_arith::SafeArith;
use slog::{info, Logger};
use ssz::{Decode, Encode};
use ssz::Decode;
use ssz_derive::{Decode, Encode};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use store::{
get_key_for_col, hdiff::StorageStrategy, hot_cold_store::StorageStrategyHot, DBColumn, Error,
HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
get_full_state_v22, get_key_for_col,
hot_cold_store::{DiffBaseStateRoot, HotColdDBError},
DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use types::{Hash256, PublicKey, Slot};
use types::{EthSpec, Hash256, Slot};

const LOG_EVERY: usize = 200_000;
const PERSISTS_DIFFS_EVERY: usize = 5;

#[derive(Debug, Clone, Copy, Encode, Decode)]
pub struct HotStateSummaryV22 {
Expand All @@ -27,98 +29,85 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v22 to v23");

// TODO: sort summaries topologically starting from finalized
// If the summaries are sorted topologically we can insert them into the DB as if they were
// new states, re-using existing code. As states are likely to be sequential the diff cache
// should kick in, making the migration more efficient. If we just iterate the column of
// summaries we may get a distant state on each iteration.
let entries = db
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (key, value) = res?;
let state_root: Hash256 = key;
let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
Ok((state_root, summary))
})
.collect::<Result<Vec<_>, Error>>()?;

let state_summaries_dag = StateSummariesDAG {};

// TODO: Should we prune states associated with summaries that are not descendants of the finalized state?

// TODO: Must create the finalized diff points in the hot DB if the node checkpoint synced long
// ago. Also consider availability of the finalized state from the hot DB.

// Upgrade all hot DB state summaries to the new type:
// - Set all summaries of boundary states as `Snapshot` type
// - Set all others as `Replay` pointing to `epoch_boundary_state_root`

let mut other_ops = vec![];
let mut store_diff_ops = vec![];
let mut migrate_ops = vec![];
let mut new_diff_ops = vec![];
let mut diffs_written = 0;
let mut last_log_time = Instant::now();

// Iterate through all hot state summaries and upgrade them.
for (i, res) in db
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.enumerate()
{
let (key, value) = res?;
let state_root: Hash256 = key.into();
let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
let storage_strategy = if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
// boundary state
// - Read state from DB
// - Compute diff
// - Commit immediately in separate diff column
// - Schedule state deletion as part of atomic ops
let state = get_full_state(&self.hot_db, &state_root, &self.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(state_root))?;

let storage_strategy = match self.hierarchy_hot.storage_strategy(slot)? {
StorageStrategy::ReplayFrom(from) => {
let from_root = *state
.get_state_root(from_slot)
.map_err(HotColdDBError::HotStateSummaryError)?;
StorageStrategyHot::ReplayFrom(from_root)
}
StorageStrategy::Snapshot => StorageStrategyHot::Snapshot(0),
StorageStrategy::DiffFrom(from) => {
// Compute and store diff, and delete the state

// TODO: Find a cleaner way to de-duplicate the code to compute diffs on
// migration
// TODO: Ensure that the order of diffs is optimal, i.e. follow topological
// order to not re-compute intermediary values
db.store_hot_state_separate_ops(
&state_root,
state,
&mut vec![],
&mut store_diff_ops,
)?;

let from_root = *state
.get_state_root(from_slot)
.map_err(HotColdDBError::HotStateSummaryError)?;
StorageStrategyHot::DiffFrom(from_root)
}
};

let should_delete_state = match storage_strategy {
StorageStrategyHot::ReplayFrom(_) => true, // State no longer needed, we will replay
StorageStrategyHot::Snapshot(_) => false, // Keep state, need for snapshot
StorageStrategyHot::DiffFrom(_) => true, // State no longer needed, we will diff
};

if should_delete_state {
// Note: Do not delete the BeaconStateSummary as this will be over-written below
// TODO: Should also delete the temporary flag?
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
other_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}

storage_strategy
} else {
// TODO: Clip to anchor / finalized, if not aligned
StorageStrategyHot::ReplayFrom(summary.epoch_boundary_state_root)
};
for (i, (state_root, summary)) in entries.iter().enumerate() {
// States are only immediately available on the epoch boundary, else we need to replay
if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
let state = get_full_state_v22(&db.hot_db, state_root, &db.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
db.store_hot_state_separate_ops(state_root, &state, &mut new_diff_ops, &mut vec![])?;
}

// TODO: What to do with summaries not at an epoch boundary?

// Prune states written in previous format as part of the atomic version bump
let state_is_written = summary.slot % T::EthSpec::slots_per_epoch() == 0;
if state_is_written {
// Note: Do not delete the BeaconStateSummary as this will be over-written below
// TODO: Should also delete the temporary flag?
let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}

// Use the `state_summaries_dag` to compute previous roots to prevent having to load
// states in the epoch with block replay.
let diff_base_slot = db.hierarchy_hot.diff_base_slot(summary.slot)?;
let diff_base_state_root = DiffBaseStateRoot::new(
diff_base_slot,
state_summaries_dag.ancestor_at_slot(*state_root, diff_base_slot)?,
);

let new_summary = HotStateSummary {
slot: summary.slot,
latest_block_root: summary.latest_block_root,
storage_strategy,
diff_base_state_root,
previous_state_root: state_summaries_dag
.ancestor_at_slot(*state_root, summary.slot.safe_sub(1)?)?,
};
ops.push(new_summary.as_kv_store_op(key));
migrate_ops.push(new_summary.as_kv_store_op(*state_root));

if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();

diffs_written += store_diff_ops.len();
// Write diffs and snapshots frequently to not blow up the memory
if new_diff_ops.len() > PERSISTS_DIFFS_EVERY {
diffs_written += new_diff_ops.len();
// Commit diff ops once in a while to not blow up memory
// Commit diff ops before deleting states to not lose data.
db.hot_db.do_atomically(store_diff_ops)?;
store_diff_ops = vec![];
db.hot_db.do_atomically(new_diff_ops)?;
new_diff_ops = vec![];
}

if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();
// TODO: Display the slot distance between head and finalized, and head-tracker count
info!(
log,
Expand All @@ -133,5 +122,20 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
// for the next finality event?
info!(log, "Hot state migration complete");

Ok(ops)
Ok(migrate_ops)
}

/// Downgrade the database schema from v23 back to v22.
///
/// This is currently an unimplemented stub: both parameters are unused (hence the leading
/// underscores) and the function never returns a value.
///
/// # Panics
///
/// Always panics with the message `"downgrade not supported"`.
// NOTE(review): the signature already returns `Result`, and the caller applies `?` to it, so
// returning an `Err` here would surface the unsupported downgrade without aborting the
// process — consider doing that once a suitable error variant is agreed on.
pub fn downgrade_to_v22<T: BeaconChainTypes>(
_db: Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
_log: Logger,
) -> Result<Vec<KeyValueStoreOp>, Error> {
panic!("downgrade not supported");
}

/// Placeholder type used by the v23 migration to look up ancestor state roots.
///
/// It currently holds no data; the only method is an unimplemented stub.
struct StateSummariesDAG {}

impl StateSummariesDAG {
/// Presumably returns the root of the ancestor of `_state_root` at `_slot` (inferred from the
/// name and from how the migration uses the result — TODO confirm once implemented).
///
/// # Panics
///
/// Always panics: the body is `todo!()`, so any code path that reaches this call aborts.
fn ancestor_at_slot(&self, _state_root: Hash256, _slot: Slot) -> Result<Hash256, Error> {
todo!();
}
}
9 changes: 9 additions & 0 deletions beacon_node/store/src/hdiff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,15 @@ impl HierarchyModuli {
}))
}

/// Resolve the base slot referenced by the storage strategy chosen for `slot`.
///
/// - `ReplayFrom(from)` and `DiffFrom(from)` both yield `from`.
/// - `Snapshot` yields `slot` itself, as a snapshot has no separate base.
///
/// # Errors
///
/// Propagates any error returned by `storage_strategy`.
pub fn diff_base_slot(&self, slot: Slot) -> Result<Slot, Error> {
    let base_slot = match self.storage_strategy(slot)? {
        // Replay and diff strategies both carry the slot they are based on.
        StorageStrategy::ReplayFrom(from) | StorageStrategy::DiffFrom(from) => from,
        StorageStrategy::Snapshot => slot,
    };
    Ok(base_slot)
}

/// Return the smallest slot greater than or equal to `slot` at which a full snapshot should
/// be stored.
pub fn next_snapshot_slot(&self, slot: Slot) -> Result<Slot, Error> {
Expand Down
Loading

0 comments on commit 48bb349

Please sign in to comment.