From 9aefb5539baff637d68deb3dd386ff45312f3573 Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Mon, 21 Oct 2024 23:42:51 +1100 Subject: [PATCH] Fix BlobsByRange by reverting PR6462 (#6526) * Revert "Remove generic E from RequestId (#6462)" This reverts commit 772929fae27bd9a2978884c7648dc10fecf3d876. --- .../lighthouse_network/src/rpc/codec.rs | 19 ++++++------- .../lighthouse_network/src/rpc/handler.rs | 14 ++++------ .../lighthouse_network/src/rpc/methods.rs | 28 +++++++++++-------- beacon_node/lighthouse_network/src/rpc/mod.rs | 10 +++---- .../lighthouse_network/src/rpc/outbound.rs | 8 ++---- .../lighthouse_network/src/rpc/protocol.rs | 14 +++++----- .../src/rpc/rate_limiter.rs | 3 +- .../src/rpc/self_limiter.rs | 12 ++++---- .../lighthouse_network/src/service/mod.rs | 6 ++-- .../lighthouse_network/tests/rpc_tests.rs | 1 - .../network_beacon_processor/rpc_methods.rs | 4 +-- .../src/network_beacon_processor/tests.rs | 7 ++--- beacon_node/network/src/router.rs | 8 +++--- beacon_node/network/src/service.rs | 2 +- .../network/src/sync/network_context.rs | 1 - 15 files changed, 68 insertions(+), 69 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 19f1b8def73..9bdecab70b1 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4; /* Inbound Codec */ -pub struct SSZSnappyInboundCodec { +pub struct SSZSnappyInboundCodec { protocol: ProtocolId, inner: Uvi, len: Option, @@ -143,7 +143,7 @@ impl Encoder> for SSZSnappyInboundCodec { // Decoder for inbound streams: Decodes RPC requests from peers impl Decoder for SSZSnappyInboundCodec { - type Item = RequestType; + type Item = RequestType; type Error = RPCError; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { @@ -195,7 +195,7 @@ impl Decoder for SSZSnappyInboundCodec { } /* Outbound Codec: Codec for initiating RPC requests */ -pub struct SSZSnappyOutboundCodec { +pub struct SSZSnappyOutboundCodec { inner: Uvi, len: Option, protocol: ProtocolId, @@ -322,10 +322,10 @@ impl SSZSnappyOutboundCodec { } // Encoder for outbound streams: Encodes RPC Requests to peers -impl Encoder for SSZSnappyOutboundCodec { +impl Encoder> for SSZSnappyOutboundCodec { type Error = RPCError; - fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> { let bytes = match item { RequestType::Status(req) => req.as_ssz_bytes(), RequestType::Goodbye(req) => req.as_ssz_bytes(), @@ -549,11 +549,11 @@ fn handle_length( /// Decodes an `InboundRequest` from the byte stream. /// `decoded_buffer` should be an ssz-encoded bytestream with // length = length-prefix received in the beginning of the stream. -fn handle_rpc_request( +fn handle_rpc_request( versioned_protocol: SupportedProtocol, decoded_buffer: &[u8], spec: &ChainSpec, -) -> Result, RPCError> { +) -> Result>, RPCError> { match versioned_protocol { SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status( StatusMessage::from_ssz_bytes(decoded_buffer)?, @@ -1035,7 +1035,6 @@ mod tests { BlobsByRangeRequest { start_slot: 0, count: 10, - max_blobs_per_block: Spec::max_blobs_per_block(), } } @@ -1181,7 +1180,7 @@ mod tests { } /// Verifies that requests we send are encoded in a way that we would correctly decode too. - fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) { + fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) { let fork_context = Arc::new(fork_context(fork_name)); let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize); let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy); @@ -1778,7 +1777,7 @@ mod tests { fn test_encode_then_decode_request() { let chain_spec = Spec::default_spec(); - let requests: &[RequestType] = &[ + let requests: &[RequestType] = &[ RequestType::Ping(ping_message()), RequestType::Status(status_message()), RequestType::Goodbye(GoodbyeReason::Fault), diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 74ccb85dccc..e76d6d27866 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -20,7 +20,6 @@ use slog::{crit, debug, trace}; use smallvec::SmallVec; use std::{ collections::{hash_map::Entry, VecDeque}, - marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -97,7 +96,7 @@ where events_out: SmallVec<[HandlerEvent; 4]>, /// Queue of outbound substreams to open. - dial_queue: SmallVec<[(Id, RequestType); 4]>, + dial_queue: SmallVec<[(Id, RequestType); 4]>, /// Current number of concurrent outbound substreams being opened. dial_negotiated: u32, @@ -207,7 +206,7 @@ pub enum OutboundSubstreamState { /// The framed negotiated substream. substream: Box>, /// Keeps track of the actual request sent. - request: RequestType, + request: RequestType, }, /// Closing an outbound substream> Closing(Box>), @@ -275,7 +274,7 @@ where } /// Opens an outbound substream with a request. - fn send_request(&mut self, id: Id, req: RequestType) { + fn send_request(&mut self, id: Id, req: RequestType) { match self.state { HandlerState::Active => { self.dial_queue.push((id, req)); @@ -331,7 +330,7 @@ where type ToBehaviour = HandlerEvent; type InboundProtocol = RPCProtocol; type OutboundProtocol = OutboundRequestContainer; - type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request + type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request type InboundOpenInfo = (); fn listen_protocol(&self) -> SubstreamProtocol { @@ -789,7 +788,6 @@ where req: req.clone(), fork_context: self.fork_context.clone(), max_rpc_size: self.listen_protocol().upgrade().max_rpc_size, - phantom: PhantomData, }, (), ) @@ -907,7 +905,7 @@ where fn on_fully_negotiated_outbound( &mut self, substream: OutboundFramed, - (id, request): (Id, RequestType), + (id, request): (Id, RequestType), ) { self.dial_negotiated -= 1; // Reset any io-retries counter. @@ -963,7 +961,7 @@ where } fn on_dial_upgrade_error( &mut self, - request_info: (Id, RequestType), + request_info: (Id, RequestType), error: StreamUpgradeError, ) { let (id, req) = request_info; diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 912fda36060..bb8bfb0e206 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -8,6 +8,7 @@ use ssz_derive::{Decode, Encode}; use ssz_types::{typenum::U256, VariableList}; use std::collections::BTreeMap; use std::fmt::Display; +use std::marker::PhantomData; use std::ops::Deref; use std::sync::Arc; use strum::IntoStaticStr; @@ -93,19 +94,27 @@ pub struct Ping { variant_attributes(derive(Clone, Debug, PartialEq, Serialize),) )] #[derive(Clone, Debug, PartialEq)] -pub struct MetadataRequest; +pub struct MetadataRequest { + _phantom_data: PhantomData, +} -impl MetadataRequest { +impl MetadataRequest { pub fn new_v1() -> Self { - Self::V1(MetadataRequestV1 {}) + Self::V1(MetadataRequestV1 { + _phantom_data: PhantomData, + }) } pub fn new_v2() -> Self { - Self::V2(MetadataRequestV2 {}) + Self::V2(MetadataRequestV2 { + _phantom_data: PhantomData, + }) } pub fn new_v3() -> Self { - Self::V3(MetadataRequestV3 {}) + Self::V3(MetadataRequestV3 { + _phantom_data: PhantomData, + }) } } @@ -315,14 +324,11 @@ pub struct BlobsByRangeRequest { /// The number of slots from the start slot. pub count: u64, - - /// maximum number of blobs in a single block. - pub max_blobs_per_block: usize, } impl BlobsByRangeRequest { - pub fn max_blobs_requested(&self) -> u64 { - self.count.saturating_mul(self.max_blobs_per_block as u64) + pub fn max_blobs_requested(&self) -> u64 { + self.count.saturating_mul(E::max_blobs_per_block() as u64) } } @@ -338,7 +344,7 @@ pub struct DataColumnsByRangeRequest { } impl DataColumnsByRangeRequest { - pub fn max_requested(&self) -> u64 { + pub fn max_requested(&self) -> u64 { self.count.saturating_mul(self.columns.len() as u64) } diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index ed4da463ffd..7d091da7660 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -61,7 +61,7 @@ pub enum RPCSend { /// /// The `Id` is given by the application making the request. These /// go over *outbound* connections. - Request(Id, RequestType), + Request(Id, RequestType), /// A response sent from Lighthouse. /// /// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the @@ -79,7 +79,7 @@ pub enum RPCReceived { /// /// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the /// *inbound* substream over which it is managed. - Request(Request), + Request(Request), /// A response received from the outside. /// /// The `Id` corresponds to the application given ID of the original request sent to the @@ -113,10 +113,10 @@ impl RequestId { /// An Rpc Request. #[derive(Debug, Clone)] -pub struct Request { +pub struct Request { pub id: RequestId, pub substream_id: SubstreamId, - pub r#type: RequestType, + pub r#type: RequestType, } impl std::fmt::Display for RPCSend { @@ -221,7 +221,7 @@ impl RPC { /// Submits an RPC request. /// /// The peer must be connected for this to succeed. - pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) { + pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) { let event = if let Some(self_limiter) = self.self_limiter.as_mut() { match self_limiter.allows(peer_id, request_id, req) { Ok(event) => event, diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 1037139f2fa..b614313a84b 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -7,7 +7,6 @@ use futures::future::BoxFuture; use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, SinkExt}; use libp2p::core::{OutboundUpgrade, UpgradeInfo}; -use std::marker::PhantomData; use std::sync::Arc; use tokio_util::{ codec::Framed, @@ -20,14 +19,13 @@ use types::{EthSpec, ForkContext}; // `OutboundUpgrade` #[derive(Debug, Clone)] -pub struct OutboundRequestContainer { - pub req: RequestType, +pub struct OutboundRequestContainer { + pub req: RequestType, pub fork_context: Arc, pub max_rpc_size: usize, - pub phantom: PhantomData, } -impl UpgradeInfo for OutboundRequestContainer { +impl UpgradeInfo for OutboundRequestContainer { type Info = ProtocolId; type InfoIter = Vec; diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index b4f6dac4faf..16c3a133912 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -686,7 +686,7 @@ pub fn rpc_data_column_limits() -> RpcLimits { // The inbound protocol reads the request, decodes it and returns the stream to the protocol // handler to respond to once ready. -pub type InboundOutput = (RequestType, InboundFramed); +pub type InboundOutput = (RequestType, InboundFramed); pub type InboundFramed = Framed>>>, SSZSnappyInboundCodec>; @@ -754,7 +754,7 @@ where } #[derive(Debug, Clone, PartialEq)] -pub enum RequestType { +pub enum RequestType { Status(StatusMessage), Goodbye(GoodbyeReason), BlocksByRange(OldBlocksByRangeRequest), @@ -768,11 +768,11 @@ pub enum RequestType { LightClientFinalityUpdate, LightClientUpdatesByRange(LightClientUpdatesByRangeRequest), Ping(Ping), - MetaData(MetadataRequest), + MetaData(MetadataRequest), } /// Implements the encoding per supported protocol for `RPCRequest`. -impl RequestType { +impl RequestType { /* These functions are used in the handler for stream management */ /// Maximum number of responses expected for this request. @@ -782,10 +782,10 @@ impl RequestType { RequestType::Goodbye(_) => 0, RequestType::BlocksByRange(req) => *req.count(), RequestType::BlocksByRoot(req) => req.block_roots().len() as u64, - RequestType::BlobsByRange(req) => req.max_blobs_requested(), + RequestType::BlobsByRange(req) => req.max_blobs_requested::(), RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, - RequestType::DataColumnsByRange(req) => req.max_requested(), + RequestType::DataColumnsByRange(req) => req.max_requested::(), RequestType::Ping(_) => 1, RequestType::MetaData(_) => 1, RequestType::LightClientBootstrap(_) => 1, @@ -1027,7 +1027,7 @@ impl std::error::Error for RPCError { } } -impl std::fmt::Display for RequestType { +impl std::fmt::Display for RequestType { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RequestType::Status(status) => write!(f, "Status Message: {}", status), diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index e11f7f0e73e..ecbacc8c112 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -9,6 +9,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use tokio::time::Interval; +use types::EthSpec; /// Nanoseconds since a given time. // Maintained as u64 to reduce footprint @@ -261,7 +262,7 @@ pub trait RateLimiterItem { fn max_responses(&self) -> u64; } -impl RateLimiterItem for super::RequestType { +impl RateLimiterItem for super::RequestType { fn protocol(&self) -> Protocol { self.versioned_protocol().protocol() } diff --git a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs index 9c68e0793d9..e968ad11e3d 100644 --- a/beacon_node/lighthouse_network/src/rpc/self_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/self_limiter.rs @@ -19,8 +19,8 @@ use super::{ /// A request that was rate limited or waiting on rate limited requests for the same peer and /// protocol. -struct QueuedRequest { - req: RequestType, +struct QueuedRequest { + req: RequestType, request_id: Id, } @@ -28,7 +28,7 @@ pub(crate) struct SelfRateLimiter { /// Requests queued for sending per peer. This requests are stored when the self rate /// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore /// are stored in the same way. - delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>, + delayed_requests: HashMap<(PeerId, Protocol), VecDeque>>, /// The delay required to allow a peer's outbound request per protocol. next_peer_request: DelayQueue<(PeerId, Protocol)>, /// Rate limiter for our own requests. @@ -70,7 +70,7 @@ impl SelfRateLimiter { &mut self, peer_id: PeerId, request_id: Id, - req: RequestType, + req: RequestType, ) -> Result, Error> { let protocol = req.versioned_protocol().protocol(); // First check that there are not already other requests waiting to be sent. @@ -101,9 +101,9 @@ impl SelfRateLimiter { limiter: &mut RateLimiter, peer_id: PeerId, request_id: Id, - req: RequestType, + req: RequestType, log: &Logger, - ) -> Result, (QueuedRequest, Duration)> { + ) -> Result, (QueuedRequest, Duration)> { match limiter.allows(&peer_id, &req) { Ok(()) => Ok(BehaviourAction::NotifyHandler { peer_id, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 462612e40a5..056b6be24d3 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -80,7 +80,7 @@ pub enum NetworkEvent { /// Identifier of the request. All responses to this request must use this id. id: PeerRequestId, /// Request the peer sent. - request: rpc::Request, + request: rpc::Request, }, ResponseReceived { /// Peer that sent the response. @@ -966,7 +966,7 @@ impl Network { &mut self, peer_id: PeerId, request_id: AppRequestId, - request: RequestType, + request: RequestType, ) -> Result<(), (AppRequestId, RPCError)> { // Check if the peer is connected before sending an RPC request if !self.swarm.is_connected(&peer_id) { @@ -1179,7 +1179,7 @@ impl Network { /// Sends a METADATA response to a peer. fn send_meta_data_response( &mut self, - _req: MetadataRequest, + _req: MetadataRequest, id: PeerRequestId, request_id: rpc::RequestId, peer_id: PeerId, diff --git a/beacon_node/lighthouse_network/tests/rpc_tests.rs b/beacon_node/lighthouse_network/tests/rpc_tests.rs index b5125a2d6bf..f721c8477cf 100644 --- a/beacon_node/lighthouse_network/tests/rpc_tests.rs +++ b/beacon_node/lighthouse_network/tests/rpc_tests.rs @@ -327,7 +327,6 @@ fn test_blobs_by_range_chunked_rpc() { let rpc_request = RequestType::BlobsByRange(BlobsByRangeRequest { start_slot: 0, count: slot_count, - max_blobs_per_block: E::max_blobs_per_block(), }); // BlocksByRange Response diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 709cbe5b120..6d32806713d 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -893,7 +893,7 @@ impl NetworkBeaconProcessor { ); // Should not send more than max request blocks - if req.max_blobs_requested() > self.chain.spec.max_request_blob_sidecars { + if req.max_blobs_requested::() > self.chain.spec.max_request_blob_sidecars { return Err(( RpcErrorResponse::InvalidRequest, "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", @@ -1098,7 +1098,7 @@ impl NetworkBeaconProcessor { ); // Should not send more than max request data columns - if req.max_requested() > self.chain.spec.max_request_data_column_sidecars { + if req.max_requested::() > self.chain.spec.max_request_data_column_sidecars { return Err(( RpcErrorResponse::InvalidRequest, "Request exceeded `MAX_REQUEST_BLOBS_SIDECARS`", diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index e9805eb5ba7..9d774d97c15 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -30,9 +30,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, EthSpec, Hash256, - MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, - SignedVoluntaryExit, Slot, SubnetId, + Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, Epoch, Hash256, MainnetEthSpec, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, + SubnetId, }; type E = MainnetEthSpec; @@ -366,7 +366,6 @@ impl TestRig { BlobsByRangeRequest { start_slot: 0, count, - max_blobs_per_block: E::max_blobs_per_block(), }, ) .unwrap(); diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index a445cd6ea36..e1badfda9d5 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -58,7 +58,7 @@ pub enum RouterMessage { RPCRequestReceived { peer_id: PeerId, id: PeerRequestId, - request: rpc::Request, + request: rpc::Request, }, /// An RPC response has been received. RPCResponseReceived { @@ -193,11 +193,11 @@ impl Router { /* RPC - Related functionality */ /// A new RPC request has been received from the network. - fn handle_rpc_request( + fn handle_rpc_request( &mut self, peer_id: PeerId, request_id: PeerRequestId, - rpc_request: rpc::Request, + rpc_request: rpc::Request, ) { if !self.network_globals.peers.read().is_connected(&peer_id) { debug!(self.log, "Dropping request of disconnected peer"; "peer_id" => %peer_id, "request" => ?rpc_request); @@ -836,7 +836,7 @@ impl HandlerNetworkContext { } /// Sends a request to the network task. - pub fn send_processor_request(&mut self, peer_id: PeerId, request: RequestType) { + pub fn send_processor_request(&mut self, peer_id: PeerId, request: RequestType) { self.inform_network(NetworkMessage::SendRequest { peer_id, request_id: AppRequestId::Router, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 269744dc054..5a66cb7f30d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -62,7 +62,7 @@ pub enum NetworkMessage { /// Send an RPC request to the libp2p service. SendRequest { peer_id: PeerId, - request: RequestType, + request: RequestType, request_id: AppRequestId, }, /// Send a successful Response to the libp2p service. diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index eb42e697cd2..5f7778ffcc6 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -401,7 +401,6 @@ impl SyncNetworkContext { request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot: *request.start_slot(), count: *request.count(), - max_blobs_per_block: T::EthSpec::max_blobs_per_block(), }), request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), })