Skip to content

Commit

Permalink
SingleAttestation
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Dec 4, 2024
1 parent fec502d commit 366bed3
Show file tree
Hide file tree
Showing 10 changed files with 359 additions and 23 deletions.
18 changes: 15 additions & 3 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ use std::borrow::Cow;
use strum::AsRefStr;
use tree_hash::TreeHash;
use types::{
Attestation, AttestationRef, BeaconCommittee, BeaconStateError::NoCommitteeFound, ChainSpec,
CommitteeIndex, Epoch, EthSpec, Hash256, IndexedAttestation, SelectionProof,
SignedAggregateAndProof, Slot, SubnetId,
attestation::SingleAttestation, Attestation, AttestationRef, BeaconCommittee,
BeaconStateError::NoCommitteeFound, ChainSpec, CommitteeIndex, Epoch, EthSpec, Hash256,
IndexedAttestation, SelectionProof, SignedAggregateAndProof, Slot, SubnetId,
};

pub use batch::{batch_verify_aggregated_attestations, batch_verify_unaggregated_attestations};
Expand Down Expand Up @@ -317,12 +317,23 @@ pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> {
attestation: AttestationRef<'a, T::EthSpec>,
indexed_attestation: IndexedAttestation<T::EthSpec>,
subnet_id: SubnetId,
validator_index: usize,
}

impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'_, T> {
pub fn into_indexed_attestation(self) -> IndexedAttestation<T::EthSpec> {
self.indexed_attestation
}

pub fn single_attestation(&self) -> SingleAttestation {
// TODO(single-attestation) unwrap
SingleAttestation {
committee_index: self.attestation.committee_index().unwrap_or(0) as usize,
attester_index: self.validator_index,
data: self.attestation.data().clone(),
signature: self.attestation.signature().clone(),
}
}
}

/// Custom `Clone` implementation is to avoid the restrictive trait bounds applied by the usual derive
Expand Down Expand Up @@ -1035,6 +1046,7 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
attestation,
indexed_attestation,
subnet_id,
validator_index: validator_index as usize,
})
}

