Skip to content

Commit

Permalink
refactor: split check_balance into submethods (near#11314)
Browse files Browse the repository at this point in the history
Extract from check_balance the code that extracts details from receipts,
transaction, and state.

These submethods are extracted:
- buffered_receipts()
- potential_postponed_receipt_ids()
- all_touched_accounts()
- validator_rewards()

The first two are just extracted without modification.

The last two used to be intertwined. I think its cleaner to read to
separate them.
  • Loading branch information
jakmeier authored May 15, 2024
1 parent 5f5a569 commit ff2385d
Showing 1 changed file with 127 additions and 73 deletions.
200 changes: 127 additions & 73 deletions runtime/runtime/src/balance_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use near_parameters::{ActionCosts, RuntimeConfig};
use near_primitives::errors::{
BalanceMismatchError, IntegerOverflowError, RuntimeError, StorageError,
};
use near_primitives::hash::CryptoHash;
use near_primitives::receipt::{Receipt, ReceiptEnum};
use near_primitives::transaction::SignedTransaction;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, Balance};
use near_store::trie::receipts_column_helper::{ShardsOutgoingReceiptBuffer, TrieQueue};
use near_store::{
get, get_account, get_postponed_receipt, get_promise_yield_receipt, TrieAccess, TrieUpdate,
get, get_account, get_postponed_receipt, get_promise_yield_receipt, Trie, TrieAccess,
TrieUpdate,
};
use std::collections::{BTreeSet, HashSet};

Expand Down Expand Up @@ -125,43 +127,11 @@ fn total_postponed_receipts_cost(
})
}

#[tracing::instrument(target = "runtime", level = "debug", "check_balance", skip_all, fields(
transactions.len = transactions.len(),
incoming_receipts.len = incoming_receipts.len(),
yield_timeout_receipts.len = yield_timeout_receipts.len(),
outgoing_receipts.len = outgoing_receipts.len()
))]
pub(crate) fn check_balance(
config: &RuntimeConfig,
/// Compute balance going in and out of the outgoing receipt buffer.
fn buffered_receipts(
initial_state: &Trie,
final_state: &TrieUpdate,
validator_accounts_update: &Option<ValidatorAccountsUpdate>,
incoming_receipts: &[Receipt],
yield_timeout_receipts: &[Receipt],
transactions: &[SignedTransaction],
outgoing_receipts: &[Receipt],
stats: &ApplyStats,
) -> Result<(), RuntimeError> {
let initial_state = final_state.trie();

// Delayed receipts
let initial_delayed_receipt_indices: DelayedReceiptIndices =
get(initial_state, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default();
let final_delayed_receipt_indices: DelayedReceiptIndices =
get(final_state, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default();

// Previously delayed receipts that were processed this time.
let processed_delayed_receipts = get_delayed_receipts(
initial_state,
initial_delayed_receipt_indices.first_index..final_delayed_receipt_indices.first_index,
)?;
// Receipts that were not processed this time and are delayed now.
let new_delayed_receipts = get_delayed_receipts(
final_state,
initial_delayed_receipt_indices.next_available_index
..final_delayed_receipt_indices.next_available_index,
)?;

// Buffered receipts
) -> Result<(Vec<Receipt>, Vec<Receipt>), RuntimeError> {
let mut initial_buffers = ShardsOutgoingReceiptBuffer::load(initial_state)?;
let mut final_buffers = ShardsOutgoingReceiptBuffer::load(final_state)?;
let mut forwarded_receipts: Vec<Receipt> = vec![];
Expand Down Expand Up @@ -193,50 +163,57 @@ pub(crate) fn check_balance(
}
}

// Accounts
Ok((forwarded_receipts, new_buffered_receipts))
}

/// Find account ids of all accounts touched in receipts, transactions, and
/// validator updates.
fn all_touched_accounts(
incoming_receipts: &[Receipt],
yield_timeout_receipts: &[Receipt],
processed_delayed_receipts: &[Receipt],
transactions: &[SignedTransaction],
validator_accounts_update: &Option<ValidatorAccountsUpdate>,
) -> Result<HashSet<AccountId>, RuntimeError> {
let mut all_accounts_ids: HashSet<AccountId> = transactions
.iter()
.map(|tx| tx.transaction.signer_id().clone())
.chain(incoming_receipts.iter().map(|r| r.receiver_id().clone()))
.chain(yield_timeout_receipts.iter().map(|r| r.receiver_id().clone()))
.chain(processed_delayed_receipts.iter().map(|r| r.receiver_id().clone()))
.collect();
let incoming_validator_rewards =
if let Some(validator_accounts_update) = validator_accounts_update {
all_accounts_ids.extend(validator_accounts_update.stake_info.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.validator_rewards.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.last_proposals.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.slashing_info.keys().cloned());
if let Some(account_id) = &validator_accounts_update.protocol_treasury_account_id {
all_accounts_ids.insert(account_id.clone());
}
validator_accounts_update
.validator_rewards
.values()
.try_fold(0u128, |res, balance| safe_add_balance(res, *balance))?
} else {
0
};

let initial_accounts_balance = total_accounts_balance(initial_state, &all_accounts_ids)?;
let final_accounts_balance = total_accounts_balance(final_state, &all_accounts_ids)?;
// Receipts
let receipts_cost = |receipts: &[Receipt]| -> Result<Balance, IntegerOverflowError> {
total_receipts_cost(config, receipts)
if let Some(validator_accounts_update) = validator_accounts_update {
all_accounts_ids.extend(validator_accounts_update.stake_info.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.validator_rewards.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.last_proposals.keys().cloned());
all_accounts_ids.extend(validator_accounts_update.slashing_info.keys().cloned());
if let Some(account_id) = &validator_accounts_update.protocol_treasury_account_id {
all_accounts_ids.insert(account_id.clone());
}
};
let incoming_receipts_balance =
receipts_cost(incoming_receipts)? + receipts_cost(yield_timeout_receipts)?;
let outgoing_receipts_balance = receipts_cost(outgoing_receipts)?;
let processed_delayed_receipts_balance = receipts_cost(&processed_delayed_receipts)?;
let new_delayed_receipts_balance = receipts_cost(&new_delayed_receipts)?;
let forwarded_buffered_receipts_balance = receipts_cost(&forwarded_receipts)?;
let new_buffered_receipts_balance = receipts_cost(&new_buffered_receipts)?;

// Postponed actions receipts. The receipts can be postponed and stored with the receiver's
// account ID when the input data is not received yet.
// We calculate all potential receipts IDs that might be postponed initially or after the
// execution.
let all_potential_postponed_receipt_ids = incoming_receipts
Ok(all_accounts_ids)
}

fn validator_rewards(
validator_accounts_update: &ValidatorAccountsUpdate,
) -> Result<Balance, IntegerOverflowError> {
validator_accounts_update
.validator_rewards
.values()
.try_fold(0u128, |res, balance| safe_add_balance(res, *balance))
}

/// The receipts can be postponed and stored with the receiver's account ID when
/// the input data is not received yet. We calculate all potential receipts IDs
/// that might be postponed initially or after the execution.
fn potential_postponed_receipt_ids(
incoming_receipts: &[Receipt],
yield_timeout_receipts: &[Receipt],
processed_delayed_receipts: &[Receipt],
initial_state: &Trie,
) -> Result<HashSet<(PostponedReceiptType, AccountId, CryptoHash)>, StorageError> {
incoming_receipts
.iter()
.chain(processed_delayed_receipts.iter())
.chain(yield_timeout_receipts.iter())
Expand Down Expand Up @@ -276,7 +253,84 @@ pub(crate) fn check_balance(
))),
}
})
.collect::<Result<HashSet<_>, StorageError>>()?;
.collect::<Result<HashSet<_>, StorageError>>()
}

