Skip to content

Commit

Permalink
Merge branch 'master' into wen_restart_play_unfrozen_block
Browse files Browse the repository at this point in the history
  • Loading branch information
wen-coding authored Jan 25, 2025
2 parents d892ee4 + e134669 commit a22a20e
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 140 deletions.
31 changes: 15 additions & 16 deletions core/src/completed_data_sets_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,11 @@ impl CompletedDataSetsService {
rpc_subscriptions: &RpcSubscriptions,
max_slots: &Arc<MaxSlots>,
) -> Result<(), RecvTimeoutError> {
let completed_data_sets = completed_sets_receiver.recv_timeout(Duration::from_secs(1))?;
let mut max_slot = 0;
for completed_set_info in std::iter::once(completed_data_sets)
.chain(completed_sets_receiver.try_iter())
.flatten()
{
let CompletedDataSetInfo {
slot,
start_index,
end_index,
} = completed_set_info;
max_slot = max_slot.max(slot);
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
let handle_completed_data_set_info = |completed_data_set_info| {
let CompletedDataSetInfo { slot, indices } = completed_data_set_info;
let start_index = indices.start;
let end_index = indices.end - 1;
match blockstore.get_entries_in_data_block(slot, start_index, end_index, None) {
Ok(entries) => {
let transactions = Self::get_transaction_signatures(entries);
Expand All @@ -85,11 +78,17 @@ impl CompletedDataSetsService {
}
Err(e) => warn!("completed-data-set-service deserialize error: {:?}", e),
}
slot
};
let slots = completed_sets_receiver
.recv_timeout(RECV_TIMEOUT)
.map(std::iter::once)?
.chain(completed_sets_receiver.try_iter())
.flatten()
.map(handle_completed_data_set_info);
if let Some(slot) = slots.max() {
max_slots.shred_insert.fetch_max(slot, Ordering::Relaxed);
}
max_slots
.shred_insert
.fetch_max(max_slot, Ordering::Relaxed);

Ok(())
}

Expand Down
134 changes: 63 additions & 71 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ use {
fmt::Write,
fs::{self, File},
io::{Error as IoError, ErrorKind},
ops::Bound,
ops::{Bound, Range},
path::{Path, PathBuf},
rc::Rc,
sync::{
Expand Down Expand Up @@ -208,18 +208,12 @@ pub struct InsertResults {
///
/// `solana_core::completed_data_sets_service::CompletedDataSetsService` is the main receiver of
/// `CompletedDataSetInfo`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CompletedDataSetInfo {
/// [`Slot`] to which the [`Shred`]s in this set belong.
pub slot: Slot,

/// Index of the first [`Shred`] in the range of shreds that belong to this set.
/// Range is inclusive, `start_index..=end_index`.
pub start_index: u32,

/// Index of the last [`Shred`] in the range of shreds that belong to this set.
/// Range is inclusive, `start_index..=end_index`.
pub end_index: u32,
/// Data [`Shred`]s' indices in this set.
pub indices: Range<u32>,
}

pub struct BlockstoreSignals {
Expand Down Expand Up @@ -1061,7 +1055,7 @@ impl Blockstore {
shred,
erasure_meta,
&shred_insertion_tracker.just_inserted_shreds,
&mut shred_insertion_tracker.merkle_root_metas,
&shred_insertion_tracker.merkle_root_metas,
&mut shred_insertion_tracker.duplicate_shreds,
);
}
Expand Down Expand Up @@ -1882,7 +1876,7 @@ impl Blockstore {
shred: &Shred,
erasure_meta: &ErasureMeta,
just_inserted_shreds: &HashMap<ShredId, Shred>,
merkle_root_metas: &mut HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
merkle_root_metas: &HashMap<ErasureSetId, WorkingEntry<MerkleRootMeta>>,
duplicate_shreds: &mut Vec<PossibleDuplicateShred>,
) -> bool {
debug_assert!(erasure_meta.check_coding_shred(shred));
Expand Down Expand Up @@ -2186,14 +2180,14 @@ impl Blockstore {
}
}

fn insert_data_shred(
fn insert_data_shred<'a>(
&self,
slot_meta: &mut SlotMeta,
data_index: &mut ShredIndex,
data_index: &'a mut ShredIndex,
shred: &Shred,
write_batch: &mut WriteBatch,
shred_source: ShredSource,
) -> Result<Vec<CompletedDataSetInfo>> {
) -> Result<impl Iterator<Item = CompletedDataSetInfo> + 'a> {
let slot = shred.slot();
let index = u64::from(shred.index());

Expand Down Expand Up @@ -2242,13 +2236,7 @@ impl Blockstore {
shred.reference_tick(),
data_index,
)
.into_iter()
.map(|(start_index, end_index)| CompletedDataSetInfo {
slot,
start_index,
end_index,
})
.collect();
.map(move |indices| CompletedDataSetInfo { slot, indices });

self.slots_stats.record_shred(
shred.slot(),
Expand Down Expand Up @@ -4699,53 +4687,55 @@ impl Blockstore {
}
}

// Update the `completed_data_indexes` with a new shred `new_shred_index`. If a
// data set is complete, return the range of shred indexes [start_index, end_index]
// Updates the `completed_data_indexes` with a new shred `new_shred_index`.
// If a data set is complete, returns the range of shred indexes
// start_index..end_index
// for that completed data set.
fn update_completed_data_indexes(
fn update_completed_data_indexes<'a>(
is_last_in_data: bool,
new_shred_index: u32,
received_data_shreds: &ShredIndex,
received_data_shreds: &'a ShredIndex,
// Shreds indices which are marked data complete.
completed_data_indexes: &mut BTreeSet<u32>,
) -> Vec<(u32, u32)> {
let start_shred_index = completed_data_indexes
.range(..new_shred_index)
.next_back()
.map(|index| index + 1)
.unwrap_or_default();
// Consecutive entries i, k, j in this vector represent potential ranges [i, k),
// [k, j) that could be completed data ranges
let mut shred_indices = vec![start_shred_index];
// `new_shred_index` is data complete, so need to insert here into the
// `completed_data_indexes`
if is_last_in_data {
completed_data_indexes.insert(new_shred_index);
shred_indices.push(new_shred_index + 1);
}
if let Some(index) = completed_data_indexes.range(new_shred_index + 1..).next() {
shred_indices.push(index + 1);
}
shred_indices
.windows(2)
.filter(|ix| {
let (begin, end) = (ix[0] as u64, ix[1] as u64);
let num_shreds = (end - begin) as usize;
received_data_shreds.range(begin..end).count() == num_shreds
})
.map(|ix| (ix[0], ix[1] - 1))
.collect()
) -> impl Iterator<Item = Range<u32>> + 'a {
// Consecutive entries i, j, k in this array represent potential ranges
// [i, j), [j, k) that could be completed data ranges
[
completed_data_indexes
.range(..new_shred_index)
.next_back()
.map(|index| index + 1)
.or(Some(0u32)),
is_last_in_data.then(|| {
// new_shred_index is data complete, so need to insert here into
// the completed_data_indexes.
completed_data_indexes.insert(new_shred_index);
new_shred_index + 1
}),
completed_data_indexes
.range(new_shred_index + 1..)
.next()
.map(|index| index + 1),
]
.into_iter()
.flatten()
.tuple_windows()
.filter(|&(start, end)| {
let bounds = u64::from(start)..u64::from(end);
received_data_shreds.range(bounds.clone()).eq(bounds)
})
.map(|(start, end)| start..end)
}

fn update_slot_meta(
fn update_slot_meta<'a>(
is_last_in_slot: bool,
is_last_in_data: bool,
slot_meta: &mut SlotMeta,
index: u32,
new_consumed: u64,
reference_tick: u8,
received_data_shreds: &ShredIndex,
) -> Vec<(u32, u32)> {
received_data_shreds: &'a ShredIndex,
) -> impl Iterator<Item = Range<u32>> + 'a {
let first_insert = slot_meta.received == 0;
// Index is zero-indexed, while the "received" height starts from 1,
// so received = index + 1 for the same shred.
Expand Down Expand Up @@ -6090,8 +6080,7 @@ pub mod tests {
.unwrap(),
vec![CompletedDataSetInfo {
slot,
start_index: 0,
end_index: num_shreds as u32 - 1
indices: 0..num_shreds as u32,
}]
);
// Inserting shreds again doesn't trigger notification
Expand Down Expand Up @@ -10676,10 +10665,13 @@ pub mod tests {

for i in 0..10 {
shred_index.insert(i as u64);
assert_eq!(
update_completed_data_indexes(true, i, &shred_index, &mut completed_data_indexes),
vec![(i, i)]
);
assert!(update_completed_data_indexes(
true,
i,
&shred_index,
&mut completed_data_indexes
)
.eq(std::iter::once(i..i + 1)));
assert!(completed_data_indexes.iter().copied().eq(0..=i));
}
}
Expand All @@ -10692,39 +10684,39 @@ pub mod tests {
shred_index.insert(4);
assert!(
update_completed_data_indexes(false, 4, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.is_empty());

shred_index.insert(2);
assert!(
update_completed_data_indexes(false, 2, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.is_empty());

shred_index.insert(3);
assert!(
update_completed_data_indexes(true, 3, &shred_index, &mut completed_data_indexes)
.is_empty()
.eq([])
);
assert!(completed_data_indexes.iter().eq([3].iter()));

// Inserting data complete shred 1 now confirms the range of shreds [2, 3]
// is part of the same data set
shred_index.insert(1);
assert_eq!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes),
vec![(2, 3)]
assert!(
update_completed_data_indexes(true, 1, &shred_index, &mut completed_data_indexes)
.eq(std::iter::once(2..4))
);
assert!(completed_data_indexes.iter().eq([1, 3].iter()));

// Inserting data complete shred 0 now confirms the range of shreds [0]
// is part of the same data set
shred_index.insert(0);
assert_eq!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes),
vec![(0, 0), (1, 1)]
assert!(
update_completed_data_indexes(true, 0, &shred_index, &mut completed_data_indexes)
.eq([0..1, 1..2])
);
assert!(completed_data_indexes.iter().eq([0, 1, 3].iter()));
}
Expand Down
Loading

0 comments on commit a22a20e

Please sign in to comment.