Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

approval-voting: Fix sending of assignments after restart #6973

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,16 @@ pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";
// The max number of ticks we delay sending the approval after we are ready to issue the approval
const MAX_APPROVAL_COALESCE_WAIT_TICKS: Tick = 12;

// If the node restarted and the tranche has passed without the assignment
// being trigger, we won't trigger the assignment at restart because we don't have
// an wakeup schedule for it.
// The solution, is to always schedule a wake up after the restart and let the
// process_wakeup to decide if the assignment needs to be triggered.
// We need to have a delay after restart to give time to the node to catch up with
// messages and not trigger its assignment unnecessarily, because it hasn't seen
// the assignments from the other validators.
const RESTART_WAKEUP_DELAY: Tick = 12;

/// Configuration for the approval voting subsystem
#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -1732,7 +1742,20 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
match candidate_entry.approval_entry(&block_hash) {
Some(approval_entry) => {
match approval_entry.local_statements() {
(None, None) | (None, Some(_)) => {}, // second is impossible case.
(None, None) =>
if approval_entry
.our_assignment()
.map(|assignment| !assignment.triggered())
.unwrap_or(false)
{
actions.push(Action::ScheduleWakeup {
block_hash,
block_number: block_entry.block_number(),
candidate_hash: *candidate_hash,
tick: state.clock.tick_now() + RESTART_WAKEUP_DELAY,
})
},
(None, Some(_)) => {}, // second is impossible case.
(Some(assignment), None) => {
let claimed_core_indices =
get_core_indices_on_startup(&assignment.cert().kind, *core_index);
Expand Down
246 changes: 246 additions & 0 deletions polkadot/node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5234,6 +5234,252 @@ fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
});
}

// Test that if the subsystem missed the triggering of some tranches because it was not running
// it launches the missed assignements on restart.
#[test]
fn subsystem_launches_missed_assignments_on_restart() {
let test_tranche = 20;
let assignment_criteria = Box::new(MockAssignmentCriteria(
move || {
let mut assignments = HashMap::new();
let _ = assignments.insert(
CoreIndex(0),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
core_index: CoreIndex(0),
}),
tranche: test_tranche,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);

assignments
},
|_| Ok(0),
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
let store_clone = config.backend();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let fork_block_hash = Hash::repeat_byte(0x02);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();
let slot = Slot::from(1);
let (chain_builder, _session_info) = build_chain_with_two_blocks_with_one_candidate_each(
block_hash,
fork_block_hash,
slot,
sync_oracle_handle,
candidate_receipt,
)
.await;
chain_builder.build(&mut virtual_overseer).await;

assert!(!clock.inner.lock().current_wakeup_is(1));
clock.inner.lock().wakeup_all(1);

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot) + test_tranche as u64));
clock.inner.lock().wakeup_all(slot_to_tick(slot));

futures_timer::Delay::new(Duration::from_millis(200)).await;

clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;

let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(!our_assignment.triggered());

// Assignment is not triggered because its tranches has not been reached.
virtual_overseer
});

// Restart a new approval voting subsystem with the same database and major syncing true until
// the last leaf.
let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;
let slot = Slot::from(1);
// 1. Set the clock to the to a tick past the tranche where the assignment should be
// triggered.
clock.inner.lock().set_tick(slot_to_tick(slot) + 2 * test_tranche as u64);
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let fork_block_hash = Hash::repeat_byte(0x02);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let (chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each(
block_hash,
fork_block_hash,
slot,
sync_oracle_handle,
candidate_receipt,
)
.await;

chain_builder.build(&mut virtual_overseer).await;

futures_timer::Delay::new(Duration::from_millis(2000)).await;

// On major syncing ending Approval voting should send all the necessary messages for a
// candidate to be approved.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
_,
)) => {
}
);

clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY - 1);

// Subsystem should not send any messages because the assignment is not triggered yet.
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

// Set the clock to the tick where the assignment should be triggered.
clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionInfo(_, si_tx),
)
) => {
si_tx.send(Ok(Some(session_info.clone()))).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
_,
RuntimeApiRequest::SessionExecutorParams(_, si_tx),
)
) => {
// Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent)
si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(_, RuntimeApiRequest::NodeFeatures(_, si_tx), )
) => {
si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

// Guarantees the approval work has been relaunched.
recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 1,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

clock
.inner
.lock()
.wakeup_all(slot_to_tick(slot) + 2 * test_tranche as u64 + RESTART_WAKEUP_DELAY);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 1,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});
}

// Test we correctly update the timer when we mark the beginning of gathering assignments.
#[test]
fn test_gathering_assignments_statements() {
Expand Down
Loading