Skip to content

Commit

Permalink
Address todos
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Dec 18, 2024
1 parent bf3d6b6 commit 0a230ec
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 113 deletions.
71 changes: 31 additions & 40 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::{mpsc, Arc};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use store::hot_cold_store::{migrate_database, HotColdDBError};
use store::iter::StateRootsIterator;
use store::iter::{BlockRootsIterator, StateRootsIterator};
use store::{DBColumn, Error, HotStateSummary, ItemStore, StoreItem, StoreOp};
pub use store::{HotColdDB, MemoryStore};
use types::{
Expand Down Expand Up @@ -514,23 +514,13 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
"new_finalized_epoch" => new_finalized_checkpoint.epoch,
);

let mut hot_db_ops = vec![];
// TODO(hdiff): iterate blocks from finalized block up to old_finalized_slot
let finalized_blocks = vec![];
Self::prune_non_checkpoint_sync_committee_branches(&finalized_blocks, &mut hot_db_ops);
// if store.config.prune_payloads {
Self::prune_finalized_payloads(&finalized_blocks, &mut hot_db_ops);
// }

debug!(
log,
"Extra pruning information";
"old_finalized_root" => format!("{:?}", old_finalized_checkpoint.root),
"new_finalized_root" => format!("{:?}", new_finalized_checkpoint.root),
);

/////

let summaries_dag = {
let state_summaries = store
.hot_db
Expand Down Expand Up @@ -568,26 +558,26 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
summaries_dag.descendant_block_roots_of(&new_finalized_checkpoint.root)?;

// TODO(hdiff): Could re-use the summaries dag
let newly_finalized_chain =
std::iter::once(Ok((new_finalized_slot, new_finalized_state_hash)))
.chain(
StateRootsIterator::new(&store, new_finalized_state)
.map(|res| res.map(|(state_root, slot)| (slot, state_root))),
)
let newly_finalized_state_roots =
std::iter::once(Ok((new_finalized_state_hash, new_finalized_slot)))
.chain(StateRootsIterator::new(&store, new_finalized_state))
.take_while(|res| {
res.as_ref()
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
.map_or(true, |(_, slot)| *slot >= old_finalized_slot)
})
.map(|res| res.map(|(_, state_root)| state_root))
.collect::<Result<Vec<Hash256>, _>>()?;
.map(|res| res.map(|(state_root, _)| state_root))
.collect::<Result<HashSet<Hash256>, _>>()?;

// TODO(hdiff): Could re-use the summaries dag
let newly_finalized_blocks = BlockRootsIterator::new(&store, new_finalized_state)
.take_while(|res| {
res.as_ref()
.map_or(true, |(_, slot)| *slot >= old_finalized_slot)
})
.collect::<Result<Vec<(Hash256, Slot)>, _>>()?;

// Compute the set of finalized state roots that we must keep to make the dynamic HDiff system
// work.
// TODO(hdiff): must not delete *all* finalized hdiffs. Instead, keep
// the more recent diff of each layer including the snapshot.
// Implement a routine somewhere to figure out which diffs should be kept
// Given a start slot, and the current finalized state:
// - iterate each hdiff layer and compute the most recent point <= finalized slot
let required_finalized_diff_state_slots =
store.hierarchy_hot.closest_layer_points(new_finalized_slot);

Expand All @@ -601,23 +591,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
if block_roots_descendant_of_finalized.contains(&summary.latest_block_root) {
// Keep this state is the post state of a viable head, or a state advance from a
// viable head.
} else if newly_finalized_chain.contains(&state_root)
} else if newly_finalized_state_roots.contains(&state_root)
&& required_finalized_diff_state_slots.contains(&slot)
{
// Keep this state and diff as it's necessary for the finalized portion of the
// HDiff links
// HDiff links. `required_finalized_diff_state_slots` tracks the set of slots on
// each diff layer, and by checking `newly_finalized_state_roots` which only
// keep those on the finalized canonical chain. Note that there may be lingering
// forks.
} else {
// TODO(hdiff) compute the root of the hdiffs to keep, else prune
abandoned_blocks.insert(SignedBeaconBlockHash::from(summary.latest_block_root));
abandoned_states.insert((slot, BeaconStateHash::from(state_root)));
}
}
}

// TODO(hdiff): Update the headtracker or remove it

/////

let mut batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(Into::into)
Expand Down Expand Up @@ -650,11 +638,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
store.pruning_checkpoint_store_op(new_finalized_checkpoint),
));

store.do_atomically_with_block_and_blobs_cache(batch)?;
// TODO(hdiff): this is probably broken
Self::prune_non_checkpoint_sync_committee_branches(&newly_finalized_blocks, &mut batch);
// TODO(hdiff): access store config here
// if store.config.prune_payloads {
Self::prune_finalized_payloads(&newly_finalized_blocks, &mut batch);
// }

// Do a quick separate pass to delete obsoleted hot states, usually pre-states from the state
// advance which are not canonical due to blocks being applied on top.
store.prune_old_hot_states()?;
store.do_atomically_with_block_and_blobs_cache(batch)?;