Expand Down
15 changes: 12 additions & 3 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2034,9 +2034,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// This method is called for API and gossip attestations, so this covers all unaggregated attestation events
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_attestation_subscribers() {
event_handler.register(EventKind::Attestation(Box::new(
v.attestation().clone_as_attestation(),
)));
let current_fork = self
.spec
.fork_name_at_slot::<T::EthSpec>(v.attestation().data().slot);
if current_fork.electra_enabled() {
event_handler.register(EventKind::SingleAttestation(Box::new(
v.single_attestation(),
)));
} else {
event_handler.register(EventKind::Attestation(Box::new(
v.attestation().clone_as_attestation(),
)));
}
}
}
metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_SUCCESSES);
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ impl<E: EthSpec> ServerSentEventHandler<E> {
.attestation_tx
.send(kind)
.map(|count| log_count("attestation", count)),
EventKind::SingleAttestation(_) => self
.attestation_tx
.send(kind)
.map(|count| log_count("attestation", count)),
EventKind::Block(_) => self
.block_tx
.send(kind)
Expand Down
48 changes: 43 additions & 5 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use types::{
ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
SyncCommitteeMessage, SyncContributionData, attestation::SingleAttestation
};
use validator::pubkey_to_validator_index;
use version::{
Expand Down Expand Up @@ -1831,21 +1831,26 @@ pub fn serve<T: BeaconChainTypes>(
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

let beacon_pool_path_v2 = eth_v2
.and(warp::path("beacon"))
.and(warp::path("pool"))
.and(task_spawner_filter.clone())
.and(chain_filter.clone());

// POST beacon/pool/attestations
let post_beacon_pool_attestations = beacon_pool_path_any
let post_beacon_pool_attestations = beacon_pool_path
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(reprocess_send_filter)
.and(reprocess_send_filter.clone())
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|_endpoint_version: EndpointVersion,
task_spawner: TaskSpawner<T::EthSpec>,
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<Attestation<T::EthSpec>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand All @@ -1865,6 +1870,38 @@ pub fn serve<T: BeaconChainTypes>(
},
);

let post_beacon_pool_attestations_v2 = beacon_pool_path_v2
.clone()
.and(warp::path("attestations"))
.and(warp::path::end())
.and(warp_utils::json::json())
.and(network_tx_filter.clone())
.and(reprocess_send_filter)
.and(log_filter.clone())
.then(
// V1 and V2 are identical except V2 has a consensus version header in the request.
// We only require this header for SSZ deserialization, which isn't supported for
// this endpoint presently.
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>,
log: Logger| async move {
let result = crate::publish_attestations::publish_single_attestations(
task_spawner,
chain,
attestations,
network_tx,
reprocess_tx,
log,
)
.await
.map(|()| warp::reply::json(&()));
convert_rejection(result).await
},
);

// GET beacon/pool/attestations?committee_index,slot
let get_beacon_pool_attestations = beacon_pool_path_any
.clone()
Expand Down Expand Up @@ -4732,6 +4769,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_blocks_v2)
.uor(post_beacon_blinded_blocks_v2)
.uor(post_beacon_pool_attestations)
.uor(post_beacon_pool_attestations_v2)
.uor(post_beacon_pool_attester_slashings)
.uor(post_beacon_pool_proposer_slashings)
.uor(post_beacon_pool_voluntary_exits)
Expand Down
90 changes: 80 additions & 10 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use types::Attestation;
use types::{attestation::SingleAttestation, Attestation, EthSpec};

// Error variants are only used in `Debug` and considered `dead_code` by the compiler.
#[derive(Debug)]
Expand Down Expand Up @@ -82,15 +82,43 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
.verify_unaggregated_attestation_for_gossip(attestation, None)
.map_err(Error::Validation)?;

// Publish.
network_tx
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::Attestation(Box::new((
attestation.subnet_id(),
attestation.attestation().clone_as_attestation(),
)))],
})
.map_err(|_| Error::Publication)?;
match attestation.attestation() {
types::AttestationRef::Base(_) => {
// Publish.
network_tx
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::Attestation(Box::new((
attestation.subnet_id(),
attestation.attestation().clone_as_attestation(),
)))],
})
.map_err(|_| Error::Publication)?;
}
types::AttestationRef::Electra(attn) => {
chain
.with_committee_cache(
attn.data.target.root,
attn.data.slot.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(attn.data.slot)?;

let single_attestation = attn.to_single_attestation(&committees)?;

network_tx
.send(NetworkMessage::Publish {
messages: vec![PubsubMessage::SingleAttestation(Box::new((
attestation.subnet_id(),
single_attestation,
)))],
})
.map_err(|_| BeaconChainError::UnableToPublish)?;
Ok(())
},
)
.map_err(|_| Error::Publication)?;
}
}

// Notify the validator monitor.
chain
Expand Down Expand Up @@ -129,6 +157,48 @@ fn verify_and_publish_attestation<T: BeaconChainTypes>(
}
}

pub async fn publish_single_attestations<T: BeaconChainTypes>(
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
single_attestations: Vec<SingleAttestation>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_send: Option<Sender<ReprocessQueueMessage>>,
log: Logger,
) -> Result<(), warp::Rejection> {
let mut attestations = vec![];
for single_attestation in single_attestations {
let attestation = chain.with_committee_cache(
single_attestation.data.target.root,
single_attestation
.data
.slot
.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;

let attestation = single_attestation.to_attestation::<T::EthSpec>(&committees)?;

Ok(attestation)
},
);

if let Ok(attestation) = attestation {
attestations.push(attestation);
}
}

publish_attestations(
task_spawner,
chain,
attestations,
network_tx,
reprocess_send,
log,
)
.await
}

