Skip to content

Commit

Permalink
Dedupe acked packets from TwccSendRegister::apply_report()
Browse files Browse the repository at this point in the history
  • Loading branch information
alexlapa authored Jan 8, 2025
1 parent 621c91e commit 052e74f
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Do not disconnect whilst we still check new candidates #489
* Ensure lexical ordering of SDP-formatted candidates follows priority #557
* Limit TWCC iteration with packet status count #606
* Dedupe acked packets from `TwccSendRegister::apply_report()` #601, #605

# 0.6.2

Expand Down
181 changes: 132 additions & 49 deletions src/rtp/rtcp/twcc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ pub struct TwccRecvRegister {
///
/// Once the queue has some content, we will always keep at least one entry to "remember" for the
/// next report.
queue: VecDeque<Receiption>,
queue: VecDeque<Receipt>,

/// Index into queue from where we start reporting on next build_report().
report_from: usize,
Expand All @@ -252,7 +252,7 @@ pub struct TwccRecvRegister {
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Receiption {
struct Receipt {
seq: SeqNo,
time: Instant,
}
Expand Down Expand Up @@ -294,7 +294,7 @@ impl TwccRecvRegister {
}
}

self.queue.insert(idx, Receiption { seq, time });
self.queue.insert(idx, Receipt { seq, time });

if idx < self.report_from {
self.report_from = idx;
Expand Down Expand Up @@ -500,7 +500,7 @@ impl TwccRecvRegister {
/// Interims are deltas between `Receiption` which is an intermediary format before
/// we populate the Twcc report.
fn build_interims(
queue: &VecDeque<Receiption>,
queue: &VecDeque<Receipt>,
report_from: usize,
base_seq: SeqNo,
base_time: Instant,
Expand Down Expand Up @@ -1005,6 +1005,10 @@ pub struct TwccSendRegister {
/// 0 offset for remote time in Twcc structs.
time_zero: Option<Instant>,

/// Counter of invocations of apply_report. Used to identify
/// which TwccSendRecord resulted from each invocation.
apply_report_counter: u64,

/// Last registered Twcc number.
last_registered: SeqNo,
}
Expand Down Expand Up @@ -1072,6 +1076,9 @@ pub struct TwccRecvReport {

/// The remote time the other side received the seq.
remote_recv_time: Option<Instant>,

/// The invocation count of apply_report(). Used for filtering.
apply_report_counter: u64,
}

impl TwccSendRegister {
Expand All @@ -1080,6 +1087,7 @@ impl TwccSendRegister {
keep,
queue: VecDeque::new(),
time_zero: None,
apply_report_counter: 0,
last_registered: 0.into(),
}
}
Expand All @@ -1102,13 +1110,20 @@ impl TwccSendRegister {

/// Apply a TWCC RTCP report.
///
/// Returns a range of the sequence numbers for the applied packets if the report was
/// successfully applied.
pub fn apply_report(&mut self, twcc: Twcc, now: Instant) -> Option<RangeInclusive<SeqNo>> {
/// Returns iterator over [`TwccSendRecord`]s included in the given [`Twcc`]
/// except for ones that was already acked and returned before.
pub fn apply_report(
&mut self,
twcc: Twcc,
now: Instant,
) -> Option<impl Iterator<Item = &TwccSendRecord>> {
if self.time_zero.is_none() {
self.time_zero = Some(now);
}

self.apply_report_counter += 1;
let apply_report_counter = self.apply_report_counter;

let time_zero = self.time_zero.unwrap();
let head_seq = self.queue.front().map(|r| r.seq)?;

Expand All @@ -1124,14 +1139,30 @@ impl TwccSendRegister {
now: Instant,
r: &mut TwccSendRecord,
seq: SeqNo,
instant: Option<Instant>,
remote_recv_time: Option<Instant>,
apply_report_counter: u64,
) -> bool {
if r.seq != seq {
return false;
}

let apply_report_counter = if let Some(rr) = r.recv_report {
// This packed was already acked and handled before so carry
// over previous apply_report_counter, so it won't be included
// in the current apply_report() call result.
rr.remote_recv_time
.map(|_| rr.apply_report_counter)
.unwrap_or_else(|| apply_report_counter)
} else {
apply_report_counter
};

// Carry over remote recv time if this packet was acked before.
let remote_recv_time = r.remote_recv_time().or(remote_recv_time);
let recv_report = TwccRecvReport {
local_recv_time: now,
remote_recv_time: instant,
remote_recv_time,
apply_report_counter,
};
r.recv_report = Some(recv_report);

Expand All @@ -1145,7 +1176,13 @@ impl TwccSendRegister {

let mut problematic_seq = None;

if !update(now, first_record, first_seq_no, first_instant) {
if !update(
now,
first_record,
first_seq_no,
first_instant,
apply_report_counter,
) {
problematic_seq = Some((first_record.seq, first_seq_no));
}

Expand All @@ -1155,7 +1192,7 @@ impl TwccSendRegister {
break;
}

if !update(now, record, seq, instant) {
if !update(now, record, seq, instant, apply_report_counter) {
problematic_seq = Some((record.seq, seq));
}
last_seq_no = seq;
Expand All @@ -1170,13 +1207,29 @@ impl TwccSendRegister {
);
}

Some(first_seq_no..=last_seq_no)
}
let first_index = self
.queue
.binary_search_by_key(&first_seq_no, |r| r.seq)
.expect("first_seq_no to be registered");

pub fn send_record(&self, seq: SeqNo) -> Option<&TwccSendRecord> {
let index = self.queue.binary_search_by_key(&seq, |r| r.seq).ok()?;
let range = first_seq_no..=last_seq_no;

Some(&self.queue[index])
Some(
TwccSendRecordsIter {
range: range.clone(),
index: first_index,
current: first_seq_no,
queue: &self.queue,
}
// We only want the records that were registered in this invocation of
// apply_report_counter(). This is to not double count in the BWE,
// which is the consumer of this returned iterator.
.filter(move |s| {
s.recv_report
.map(|r| r.apply_report_counter == apply_report_counter)
.unwrap_or_default()
}),
)
}

/// Calculate the egress loss for given time window.
Expand Down Expand Up @@ -1214,26 +1267,6 @@ impl TwccSendRegister {

Some((lost as f32) / (total as f32))
}

/// Get all send records in a range.
pub fn send_records(
&self,
range: RangeInclusive<SeqNo>,
) -> Option<impl Iterator<Item = &TwccSendRecord>> {
let first_index = self
.queue
.binary_search_by_key(range.start(), |r| r.seq)
.ok()?;

let current = *range.start();

Some(TwccSendRecordsIter {
range,
index: first_index,
current,
queue: &self.queue,
})
}
}

#[derive()]
Expand Down Expand Up @@ -1806,7 +1839,7 @@ mod test {
);
now = now + Duration::from_millis(35);

reg.apply_report(
let iter = reg.apply_report(
Twcc {
sender_ssrc: Ssrc::new(),
ssrc: Ssrc::new(),
Expand All @@ -1829,15 +1862,12 @@ mod test {
},
now,
);
let iter = iter.unwrap();

for seq in 25..=27 {
let record = reg
.send_record(seq.into())
.unwrap_or_else(|| panic!("Should have send record for seq {seq}"));

for record in iter {
assert!(
record.recv_report.is_some(),
"Report should have recorded recv_report for {seq}"
"Report should have recorded recv_report"
);
}
}
Expand Down Expand Up @@ -1962,7 +1992,7 @@ mod test {
now = now + Duration::from_micros(15);
}

let range = reg
let iter = reg
.apply_report(
Twcc {
sender_ssrc: Ssrc::new(),
Expand All @@ -1988,11 +2018,6 @@ mod test {
)
.expect("apply_report to return Some(_)");

assert_eq!(range, 0.into()..=7.into());

let iter = reg
.send_records(range)
.expect("send_records to return Some(_)");
assert_eq!(
iter.map(|r| *r.seq).collect::<Vec<_>>(),
vec![0, 1, 2, 3, 4, 5, 6, 7]
Expand All @@ -2009,6 +2034,7 @@ mod test {
}

now = now + Duration::from_millis(5);
#[allow(unused_must_use)]
reg.apply_report(

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test_wincrypto (windows-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (macos-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test_wincrypto (windows-latest, 1.71.1)

unused implementer of `Iterator` that must be used

Check warning on line 2038 in src/rtp/rtcp/twcc.rs

View workflow job for this annotation

GitHub Actions / test (windows-latest, 1.71.1)

unused implementer of `Iterator` that must be used
Twcc {
sender_ssrc: Ssrc::new(),
Expand Down Expand Up @@ -2077,4 +2103,61 @@ mod test {

assert_eq!(reg.loss(), Some(4.0 / 10.0));
}

#[test]
fn no_acked_duplicates_when_reordered() {
let now = Instant::now();
let mut twcc_gen = TwccRecvRegister::new(1000);
let mut twcc_handler = TwccSendRegister::new(1000);

twcc_handler.register_seq(1.into(), now + Duration::from_millis(1), 0);
twcc_handler.register_seq(2.into(), now + Duration::from_millis(2), 0);
twcc_handler.register_seq(3.into(), now + Duration::from_millis(3), 0);
twcc_handler.register_seq(4.into(), now + Duration::from_millis(4), 0);

{
let acked_packets = twcc_handler
.apply_report(
{
// 3rd packet is delayed
twcc_gen.update_seq(1.into(), now + Duration::from_millis(5));
twcc_gen.update_seq(2.into(), now + Duration::from_millis(6));
twcc_gen.update_seq(4.into(), now + Duration::from_millis(7));
twcc_gen.build_report(10_000).unwrap()
},
now + Duration::from_millis(8),
)
.unwrap()
.filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq.as_u16()))
.collect::<Vec<_>>();

assert_eq!(acked_packets, [1, 2, 4]);
}

twcc_handler.register_seq(5.into(), now + Duration::from_millis(9), 0);
twcc_handler.register_seq(6.into(), now + Duration::from_millis(10), 0);
twcc_handler.register_seq(7.into(), now + Duration::from_millis(11), 0);

{
let acked_packets = twcc_handler
.apply_report(
{
// So the receipt order is 1, 2, 4, 3, 7
twcc_gen.update_seq(3.into(), now + Duration::from_millis(12));
twcc_gen.update_seq(7.into(), now + Duration::from_millis(13));
twcc_gen.build_report(10_000).unwrap()
},
now + Duration::from_millis(14),
)
.unwrap()
.filter_map(|sr| sr.remote_recv_time().map(|_| sr.seq.as_u16()))
.collect::<Vec<_>>();

// 3 was delayed before and is acked now
// 4 is excluded since it was already returned from the previous call
// [5, 6] are delayed/lost
// 7 is acked in the last report
assert_eq!(acked_packets, [3, 7]);
}
}
}
10 changes: 3 additions & 7 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,14 +517,10 @@ impl Session {
for fb in RtcpFb::from_rtcp(self.feedback_rx.drain(..)) {
if let RtcpFb::Twcc(twcc) = fb {
trace!("Handle TWCC: {:?}", twcc);
let range = self.twcc_tx_register.apply_report(twcc, now);
let maybe_records = self.twcc_tx_register.apply_report(twcc, now);

if let Some(bwe) = &mut self.bwe {
let records = range.and_then(|range| self.twcc_tx_register.send_records(range));

if let Some(records) = records {
bwe.update(records, now);
}
if let (Some(maybe_records), Some(bwe)) = (maybe_records, &mut self.bwe) {
bwe.update(maybe_records, now);
}
need_configure_pacer = true;

Expand Down

0 comments on commit 052e74f

Please sign in to comment.