Skip to content

Commit

Permalink
Use SpinWait to avoid busy-parking
Browse files Browse the repository at this point in the history
This may fix the concerns raised in #5.
  • Loading branch information
jonhoo committed Aug 29, 2016
1 parent 0b6fd12 commit d03547b
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 8 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bus"
version = "1.0.1"
version = "1.1.0"

description = "A lock-free, bounded, single-producer, multi-consumer, broadcast channel."
readme = "README.md"
Expand All @@ -21,6 +21,7 @@ bench = []
[dependencies]
num_cpus = "0.2"
atomic-option = "0.1"
parking_lot_core = "0.1"

[profile.release]
debug=true
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ happening for the single-consumer case. For cases where cloning is expensive, `A
used instead.

In a single-producer, single-consumer setup (which is the only one that Bus and
`mpsc::sync_channel` both support), Bus gets ~1.5x the performance of `mpsc::sync_channel` on
`mpsc::sync_channel` both support), Bus gets ~2x the performance of `mpsc::sync_channel` on
my machine. YMMV. You can check your performance on Nightly using

```console
Expand Down
20 changes: 16 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! used instead.
//!
//! In a single-producer, single-consumer setup (which is the only one that Bus and
//! `mpsc::sync_channel` both support), Bus gets ~1.5x the performance of `mpsc::sync_channel` on
//! `mpsc::sync_channel` both support), Bus gets ~2x the performance of `mpsc::sync_channel` on
//! my machine. YMMV. You can check your performance on Nightly using
//!
//! ```console
Expand Down Expand Up @@ -118,6 +118,9 @@
extern crate atomic_option;
use atomic_option::AtomicOption;

extern crate parking_lot_core;
use parking_lot_core::SpinWait;

#[cfg(feature = "bench")]
extern crate test;

Expand Down Expand Up @@ -347,6 +350,9 @@ impl<T: Clone> Bus<T> {
// tail must also be free, which is simple enough to show by induction (exercise for the
// reader).
let fence = (tail + 1) % self.state.len;

// to avoid parking when a slot frees up quickly, we use an exponential back-off SpinWait.
let mut sw = SpinWait::new();
loop {
let fence_read = self.state.ring[fence].read.load(atomic::Ordering::Acquire);

Expand Down Expand Up @@ -386,8 +392,10 @@ impl<T: Clone> Bus<T> {
// need the atomic fetch_add to ensure reader threads will see the new .waiting
self.state.ring[fence].read.fetch_add(0, atomic::Ordering::Release);

// wait to be unparked, and retry
thread::park_timeout(Duration::new(0, 1000));
if !sw.spin() {
// not likely to get a slow soon -- wait to be unparked instead
thread::park_timeout(Duration::new(0, 1000));
}
continue;
} else {
// no, and blocking isn't allowed, so return an error
Expand Down Expand Up @@ -574,6 +582,7 @@ impl<T: Clone> BusReader<T> {
}

let mut was_closed = false;
let mut sw = SpinWait::new();
loop {
use std::time::Duration;

Expand Down Expand Up @@ -610,7 +619,10 @@ impl<T: Clone> BusReader<T> {
// in particular, we may also have missed updates
unimplemented!();
}
thread::park_timeout(Duration::new(0, 1000));

if !sw.spin() {
thread::park_timeout(Duration::new(0, 1000));
}
}

let head = self.head;
Expand Down
11 changes: 9 additions & 2 deletions tests/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate bus;

use std::sync::mpsc;
use std::time;

#[test]
fn it_works() {
Expand Down Expand Up @@ -165,8 +166,10 @@ fn it_can_count_to_10000() {
fn test_busy() {
use std::thread;

// start a bus with limited space
let mut bus = bus::Bus::new(1);

// first receiver only receives 5 items
let mut rx1 = bus.add_rx();
let t1 = thread::spawn(move || {
for _ in 0..5 {
Expand All @@ -175,6 +178,7 @@ fn test_busy() {
drop(rx1);
});

// second receiver receives 10 items
let mut rx2 = bus.add_rx();
let t2 = thread::spawn(move || {
for _ in 0..10 {
Expand All @@ -183,16 +187,19 @@ fn test_busy() {
drop(rx2);
});

std::thread::sleep_ms(500);
// let receivers start
std::thread::sleep(time::Duration::from_millis(500));

// try to send 25 items -- should work fine
for i in 0..25 {
std::thread::sleep_ms(500);
std::thread::sleep(time::Duration::from_millis(100));
match bus.try_broadcast(i) {
Ok(_) => (),
Err(e) => println!("Broadcast failed {}", e),
}
}

// done sending -- wait for receivers (which should already be done)
t1.join().unwrap();
t2.join().unwrap();
assert!(true);
Expand Down

0 comments on commit d03547b

Please sign in to comment.