debug!(log, "Database pruning complete");

Expand All @@ -664,10 +655,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}

fn prune_finalized_payloads(
finalized_blocks: &[(Slot, Hash256)],
finalized_blocks: &[(Hash256, Slot)],
hot_db_ops: &mut Vec<StoreOp<E>>,
) {
for (_, block_root) in finalized_blocks {
for (block_root, _) in finalized_blocks {
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot. The payload fetching code is also
Expand All @@ -677,14 +668,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}

fn prune_non_checkpoint_sync_committee_branches(
finalized_blocks: &[(Slot, Hash256)],
finalized_blocks: &[(Hash256, Slot)],
hot_db_ops: &mut Vec<StoreOp<E>>,
) {
let mut epoch_boundary_blocks = HashSet::new();
let mut non_checkpoint_block_roots = HashSet::new();

// Then, iterate states in slot ascending order, as they are stored wrt previous states.
for (slot, block_root) in finalized_blocks {
for (block_root, slot) in finalized_blocks {
// At a missed slot, `state_root_iter` will return the block root
// from the previous non-missed slot. This ensures that the block root at an
// epoch boundary is always a checkpoint block root. We keep track of block roots
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,15 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(

let split = db.get_split_info();

// TODO(hdiff): sort summaries topologically starting from finalized
let state_summaries_dag = new_dag::<T>(&db)?;

// Sort summaries by slot so we have their ancestor diffs already stored when we store them.
// If the summaries are sorted topologically we can insert them into the DB like if they were a
// new state, re-using existing code. As states are likely to be sequential the diff cache
// should kick in making the migration more efficient. If we just iterate the column of
// summaries we may get distance state of each iteration.
let state_summaries_dag = new_dag::<T>(&db)?;

// Sort summaries by slot so we have their ancestor diffs already stored when we store them.
let summaries_by_slot = state_summaries_dag.summaries_by_slot_ascending();

// TODO(hdiff): Should prune states associated with summaries that are not descendant of finalized?

// TODO(hdiff): Must create the finalized diff points in the hot DB if the node checkpoint synced long
// ago. Also consider availability of the finalized state from the hot DB.

// Upgrade all hot DB state summaries to the new type:
// - Set all summaries of boundary states as `Snapshot` type
// - Set all others are `Replay` pointing to `epoch_boundary_state_root`
Expand Down
94 changes: 30 additions & 64 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1210,18 +1210,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
);
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_temp_key));

// Always delete diffs with summary. If a diff must be kept beyond finalization,
// do not issue a DeleteState op at that time.
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconStateHotDiff.into(),
state_root.as_slice(),
)));

// TODO(hdiff): Review under HDiff
if slot.map_or(true, |slot| slot % E::slots_per_epoch() == 0) {
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
key_value_batch.push(KeyValueStoreOp::DeleteKey(state_key));
if let Some(slot) = slot {
match self.hierarchy_hot.storage_strategy(slot)? {
StorageStrategy::Snapshot => {
// Full state stored in this position
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconState.into(),
state_root.as_slice(),
)));
}
StorageStrategy::DiffFrom(_) => {
// Diff stored in this position
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconStateHotDiff.into(),
state_root.as_slice(),
)));
}
StorageStrategy::ReplayFrom(_) => {
// Nothing else to delete
}
}
} else {
// TODO(hdiff): should attempt to delete everything if slot is not available?
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconState.into(),
state_root.as_slice(),
)));
key_value_batch.push(KeyValueStoreOp::DeleteKey(get_key_for_col(
DBColumn::BeaconStateHotDiff.into(),
state_root.as_slice(),
)));
}
}

Expand Down Expand Up @@ -3031,58 +3049,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

Ok(())
}

/// Prune states from the hot database which are prior to the split.
///
/// This routine is important for cleaning up advanced states which are stored in the database
/// with a temporary flag.
pub fn prune_old_hot_states(&self) -> Result<(), Error> {
let split = self.get_split_info();
debug!(
self.log,
"Database state pruning started";
"split_slot" => split.slot,
);
let mut state_delete_batch = vec![];
for res in self
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
{
let (state_root, summary_bytes) = res?;
let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?;

if summary.slot <= split.slot {
let old = summary.slot < split.slot;
let non_canonical = summary.slot == split.slot
&& state_root != split.state_root
&& !split.state_root.is_zero();
if old || non_canonical {
let reason = if old {
"old dangling state"
} else {
"non-canonical"
};
debug!(
self.log,
"Deleting state";
"state_root" => ?state_root,
"slot" => summary.slot,
"reason" => reason,
);
// Delete the diff as descendants of this state will never be used.
state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
}
}
}
let num_deleted_states = state_delete_batch.len();
self.do_atomically_with_block_and_blobs_cache(state_delete_batch)?;
debug!(
self.log,
"Database state pruning complete";
"num_deleted_states" => num_deleted_states,
);
Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down

0 comments on commit 0a230ec

Please sign in to comment.