From ba29063908bd25c7bb9c3646ef33dd55d7e8b5b3 Mon Sep 17 00:00:00 2001 From: Jakob Meier Date: Sat, 16 Mar 2024 08:08:29 +0100 Subject: [PATCH] congestion: Max size, gas and bytes in summary (#10793) Instead of showing the snapshot of the model in the end, it is better to display the max values throughout the model execution in the final summary table. Also add gas and byte values of queues, rather than just the number of receipts. This makes the summary table much more useful to quickly compare the performance of many different strategies. --- Cargo.lock | 1 + tools/congestion-model/Cargo.toml | 1 + tools/congestion-model/src/evaluation/mod.rs | 31 +----- .../src/evaluation/queue_lengths.rs | 104 ++++++++++++++++++ .../src/evaluation/summary_table.rs | 37 ++++--- tools/congestion-model/src/lib.rs | 4 +- tools/congestion-model/src/main.rs | 21 +++- 7 files changed, 148 insertions(+), 51 deletions(-) create mode 100644 tools/congestion-model/src/evaluation/queue_lengths.rs diff --git a/Cargo.lock b/Cargo.lock index 718c06e0388..0b626dff437 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,6 +1232,7 @@ dependencies = [ name = "congestion-model" version = "0.0.0" dependencies = [ + "bytesize", "chrono", "clap", "csv", diff --git a/tools/congestion-model/Cargo.toml b/tools/congestion-model/Cargo.toml index d0c035b4cdf..9f6555b7d55 100644 --- a/tools/congestion-model/Cargo.toml +++ b/tools/congestion-model/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true publish = false [dependencies] +bytesize.workspace = true chrono.workspace = true clap = { workspace = true, features = ["derive"] } csv.workspace = true diff --git a/tools/congestion-model/src/evaluation/mod.rs b/tools/congestion-model/src/evaluation/mod.rs index 14c08d2ece5..a299833760b 100644 --- a/tools/congestion-model/src/evaluation/mod.rs +++ b/tools/congestion-model/src/evaluation/mod.rs @@ -1,19 +1,13 @@ use chrono::{Duration, Utc}; +pub use queue_lengths::{QueueStats, ShardQueueLengths}; pub use transaction_progress::TransactionStatus; -use crate::{GGas, Model, ShardId}; -use std::collections::HashMap; +use crate::{GGas, Model}; +mod queue_lengths; pub mod summary_table; mod transaction_progress; -#[derive(Debug, Clone)] -pub struct ShardQueueLengths { - pub unprocessed_incoming_transactions: u64, - pub incoming_receipts: u64, - pub queued_receipts: u64, -} - #[derive(Debug, Clone)] pub struct GasThroughput { pub total: GGas, @@ -31,25 +25,6 @@ pub struct Progress { pub type StatsWriter = Option>>; impl Model { - pub fn queue_lengths(&self) -> HashMap { - let mut out = HashMap::new(); - for shard in self.shard_ids.clone() { - let unprocessed_incoming_transactions = - self.queues.incoming_transactions(shard).len() as u64; - let incoming_receipts = self.queues.incoming_receipts(shard).len() as u64; - let total_shard_receipts: u64 = - self.queues.shard_queues(shard).map(|q| q.len() as u64).sum(); - - let shard_stats = ShardQueueLengths { - unprocessed_incoming_transactions, - incoming_receipts, - queued_receipts: total_shard_receipts - incoming_receipts, - }; - out.insert(shard, shard_stats); - } - out - } - pub fn gas_throughput(&self) -> GasThroughput { GasThroughput { total: self.transactions.all_transactions().map(|tx| tx.gas_burnt()).sum() } } diff --git a/tools/congestion-model/src/evaluation/queue_lengths.rs b/tools/congestion-model/src/evaluation/queue_lengths.rs new file mode 100644 index 00000000000..9c6dd66b0be --- /dev/null +++ b/tools/congestion-model/src/evaluation/queue_lengths.rs @@ -0,0 +1,104 @@ +use crate::{GGas, Model, Queue, ShardId}; +use std::collections::HashMap; +use std::iter::Sum; + +#[derive(Debug, Clone, Default)] +pub struct ShardQueueLengths { + /// Number of transactions waiting and not processed. + pub unprocessed_incoming_transactions: u64, + /// Receipts in the mailbox. + pub incoming_receipts: QueueStats, + /// Receipts in all internal queues plus the mailbox. + pub queued_receipts: QueueStats, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct QueueStats { + /// Number of messages. + pub num: u64, + /// Sum of messages sizes in bytes. + pub size: u64, + /// Sum of attached gas for all messages in the queue. + pub gas: GGas, +} + +impl Model { + /// Current queue lengths per shard. + pub fn queue_lengths(&self) -> HashMap { + let mut out = HashMap::new(); + for shard in self.shard_ids.clone() { + let unprocessed_incoming_transactions = + self.queues.incoming_transactions(shard).len() as u64; + let incoming_receipts = self.queues.incoming_receipts(shard).stats(); + let total_shard_receipts: QueueStats = + self.queues.shard_queues(shard).map(|q| q.stats()).sum(); + + let shard_stats = ShardQueueLengths { + unprocessed_incoming_transactions, + incoming_receipts, + queued_receipts: total_shard_receipts, + }; + out.insert(shard, shard_stats); + } + out + } + + /// Current max queue length stats. + pub fn max_queue_length(&self) -> ShardQueueLengths { + let mut out = ShardQueueLengths::default(); + for q in self.queue_lengths().values() { + out = out.max_component_wise(q); + } + out + } +} + +impl Queue { + pub fn stats(&self) -> QueueStats { + QueueStats { num: self.len() as u64, size: self.size(), gas: self.attached_gas() } + } +} + +impl ShardQueueLengths { + pub fn max_component_wise(&self, rhs: &Self) -> Self { + Self { + unprocessed_incoming_transactions: self + .unprocessed_incoming_transactions + .max(rhs.unprocessed_incoming_transactions), + incoming_receipts: self.incoming_receipts.max_component_wise(rhs.incoming_receipts), + queued_receipts: self.queued_receipts.max_component_wise(rhs.queued_receipts), + } + } +} + +impl QueueStats { + pub fn max_component_wise(&self, rhs: Self) -> Self { + Self { + num: self.num.max(rhs.num), + size: self.size.max(rhs.size), + gas: self.gas.max(rhs.gas), + } + } +} + +impl std::ops::Add for QueueStats { + type Output = Self; + + fn add(self, rhs: Self) -> Self::Output { + Self { num: self.num + rhs.num, size: self.size + rhs.size, gas: self.gas + rhs.gas } + } +} + +impl std::ops::Sub for QueueStats { + type Output = Self; + + fn sub(self, rhs: Self) -> Self::Output { + Self { num: self.num - rhs.num, size: self.size - rhs.size, gas: self.gas - rhs.gas } + } +} + +impl Sum for QueueStats { + fn sum>(iter: I) -> Self { + iter.reduce(std::ops::Add::add).unwrap_or_default() + } +} diff --git a/tools/congestion-model/src/evaluation/summary_table.rs b/tools/congestion-model/src/evaluation/summary_table.rs index 25178a9e4f2..a97eb7b31dd 100644 --- a/tools/congestion-model/src/evaluation/summary_table.rs +++ b/tools/congestion-model/src/evaluation/summary_table.rs @@ -1,27 +1,32 @@ -use crate::{Model, PGAS}; +use super::{GasThroughput, Progress, ShardQueueLengths}; +use crate::PGAS; pub fn print_summary_header() { println!( - "{:<25}{:<25}{:>25}{:>25}{:>25}", - "WORKLOAD", "STRATEGY", "BURNT GAS", "TRANSACTIONS FINISHED", "MAX QUEUE LEN", + "{:<25}{:<25}{:>25}{:>25}{:>16}{:>16}{:>16}", + "WORKLOAD", + "STRATEGY", + "BURNT GAS", + "TRANSACTIONS FINISHED", + "MAX QUEUE LEN", + "MAX QUEUE SIZE", + "MAX QUEUE PGAS", ); } -pub fn print_summary_row(model: &Model, workload: &str, strategy: &str) { - let queues = model.queue_lengths(); - let throughput = model.gas_throughput(); - let progress = model.progress(); - - let mut max_queue_len = 0; - for q in queues.values() { - let len = q.incoming_receipts + q.queued_receipts; - max_queue_len = len.max(max_queue_len); - } - +pub fn print_summary_row( + workload: &str, + strategy: &str, + progress: &Progress, + throughput: &GasThroughput, + max_queues: &ShardQueueLengths, +) { println!( - "{workload:<25}{strategy:<25}{:>20} PGas{:>25}{:>25}", + "{workload:<25}{strategy:<25}{:>20} PGas{:>25}{:>16}{:>16}{:>16}", throughput.total / PGAS, progress.finished_transactions, - max_queue_len + max_queues.queued_receipts.num, + bytesize::ByteSize::b(max_queues.queued_receipts.size), + max_queues.queued_receipts.gas / PGAS, ); } diff --git a/tools/congestion-model/src/lib.rs b/tools/congestion-model/src/lib.rs index d293a060a1d..e066561b17a 100644 --- a/tools/congestion-model/src/lib.rs +++ b/tools/congestion-model/src/lib.rs @@ -3,7 +3,9 @@ mod model; pub mod strategy; pub mod workload; -pub use evaluation::{summary_table, StatsWriter, TransactionStatus}; +pub use evaluation::{ + summary_table, QueueStats, ShardQueueLengths, StatsWriter, TransactionStatus, +}; pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId}; pub use strategy::CongestionStrategy; pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder}; diff --git a/tools/congestion-model/src/main.rs b/tools/congestion-model/src/main.rs index 9e9be93c000..e14bdd3d2e6 100644 --- a/tools/congestion-model/src/main.rs +++ b/tools/congestion-model/src/main.rs @@ -1,11 +1,12 @@ use chrono::Utc; -use std::time::Duration; - use congestion_model::strategy::{GlobalTxStopShard, NewTxLast, NoQueueShard, SimpleBackpressure}; use congestion_model::workload::{ AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer, }; -use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PGAS}; +use congestion_model::{ + summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS, +}; +use std::time::Duration; use clap::Parser; @@ -96,6 +97,7 @@ fn run_model( let strategy = strategy(strategy_name, num_shards); let workload = workload(workload_name); let mut model = Model::new(strategy, workload); + let mut max_queues = ShardQueueLengths::default(); // Set the start time to an half hour ago to make it visible by default in // grafana. Each round is 1 virtual second so hald an hour is good for @@ -108,8 +110,15 @@ fn run_model( for round in 0..num_rounds { model.write_stats_values(&mut stats_writer, start_time, round); model.step(); + max_queues = max_queues.max_component_wise(&model.max_queue_length()); } - summary_table::print_summary_row(&model, workload_name, strategy_name); + summary_table::print_summary_row( + workload_name, + strategy_name, + &model.progress(), + &model.gas_throughput(), + &max_queues, + ); } fn normalize_cmdline_arg(value: &str) -> String { @@ -199,7 +208,7 @@ fn print_report(model: &Model) { println!("{:>6} transactions failed", progress.failed_transactions); for shard_id in model.shard_ids() { println!("SHARD {shard_id}"); - println!(" {:>6} receipts incoming", queues[shard_id].incoming_receipts); - println!(" {:>6} receipts queued", queues[shard_id].queued_receipts); + println!(" {:>6} receipts incoming", queues[shard_id].incoming_receipts.num); + println!(" {:>6} receipts queued", queues[shard_id].queued_receipts.num); } }