Skip to content

Commit

Permalink
Merge branch 'master' into pg/evm-debug
Browse files Browse the repository at this point in the history
  • Loading branch information
pgherveou committed Jan 14, 2025
2 parents b04f19d + 6878ba1 commit 096ad62
Show file tree
Hide file tree
Showing 46 changed files with 2,111 additions and 414 deletions.
137 changes: 130 additions & 7 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
//! of others. It uses this information to determine when candidates and blocks have
//! been sufficiently approved to finalize.
use futures_timer::Delay;
use polkadot_node_primitives::{
approval::{
v1::{BlockApprovalMeta, DelayTranche},
Expand Down Expand Up @@ -122,6 +123,9 @@ const APPROVAL_CHECKING_TIMEOUT: Duration = Duration::from_secs(120);
const WAIT_FOR_SIGS_TIMEOUT: Duration = Duration::from_millis(500);
const APPROVAL_CACHE_SIZE: u32 = 1024;

/// The maximum number of times we retry to approve a block if it is still needed.
const MAX_APPROVAL_RETRIES: u32 = 16;

const APPROVAL_DELAY: Tick = 2;
pub(crate) const LOG_TARGET: &str = "parachain::approval-voting";

Expand Down Expand Up @@ -165,6 +169,10 @@ pub struct ApprovalVotingSubsystem {
metrics: Metrics,
clock: Arc<dyn Clock + Send + Sync>,
spawner: Arc<dyn overseer::gen::Spawner + 'static>,
/// The maximum number of times we retry to approve a block if it is still needed and PoV fetch failed.
max_approval_retries: u32,
/// The backoff before we retry the approval.
retry_backoff: Duration,
}

#[derive(Clone)]
Expand Down Expand Up @@ -493,6 +501,8 @@ impl ApprovalVotingSubsystem {
metrics,
Arc::new(SystemClock {}),
spawner,
MAX_APPROVAL_RETRIES,
APPROVAL_CHECKING_TIMEOUT / 2,
)
}

Expand All @@ -505,6 +515,8 @@ impl ApprovalVotingSubsystem {
metrics: Metrics,
clock: Arc<dyn Clock + Send + Sync>,
spawner: Arc<dyn overseer::gen::Spawner + 'static>,
max_approval_retries: u32,
retry_backoff: Duration,
) -> Self {
ApprovalVotingSubsystem {
keystore,
Expand All @@ -515,6 +527,8 @@ impl ApprovalVotingSubsystem {
metrics,
clock,
spawner,
max_approval_retries,
retry_backoff,
}
}

Expand Down Expand Up @@ -706,18 +720,53 @@ enum ApprovalOutcome {
TimedOut,
}

/// Parameters needed to launch another approval check for a candidate after an attempt
/// failed, together with the remaining retry budget.
#[derive(Clone)]
struct RetryApprovalInfo {
/// Receipt of the candidate whose approval is retried.
candidate: CandidateReceipt,
/// Backing group handed to `launch_approval` when the check is launched again.
backing_group: GroupIndex,
/// Executor parameters handed to `launch_approval` when the check is launched again.
executor_params: ExecutorParams,
/// Core index, if any, handed to `launch_approval` when the check is launched again.
core_index: Option<CoreIndex>,
/// Session index handed to `launch_approval` when the check is launched again.
session_index: SessionIndex,
/// Number of retries still allowed. A further retry is only scheduled while this is
/// greater than zero, and each scheduled retry carries this value decremented by one.
attempts_remaining: u32,
/// Time awaited after a failed attempt before the next retry is reported back.
backoff: Duration,
}

/// Result reported by a finished approval-checking task.
struct ApprovalState {
/// Index of the validator the check was run for.
validator_index: ValidatorIndex,
/// Hash of the candidate that was checked.
candidate_hash: CandidateHash,
/// Whether the check approved the candidate, failed, or timed out.
approval_outcome: ApprovalOutcome,
/// When `Some`, the parameters with which the approval should be launched again;
/// `None` when no retry is requested.
retry_info: Option<RetryApprovalInfo>,
}

impl ApprovalState {
    /// Builds the state for a check that approved the candidate.
    ///
    /// No retry is requested, so `retry_info` is `None`.
    fn approved(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
        Self {
            validator_index,
            candidate_hash,
            approval_outcome: ApprovalOutcome::Approved,
            retry_info: None,
        }
    }

    /// Builds the state for a check that failed and must not be retried.
    ///
    /// Equivalent to [`Self::failed_with_retry`] with `retry_info` set to `None`; delegating
    /// keeps both failure constructors in agreement about how a failure is represented.
    fn failed(validator_index: ValidatorIndex, candidate_hash: CandidateHash) -> Self {
        Self::failed_with_retry(validator_index, candidate_hash, None)
    }

    /// Builds the state for a check that failed, optionally requesting a retry.
    ///
    /// `retry_info` is stored as given: `Some` carries the parameters for the next attempt,
    /// `None` means the failure is final.
    fn failed_with_retry(
        validator_index: ValidatorIndex,
        candidate_hash: CandidateHash,
        retry_info: Option<RetryApprovalInfo>,
    ) -> Self {
        Self {
            validator_index,
            candidate_hash,
            approval_outcome: ApprovalOutcome::Failed,
            retry_info,
        }
    }
}

Expand Down Expand Up @@ -757,6 +806,7 @@ impl CurrentlyCheckingSet {
candidate_hash,
validator_index,
approval_outcome: ApprovalOutcome::TimedOut,
retry_info: None,
},
Some(approval_state) => approval_state,
}
Expand Down Expand Up @@ -1271,25 +1321,63 @@ where
validator_index,
candidate_hash,
approval_outcome,
retry_info,
}
) = approval_state;

