bound the size of SnapshotHostsCache
saketh-are committed Nov 8, 2023
1 parent d8ada79 commit 73436f0
Showing 5 changed files with 101 additions and 33 deletions.
6 changes: 6 additions & 0 deletions chain/network/src/config.rs
@@ -5,6 +5,7 @@ use crate::network_protocol::PeerInfo;
use crate::peer_manager::peer_manager_actor::Event;
use crate::peer_manager::peer_store;
use crate::sink::Sink;
+ use crate::snapshot_hosts;
use crate::stun;
use crate::tcp;
use crate::types::ROUTED_MESSAGE_TTL;
@@ -96,6 +97,7 @@ pub struct NetworkConfig {
pub validator: Option<ValidatorConfig>,

pub peer_store: peer_store::Config,
+ pub snapshot_hosts: snapshot_hosts::Config,
pub whitelist_nodes: Vec<PeerInfo>,
pub handshake_timeout: time::Duration,

@@ -285,6 +287,9 @@ impl NetworkConfig {
ban_window: cfg.ban_window.try_into()?,
peer_expiration_duration: cfg.peer_expiration_duration.try_into()?,
},
+ snapshot_hosts: snapshot_hosts::Config {
+ snapshot_hosts_cache_size: cfg.snapshot_hosts_cache_size,
+ },
whitelist_nodes: if cfg.whitelist_nodes.is_empty() {
vec![]
} else {
@@ -367,6 +372,7 @@ impl NetworkConfig {
peer_expiration_duration: time::Duration::seconds(60 * 60),
connect_only_to_boot_nodes: false,
},
+ snapshot_hosts: snapshot_hosts::Config { snapshot_hosts_cache_size: 1000 },
whitelist_nodes: vec![],
handshake_timeout: time::Duration::seconds(5),
connect_to_reliable_peers_on_startup: true,
8 changes: 8 additions & 0 deletions chain/network/src/config_json.rs
@@ -51,6 +51,10 @@ fn default_monitor_peers_max_period() -> Duration {
fn default_peer_states_cache_size() -> u32 {
1000
}
+ /// Maximum number of snapshot hosts to keep in memory.
+ fn default_snapshot_hosts_cache_size() -> u32 {
+ 1000
+ }
/// Remove peers that we didn't hear about for this amount of time.
fn default_peer_expiration_duration() -> Duration {
Duration::from_secs(7 * 24 * 60 * 60)
@@ -139,6 +143,9 @@ pub struct Config {
/// Maximum number of peer states to keep in memory.
#[serde(default = "default_peer_states_cache_size")]
pub peer_states_cache_size: u32,
+ /// Maximum number of snapshot hosts to keep in memory.
+ #[serde(default = "default_snapshot_hosts_cache_size")]
+ pub snapshot_hosts_cache_size: u32,
// Remove peers that were not active for this amount of time.
#[serde(default = "default_peer_expiration_duration")]
pub peer_expiration_duration: Duration,
@@ -296,6 +303,7 @@ impl Default for Config {
handshake_timeout: Duration::from_secs(20),
skip_sync_wait: false,
peer_states_cache_size: default_peer_states_cache_size(),
+ snapshot_hosts_cache_size: default_snapshot_hosts_cache_size(),
ban_window: Duration::from_secs(3 * 60 * 60),
blacklist: vec![],
ttl_account_id_router: default_ttl_account_id_router(),
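Note: the serde(default = "default_snapshot_hosts_cache_size") attribute added above means an existing config.json that lacks the new field still deserializes; serde calls the named function to fill in the missing value. A minimal sketch of that behavior (the DemoConfig struct below is illustrative, not the real Config):

use serde::Deserialize;

/// Mirrors the default function added in this commit.
fn default_snapshot_hosts_cache_size() -> u32 {
    1000
}

#[derive(Deserialize)]
struct DemoConfig {
    /// An absent field falls back to the function named in serde(default).
    #[serde(default = "default_snapshot_hosts_cache_size")]
    snapshot_hosts_cache_size: u32,
}

fn main() {
    // Field present: the configured value is used.
    let c: DemoConfig =
        serde_json::from_str(r#"{"snapshot_hosts_cache_size": 50}"#).unwrap();
    assert_eq!(c.snapshot_hosts_cache_size, 50);

    // Field absent: the default of 1000 is supplied.
    let c: DemoConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(c.snapshot_hosts_cache_size, 1000);
}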
2 changes: 1 addition & 1 deletion chain/network/src/peer_manager/network_state/mod.rs
@@ -189,7 +189,7 @@ impl NetworkState {
tier1: connection::Pool::new(config.node_id()),
inbound_handshake_permits: Arc::new(tokio::sync::Semaphore::new(LIMIT_PENDING_PEERS)),
peer_store,
- snapshot_hosts: Arc::new(SnapshotHostsCache::new()),
+ snapshot_hosts: Arc::new(SnapshotHostsCache::new(config.snapshot_hosts.clone())),
connection_store: connection_store::ConnectionStore::new(store).unwrap(),
pending_reconnect: Mutex::new(Vec::<PeerInfo>::new()),
accounts_data: Arc::new(AccountDataCache::new()),
69 changes: 41 additions & 28 deletions chain/network/src/snapshot_hosts/mod.rs
@@ -5,9 +5,10 @@
//! in the network and stored locally in this cache.
use crate::concurrency;
- use crate::concurrency::arc_mutex::ArcMutex;
use crate::network_protocol::SnapshotHostInfo;
+ use lru::LruCache;
use near_primitives::network::PeerId;
+ use parking_lot::Mutex;
use rayon::iter::ParallelBridge;
use std::collections::HashMap;
use std::sync::Arc;
@@ -23,16 +24,23 @@ pub(crate) enum SnapshotHostInfoError {
DuplicatePeerId,
}

- /// TODO(saketh): Introduce a cache size limit
+ #[derive(Clone)]
+ pub struct Config {
+ /// The maximum number of SnapshotHostInfos to store locally.
+ /// At present this constraint is enforced using a simple
+ /// least-recently-used cache. In the future, we may wish to
+ /// implement something more sophisticated.
+ pub snapshot_hosts_cache_size: u32,
+ }

struct Inner {
/// The latest known SnapshotHostInfo for each node in the network
- hosts: im::HashMap<PeerId, Arc<SnapshotHostInfo>>,
+ hosts: LruCache<PeerId, Arc<SnapshotHostInfo>>,
}

impl Inner {
fn is_new(&self, h: &SnapshotHostInfo) -> bool {
- match self.hosts.get(&h.peer_id) {
+ match self.hosts.peek(&h.peer_id) {
Some(old) if old.epoch_height >= h.epoch_height => false,
_ => true,
}
@@ -45,16 +53,17 @@ impl Inner {
if !self.is_new(&d) {
return None;
}
- self.hosts.insert(d.peer_id.clone(), d.clone());
+ self.hosts.push(d.peer_id.clone(), d.clone());
Some(d)
}
}

- pub(crate) struct SnapshotHostsCache(ArcMutex<Inner>);
+ pub(crate) struct SnapshotHostsCache(Mutex<Inner>);

impl SnapshotHostsCache {
- pub fn new() -> Self {
- Self(ArcMutex::new(Inner { hosts: im::HashMap::new() }))
+ pub fn new(config: Config) -> Self {
+ let hosts = LruCache::new(config.snapshot_hosts_cache_size as usize);
+ Self(Mutex::new(Inner { hosts }))
}

/// Selects new data and verifies the signatures.
@@ -66,17 +75,19 @@ impl SnapshotHostsCache {
) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
// Filter out any data which is outdated or which we already have.
let mut new_data = HashMap::new();
- let inner = self.0.load();
- for d in data {
- // Sharing multiple entries for the same peer is considered malicious,
- // since all but one are obviously outdated.
- if new_data.contains_key(&d.peer_id) {
- return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
- }
- // It is fine to broadcast data we already know about.
- // It is fine to broadcast data which we know to be outdated.
- if inner.is_new(&d) {
- new_data.insert(d.peer_id.clone(), d);
+ {
+ let inner = self.0.lock();
+ for d in data {
+ // Sharing multiple entries for the same peer is considered malicious,
+ // since all but one are obviously outdated.
+ if new_data.contains_key(&d.peer_id) {
+ return (vec![], Some(SnapshotHostInfoError::DuplicatePeerId));
+ }
+ // It is fine to broadcast data we already know about.
+ // It is fine to broadcast data which we know to be outdated.
+ if inner.is_new(&d) {
+ new_data.insert(d.peer_id.clone(), d);
+ }
+ }
}

@@ -99,22 +110,24 @@ impl SnapshotHostsCache {
/// Returns the data inserted and optionally a verification error.
/// WriteLock is acquired only for the final update (after verification).
pub async fn insert(
- self: &Arc<Self>,
+ self: &Self,
data: Vec<Arc<SnapshotHostInfo>>,
) -> (Vec<Arc<SnapshotHostInfo>>, Option<SnapshotHostInfoError>) {
- let this = self.clone();
// Execute verification on the rayon threadpool.
- let (data, err) = this.verify(data).await;
+ let (data, err) = self.verify(data).await;
// Insert the successfully verified data, even if an error has been encountered.
- let inserted = self.0.update(|mut inner| {
- let inserted = data.into_iter().filter_map(|d| inner.try_insert(d)).collect();
- (inserted, inner)
- });
+ let mut newly_inserted_data: Vec<Arc<SnapshotHostInfo>> = vec![];
+ let mut inner = self.0.lock();
+ for d in data {
+ if let Some(inserted) = inner.try_insert(d) {
+ newly_inserted_data.push(inserted);
+ }
+ }
// Return the inserted data.
- (inserted, err)
+ (newly_inserted_data, err)
}

pub fn get_hosts(&self) -> Vec<Arc<SnapshotHostInfo>> {
- self.0.load().hosts.values().cloned().collect()
+ self.0.lock().hosts.iter().map(|(_, v)| v.clone()).collect()
}
}
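Note: the new bound is enforced entirely by the lru crate's LruCache. A minimal sketch of the semantics the code above relies on, where push inserts and evicts while peek (used by is_new) reads without updating recency, assuming an lru crate version whose LruCache::new takes a plain usize capacity, as this commit does:

use lru::LruCache;

fn main() {
    // Capacity of 2, as in the test_lru_eviction test below.
    let mut cache: LruCache<u32, &str> = LruCache::new(2);
    cache.push(0, "peer0");
    cache.push(1, "peer1");

    // peek reads without updating recency, so probing whether gossip
    // is stale does not keep an entry alive.
    assert_eq!(cache.peek(&0), Some(&"peer0"));

    // A third push evicts the least recently used entry: key 0,
    // because peek did not promote it. push returns the evicted pair.
    let evicted = cache.push(2, "peer2");
    assert_eq!(evicted, Some((0, "peer0")));
    assert!(cache.peek(&0).is_none());
    assert_eq!(cache.len(), 2);
}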
49 changes: 45 additions & 4 deletions chain/network/src/snapshot_hosts/tests.rs
@@ -1,5 +1,5 @@
use crate::network_protocol::testonly as data;
- use crate::snapshot_hosts::{SnapshotHostInfoError, SnapshotHostsCache};
+ use crate::snapshot_hosts::{Config, SnapshotHostInfoError, SnapshotHostsCache};
use crate::testonly::assert_is_superset;
use crate::testonly::{make_rng, AsSet as _};
use crate::types::SnapshotHostInfo;
@@ -45,7 +45,8 @@ async fn happy_path() {
let peer1 = PeerId::new(key1.public_key());
let peer2 = PeerId::new(key2.public_key());

- let cache = Arc::new(SnapshotHostsCache::new());
+ let config = Config { snapshot_hosts_cache_size: 100 };
+ let cache = SnapshotHostsCache::new(config);
assert_eq!(cache.get_hosts().len(), 0); // initially empty

// initial insert
@@ -79,7 +80,9 @@ async fn invalid_signature() {
let peer0 = PeerId::new(key0.public_key());
let peer1 = PeerId::new(key1.public_key());

- let cache = Arc::new(SnapshotHostsCache::new());
+ let config = Config { snapshot_hosts_cache_size: 100 };
+ let cache = SnapshotHostsCache::new(config);
+
let info0_invalid_sig = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key1));
let info1 = Arc::new(make_snapshot_host_info(&peer1, 1, vec![0, 1, 2, 3], &key1));
let res = cache.insert(vec![info0_invalid_sig.clone(), info1.clone()]).await;
@@ -102,7 +105,8 @@ async fn duplicate_peer_id() {
let key0 = data::make_secret_key(rng);
let peer0 = PeerId::new(key0.public_key());

- let cache = Arc::new(SnapshotHostsCache::new());
+ let config = Config { snapshot_hosts_cache_size: 100 };
+ let cache = SnapshotHostsCache::new(config);

let info00 = Arc::new(make_snapshot_host_info(&peer0, 1, vec![0, 1, 2, 3], &key0));
let info01 = Arc::new(make_snapshot_host_info(&peer0, 2, vec![0, 3], &key0));
@@ -113,3 +117,40 @@
// no partial data is stored
assert_eq!(0, cache.get_hosts().len());
}

+ #[tokio::test]
+ async fn test_lru_eviction() {
+ init_test_logger();
+ let mut rng = make_rng(2947294234);
+ let rng = &mut rng;
+
+ let key0 = data::make_secret_key(rng);
+ let key1 = data::make_secret_key(rng);
+ let key2 = data::make_secret_key(rng);
+
+ let peer0 = PeerId::new(key0.public_key());
+ let peer1 = PeerId::new(key1.public_key());
+ let peer2 = PeerId::new(key2.public_key());
+
+ let config = Config { snapshot_hosts_cache_size: 2 };
+ let cache = SnapshotHostsCache::new(config);
+
+ // initial inserts to capacity
+ let info0 = Arc::new(make_snapshot_host_info(&peer0, 123, vec![0, 1, 2, 3], &key0));
+ let res = cache.insert(vec![info0.clone()]).await;
+ assert_eq!([&info0].as_set(), unwrap(&res).as_set());
+ assert_eq!([&info0].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+ let info1 = Arc::new(make_snapshot_host_info(&peer1, 123, vec![2], &key1));
+ let res = cache.insert(vec![info1.clone()]).await;
+ assert_eq!([&info1].as_set(), unwrap(&res).as_set());
+ assert_eq!([&info0, &info1].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+
+ // insert past capacity
+ let info2 = Arc::new(make_snapshot_host_info(&peer2, 123, vec![1, 3], &key2));
+ let res = cache.insert(vec![info2.clone()]).await;
+ // check that the new data is accepted
+ assert_eq!([&info2].as_set(), unwrap(&res).as_set());
+ // check that the oldest data was evicted
+ assert_eq!([&info1, &info2].as_set(), cache.get_hosts().iter().collect::<HashSet<_>>());
+ }
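Note: the locking discipline in insert above is the main concurrency point of this change: signature verification runs with no lock held, and the parking_lot::Mutex is taken only for the cheap final update, so readers like get_hosts are never blocked behind crypto work. A minimal generic sketch of that shape (the vector cache and the even-number filter are illustrative stand-ins, not the near code):

use parking_lot::Mutex;
use rayon::prelude::*;
use std::sync::Arc;

fn main() {
    let cache: Arc<Mutex<Vec<u32>>> = Arc::new(Mutex::new(Vec::new()));

    // Expensive validation runs in parallel with no lock held...
    let verified: Vec<u32> = vec![1, 2, 3, 4]
        .into_par_iter()
        .filter(|x| x % 2 == 0) // stand-in for signature verification
        .collect();

    // ...and the mutex is held only for the short final update.
    cache.lock().extend(verified);
    assert_eq!(*cache.lock(), vec![2, 4]);
}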
