HDiff in the hot DB #39

Open · wants to merge 8 commits into base: unstable
4 changes: 2 additions & 2 deletions account_manager/src/validator/slashing_protection.rs
@@ -90,7 +90,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database =
SlashingDatabase::open_or_create(&slashing_protection_db_path).map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
@@ -198,7 +198,7 @@ pub fn cli_run<E: EthSpec>(
let slashing_protection_database = SlashingDatabase::open(&slashing_protection_db_path)
.map_err(|e| {
format!(
"Unable to open database at {}: {:?}",
"Unable to open slashing protection database at {}: {:?}",
slashing_protection_db_path.display(),
e
)
52 changes: 17 additions & 35 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -33,7 +33,6 @@ use crate::events::ServerSentEventHandler;
use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, PreparePayloadHandle};
use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult};
use crate::graffiti_calculator::GraffitiCalculator;
use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker};
use crate::light_client_finality_update_verification::{
Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate,
};
@@ -456,8 +455,6 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// A handler for events generated by the beacon chain. This is only initialized when the
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Used to track the heads of the beacon chain.
pub(crate) head_tracker: Arc<HeadTracker>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
/// A cache of eth1 deposit data at epoch boundaries for deposit finalization
@@ -620,48 +617,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let _head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);

// Hold a lock to head_tracker until it has been persisted to disk. Otherwise there's a race
// condition with the pruning thread which can result in a block present in the head tracker
// but absent in the DB. This inconsistency halts pruning and dramatically increases disk
// size. Ref: https://github.com/sigp/lighthouse/issues/4773
let head_tracker = self.head_tracker.0.read();
batch.push(self.persist_head_in_batch(&head_tracker));

let _fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);
batch.push(self.persist_fork_choice_in_batch());

self.store.hot_db.do_atomically(batch)?;
drop(head_tracker);

Ok(())
}

/// Return a `PersistedBeaconChain` without reference to a `BeaconChain`.
pub fn make_persisted_head(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> PersistedBeaconChain {
pub fn make_persisted_head(genesis_block_root: Hash256) -> PersistedBeaconChain {
PersistedBeaconChain {
_canonical_head_block_root: DUMMY_CANONICAL_HEAD_BLOCK_ROOT,
genesis_block_root,
ssz_head_tracker: SszHeadTracker::from_map(head_tracker_reader),
ssz_head_tracker: <_>::default(),
}
}

/// Return a database operation for writing the beacon chain head to disk.
pub fn persist_head_in_batch(
&self,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::persist_head_in_batch_standalone(self.genesis_block_root, head_tracker_reader)
pub fn persist_head_in_batch(&self) -> KeyValueStoreOp {
Self::persist_head_in_batch_standalone(self.genesis_block_root)
}

pub fn persist_head_in_batch_standalone(
genesis_block_root: Hash256,
head_tracker_reader: &HeadTrackerReader,
) -> KeyValueStoreOp {
Self::make_persisted_head(genesis_block_root, head_tracker_reader)
.as_kv_store_op(BEACON_CHAIN_DB_KEY)
pub fn persist_head_in_batch_standalone(genesis_block_root: Hash256) -> KeyValueStoreOp {
Self::make_persisted_head(genesis_block_root).as_kv_store_op(BEACON_CHAIN_DB_KEY)
}

/// Load fork choice from disk, returning `None` if it isn't found.
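
One possible reading of the `<_>::default()` change above (this is an assumption, not stated in the diff): keeping `ssz_head_tracker` as a field that is always written empty preserves the SSZ layout of `PersistedBeaconChain`, so databases written before this change still decode. A standalone sketch of that pattern, with illustrative types rather than the real ones:

```rust
// Illustrative only: hypothetical stand-ins showing how a defaulted,
// now-unused field keeps the SSZ encoding of the persisted struct unchanged.
// The real `PersistedBeaconChain` and `SszHeadTracker` live in the
// beacon_chain crate.
use ssz_derive::{Decode, Encode};

#[derive(Encode, Decode, Default, Debug, PartialEq)]
struct SszHeadTrackerCompat {
    roots: Vec<u64>,
    slots: Vec<u64>,
}

#[derive(Encode, Decode, Debug, PartialEq)]
struct PersistedChainCompat {
    genesis_block_root: u64,
    // New writes always store the empty default; old databases that contain
    // real head-tracker contents still decode because the field is present.
    ssz_head_tracker: SszHeadTrackerCompat,
}
```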
@@ -1405,12 +1384,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns `(block_root, block_slot)`.
pub fn heads(&self) -> Vec<(Hash256, Slot)> {
self.head_tracker.heads()
let head_slot = self.canonical_head.cached_head().head_slot();
self.canonical_head
.fork_choice_read_lock()
.proto_array()
.viable_heads::<T::EthSpec>(head_slot)
.iter()
.map(|node| (node.root, node.slot))
.collect()
Collaborator:

Could be used to implement the DB downgrade (recreate the head tracker after it has been deleted).

}

/// Only used in tests.
pub fn knows_head(&self, block_hash: &SignedBeaconBlockHash) -> bool {
self.head_tracker.contains_head((*block_hash).into())
self.heads()
.iter()
.any(|head| head.0 == Into::<Hash256>::into(*block_hash))
}

/// Returns the `BeaconState` at the given slot.
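
Picking up the collaborator's note above: a minimal, hedged sketch of how a DB downgrade could re-populate a deleted head tracker from the `(root, slot)` pairs that the new fork-choice-backed `heads()` returns. `LegacySszHeadTracker` is a hypothetical stand-in, not the real removed type:

```rust
// Hypothetical sketch: rebuilding head-tracker-style data from the output of
// the fork-choice-backed `heads()`, e.g. during a schema downgrade.
use types::{Hash256, Slot};

#[derive(Default)]
pub struct LegacySszHeadTracker {
    pub roots: Vec<Hash256>,
    pub slots: Vec<Slot>,
}

impl LegacySszHeadTracker {
    /// Build the legacy structure from `BeaconChain::heads()` output.
    pub fn from_heads(heads: &[(Hash256, Slot)]) -> Self {
        let (roots, slots) = heads.iter().copied().unzip();
        Self { roots, slots }
    }
}
```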
@@ -3989,9 +3977,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// about it.
let block_time_imported = timestamp_now();

let parent_root = block.parent_root();
let slot = block.slot();

let current_eth1_finalization_data = Eth1FinalizationData {
eth1_data: state.eth1_data().clone(),
eth1_deposit_index: state.eth1_deposit_index(),
Expand All @@ -4012,9 +3997,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
});
}

self.head_tracker
.register_block(block_root, parent_root, slot);

metrics::stop_timer(db_write_timer);

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SUCCESSES);
85 changes: 41 additions & 44 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -91,7 +91,7 @@ use std::fmt::Debug;
use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, HotStateSummary, KeyValueStore, StoreOp};
use store::{Error as DBError, KeyValueStore, StoreOp};
use strum::AsRefStr;
use task_executor::JoinHandle;
use types::{
@@ -1455,52 +1455,49 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {

let distance = block.slot().as_u64().saturating_sub(state.slot().as_u64());
for _ in 0..distance {
let state_root = if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
let txn_lock = chain.store.hot_db.begin_rw_transaction();
let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

let state_batch = if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
vec![]
let state_root =
if parent.beacon_block.slot() == state.slot() {
// If it happens that `pre_state` has *not* already been advanced forward a single
// slot, then there is no need to compute the state root for this
// `per_slot_processing` call since that state root is already stored in the parent
// block.
parent.beacon_block.state_root()
} else {
vec![
if state.slot() % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root, &state)
} else {
StoreOp::PutStateSummary(
state_root,
HotStateSummary::new(&state_root, &state)?,
)
},
StoreOp::PutStateTemporaryFlag(state_root),
]
};
chain
.store
.do_atomically_with_block_and_blobs_cache(state_batch)?;
drop(txn_lock);
// This is a new state we've reached, so stage it for storage in the DB.
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;

// Store the state immediately, marking it as temporary, and staging the deletion
// of its temporary status as part of the larger atomic operation.
// TODO(hdiff): Is it necessary to do this read tx now? Also why is it necessary to
// check that the summary exists at all? Are double writes common? Can this txn
// lock deadlock with the `do_atomically` call?
let txn_lock = chain.store.hot_db.begin_rw_transaction();
Collaborator:

The reason for this lock is a race with the write in BeaconChain::import_block:

If we were to write a non-temporary state in import_block in between setting state_already_exists (false) and the write of the temporary state in this function, we could corrupt the DB:

  • There is a state that is not temporary (required by some fully-imported block),
  • But it is marked temporary due to the race condition here,
  • Temporary states risk being deleted by pruning -> invalid DB due to deletion of canonical state.

The lock prevents this case by stopping the read in this function from interleaving with the write in the import function. However, this is a bad abstraction forced upon us by LevelDB, which lacks proper ACID transactions. If we move away from LevelDB we may eventually get proper transactions; see:

let state_already_exists =
chain.store.load_hot_state_summary(&state_root)?.is_some();

if state_already_exists {
// If the state exists, it could be temporary or permanent, but in neither case
// should we rewrite it or store a new temporary flag for it. We *will* stage
// the temporary flag for deletion because it's OK to double-delete the flag,
// and we don't mind if another thread gets there first.
} else {
let mut ops = vec![];
// Recycle store codepath to create a state summary and store the state / diff
chain.store.store_hot_state(&state_root, &state, &mut ops)?;
// Additionally write a temporary flag as part of the atomic write
ops.extend(chain.store.convert_to_kv_batch(vec![
StoreOp::PutStateTemporaryFlag(state_root),
])?);
chain.store.hot_db.do_atomically(ops)?;
}
drop(txn_lock);

confirmed_state_roots.push(state_root);
confirmed_state_roots.push(state_root);

state_root
};
state_root
};

if let Some(summary) = per_slot_processing(&mut state, Some(state_root), &chain.spec)? {
// Expose Prometheus metrics.
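
To make the race in the collaborator's comment above concrete, here is a minimal, self-contained sketch of the guarded check-then-write. `MiniStore` and its fields are illustrative stand-ins for the hot DB, not the real implementation; the point is that both writers serialise on the same lock:

```rust
// Minimal sketch of the check-then-write pattern described in the review
// comment above. All names here are illustrative, not the real hot-DB API.
use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

type StateRoot = [u8; 32];

#[derive(Default)]
struct MiniStore {
    // Stands in for `begin_rw_transaction`: serialises this read-modify-write
    // against the permanent-state write done by block import.
    txn_lock: Mutex<()>,
    summaries: Mutex<HashMap<StateRoot, ()>>,
    temporary_flags: Mutex<HashSet<StateRoot>>,
}

impl MiniStore {
    fn store_temporary_state(&self, state_root: StateRoot) {
        // Without this lock the bad interleaving is:
        //   1. this thread observes `state_already_exists == false`,
        //   2. block import writes the same state as permanent (canonical),
        //   3. this thread flags it temporary anyway,
        //   4. pruning later deletes a "temporary" state that is actually canonical.
        let _txn = self.txn_lock.lock().unwrap();

        let state_already_exists = self.summaries.lock().unwrap().contains_key(&state_root);
        if !state_already_exists {
            // Stage the state/summary and the temporary flag together.
            self.summaries.lock().unwrap().insert(state_root, ());
            self.temporary_flags.lock().unwrap().insert(state_root);
        }
        // `_txn` is dropped here, releasing the lock.
    }

    fn import_block_state(&self, state_root: StateRoot) {
        // The competing writer also takes the lock, so it cannot interleave
        // with the existence check above.
        let _txn = self.txn_lock.lock().unwrap();
        self.summaries.lock().unwrap().insert(state_root, ());
        self.temporary_flags.lock().unwrap().remove(&state_root);
    }
}
```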