if matches!(approval_outcome, ApprovalOutcome::Approved) {
let mut approvals: Vec<Action> = relay_block_hashes
.into_iter()
.iter()
.map(|block_hash|
Action::IssueApproval(
candidate_hash,
ApprovalVoteRequest {
validator_index,
block_hash,
block_hash: *block_hash,
},
)
)
.collect();
actions.append(&mut approvals);
}

if let Some(retry_info) = retry_info {
for block_hash in relay_block_hashes {
if overlayed_db.load_block_entry(&block_hash).map(|block_info| block_info.is_some()).unwrap_or(false) {
let sender = to_other_subsystems.clone();
let spawn_handle = subsystem.spawner.clone();
let metrics = subsystem.metrics.clone();
let retry_info = retry_info.clone();
let executor_params = retry_info.executor_params.clone();
let candidate = retry_info.candidate.clone();

currently_checking_set
.insert_relay_block_hash(
candidate_hash,
validator_index,
block_hash,
async move {
launch_approval(
sender,
spawn_handle,
metrics,
retry_info.session_index,
candidate,
validator_index,
block_hash,
retry_info.backing_group,
executor_params,
retry_info.core_index,
retry_info,
)
.await
},
)
.await?;
}
}
}

actions
},
(block_hash, validator_index) = delayed_approvals_timers.select_next_some() => {
Expand Down Expand Up @@ -1340,6 +1428,8 @@ where
&mut approvals_cache,
&mut subsystem.mode,
actions,
subsystem.max_approval_retries,
subsystem.retry_backoff,
)
.await?
{
Expand Down Expand Up @@ -1389,6 +1479,8 @@ pub async fn start_approval_worker<
metrics,
clock,
spawner,
MAX_APPROVAL_RETRIES,
APPROVAL_CHECKING_TIMEOUT / 2,
);
let backend = DbBackend::new(db.clone(), approval_voting.db_config);
let spawner = approval_voting.spawner.clone();
Expand Down Expand Up @@ -1456,6 +1548,8 @@ async fn handle_actions<
approvals_cache: &mut LruMap<CandidateHash, ApprovalOutcome>,
mode: &mut Mode,
actions: Vec<Action>,
max_approval_retries: u32,
retry_backoff: Duration,
) -> SubsystemResult<bool> {
let mut conclude = false;
let mut actions_iter = actions.into_iter();
Expand Down Expand Up @@ -1542,6 +1636,16 @@ async fn handle_actions<
let sender = sender.clone();
let spawn_handle = spawn_handle.clone();

let retry = RetryApprovalInfo {
candidate: candidate.clone(),
backing_group,
executor_params: executor_params.clone(),
core_index,
session_index: session,
attempts_remaining: max_approval_retries,
backoff: retry_backoff,
};

currently_checking_set
.insert_relay_block_hash(
candidate_hash,
Expand All @@ -1559,6 +1663,7 @@ async fn handle_actions<
backing_group,
executor_params,
core_index,
retry,
)
.await
},
Expand Down Expand Up @@ -3329,6 +3434,7 @@ async fn launch_approval<
backing_group: GroupIndex,
executor_params: ExecutorParams,
core_index: Option<CoreIndex>,
retry: RetryApprovalInfo,
) -> SubsystemResult<RemoteHandle<ApprovalState>> {
let (a_tx, a_rx) = oneshot::channel();
let (code_tx, code_rx) = oneshot::channel();
Expand Down Expand Up @@ -3360,6 +3466,7 @@ async fn launch_approval<

let candidate_hash = candidate.hash();
let para_id = candidate.descriptor.para_id();
let mut next_retry = None;
gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data.");

let timer = metrics.time_recover_and_approve();
Expand Down Expand Up @@ -3388,7 +3495,6 @@ async fn launch_approval<
let background = async move {
// Force the move of the timer into the background task.
let _timer = timer;

let available_data = match a_rx.await {
Err(_) => return ApprovalState::failed(validator_index, candidate_hash),
Ok(Ok(a)) => a,
Expand All @@ -3399,10 +3505,27 @@ async fn launch_approval<
target: LOG_TARGET,
?para_id,
?candidate_hash,
attempts_remaining = retry.attempts_remaining,
"Data unavailable for candidate {:?}",
(candidate_hash, candidate.descriptor.para_id()),
);
// do nothing. we'll just be a no-show and that'll cause others to rise up.
// Availability could fail if we did not discover much of the network, so
// let's back off and order the subsystem to retry at a later point if the
// approval is still needed, because no-show wasn't covered yet.
if retry.attempts_remaining > 0 {
Delay::new(retry.backoff).await;
next_retry = Some(RetryApprovalInfo {
candidate,
backing_group,
executor_params,
core_index,
session_index,
attempts_remaining: retry.attempts_remaining - 1,
backoff: retry.backoff,
});
} else {
next_retry = None;
}
metrics_guard.take().on_approval_unavailable();
},
&RecoveryError::ChannelClosed => {
Expand Down Expand Up @@ -3433,7 +3556,7 @@ async fn launch_approval<
metrics_guard.take().on_approval_invalid();
},
}
return ApprovalState::failed(validator_index, candidate_hash)
return ApprovalState::failed_with_retry(validator_index, candidate_hash, next_retry)
},
};

Expand Down
Loading

0 comments on commit 096ad62

Please sign in to comment.