removes branch to recover legacy shreds (#4488)
Legacy shreds are discarded on all clusters:
https://github.com/anza-xyz/agave/blob/91d0d0cae/ledger/src/shred.rs#L1275-L1277

Removing the branch to recover legacy shreds allows the shred recovery
code to be further simplified and optimized.
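
For illustration only (a sketch, not the code at the link above): the ingest path already rejects the legacy shred variants, so the recovery path only ever sees Merkle shreds. The sketch assumes the existing ShredVariant enum from ledger/src/shred.rs:

    // Sketch: legacy variants are discarded when shreds are ingested, so
    // recovery never observes LegacyData or LegacyCode shreds.
    fn is_legacy(variant: &ShredVariant) -> bool {
        matches!(
            variant,
            ShredVariant::LegacyCode | ShredVariant::LegacyData
        )
    }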
behzadnouri authored Jan 20, 2025
1 parent 6b21eae · commit 28acb1c
Showing 2 changed files with 67 additions and 100 deletions.
ledger/src/blockstore.rs: 102 changes (45 additions, 57 deletions)
@@ -763,10 +763,10 @@ impl Blockstore {
fn get_recovery_data_shreds<'a>(
&'a self,
index: &'a Index,
slot: Slot,
erasure_meta: &'a ErasureMeta,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
) -> impl Iterator<Item = Shred> + 'a {
let slot = index.slot;
erasure_meta.data_shreds_indices().filter_map(move |i| {
let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Data);
if let Some(shred) = prev_inserted_shreds.get(&key) {
@@ -792,10 +792,10 @@ impl Blockstore {
fn get_recovery_coding_shreds<'a>(
&'a self,
index: &'a Index,
slot: Slot,
erasure_meta: &'a ErasureMeta,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
) -> impl Iterator<Item = Shred> + 'a {
let slot = index.slot;
erasure_meta.coding_shreds_indices().filter_map(move |i| {
let key = ShredId::new(slot, u32::try_from(i).unwrap(), ShredType::Code);
if let Some(shred) = prev_inserted_shreds.get(&key) {
@@ -823,19 +823,12 @@ impl Blockstore {
index: &Index,
erasure_meta: &ErasureMeta,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
leader_schedule_cache: &LeaderScheduleCache,
reed_solomon_cache: &ReedSolomonCache,
) -> std::result::Result<Vec<Shred>, shred::Error> {
// Find shreds for this erasure set and try recovery
let slot = index.slot;
let available_shreds: Vec<_> = self
.get_recovery_data_shreds(index, slot, erasure_meta, prev_inserted_shreds)
.chain(self.get_recovery_coding_shreds(index, slot, erasure_meta, prev_inserted_shreds))
.collect();
let get_slot_leader = |slot: Slot| -> Option<Pubkey> {
leader_schedule_cache.slot_leader_at(slot, /*bank:*/ None)
};
shred::recover(available_shreds, reed_solomon_cache, get_slot_leader)
let data = self.get_recovery_data_shreds(index, erasure_meta, prev_inserted_shreds);
let code = self.get_recovery_coding_shreds(index, erasure_meta, prev_inserted_shreds);
shred::recover(data.chain(code), reed_solomon_cache)
}

/// Collects and reports [`BlockstoreRocksDbColumnFamilyMetrics`] for the
@@ -942,7 +935,6 @@ impl Blockstore {
erasure_metas: &'a BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
index_working_set: &'a HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
leader_schedule_cache: &'a LeaderScheduleCache,
reed_solomon_cache: &'a ReedSolomonCache,
) -> impl Iterator<Item = Vec<Shred>> + 'a {
// Recovery rules:
@@ -964,7 +956,6 @@ impl Blockstore {
index,
erasure_meta,
prev_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
)
})?
@@ -987,49 +978,46 @@ impl Blockstore {
metrics: &mut BlockstoreInsertionMetrics,
) {
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let mut recovered_shreds = Vec::new();
let recovered_data_shreds: Vec<_> = self
.try_shred_recovery(
&shred_insertion_tracker.erasure_metas,
&shred_insertion_tracker.index_working_set,
&shred_insertion_tracker.just_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
)
.map(|mut shreds| {
// All shreds should be retransmitted, but because there
// are no more missing data shreds in the erasure batch,
// coding shreds are not stored in blockstore.
recovered_shreds
.extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
shreds
})
.collect();
if !recovered_shreds.is_empty() {
let _ = retransmit_sender.send(recovered_shreds);
}
for shred in recovered_data_shreds.into_iter().flatten() {
metrics.num_recovered += 1;
*match self.check_insert_data_shred(
shred,
shred_insertion_tracker,
is_trusted,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
Err(InsertDataShredError::InvalidShred) => {
&mut metrics.num_recovered_failed_invalid
}
Err(InsertDataShredError::BlockstoreError(err)) => {
error!("blockstore error: {err}");
&mut metrics.num_recovered_blockstore_error
}
Ok(()) => &mut metrics.num_recovered_inserted,
} += 1;
}
let mut recovered_shreds = Vec::new();
let recovered_data_shreds: Vec<_> = self
.try_shred_recovery(
&shred_insertion_tracker.erasure_metas,
&shred_insertion_tracker.index_working_set,
&shred_insertion_tracker.just_inserted_shreds,
reed_solomon_cache,
)
.map(|mut shreds| {
// All shreds should be retransmitted, but because there are no
// more missing data shreds in the erasure batch, coding shreds
// are not stored in blockstore.
recovered_shreds
.extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
shreds
})
.collect();
if !recovered_shreds.is_empty() {
let _ = retransmit_sender.send(recovered_shreds);
}
for shred in recovered_data_shreds.into_iter().flatten() {
metrics.num_recovered += 1;
*match self.check_insert_data_shred(
shred,
shred_insertion_tracker,
is_trusted,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
Err(InsertDataShredError::InvalidShred) => {
&mut metrics.num_recovered_failed_invalid
}
Err(InsertDataShredError::BlockstoreError(err)) => {
error!("blockstore error: {err}");
&mut metrics.num_recovered_blockstore_error
}
Ok(()) => &mut metrics.num_recovered_inserted,
} += 1;
}
start.stop();
metrics.shred_recovery_elapsed_us += start.as_us();
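To summarize the recovery handling above (a paraphrase of the diff, not additional behavior; `recovered_batch` is an illustrative placeholder, and drain_coding_shreds is used exactly as in the code):

    // Every recovered shred is retransmitted as a payload, but only data
    // shreds are re-inserted into the blockstore.
    let mut shreds: Vec<Shred> = recovered_batch;
    let mut to_retransmit = Vec::new();
    to_retransmit.extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
    to_retransmit.extend(shreds.iter().map(Shred::payload).cloned());
    // `shreds` now holds only data shreds; each one goes through
    // check_insert_data_shred with ShredSource::Recovered.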
ledger/src/shred.rs: 65 changes (22 additions, 43 deletions)
@@ -55,10 +55,10 @@ pub(crate) use self::shred_code::MAX_CODE_SHREDS_PER_SLOT;
use {
self::{shred_code::ShredCode, traits::Shred as _},
crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
assert_matches::debug_assert_matches,
bitflags::bitflags,
num_enum::{IntoPrimitive, TryFromPrimitive},
rayon::ThreadPool,
reed_solomon_erasure::Error::TooFewShardsPresent,
serde::{Deserialize, Serialize},
solana_entry::entry::{create_ticks, Entry},
solana_perf::packet::Packet,
@@ -1107,50 +1107,29 @@ impl TryFrom<u8> for ShredVariant {
}

pub(crate) fn recover(
shreds: Vec<Shred>,
shreds: impl IntoIterator<Item = Shred>,
reed_solomon_cache: &ReedSolomonCache,
get_slot_leader: impl Fn(Slot) -> Option<Pubkey>,
) -> Result<Vec<Shred>, Error> {
match shreds
.first()
.ok_or(TooFewShardsPresent)?
.common_header()
.shred_variant
{
ShredVariant::LegacyData | ShredVariant::LegacyCode => {
let mut shreds = Shredder::try_recovery(shreds, reed_solomon_cache)?;
shreds.retain(|shred| {
get_slot_leader(shred.slot())
.map(|pubkey| shred.verify(&pubkey))
.unwrap_or_default()
});
Ok(shreds)
}
ShredVariant::MerkleCode { .. } | ShredVariant::MerkleData { .. } => {
let shreds = shreds
.into_iter()
.map(merkle::Shred::try_from)
.collect::<Result<_, _>>()?;
// With Merkle shreds, leader signs the Merkle root of the erasure
// batch and all shreds within the same erasure batch have the same
// signature.
// For recovered shreds, the (unique) signature is copied from
// shreds which were received from turbine (or repair) and are
// already sig-verified.
// The same signature also verifies for recovered shreds because
// when reconstructing the Merkle tree for the erasure batch, we
// will obtain the same Merkle root.
merkle::recover(shreds, reed_solomon_cache)?
.map(|shred| {
let shred = Shred::from(shred?);
debug_assert!(get_slot_leader(shred.slot())
.map(|pubkey| shred.verify(&pubkey))
.unwrap_or_default());
Ok(shred)
})
.collect()
}
}
let shreds = shreds
.into_iter()
.map(|shred| {
debug_assert_matches!(
shred.common_header().shred_variant,
ShredVariant::MerkleCode { .. } | ShredVariant::MerkleData { .. }
);
merkle::Shred::try_from(shred)
})
.collect::<Result<_, _>>()?;
// With Merkle shreds, leader signs the Merkle root of the erasure batch
// and all shreds within the same erasure batch have the same signature.
// For recovered shreds, the (unique) signature is copied from shreds which
// were received from turbine (or repair) and are already sig-verified.
// The same signature also verifies for recovered shreds because when
// reconstructing the Merkle tree for the erasure batch, we will obtain the
// same Merkle root.
merkle::recover(shreds, reed_solomon_cache)?
.map(|shred| Ok(Shred::from(shred?)))
.collect()
}

#[allow(clippy::too_many_arguments)]
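With the legacy branch gone, shred::recover accepts any iterator over the shreds of one erasure batch and no longer needs a slot-leader lookup. A hedged caller-side sketch (in-crate; `data` and `code` stand in for the data/coding shred iterators built in blockstore.rs above, and ReedSolomonCache::default() is assumed to be available as in existing tests):

    // Illustrative call into the slimmed-down recovery API.
    let reed_solomon_cache = ReedSolomonCache::default();
    let recovered: Vec<Shred> = shred::recover(data.chain(code), &reed_solomon_cache)?;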
