Skip to content

Commit

Permalink
Support tx poh recording in unified scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Dec 17, 2024
1 parent e27e2be commit 427736f
Show file tree
Hide file tree
Showing 16 changed files with 391 additions and 35 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ledger/benches/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ fn bench_execute_batch(
&mut timing,
None,
&prioritization_fee_cache,
None::<fn() -> Option<Option<usize>>>,
);
}
});
Expand Down
155 changes: 132 additions & 23 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,25 +110,30 @@ fn first_err(results: &[Result<()>]) -> Result<()> {
fn get_first_error(
batch: &TransactionBatch<impl SVMTransaction>,
commit_results: &[TransactionCommitResult],
is_block_producing_unified_scheduler: bool,
) -> Option<(Result<()>, Signature)> {
let mut first_err = None;
for (commit_result, transaction) in commit_results.iter().zip(batch.sanitized_transactions()) {
if let Err(err) = commit_result {
if first_err.is_none() {
first_err = Some((Err(err.clone()), *transaction.signature()));
}
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
// Skip with block producing unified scheduler because it's quite common to observe
// transaction errors...
if !is_block_producing_unified_scheduler {
warn!(
"Unexpected validator error: {:?}, transaction: {:?}",
err, transaction
);
datapoint_error!(
"validator_process_entry_error",
(
"error",
format!("error: {err:?}, transaction: {transaction:?}"),
String
)
);
}
}
}
first_err
Expand All @@ -150,12 +155,14 @@ pub fn execute_batch(
timings: &mut ExecuteTimings,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
pre_commit_callback: Option<impl FnOnce() -> Option<Option<usize>>>,
) -> Result<()> {
let TransactionBatchWithIndexes {
batch,
transaction_indexes,
} = batch;
let record_token_balances = transaction_status_sender.is_some();
let mut transaction_indexes = transaction_indexes.to_vec();

let mut mint_decimals: HashMap<Pubkey, u8> = HashMap::new();

Expand All @@ -165,14 +172,34 @@ pub fn execute_batch(
vec![]
};

let (commit_results, balances) = batch.bank().load_execute_and_commit_transactions(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
);
let is_block_producing_unified_scheduler = pre_commit_callback.is_some();
let pre_commit_callback = pre_commit_callback.map(|wrapped_callback| {
|| {
wrapped_callback()
.inspect(|&maybe_index| {
if let Some(index) = maybe_index {
assert!(transaction_indexes.is_empty());
transaction_indexes.push(index);
}
})
.is_some()
}
});

let Some((commit_results, balances)) = batch
.bank()
.load_execute_and_commit_transactions_with_pre_commit_callback(
batch,
MAX_PROCESSING_AGE,
transaction_status_sender.is_some(),
ExecutionRecordingConfig::new_single_setting(transaction_status_sender.is_some()),
timings,
log_messages_bytes_limit,
pre_commit_callback,
)
else {
return Err(TransactionError::CommitFailed);
};

bank_utils::find_and_send_votes(
batch.sanitized_transactions(),
Expand Down Expand Up @@ -201,7 +228,7 @@ pub fn execute_batch(
.filter_map(|(commit_result, tx)| commit_result.was_committed().then_some(tx))
.collect_vec();

let first_err = get_first_error(batch, &commit_results);
let first_err = get_first_error(batch, &commit_results, is_block_producing_unified_scheduler);

if let Some(transaction_status_sender) = transaction_status_sender {
let transactions: Vec<SanitizedTransaction> = batch
Expand All @@ -224,7 +251,7 @@ pub fn execute_batch(
commit_results,
balances,
token_balances,
transaction_indexes.to_vec(),
transaction_indexes,
);
}

Expand Down Expand Up @@ -322,6 +349,7 @@ fn execute_batches_internal(
&mut timings,
log_messages_bytes_limit,
prioritization_fee_cache,
None::<fn() -> Option<Option<usize>>>,
));

let thread_index = replay_tx_thread_pool.current_thread_index().unwrap();
Expand Down Expand Up @@ -2210,11 +2238,13 @@ pub fn process_single_slot(
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum TransactionStatusMessage {
Batch(TransactionStatusBatch),
Freeze(Slot),
}

#[derive(Debug)]
pub struct TransactionStatusBatch {
pub slot: Slot,
pub transactions: Vec<SanitizedTransaction>,
Expand Down Expand Up @@ -4433,7 +4463,7 @@ pub mod tests {
&mut ExecuteTimings::default(),
None,
);
let (err, signature) = get_first_error(&batch, &commit_results).unwrap();
let (err, signature) = get_first_error(&batch, &commit_results, false).unwrap();
assert_eq!(err.unwrap_err(), TransactionError::AccountNotFound);
assert_eq!(signature, account_not_found_sig);
}
Expand Down Expand Up @@ -5082,6 +5112,85 @@ pub mod tests {
do_test_schedule_batches_for_execution(false);
}

fn do_test_execute_batch_pre_commit_callback(poh_result: Option<Option<usize>>) {
solana_logger::setup();
let dummy_leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config_with_leader(500, &dummy_leader_pubkey, 100);
let bank = Bank::new_for_tests(&genesis_config);
let (bank, _bank_forks) = bank.wrap_with_bank_forks_for_tests();
let bank = Arc::new(bank);
let txs = create_test_transactions(&mint_keypair, &genesis_config.hash());
let mut batch = TransactionBatch::new(
vec![Ok(()); 1],
&bank,
OwnedOrBorrowed::Borrowed(&txs[0..1]),
);
batch.set_needs_unlock(false);
let poh_with_index = matches!(poh_result, Some(Some(_)));
let transaction_indexes = if poh_with_index { vec![] } else { vec![3] };
let batch = TransactionBatchWithIndexes {
batch,
transaction_indexes,
};
let prioritization_fee_cache = PrioritizationFeeCache::default();
let mut timing = ExecuteTimings::default();
let (sender, receiver) = crossbeam_channel::unbounded();

assert_eq!(bank.transaction_count(), 0);
let result = execute_batch(
&batch,
&bank,
Some(&TransactionStatusSender { sender }),
None,
&mut timing,
None,
&prioritization_fee_cache,
Some(|| poh_result),
);
let should_succeed = poh_result.is_some();
if should_succeed {
assert_matches!(result, Ok(()));
assert_eq!(bank.transaction_count(), 1);
} else {
assert_matches!(result, Err(TransactionError::CommitFailed));
assert_eq!(bank.transaction_count(), 0);
}
if poh_with_index {
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes == vec![4_usize]
);
} else if should_succeed {
assert_matches!(
receiver.try_recv(),
Ok(TransactionStatusMessage::Batch(TransactionStatusBatch{transaction_indexes, ..}))
if transaction_indexes == vec![3_usize]
);
} else {
assert_matches!(receiver.try_recv(), Err(_));
}
}

#[test]
fn test_execute_batch_pre_commit_callback_success() {
do_test_execute_batch_pre_commit_callback(Some(None));
}

#[test]
fn test_execute_batch_pre_commit_callback_success_with_index() {
do_test_execute_batch_pre_commit_callback(Some(Some(4)));
}

#[test]
fn test_execute_batch_pre_commit_callback_failure() {
do_test_execute_batch_pre_commit_callback(None);
}

#[test]
fn test_confirm_slot_entries_with_fix() {
const HASHES_PER_TICK: u64 = 10;
Expand Down
38 changes: 35 additions & 3 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub struct RecordTransactionsSummary {
pub starting_transaction_index: Option<usize>,
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct TransactionRecorder {
// shared by all users of PohRecorder
pub record_sender: Sender<Record>,
Expand Down Expand Up @@ -1144,11 +1144,12 @@ impl PohRecorder {
}
}

pub fn create_test_recorder(
fn do_create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
track_transaction_indexes: bool,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
Expand All @@ -1174,7 +1175,10 @@ pub fn create_test_recorder(
);
let ticks_per_slot = bank.ticks_per_slot();

poh_recorder.set_bank(BankWithScheduler::new_without_scheduler(bank), false);
poh_recorder.set_bank(
BankWithScheduler::new_without_scheduler(bank),
track_transaction_indexes,
);
let poh_recorder = Arc::new(RwLock::new(poh_recorder));
let poh_service = PohService::new(
poh_recorder.clone(),
Expand All @@ -1189,6 +1193,34 @@ pub fn create_test_recorder(
(exit, poh_recorder, poh_service, entry_receiver)
}

pub fn create_test_recorder(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, false)
}

pub fn create_test_recorder_with_index_tracking(
bank: Arc<Bank>,
blockstore: Arc<Blockstore>,
poh_config: Option<PohConfig>,
leader_schedule_cache: Option<Arc<LeaderScheduleCache>>,
) -> (
Arc<AtomicBool>,
Arc<RwLock<PohRecorder>>,
PohService,
Receiver<WorkingBankEntry>,
) {
do_create_test_recorder(bank, blockstore, poh_config, leader_schedule_cache, true)
}

#[cfg(test)]
mod tests {
use {
Expand Down
2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ solana-svm-transaction = { workspace = true }
solana-system-program = { workspace = true, optional = true }
solana-timings = { workspace = true }
solana-transaction-status-client-types = { workspace = true }
solana-unified-scheduler-logic = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
Expand Down
Loading

0 comments on commit 427736f

Please sign in to comment.