#[tracing::instrument(target = "runtime", level = "debug", "check_balance", skip_all, fields(
transactions.len = transactions.len(),
incoming_receipts.len = incoming_receipts.len(),
yield_timeout_receipts.len = yield_timeout_receipts.len(),
outgoing_receipts.len = outgoing_receipts.len()
))]
pub(crate) fn check_balance(
config: &RuntimeConfig,
final_state: &TrieUpdate,
validator_accounts_update: &Option<ValidatorAccountsUpdate>,
incoming_receipts: &[Receipt],
yield_timeout_receipts: &[Receipt],
transactions: &[SignedTransaction],
outgoing_receipts: &[Receipt],
stats: &ApplyStats,
) -> Result<(), RuntimeError> {
let initial_state = final_state.trie();

// Delayed receipts
let initial_delayed_receipt_indices: DelayedReceiptIndices =
get(initial_state, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default();
let final_delayed_receipt_indices: DelayedReceiptIndices =
get(final_state, &TrieKey::DelayedReceiptIndices)?.unwrap_or_default();

// Previously delayed receipts that were processed this time.
let processed_delayed_receipts = get_delayed_receipts(
initial_state,
initial_delayed_receipt_indices.first_index..final_delayed_receipt_indices.first_index,
)?;
// Receipts that were not processed this time and are delayed now.
let new_delayed_receipts = get_delayed_receipts(
final_state,
initial_delayed_receipt_indices.next_available_index
..final_delayed_receipt_indices.next_available_index,
)?;

// Buffered receipts
let (forwarded_receipts, new_buffered_receipts) =
buffered_receipts(initial_state, final_state)?;

// Accounts
let all_accounts_ids = all_touched_accounts(
incoming_receipts,
yield_timeout_receipts,
&processed_delayed_receipts,
transactions,
validator_accounts_update,
)?;

let initial_accounts_balance = total_accounts_balance(initial_state, &all_accounts_ids)?;
let final_accounts_balance = total_accounts_balance(final_state, &all_accounts_ids)?;

// Validator rewards
let incoming_validator_rewards =
validator_accounts_update.as_ref().map(validator_rewards).transpose()?.unwrap_or(0);

// Receipts
let receipts_cost = |receipts: &[Receipt]| -> Result<Balance, IntegerOverflowError> {
total_receipts_cost(config, receipts)
};
let incoming_receipts_balance =
receipts_cost(incoming_receipts)? + receipts_cost(yield_timeout_receipts)?;
let outgoing_receipts_balance = receipts_cost(outgoing_receipts)?;
let processed_delayed_receipts_balance = receipts_cost(&processed_delayed_receipts)?;
let new_delayed_receipts_balance = receipts_cost(&new_delayed_receipts)?;
let forwarded_buffered_receipts_balance = receipts_cost(&forwarded_receipts)?;
let new_buffered_receipts_balance = receipts_cost(&new_buffered_receipts)?;

// Postponed actions receipts.
let all_potential_postponed_receipt_ids = potential_postponed_receipt_ids(
incoming_receipts,
yield_timeout_receipts,
&processed_delayed_receipts,
initial_state,
)?;

let initial_postponed_receipts_balance =
total_postponed_receipts_cost(initial_state, config, &all_potential_postponed_receipt_ids)?;
Expand Down

0 comments on commit ff2385d

Please sign in to comment.