Skip to content

Commit

Permalink
properly publish snapshot info
Browse files Browse the repository at this point in the history
  • Loading branch information
saketh-are committed Nov 3, 2023
1 parent 07dd313 commit 802f545
Show file tree
Hide file tree
Showing 11 changed files with 77 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ strum.workspace = true
thiserror.workspace = true
tracing.workspace = true

near-async.workspace = true
near-cache.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4275,13 +4275,15 @@ impl Chain {
if let Some(snapshot_callbacks) = &self.snapshot_callbacks {
if make_snapshot {
let head = self.head()?;
let epoch_height =
self.epoch_manager.get_epoch_height_from_prev_block(&head.prev_block_hash)?;
let shard_uids = self
.epoch_manager
.get_shard_layout_from_prev_block(&head.prev_block_hash)?
.get_shard_uids();
let last_block = self.get_block(&head.last_block_hash)?;
let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback;
make_snapshot_callback(head.prev_block_hash, shard_uids, last_block);
make_snapshot_callback(head.prev_block_hash, epoch_height, shard_uids, last_block);
} else if delete_snapshot {
let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback;
delete_snapshot_callback();
Expand Down
38 changes: 31 additions & 7 deletions chain/chain/src/state_snapshot_actor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use actix::{AsyncContext, Context};
use near_async::messaging::CanSend;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_performance_metrics_macros::perf;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::{EpochHeight, ShardId};
use near_store::flat::FlatStorageManager;
use near_store::ShardTries;
use std::sync::Arc;
Expand All @@ -15,12 +18,17 @@ use std::sync::Arc;
/// 3. CompactSnapshotRequest: compacts a snapshot store.
pub struct StateSnapshotActor {
flat_storage_manager: FlatStorageManager,
network_adapter: PeerManagerAdapter,
tries: ShardTries,
}

impl StateSnapshotActor {
pub fn new(flat_storage_manager: FlatStorageManager, tries: ShardTries) -> Self {
Self { flat_storage_manager, tries }
pub fn new(
flat_storage_manager: FlatStorageManager,
network_adapter: PeerManagerAdapter,
tries: ShardTries,
) -> Self {
Self { flat_storage_manager, network_adapter, tries }
}
}

Expand All @@ -40,6 +48,8 @@ struct DeleteAndMaybeCreateSnapshotRequest {
struct CreateSnapshotRequest {
/// prev_hash of the last processed block.
prev_block_hash: CryptoHash,
/// epoch height associated with prev_block_hash
epoch_height: EpochHeight,
/// Shards that need to be present in the snapshot.
shard_uids: Vec<ShardUId>,
/// Last block of the prev epoch.
Expand Down Expand Up @@ -81,7 +91,7 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg);
tracing::debug!(target: "state_snapshot", ?msg);

let CreateSnapshotRequest { prev_block_hash, shard_uids, block } = msg;
let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block } = msg;
let res = self.tries.create_state_snapshot(prev_block_hash, &shard_uids, &block);

// Unlocking flat state head can be done asynchronously in state_snapshot_actor.
Expand All @@ -90,7 +100,20 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to unlock flat state updates");
}
match res {
Ok(_) => {
Ok(res_shard_uids) => {
if let Some(res_shard_uids) = res_shard_uids {
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::SnapshotHostInfo {
sync_hash: prev_block_hash,
epoch_height,
shards: res_shard_uids
.iter()
.map(|uid| uid.shard_id as ShardId)
.collect(),
},
));
}

if self.tries.state_snapshot_config().compaction_enabled {
context.address().do_send(CompactSnapshotRequest {}.with_span_context());
} else {
Expand Down Expand Up @@ -122,7 +145,7 @@ impl actix::Handler<WithSpanContext<CompactSnapshotRequest>> for StateSnapshotAc
}

type MakeSnapshotCallback =
Arc<dyn Fn(CryptoHash, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;
Arc<dyn Fn(CryptoHash, EpochHeight, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;

type DeleteSnapshotCallback = Arc<dyn Fn() -> () + Send + Sync + 'static>;

Expand All @@ -136,7 +159,7 @@ pub fn get_make_snapshot_callback(
state_snapshot_addr: Arc<actix::Addr<StateSnapshotActor>>,
flat_storage_manager: FlatStorageManager,
) -> MakeSnapshotCallback {
Arc::new(move |prev_block_hash, shard_uids, block| {
Arc::new(move |prev_block_hash, epoch_height, shard_uids, block| {
tracing::info!(
target: "state_snapshot",
?prev_block_hash,
Expand All @@ -148,7 +171,8 @@ pub fn get_make_snapshot_callback(
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to lock flat state updates");
return;
}
let create_snapshot_request = CreateSnapshotRequest { prev_block_hash, shard_uids, block };
let create_snapshot_request =
CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block };
state_snapshot_addr.do_send(
DeleteAndMaybeCreateSnapshotRequest {
create_snapshot_request: Some(create_snapshot_request),
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ pub fn setup_mock_all_validators(
NetworkRequests::ForwardTx(_, _)
| NetworkRequests::BanPeer { .. }
| NetworkRequests::TxStatus(_, _, _)
| NetworkRequests::SnapshotHostInfo(_)
| NetworkRequests::SnapshotHostInfo { .. }
| NetworkRequests::Challenge(_) => {}
};
}
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/test_env_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl TestEnvBuilder {
None => TEST_SEED,
};
let tries = runtime.get_tries();
let make_snapshot_callback = Arc::new(move |prev_block_hash, shard_uids: Vec<ShardUId>, block| {
let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<ShardUId>, block| {
tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback");
tries.delete_state_snapshot();
tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap();
Expand Down
18 changes: 14 additions & 4 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl PeerManagerActor {

let mut interval = time::Interval::new(clock.now(), time::Duration::seconds(30));

let mut epoch_height: EpochHeight = 9876543210;
let mut epoch_height: EpochHeight = 321;

async move {
loop {
Expand All @@ -352,7 +352,7 @@ impl PeerManagerActor {
peer_id.clone(),
hash,
epoch_height,
vec![23, epoch_height % 10],
vec![0, epoch_height % 10 + 1],
&state.config.node_key,
);

Expand Down Expand Up @@ -816,9 +816,19 @@ impl PeerManagerActor {
NetworkResponses::RouteNotFound
}
}
NetworkRequests::SnapshotHostInfo(snapshot_host_info) => {
NetworkRequests::SnapshotHostInfo { sync_hash, epoch_height, shards } => {
// Sign the information about the locally created snapshot using the keys in the
// network config before broadcasting it
let snapshot_host_info = SnapshotHostInfo::new(
self.state.config.node_id(),
sync_hash,
epoch_height,
shards,
&self.state.config.node_key,
);

self.state.tier2.broadcast_message(Arc::new(PeerMessage::SyncSnapshotHosts(
SyncSnapshotHosts { hosts: vec![snapshot_host_info] },
SyncSnapshotHosts { hosts: vec![snapshot_host_info.into()] },
)));
NetworkResponses::NoResponse
}
Expand Down
5 changes: 2 additions & 3 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::sharding::PartialEncodedChunkWithArcReceipts;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::BlockHeight;
use near_primitives::types::{AccountId, ShardId};
use near_primitives::types::{AccountId, BlockHeight, EpochHeight, ShardId};
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::net::SocketAddr;
Expand Down Expand Up @@ -233,7 +232,7 @@ pub enum NetworkRequests {
/// Announce account
AnnounceAccount(AnnounceAccount),
/// Broadcast information about a hosted snapshot.
SnapshotHostInfo(Arc<SnapshotHostInfo>),
SnapshotHostInfo { sync_hash: CryptoHash, epoch_height: EpochHeight, shards: Vec<ShardId> },

/// Request chunk parts and/or receipts
PartialEncodedChunkRequest {
Expand Down
5 changes: 5 additions & 0 deletions core/store/src/flat/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ impl FlatStorageManager {
Some(FlatStorageChunkView::new(self.0.store.clone(), block_hash, flat_storage))
}

pub fn get_shard_uids(&self) -> Vec<ShardUId> {
let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);
flat_storages.keys().cloned().collect()
}

// TODO (#7327): consider returning Result<FlatStorage, Error> when we expect flat storage to exist
pub fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option<FlatStorage> {
let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR);
Expand Down
14 changes: 10 additions & 4 deletions core/store/src/trie/state_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ impl StateSnapshot {
Self { prev_block_hash, store, flat_storage_manager }
}

/// Returns the UIds for the shards included in the snapshot.
pub fn get_shard_uids(&self) -> Vec<ShardUId> {
self.flat_storage_manager.get_shard_uids()
}

/// Returns status of a shard of a flat storage in the state snapshot.
pub fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus {
self.flat_storage_manager.get_flat_storage_status(shard_uid)
Expand Down Expand Up @@ -152,13 +157,14 @@ impl ShardTries {
Ok((data.store.clone(), data.flat_storage_manager.clone()))
}

/// Makes a snapshot of the current state of the DB.
/// Makes a snapshot of the current state of the DB, if one is not already available.
/// If a new snapshot is created, returns the ids of the included shards.
pub fn create_state_snapshot(
&self,
prev_block_hash: CryptoHash,
shard_uids: &[ShardUId],
block: &Block,
) -> Result<(), anyhow::Error> {
) -> Result<Option<Vec<ShardUId>>, anyhow::Error> {
metrics::HAS_STATE_SNAPSHOT.set(0);
// The function returns an `anyhow::Error`, because no special handling of errors is done yet. The errors are logged and ignored.
let _span =
Expand All @@ -175,7 +181,7 @@ impl ShardTries {
&& state_snapshot.prev_block_hash == prev_block_hash
{
tracing::warn!(target: "state_snapshot", ?prev_block_hash, "Requested a state snapshot but that is already available");
return Ok(());
return Ok(None);
}
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?state_snapshot.prev_block_hash, "Requested a state snapshot but that is already available with a different hash");
}
Expand Down Expand Up @@ -231,7 +237,7 @@ impl ShardTries {

metrics::HAS_STATE_SNAPSHOT.set(1);
tracing::info!(target: "state_snapshot", ?prev_block_hash, "Made a checkpoint");
Ok(())
Ok(Some(state_snapshot_lock.as_ref().unwrap().get_shard_uids()))
}

/// Runs compaction on the snapshot.
Expand Down
7 changes: 6 additions & 1 deletion nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,12 @@ pub fn start_with_config_and_synchronization(
);

let state_snapshot_actor = Arc::new(
StateSnapshotActor::new(runtime.get_flat_storage_manager(), runtime.get_tries()).start(),
StateSnapshotActor::new(
runtime.get_flat_storage_manager(),
network_adapter.clone().into(),
runtime.get_tries(),
)
.start(),
);
let delete_snapshot_callback = get_delete_snapshot_callback(state_snapshot_actor.clone());
let make_snapshot_callback =
Expand Down

0 comments on commit 802f545

Please sign in to comment.