From 802f5452a8b754e84a56a749017b534af1ff5822 Mon Sep 17 00:00:00 2001 From: Saketh Are Date: Fri, 3 Nov 2023 15:53:20 -0400 Subject: [PATCH] properly publish snapshot info --- Cargo.lock | 2 + chain/chain/Cargo.toml | 2 + chain/chain/src/chain.rs | 4 +- chain/chain/src/state_snapshot_actor.rs | 38 +++++++++++++++---- chain/client/src/test_utils/setup.rs | 2 +- .../client/src/test_utils/test_env_builder.rs | 2 +- .../src/peer_manager/peer_manager_actor.rs | 18 +++++++-- chain/network/src/types.rs | 5 +-- core/store/src/flat/manager.rs | 5 +++ core/store/src/trie/state_snapshot.rs | 14 +++++-- nearcore/src/lib.rs | 7 +++- 11 files changed, 77 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 494c7f2d04a..61aeb36c858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3436,12 +3436,14 @@ dependencies = [ "itertools", "itoa", "lru", + "near-async", "near-cache", "near-chain-configs", "near-chain-primitives", "near-client-primitives", "near-crypto", "near-epoch-manager", + "near-network", "near-o11y", "near-performance-metrics", "near-performance-metrics-macros", diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml index 23a7b7f125f..70adafc7982 100644 --- a/chain/chain/Cargo.toml +++ b/chain/chain/Cargo.toml @@ -26,12 +26,14 @@ strum.workspace = true thiserror.workspace = true tracing.workspace = true +near-async.workspace = true near-cache.workspace = true near-chain-configs.workspace = true near-chain-primitives.workspace = true near-client-primitives.workspace = true near-crypto.workspace = true near-epoch-manager.workspace = true +near-network.workspace = true near-o11y.workspace = true near-performance-metrics.workspace = true near-performance-metrics-macros.workspace = true diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 0789b1a8f13..de6262cf1d8 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -4275,13 +4275,15 @@ impl Chain { if let Some(snapshot_callbacks) = &self.snapshot_callbacks { if make_snapshot { let head = self.head()?; + let epoch_height = + self.epoch_manager.get_epoch_height_from_prev_block(&head.prev_block_hash)?; let shard_uids = self .epoch_manager .get_shard_layout_from_prev_block(&head.prev_block_hash)? .get_shard_uids(); let last_block = self.get_block(&head.last_block_hash)?; let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback; - make_snapshot_callback(head.prev_block_hash, shard_uids, last_block); + make_snapshot_callback(head.prev_block_hash, epoch_height, shard_uids, last_block); } else if delete_snapshot { let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback; delete_snapshot_callback(); diff --git a/chain/chain/src/state_snapshot_actor.rs b/chain/chain/src/state_snapshot_actor.rs index 29062f1b853..e2662538bad 100644 --- a/chain/chain/src/state_snapshot_actor.rs +++ b/chain/chain/src/state_snapshot_actor.rs @@ -1,9 +1,12 @@ use actix::{AsyncContext, Context}; +use near_async::messaging::CanSend; +use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt}; use near_performance_metrics_macros::perf; use near_primitives::block::Block; use near_primitives::hash::CryptoHash; use near_primitives::shard_layout::ShardUId; +use near_primitives::types::{EpochHeight, ShardId}; use near_store::flat::FlatStorageManager; use near_store::ShardTries; use std::sync::Arc; @@ -15,12 +18,17 @@ use std::sync::Arc; /// 3. CompactSnapshotRequest: compacts a snapshot store. pub struct StateSnapshotActor { flat_storage_manager: FlatStorageManager, + network_adapter: PeerManagerAdapter, tries: ShardTries, } impl StateSnapshotActor { - pub fn new(flat_storage_manager: FlatStorageManager, tries: ShardTries) -> Self { - Self { flat_storage_manager, tries } + pub fn new( + flat_storage_manager: FlatStorageManager, + network_adapter: PeerManagerAdapter, + tries: ShardTries, + ) -> Self { + Self { flat_storage_manager, network_adapter, tries } } } @@ -40,6 +48,8 @@ struct DeleteAndMaybeCreateSnapshotRequest { struct CreateSnapshotRequest { /// prev_hash of the last processed block. prev_block_hash: CryptoHash, + /// epoch height associated with prev_block_hash + epoch_height: EpochHeight, /// Shards that need to be present in the snapshot. shard_uids: Vec, /// Last block of the prev epoch. @@ -81,7 +91,7 @@ impl actix::Handler> for StateSnapshotAct let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg); tracing::debug!(target: "state_snapshot", ?msg); - let CreateSnapshotRequest { prev_block_hash, shard_uids, block } = msg; + let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block } = msg; let res = self.tries.create_state_snapshot(prev_block_hash, &shard_uids, &block); // Unlocking flat state head can be done asynchronously in state_snapshot_actor. @@ -90,7 +100,20 @@ impl actix::Handler> for StateSnapshotAct tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to unlock flat state updates"); } match res { - Ok(_) => { + Ok(res_shard_uids) => { + if let Some(res_shard_uids) = res_shard_uids { + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::SnapshotHostInfo { + sync_hash: prev_block_hash, + epoch_height, + shards: res_shard_uids + .iter() + .map(|uid| uid.shard_id as ShardId) + .collect(), + }, + )); + } + if self.tries.state_snapshot_config().compaction_enabled { context.address().do_send(CompactSnapshotRequest {}.with_span_context()); } else { @@ -122,7 +145,7 @@ impl actix::Handler> for StateSnapshotAc } type MakeSnapshotCallback = - Arc, Block) -> () + Send + Sync + 'static>; + Arc, Block) -> () + Send + Sync + 'static>; type DeleteSnapshotCallback = Arc () + Send + Sync + 'static>; @@ -136,7 +159,7 @@ pub fn get_make_snapshot_callback( state_snapshot_addr: Arc>, flat_storage_manager: FlatStorageManager, ) -> MakeSnapshotCallback { - Arc::new(move |prev_block_hash, shard_uids, block| { + Arc::new(move |prev_block_hash, epoch_height, shard_uids, block| { tracing::info!( target: "state_snapshot", ?prev_block_hash, @@ -148,7 +171,8 @@ pub fn get_make_snapshot_callback( tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to lock flat state updates"); return; } - let create_snapshot_request = CreateSnapshotRequest { prev_block_hash, shard_uids, block }; + let create_snapshot_request = + CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block }; state_snapshot_addr.do_send( DeleteAndMaybeCreateSnapshotRequest { create_snapshot_request: Some(create_snapshot_request), diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 67a38c32d3d..af86b684020 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -826,7 +826,7 @@ pub fn setup_mock_all_validators( NetworkRequests::ForwardTx(_, _) | NetworkRequests::BanPeer { .. } | NetworkRequests::TxStatus(_, _, _) - | NetworkRequests::SnapshotHostInfo(_) + | NetworkRequests::SnapshotHostInfo { .. } | NetworkRequests::Challenge(_) => {} }; } diff --git a/chain/client/src/test_utils/test_env_builder.rs b/chain/client/src/test_utils/test_env_builder.rs index b122a6bd49c..c526f85359b 100644 --- a/chain/client/src/test_utils/test_env_builder.rs +++ b/chain/client/src/test_utils/test_env_builder.rs @@ -502,7 +502,7 @@ impl TestEnvBuilder { None => TEST_SEED, }; let tries = runtime.get_tries(); - let make_snapshot_callback = Arc::new(move |prev_block_hash, shard_uids: Vec, block| { + let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec, block| { tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback"); tries.delete_state_snapshot(); tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap(); diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 03850e59283..35666a5ac9e 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -338,7 +338,7 @@ impl PeerManagerActor { let mut interval = time::Interval::new(clock.now(), time::Duration::seconds(30)); - let mut epoch_height: EpochHeight = 9876543210; + let mut epoch_height: EpochHeight = 321; async move { loop { @@ -352,7 +352,7 @@ impl PeerManagerActor { peer_id.clone(), hash, epoch_height, - vec![23, epoch_height % 10], + vec![0, epoch_height % 10 + 1], &state.config.node_key, ); @@ -816,9 +816,19 @@ impl PeerManagerActor { NetworkResponses::RouteNotFound } } - NetworkRequests::SnapshotHostInfo(snapshot_host_info) => { + NetworkRequests::SnapshotHostInfo { sync_hash, epoch_height, shards } => { + // Sign the information about the locally created snapshot using the keys in the + // network config before broadcasting it + let snapshot_host_info = SnapshotHostInfo::new( + self.state.config.node_id(), + sync_hash, + epoch_height, + shards, + &self.state.config.node_key, + ); + self.state.tier2.broadcast_message(Arc::new(PeerMessage::SyncSnapshotHosts( - SyncSnapshotHosts { hosts: vec![snapshot_host_info] }, + SyncSnapshotHosts { hosts: vec![snapshot_host_info.into()] }, ))); NetworkResponses::NoResponse } diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 1f53ed3886a..6ce68b187c6 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -22,8 +22,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::{AnnounceAccount, PeerId}; use near_primitives::sharding::PartialEncodedChunkWithArcReceipts; use near_primitives::transaction::SignedTransaction; -use near_primitives::types::BlockHeight; -use near_primitives::types::{AccountId, ShardId}; +use near_primitives::types::{AccountId, BlockHeight, EpochHeight, ShardId}; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::net::SocketAddr; @@ -233,7 +232,7 @@ pub enum NetworkRequests { /// Announce account AnnounceAccount(AnnounceAccount), /// Broadcast information about a hosted snapshot. - SnapshotHostInfo(Arc), + SnapshotHostInfo { sync_hash: CryptoHash, epoch_height: EpochHeight, shards: Vec }, /// Request chunk parts and/or receipts PartialEncodedChunkRequest { diff --git a/core/store/src/flat/manager.rs b/core/store/src/flat/manager.rs index c9de6f8a2a3..7d919f13148 100644 --- a/core/store/src/flat/manager.rs +++ b/core/store/src/flat/manager.rs @@ -204,6 +204,11 @@ impl FlatStorageManager { Some(FlatStorageChunkView::new(self.0.store.clone(), block_hash, flat_storage)) } + pub fn get_shard_uids(&self) -> Vec { + let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); + flat_storages.keys().cloned().collect() + } + // TODO (#7327): consider returning Result when we expect flat storage to exist pub fn get_flat_storage_for_shard(&self, shard_uid: ShardUId) -> Option { let flat_storages = self.0.flat_storages.lock().expect(POISONED_LOCK_ERR); diff --git a/core/store/src/trie/state_snapshot.rs b/core/store/src/trie/state_snapshot.rs index c9de4343dce..f26b3cac3c8 100644 --- a/core/store/src/trie/state_snapshot.rs +++ b/core/store/src/trie/state_snapshot.rs @@ -117,6 +117,11 @@ impl StateSnapshot { Self { prev_block_hash, store, flat_storage_manager } } + /// Returns the UIds for the shards included in the snapshot. + pub fn get_shard_uids(&self) -> Vec { + self.flat_storage_manager.get_shard_uids() + } + /// Returns status of a shard of a flat storage in the state snapshot. pub fn get_flat_storage_status(&self, shard_uid: ShardUId) -> FlatStorageStatus { self.flat_storage_manager.get_flat_storage_status(shard_uid) @@ -152,13 +157,14 @@ impl ShardTries { Ok((data.store.clone(), data.flat_storage_manager.clone())) } - /// Makes a snapshot of the current state of the DB. + /// Makes a snapshot of the current state of the DB, if one is not already available. + /// If a new snapshot is created, returns the ids of the included shards. pub fn create_state_snapshot( &self, prev_block_hash: CryptoHash, shard_uids: &[ShardUId], block: &Block, - ) -> Result<(), anyhow::Error> { + ) -> Result>, anyhow::Error> { metrics::HAS_STATE_SNAPSHOT.set(0); // The function returns an `anyhow::Error`, because no special handling of errors is done yet. The errors are logged and ignored. let _span = @@ -175,7 +181,7 @@ impl ShardTries { && state_snapshot.prev_block_hash == prev_block_hash { tracing::warn!(target: "state_snapshot", ?prev_block_hash, "Requested a state snapshot but that is already available"); - return Ok(()); + return Ok(None); } tracing::error!(target: "state_snapshot", ?prev_block_hash, ?state_snapshot.prev_block_hash, "Requested a state snapshot but that is already available with a different hash"); } @@ -231,7 +237,7 @@ impl ShardTries { metrics::HAS_STATE_SNAPSHOT.set(1); tracing::info!(target: "state_snapshot", ?prev_block_hash, "Made a checkpoint"); - Ok(()) + Ok(Some(state_snapshot_lock.as_ref().unwrap().get_shard_uids())) } /// Runs compaction on the snapshot. diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 5820a056f53..8fb033f4a20 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -317,7 +317,12 @@ pub fn start_with_config_and_synchronization( ); let state_snapshot_actor = Arc::new( - StateSnapshotActor::new(runtime.get_flat_storage_manager(), runtime.get_tries()).start(), + StateSnapshotActor::new( + runtime.get_flat_storage_manager(), + network_adapter.clone().into(), + runtime.get_tries(), + ) + .start(), ); let delete_snapshot_callback = get_delete_snapshot_callback(state_snapshot_actor.clone()); let make_snapshot_callback =