diff --git a/chain/client/src/sync/state.rs b/chain/client/src/sync/state.rs index 0344ab89d5e..16cece2a726 100644 --- a/chain/client/src/sync/state.rs +++ b/chain/client/src/sync/state.rs @@ -675,24 +675,42 @@ impl StateSync { for ((part_id, download), target) in parts_to_fetch(new_shard_sync_download).zip(possible_targets_sampler) { - sent_request_part( - self.clock.clone(), - target.clone(), - part_id, - shard_id, - sync_hash, - last_part_id_requested, - requested_target, - self.timeout, - ); - request_part_from_peers( - part_id, - target, - download, - shard_id, - sync_hash, - &self.network_adapter, - ); + // The request sent to the network adapater needs to include the sync_prev_prev_hash + // so that a peer hosting the correct snapshot can be selected. + let prev_header = chain + .get_block_header(&sync_hash) + .map(|header| chain.get_block_header(&header.prev_hash())); + + match prev_header { + Ok(Ok(prev_header)) => { + let sync_prev_prev_hash = prev_header.prev_hash(); + sent_request_part( + self.clock.clone(), + target.clone(), + part_id, + shard_id, + sync_hash, + last_part_id_requested, + requested_target, + self.timeout, + ); + request_part_from_peers( + part_id, + target, + download, + shard_id, + sync_hash, + *sync_prev_prev_hash, + &self.network_adapter, + ); + } + Ok(Err(err)) => { + tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "could not get prev header"); + } + Err(err) => { + tracing::error!(target: "sync", %shard_id, %sync_hash, ?err, "could not get header"); + } + } } } StateSyncInner::External { chain_id, semaphore, external } => { @@ -1304,18 +1322,24 @@ fn request_part_from_peers( download: &mut DownloadStatus, shard_id: ShardId, sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, network_adapter: &PeerManagerAdapter, ) { download.run_me.store(false, Ordering::SeqCst); download.state_requests_count += 1; - download.last_target = Some(peer_id.clone()); + download.last_target = Some(peer_id); let run_me = download.run_me.clone(); near_performance_metrics::actix::spawn( "StateSync", network_adapter .send_async(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id }, + NetworkRequests::StateRequestPart { + shard_id, + sync_hash, + sync_prev_prev_hash, + part_id, + }, )) .then(move |result| { // TODO: possible optimization - in the current code, even if one of the targets it not present in the network graph diff --git a/chain/client/src/tests/catching_up.rs b/chain/client/src/tests/catching_up.rs index 19a5ced219a..f49db6989ec 100644 --- a/chain/client/src/tests/catching_up.rs +++ b/chain/client/src/tests/catching_up.rs @@ -101,8 +101,9 @@ enum ReceiptsSyncPhases { pub struct StateRequestStruct { pub shard_id: u64, pub sync_hash: CryptoHash, + pub sync_prev_prev_hash: Option, pub part_id: Option, - pub peer_id: PeerId, + pub peer_id: Option, } /// Sanity checks that the incoming and outgoing receipts are properly sent and received @@ -268,8 +269,9 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: None, part_id: None, - peer_id: peer_id.clone(), + peer_id: Some(peer_id.clone()), }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) @@ -283,16 +285,17 @@ fn test_catchup_receipts_sync_common(wait_till: u64, send: u64, sync_hold: bool) if let NetworkRequests::StateRequestPart { shard_id, sync_hash, + sync_prev_prev_hash, part_id, 
- peer_id, } = msg { if sync_hold { let srs = StateRequestStruct { shard_id: *shard_id, sync_hash: *sync_hash, + sync_prev_prev_hash: Some(*sync_prev_prev_hash), part_id: Some(*part_id), - peer_id: peer_id.clone(), + peer_id: None, }; if !seen_hashes_with_state .contains(&hash_func(&borsh::to_vec(&srs).unwrap())) diff --git a/chain/network/src/network_protocol/borsh_conv.rs b/chain/network/src/network_protocol/borsh_conv.rs index 4ef69a0dc58..cb899f8f896 100644 --- a/chain/network/src/network_protocol/borsh_conv.rs +++ b/chain/network/src/network_protocol/borsh_conv.rs @@ -216,6 +216,9 @@ impl From<&mem::PeerMessage> for net::PeerMessage { panic!("Tier1Handshake is not supported in Borsh encoding") } mem::PeerMessage::Tier2Handshake(h) => net::PeerMessage::Handshake((&h).into()), + mem::PeerMessage::Tier3Handshake(_) => { + panic!("Tier3Handshake is not supported in Borsh encoding") + } mem::PeerMessage::HandshakeFailure(pi, hfr) => { net::PeerMessage::HandshakeFailure(pi, (&hfr).into()) } diff --git a/chain/network/src/network_protocol/mod.rs b/chain/network/src/network_protocol/mod.rs index 6c584219c70..a2a2c28b0e2 100644 --- a/chain/network/src/network_protocol/mod.rs +++ b/chain/network/src/network_protocol/mod.rs @@ -411,6 +411,7 @@ pub struct Disconnect { pub enum PeerMessage { Tier1Handshake(Handshake), Tier2Handshake(Handshake), + Tier3Handshake(Handshake), HandshakeFailure(PeerInfo, HandshakeFailureReason), /// When a failed nonce is used by some peer, this message is sent back as evidence. LastEdge(Edge), @@ -552,6 +553,7 @@ pub enum RoutedMessageBody { VersionedChunkEndorsement(ChunkEndorsement), EpochSyncRequest, EpochSyncResponse(CompressedEpochSyncProof), + StatePartRequest(StatePartRequest), } impl RoutedMessageBody { @@ -645,6 +647,7 @@ impl fmt::Debug for RoutedMessageBody { RoutedMessageBody::EpochSyncResponse(_) => { write!(f, "EpochSyncResponse") } + RoutedMessageBody::StatePartRequest(_) => write!(f, "StatePartRequest"), } } } diff --git a/chain/network/src/network_protocol/network.proto b/chain/network/src/network_protocol/network.proto index f60bd202312..cc95644d4d5 100644 --- a/chain/network/src/network_protocol/network.proto +++ b/chain/network/src/network_protocol/network.proto @@ -458,17 +458,15 @@ message PeerMessage { TraceContext trace_context = 26; oneof message_type { - // Handshakes for TIER1 and TIER2 networks are considered separate, - // so that a node binary which doesn't support TIER1 connection won't - // be even able to PARSE the handshake. This way we avoid accidental - // connections, such that one end thinks it is a TIER2 connection and the - // other thinks it is a TIER1 connection. As currently both TIER1 and TIER2 - // connections are handled by the same PeerActor, both fields use the same - // underlying message type. If we ever decide to separate the handshake - // implementations, we can copy the Handshake message type defition and - // make it evolve differently for TIER1 and TIER2. + // Handshakes for different network tiers explicitly use different PeerMessage variants. + // This way we avoid accidental connections, such that one end thinks it is a TIER2 connection + // and the other thinks it is a TIER1 connection. Currently the same PeerActor handles + // all types of connections, hence the contents are identical for all types of connections. + // If we ever decide to separate the handshake implementations, we can copy the Handshake message + // type definition and make it evolve differently for different tiers. 
Handshake tier1_handshake = 27; Handshake tier2_handshake = 4; + Handshake tier3_handshake = 33; HandshakeFailure handshake_failure = 5; LastEdge last_edge = 6; diff --git a/chain/network/src/network_protocol/proto_conv/peer_message.rs b/chain/network/src/network_protocol/proto_conv/peer_message.rs index 38b4250b15a..b73a66d7966 100644 --- a/chain/network/src/network_protocol/proto_conv/peer_message.rs +++ b/chain/network/src/network_protocol/proto_conv/peer_message.rs @@ -234,6 +234,7 @@ impl From<&PeerMessage> for proto::PeerMessage { message_type: Some(match x { PeerMessage::Tier1Handshake(h) => ProtoMT::Tier1Handshake(h.into()), PeerMessage::Tier2Handshake(h) => ProtoMT::Tier2Handshake(h.into()), + PeerMessage::Tier3Handshake(h) => ProtoMT::Tier3Handshake(h.into()), PeerMessage::HandshakeFailure(pi, hfr) => { ProtoMT::HandshakeFailure((pi, hfr).into()) } @@ -398,6 +399,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage { ProtoMT::Tier2Handshake(h) => { PeerMessage::Tier2Handshake(h.try_into().map_err(Self::Error::Handshake)?) } + ProtoMT::Tier3Handshake(h) => { + PeerMessage::Tier3Handshake(h.try_into().map_err(Self::Error::Handshake)?) + } ProtoMT::HandshakeFailure(hf) => { let (pi, hfr) = hf.try_into().map_err(Self::Error::HandshakeFailure)?; PeerMessage::HandshakeFailure(pi, hfr) diff --git a/chain/network/src/network_protocol/state_sync.rs b/chain/network/src/network_protocol/state_sync.rs index d30170b83d5..c500893f3f4 100644 --- a/chain/network/src/network_protocol/state_sync.rs +++ b/chain/network/src/network_protocol/state_sync.rs @@ -107,3 +107,26 @@ pub enum SnapshotHostInfoVerificationError { )] TooManyShards(usize), } + +/// Message used to request a state part. +/// +#[derive( + Clone, + Debug, + Eq, + PartialEq, + Hash, + borsh::BorshSerialize, + borsh::BorshDeserialize, + ProtocolSchema, +)] +pub struct StatePartRequest { + /// Requested shard id + pub shard_id: ShardId, + /// Hash of the requested snapshot's state root + pub sync_hash: CryptoHash, + /// Requested part id + pub part_id: u64, + /// Public address of the node making the request + pub addr: std::net::SocketAddr, +} diff --git a/chain/network/src/peer/peer_actor.rs b/chain/network/src/peer/peer_actor.rs index 4ff60267190..1e585d70b94 100644 --- a/chain/network/src/peer/peer_actor.rs +++ b/chain/network/src/peer/peer_actor.rs @@ -284,6 +284,18 @@ impl PeerActor { .start_outbound(peer_id.clone()) .map_err(ClosingReason::OutboundNotAllowed)? } + tcp::Tier::T3 => { + // Loop connections are allowed only on T1; see comment above + if peer_id == &network_state.config.node_id() { + return Err(ClosingReason::OutboundNotAllowed( + connection::PoolError::UnexpectedLoopConnection, + )); + } + network_state + .tier3 + .start_outbound(peer_id.clone()) + .map_err(ClosingReason::OutboundNotAllowed)? + } }, handshake_spec: HandshakeSpec { partial_edge_info: network_state.propose_edge(&clock, peer_id, None), @@ -293,10 +305,12 @@ impl PeerActor { }, }, }; - // Override force_encoding for outbound Tier1 connections, - // since Tier1Handshake is supported only with proto encoding. + // Override force_encoding for outbound Tier1 and Tier3 connections; + // Tier1Handshake and Tier3Handshake are supported only with proto encoding. let force_encoding = match &stream.type_ { - tcp::StreamType::Outbound { tier, .. } if tier == &tcp::Tier::T1 => { + tcp::StreamType::Outbound { tier, .. 
} + if tier == &tcp::Tier::T1 || tier == &tcp::Tier::T3 => + { Some(Encoding::Proto) } _ => force_encoding, @@ -480,6 +494,7 @@ impl PeerActor { let msg = match spec.tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; self.send_message_or_log(&msg); } @@ -939,6 +954,9 @@ impl PeerActor { (PeerStatus::Connecting { .. }, PeerMessage::Tier2Handshake(msg)) => { self.process_handshake(ctx, tcp::Tier::T2, msg) } + (PeerStatus::Connecting { .. }, PeerMessage::Tier3Handshake(msg)) => { + self.process_handshake(ctx, tcp::Tier::T3, msg) + } (_, msg) => { tracing::warn!(target:"network","unexpected message during handshake: {}",msg) } @@ -1140,7 +1158,9 @@ impl PeerActor { self.stop(ctx, ClosingReason::DisconnectMessage); } - PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) => { + PeerMessage::Tier1Handshake(_) + | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) => { // Received handshake after already have seen handshake from this peer. tracing::debug!(target: "network", "Duplicate handshake from {}", self.peer_info); } @@ -1182,8 +1202,20 @@ impl PeerActor { self.stop(ctx, ClosingReason::Ban(ReasonForBan::Abusive)); } - // Add received peers to the peer store let node_id = self.network_state.config.node_id(); + + // Record our own IP address as observed by the peer. + if self.network_state.my_public_addr.read().is_none() { + if let Some(my_peer_info) = + direct_peers.iter().find(|peer_info| peer_info.id == node_id) + { + if let Some(addr) = my_peer_info.addr { + let mut my_public_addr = self.network_state.my_public_addr.write(); + *my_public_addr = Some(addr); + } + } + } + // Add received indirect peers to the peer store self.network_state.peer_store.add_indirect_peers( &self.clock, peers.into_iter().filter(|peer_info| peer_info.id != node_id), diff --git a/chain/network/src/peer_manager/connection/mod.rs b/chain/network/src/peer_manager/connection/mod.rs index ea9f7edccab..2035a673da5 100644 --- a/chain/network/src/peer_manager/connection/mod.rs +++ b/chain/network/src/peer_manager/connection/mod.rs @@ -36,8 +36,12 @@ impl tcp::Tier { match msg { PeerMessage::Tier1Handshake(_) => self == tcp::Tier::T1, PeerMessage::Tier2Handshake(_) => self == tcp::Tier::T2, + PeerMessage::Tier3Handshake(_) => self == tcp::Tier::T3, PeerMessage::HandshakeFailure(_, _) => true, PeerMessage::LastEdge(_) => true, + PeerMessage::VersionedStateResponse(_) => { + self == tcp::Tier::T2 || self == tcp::Tier::T3 + } PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body), _ => self == tcp::Tier::T2, } diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs index 06c1bad9ffe..739b94e8101 100644 --- a/chain/network/src/peer_manager/network_state/mod.rs +++ b/chain/network/src/peer_manager/network_state/mod.rs @@ -28,7 +28,9 @@ use crate::state_witness::{ use crate::stats::metrics; use crate::store; use crate::tcp; -use crate::types::{ChainInfo, PeerType, ReasonForBan}; +use crate::types::{ + ChainInfo, PeerType, ReasonForBan, StatePartRequestBody, Tier3Request, Tier3RequestBody, +}; use anyhow::Context; use arc_swap::ArcSwap; use near_async::messaging::{CanSend, SendAsync, Sender}; @@ -38,7 +40,8 @@ use near_primitives::hash::CryptoHash; use near_primitives::network::PeerId; use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement; use near_primitives::types::AccountId; 
-use parking_lot::Mutex; +use parking_lot::{Mutex, RwLock}; +use std::collections::VecDeque; use std::net::SocketAddr; use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; @@ -115,8 +118,11 @@ pub(crate) struct NetworkState { /// Connected peers (inbound and outbound) with their full peer information. pub tier2: connection::Pool, pub tier1: connection::Pool, + pub tier3: connection::Pool, /// Semaphore limiting inflight inbound handshakes. pub inbound_handshake_permits: Arc, + /// The public IP of this node; available after connecting to any one peer. + pub my_public_addr: Arc>>, /// Peer store that provides read/write access to peers. pub peer_store: peer_store::PeerStore, /// Information about state snapshots hosted by network peers. @@ -143,6 +149,9 @@ pub(crate) struct NetworkState { /// TODO(gprusak): consider removing it altogether. pub tier1_route_back: Mutex, + /// Queue of received requests to which a response should be made over TIER3. + pub tier3_requests: Mutex>, + /// Shared counter across all PeerActors, which counts number of `RoutedMessageBody::ForwardTx` /// messages sincce last block. pub txns_since_last_block: AtomicUsize, @@ -194,7 +203,9 @@ impl NetworkState { chain_info: Default::default(), tier2: connection::Pool::new(config.node_id()), tier1: connection::Pool::new(config.node_id()), + tier3: connection::Pool::new(config.node_id()), inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)), + my_public_addr: Arc::new(RwLock::new(None)), peer_store, snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())), connection_store: connection_store::ConnectionStore::new(store.clone()).unwrap(), @@ -203,6 +214,7 @@ impl NetworkState { account_announcements: Arc::new(AnnounceAccountCache::new(store)), tier2_route_back: Mutex::new(RouteBackCache::default()), tier1_route_back: Mutex::new(RouteBackCache::default()), + tier3_requests: Mutex::new(VecDeque::::new()), recent_routed_messages: Mutex::new(lru::LruCache::new( NonZeroUsize::new(RECENT_ROUTED_MESSAGES_CACHE_SIZE).unwrap(), )), @@ -349,6 +361,18 @@ impl NetworkState { // Write to the peer store this.peer_store.peer_connected(&clock, peer_info); } + tcp::Tier::T3 => { + if conn.peer_type == PeerType::Inbound { + // TODO(saketh): When a peer initiates a TIER3 connection it should be + // responding to a request sent previously by the local node. If we + // maintain some state about pending requests it would be possible to add + // an additional layer of security here and reject unexpected connections. + } + if !edge.verify() { + return Err(RegisterPeerError::InvalidEdge); + } + this.tier3.insert_ready(conn).map_err(RegisterPeerError::PoolError)?; + } } Ok(()) }).await.unwrap() @@ -369,14 +393,19 @@ impl NetworkState { let clock = clock.clone(); let conn = conn.clone(); self.spawn(async move { - let peer_id = conn.peer_info.id.clone(); - if conn.tier == tcp::Tier::T1 { - // There is no banning or routing table for TIER1. - // Just remove the connection from the network_state. - this.tier1.remove(&conn); + match conn.tier { + tcp::Tier::T1 => this.tier1.remove(&conn), + tcp::Tier::T2 => this.tier2.remove(&conn), + tcp::Tier::T3 => this.tier3.remove(&conn), + } + + // The rest of this function has to do with banning or routing, + // which are applicable only for TIER2. 
+ if conn.tier != tcp::Tier::T2 { return; } - this.tier2.remove(&conn); + + let peer_id = conn.peer_info.id.clone(); // If the last edge we have with this peer represent a connection addition, create the edge // update that represents the connection removal. @@ -558,6 +587,17 @@ impl NetworkState { } } } + tcp::Tier::T3 => { + let peer_id = match &msg.target { + PeerIdOrHash::Hash(_) => { + // There is no route back cache for TIER3 as all connections are direct + debug_assert!(false); + return false; + } + PeerIdOrHash::PeerId(peer_id) => peer_id.clone(), + }; + return self.tier3.send_message(peer_id, Arc::new(PeerMessage::Routed(msg))); + } } } @@ -743,6 +783,17 @@ impl NetworkState { self.client.send(EpochSyncResponseMessage { from_peer: peer_id, proof }); None } + RoutedMessageBody::StatePartRequest(request) => { + self.tier3_requests.lock().push_back(Tier3Request { + peer_info: PeerInfo { id: peer_id, addr: Some(request.addr), account_id: None }, + body: Tier3RequestBody::StatePart(StatePartRequestBody { + shard_id: request.shard_id, + sync_hash: request.sync_hash, + part_id: request.part_id, + }), + }); + None + } body => { tracing::error!(target: "network", "Peer received unexpected message type: {:?}", body); None diff --git a/chain/network/src/peer_manager/network_state/routing.rs b/chain/network/src/peer_manager/network_state/routing.rs index 0fe045fcdad..ccbf28c7f3f 100644 --- a/chain/network/src/peer_manager/network_state/routing.rs +++ b/chain/network/src/peer_manager/network_state/routing.rs @@ -210,9 +210,14 @@ impl NetworkState { tracing::trace!(target: "network", route_back = ?msg.clone(), "Received peer message that requires response"); let from = &conn.peer_info.id; + match conn.tier { tcp::Tier::T1 => self.tier1_route_back.lock().insert(&clock, msg.hash(), from.clone()), tcp::Tier::T2 => self.tier2_route_back.lock().insert(&clock, msg.hash(), from.clone()), + tcp::Tier::T3 => { + // TIER3 connections are direct by design; no routing is performed + debug_assert!(false) + } } } diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 6e04e871203..30deb4aa529 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -1,10 +1,11 @@ -use crate::client::{ClientSenderForNetwork, SetNetworkInfo}; +use crate::client::{ClientSenderForNetwork, SetNetworkInfo, StateRequestPart}; use crate::config; use crate::debug::{DebugStatus, GetDebugStatus}; use crate::network_protocol; use crate::network_protocol::SyncSnapshotHosts; use crate::network_protocol::{ Disconnect, Edge, PeerIdOrHash, PeerMessage, Ping, Pong, RawRoutedMessage, RoutedMessageBody, + StatePartRequest, }; use crate::peer::peer_actor::PeerActor; use crate::peer_manager::connection; @@ -18,7 +19,7 @@ use crate::tcp; use crate::types::{ ConnectedPeerInfo, HighestHeightPeerInfo, KnownProducer, NetworkInfo, NetworkRequests, NetworkResponses, PeerInfo, PeerManagerMessageRequest, PeerManagerMessageResponse, PeerType, - SetChainInfo, SnapshotHostInfo, + SetChainInfo, SnapshotHostInfo, StatePartRequestBody, Tier3RequestBody, }; use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; @@ -87,6 +88,11 @@ pub(crate) const UPDATE_CONNECTION_STORE_INTERVAL: time::Duration = time::Durati /// How often to poll the NetworkState for closed connections we'd like to re-establish. 
pub(crate) const POLL_CONNECTION_STORE_INTERVAL: time::Duration = time::Duration::minutes(1); +/// How often we check for and process pending Tier3 requests +const PROCESS_TIER3_REQUESTS_INTERVAL: time::Duration = time::Duration::seconds(1); +/// The length of time that a Tier3 connection is allowed to idle before it is stopped +const TIER3_IDLE_TIMEOUT: time::Duration = time::Duration::seconds(15); + /// Actor that manages peers connections. pub struct PeerManagerActor { pub(crate) clock: time::Clock, @@ -338,6 +344,62 @@ impl PeerManagerActor { } } }); + // Periodically process pending Tier3 requests. + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + let arbiter = arbiter.clone(); + let mut interval = time::Interval::new(clock.now(), PROCESS_TIER3_REQUESTS_INTERVAL); + async move { + loop { + interval.tick(&clock).await; + + if let Some(request) = state.tier3_requests.lock().pop_front() { + arbiter.spawn({ + let clock = clock.clone(); + let state = state.clone(); + async move { + let tier3_response = match request.body { + Tier3RequestBody::StatePart(StatePartRequestBody { shard_id, sync_hash, part_id }) => { + match state.client.send_async(StateRequestPart { shard_id, sync_hash, part_id }).await { + Ok(Some(client_response)) => { + PeerMessage::VersionedStateResponse(*client_response.0) + } + Ok(None) => { + tracing::debug!(target: "network", "client declined to respond to {:?}", request); + return; + } + Err(err) => { + tracing::error!(target: "network", ?err, "client failed to respond to {:?}", request); + return; + } + } + } + }; + + if !state.tier3.load().ready.contains_key(&request.peer_info.id) { + let result = async { + let stream = tcp::Stream::connect( + &request.peer_info, + tcp::Tier::T3, + &state.config.socket_options + ).await.context("tcp::Stream::connect()")?; + PeerActor::spawn_and_handshake(clock.clone(),stream,None,state.clone()).await.context("PeerActor::spawn()")?; + anyhow::Ok(()) + }.await; + + if let Err(ref err) = result { + tracing::info!(target: "network", err = format!("{:#}", err), "failed to connect to {}", request.peer_info); + } + } + + state.tier3.send_message(request.peer_info.id, Arc::new(tier3_response)); + } + }); + } + } + } + }); } }); Ok(Self::start_in_arbiter(&arbiter, move |_ctx| Self { @@ -553,6 +615,29 @@ impl PeerManagerActor { } } + /// TIER3 connections are established ad-hoc to transmit individual large messages. + /// Here we terminate these "single-purpose" connections after an idle timeout. + /// + /// When a TIER3 connection is established the intended message is already prepared in-memory, + /// so there is no concern of the timeout falling in between the handshake and the payload. + /// + /// A finer detail is that as long as a TIER3 connection remains open it can be reused to + /// transmit additional TIER3 payloads intended for the same peer. In such cases the message + /// can be lost if the timeout is reached precisely while it is in flight. For simplicity we + /// accept this risk; network requests are understood as unreliable and the requesting node has + /// retry logic anyway. TODO(saketh): consider if we can improve this in a simple way. 
+ fn stop_tier3_idle_connections(&self) { + let now = self.clock.now(); + let _ = self + .state + .tier3 + .load() + .ready + .values() + .filter(|p| now - p.last_time_received_message.load() > TIER3_IDLE_TIMEOUT) + .map(|p| p.stop(None)); + } + /// Periodically monitor list of peers and: /// - request new peers from connected peers, /// - bootstrap outbound connections from known peers, @@ -621,6 +706,9 @@ impl PeerManagerActor { // If there are too many active connections try to remove some connections self.maybe_stop_active_connection(); + // Close Tier3 connections which have been idle for too long + self.stop_tier3_idle_connections(); + // Find peers that are not reliable (too much behind) - and make sure that we're not routing messages through them. let unreliable_peers = self.unreliable_peers(); metrics::PEER_UNRELIABLE.set(unreliable_peers.len() as i64); @@ -788,11 +876,42 @@ impl PeerManagerActor { NetworkResponses::RouteNotFound } } - NetworkRequests::StateRequestPart { shard_id, sync_hash, part_id, peer_id } => { - if self.state.tier2.send_message( - peer_id, - Arc::new(PeerMessage::StateRequestPart(shard_id, sync_hash, part_id)), - ) { + NetworkRequests::StateRequestPart { + shard_id, + sync_hash, + sync_prev_prev_hash, + part_id, + } => { + let mut success = false; + + // The node needs to include its own public address in the request + // so that the reponse can be sent over Tier3 + if let Some(addr) = *self.state.my_public_addr.read() { + if let Some(peer_id) = self.state.snapshot_hosts.select_host_for_part( + &sync_prev_prev_hash, + shard_id, + part_id, + ) { + success = + self.state.send_message_to_peer( + &self.clock, + tcp::Tier::T2, + self.state.sign_message( + &self.clock, + RawRoutedMessage { + target: PeerIdOrHash::PeerId(peer_id), + body: RoutedMessageBody::StatePartRequest( + StatePartRequest { shard_id, sync_hash, part_id, addr }, + ), + }, + ), + ); + } else { + tracing::debug!(target: "network", "no hosts available for {shard_id}, {sync_prev_prev_hash}"); + } + } + + if success { NetworkResponses::NoResponse } else { NetworkResponses::RouteNotFound diff --git a/chain/network/src/peer_manager/tests/connection_pool.rs b/chain/network/src/peer_manager/tests/connection_pool.rs index 79e00807f20..15da55a57aa 100644 --- a/chain/network/src/peer_manager/tests/connection_pool.rs +++ b/chain/network/src/peer_manager/tests/connection_pool.rs @@ -273,7 +273,7 @@ async fn invalid_edge() { ]; for (name, edge) in &testcases { - for tier in [tcp::Tier::T1, tcp::Tier::T2] { + for tier in [tcp::Tier::T1, tcp::Tier::T2, tcp::Tier::T3] { tracing::info!(target:"test","{name} {tier:?}"); let stream = tcp::Stream::connect(&pm.peer_info(), tier, &SocketOptions::default()) .await @@ -303,6 +303,7 @@ async fn invalid_edge() { let handshake = match tier { tcp::Tier::T1 => PeerMessage::Tier1Handshake(handshake), tcp::Tier::T2 => PeerMessage::Tier2Handshake(handshake), + tcp::Tier::T3 => PeerMessage::Tier3Handshake(handshake), }; stream.write(&handshake).await; let reason = events diff --git a/chain/network/src/rate_limits/messages_limits.rs b/chain/network/src/rate_limits/messages_limits.rs index 08d2d8ea40f..54638a829c3 100644 --- a/chain/network/src/rate_limits/messages_limits.rs +++ b/chain/network/src/rate_limits/messages_limits.rs @@ -220,6 +220,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa RoutedMessageBody::VersionedChunkEndorsement(_) => Some((ChunkEndorsement, 1)), RoutedMessageBody::EpochSyncRequest => None, 
RoutedMessageBody::EpochSyncResponse(_) => None, + RoutedMessageBody::StatePartRequest(_) => None, // TODO RoutedMessageBody::Ping(_) | RoutedMessageBody::Pong(_) | RoutedMessageBody::_UnusedChunkStateWitness @@ -239,6 +240,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa PeerMessage::VersionedStateResponse(_) => Some((VersionedStateResponse, 1)), PeerMessage::Tier1Handshake(_) | PeerMessage::Tier2Handshake(_) + | PeerMessage::Tier3Handshake(_) | PeerMessage::HandshakeFailure(_, _) | PeerMessage::LastEdge(_) | PeerMessage::Disconnect(_) diff --git a/chain/network/src/snapshot_hosts/mod.rs b/chain/network/src/snapshot_hosts/mod.rs index ca430900ac1..2a636b4e9ff 100644 --- a/chain/network/src/snapshot_hosts/mod.rs +++ b/chain/network/src/snapshot_hosts/mod.rs @@ -313,7 +313,6 @@ impl SnapshotHostsCache { } /// Given a state part request, selects a peer host to which the request should be sent. - #[allow(dead_code)] pub fn select_host_for_part( &self, sync_hash: &CryptoHash, diff --git a/chain/network/src/tcp.rs b/chain/network/src/tcp.rs index 06ba01a5033..5e5e78a7a42 100644 --- a/chain/network/src/tcp.rs +++ b/chain/network/src/tcp.rs @@ -26,6 +26,11 @@ pub enum Tier { /// consensus messages. Also, Tier1 peer discovery actually happens on Tier2 network, i.e. /// Tier2 network is necessary to bootstrap Tier1 connections. T2, + /// Tier3 connections are created ad hoc to directly transfer large messages, e.g. state parts. + /// Requests for state parts are routed over Tier2. A node receiving such a request initiates a + /// direct Tier3 connections to send the response. By sending large responses over dedicated + /// connections we avoid delaying other messages and we minimize network bandwidth usage. + T3, } #[derive(Clone, Debug)] diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 3ddcbbcc067..4b005e24c2b 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -245,7 +245,12 @@ pub enum NetworkRequests { /// Request state header for given shard at given state root. StateRequestHeader { shard_id: ShardId, sync_hash: CryptoHash, peer_id: PeerId }, /// Request state part for given shard at given state root. - StateRequestPart { shard_id: ShardId, sync_hash: CryptoHash, part_id: u64, peer_id: PeerId }, + StateRequestPart { + shard_id: ShardId, + sync_hash: CryptoHash, + sync_prev_prev_hash: CryptoHash, + part_id: u64, + }, /// Ban given peer. 
BanPeer { peer_id: PeerId, ban_reason: ReasonForBan }, /// Announce account @@ -498,3 +503,24 @@ pub struct AccountIdOrPeerTrackingShard { /// Only send messages to peers whose latest chain height is no less `min_height` pub min_height: BlockHeight, } + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +/// An inbound request to which a response should be sent over Tier3 +pub struct Tier3Request { + /// Target peer to send the response to + pub peer_info: PeerInfo, + /// Contents of the request + pub body: Tier3RequestBody, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum Tier3RequestBody { + StatePart(StatePartRequestBody), +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct StatePartRequestBody { + pub shard_id: ShardId, + pub sync_hash: CryptoHash, + pub part_id: u64, +} diff --git a/tools/protocol-schema-check/res/protocol_schema.toml b/tools/protocol-schema-check/res/protocol_schema.toml index 0eb924a54e4..ab871b63095 100644 --- a/tools/protocol-schema-check/res/protocol_schema.toml +++ b/tools/protocol-schema-check/res/protocol_schema.toml @@ -153,7 +153,7 @@ PeerChainInfoV2 = 2686179044 PeerId = 2447445523 PeerIdOrHash = 4080492546 PeerInfo = 3831734408 -PeerMessage = 1912504821 +PeerMessage = 2881871188 Ping = 2783493472 Pong = 3159638327 PrepareError = 4009037507 @@ -177,8 +177,8 @@ ReceiptV1 = 2994842769 ReceiptValidationError = 551721215 ReceivedData = 3601438283 RootProof = 3135729669 -RoutedMessage = 334669112 -RoutedMessageBody = 237812276 +RoutedMessage = 3434968924 +RoutedMessageBody = 4241045537 RoutingTableUpdate = 2987752645 Secp256K1PublicKey = 4117078281 Secp256K1Signature = 3687154735 @@ -212,6 +212,7 @@ StakeAction = 2002027105 StateChangeCause = 1569242014 StateHeaderKey = 1385533899 StatePartKey = 3498655211 +StatePartRequest = 4194196967 StateResponseInfo = 2184941925 StateResponseInfoV1 = 1435664823 StateResponseInfoV2 = 1784931382
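
Taken together, the changes above introduce a dedicated TIER3 tier for serving state parts: the requesting node learns its own public address from a peer's PeersResponse, routes a StatePartRequest (which includes that address) over TIER2 to a host chosen by select_host_for_part, and the host queues a Tier3Request, dials the advertised address directly, sends the response over the new connection, and later drops the connection once it has been idle for TIER3_IDLE_TIMEOUT. The sketch below is a minimal, std-only model of that queue-and-dial-back flow; HostState and its primitive field types (plain integers and byte arrays in place of ShardId, CryptoHash, PeerInfo, and the real connection pool) are hypothetical stand-ins for illustration, not nearcore APIs.

// Minimal, std-only sketch of the TIER3 state-part flow added in this diff.
// `HostState` and the primitive field types are hypothetical stand-ins for
// nearcore's NetworkState, PeerInfo, ShardId and CryptoHash; this illustrates
// the queue-and-dial-back design only, not the real implementation.
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::time::{Duration, Instant};

/// Mirrors the TIER3_IDLE_TIMEOUT constant introduced in peer_manager_actor.rs.
const TIER3_IDLE_TIMEOUT: Duration = Duration::from_secs(15);

/// Routed over TIER2 by the requester; carries the requester's own public
/// address so the host can dial back directly over TIER3.
struct StatePartRequest {
    shard_id: u64,
    sync_hash: [u8; 32],
    part_id: u64,
    addr: SocketAddr,
}

/// What the host enqueues until the periodic TIER3 processing loop runs.
struct Tier3Request {
    peer_addr: SocketAddr,
    shard_id: u64,
    sync_hash: [u8; 32],
    part_id: u64,
}

struct HostState {
    tier3_requests: VecDeque<Tier3Request>,
    /// Open TIER3 connections keyed by peer address, with the last receive time.
    tier3_ready: HashMap<SocketAddr, Instant>,
}

impl HostState {
    /// Analogous to receive_routed_message: the request is not answered inline,
    /// it is pushed onto the tier3_requests queue.
    fn on_state_part_request(&mut self, req: StatePartRequest) {
        self.tier3_requests.push_back(Tier3Request {
            peer_addr: req.addr,
            shard_id: req.shard_id,
            sync_hash: req.sync_hash,
            part_id: req.part_id,
        });
    }

    /// Analogous to the PROCESS_TIER3_REQUESTS_INTERVAL loop: pop one request,
    /// make sure a direct connection exists (dialing the advertised address if
    /// necessary), then send the prepared response over it.
    fn process_one(&mut self, now: Instant, send: impl Fn(&Tier3Request)) {
        if let Some(req) = self.tier3_requests.pop_front() {
            self.tier3_ready.entry(req.peer_addr).or_insert(now); // "connect" if absent
            send(&req);
        }
    }

    /// Analogous to stop_tier3_idle_connections: drop single-purpose connections
    /// that have been idle for longer than the timeout.
    fn stop_idle(&mut self, now: Instant) {
        self.tier3_ready
            .retain(|_, last_received| now.duration_since(*last_received) <= TIER3_IDLE_TIMEOUT);
    }
}

fn main() {
    let mut host = HostState { tier3_requests: VecDeque::new(), tier3_ready: HashMap::new() };
    host.on_state_part_request(StatePartRequest {
        shard_id: 0,
        sync_hash: [0u8; 32],
        part_id: 3,
        addr: "127.0.0.1:24567".parse().unwrap(),
    });
    let now = Instant::now();
    host.process_one(now, |req| {
        println!("send shard {} part {} to {} over TIER3", req.shard_id, req.part_id, req.peer_addr)
    });
    host.stop_idle(now + Duration::from_secs(16)); // idle connection pruned after 15s
}

In the actual change the payload sent back is a VersionedStateResponse obtained from the client, and the tcp.rs comment states the design rationale: large responses travel over these ad-hoc direct connections so they do not delay routed TIER2 traffic, while the requester's existing retry logic covers the accepted risk of a response being lost to the idle timeout.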