From 06e97f2ed89ee97ac255efb34ede44babf309d90 Mon Sep 17 00:00:00 2001 From: ksolana <110843012+ksolana@users.noreply.github.com> Date: Thu, 23 Jan 2025 12:17:31 -0800 Subject: [PATCH] ThreadLocalUnprocessedPackets Towards: #3357 --- .../unprocessed_transaction_storage.rs | 722 ------------------ 1 file changed, 722 deletions(-) diff --git a/core/src/banking_stage/unprocessed_transaction_storage.rs b/core/src/banking_stage/unprocessed_transaction_storage.rs index db94cc0a558663..187c433f8a4598 100644 --- a/core/src/banking_stage/unprocessed_transaction_storage.rs +++ b/core/src/banking_stage/unprocessed_transaction_storage.rs @@ -43,13 +43,6 @@ const MAX_NUM_VOTES_RECEIVE: usize = 10_000; #[derive(Debug)] pub enum UnprocessedTransactionStorage { VoteStorage(VoteStorage), - LocalTransactionStorage(ThreadLocalUnprocessedPackets), -} - -#[derive(Debug)] -pub struct ThreadLocalUnprocessedPackets { - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, } #[derive(Debug)] @@ -253,15 +246,6 @@ where } impl UnprocessedTransactionStorage { - pub fn new_transaction_storage( - unprocessed_packet_batches: UnprocessedPacketBatches, - thread_type: ThreadType, - ) -> Self { - Self::LocalTransactionStorage(ThreadLocalUnprocessedPackets { - unprocessed_packet_batches, - thread_type, - }) - } pub fn new_vote_storage( latest_unprocessed_votes: Arc, @@ -276,32 +260,24 @@ impl UnprocessedTransactionStorage { pub fn is_empty(&self) -> bool { match self { Self::VoteStorage(vote_storage) => vote_storage.is_empty(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.is_empty(), } } pub fn len(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.len(), - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.len(), } } pub fn get_min_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_min_compute_unit_price() - } } } pub fn get_max_priority(&self) -> Option { match self { Self::VoteStorage(_) => None, - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.get_max_compute_unit_price() - } } } @@ -309,9 +285,6 @@ impl UnprocessedTransactionStorage { pub fn max_receive_size(&self) -> usize { match self { Self::VoteStorage(vote_storage) => vote_storage.max_receive_size(), - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.max_receive_size() - } } } @@ -327,7 +300,6 @@ impl UnprocessedTransactionStorage { #[cfg(test)] pub fn iter(&mut self) -> impl Iterator { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.iter(), _ => panic!(), } } @@ -335,15 +307,11 @@ impl UnprocessedTransactionStorage { pub fn forward_option(&self) -> ForwardOption { match self { Self::VoteStorage(vote_storage) => vote_storage.forward_option(), - Self::LocalTransactionStorage(transaction_storage) => { - transaction_storage.forward_option() - } } } pub fn clear_forwarded_packets(&mut self) { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage.clear(), // Since we set everything as forwarded this is the same Self::VoteStorage(vote_storage) => vote_storage.clear_forwarded_packets(), } } @@ -356,9 +324,6 @@ impl UnprocessedTransactionStorage { Self::VoteStorage(vote_storage) => { InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } - Self::LocalTransactionStorage(transaction_storage) => InsertPacketBatchSummary::from( - transaction_storage.insert_batch(deserialized_packets), - ), } } @@ -368,11 +333,6 @@ impl UnprocessedTransactionStorage { forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage - .filter_forwardable_packets_and_add_batches( - bank, - forward_packet_batches_by_accounts, - ), Self::VoteStorage(vote_storage) => vote_storage .filter_forwardable_packets_and_add_batches( bank, @@ -399,13 +359,6 @@ impl UnprocessedTransactionStorage { ) -> Option>, { match self { - Self::LocalTransactionStorage(transaction_storage) => transaction_storage - .process_packets( - &bank, - banking_stage_stats, - slot_metrics_tracker, - processing_function, - ), Self::VoteStorage(vote_storage) => vote_storage.process_packets( bank, banking_stage_stats, @@ -417,7 +370,6 @@ impl UnprocessedTransactionStorage { pub(crate) fn cache_epoch_boundary_info(&mut self, bank: &Bank) { match self { - Self::LocalTransactionStorage(_) => (), Self::VoteStorage(vote_storage) => vote_storage.cache_epoch_boundary_info(bank), } } @@ -569,428 +521,6 @@ impl VoteStorage { } } -impl ThreadLocalUnprocessedPackets { - fn is_empty(&self) -> bool { - self.unprocessed_packet_batches.is_empty() - } - - pub fn thread_type(&self) -> ThreadType { - self.thread_type - } - - fn len(&self) -> usize { - self.unprocessed_packet_batches.len() - } - - pub fn get_min_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_min_compute_unit_price() - } - - pub fn get_max_compute_unit_price(&self) -> Option { - self.unprocessed_packet_batches.get_max_compute_unit_price() - } - - fn max_receive_size(&self) -> usize { - self.unprocessed_packet_batches.capacity() - self.unprocessed_packet_batches.len() - } - - #[cfg(test)] - fn iter(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter() - } - - pub fn iter_mut(&mut self) -> impl Iterator { - self.unprocessed_packet_batches.iter_mut() - } - - fn forward_option(&self) -> ForwardOption { - match self.thread_type { - ThreadType::Transactions => ForwardOption::ForwardTransaction, - ThreadType::Voting(VoteSource::Tpu) => ForwardOption::ForwardTpuVote, - ThreadType::Voting(VoteSource::Gossip) => ForwardOption::NotForward, - } - } - - fn clear(&mut self) { - self.unprocessed_packet_batches.clear(); - } - - fn insert_batch( - &mut self, - deserialized_packets: Vec, - ) -> PacketBatchInsertionMetrics { - self.unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ) - } - - /// Filter out packets that fail to sanitize, or are no longer valid (could be - /// too old, a duplicate of something already processed). Doing this in batches to avoid - /// checking bank's blockhash and status cache per transaction which could be bad for performance. - /// Added valid and sanitized packets to forwarding queue. - fn filter_forwardable_packets_and_add_batches( - &mut self, - bank: Arc, - forward_buffer: &mut ForwardPacketBatchesByAccounts, - ) -> FilterForwardingResults { - let mut total_forwardable_packets: usize = 0; - let mut total_packet_conversion_us: u64 = 0; - let mut total_filter_packets_us: u64 = 0; - let mut total_dropped_packets: usize = 0; - - let mut original_priority_queue = self.take_priority_queue(); - let original_capacity = original_priority_queue.capacity(); - let mut new_priority_queue = MinMaxHeap::with_capacity(original_capacity); - - // indicates if `forward_buffer` still accept more packets, see details at - // `ForwardPacketBatchesByAccounts.rs`. - let mut accepting_packets = true; - // batch iterate through self.unprocessed_packet_batches in desc priority order - new_priority_queue.extend( - original_priority_queue - .drain_desc() - .chunks(UNPROCESSED_BUFFER_STEP_SIZE) - .into_iter() - .flat_map(|packets_to_process| { - // Only process packets not yet forwarded - let (forwarded_packets, packets_to_forward) = - self.prepare_packets_to_forward(packets_to_process); - - [ - forwarded_packets, - if accepting_packets { - let ( - (sanitized_transactions, transaction_to_packet_indexes), - packet_conversion_us, - ) = measure_us!(self.sanitize_unforwarded_packets( - &packets_to_forward, - &bank, - &mut total_dropped_packets - )); - saturating_add_assign!( - total_packet_conversion_us, - packet_conversion_us - ); - - let (forwardable_transaction_indexes, filter_packets_us) = - measure_us!(Self::filter_invalid_transactions( - &sanitized_transactions, - &bank, - &mut total_dropped_packets - )); - saturating_add_assign!(total_filter_packets_us, filter_packets_us); - saturating_add_assign!( - total_forwardable_packets, - forwardable_transaction_indexes.len() - ); - - let accepted_packet_indexes = - Self::add_filtered_packets_to_forward_buffer( - forward_buffer, - &packets_to_forward, - &sanitized_transactions, - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - &mut total_dropped_packets, - &bank.feature_set, - ); - accepting_packets = accepted_packet_indexes.len() - == forwardable_transaction_indexes.len(); - - self.unprocessed_packet_batches - .mark_accepted_packets_as_forwarded( - &packets_to_forward, - &accepted_packet_indexes, - ); - - Self::collect_retained_packets( - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - &packets_to_forward, - &Self::prepare_filtered_packet_indexes( - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - ), - ) - } else { - // skip sanitizing and filtering if not longer able to add more packets for forwarding - saturating_add_assign!(total_dropped_packets, packets_to_forward.len()); - packets_to_forward - }, - ] - .concat() - }), - ); - - // replace packet priority queue - self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue; - self.verify_priority_queue(original_capacity); - - // Assert unprocessed queue is still consistent - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - - FilterForwardingResults { - total_forwardable_packets, - total_dropped_packets, - total_packet_conversion_us, - total_filter_packets_us, - } - } - - /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn take_priority_queue(&mut self) -> MinMaxHeap> { - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::new(), // <-- no need to reserve capacity as we will replace this - ) - } - - /// Verify that the priority queue and map are consistent and that original capacity is maintained. - fn verify_priority_queue(&self, original_capacity: usize) { - // Assert unprocessed queue is still consistent and maintains original capacity - assert_eq!( - self.unprocessed_packet_batches - .packet_priority_queue - .capacity(), - original_capacity - ); - assert_eq!( - self.unprocessed_packet_batches.packet_priority_queue.len(), - self.unprocessed_packet_batches - .message_hash_to_transaction - .len() - ); - } - - /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. - fn sanitize_unforwarded_packets( - &mut self, - packets_to_process: &[Arc], - bank: &Bank, - total_dropped_packets: &mut usize, - ) -> (Vec>, Vec) { - // Get ref of ImmutableDeserializedPacket - let deserialized_packets = packets_to_process.iter().map(|p| &**p); - let (transactions, transaction_to_packet_indexes): (Vec<_>, Vec<_>) = deserialized_packets - .enumerate() - .filter_map(|(packet_index, deserialized_packet)| { - deserialized_packet - .build_sanitized_transaction( - bank.vote_only_bank(), - bank, - bank.get_reserved_account_keys(), - ) - .map(|(transaction, _deactivation_slot)| (transaction, packet_index)) - }) - .unzip(); - - let filtered_count = packets_to_process.len().saturating_sub(transactions.len()); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - (transactions, transaction_to_packet_indexes) - } - - /// Checks sanitized transactions against bank, returns valid transaction indexes - fn filter_invalid_transactions( - transactions: &[RuntimeTransaction], - bank: &Bank, - total_dropped_packets: &mut usize, - ) -> Vec { - let filter = vec![Ok(()); transactions.len()]; - let results = bank.check_transactions_with_forwarding_delay( - transactions, - &filter, - FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, - ); - - let filtered_count = transactions.len().saturating_sub(results.len()); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - results - .iter() - .enumerate() - .filter_map(|(tx_index, result)| result.as_ref().ok().map(|_| tx_index)) - .collect_vec() - } - - fn prepare_filtered_packet_indexes( - transaction_to_packet_indexes: &[usize], - retained_transaction_indexes: &[usize], - ) -> Vec { - retained_transaction_indexes - .iter() - .map(|tx_index| transaction_to_packet_indexes[*tx_index]) - .collect_vec() - } - - /// try to add filtered forwardable and valid packets to forward buffer; - /// returns vector of packet indexes that were accepted for forwarding. - fn add_filtered_packets_to_forward_buffer( - forward_buffer: &mut ForwardPacketBatchesByAccounts, - packets_to_process: &[Arc], - transactions: &[RuntimeTransaction], - transaction_to_packet_indexes: &[usize], - forwardable_transaction_indexes: &[usize], - total_dropped_packets: &mut usize, - feature_set: &FeatureSet, - ) -> Vec { - let mut added_packets_count: usize = 0; - let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); - for forwardable_transaction_index in forwardable_transaction_indexes { - let sanitized_transaction = &transactions[*forwardable_transaction_index]; - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - let immutable_deserialized_packet = - packets_to_process[forwardable_packet_index].clone(); - if !forward_buffer.try_add_packet( - sanitized_transaction, - immutable_deserialized_packet, - feature_set, - ) { - break; - } - accepted_packet_indexes.push(forwardable_packet_index); - saturating_add_assign!(added_packets_count, 1); - } - - let filtered_count = forwardable_transaction_indexes - .len() - .saturating_sub(added_packets_count); - saturating_add_assign!(*total_dropped_packets, filtered_count); - - accepted_packet_indexes - } - - fn collect_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - message_hash_to_transaction, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - message_hash_to_transaction: &mut HashMap, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - message_hash_to_transaction.remove(processed_packet.message_hash()); - } - }, - ) - } - - // returns `true` if reached end of slot - fn process_packets( - &mut self, - bank: &Bank, - banking_stage_stats: &BankingStageStats, - slot_metrics_tracker: &mut LeaderSlotMetricsTracker, - mut processing_function: F, - ) -> bool - where - F: FnMut( - &Vec>, - &mut ConsumeScannerPayload, - ) -> Option>, - { - let mut retryable_packets = self.take_priority_queue(); - let original_capacity = retryable_packets.capacity(); - let mut new_retryable_packets = MinMaxHeap::with_capacity(original_capacity); - let all_packets_to_process = retryable_packets.drain_desc().collect_vec(); - - let should_process_packet = - |packet: &Arc, payload: &mut ConsumeScannerPayload| { - consume_scan_should_process_packet(bank, banking_stage_stats, packet, payload) - }; - let mut scanner = create_consume_multi_iterator( - &all_packets_to_process, - slot_metrics_tracker, - &mut self.unprocessed_packet_batches.message_hash_to_transaction, - should_process_packet, - ); - - while let Some((packets_to_process, payload)) = scanner.iterate() { - let packets_to_process = packets_to_process - .iter() - .map(|p| (*p).clone()) - .collect_vec(); - let retryable_packets = if let Some(retryable_transaction_indexes) = - processing_function(&packets_to_process, payload) - { - Self::collect_retained_packets( - payload.message_hash_to_transaction, - &packets_to_process, - &retryable_transaction_indexes, - ) - } else { - packets_to_process - }; - - new_retryable_packets.extend(retryable_packets); - } - - let reached_end_of_slot = scanner.finalize().payload.reached_end_of_slot; - - self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; - self.verify_priority_queue(original_capacity); - - reached_end_of_slot - } - - /// Prepare a chunk of packets for forwarding, filter out already forwarded packets while - /// counting tracers. - /// Returns Vec of unforwarded packets, and Vec of same size each indicates corresponding - /// packet is tracer packet. - fn prepare_packets_to_forward( - &self, - packets_to_forward: impl Iterator>, - ) -> ( - Vec>, - Vec>, - ) { - let mut forwarded_packets: Vec> = vec![]; - let forwardable_packets = packets_to_forward - .into_iter() - .filter_map(|immutable_deserialized_packet| { - if !self - .unprocessed_packet_batches - .is_forwarded(&immutable_deserialized_packet) - { - Some(immutable_deserialized_packet) - } else { - forwarded_packets.push(immutable_deserialized_packet); - None - } - }) - .collect(); - - (forwarded_packets, forwardable_packets) - } -} - #[cfg(test)] mod tests { use { @@ -1095,166 +625,6 @@ mod tests { DeserializedPacket::new(p).unwrap() }) .collect_vec(); - - // all packets are forwarded - { - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, - ); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - - let FilterForwardingResults { - total_forwardable_packets, - .. - } = transaction_storage.filter_forwardable_packets_and_add_batches( - current_bank.clone(), - &mut forward_packet_batches_by_accounts, - ); - assert_eq!(total_forwardable_packets, 256); - - // packets in a batch are forwarded in arbitrary order; verify the ports match after - // sorting - let expected_ports: Vec<_> = (0..256).collect(); - let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts - .iter_batches() - .flat_map(|batch| batch.get_forwardable_packets().map(|p| p.meta().port)) - .collect(); - forwarded_ports.sort_unstable(); - assert_eq!(expected_ports, forwarded_ports); - } - - // some packets are forwarded - { - let num_already_forwarded = 16; - for packet in &mut packets[0..num_already_forwarded] { - packet.forwarded = true; - } - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, - ); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - .. - } = transaction_storage.filter_forwardable_packets_and_add_batches( - current_bank.clone(), - &mut forward_packet_batches_by_accounts, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_forwarded - ); - } - - // some packets are invalid (already processed) - { - let num_already_processed = 16; - for tx in &simple_transactions[0..num_already_processed] { - assert_eq!(current_bank.process_transaction(tx), Ok(())); - } - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - buffered_packet_batches, - ThreadType::Transactions, - ); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - .. - } = transaction_storage.filter_forwardable_packets_and_add_batches( - current_bank, - &mut forward_packet_batches_by_accounts, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_processed - ); - } - } - - #[test] - fn test_unprocessed_transaction_storage_insert() -> Result<(), Box> { - let keypair = Keypair::new(); - let vote_keypair = Keypair::new(); - let pubkey = solana_pubkey::new_rand(); - - let small_transfer = Packet::from_data( - None, - system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()), - )?; - let mut vote = Packet::from_data( - None, - new_tower_sync_transaction( - TowerSync::default(), - Hash::new_unique(), - &keypair, - &vote_keypair, - &vote_keypair, - None, - ), - )?; - vote.meta_mut().flags.set(PacketFlags::SIMPLE_VOTE_TX, true); - let big_transfer = Packet::from_data( - None, - system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()), - )?; - - for thread_type in [ - ThreadType::Transactions, - ThreadType::Voting(VoteSource::Gossip), - ThreadType::Voting(VoteSource::Tpu), - ] { - let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::with_capacity(100), - thread_type, - ); - transaction_storage.insert_batch(vec![ - ImmutableDeserializedPacket::new(small_transfer.clone())?, - ImmutableDeserializedPacket::new(vote.clone())?, - ImmutableDeserializedPacket::new(big_transfer.clone())?, - ]); - let deserialized_packets = transaction_storage - .iter() - .map(|packet| packet.immutable_section().original_packet().clone()) - .collect_vec(); - assert_eq!(3, deserialized_packets.len()); - assert!(deserialized_packets.contains(&small_transfer)); - assert!(deserialized_packets.contains(&vote)); - assert!(deserialized_packets.contains(&big_transfer)); - } - - for (vote_source, staked) in iproduct!( - [VoteSource::Gossip, VoteSource::Tpu].into_iter(), - [true, false].into_iter() - ) { - let staked_keys = if staked { - vec![vote_keypair.pubkey()] - } else { - vec![] - }; - let latest_unprocessed_votes = LatestUnprocessedVotes::new_for_tests(&staked_keys); - let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( - Arc::new(latest_unprocessed_votes), - vote_source, - ); - transaction_storage.insert_batch(vec![ - ImmutableDeserializedPacket::new(small_transfer.clone())?, - ImmutableDeserializedPacket::new(vote.clone())?, - ImmutableDeserializedPacket::new(big_transfer.clone())?, - ]); - assert_eq!(if staked { 1 } else { 0 }, transaction_storage.len()); - } - Ok(()) } #[test] @@ -1310,96 +680,4 @@ mod tests { assert_eq!(1, transaction_storage.len()); Ok(()) } - - #[test] - fn test_prepare_packets_to_forward() { - solana_logger::setup(); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = create_genesis_config(10); - - let simple_transactions: Vec = (0..256) - .map(|_id| { - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore we need to create real packets here. - let key1 = Keypair::new(); - system_transaction::transfer( - &mint_keypair, - &key1.pubkey(), - genesis_config.rent.minimum_balance(0), - genesis_config.hash(), - ) - }) - .collect_vec(); - - let mut packets: Vec = simple_transactions - .iter() - .enumerate() - .map(|(packets_id, transaction)| { - let mut p = Packet::from_data(None, transaction).unwrap(); - p.meta_mut().port = packets_id as u16; - DeserializedPacket::new(p).unwrap() - }) - .collect_vec(); - - // test preparing buffered packets for forwarding - let test_prepareing_buffered_packets_for_forwarding = - |buffered_packet_batches: UnprocessedPacketBatches| -> usize { - let mut total_packets_to_forward: usize = 0; - - let mut unprocessed_transactions = ThreadLocalUnprocessedPackets { - unprocessed_packet_batches: buffered_packet_batches, - thread_type: ThreadType::Transactions, - }; - - let mut original_priority_queue = unprocessed_transactions.take_priority_queue(); - let _ = original_priority_queue - .drain_desc() - .chunks(128usize) - .into_iter() - .flat_map(|packets_to_process| { - let (_, packets_to_forward) = - unprocessed_transactions.prepare_packets_to_forward(packets_to_process); - total_packets_to_forward += packets_to_forward.len(); - packets_to_forward - }) - .collect::>>(); - total_packets_to_forward - }; - - { - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let total_packets_to_forward = - test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_packets_to_forward, 256); - } - - // some packets are forwarded - { - let num_already_forwarded = 16; - for packet in &mut packets[0..num_already_forwarded] { - packet.forwarded = true; - } - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let total_packets_to_forward = - test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_packets_to_forward, 256 - num_already_forwarded); - } - - // all packets are forwarded - { - for packet in &mut packets { - packet.forwarded = true; - } - let buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone(), packets.len()); - let total_packets_to_forward = - test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches); - assert_eq!(total_packets_to_forward, 0); - } - } }