Skip to content

Commit

Permalink
More migration progress
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsproul committed Dec 5, 2024
1 parent 6eef8e8 commit 163a27d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 14 deletions.
139 changes: 130 additions & 9 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v23.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
time::{Duration, Instant},
};
use store::{
get_full_state_v22, get_key_for_col,
get_full_state, get_full_state_v22, get_key_for_col,
hot_cold_store::{DiffBaseStateRoot, HotColdDBError},
DBColumn, Error, HotColdDB, HotStateSummary, KeyValueStore, KeyValueStoreOp, StoreItem,
};
Expand All @@ -29,6 +29,8 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
) -> Result<Vec<KeyValueStoreOp>, Error> {
info!(log, "Upgrading from v22 to v23");

let split = db.get_split_info();

// TODO: sort summaries topologically starting from finalized
// If the summaries are sorted topologically we can insert them into the DB as if they were a
// new state, re-using existing code. As states are likely to be sequential the diff cache
Expand Down Expand Up @@ -82,10 +84,60 @@ pub fn upgrade_to_v23<T: BeaconChainTypes>(
// - Set all others as `Replay` pointing to `epoch_boundary_state_root`

let mut migrate_ops = vec![];
let mut new_diff_ops = vec![];
let mut diffs_written = 0;
let mut last_log_time = Instant::now();

for (slot, old_hot_state_summaries) in summaries_by_slot {
for (state_root, old_summary) in old_hot_state_summaries {
// 1. Store snapshot or diff at this slot (if required).
// TODO: make sure lowest hot hierarchy config is >= 5 to prevent having to
// reconstruct states.
match db.hierarchy_hot.storage_strategy(slot) {
StorageStrategy::DiffFrom(_) | StorageStrategy::Snapshot => {
// Load the full state and re-store it as a snapshot or diff.
let state = get_full_state(&db.hot_db, &state_root, db.spec)?
.unwrap_or(Error::MissingState(state_root))?;

// Store immediately so that future diffs can load and diff from it.
let mut ops = vec![];
db.store_hot_state_diffs(state_root, &state, &mut ops)?;
db.hot_db.do_atomically(ops)?;
}
StorageStrategy::ReplayFrom(_) => {
// No need to store diffs for states that will be reconstructed by replaying
// blocks.
}
}

// 2. Convert the summary to the new format.
let latest_block_root = old_summary.latest_block_root;
let previous_state_root = if state_root == split.state_root {
Hash256::zero()
} else {
state_summaries_dag.previous_state_root(latest_block_root, slot)?;
};

let diff_base_slot = db.hierarchy_hot.diff_base_slot(slot)?;
let diff_base_state_root = state_summaries_dag
.ancestor_state_root_at_slot(latest_block_root, diff_base_slot)?;

let new_summary = HotStateSummary {
slot,
latest_block_root,
previous_state_root,
diff_base_state_root,
};
// Overwrite the summaries atomically as part of the migration.
migrate_ops.push(new_summary.as_kv_store_op(state_root))?;

// 3. Stage old data for deletion.
if slot % E::slots_per_epoch() == 0 {
let state_key =
get_key_for_col(DBColumn::BeaconState.into(), state_root.as_slice());
migrate_ops.push(KeyValueStoreOp::DeleteKey(state_key));
}
}
}

for (i, (state_root, summary)) in entries.iter().enumerate() {
// States are only immediately available on the epoch boundary, else we need to replay
if summary.slot % T::EthSpec::slots_per_epoch() == 0 {
Expand Down Expand Up @@ -167,19 +219,88 @@ struct StateSummariesDAG {
}

impl StateSummariesDAG {
fn summaries_by_slot_ascending(&self) -> BTreeMap<Slot, Vec<HotStateSummaryV22>> {
fn summaries_by_slot_ascending(&self) -> BTreeMap<Slot, Vec<(Hash256, HotStateSummaryV22)>> {
let mut summaries = BTreeMap::new();
for (slot, summary) in self
for (slot, (state_root, summary)) in self
.state_summaries_by_block_root
.values()
.flat_map(|slot_map| slot_map.values())
.flat_map(|slot_map| slot_map.iter())
{
summaries.insert(slot, summary.clone());
summaries.insert(slot, (state_root, summary.clone()));
}
summaries
}

fn ancestor_at_slot(&self, _state_root: Hash256, _slot: Slot) -> Result<Hash256, Error> {
todo!();
/// Compute the block root at `slot - 1` on the chain ending at `latest_block_root`.
fn previous_block_root(
&self,
latest_block_root: Hash256,
slot: Slot,
) -> Result<Hash256, Error> {
let previous_slot = slot - 1;
let parent_block_root =
self.parent_block_roots
.get(latest_block_root)
.ok_or_else(|| {
Error::MigrationError(format!(
"missing parent block root for {latest_block_root:?}"
))
})?;
let parent_block_summaries = self
.state_summaries_by_block_root
.get(&parent_block_root)
.ok_or_else(|| {
Error::MigrationError(format!(
"missing state summaries for parent {parent_block_root:?}"
))
})?;
if slot_summaries.contains_key(&previous_slot) {
// Common case: not a skipped slot.
Ok(parent_block_root)
} else {
// Skipped slot: block root at previous slot is the same as latest block root.
Ok(latest_block_root)
}
}

fn previous_state_root(
&self,
latest_block_root: Hash256,
slot: Slot,
) -> Result<Hash256, Error> {
let previous_block_root = self.previous_block_root(latest_block_root, slot)?;
self.state_root_at_slot(previous_block_root, slot - 1)
}

fn state_root_at_slot(&self, latest_block_root: Hash256, slot: Slot) -> Result<Hash256, Error> {
let current_block_summaries = self
.state_summaries_by_block_root
.get(&latest_block_root)
.ok_or_else(|| {
Error::MigrationError(format!(
"missing state summaries for block {latest_block_root:?}"
))
})?;
current_block_summaries.get(&slot).ok_or_else(|| {
Error::MigrationError(format!(
"missing state summary for slot {previous_slot} and block {latest_block_root:?}",
))
})
}

fn ancestor_state_root_at_slot(
&self,
latest_block_root: Hash256,
state_slot: Slot,
ancestor_slot: Slot,
) -> Result<Hash256, Error> {
// Walk backwards until we reach the state at `ancestor_slot`.
let mut current_block_root = latest_block_root;
let mut current_slot = state_slot;
while current_slot > ancestor_slot {
current_block_root = self.previous_block_root(current_block_root, current_slot)?;
current_slot -= 1;
}
self.state_root_at_slot(current_block_root, ancestor_slot)
}
}
1 change: 1 addition & 0 deletions beacon_node/store/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub enum Error {
expected_slot: Slot,
stored_slot: Slot,
},
MigrationError(String),
}

pub trait HandleUnavailable<T> {
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1436,9 +1436,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(());
}

let mut store_diff_ops = vec![];
self.store_hot_state_separate_ops(state_root, state, ops, &mut store_diff_ops)?;
ops.extend(store_diff_ops);
self.store_hot_state_summary(state_root, state, ops)?;
self.store_hot_state_diffs(state_root, state, ops)?;

Ok(())
}

Expand All @@ -1453,7 +1453,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
// We store one even for the epoch boundary states, as we may need their slots
// when doing a look up by state root.
let hot_state_summary = HotStateSummary::new(state_root, state, &self.hierarchy_hot)?;
store_summary_ops.push(hot_state_summary.as_kv_store_op(*state_root));
ops.push(hot_state_summary.as_kv_store_op(*state_root));
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion beacon_node/store/src/impls/beacon_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub fn store_full_state<E: EthSpec>(
}

// FIXME(tree-states): delete/move to migration
#[allow(dead_code)]
pub fn get_full_state<KV: KeyValueStore<E>, E: EthSpec>(
db: &KV,
state_root: &Hash256,
Expand Down

0 comments on commit 163a27d

Please sign in to comment.