Skip to content

Commit

Permalink
Implement PeerDAS subnet decoupling (aka custody groups) (#6736)
Browse files Browse the repository at this point in the history
* Implement PeerDAS subnet decoupling (aka custody groups).

* Merge branch 'unstable' into decouple-subnets

* Refactor feature testing for spec tests (#6737)

Squashed commit of the following:

commit 898d05e
Merge: ffbd25e 7e0cdde
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Tue Dec 24 14:41:19 2024 +1100

    Merge branch 'unstable' into refactor-ef-tests-features

commit ffbd25e
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Tue Dec 24 14:40:38 2024 +1100

    Fix `SszStatic` tests for PeerDAS: exclude eip7594 test vectors when testing Electra types.

commit aa593cf
Author: Jimmy Chen <jchen.tc@gmail.com>
Date:   Fri Dec 20 12:08:54 2024 +1100

    Refactor spec testing for features and simplify usage.

* Fix build.

* Add input validation and improve arithmetic handling when calculating custody groups.

* Address review comments re code style consistency.

* Merge branch 'unstable' into decouple-subnets

# Conflicts:
#	beacon_node/beacon_chain/src/kzg_utils.rs
#	beacon_node/beacon_chain/src/observed_data_sidecars.rs
#	beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
#	common/eth2_network_config/built_in_network_configs/chiado/config.yaml
#	common/eth2_network_config/built_in_network_configs/gnosis/config.yaml
#	common/eth2_network_config/built_in_network_configs/holesky/config.yaml
#	common/eth2_network_config/built_in_network_configs/mainnet/config.yaml
#	common/eth2_network_config/built_in_network_configs/sepolia/config.yaml
#	consensus/types/src/chain_spec.rs

* Update consensus/types/src/chain_spec.rs

Co-authored-by: Lion - dapplion <35266934+dapplion@users.noreply.github.com>

* Merge remote-tracking branch 'origin/unstable' into decouple-subnets

* Update error handling.

* Address review comment.

* Merge remote-tracking branch 'origin/unstable' into decouple-subnets

# Conflicts:
#	consensus/types/src/chain_spec.rs

* Update PeerDAS spec tests to `1.5.0-beta.0` and fix failing unit tests.

* Merge remote-tracking branch 'origin/unstable' into decouple-subnets

# Conflicts:
#	beacon_node/lighthouse_network/src/peer_manager/mod.rs
  • Loading branch information
jimmygchen authored Jan 15, 2025
1 parent dd7591f commit e98209d
Show file tree
Hide file tree
Showing 39 changed files with 551 additions and 429 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<E: EthSpec> RpcBlock<E> {
let inner = if !custody_columns.is_empty() {
RpcBlockInner::BlockAndCustodyColumns(
block,
RuntimeVariableList::new(custody_columns, spec.number_of_columns)?,
RuntimeVariableList::new(custody_columns, spec.number_of_columns as usize)?,
)
} else {
RpcBlockInner::Block(block)
Expand Down
21 changes: 8 additions & 13 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
spec: Arc<ChainSpec>,
log: Logger,
) -> Result<Self, AvailabilityCheckError> {
let custody_subnet_count = if import_all_data_columns {
spec.data_column_sidecar_subnet_count as usize
} else {
spec.custody_requirement as usize
};

let subnet_sampling_size =
std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize);
let sampling_column_count =
subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet());
let custody_group_count = spec.custody_group_count(import_all_data_columns);
// This should only panic if the chain spec contains invalid values.
let sampling_size = spec
.sampling_size(custody_group_count)
.expect("should compute node sampling size from valid chain spec");

let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
sampling_column_count,
sampling_size as usize,
spec.clone(),
)?;
Ok(Self {
Expand All @@ -148,7 +143,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}

pub(crate) fn is_supernode(&self) -> bool {
self.get_sampling_column_count() == self.spec.number_of_columns
self.get_sampling_column_count() == self.spec.number_of_columns as usize
}

/// Checks if the block root is currently in the availability cache awaiting import because
Expand Down Expand Up @@ -433,7 +428,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.map(CustodyDataColumn::into_inner)
.collect::<Vec<_>>();
let all_data_columns =
RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns);
RuntimeVariableList::from_vec(all_data_columns, self.spec.number_of_columns as usize);

// verify kzg for all data columns at once
if !all_data_columns.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {

// If we're sampling all columns, it means we must be custodying all columns.
let custody_column_count = self.sampling_column_count();
let total_column_count = self.spec.number_of_columns;
let total_column_count = self.spec.number_of_columns as usize;
let received_column_count = pending_components.verified_data_columns.len();

if pending_components.reconstruction_started {
Expand All @@ -607,7 +607,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
if custody_column_count != total_column_count {
return ReconstructColumnsDecision::No("not required for full node");
}
if received_column_count == self.spec.number_of_columns {
if received_column_count >= total_column_count {
return ReconstructColumnsDecision::No("all columns received");
}
if received_column_count < total_column_count / 2 {
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ fn verify_data_column_sidecar<E: EthSpec>(
data_column: &DataColumnSidecar<E>,
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
if data_column.index >= spec.number_of_columns as u64 {
if data_column.index >= spec.number_of_columns {
return Err(GossipDataColumnError::InvalidColumnIndex(data_column.index));
}
if data_column.kzg_commitments.is_empty() {
Expand Down Expand Up @@ -611,7 +611,7 @@ fn verify_index_matches_subnet<E: EthSpec>(
spec: &ChainSpec,
) -> Result<(), GossipDataColumnError> {
let expected_subnet: u64 =
DataColumnSubnetId::from_column_index::<E>(data_column.index as usize, spec).into();
DataColumnSubnetId::from_column_index(data_column.index, spec).into();
if expected_subnet != subnet {
return Err(GossipDataColumnError::InvalidSubnetId {
received: subnet,
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/kzg_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ fn build_data_column_sidecars<E: EthSpec>(
blob_cells_and_proofs_vec: Vec<CellsAndKzgProofs>,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, String> {
let number_of_columns = spec.number_of_columns;
let number_of_columns = spec.number_of_columns as usize;
let max_blobs_per_block = spec
.max_blobs_per_block(signed_block_header.message.slot.epoch(E::slots_per_epoch()))
as usize;
Expand Down Expand Up @@ -428,7 +428,7 @@ mod test {
.kzg_commitments_merkle_proof()
.unwrap();

assert_eq!(column_sidecars.len(), spec.number_of_columns);
assert_eq!(column_sidecars.len(), spec.number_of_columns as usize);
for (idx, col_sidecar) in column_sidecars.iter().enumerate() {
assert_eq!(col_sidecar.index, idx as u64);

Expand Down Expand Up @@ -461,7 +461,7 @@ mod test {
)
.unwrap();

for i in 0..spec.number_of_columns {
for i in 0..spec.number_of_columns as usize {
assert_eq!(reconstructed_columns.get(i), column_sidecars.get(i), "{i}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/observed_data_sidecars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl<E: EthSpec> ObservableDataSidecar for DataColumnSidecar<E> {
}

fn max_num_of_items(spec: &ChainSpec, _slot: Slot) -> usize {
spec.number_of_columns
spec.number_of_columns as usize
}
}

Expand Down
2 changes: 1 addition & 1 deletion beacon_node/http_api/src/block_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl BlockId {

let num_found_column_keys = column_indices.len();
let num_required_columns = chain.spec.number_of_columns / 2;
let is_blob_available = num_found_column_keys >= num_required_columns;
let is_blob_available = num_found_column_keys >= num_required_columns as usize;

if is_blob_available {
let data_columns = column_indices
Expand Down
10 changes: 3 additions & 7 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,9 +395,8 @@ fn build_gossip_verified_data_columns<T: BeaconChainTypes>(
let gossip_verified_data_columns = data_column_sidecars
.into_iter()
.map(|data_column_sidecar| {
let column_index = data_column_sidecar.index as usize;
let subnet =
DataColumnSubnetId::from_column_index::<T::EthSpec>(column_index, &chain.spec);
let column_index = data_column_sidecar.index;
let subnet = DataColumnSubnetId::from_column_index(column_index, &chain.spec);
let gossip_verified_column =
GossipVerifiedDataColumn::new(data_column_sidecar, subnet.into(), chain);

Expand Down Expand Up @@ -520,10 +519,7 @@ fn publish_column_sidecars<T: BeaconChainTypes>(
let pubsub_messages = data_column_sidecars
.into_iter()
.map(|data_col| {
let subnet = DataColumnSubnetId::from_column_index::<T::EthSpec>(
data_col.index as usize,
&chain.spec,
);
let subnet = DataColumnSubnetId::from_column_index(data_col.index, &chain.spec);
PubsubMessage::DataColumnSidecar(Box::new((subnet, data_col)))
})
.collect::<Vec<_>>();
Expand Down
46 changes: 23 additions & 23 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ pub const ETH2_ENR_KEY: &str = "eth2";
pub const ATTESTATION_BITFIELD_ENR_KEY: &str = "attnets";
/// The ENR field specifying the sync committee subnet bitfield.
pub const SYNC_COMMITTEE_BITFIELD_ENR_KEY: &str = "syncnets";
/// The ENR field specifying the peerdas custody subnet count.
pub const PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY: &str = "csc";
/// The ENR field specifying the peerdas custody group count.
pub const PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY: &str = "cgc";

/// Extension trait for ENR's within Eth2.
pub trait Eth2Enr {
Expand All @@ -38,8 +38,8 @@ pub trait Eth2Enr {
&self,
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str>;

/// The peerdas custody subnet count associated with the ENR.
fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;
/// The peerdas custody group count associated with the ENR.
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str>;

fn eth2(&self) -> Result<EnrForkId, &'static str>;
}
Expand Down Expand Up @@ -67,16 +67,16 @@ impl Eth2Enr for Enr {
.map_err(|_| "Could not decode the ENR syncnets bitfield")
}

fn custody_subnet_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let csc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
.ok_or("ENR custody subnet count non-existent")?
.map_err(|_| "Could not decode the ENR custody subnet count")?;
fn custody_group_count<E: EthSpec>(&self, spec: &ChainSpec) -> Result<u64, &'static str> {
let cgc = self
.get_decodable::<u64>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
.ok_or("ENR custody group count non-existent")?
.map_err(|_| "Could not decode the ENR custody group count")?;

if csc >= spec.custody_requirement && csc <= spec.data_column_sidecar_subnet_count {
Ok(csc)
if (spec.custody_requirement..=spec.number_of_custody_groups).contains(&cgc) {
Ok(cgc)
} else {
Err("Invalid custody subnet count in ENR")
Err("Invalid custody group count in ENR")
}
}

Expand Down Expand Up @@ -253,14 +253,14 @@ pub fn build_enr<E: EthSpec>(
&bitfield.as_ssz_bytes().into(),
);

// only set `csc` if PeerDAS fork epoch has been scheduled
// only set `cgc` if PeerDAS fork epoch has been scheduled
if spec.is_peer_das_scheduled() {
let custody_subnet_count = if config.subscribe_all_data_column_subnets {
spec.data_column_sidecar_subnet_count
let custody_group_count = if config.subscribe_all_data_column_subnets {
spec.number_of_custody_groups
} else {
spec.custody_requirement
};
builder.add_value(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY, &custody_subnet_count);
builder.add_value(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY, &custody_group_count);
}

builder
Expand All @@ -287,11 +287,11 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
&& (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
&& (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
// PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// likely only be true for non-validating nodes.
&& local_enr.get_decodable::<Bytes>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY)
}

/// Loads enr from the given directory
Expand Down Expand Up @@ -348,7 +348,7 @@ mod test {
}

#[test]
fn custody_subnet_count_default() {
fn custody_group_count_default() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: false,
..NetworkConfig::default()
Expand All @@ -358,13 +358,13 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec).unwrap(),
enr.custody_group_count::<E>(&spec).unwrap(),
spec.custody_requirement,
);
}

#[test]
fn custody_subnet_count_all() {
fn custody_group_count_all() {
let config = NetworkConfig {
subscribe_all_data_column_subnets: true,
..NetworkConfig::default()
Expand All @@ -373,8 +373,8 @@ mod test {
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
enr.custody_subnet_count::<E>(&spec).unwrap(),
spec.data_column_sidecar_subnet_count,
enr.custody_group_count::<E>(&spec).unwrap(),
spec.number_of_custody_groups,
);
}

Expand Down
14 changes: 5 additions & 9 deletions beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! The subnet predicate used for searching for a particular subnet.
use super::*;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use itertools::Itertools;
use slog::trace;
use std::ops::Deref;
use types::{ChainSpec, DataColumnSubnetId};
use types::data_column_custody_group::compute_subnets_for_node;
use types::ChainSpec;

/// Returns the predicate for a given subnet.
pub fn subnet_predicate<E>(
Expand Down Expand Up @@ -37,13 +37,9 @@ where
.as_ref()
.is_ok_and(|b| b.get(*s.deref() as usize).unwrap_or(false)),
Subnet::DataColumn(s) => {
if let Ok(custody_subnet_count) = enr.custody_subnet_count::<E>(&spec) {
DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,
&spec,
)
.is_ok_and(|mut subnets| subnets.contains(s))
if let Ok(custody_group_count) = enr.custody_group_count::<E>(&spec) {
compute_subnets_for_node(enr.node_id().raw(), custody_group_count, &spec)
.is_ok_and(|subnets| subnets.contains(s))
} else {
false
}
Expand Down
8 changes: 4 additions & 4 deletions beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ pub static PEERS_PER_CLIENT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
)
});

pub static PEERS_PER_CUSTODY_SUBNET_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
pub static PEERS_PER_CUSTODY_GROUP_COUNT: LazyLock<Result<IntGaugeVec>> = LazyLock::new(|| {
try_create_int_gauge_vec(
"peers_per_custody_subnet_count",
"The current count of peers by custody subnet count",
&["custody_subnet_count"],
"peers_per_custody_group_count",
"The current count of peers by custody group count",
&["custody_group_count"],
)
});

Expand Down
Loading

0 comments on commit e98209d

Please sign in to comment.