Skip to content

Commit

Permalink
Compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed Dec 10, 2024
1 parent 163a27d commit 4f76cef
Showing 1 changed file with 88 additions and 117 deletions.
205 changes: 88 additions & 117 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
use crate::beacon_chain::BeaconChainTypes;
use safe_arith::SafeArith;
use slog::{info, Logger};
use ssz::Decode;
use ssz_derive::{Decode, Encode};
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::{Duration, Instant},
};
use store::{
get_full_state, get_full_state_v22, get_key_for_col,
hot_cold_store::{DiffBaseStateRoot, HotColdDBError},
get_full_state_v22, get_key_for_col, hdiff::StorageStrategy, hot_cold_store::DiffBaseStateRoot,
DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
};
use types::{EthSpec, Hash256, Slot};

const PERSISTS_DIFFS_EVERY: usize = 5;

#[derive(Debug, Clone, Copy, Encode, Decode)]
pub struct HotStateSummaryV22 {
slot: Slot,
Expand All @@ -36,40 +33,7 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
// new state, re-using existing code. As states are likely to be sequential the diff cache
// should kick in making the migration more efficient. If we just iterate the column of
// summaries we may get distance state of each iteration.
let state_summaries_v22 = db
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (key, value) = res?;
let state_root: Hash256 = key;
let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
Ok((state_root, summary))
})
.collect::<Result<Vec<_>, Error>>()?;

let mut state_summaries_by_block_root = HashMap::new();
for (state_root, summary) in &state_summaries_v22 {
let summaries = state_summaries_by_block_root
.entry(summary.latest_block_root)
.or_default();

// TODO(hdiff): error if existing
summaries.insert(summary.slot, summary);
}

// Construct block root to parent block root mapping.
let mut parent_block_roots = HashMap::new();
for block_root in state_summaries_by_root.keys() {
let blinded_block = db
.get_blinded_block(block_root)?
.unwrap_or(Error::MissingBlock(block_root))?;
parent_block_roots.insert(block_root, blinded_block.parent_root());
}

let state_summaries_dag = StateSummariesDAG {
state_summaries_by_block_root,
parent_block_roots,
};
let state_summaries_dag = StateSummariesDAG::new::<T>(&db)?;

// Sort summaries by slot so we have their ancestor diffs already stored when we store them.
let summaries_by_slot = state_summaries_dag.summaries_by_slot_ascending();
Expand All @@ -84,23 +48,26 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
// - Set all others are `Replay` pointing to `epoch_boundary_state_root`

let mut migrate_ops = vec![];
let mut diffs_written = 0;
let mut summaries_written = 0;
let mut last_log_time = Instant::now();