pub async fn publish_attestations<T: BeaconChainTypes>(
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/lighthouse_network/src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use snap::raw::{decompress_len, Decoder, Encoder};
use ssz::{Decode, Encode};
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use types::attestation::SingleAttestation;
use types::{
Attestation, AttestationBase, AttestationElectra, AttesterSlashing, AttesterSlashingBase,
AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec,
Expand All @@ -29,6 +30,8 @@ pub enum PubsubMessage<E: EthSpec> {
AggregateAndProofAttestation(Box<SignedAggregateAndProof<E>>),
/// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id.
Attestation(Box<(SubnetId, Attestation<E>)>),
/// Gossipsub message providing notification of a `SingleAttestation`` with its shard id.
SingleAttestation(Box<(SubnetId, SingleAttestation)>),
/// Gossipsub message providing notification of a voluntary exit.
VoluntaryExit(Box<SignedVoluntaryExit>),
/// Gossipsub message providing notification of a new proposer slashing.
Expand Down Expand Up @@ -128,6 +131,9 @@ impl<E: EthSpec> PubsubMessage<E> {
PubsubMessage::Attestation(attestation_data) => {
GossipKind::Attestation(attestation_data.0)
}
PubsubMessage::SingleAttestation(attestation_data) => {
GossipKind::Attestation(attestation_data.0)
}
PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit,
PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing,
PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing,
Expand Down Expand Up @@ -411,6 +417,7 @@ impl<E: EthSpec> PubsubMessage<E> {
PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(),
PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(),
PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(),
PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(),
PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(),
PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(),
PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(),
Expand Down Expand Up @@ -455,6 +462,14 @@ impl<E: EthSpec> std::fmt::Display for PubsubMessage<E> {
data.1.data().slot,
data.1.committee_index(),
),
PubsubMessage::SingleAttestation(data) => write!(
f,
"SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}",
*data.0,
data.1.data.slot,
data.1.committee_index,
data.1.attester_index,
),
PubsubMessage::VoluntaryExit(_data) => write!(f, "Voluntary Exit"),
PubsubMessage::ProposerSlashing(_data) => write!(f, "Proposer Slashing"),
PubsubMessage::AttesterSlashing(_data) => write!(f, "Attester Slashing"),
Expand Down
46 changes: 45 additions & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::sync::manager::BlockProcessType;
use crate::sync::SamplingId;
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use attestation::SingleAttestation;
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
Expand Down Expand Up @@ -28,7 +29,7 @@ use lighthouse_network::{
Client, MessageId, NetworkGlobals, PeerId, PubsubMessage,
};
use rand::prelude::SliceRandom;
use slog::{debug, error, trace, warn, Logger};
use slog::{debug, error, info, trace, warn, Logger};
use slot_clock::ManualSlotClock;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -84,6 +85,49 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.map_err(Into::into)
}

/// Create a new `Work` event for some `SingleAttestation`.
pub fn send_single_attestation(
self: &Arc<Self>,
message_id: MessageId,
peer_id: PeerId,
single_attestation: SingleAttestation,
subnet_id: SubnetId,
should_import: bool,
seen_timestamp: Duration,
) -> Result<(), Error<T::EthSpec>> {
info!(self.log, "SENDING A SINGLE ATTESTATION");
let result = self.chain.with_committee_cache(
single_attestation.data.target.root,
single_attestation
.data
.slot
.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
let committees =
committee_cache.get_beacon_committees_at_slot(single_attestation.data.slot)?;

let attestation = single_attestation.to_attestation(&committees)?;

Ok(self.send_unaggregated_attestation(
message_id.clone(),
peer_id,
attestation,
subnet_id,
should_import,
seen_timestamp,
))
},
);

match result {
Ok(result) => result,
Err(e) => {
warn!(self.log, "Failed to send SingleAttestation"; "error" => ?e);
Ok(())
}
}
}

/// Create a new `Work` event for some unaggregated attestation.
pub fn send_unaggregated_attestation(
self: &Arc<Self>,
Expand Down
Loading

0 comments on commit 366bed3

Please sign in to comment.