Skip to content

Commit

Permalink
congestion: Max size, gas and bytes in summary (near#10793)
Browse files Browse the repository at this point in the history
Instead of showing the snapshot of the model in the end, it is better to
display the max values throughout the model execution in the final
summary table.

Also add gas and byte values of queues, rather than just the number of
receipts.

This makes the summary table much more useful to quickly compare the
performance of many different strategies.
  • Loading branch information
jakmeier authored Mar 16, 2024
1 parent 61c67c6 commit ba29063
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/congestion-model/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license.workspace = true
publish = false

[dependencies]
bytesize.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
csv.workspace = true
Expand Down
31 changes: 3 additions & 28 deletions tools/congestion-model/src/evaluation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
use chrono::{Duration, Utc};
pub use queue_lengths::{QueueStats, ShardQueueLengths};
pub use transaction_progress::TransactionStatus;

use crate::{GGas, Model, ShardId};
use std::collections::HashMap;
use crate::{GGas, Model};

mod queue_lengths;
pub mod summary_table;
mod transaction_progress;

#[derive(Debug, Clone)]
pub struct ShardQueueLengths {
pub unprocessed_incoming_transactions: u64,
pub incoming_receipts: u64,
pub queued_receipts: u64,
}

#[derive(Debug, Clone)]
pub struct GasThroughput {
pub total: GGas,
Expand All @@ -31,25 +25,6 @@ pub struct Progress {
pub type StatsWriter = Option<Box<csv::Writer<std::fs::File>>>;

impl Model {
pub fn queue_lengths(&self) -> HashMap<ShardId, ShardQueueLengths> {
let mut out = HashMap::new();
for shard in self.shard_ids.clone() {
let unprocessed_incoming_transactions =
self.queues.incoming_transactions(shard).len() as u64;
let incoming_receipts = self.queues.incoming_receipts(shard).len() as u64;
let total_shard_receipts: u64 =
self.queues.shard_queues(shard).map(|q| q.len() as u64).sum();

let shard_stats = ShardQueueLengths {
unprocessed_incoming_transactions,
incoming_receipts,
queued_receipts: total_shard_receipts - incoming_receipts,
};
out.insert(shard, shard_stats);
}
out
}

pub fn gas_throughput(&self) -> GasThroughput {
GasThroughput { total: self.transactions.all_transactions().map(|tx| tx.gas_burnt()).sum() }
}
Expand Down
104 changes: 104 additions & 0 deletions tools/congestion-model/src/evaluation/queue_lengths.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use crate::{GGas, Model, Queue, ShardId};
use std::collections::HashMap;
use std::iter::Sum;

#[derive(Debug, Clone, Default)]
pub struct ShardQueueLengths {
/// Number of transactions waiting and not processed.
pub unprocessed_incoming_transactions: u64,
/// Receipts in the mailbox.
pub incoming_receipts: QueueStats,
/// Receipts in all internal queues plus the mailbox.
pub queued_receipts: QueueStats,
}

#[derive(Debug, Clone, Copy, Default)]
pub struct QueueStats {
/// Number of messages.
pub num: u64,
/// Sum of messages sizes in bytes.
pub size: u64,
/// Sum of attached gas for all messages in the queue.
pub gas: GGas,
}

impl Model {
/// Current queue lengths per shard.
pub fn queue_lengths(&self) -> HashMap<ShardId, ShardQueueLengths> {
let mut out = HashMap::new();
for shard in self.shard_ids.clone() {
let unprocessed_incoming_transactions =
self.queues.incoming_transactions(shard).len() as u64;
let incoming_receipts = self.queues.incoming_receipts(shard).stats();
let total_shard_receipts: QueueStats =
self.queues.shard_queues(shard).map(|q| q.stats()).sum();

let shard_stats = ShardQueueLengths {
unprocessed_incoming_transactions,
incoming_receipts,
queued_receipts: total_shard_receipts,
};
out.insert(shard, shard_stats);
}
out
}

/// Current max queue length stats.
pub fn max_queue_length(&self) -> ShardQueueLengths {
let mut out = ShardQueueLengths::default();
for q in self.queue_lengths().values() {
out = out.max_component_wise(q);
}
out
}
}

impl Queue {
pub fn stats(&self) -> QueueStats {
QueueStats { num: self.len() as u64, size: self.size(), gas: self.attached_gas() }
}
}

impl ShardQueueLengths {
pub fn max_component_wise(&self, rhs: &Self) -> Self {
Self {
unprocessed_incoming_transactions: self
.unprocessed_incoming_transactions
.max(rhs.unprocessed_incoming_transactions),
incoming_receipts: self.incoming_receipts.max_component_wise(rhs.incoming_receipts),
queued_receipts: self.queued_receipts.max_component_wise(rhs.queued_receipts),
}
}
}

impl QueueStats {
pub fn max_component_wise(&self, rhs: Self) -> Self {
Self {
num: self.num.max(rhs.num),
size: self.size.max(rhs.size),
gas: self.gas.max(rhs.gas),
}
}
}

impl std::ops::Add for QueueStats {
type Output = Self;

fn add(self, rhs: Self) -> Self::Output {
Self { num: self.num + rhs.num, size: self.size + rhs.size, gas: self.gas + rhs.gas }
}
}

impl std::ops::Sub for QueueStats {
type Output = Self;

fn sub(self, rhs: Self) -> Self::Output {
Self { num: self.num - rhs.num, size: self.size - rhs.size, gas: self.gas - rhs.gas }
}
}

impl Sum for QueueStats {
fn sum<I: Iterator<Item = Self>>(iter: I) -> Self {
iter.reduce(std::ops::Add::add).unwrap_or_default()
}
}
37 changes: 21 additions & 16 deletions tools/congestion-model/src/evaluation/summary_table.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,32 @@
use crate::{Model, PGAS};
use super::{GasThroughput, Progress, ShardQueueLengths};
use crate::PGAS;

pub fn print_summary_header() {
println!(
"{:<25}{:<25}{:>25}{:>25}{:>25}",
"WORKLOAD", "STRATEGY", "BURNT GAS", "TRANSACTIONS FINISHED", "MAX QUEUE LEN",
"{:<25}{:<25}{:>25}{:>25}{:>16}{:>16}{:>16}",
"WORKLOAD",
"STRATEGY",
"BURNT GAS",
"TRANSACTIONS FINISHED",
"MAX QUEUE LEN",
"MAX QUEUE SIZE",
"MAX QUEUE PGAS",
);
}

pub fn print_summary_row(model: &Model, workload: &str, strategy: &str) {
let queues = model.queue_lengths();
let throughput = model.gas_throughput();
let progress = model.progress();

let mut max_queue_len = 0;
for q in queues.values() {
let len = q.incoming_receipts + q.queued_receipts;
max_queue_len = len.max(max_queue_len);
}

pub fn print_summary_row(
workload: &str,
strategy: &str,
progress: &Progress,
throughput: &GasThroughput,
max_queues: &ShardQueueLengths,
) {
println!(
"{workload:<25}{strategy:<25}{:>20} PGas{:>25}{:>25}",
"{workload:<25}{strategy:<25}{:>20} PGas{:>25}{:>16}{:>16}{:>16}",
throughput.total / PGAS,
progress.finished_transactions,
max_queue_len
max_queues.queued_receipts.num,
bytesize::ByteSize::b(max_queues.queued_receipts.size),
max_queues.queued_receipts.gas / PGAS,
);
}
4 changes: 3 additions & 1 deletion tools/congestion-model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ mod model;
pub mod strategy;
pub mod workload;

pub use evaluation::{summary_table, StatsWriter, TransactionStatus};
pub use evaluation::{
summary_table, QueueStats, ShardQueueLengths, StatsWriter, TransactionStatus,
};
pub use model::{Model, Queue, QueueId, Receipt, ShardId, TransactionId};
pub use strategy::CongestionStrategy;
pub use workload::{ReceiptDefinition, ReceiptId, TransactionBuilder};
Expand Down
21 changes: 15 additions & 6 deletions tools/congestion-model/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use chrono::Utc;
use std::time::Duration;

use congestion_model::strategy::{GlobalTxStopShard, NewTxLast, NoQueueShard, SimpleBackpressure};
use congestion_model::workload::{
AllForOneProducer, BalancedProducer, LinearImbalanceProducer, Producer,
};
use congestion_model::{summary_table, CongestionStrategy, Model, StatsWriter, PGAS};
use congestion_model::{
summary_table, CongestionStrategy, Model, ShardQueueLengths, StatsWriter, PGAS,
};
use std::time::Duration;

use clap::Parser;

Expand Down Expand Up @@ -96,6 +97,7 @@ fn run_model(
let strategy = strategy(strategy_name, num_shards);
let workload = workload(workload_name);
let mut model = Model::new(strategy, workload);
let mut max_queues = ShardQueueLengths::default();

// Set the start time to an half hour ago to make it visible by default in
// grafana. Each round is 1 virtual second so hald an hour is good for
Expand All @@ -108,8 +110,15 @@ fn run_model(
for round in 0..num_rounds {
model.write_stats_values(&mut stats_writer, start_time, round);
model.step();
max_queues = max_queues.max_component_wise(&model.max_queue_length());
}
summary_table::print_summary_row(&model, workload_name, strategy_name);
summary_table::print_summary_row(
workload_name,
strategy_name,
&model.progress(),
&model.gas_throughput(),
&max_queues,
);
}

fn normalize_cmdline_arg(value: &str) -> String {
Expand Down Expand Up @@ -199,7 +208,7 @@ fn print_report(model: &Model) {
println!("{:>6} transactions failed", progress.failed_transactions);
for shard_id in model.shard_ids() {
println!("SHARD {shard_id}");
println!(" {:>6} receipts incoming", queues[shard_id].incoming_receipts);
println!(" {:>6} receipts queued", queues[shard_id].queued_receipts);
println!(" {:>6} receipts incoming", queues[shard_id].incoming_receipts.num);
println!(" {:>6} receipts queued", queues[shard_id].queued_receipts.num);
}
}

0 comments on commit ba29063

Please sign in to comment.