Skip to content

Commit

Permalink
Merge pull request #4661 from marc-casperlabs/incoming-message-priori…
Browse files Browse the repository at this point in the history
…tization

Leverage validator matrix to restore incoming request prioritization
  • Loading branch information
marc-casperlabs authored Apr 12, 2024
2 parents c9aa98c + c056571 commit 1a506c1
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 5 deletions.
4 changes: 4 additions & 0 deletions node/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ pub(crate) trait PortBoundComponent<REv>: InitializedComponent<REv> {
) -> Result<Effects<Self::ComponentEvent>, Self::Error>;
}

/// A component that is subscribing to changes in the validator set.
pub(crate) trait ValidatorBoundComponent<REv>: Component<REv> {
/// Notifies the component that the validator set has changed.
///
/// This function is guaranteed to be called whenever a new era begins.
fn handle_validators(
&mut self,
effect_builder: EffectBuilder<REv>,
Expand Down
1 change: 1 addition & 0 deletions node/src/components/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ where
handshake_configuration,
keylog,
self.net_metrics.clone(),
self.validator_matrix.clone(),
);

let conman = ConMan::new(
Expand Down
3 changes: 2 additions & 1 deletion node/src/components/network/conman.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ pub(crate) trait ProtocolHandler: Send + Sync {
async fn handle_incoming_request(
&self,
peer: NodeId,
consensus_key: Option<&PublicKey>,
request: IncomingRequest,
) -> Result<(), String>;
}
Expand Down Expand Up @@ -1032,7 +1033,7 @@ impl ActiveRoute {
if let Err(err) = self
.ctx
.protocol_handler
.handle_incoming_request(self.peer_id, request)
.handle_incoming_request(self.peer_id, self.consensus_key.as_deref(), request)
.await
{
// The handler return an error, exit and close connection.
Expand Down
16 changes: 12 additions & 4 deletions node/src/components/network/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
sync::{Arc, Weak},
};

use casper_types::PublicKey;
use juliet::rpc::IncomingRequest;
use openssl::ssl::Ssl;
use strum::EnumCount;
Expand All @@ -20,7 +21,7 @@ use crate::{
components::network::{deserialize_network_message, Message},
reactor::{EventQueueHandle, QueueKind},
tls,
types::{chainspec::JulietConfig, NodeId},
types::{chainspec::JulietConfig, NodeId, ValidatorMatrix},
utils::{rate_limited::rate_limited, LockedLineWriter},
};

Expand Down Expand Up @@ -139,6 +140,7 @@ pub(super) struct TransportHandler<REv: 'static, P> {
handshake_configuration: HandshakeConfiguration,
keylog: Option<LockedLineWriter>,
net_metrics: Arc<Metrics>,
validator_matrix: ValidatorMatrix,
_payload: PhantomData<P>,
}

Expand All @@ -152,13 +154,15 @@ where
handshake_configuration: HandshakeConfiguration,
keylog: Option<LockedLineWriter>,
net_metrics: Arc<Metrics>,
validator_matrix: ValidatorMatrix,
) -> Self {
Self {
event_queue,
identity,
handshake_configuration,
keylog,
net_metrics,
validator_matrix,
_payload: PhantomData,
}
}
Expand Down Expand Up @@ -212,9 +216,10 @@ where
async fn handle_incoming_request(
&self,
peer: NodeId,
consensus_key: Option<&PublicKey>,
request: IncomingRequest,
) -> Result<(), String> {
self.do_handle_incoming_request(peer, request)
self.do_handle_incoming_request(peer, consensus_key, request)
.await
.map_err(|err| err.to_string())
}
Expand All @@ -228,6 +233,7 @@ where
async fn do_handle_incoming_request(
&self,
peer: NodeId,
consensus_key: Option<&PublicKey>,
request: IncomingRequest,
) -> Result<(), MessageReceiverError> {
let channel = Channel::from_repr(request.channel().get())
Expand All @@ -251,8 +257,10 @@ where
});
}

// TODO: Restore priorization based on validator status.
let validator_status = false;
let validator_status = consensus_key
.map(|key| self.validator_matrix.is_active_or_upcoming_validator(key))
.unwrap_or(false);

let queue_kind = if validator_status {
QueueKind::MessageValidator
} else if msg.is_low_priority() {
Expand Down
14 changes: 14 additions & 0 deletions node/src/types/validator_matrix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,20 @@ impl ValidatorMatrix {
}
}

/// Determine if the active validator is in a current or upcoming set of active validators.
///
/// This function may produce false positives, as it works backwards from the highest known
/// era. Depending on the current network state, this may be an upcoming or active era, the
/// previous era validators may be positively identified by this function.
#[inline]
pub(crate) fn is_active_or_upcoming_validator(&self, public_key: &PublicKey) -> bool {
self.read_inner()
.values()
.rev()
.take(self.auction_delay as usize + 1)
.any(|validator_weights| validator_weights.is_validator(public_key))
}

/// Returns the public keys of all validators in a given era.
///
/// Will return `None` if the era is not known.
Expand Down

0 comments on commit 1a506c1

Please sign in to comment.