for (slot, old_hot_state_summaries) in summaries_by_slot {
for (state_root, old_summary) in old_hot_state_summaries {
// 1. Store snapshot or diff at this slot (if required).
// TODO: make sure lowest hot hierarchy config is >= 5 to prevent having to
// reconstruct states.
match db.hierarchy_hot.storage_strategy(slot) {
match db.hierarchy_hot.storage_strategy(slot)? {
StorageStrategy::DiffFrom(_) | StorageStrategy::Snapshot => {
// Load the full state and re-store it as a snapshot or diff.
let state = get_full_state(&db.hot_db, &state_root, db.spec)?
.unwrap_or(Error::MissingState(state_root))?;
let state = get_full_state_v22(&db.hot_db, &state_root, &db.spec)?
.ok_or_else(|| Error::MissingState(state_root))?;

// Store immediately so that future diffs can load and diff from it.
let mut ops = vec![];
db.store_hot_state_diffs(state_root, &state, &mut ops)?;
db.store_hot_state_diffs(&state_root, &state, &mut ops)?;
db.hot_db.do_atomically(ops)?;
diffs_written += 1;
}
StorageStrategy::ReplayFrom(_) => {
// No need to store diffs for states that will be reconstructed by replaying
Expand All @@ -111,14 +78,20 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
// 2. Convert the summary to the new format.
let latest_block_root = old_summary.latest_block_root;
let previous_state_root = if state_root == split.state_root {
Hash256::zero()
Hash256::ZERO
} else {
state_summaries_dag.previous_state_root(latest_block_root, slot)?;
state_summaries_dag.previous_state_root(latest_block_root, slot)?
};

let diff_base_slot = db.hierarchy_hot.diff_base_slot(slot)?;
let diff_base_state_root = state_summaries_dag
.ancestor_state_root_at_slot(latest_block_root, diff_base_slot)?;
let diff_base_state_root = DiffBaseStateRoot::new(
diff_base_slot,
state_summaries_dag.ancestor_state_root_at_slot(
latest_block_root,
old_summary.slot,
diff_base_slot,
)?,
);

let new_summary = HotStateSummary {
slot,
Expand All @@ -127,71 +100,26 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
diff_base_state_root,
};
// Overwrite the summaries atomically as part of the migration.
migrate_ops.push(new_summary.as_kv_store_op(state_root))?;
migrate_ops.push(new_summary.as_kv_store_op(state_root));

// 3. Stage old data for deletion.
if slot % E::slots_per_epoch() == 0 {
if slot % T::EthSpec::slots_per_epoch() == 0 {
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}
}
}

for (i, (state_root, summary)) in entries.iter().enumerate() {
// States are only immediately available on the epoch boundary, else we need to replay
if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
let state = get_full_state_v22(&db.hot_db, state_root, &db.spec)?
.ok_or(HotColdDBError::MissingEpochBoundaryState(*state_root))?;
db.store_hot_state_separate_ops(state_root, &state, &mut new_diff_ops, &mut vec![])?;
}

// TODO: What to do with summaries not at an epoch boundary?

// Prune states written in previous format as part of the atomic version bump
let state_is_written = summary.slot % T::EthSpec::slots_per_epoch() == 0;
if state_is_written {
// Note: Do not delete the BeaconStateSummary as this will be over-written below
// TODO: Should also delete the temporary flag?
let state_key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}

// Use the `state_summaries_dag` to compute previous roots to prevent having to load
// states in the epoch with block replay.
let diff_base_slot = db.hierarchy_hot.diff_base_slot(summary.slot)?;
let diff_base_state_root = DiffBaseStateRoot::new(
diff_base_slot,
state_summaries_dag.ancestor_at_slot(*state_root, diff_base_slot)?,
);

let new_summary = HotStateSummary {
slot: summary.slot,
latest_block_root: summary.latest_block_root,
diff_base_state_root,
previous_state_root: state_summaries_dag
.ancestor_at_slot(*state_root, summary.slot.safe_sub(1)?)?,
};
migrate_ops.push(new_summary.as_kv_store_op(*state_root));

// Write diffs and snapshots frequently to not blow up the memory
if new_diff_ops.len() > PERSISTS_DIFFS_EVERY {
diffs_written += new_diff_ops.len();
// Commit diff ops once in a while to not blow up memory
// Commit diff ops before deleting states to not lose data.
db.hot_db.do_atomically(new_diff_ops)?;
new_diff_ops = vec![];
}

if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();
// TODO: Display the slot distance between head and finalized, and head-tracker count
info!(
log,
"Hot state migration in progress";
"summaries_migrated" => i,
"diff_written" => diffs_written
);
summaries_written += 1;
if last_log_time.elapsed() > Duration::from_secs(5) {
last_log_time = Instant::now();
// TODO: Display the slot distance between head and finalized, and head-tracker count
info!(
log,
"Hot state migration in progress";
"diff_written" => diffs_written,
"summaries_written" => summaries_written,
);
}
}
}

Expand All @@ -209,24 +137,66 @@ pub fn downgrade_to_v22<T: BeaconChainTypes>(
panic!("downgrade not supported");
}

struct HotStateSummaryWithParentBlockRoot {}

struct StateSummariesDAG {
// block_root -> slot -> (state_root, state summary)
// block_root -> state slot -> (state_root, state summary)
state_summaries_by_block_root: HashMap<Hash256, BTreeMap<Slot, (Hash256, HotStateSummaryV22)>>,
// block_root -> parent_block_root
parent_block_roots: HashMap<Hash256, Hash256>,
}

impl StateSummariesDAG {
fn new<T: BeaconChainTypes>(
db: &HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>,
) -> Result<Self, Error> {
// Collect all sumaries for unfinalized states
let state_summaries_v22 = db
.hot_db
.iter_column::<Hash256>(DBColumn::BeaconStateSummary)
.map(|res| {
let (key, value) = res?;
let state_root: Hash256 = key;
let summary = HotStateSummaryV22::from_ssz_bytes(&value)?;
Ok((state_root, summary))
})
.collect::<Result<Vec<_>, Error>>()?;

// Group them by latest block root, and sorted state slot
let mut state_summaries_by_block_root = HashMap::<Hash256, BTreeMap<_, _>>::new();
for (state_root, summary) in state_summaries_v22.into_iter() {
let summaries = state_summaries_by_block_root
.entry(summary.latest_block_root)
.or_default();

// TODO(hdiff): error if existing
summaries.insert(summary.slot, (state_root, summary));
}

// Construct block root to parent block root mapping.
let mut parent_block_roots = HashMap::new();
for block_root in state_summaries_by_block_root.keys() {
let blinded_block = db
.get_blinded_block(block_root)?
.ok_or_else(|| Error::MissingBlock(*block_root))?;
parent_block_roots.insert(*block_root, blinded_block.parent_root());
}

Ok(Self {
state_summaries_by_block_root,
parent_block_roots,
})
}

fn summaries_by_slot_ascending(&self) -> BTreeMap<Slot, Vec<(Hash256, HotStateSummaryV22)>> {
let mut summaries = BTreeMap::new();
let mut summaries = BTreeMap::<Slot, Vec<_>>::new();
for (slot, (state_root, summary)) in self
.state_summaries_by_block_root
.values()
.flat_map(|slot_map| slot_map.iter())
{
summaries.insert(slot, (state_root, summary.clone()));
summaries
.entry(*slot)
.or_default()
.push((*state_root, *summary));
}
summaries
}
Expand All @@ -240,23 +210,23 @@ impl StateSummariesDAG {
let previous_slot = slot - 1;
let parent_block_root =
self.parent_block_roots
.get(latest_block_root)
.get(&latest_block_root)
.ok_or_else(|| {
Error::MigrationError(format!(
"missing parent block root for {latest_block_root:?}"
))
})?;
let parent_block_summaries = self
.state_summaries_by_block_root
.get(&parent_block_root)
.get(parent_block_root)
.ok_or_else(|| {
Error::MigrationError(format!(
"missing state summaries for parent {parent_block_root:?}"
))
})?;
if slot_summaries.contains_key(&previous_slot) {
if parent_block_summaries.contains_key(&previous_slot) {
// Common case: not a skipped slot.
Ok(parent_block_root)
Ok(*parent_block_root)
} else {
// Skipped slot: block root at previous slot is the same as latest block root.
Ok(latest_block_root)
Expand All @@ -281,11 +251,12 @@ impl StateSummariesDAG {
"missing state summaries for block {latest_block_root:?}"
))
})?;
current_block_summaries.get(&slot).ok_or_else(|| {
let (state_root, _state_summary) = current_block_summaries.get(&slot).ok_or_else(|| {
Error::MigrationError(format!(
"missing state summary for slot {previous_slot} and block {latest_block_root:?}",
"missing state summary for slot {slot} and block {latest_block_root:?}",
))
})
})?;
Ok(*state_root)
}

fn ancestor_state_root_at_slot(
Expand Down

0 comments on commit 4f76cef

Please sign in to comment.