Skip to content

Commit

Permalink
publish information about snapshots to the network
Browse files Browse the repository at this point in the history
  • Loading branch information
saketh-are committed Nov 4, 2023
1 parent c4868c1 commit 348e37a
Show file tree
Hide file tree
Showing 34 changed files with 733 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions chain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ strum.workspace = true
thiserror.workspace = true
tracing.workspace = true

near-async.workspace = true
near-cache.workspace = true
near-chain-configs.workspace = true
near-chain-primitives.workspace = true
near-client-primitives.workspace = true
near-crypto.workspace = true
near-epoch-manager.workspace = true
near-network.workspace = true
near-o11y.workspace = true
near-performance-metrics.workspace = true
near-performance-metrics-macros.workspace = true
Expand Down Expand Up @@ -66,18 +68,22 @@ nightly = [
"protocol_feature_chunk_validation",
"protocol_feature_reject_blocks_with_outdated_protocol_version",
"protocol_feature_simple_nightshade_v2",
"near-async/nightly",
"near-chain-configs/nightly",
"near-client-primitives/nightly",
"near-epoch-manager/nightly",
"near-network/nightly",
"near-o11y/nightly",
"near-pool/nightly",
"near-primitives/nightly",
"near-store/nightly",
]
nightly_protocol = [
"near-async/nightly_protocol",
"near-chain-configs/nightly_protocol",
"near-client-primitives/nightly_protocol",
"near-epoch-manager/nightly_protocol",
"near-network/nightly_protocol",
"near-o11y/nightly_protocol",
"near-pool/nightly_protocol",
"near-primitives/nightly_protocol",
Expand Down
4 changes: 3 additions & 1 deletion chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4275,13 +4275,15 @@ impl Chain {
if let Some(snapshot_callbacks) = &self.snapshot_callbacks {
if make_snapshot {
let head = self.head()?;
let epoch_height =
self.epoch_manager.get_epoch_height_from_prev_block(&head.prev_block_hash)?;
let shard_uids = self
.epoch_manager
.get_shard_layout_from_prev_block(&head.prev_block_hash)?
.get_shard_uids();
let last_block = self.get_block(&head.last_block_hash)?;
let make_snapshot_callback = &snapshot_callbacks.make_snapshot_callback;
make_snapshot_callback(head.prev_block_hash, shard_uids, last_block);
make_snapshot_callback(head.prev_block_hash, epoch_height, shard_uids, last_block);
} else if delete_snapshot {
let delete_snapshot_callback = &snapshot_callbacks.delete_snapshot_callback;
delete_snapshot_callback();
Expand Down
38 changes: 31 additions & 7 deletions chain/chain/src/state_snapshot_actor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use actix::{AsyncContext, Context};
use near_async::messaging::CanSend;
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
use near_o11y::{handler_debug_span, OpenTelemetrySpanExt, WithSpanContext, WithSpanContextExt};
use near_performance_metrics_macros::perf;
use near_primitives::block::Block;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::ShardUId;
use near_primitives::types::{EpochHeight, ShardId};
use near_store::flat::FlatStorageManager;
use near_store::ShardTries;
use std::sync::Arc;
Expand All @@ -15,12 +18,17 @@ use std::sync::Arc;
/// 3. CompactSnapshotRequest: compacts a snapshot store.
pub struct StateSnapshotActor {
/// Flat storage handle owned by the actor.
// NOTE(review): presumably used to unlock flat state updates once a snapshot
// has been taken; the call site is not visible in this hunk — confirm.
flat_storage_manager: FlatStorageManager,
/// Adapter to the peer manager; the `CreateSnapshotRequest` handler uses it to
/// send `NetworkRequests::SnapshotHostInfo` after a snapshot has been created.
network_adapter: PeerManagerAdapter,
/// Shard tries on which `create_state_snapshot` is invoked and from which the
/// state snapshot config (e.g. `compaction_enabled`) is read.
tries: ShardTries,
}

impl StateSnapshotActor {
pub fn new(flat_storage_manager: FlatStorageManager, tries: ShardTries) -> Self {
Self { flat_storage_manager, tries }
pub fn new(
flat_storage_manager: FlatStorageManager,
network_adapter: PeerManagerAdapter,
tries: ShardTries,
) -> Self {
Self { flat_storage_manager, network_adapter, tries }
}
}

Expand All @@ -40,6 +48,8 @@ struct DeleteAndMaybeCreateSnapshotRequest {
struct CreateSnapshotRequest {
/// prev_hash of the last processed block.
prev_block_hash: CryptoHash,
/// epoch height associated with prev_block_hash
epoch_height: EpochHeight,
/// Shards that need to be present in the snapshot.
shard_uids: Vec<ShardUId>,
/// Last block of the prev epoch.
Expand Down Expand Up @@ -81,7 +91,7 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
let (_span, msg) = handler_debug_span!(target: "state_snapshot", msg);
tracing::debug!(target: "state_snapshot", ?msg);

let CreateSnapshotRequest { prev_block_hash, shard_uids, block } = msg;
let CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block } = msg;
let res = self.tries.create_state_snapshot(prev_block_hash, &shard_uids, &block);

// Unlocking flat state head can be done asynchronously in state_snapshot_actor.
Expand All @@ -90,7 +100,20 @@ impl actix::Handler<WithSpanContext<CreateSnapshotRequest>> for StateSnapshotAct
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to unlock flat state updates");
}
match res {
Ok(_) => {
Ok(res_shard_uids) => {
if let Some(res_shard_uids) = res_shard_uids {
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::SnapshotHostInfo {
sync_hash: prev_block_hash,
epoch_height,
shards: res_shard_uids
.iter()
.map(|uid| uid.shard_id as ShardId)
.collect(),
},
));
}

if self.tries.state_snapshot_config().compaction_enabled {
context.address().do_send(CompactSnapshotRequest {}.with_span_context());
} else {
Expand Down Expand Up @@ -122,7 +145,7 @@ impl actix::Handler<WithSpanContext<CompactSnapshotRequest>> for StateSnapshotAc
}

type MakeSnapshotCallback =
Arc<dyn Fn(CryptoHash, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;
Arc<dyn Fn(CryptoHash, EpochHeight, Vec<ShardUId>, Block) -> () + Send + Sync + 'static>;

/// Shared, thread-safe callback that takes no arguments and returns nothing;
/// `Chain` invokes it through `snapshot_callbacks.delete_snapshot_callback`
/// when a state snapshot should be deleted.
type DeleteSnapshotCallback = Arc<dyn Fn() + Send + Sync + 'static>;

Expand All @@ -136,7 +159,7 @@ pub fn get_make_snapshot_callback(
state_snapshot_addr: Arc<actix::Addr<StateSnapshotActor>>,
flat_storage_manager: FlatStorageManager,
) -> MakeSnapshotCallback {
Arc::new(move |prev_block_hash, shard_uids, block| {
Arc::new(move |prev_block_hash, epoch_height, shard_uids, block| {
tracing::info!(
target: "state_snapshot",
?prev_block_hash,
Expand All @@ -148,7 +171,8 @@ pub fn get_make_snapshot_callback(
tracing::error!(target: "state_snapshot", ?prev_block_hash, ?shard_uids, "Failed to lock flat state updates");
return;
}
let create_snapshot_request = CreateSnapshotRequest { prev_block_hash, shard_uids, block };
let create_snapshot_request =
CreateSnapshotRequest { prev_block_hash, epoch_height, shard_uids, block };
state_snapshot_addr.do_send(
DeleteAndMaybeCreateSnapshotRequest {
create_snapshot_request: Some(create_snapshot_request),
Expand Down
1 change: 1 addition & 0 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,7 @@ pub fn setup_mock_all_validators(
NetworkRequests::ForwardTx(_, _)
| NetworkRequests::BanPeer { .. }
| NetworkRequests::TxStatus(_, _, _)
| NetworkRequests::SnapshotHostInfo { .. }
| NetworkRequests::Challenge(_) => {}
};
}
Expand Down
2 changes: 1 addition & 1 deletion chain/client/src/test_utils/test_env_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl TestEnvBuilder {
None => TEST_SEED,
};
let tries = runtime.get_tries();
let make_snapshot_callback = Arc::new(move |prev_block_hash, shard_uids: Vec<ShardUId>, block| {
let make_snapshot_callback = Arc::new(move |prev_block_hash, _epoch_height, shard_uids: Vec<ShardUId>, block| {
tracing::info!(target: "state_snapshot", ?prev_block_hash, "make_snapshot_callback");
tries.delete_state_snapshot();
tries.create_state_snapshot(prev_block_hash, &shard_uids, &block).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion chain/jsonrpc-primitives/src/types/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use near_client_primitives::debug::{
#[cfg(feature = "debug_types")]
use near_primitives::views::{
CatchupStatusView, ChainProcessingInfo, NetworkGraphView, NetworkRoutesView, PeerStoreView,
RecentOutboundConnectionsView, RequestedStatePartsView, SyncStatusView,
RecentOutboundConnectionsView, RequestedStatePartsView, SnapshotHostsView, SyncStatusView,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -33,6 +33,7 @@ pub enum DebugStatusResponse {
NetworkGraph(NetworkGraphView),
RecentOutboundConnections(RecentOutboundConnectionsView),
Routes(NetworkRoutesView),
SnapshotHosts(SnapshotHostsView),
}

#[cfg(feature = "debug_types")]
Expand Down
3 changes: 3 additions & 0 deletions chain/jsonrpc/src/api/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ impl RpcFrom<near_network::debug::DebugStatus>
near_network::debug::DebugStatus::Routes(x) => {
near_jsonrpc_primitives::types::status::DebugStatusResponse::Routes(x)
}
near_network::debug::DebugStatus::SnapshotHosts(x) => {
near_jsonrpc_primitives::types::status::DebugStatusResponse::SnapshotHosts(x)
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions chain/jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,10 @@ impl JsonRpcHandler {
.peer_manager_send(near_network::debug::GetDebugStatus::Routes)
.await?
.rpc_into(),
"/debug/api/snapshot_hosts" => self
.peer_manager_send(near_network::debug::GetDebugStatus::SnapshotHosts)
.await?
.rpc_into(),
_ => return Ok(None),
};
Ok(Some(near_jsonrpc_primitives::types::status::RpcDebugStatusResponse {
Expand Down
4 changes: 4 additions & 0 deletions chain/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ pub struct NetworkConfig {
pub archive: bool,
/// Maximal rate at which SyncAccountsData can be broadcasted.
pub accounts_data_broadcast_rate_limit: rate::Limit,
/// Maximal rate at which SyncSnapshotHosts can be broadcasted.
pub snapshot_hosts_broadcast_rate_limit: rate::Limit,
/// Maximal rate at which RoutingTable can be recomputed.
pub routing_table_update_rate_limit: rate::Limit,
/// Config of the TIER1 network.
Expand Down Expand Up @@ -318,6 +320,7 @@ impl NetworkConfig {
outbound_disabled: false,
archive,
accounts_data_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 },
snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 0.1, burst: 1 },
routing_table_update_rate_limit: rate::Limit { qps: 1., burst: 1 },
tier1: Some(Tier1 {
connect_interval: cfg.experimental.tier1_connect_interval.try_into()?,
Expand Down Expand Up @@ -386,6 +389,7 @@ impl NetworkConfig {
inbound_disabled: false,
archive: false,
accounts_data_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 },
snapshot_hosts_broadcast_rate_limit: rate::Limit { qps: 100., burst: 1000000 },
routing_table_update_rate_limit: rate::Limit { qps: 10., burst: 1 },
tier1: Some(Tier1 {
// Interval is very large, so that it doesn't happen spontaneously in tests.
Expand Down
3 changes: 3 additions & 0 deletions chain/network/src/debug.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ::actix::Message;
use near_primitives::views::{
NetworkGraphView, NetworkRoutesView, PeerStoreView, RecentOutboundConnectionsView,
SnapshotHostsView,
};

// Different debug requests that can be sent by HTML pages, via GET.
Expand All @@ -9,6 +10,7 @@ pub enum GetDebugStatus {
Graph,
RecentOutboundConnections,
Routes,
SnapshotHosts,
}

#[derive(actix::MessageResponse, Debug)]
Expand All @@ -17,6 +19,7 @@ pub enum DebugStatus {
Graph(NetworkGraphView),
RecentOutboundConnections(RecentOutboundConnectionsView),
Routes(NetworkRoutesView),
SnapshotHosts(SnapshotHostsView),
}

impl Message for GetDebugStatus {
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod network_protocol;
mod peer;
mod peer_manager;
mod private_actix;
mod snapshot_hosts;
mod stats;
mod store;
mod stun;
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! WARNING WARNING WARNING
//! We need to maintain backwards compatibility; all changes to this file need to be reviewed.
use crate::network_protocol::edge::{Edge, PartialEdgeInfo};
use crate::network_protocol::SyncSnapshotHosts;
use crate::network_protocol::{PeerChainInfoV2, PeerInfo, RoutedMessage, StateResponseInfo};
use borsh::{BorshDeserialize, BorshSerialize};
use near_primitives::block::{Block, BlockHeader, GenesisId};
Expand Down Expand Up @@ -158,6 +159,7 @@ pub(super) enum PeerMessage {
StateRequestHeader(ShardId, CryptoHash),
StateRequestPart(ShardId, CryptoHash, u64),
VersionedStateResponse(StateResponseInfo),
SyncSnapshotHosts(SyncSnapshotHosts),
}
#[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready
const _: () = assert!(std::mem::size_of::<PeerMessage>() <= 1500, "PeerMessage > 1500 bytes");
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
net::PeerMessage::VersionedStateResponse(sri) => {
mem::PeerMessage::VersionedStateResponse(sri)
}
net::PeerMessage::SyncSnapshotHosts(ssh) => mem::PeerMessage::SyncSnapshotHosts(ssh),
})
}
}
Expand Down Expand Up @@ -251,6 +252,7 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
mem::PeerMessage::VersionedStateResponse(sri) => {
net::PeerMessage::VersionedStateResponse(sri)
}
mem::PeerMessage::SyncSnapshotHosts(ssh) => net::PeerMessage::SyncSnapshotHosts(ssh),
}
}
}
3 changes: 3 additions & 0 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ mod borsh_conv;
mod edge;
mod peer;
mod proto_conv;
mod state_sync;
pub use edge::*;
pub use peer::*;
pub use state_sync::*;

#[cfg(test)]
pub(crate) mod testonly;
Expand Down Expand Up @@ -408,6 +410,7 @@ pub enum PeerMessage {
Disconnect(Disconnect),
Challenge(Challenge),

SyncSnapshotHosts(SyncSnapshotHosts),
StateRequestHeader(ShardId, CryptoHash),
StateRequestPart(ShardId, CryptoHash, u64),
VersionedStateResponse(StateResponseInfo),
Expand Down
14 changes: 14 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,19 @@ message StateResponse {
StateResponseInfo state_response_info = 1;
}

// Describes a single peer that hosts a state snapshot.
message SnapshotHostInfo {
// NOTE(review): presumably the public key of the peer hosting the snapshot — confirm.
PublicKey peer_id = 1;
// Hash identifying the snapshot. The state snapshot actor fills the
// corresponding network request field with the snapshot's prev_block_hash.
CryptoHash sync_hash = 2;
// Epoch height associated with sync_hash.
uint64 epoch_height = 3;
// Ids of the shards for which the snapshot is available.
repeated uint64 shards = 4;
// NOTE(review): presumably a signature by peer_id's key over the fields above;
// the signing scheme is not visible here — confirm.
Signature signature = 5;
}

// Batch of SnapshotHostInfo entries exchanged between peers; carried in
// PeerMessage as the sync_snapshot_hosts field.
message SyncSnapshotHosts {
// Information about peers in the network hosting state snapshots
repeated SnapshotHostInfo hosts = 1;
}

// PeerMessage is a wrapper of all message types exchanged between NEAR nodes.
// The wire format of a single message M consists of len(M)+4 bytes:
// <len(M)> : 4 bytes : little endian uint32
Expand Down Expand Up @@ -484,5 +497,6 @@ message PeerMessage {
StateRequestHeader state_request_header = 29;
StateRequestPart state_request_part = 30;
StateResponse state_response = 31;
SyncSnapshotHosts sync_snapshot_hosts = 32;
}
}
Loading

0 comments on commit 348e37a

Please sign in to comment.