From 606a113cff10bb86568cbe709b7601deb1b94215 Mon Sep 17 00:00:00 2001 From: hopinheimer <48147533+hopinheimer@users.noreply.github.com> Date: Thu, 17 Oct 2024 04:27:56 -0400 Subject: [PATCH] IDONTWANT message optimisation to cutoff for smaller messages (#6456) * idontwant message opitmising * requested changes and linter appeasing * added the config cli flag * Merge branch 'unstable' into fix/idontwant-optimise * cli docs generated * const declaration * Hide extra technical cli flag * passing ci * Merge branch 'unstable' into fix/idontwant-optimise --- .../gossipsub/src/behaviour.rs | 9 ++-- .../gossipsub/src/behaviour/tests.rs | 46 ++++++++++++++++++- .../gossipsub/src/config.rs | 27 +++++++++++ beacon_node/lighthouse_network/src/config.rs | 8 ++++ .../lighthouse_network/src/service/mod.rs | 1 + beacon_node/src/cli.rs | 10 +++- beacon_node/src/config.rs | 14 ++++++ 7 files changed, 110 insertions(+), 5 deletions(-) diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs index bf77f30979c..c50e76e7f2c 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour.rs @@ -1812,9 +1812,6 @@ where // Calculate the message id on the transformed data. let msg_id = self.config.message_id(&message); - // Broadcast IDONTWANT messages. - self.send_idontwant(&raw_message, &msg_id, propagation_source); - // Check the validity of the message // Peers get penalized if this message is invalid. We don't add it to the duplicate cache // and instead continually penalize peers that repeatedly send this message. @@ -1830,6 +1827,12 @@ where self.mcache.observe_duplicate(&msg_id, propagation_source); return; } + + // Broadcast IDONTWANT messages + if raw_message.raw_protobuf_len() > self.config.idontwant_message_size_threshold() { + self.send_idontwant(&raw_message, &msg_id, propagation_source); + } + tracing::debug!( message=%msg_id, "Put message in duplicate_cache and resolve promises" diff --git a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs index 00de3ba2dbc..62f026b568a 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/behaviour/tests.rs @@ -5266,13 +5266,14 @@ fn sends_idontwant() { let message = RawMessage { source: Some(peers[1]), - data: vec![12], + data: vec![12u8; 1024], sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, key: None, validated: true, }; + gs.handle_received_message(message.clone(), &local_id); assert_eq!( receivers @@ -5292,6 +5293,48 @@ fn sends_idontwant() { ); } +#[test] +fn doesnt_sends_idontwant_for_lower_message_size() { + let (mut gs, peers, receivers, topic_hashes) = inject_nodes1() + .peer_no(5) + .topics(vec![String::from("topic1")]) + .to_subscribe(true) + .gs_config(Config::default()) + .explicit(1) + .peer_kind(PeerKind::Gossipsubv1_2) + .create_network(); + + let local_id = PeerId::random(); + + let message = RawMessage { + source: Some(peers[1]), + data: vec![12], + sequence_number: Some(0), + topic: topic_hashes[0].clone(), + signature: None, + key: None, + validated: true, + }; + + gs.handle_received_message(message.clone(), &local_id); + assert_eq!( + receivers + .into_iter() + .fold(0, |mut idontwants, (peer_id, c)| { + let non_priority = c.non_priority.into_inner(); + while !non_priority.is_empty() { + if let Ok(RpcOut::IDontWant(_)) = non_priority.try_recv() { + assert_ne!(peer_id, peers[1]); + idontwants += 1; + } + } + idontwants + }), + 0, + "IDONTWANT was sent" + ); +} + /// Test that a node doesn't send IDONTWANT messages to the mesh peers /// that don't run Gossipsub v1.2. #[test] @@ -5316,6 +5359,7 @@ fn doesnt_send_idontwant() { key: None, validated: true, }; + gs.handle_received_message(message.clone(), &local_id); assert_eq!( receivers diff --git a/beacon_node/lighthouse_network/gossipsub/src/config.rs b/beacon_node/lighthouse_network/gossipsub/src/config.rs index 1296e614c89..eb8dd432a33 100644 --- a/beacon_node/lighthouse_network/gossipsub/src/config.rs +++ b/beacon_node/lighthouse_network/gossipsub/src/config.rs @@ -98,6 +98,7 @@ pub struct Config { connection_handler_queue_len: usize, connection_handler_publish_duration: Duration, connection_handler_forward_duration: Duration, + idontwant_message_size_threshold: usize, } impl Config { @@ -370,6 +371,16 @@ impl Config { pub fn forward_queue_duration(&self) -> Duration { self.connection_handler_forward_duration } + + // The message size threshold for which IDONTWANT messages are sent. + // Sending IDONTWANT messages for small messages can have a negative effect to the overall + // traffic and CPU load. This acts as a lower bound cutoff for the message size to which + // IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2 + // (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message) + // default is 1kB + pub fn idontwant_message_size_threshold(&self) -> usize { + self.idontwant_message_size_threshold + } } impl Default for Config { @@ -440,6 +451,7 @@ impl Default for ConfigBuilder { connection_handler_queue_len: 5000, connection_handler_publish_duration: Duration::from_secs(5), connection_handler_forward_duration: Duration::from_millis(1000), + idontwant_message_size_threshold: 1000, }, invalid_protocol: false, } @@ -825,6 +837,17 @@ impl ConfigBuilder { self } + // The message size threshold for which IDONTWANT messages are sent. + // Sending IDONTWANT messages for small messages can have a negative effect to the overall + // traffic and CPU load. This acts as a lower bound cutoff for the message size to which + // IDONTWANT won't be sent to peers. Only works if the peers support Gossipsub1.2 + // (see https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md#idontwant-message) + // default is 1kB + pub fn idontwant_message_size_threshold(&mut self, size: usize) -> &mut Self { + self.config.idontwant_message_size_threshold = size; + self + } + /// Constructs a [`Config`] from the given configuration and validates the settings. pub fn build(&self) -> Result { // check all constraints on config @@ -895,6 +918,10 @@ impl std::fmt::Debug for Config { "published_message_ids_cache_time", &self.published_message_ids_cache_time, ); + let _ = builder.field( + "idontwant_message_size_threhold", + &self.idontwant_message_size_threshold, + ); builder.finish() } } diff --git a/beacon_node/lighthouse_network/src/config.rs b/beacon_node/lighthouse_network/src/config.rs index ea29501784c..d70e50b1da3 100644 --- a/beacon_node/lighthouse_network/src/config.rs +++ b/beacon_node/lighthouse_network/src/config.rs @@ -19,6 +19,7 @@ pub const DEFAULT_IPV4_ADDRESS: Ipv4Addr = Ipv4Addr::UNSPECIFIED; pub const DEFAULT_TCP_PORT: u16 = 9000u16; pub const DEFAULT_DISC_PORT: u16 = 9000u16; pub const DEFAULT_QUIC_PORT: u16 = 9001u16; +pub const DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD: usize = 1000usize; /// The maximum size of gossip messages. pub fn gossip_max_size(is_merge_enabled: bool, gossip_max_size: usize) -> usize { @@ -141,6 +142,10 @@ pub struct Config { /// Configuration for the inbound rate limiter (requests received by this node). pub inbound_rate_limiter_config: Option, + + /// Configuration for the minimum message size for which IDONTWANT messages are send in the mesh. + /// Lower the value reduces the optimization effect of the IDONTWANT messages. + pub idontwant_message_size_threshold: usize, } impl Config { @@ -352,6 +357,7 @@ impl Default for Config { outbound_rate_limiter_config: None, invalid_block_storage: None, inbound_rate_limiter_config: None, + idontwant_message_size_threshold: DEFAULT_IDONTWANT_MESSAGE_SIZE_THRESHOLD, } } } @@ -433,6 +439,7 @@ pub fn gossipsub_config( gossipsub_config_params: GossipsubConfigParams, seconds_per_slot: u64, slots_per_epoch: u64, + idontwant_message_size_threshold: usize, ) -> gossipsub::Config { fn prefix( prefix: [u8; 4], @@ -498,6 +505,7 @@ pub fn gossipsub_config( .duplicate_cache_time(duplicate_cache_time) .message_id_fn(gossip_message_id) .allow_self_origin(true) + .idontwant_message_size_threshold(idontwant_message_size_threshold) .build() .expect("valid gossipsub configuration") } diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index ff641f666fe..79889274de0 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -237,6 +237,7 @@ impl Network { gossipsub_config_params, ctx.chain_spec.seconds_per_slot, E::slots_per_epoch(), + config.idontwant_message_size_threshold, ); let score_settings = PeerScoreSettings::new(&ctx.chain_spec, gs_config.mesh_n()); diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 1e9611fd1eb..d6ed1068036 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -659,7 +659,15 @@ pub fn cli_app() -> Command { .action(ArgAction::Set) .display_order(0) ) - + .arg( + Arg::new("idontwant-message-size-threshold") + .long("idontwant-message-size-threshold") + .help("Specifies the minimum message size for which IDONTWANT messages are sent. \ + This an optimization strategy to not send IDONTWANT messages for smaller messages.") + .action(ArgAction::Set) + .hide(true) + .display_order(0) + ) /* * Monitoring metrics */ diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 0eff8577c4a..f62ccfe3ed9 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -1487,6 +1487,20 @@ pub fn set_network_config( Some(Default::default()) } }; + + if let Some(idontwant_message_size_threshold) = + cli_args.get_one::("idontwant-message-size-threshold") + { + config.idontwant_message_size_threshold = idontwant_message_size_threshold + .parse::() + .map_err(|_| { + format!( + "Invalid idontwant message size threshold value passed: {}", + idontwant_message_size_threshold + ) + })?; + } + Ok(()) }