diff --git a/node/src/components/network.rs b/node/src/components/network.rs index 3bfe03f615..d2d4039c36 100644 --- a/node/src/components/network.rs +++ b/node/src/components/network.rs @@ -294,6 +294,7 @@ where handshake_configuration, keylog, self.net_metrics.clone(), + self.validator_matrix.clone(), ); let conman = ConMan::new( diff --git a/node/src/components/network/conman.rs b/node/src/components/network/conman.rs index feff7c6400..f8870bb3c4 100644 --- a/node/src/components/network/conman.rs +++ b/node/src/components/network/conman.rs @@ -246,6 +246,7 @@ pub(crate) trait ProtocolHandler: Send + Sync { async fn handle_incoming_request( &self, peer: NodeId, + consensus_key: Option<&PublicKey>, request: IncomingRequest, ) -> Result<(), String>; } @@ -1032,7 +1033,7 @@ impl ActiveRoute { if let Err(err) = self .ctx .protocol_handler - .handle_incoming_request(self.peer_id, request) + .handle_incoming_request(self.peer_id, self.consensus_key.as_deref(), request) .await { // The handler return an error, exit and close connection. diff --git a/node/src/components/network/transport.rs b/node/src/components/network/transport.rs index 7611247cb4..a80ec2feb0 100644 --- a/node/src/components/network/transport.rs +++ b/node/src/components/network/transport.rs @@ -9,6 +9,7 @@ use std::{ sync::{Arc, Weak}, }; +use casper_types::PublicKey; use juliet::rpc::IncomingRequest; use openssl::ssl::Ssl; use strum::EnumCount; @@ -20,7 +21,7 @@ use crate::{ components::network::{deserialize_network_message, Message}, reactor::{EventQueueHandle, QueueKind}, tls, - types::{chainspec::JulietConfig, NodeId}, + types::{chainspec::JulietConfig, NodeId, ValidatorMatrix}, utils::{rate_limited::rate_limited, LockedLineWriter}, }; @@ -139,6 +140,7 @@ pub(super) struct TransportHandler { handshake_configuration: HandshakeConfiguration, keylog: Option, net_metrics: Arc, + validator_matrix: ValidatorMatrix, _payload: PhantomData

, } @@ -152,6 +154,7 @@ where handshake_configuration: HandshakeConfiguration, keylog: Option, net_metrics: Arc, + validator_matrix: ValidatorMatrix, ) -> Self { Self { event_queue, @@ -159,6 +162,7 @@ where handshake_configuration, keylog, net_metrics, + validator_matrix, _payload: PhantomData, } } @@ -212,9 +216,10 @@ where async fn handle_incoming_request( &self, peer: NodeId, + consensus_key: Option<&PublicKey>, request: IncomingRequest, ) -> Result<(), String> { - self.do_handle_incoming_request(peer, request) + self.do_handle_incoming_request(peer, consensus_key, request) .await .map_err(|err| err.to_string()) } @@ -228,6 +233,7 @@ where async fn do_handle_incoming_request( &self, peer: NodeId, + consensus_key: Option<&PublicKey>, request: IncomingRequest, ) -> Result<(), MessageReceiverError> { let channel = Channel::from_repr(request.channel().get()) @@ -251,8 +257,10 @@ where }); } - // TODO: Restore priorization based on validator status. - let validator_status = false; + let validator_status = consensus_key + .map(|key| self.validator_matrix.is_active_or_upcoming_validator(key)) + .unwrap_or(false); + let queue_kind = if validator_status { QueueKind::MessageValidator } else if msg.is_low_priority() { diff --git a/node/src/types/validator_matrix.rs b/node/src/types/validator_matrix.rs index 21fdad24e0..7e0fee3de6 100644 --- a/node/src/types/validator_matrix.rs +++ b/node/src/types/validator_matrix.rs @@ -238,8 +238,8 @@ impl ValidatorMatrix { /// Determine if the active validator is in a current or upcoming set of active validators. /// - /// This function may produce false positives, as it works backwards from the highest known era. - /// Depending on the current network state, this may be an upcoming or active era, at least the + /// This function may produce false positives, as it works backwards from the highest known + /// era. Depending on the current network state, this may be an upcoming or active era, the /// previous era validators may be positively identified by this function. #[inline] pub(crate) fn is_active_or_upcoming_validator(&self, public_key: &PublicKey) -> bool {