diff --git a/Cargo.lock b/Cargo.lock index bf2d98f..a513236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "arbitrary" @@ -277,6 +277,10 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fairly_unsafe_cell" +version = "0.1.0" + [[package]] name = "fastrand" version = "2.1.0" @@ -626,9 +630,9 @@ checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" [[package]] name = "ufotofu" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48cbfe3042f0626ef4f7ef42e31bad3e0bd4015704499cc6c50bc5d9aad2fd7f" +checksum = "18477a01bdbde8764a22a74101adfb24c5b90df18ff0460903bcdc2b96aba913" dependencies = [ "arbitrary", "either", @@ -670,8 +674,10 @@ checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" name = "wb_async_utils" version = "0.1.0" dependencies = [ - "async_cell", "either", + "fairly_unsafe_cell", + "futures", + "pollster", "ufotofu", "ufotofu_queues", ] @@ -723,6 +729,7 @@ dependencies = [ "compact_u64", "earthstar", "either", + "futures", "libfuzzer-sys", "meadowcap", "pollster", @@ -731,6 +738,8 @@ dependencies = [ "ufotofu", "ufotofu_codec", "ufotofu_codec_endian", + "ufotofu_queues", + "wb_async_utils", "willow-data-model", "willow-encoding", ] diff --git a/Cargo.toml b/Cargo.toml index cb0e4c1..daaeda3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "data-model", "earthstar", "encoding", + "fairly_unsafe_cell", "fuzz", "lcmux", "meadowcap", diff --git a/fairly_unsafe_cell/Cargo.toml b/fairly_unsafe_cell/Cargo.toml new file mode 100644 index 0000000..4d45f5d --- /dev/null +++ b/fairly_unsafe_cell/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "fairly_unsafe_cell" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true diff --git a/fairly_unsafe_cell/README.md b/fairly_unsafe_cell/README.md new file mode 100644 index 0000000..0a593a6 --- /dev/null +++ b/fairly_unsafe_cell/README.md @@ -0,0 +1,3 @@ +# Fairly Unsafe Cell + +A hybrid between an `UnsafeCell` and a `RefCell`: comes with an unsafe but RefCell-like API that panics in test builds (`#[cfg(test)]`) when mutable access is not exclusive, but has no overhead (and allows for UB) in non-test builds. \ No newline at end of file diff --git a/fairly_unsafe_cell/src/checked.rs b/fairly_unsafe_cell/src/checked.rs new file mode 100644 index 0000000..a8a8fc4 --- /dev/null +++ b/fairly_unsafe_cell/src/checked.rs @@ -0,0 +1,507 @@ +use core::{ + cell::RefCell, + fmt, + ops::{Deref, DerefMut}, +}; + +/// A cell that unsafely grants mutable access to its wrapped value. In test builds (`#[cfg(test)]`), it performs runtime checks to panic when mutable access is not exclusive. In non-test builds, it has no runtime overhead and silently allows for undefined behaviour. +#[derive(Debug)] +pub struct FairlyUnsafeCell(RefCell); + +impl FairlyUnsafeCell { + /// Creates a new `FairlyUnsafeCell` containing `value`. + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// ``` + #[inline] + pub const fn new(value: T) -> Self { + Self(RefCell::new(value)) + } + + /// Consumes the `FairlyUnsafeCell`, returning the wrapped value. + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// assert_eq!(c.into_inner(), 5); + /// ``` + #[inline] + pub fn into_inner(self) -> T { + self.0.into_inner() + } + + /// Replaces the wrapped value with a new one, returning the old value, + /// without deinitializing either one. + /// + /// This function corresponds to [`core::mem::replace`]. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new(5); + /// let old_value = unsafe { cell.replace(6) }; + /// assert_eq!(old_value, 5); + /// assert_eq!(cell.into_inner(), 6); + /// ``` + #[inline] + pub unsafe fn replace(&self, t: T) -> T { + self.0.replace(t) + } + + /// Replaces the wrapped value with a new one computed from `f`, returning + /// the old value, without deinitializing either one. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new(5); + /// let old_value = unsafe { cell.replace_with(|&mut old| old + 1) }; + /// assert_eq!(old_value, 5); + /// assert_eq!(cell.into_inner(), 6); + /// ``` + #[inline] + pub unsafe fn replace_with T>(&self, f: F) -> T { + self.0.replace_with(f) + } + + /// Swaps the wrapped value of `self` with the wrapped value of `other`, + /// without deinitializing either one. + /// + /// This function corresponds to [`core::mem::swap`]. + /// + /// # Safety + /// + /// UB if the value in either `FairlyUnsafeCell` is currently borrowed, or + /// if `self` and `other` point to the same `FairlyUnsafeCell`. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// let d = FairlyUnsafeCell::new(6); + /// unsafe { c.swap(&d) }; + /// assert_eq!(c.into_inner(), 6); + /// assert_eq!(d.into_inner(), 5); + /// ``` + #[inline] + pub unsafe fn swap(&self, other: &Self) { + self.0.swap(&other.0) + } +} + +impl FairlyUnsafeCell { + /// Immutably borrows the wrapped value. + /// + /// The borrow lasts until the returned `Ref` exits scope. Multiple + /// immutable borrows can be taken out at the same time. + /// + /// # Safety + /// + /// UB if the value is currently mutably borrowed. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// + /// unsafe { + /// let borrowed_five = c.borrow(); + /// let borrowed_five2 = c.borrow(); + /// } + /// ``` + /// + /// An example of UB (and a panic if `#[cfg(test)]`): + /// + /// ```should_panic + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// + /// unsafe { + /// let m = c.borrow_mut(); + /// let b = c.borrow(); // this causes UB in non-test code, and a panic in test code + /// } + /// ``` + #[inline] + pub unsafe fn borrow(&self) -> Ref<'_, T> { + Ref(self.0.borrow()) + } + + /// Mutably borrows the wrapped value. + /// + /// The borrow lasts until the returned `RefMut` or all `RefMut`s derived + /// from it exit scope. The value cannot be borrowed while this borrow is + /// active. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new("hello".to_owned()); + /// + /// unsafe { + /// *c.borrow_mut() = "bonjour".to_owned(); + /// } + /// + /// assert_eq!(unsafe { &*c.borrow() }, "bonjour"); + /// ``` + /// + /// An example of UB (and a panic if `#[cfg(test)]`): + /// + /// ```should_panic + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// unsafe { + /// let m = c.borrow(); /// + /// let b = c.borrow_mut(); // this causes UB in non-test code, and a panic in test code + /// } + /// ``` + #[inline] + pub unsafe fn borrow_mut(&self) -> RefMut<'_, T> { + RefMut(self.0.borrow_mut()) + } + + /// Returns a raw pointer to the underlying data in this cell. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// + /// let ptr = c.as_ptr(); + /// ``` + #[inline] + pub fn as_ptr(&self) -> *mut T { + self.0.as_ptr() + } + + /// Returns a mutable reference to the underlying data. + /// + /// This call borrows the `UnsafeCell` mutably (at compile-time) which + /// guarantees that we possess the only reference. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let mut c = FairlyUnsafeCell::new(5); + /// *c.get_mut() += 1; + /// + /// assert_eq!(unsafe { *c.borrow() }, 6); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } +} + +impl Default for FairlyUnsafeCell { + /// Creates a `FairlyUnsafeCell`, with the `Default` value for T. + #[inline] + fn default() -> FairlyUnsafeCell { + Self(RefCell::default()) + } +} + +impl From for FairlyUnsafeCell { + /// Creates a new `FairlyUnsafeCell` containing the given value. + fn from(t: T) -> FairlyUnsafeCell { + Self(RefCell::from(t)) + } +} + +/// A wrapper type for an immutably borrowed value from a `FairlyUnsafeCell`. +/// +/// See the [module-level documentation](crate) for more. +#[derive(Debug)] +pub struct Ref<'b, T>(core::cell::Ref<'b, T>) +where + T: 'b + ?Sized; + +impl<'b, T: ?Sized> Ref<'b, T> { + /// Copies a `Ref`. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::clone(...)`. A `Clone` implementation or a method would interfere + /// with the widespread use of `r.borrow().clone()` to clone the contents of + /// a `FairlyUnsafeCell`. + #[must_use] + #[inline] + pub fn clone(orig: &Ref<'b, T>) -> Ref<'b, T> { + Ref(core::cell::Ref::clone(&orig.0)) + } + + /// Makes a new `Ref` for a component of the borrowed data. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as `Ref::map(...)`. + /// A method would interfere with methods of the same name on the contents + /// of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new((5, 'b')); + /// let b1: Ref<'_, (u32, char)> = unsafe { c.borrow() }; + /// let b2: Ref<'_, u32> = Ref::map(b1, |t| &t.0); + /// assert_eq!(*b2, 5) + /// ``` + #[inline] + pub fn map(orig: Ref<'b, T>, f: F) -> Ref<'b, U> + where + F: FnOnce(&T) -> &U, + { + Ref(core::cell::Ref::map(orig.0, f)) + } + + /// Makes a new `Ref` for an optional component of the borrowed data. The + /// original guard is returned as an `Err(..)` if the closure returns + /// `None`. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::filter_map(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(vec![1, 2, 3]); + /// let b1: Ref<'_, Vec> = unsafe { c.borrow() }; + /// let b2: Result, _> = Ref::filter_map(b1, |v| v.get(1)); + /// assert_eq!(*b2.unwrap(), 2); + /// ``` + #[inline] + pub fn filter_map(orig: Ref<'b, T>, f: F) -> Result, Self> + where + F: FnOnce(&T) -> Option<&U>, + { + match core::cell::Ref::filter_map(orig.0, f) { + Ok(yay) => Ok(Ref(yay)), + Err(nay) => Err(Ref(nay)), + } + } + + /// Splits a `Ref` into multiple `Ref`s for different components of the + /// borrowed data. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::map_split(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new([1, 2, 3, 4]); + /// let borrow = unsafe { cell.borrow() }; + /// let (begin, end) = Ref::map_split(borrow, |slice| slice.split_at(2)); + /// assert_eq!(*begin, [1, 2]); + /// assert_eq!(*end, [3, 4]); + /// ``` + #[inline] + pub fn map_split(orig: Ref<'b, T>, f: F) -> (Ref<'b, U>, Ref<'b, V>) + where + F: FnOnce(&T) -> (&U, &V), + { + let (a, b) = core::cell::Ref::map_split(orig.0, f); + (Ref(a), Ref(b)) + } +} + +impl Deref for Ref<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + self.0.deref() + } +} + +impl fmt::Display for Ref<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// A wrapper type for a mutably borrowed value from a `FairlyUnsafeCell`. +/// +/// See the [module-level documentation](crate) for more. +#[derive(Debug)] +pub struct RefMut<'b, T>(core::cell::RefMut<'b, T>) +where + T: 'b + ?Sized; + +impl<'b, T: ?Sized> RefMut<'b, T> { + /// Makes a new `RefMut` for a component of the borrowed data, e.g., an enum + /// variant. + /// + /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RefMut::map(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new((5, 'b')); + /// { + /// let b1: RefMut<'_, (u32, char)> = unsafe { c.borrow_mut() }; + /// let mut b2: RefMut<'_, u32> = RefMut::map(b1, |t| &mut t.0); + /// assert_eq!(*b2, 5); + /// *b2 = 42; + /// } + /// assert_eq!(unsafe { *c.borrow() }, (42, 'b')); + /// ``` + #[inline] + pub fn map(orig: RefMut<'b, T>, f: F) -> RefMut<'b, U> + where + F: FnOnce(&mut T) -> &mut U, + { + RefMut(core::cell::RefMut::map(orig.0, f)) + } + + // /// Makes a new `RefMut` for an optional component of the borrowed data. The + // /// original guard is returned as an `Err(..)` if the closure returns + // /// `None`. + // /// + // /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + // /// + // /// This is an associated function that needs to be used as + // /// `RefMut::filter_map(...)`. A method would interfere with methods of the + // /// same name on the contents of a `FairlyUnsafeCell` used through `Deref`. + // /// + // /// # Examples + // /// + // /// ``` + // /// use fairly_unsafe_cell::*; + // /// + // /// let c = FairlyUnsafeCell::new(vec![1, 2, 3]); + // /// + // /// { + // /// let b1: RefMut<'_, Vec> = unsafe { c.borrow_mut() }; + // /// let mut b2: Result, _> = RefMut::filter_map(b1, |v| v.get_mut(1)); + // /// + // /// if let Ok(mut b2) = b2 { + // /// *b2 += 2; + // /// } + // /// } + // /// + // /// assert_eq!(* unsafe { c.borrow() }, vec![1, 4, 3]); + // /// ``` + // #[inline] + // pub fn filter_map(orig: RefMut<'b, T>, f: F) -> Result, Self> + // where + // F: FnOnce(&mut T) -> Option<&mut U>, + // { + // match core::cell::RefMut::filter_map(orig.0, f) { + // Ok(yay) => Ok(RefMut(yay)), + // Err(nay) => Err(RefMut(nay)), + // } + // } + + /// Splits a `RefMut` into multiple `RefMut`s for different components of the + /// borrowed data. + /// + /// The underlying `FairlyUnsafeCell` will remain mutably borrowed until both + /// returned `RefMut`s go out of scope. + /// + /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RefMut::map_split(...)`. A method would interfere with methods of the + /// same name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new([1, 2, 3, 4]); + /// let borrow = unsafe { cell.borrow_mut() }; + /// let (mut begin, mut end) = RefMut::map_split(borrow, |slice| slice.split_at_mut(2)); + /// assert_eq!(*begin, [1, 2]); + /// assert_eq!(*end, [3, 4]); + /// begin.copy_from_slice(&[4, 3]); + /// end.copy_from_slice(&[2, 1]); + /// ``` + #[inline] + pub fn map_split( + orig: RefMut<'b, T>, + f: F, + ) -> (RefMut<'b, U>, RefMut<'b, V>) + where + F: FnOnce(&mut T) -> (&mut U, &mut V), + { + let (a, b) = core::cell::RefMut::map_split(orig.0, f); + (RefMut(a), RefMut(b)) + } +} + +impl Deref for RefMut<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + self.0.deref() + } +} + +impl DerefMut for RefMut<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + self.0.deref_mut() + } +} + +impl fmt::Display for RefMut<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/fairly_unsafe_cell/src/lib.rs b/fairly_unsafe_cell/src/lib.rs new file mode 100644 index 0000000..60bd9b4 --- /dev/null +++ b/fairly_unsafe_cell/src/lib.rs @@ -0,0 +1,14 @@ +#![no_std] +#![allow(clippy::should_implement_trait)] + +//! A hybrid between an [`UnsafeCell`](core::cell::UnsafeCell) and a [`RefCell`](core::cell::RefCell): comes with a [`RefCell`](core::cell::RefCell)-like but unsafe API that panics in test builds (`#[cfg(test)]`) when mutable access is not exclusive, but has no overhead (and allows for UB) in non-test builds. + +#[cfg(test)] +mod checked; +#[cfg(test)] +pub use checked::*; + +#[cfg(not(test))] +mod unchecked; +#[cfg(not(test))] +pub use unchecked::*; diff --git a/fairly_unsafe_cell/src/unchecked.rs b/fairly_unsafe_cell/src/unchecked.rs new file mode 100644 index 0000000..37725a2 --- /dev/null +++ b/fairly_unsafe_cell/src/unchecked.rs @@ -0,0 +1,485 @@ +use core::{ + cell::UnsafeCell, + fmt, mem, + ops::{Deref, DerefMut}, +}; + +/// A cell that unsafely grants mutable access to its wrapped value. In test builds (`#[cfg(test)]`), it performs runtime checks to panic when mutable access is not exclusive. In non-test builds, it has no runtime overhead and silently allows for undefined behaviour. +#[derive(Debug)] +pub struct FairlyUnsafeCell(UnsafeCell); + +impl FairlyUnsafeCell { + /// Creates a new `FairlyUnsafeCell` containing `value`. + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// ``` + #[inline] + pub const fn new(value: T) -> Self { + Self(UnsafeCell::new(value)) + } + + /// Consumes the `FairlyUnsafeCell`, returning the wrapped value. + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// assert_eq!(c.into_inner(), 5); + /// ``` + #[inline] + pub fn into_inner(self) -> T { + self.0.into_inner() + } + + /// Replaces the wrapped value with a new one, returning the old value, + /// without deinitializing either one. + /// + /// This function corresponds to [`core::mem::replace`]. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new(5); + /// let old_value = unsafe { cell.replace(6) }; + /// assert_eq!(old_value, 5); + /// assert_eq!(cell.into_inner(), 6); + /// ``` + #[inline] + pub unsafe fn replace(&self, t: T) -> T { + mem::replace(&mut *self.borrow_mut(), t) + } + + /// Replaces the wrapped value with a new one computed from `f`, returning + /// the old value, without deinitializing either one. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new(5); + /// let old_value = unsafe { cell.replace_with(|&mut old| old + 1) }; + /// assert_eq!(old_value, 5); + /// assert_eq!(cell.into_inner(), 6); + /// ``` + #[inline] + pub unsafe fn replace_with T>(&self, f: F) -> T { + let mut_borrow = &mut *self.borrow_mut(); + let replacement = f(mut_borrow); + mem::replace(mut_borrow, replacement) + } + + /// Swaps the wrapped value of `self` with the wrapped value of `other`, + /// without deinitializing either one. + /// + /// This function corresponds to [`core::mem::swap`]. + /// + /// # Safety + /// + /// UB if the value in either `FairlyUnsafeCell` is currently borrowed, or + /// if `self` and `other` point to the same `FairlyUnsafeCell`. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// let d = FairlyUnsafeCell::new(6); + /// unsafe { c.swap(&d) }; + /// assert_eq!(c.into_inner(), 6); + /// assert_eq!(d.into_inner(), 5); + /// ``` + #[inline] + pub unsafe fn swap(&self, other: &Self) { + mem::swap(&mut *self.borrow_mut(), &mut *other.borrow_mut()) + } +} + +impl FairlyUnsafeCell { + /// Immutably borrows the wrapped value. + /// + /// The borrow lasts until the returned `Ref` exits scope. Multiple + /// immutable borrows can be taken out at the same time. + /// + /// # Safety + /// + /// UB if the value is currently mutably borrowed. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// + /// unsafe { + /// let borrowed_five = c.borrow(); + /// let borrowed_five2 = c.borrow(); + /// } + /// ``` + #[inline] + pub unsafe fn borrow(&self) -> Ref<'_, T> { + Ref(unsafe { &*self.0.get() }) + } + + /// Mutably borrows the wrapped value. + /// + /// The borrow lasts until the returned `RefMut` or all `RefMut`s derived + /// from it exit scope. The value cannot be borrowed while this borrow is + /// active. + /// + /// # Safety + /// + /// UB if the value is currently borrowed. + /// Will panic instead of causing UB if `#[cfg(test)]`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new("hello".to_owned()); + /// + /// unsafe { + /// *c.borrow_mut() = "bonjour".to_owned(); + /// } + /// + /// assert_eq!(unsafe { &*c.borrow() }, "bonjour"); + /// ``` + #[inline] + pub unsafe fn borrow_mut(&self) -> RefMut<'_, T> { + RefMut(unsafe { &mut *self.0.get() }) + } + + /// Returns a raw pointer to the underlying data in this cell. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(5); + /// + /// let ptr = c.as_ptr(); + /// ``` + #[inline] + pub fn as_ptr(&self) -> *mut T { + self.0.get() + } + + /// Returns a mutable reference to the underlying data. + /// + /// This call borrows the `UnsafeCell` mutably (at compile-time) which + /// guarantees that we possess the only reference. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let mut c = FairlyUnsafeCell::new(5); + /// *c.get_mut() += 1; + /// + /// assert_eq!(unsafe { *c.borrow() }, 6); + /// ``` + #[inline(always)] + pub fn get_mut(&mut self) -> &mut T { + self.0.get_mut() + } +} + +impl Default for FairlyUnsafeCell { + /// Creates a `FairlyUnsafeCell`, with the `Default` value for T. + #[inline] + fn default() -> FairlyUnsafeCell { + Self(UnsafeCell::default()) + } +} + +impl From for FairlyUnsafeCell { + /// Creates a new `FairlyUnsafeCell` containing the given value. + fn from(t: T) -> FairlyUnsafeCell { + Self(UnsafeCell::from(t)) + } +} + +/// A wrapper type for an immutably borrowed value from a `FairlyUnsafeCell`. +/// +/// See the [module-level documentation](crate) for more. +#[derive(Debug)] +pub struct Ref<'b, T>(&'b T) +where + T: 'b + ?Sized; + +impl<'b, T: ?Sized> Ref<'b, T> { + /// Copies a `Ref`. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::clone(...)`. A `Clone` implementation or a method would interfere + /// with the widespread use of `r.borrow().clone()` to clone the contents of + /// a `FairlyUnsafeCell`. + #[must_use] + #[inline] + pub fn clone(orig: &Ref<'b, T>) -> Ref<'b, T> { + Ref(orig.0) + } + + /// Makes a new `Ref` for a component of the borrowed data. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as `Ref::map(...)`. + /// A method would interfere with methods of the same name on the contents + /// of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new((5, 'b')); + /// let b1: Ref<'_, (u32, char)> = unsafe { c.borrow() }; + /// let b2: Ref<'_, u32> = Ref::map(b1, |t| &t.0); + /// assert_eq!(*b2, 5) + /// ``` + #[inline] + pub fn map(orig: Ref<'b, T>, f: F) -> Ref<'b, U> + where + F: FnOnce(&T) -> &U, + { + Ref(f(orig.0)) + } + + /// Makes a new `Ref` for an optional component of the borrowed data. The + /// original guard is returned as an `Err(..)` if the closure returns + /// `None`. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::filter_map(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new(vec![1, 2, 3]); + /// let b1: Ref<'_, Vec> = unsafe { c.borrow() }; + /// let b2: Result, _> = Ref::filter_map(b1, |v| v.get(1)); + /// assert_eq!(*b2.unwrap(), 2); + /// ``` + #[inline] + pub fn filter_map(orig: Ref<'b, T>, f: F) -> Result, Self> + where + F: FnOnce(&T) -> Option<&U>, + { + match f(orig.0) { + Some(yay) => Ok(Ref(yay)), + None => Err(orig), + } + } + + /// Splits a `Ref` into multiple `Ref`s for different components of the + /// borrowed data. + /// + /// The `FairlyUnsafeCell` is already immutably borrowed, so this cannot introduce UB where there was none before. + /// + /// This is an associated function that needs to be used as + /// `Ref::map_split(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new([1, 2, 3, 4]); + /// let borrow = unsafe { cell.borrow() }; + /// let (begin, end) = Ref::map_split(borrow, |slice| slice.split_at(2)); + /// assert_eq!(*begin, [1, 2]); + /// assert_eq!(*end, [3, 4]); + /// ``` + #[inline] + pub fn map_split(orig: Ref<'b, T>, f: F) -> (Ref<'b, U>, Ref<'b, V>) + where + F: FnOnce(&T) -> (&U, &V), + { + let (a, b) = f(orig.0); + (Ref(a), Ref(b)) + } +} + +impl Deref for Ref<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + self.0 + } +} + +impl fmt::Display for Ref<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// A wrapper type for a mutably borrowed value from a `FairlyUnsafeCell`. +/// +/// See the [module-level documentation](crate) for more. +#[derive(Debug)] +pub struct RefMut<'b, T>(&'b mut T) +where + T: 'b + ?Sized; + +impl<'b, T: ?Sized> RefMut<'b, T> { + /// Makes a new `RefMut` for a component of the borrowed data, e.g., an enum + /// variant. + /// + /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RefMut::map(...)`. A method would interfere with methods of the same + /// name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let c = FairlyUnsafeCell::new((5, 'b')); + /// { + /// let b1: RefMut<'_, (u32, char)> = unsafe { c.borrow_mut() }; + /// let mut b2: RefMut<'_, u32> = RefMut::map(b1, |t| &mut t.0); + /// assert_eq!(*b2, 5); + /// *b2 = 42; + /// } + /// assert_eq!(unsafe { *c.borrow() }, (42, 'b')); + /// ``` + #[inline] + pub fn map(orig: RefMut<'b, T>, f: F) -> RefMut<'b, U> + where + F: FnOnce(&mut T) -> &mut U, + { + RefMut(f(orig.0)) + } + + // /// Makes a new `RefMut` for an optional component of the borrowed data. The + // /// original guard is returned as an `Err(..)` if the closure returns + // /// `None`. + // /// + // /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + // /// + // /// This is an associated function that needs to be used as + // /// `RefMut::filter_map(...)`. A method would interfere with methods of the + // /// same name on the contents of a `FairlyUnsafeCell` used through `Deref`. + // /// + // /// # Examples + // /// + // /// ``` + // /// use fairly_unsafe_cell::*; + // /// + // /// let c = FairlyUnsafeCell::new(vec![1, 2, 3]); + // /// + // /// { + // /// let b1: RefMut<'_, Vec> = unsafe { c.borrow_mut() }; + // /// let mut b2: Result, _> = RefMut::filter_map(b1, |v| v.get_mut(1)); + // /// + // /// if let Ok(mut b2) = b2 { + // /// *b2 += 2; + // /// } + // /// } + // /// + // /// assert_eq!(* unsafe { c.borrow() }, vec![1, 4, 3]); + // /// ``` + // #[inline] + // pub fn filter_map(orig: RefMut<'b, T>, f: F) -> Result, Self> + // where + // F: FnOnce(&mut T) -> Option<&mut U>, + // { + // { if let Some(yay) = f(&mut *orig.0) { + // return Ok(RefMut(yay)); + // }} + + // Err(RefMut(orig.0)) + // } + + /// Splits a `RefMut` into multiple `RefMut`s for different components of the + /// borrowed data. + /// + /// The underlying `FairlyUnsafeCell` will remain mutably borrowed until both + /// returned `RefMut`s go out of scope. + /// + /// The `FairlyUnsafeCell` is already mutably borrowed, so this cannot fail. + /// + /// This is an associated function that needs to be used as + /// `RefMut::map_split(...)`. A method would interfere with methods of the + /// same name on the contents of a `FairlyUnsafeCell` used through `Deref`. + /// + /// # Examples + /// + /// ``` + /// use fairly_unsafe_cell::*; + /// + /// let cell = FairlyUnsafeCell::new([1, 2, 3, 4]); + /// let borrow = unsafe { cell.borrow_mut() }; + /// let (mut begin, mut end) = RefMut::map_split(borrow, |slice| slice.split_at_mut(2)); + /// assert_eq!(*begin, [1, 2]); + /// assert_eq!(*end, [3, 4]); + /// begin.copy_from_slice(&[4, 3]); + /// end.copy_from_slice(&[2, 1]); + /// ``` + #[inline] + pub fn map_split( + orig: RefMut<'b, T>, + f: F, + ) -> (RefMut<'b, U>, RefMut<'b, V>) + where + F: FnOnce(&mut T) -> (&mut U, &mut V), + { + let (a, b) = f(orig.0); + (RefMut(a), RefMut(b)) + } +} + +impl Deref for RefMut<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + self.0 + } +} + +impl DerefMut for RefMut<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + self.0 + } +} + +impl fmt::Display for RefMut<'_, T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 7ad02b2..0dd6fdb 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -12,16 +12,20 @@ workspace = true [dependencies] ufotofu = { version = "0.6.0", features = ["std", "dev"] } +# ufotofu = { path = "../../ufotofu/ufotofu", features = ["std", "dev"] } +ufotofu_queues = { version = "0.5.0", features = ["std"] } pollster = "0.4.0" arbitrary = { version = "1.0.2", features = ["derive"] } libfuzzer-sys = { version = "0.4", features = ["arbitrary-derive"] } signature = "2.2.0" syncify = "0.1.0" willow-encoding = { path = "../encoding" } -ufotofu_codec = { path = "../ufotofu_codec", features = [ "std", "dev" ] } -ufotofu_codec_endian = { path = "../ufotofu_codec_endian", features = [ "std" ] } -compact_u64 = { path = "../compact_u64", features = [ "std", "dev" ] } +ufotofu_codec = { path = "../ufotofu_codec", features = ["std", "dev"] } +ufotofu_codec_endian = { path = "../ufotofu_codec_endian", features = ["std"] } +compact_u64 = { path = "../compact_u64", features = ["std", "dev"] } either = "1.10.0" +wb_async_utils = { path = "../wb_async_utils", features = ["channels"] } +futures = { version = "0.3.31" } [dependencies.willow-data-model] path = "../data-model" @@ -193,6 +197,17 @@ test = false doc = false bench = false +################## +# wb_async_utils # +################## + +[[bin]] +name = "spsc" +path = "fuzz_targets/spsc.rs" +test = false +doc = false +bench = false + ######### # Other # ######### @@ -470,4 +485,3 @@ path = "fuzz_targets/enc_mcsubspacecapability_abs.rs" test = false doc = false bench = false - diff --git a/fuzz/fuzz_targets/spsc.rs b/fuzz/fuzz_targets/spsc.rs new file mode 100644 index 0000000..a433fd3 --- /dev/null +++ b/fuzz/fuzz_targets/spsc.rs @@ -0,0 +1,82 @@ +#![no_main] + +use std::sync::Arc; +use std::thread; + +use futures::join; +use libfuzzer_sys::fuzz_target; + +use either::Either::*; + +use ufotofu::consumer::{self, BulkConsumerOperation}; +use ufotofu::producer::{self, BulkProducerOperation}; +use ufotofu::{BufferedConsumer, Consumer, Producer}; +use wb_async_utils::spsc::*; + +fuzz_target!(|data: ( + Box<[u8]>, + Result, + usize, + Vec, + Vec +)| { + let (input, last, queue_capacity, consume_ops, produce_ops) = data; + + let queue_capacity = std::cmp::min(2048, std::cmp::max(1, queue_capacity)); + + pollster::block_on(async { + let state: State, i16, i16> = + State::new(ufotofu_queues::Fixed::new(queue_capacity)); + let (sender, receiver) = new_spsc(&state); + let mut sender = consumer::BulkScrambler::new(sender, consume_ops); + let mut receiver = producer::BulkScrambler::new(receiver, produce_ops); + + let send = async { + for datum in input.iter() { + assert_eq!(Ok(()), sender.consume(*datum).await); + } + assert_eq!(Ok(()), sender.flush().await); + + + match last { + Ok(fin) => { + assert_eq!(Ok(()), sender.close(fin).await); + } + Err(err) => { + sender.as_mut().cause_error(err); + } + } + }; + + let receive = async { + for datum in input.iter() { + assert_eq!(Ok(Left(*datum)), receiver.produce().await); + } + + match last { + Ok(fin) => { + assert_eq!(Ok(Right(fin)), receiver.produce().await); + } + Err(err) => { + assert_eq!(Err(err), receiver.produce().await); + } + } + }; + + // let done = Arc::new(std::sync::atomic::AtomicBool::new(false)); + // let done_as_well = done.clone(); + + // thread::spawn(move || { + // thread::sleep(std::time::Duration::from_millis(10000)); + + // if !done_as_well.load(std::sync::atomic::Ordering::Relaxed) { + // std::process::exit(-123); + // } + // }); + + join!(send, receive); + + // done.store(true, std::sync::atomic::Ordering::Relaxed); + + }); +}); diff --git a/wb_async_utils/Cargo.toml b/wb_async_utils/Cargo.toml index e7d103c..d8e667a 100644 --- a/wb_async_utils/Cargo.toml +++ b/wb_async_utils/Cargo.toml @@ -5,14 +5,21 @@ edition = "2021" [features] -# enables ufotofu-related functionality -channels = ["ufotofu", "ufotofu_queues", "either", "async_cell"] +# enables inmemory channel functionality +# channels = ["ufotofu", "ufotofu_queues", "either", "async_cell"] +channels = ["ufotofu", "ufotofu_queues", "either"] [dependencies] -ufotofu = { version = "0.6.0", features = ["std"], optional = true } +ufotofu = { version = "0.6.1", features = ["std"], optional = true } +# ufotofu = { path = "../../ufotofu/ufotofu", features = ["std", "dev"], optional = true } ufotofu_queues = { version = "0.5.0", features = ["std"], optional = true } either = { version = "1.10.0", optional = true } -async_cell = { version = "0.2.2", optional = true } # TODO refactor channels to not use this anymore +# async_cell = { version = "0.2.2", optional = true } +fairly_unsafe_cell = { path = "../fairly_unsafe_cell", version = "0.1.0" } + +[dev-dependencies] +pollster = "0.4.0" +futures = { version = "0.3.31" } [lints] workspace = true diff --git a/wb_async_utils/src/lib.rs b/wb_async_utils/src/lib.rs index 0ea2180..890519e 100644 --- a/wb_async_utils/src/lib.rs +++ b/wb_async_utils/src/lib.rs @@ -10,8 +10,5 @@ pub use once_cell::OnceCell; mod take_cell; pub use take_cell::TakeCell; -#[cfg(feature = "ufotofu")] -pub mod mpmc; - #[cfg(feature = "ufotofu")] pub mod spsc; diff --git a/wb_async_utils/src/mpmc.rs b/wb_async_utils/src/mpmc.rs deleted file mode 100644 index a2cb3c8..0000000 --- a/wb_async_utils/src/mpmc.rs +++ /dev/null @@ -1,278 +0,0 @@ -//! This module provides a nonblocking **m**ulti **p**roducer **m**ulti **c**onsumer channel -//! backed by an arbitrary [`Queue`], with a UFOTOFU-based interface. - -use async_cell::unsync::AsyncCell; -use either::Either::{self, *}; -use std::{cell::RefCell, convert::Infallible, rc::Rc}; -use ufotofu::{BufferedConsumer, BufferedProducer, BulkConsumer, BulkProducer, Consumer, Producer}; -use ufotofu_queues::Queue; - -/// Create an MPMC channel, in the form of an [`Input`] that implements [`BulkConsumer`] -/// and an [`Output`] that implements [`BulkProducer`]. -pub fn new_mpmc(queue: Q) -> (Input, Output) { - let state = Rc::new(SpscState { - queue: RefCell::new(queue), - notify: AsyncCell::new(), - fin: RefCell::new(None), - err: RefCell::new(None), - }); - - return ( - Input { - state: state.clone(), - }, - Output { state }, - ); -} - -/// Shared state between the Input and the Output. -#[derive(Debug)] -struct SpscState { - queue: RefCell, - notify: AsyncCell<()>, - fin: RefCell>, - err: RefCell>, -} - -#[derive(Clone, Debug)] -pub struct Input { - state: Rc>, -} - -impl Input { - /// Return the number of items that are currently buffered. - pub fn len(&self) -> usize { - self.state.queue.borrow().len() - } - - /// Set an error to be emitted on the corresponding `Output`. - /// The error is only emitted there when trying to produce values - /// via `produce` or `expose_items` (or any method calling one of these), - /// but never when `slurp`ing or calling `consider_produced`. - /// - /// Must not call any of the `Consumer`, `BufferedConsumer`, or `BulkProducer` methods - /// on this `Input` after calling this function. - /// May call this function at most once per `Input`. - pub fn cause_error(&mut self, err: E) { - *self.state.err.borrow_mut() = Some(err); - self.state.notify.set(()); - } -} - -impl Consumer for Input { - type Item = Q::Item; - - type Final = F; - - type Error = Infallible; - - /// Write the item into the buffer queue, waiting for buffer space to - /// become available (by reading items from the corresponding [`Output`]) if necessary. - async fn consume(&mut self, item: Self::Item) -> Result<(), Self::Error> { - loop { - // Try to buffer the item. - match self.state.queue.borrow_mut().enqueue(item) { - // Enqueueing failed. - Some(_) => { - // Wait for queue space. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where enqueeuing is guaranteed to succeed. - } - // Enqueueing succeeded. - None => return Ok(()), - } - } - } - - async fn close(&mut self, fin: Self::Final) -> Result<(), Self::Error> { - // Store the final value for later access by the Output. - *self.state.fin.borrow_mut() = Some(fin); - - // If the queue is empty, we need to notify the waiting Output (if any) of the final value. - if self.state.queue.borrow().is_empty() { - self.state.notify.set(()); - } - - Ok(()) - } -} - -impl BufferedConsumer for Input { - async fn flush(&mut self) -> Result<(), Self::Error> { - Ok(()) // Nothing to do here. - } -} - -impl BulkConsumer for Input { - async fn expose_slots<'a>(&'a mut self) -> Result<&'a mut [Self::Item], Self::Error> - where - Self::Item: 'a, - { - loop { - // Try obtain at least one empty slots. - match self.state.queue.borrow_mut().expose_slots() { - // No empty slot available. - None => { - // Wait for queue space. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where there will be slots available. - } - //Got some empty slots. - Some(slots) => { - // We need to return something which lives for 'a, - // but to the compiler's best knowledge, `slots` lives only - // for as long as the return value of `self.state.queue.borrow_mut()`, - // whose lifetime is limited by the current stack frame. - // - // We *know* that these slots will have a sufficiently long lifetime, - // however, because they sit inside an Rc which has a lifetime of 'a. - // An Rc keeps its contents alive as long at least as itself. - // Thus we know that the slots have a lifetime of at least 'a. - // Hence, extending the lifetime to 'a is safe. - let slots: &'a mut [Q::Item] = unsafe { extend_lifetime_mut(slots) }; - return Ok(slots); - } - } - } - } - - async fn consume_slots(&mut self, amount: usize) -> Result<(), Self::Error> { - self.state.queue.borrow_mut().consider_enqueued(amount); - Ok(()) - } -} - -#[derive(Clone, Debug)] -pub struct Output { - state: Rc>, -} - -impl Output { - /// Return the number of items that are currently buffered. - pub fn len(&self) -> usize { - self.state.queue.borrow().len() - } -} - -impl Producer for Output { - type Item = Q::Item; - - type Final = F; - - type Error = E; - - /// Take an item from the buffer queue, waiting for an item to - /// become available (by being consumed by the corresponding [`Input`]) if necessary. - async fn produce(&mut self) -> Result, Self::Error> { - loop { - // Try to obtain the next item. - match self.state.queue.borrow_mut().dequeue() { - // At least one item was in the buffer, return it. - Some(item) => return Ok(Left(item)), - None => { - // Buffer is empty. - // But perhaps the final item has been consumed already? - match self.state.fin.borrow_mut().take() { - Some(fin) => { - // Yes, so we can return the final item. - return Ok(Right(fin)); - } - None => { - // No, so we check whether there is an error. - match self.state.err.borrow_mut().take() { - Some(err) => { - // Yes, there is an error; return it. - return Err(err); - } - None => { - // No, no error either, so we wait until something changes. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where progress will be made. - } - } - } - } - } - } - } - } -} - -impl BufferedProducer for Output { - async fn slurp(&mut self) -> Result<(), Self::Error> { - Ok(()) // Nothing to do. - } -} - -impl BulkProducer for Output { - async fn expose_items<'a>( - &'a mut self, - ) -> Result, Self::Error> - where - Self::Item: 'a, - { - loop { - // Try to get at least one item. - match self.state.queue.borrow_mut().expose_items() { - // No items available - None => { - // But perhaps the final item has been consumed already? - match self.state.fin.borrow_mut().take() { - Some(fin) => { - // Yes, so we can return the final item. - return Ok(Right(fin)); - } - None => { - // No, so we check whether there is an error. - match self.state.err.borrow_mut().take() { - Some(err) => { - // Yes, there is an error; return it. - return Err(err); - } - None => { - // No, no error either, so we wait until something changes. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where progress will be made. - } - } - } - } - } - // Got at least one item - Some(items) => { - // We need to return something which lives for 'a, - // but to the compiler's best knowledge, `items` lives only - // for as long as the return value of `self.state.queue.borrow_mut()`, - // whose lifetime is limited by the current stack frame. - // - // We *know* that these items will have a sufficiently long lifetime, - // however, because they sit inside an Rc which has a lifetime of 'a. - // An Rc keeps its contents alive as long at least as itself. - // Thus we know that the items have a lifetime of at least 'a. - // Hence, extending the lifetime to 'a is safe. - let items: &'a [Q::Item] = unsafe { extend_lifetime(items) }; - return Ok(Left(items)); - } - } - } - } - - async fn consider_produced(&mut self, amount: usize) -> Result<(), Self::Error> { - self.state.queue.borrow_mut().consider_dequeued(amount); - Ok(()) - } -} - -// This is safe if and only if the object pointed at by `reference` lives for at least `'longer`. -// See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail. -unsafe fn extend_lifetime<'shorter, 'longer, T: ?Sized>(reference: &'shorter T) -> &'longer T { - std::mem::transmute::<&'shorter T, &'longer T>(reference) -} - -// This is safe if and only if the object pointed at by `reference` lives for at least `'longer`. -// See https://doc.rust-lang.org/nightly/std/intrinsics/fn.transmute.html for more detail. -unsafe fn extend_lifetime_mut<'shorter, 'longer, T: ?Sized>( - reference: &'shorter mut T, -) -> &'longer mut T { - std::mem::transmute::<&'shorter mut T, &'longer mut T>(reference) -} diff --git a/wb_async_utils/src/mutex.rs b/wb_async_utils/src/mutex.rs index 351c298..c3aedd7 100644 --- a/wb_async_utils/src/mutex.rs +++ b/wb_async_utils/src/mutex.rs @@ -183,14 +183,14 @@ pub struct ReadGuard<'mutex, T> { mutex: &'mutex Mutex, } -impl<'mutex, T> Drop for ReadGuard<'mutex, T> { +impl Drop for ReadGuard<'_, T> { fn drop(&mut self) { self.mutex.currently_used.set(false); self.mutex.wake_next(); } } -impl<'mutex, T> Deref for ReadGuard<'mutex, T> { +impl Deref for ReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -205,14 +205,14 @@ pub struct WriteGuard<'mutex, T> { mutex: &'mutex Mutex, } -impl<'mutex, T> Drop for WriteGuard<'mutex, T> { +impl Drop for WriteGuard<'_, T> { fn drop(&mut self) { self.mutex.currently_used.set(false); self.mutex.wake_next(); } } -impl<'mutex, T> Deref for WriteGuard<'mutex, T> { +impl Deref for WriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -220,7 +220,7 @@ impl<'mutex, T> Deref for WriteGuard<'mutex, T> { } } -impl<'mutex, T> DerefMut for WriteGuard<'mutex, T> { +impl DerefMut for WriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.mutex.value.get() } // Safe because a `&mut WriteGuard` can never live at the same time as another `&mut WriteGuard` or a `&mut Mutex` } diff --git a/wb_async_utils/src/once_cell.rs b/wb_async_utils/src/once_cell.rs index f6855d9..e9d9e5c 100644 --- a/wb_async_utils/src/once_cell.rs +++ b/wb_async_utils/src/once_cell.rs @@ -19,6 +19,12 @@ enum State { Empty(VecDeque), } +impl Default for OnceCell { + fn default() -> Self { + Self::new() + } +} + impl OnceCell { /// Creates a new, empty [`OnceCell`]. pub fn new() -> Self { diff --git a/wb_async_utils/src/rw.rs b/wb_async_utils/src/rw.rs index 475a5ba..d98e859 100644 --- a/wb_async_utils/src/rw.rs +++ b/wb_async_utils/src/rw.rs @@ -205,7 +205,7 @@ pub struct ReadGuard<'lock, T> { lock: &'lock RwLock, } -impl<'lock, T> Drop for ReadGuard<'lock, T> { +impl Drop for ReadGuard<'_, T> { fn drop(&mut self) { match self.lock.readers.get() { None => unsafe { unreachable_unchecked() }, // ReadGuards are only created when there are no writers @@ -219,7 +219,7 @@ impl<'lock, T> Drop for ReadGuard<'lock, T> { } } -impl<'lock, T> Deref for ReadGuard<'lock, T> { +impl Deref for ReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -234,7 +234,7 @@ pub struct WriteGuard<'lock, T> { lock: &'lock RwLock, } -impl<'lock, T> Drop for WriteGuard<'lock, T> { +impl Drop for WriteGuard<'_, T> { fn drop(&mut self) { self.lock.readers.set(Some(0)); // Guaranteed to have been `None` before. @@ -242,7 +242,7 @@ impl<'lock, T> Drop for WriteGuard<'lock, T> { } } -impl<'lock, T> Deref for WriteGuard<'lock, T> { +impl Deref for WriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -250,7 +250,7 @@ impl<'lock, T> Deref for WriteGuard<'lock, T> { } } -impl<'lock, T> DerefMut for WriteGuard<'lock, T> { +impl DerefMut for WriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.lock.value.get() } // Safe because a `&mut WriteGuard` can never live at the same time as another `&mut WriteGuard` or a `&mut RwLock` } diff --git a/wb_async_utils/src/spsc.rs b/wb_async_utils/src/spsc.rs index 033e8fb..b8e156e 100644 --- a/wb_async_utils/src/spsc.rs +++ b/wb_async_utils/src/spsc.rs @@ -1,103 +1,215 @@ //! This module provides a nonblocking **s**ingle **p**roducer **s**ingle **c**onsumer channel //! backed by an arbitrary [`Queue`], with a UFOTOFU-based interface. +//! +//! It differs from most other spsc implementations in a few important aspects: +//! +//! - It is not `Send`. This channel does not provide synchronisation across threads, it merely decouples program components executing on the same thread. In a sense, its primary function is working around the limitations on shared mutability imposed by the borrow checker. +//! - It can be backed by arbitrary [`Queue`]s. A common choice would be the [`ufotofu_queues::Fixed`] fixed-capacity queue. +//! - It is unopinionated where the shared state between sender and receiver lives. Most APIs transparently handle the state by placing it on the heap behind a reference counted pointer. Our implementation lets the programmer supply the shared state as an opaque struct. When knowing the lifetime of the sender and receiver, the state can be stack-allocated instead of heap-allocated. +//! - It allows closing with arbitrary `Final` values, and the sender has a method for triggering an error on the receiver. +//! - Dropping the sender or the receiver does not inform the other endpoint about anything. +//! +//! See [`new_spsc`] for the entrypoint to this module and some examples. -// TODO right now, this is more-or-less the mcmp, but copypasted and without `Clone` on `Input` and `Output`. To be replaced with a better implementation. - -use async_cell::unsync::AsyncCell; use either::Either::{self, *}; -use std::{cell::RefCell, convert::Infallible, rc::Rc}; +use fairly_unsafe_cell::*; +use std::{ + cell::Cell, + convert::Infallible, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; use ufotofu::{BufferedConsumer, BufferedProducer, BulkConsumer, BulkProducer, Consumer, Producer}; use ufotofu_queues::Queue; -// TODO use fin and error... -// TODO also, unify them into a single last: Result? - -/// Create a SPSC channel, in the form of an [`Input`] that implements [`BulkConsumer`] -/// and an [`Output`] that implements [`BulkProducer`]. -pub fn new_spsc(queue: Q) -> (Input, Output) { - let state = Rc::new(SpscState { - queue: RefCell::new(queue), - notify: AsyncCell::new(), - fin: RefCell::new(None), - err: RefCell::new(None), - }); - - return ( - Input { - state: state.clone(), +use crate::{Mutex, TakeCell}; + +/// The state shared between the [`Sender`] and the [`Receiver`]. This is fully opaque, but we expose it to give control over where it is allocated. +#[derive(Debug)] +pub struct State { + // We need a Mutex here because `expose_slots` and `expose_items` can be called concurrently on the two endpoints but would result in a mutable and an immutable borrow coexisting. + queue: Mutex, + // Safety: We never return refs to this from any method, and we never hold a borrow across `.await` points. + // Hence, no cuncurrent refs can exist. + last: FairlyUnsafeCell>>, + // We track the number of items in the queue here, so that we can access it without waiting for the Mutex of the queue itself. + // A bit awkward, but this enables sync access to the current count. + len: Cell, + // Empty while the sender cannot make progress. + notify_the_sender: TakeCell<()>, + // Empty while the receiver cannot make progress. + notify_the_receiver: TakeCell<()>, +} + +impl State { + /// Creates a new [`State`], using the given queue for the SPSC channel. The queue must have a non-zero maximum capacity. + pub fn new(queue: Q) -> Self { + State { + len: Cell::new(queue.len()), + queue: Mutex::new(queue), + last: FairlyUnsafeCell::new(None), + notify_the_sender: TakeCell::new_with(()), + notify_the_receiver: TakeCell::new_with(()), + } + } +} + +/// Creates a new SPSC channel in the form of a [`Sender`] and a [`Receiver`] endpoint which communicate via the given [`State`]. +/// +/// An example with a stack-allocated [`State`]: +/// +/// ``` +/// use wb_async_utils::spsc::*; +/// use ufotofu::*; +/// +/// let state: State<_, _, ()> = State::new(ufotofu_queues::Fixed::new(99 /* capacity */)); +/// let (mut sender, mut receiver) = new_spsc(&state); +/// +/// pollster::block_on(async { +/// // If the capacity was less than four, you would need to join +/// // a future that sends and a future that receives the items. +/// assert!(sender.consume(300).await.is_ok()); +/// assert!(sender.consume(400).await.is_ok()); +/// assert!(sender.consume(500).await.is_ok()); +/// assert!(sender.close(-17).await.is_ok()); +/// assert_eq!(300, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(400, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(500, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(-17, receiver.produce().await.unwrap().unwrap_right()); +/// }); +/// ``` +/// +/// An example with a heap-allocated [`State`]: +/// +/// ``` +/// use wb_async_utils::spsc::*; +/// use ufotofu::*; +/// +/// let state: State<_, _, ()> = State::new(ufotofu_queues::Fixed::new(99 /* capacity */)); +/// let (mut sender, mut receiver) = new_spsc(std::rc::Rc::new(state)); +/// +/// pollster::block_on(async { +/// // If the capacity was less than four, you would need to join +/// // a future that sends and a future that receives the items. +/// assert!(sender.consume(300).await.is_ok()); +/// assert!(sender.consume(400).await.is_ok()); +/// assert!(sender.consume(500).await.is_ok()); +/// assert!(sender.close(-17).await.is_ok()); +/// assert_eq!(300, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(400, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(500, receiver.produce().await.unwrap().unwrap_left()); +/// assert_eq!(-17, receiver.produce().await.unwrap().unwrap_right()); +/// }); +/// ``` +pub fn new_spsc(state_ref: R) -> (Sender, Receiver) +where + R: Deref> + Clone, +{ + ( + Sender { + state: state_ref.clone(), + phantom: PhantomData, + }, + Receiver { + state: state_ref, + phantom: PhantomData, }, - Output { state }, - ); + ) } -/// Shared state between the Input and the Output. +/// Allows sending data to the SPSC channel via its [`BulkConsumer`] implementation. #[derive(Debug)] -struct SpscState { - queue: RefCell, - notify: AsyncCell<()>, - fin: RefCell>, - err: RefCell>, +pub struct Sender { + state: R, + phantom: PhantomData<(Q, F, E)>, } +/// Allows receiving data from the SPSC channel via its [`BulkProducer`] implementation. #[derive(Debug)] -pub struct Input { - state: Rc>, +pub struct Receiver { + state: R, + phantom: PhantomData<(Q, F, E)>, } -impl Input { - /// Return the number of items that are currently buffered. +impl>, Q: Queue, F, E> Sender { + /// Returns the number of items that are currently buffered. pub fn len(&self) -> usize { - self.state.queue.borrow().len() + self.state.len.get() + } + + /// Returns whether there are currently no items buffered. + pub fn is_empty(&self) -> bool { + self.len() == 0 } - /// Set an error to be emitted on the corresponding `Output`. + /// Sets an error to be emitted on the corresponding `Receiver`. /// The error is only emitted there when trying to produce values /// via `produce` or `expose_items` (or any method calling one of these), /// but never when `slurp`ing or calling `consider_produced`. /// /// Must not call any of the `Consumer`, `BufferedConsumer`, or `BulkProducer` methods - /// on this `Input` after calling this function. - /// May call this function at most once per `Input`. + /// on this `Receiver` after calling this function, nor `close_sync`. + /// May call this function at most once per `Receiver`. + /// Must not call this function after calling `close` or `close_sync`. pub fn cause_error(&mut self, err: E) { - *self.state.err.borrow_mut() = Some(err); - self.state.notify.set(()); + let mut last = unsafe { self.state.last.borrow_mut() }; + debug_assert!( + last.is_none(), + "Must not call `cause_error` multiple times or after calling `close` or `close_sync`." + ); + *last = Some(Err(err)); + + self.state.notify_the_receiver.set(()); } - /// Same as calling [`Consumer::close`], but sync. - pub fn close_sync(&mut self, fin: F) -> Result<(), E> { - // Store the final value for later access by the Output. - *self.state.fin.borrow_mut() = Some(fin); + /// Same as calling [`Consumer::close`], but sync. Must not use this multiple times, after calling `close`, or after calling `cause_error`. + pub fn close_sync(&mut self, fin: F) -> Result<(), Infallible> { + // Store the final value for later access by the Sender. + let mut last = unsafe { self.state.last.borrow_mut() }; + debug_assert!( + last.is_none(), + "Must not call `close` or `close_sync` multiple times or after calling `cause_error`." + ); + *last = Some(Ok(fin)); - // If the queue is empty, we need to notify the waiting Output (if any) of the final value. - if self.state.queue.borrow().is_empty() { - self.state.notify.set(()); - } + self.state.notify_the_receiver.set(()); Ok(()) } } -impl Consumer for Input { +impl>, Q: Queue, F, E> Consumer for Sender { type Item = Q::Item; type Final = F; - type Error = E; + type Error = Infallible; - /// Write the item into the buffer queue, waiting for buffer space to - /// become available (by reading items from the corresponding [`Output`]) if necessary. - async fn consume(&mut self, item: Self::Item) -> Result<(), Self::Error> { + /// Writes the item into the buffer queue, waiting for buffer space to + /// become available (by reading items from the corresponding [`Sender`]) if necessary. + async fn consume(&mut self, item_: Self::Item) -> Result<(), Self::Error> { + let mut item = item_; loop { // Try to buffer the item. - match self.state.queue.borrow_mut().enqueue(item) { + let did_it_work = { + // Inside a block to drop the Mutex access before awaiting on the notifier. + self.state.queue.write().await.deref_mut().enqueue(item) + }; + + match did_it_work { // Enqueueing failed. - Some(_) => { + Some(item_) => { // Wait for queue space. - let () = self.state.notify.take().await; + let () = self.state.notify_the_sender.take().await; // Go into the next iteration of the loop, where enqueeuing is guaranteed to succeed. + item = item_; } // Enqueueing succeeded. - None => return Ok(()), + None => { + self.state.len.set(self.state.len.get() + 1); + self.state.notify_the_receiver.set(()); + return Ok(()); + } } } } @@ -107,24 +219,24 @@ impl Consumer for Input { } } -impl BufferedConsumer for Input { +impl>, Q: Queue, F, E> BufferedConsumer for Sender { async fn flush(&mut self) -> Result<(), Self::Error> { Ok(()) // Nothing to do here. } } -impl BulkConsumer for Input { +impl>, Q: Queue, F, E> BulkConsumer for Sender { async fn expose_slots<'a>(&'a mut self) -> Result<&'a mut [Self::Item], Self::Error> where Self::Item: 'a, { loop { // Try obtain at least one empty slots. - match self.state.queue.borrow_mut().expose_slots() { + match self.state.queue.write().await.deref_mut().expose_slots() { // No empty slot available. None => { // Wait for queue space. - let () = self.state.notify.take().await; + let () = self.state.notify_the_sender.take().await; // Go into the next iteration of the loop, where there will be slots available. } //Got some empty slots. @@ -135,9 +247,7 @@ impl BulkConsumer for Input { // whose lifetime is limited by the current stack frame. // // We *know* that these slots will have a sufficiently long lifetime, - // however, because they sit inside an Rc which has a lifetime of 'a. - // An Rc keeps its contents alive as long at least as itself. - // Thus we know that the slots have a lifetime of at least 'a. + // however, because they sit inside a State which must outlive 'a. // Hence, extending the lifetime to 'a is safe. let slots: &'a mut [Q::Item] = unsafe { extend_lifetime_mut(slots) }; return Ok(slots); @@ -147,24 +257,31 @@ impl BulkConsumer for Input { } async fn consume_slots(&mut self, amount: usize) -> Result<(), Self::Error> { - self.state.queue.borrow_mut().consider_enqueued(amount); + self.state + .queue + .write() + .await + .deref_mut() + .consider_enqueued(amount); + self.state.len.set(self.state.len.get() + amount); + self.state.notify_the_receiver.set(()); Ok(()) } } -#[derive(Debug)] -pub struct Output { - state: Rc>, -} - -impl Output { - /// Return the number of items that are currently buffered. +impl>, Q: Queue, F, E> Receiver { + /// Returns the number of items that are currently buffered. pub fn len(&self) -> usize { - self.state.queue.borrow().len() + self.state.len.get() + } + + /// Returns whether there are currently no items buffered. + pub fn is_empty(&self) -> bool { + self.len() == 0 } } -impl Producer for Output { +impl>, Q: Queue, F, E> Producer for Receiver { type Item = Q::Item; type Final = F; @@ -172,34 +289,31 @@ impl Producer for Output { type Error = E; /// Take an item from the buffer queue, waiting for an item to - /// become available (by being consumed by the corresponding [`Input`]) if necessary. + /// become available (by being consumed by the corresponding [`Sender`]) if necessary. async fn produce(&mut self) -> Result, Self::Error> { loop { // Try to obtain the next item. - match self.state.queue.borrow_mut().dequeue() { + match self.state.queue.write().await.deref_mut().dequeue() { // At least one item was in the buffer, return it. - Some(item) => return Ok(Left(item)), + Some(item) => { + self.state.len.set(self.state.len.get() - 1); + self.state.notify_the_sender.set(()); + return Ok(Left(item)); + } None => { // Buffer is empty. - // But perhaps the final item has been consumed already? - match self.state.fin.borrow_mut().take() { - Some(fin) => { - // Yes, so we can return the final item. + // But perhaps the final item has been consumed already, or an error was requested? + match unsafe { self.state.last.borrow_mut().take() } { + Some(Ok(fin)) => { return Ok(Right(fin)); } + Some(Err(err)) => { + return Err(err); + } None => { - // No, so we check whether there is an error. - match self.state.err.borrow_mut().take() { - Some(err) => { - // Yes, there is an error; return it. - return Err(err); - } - None => { - // No, no error either, so we wait until something changes. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where progress will be made. - } - } + // No last item yet, so we wait until something changes. + let () = self.state.notify_the_receiver.take().await; + // Go into the next iteration of the loop, where progress will be made. } } } @@ -208,13 +322,26 @@ impl Producer for Output { } } -impl BufferedProducer for Output { +impl>, Q: Queue, F, E> BufferedProducer for Receiver { async fn slurp(&mut self) -> Result<(), Self::Error> { + // Nothing to do really, except that if the buffer is empty and an error was set, then we emit it. + if self.is_empty() { + // if self.state.queue.read().await.len() == 0 { + match unsafe { self.state.last.borrow_mut().take() } { + None => { /* no-op */ } + Some(Err(err)) => return Err(err), + Some(Ok(fin)) => { + // Put the fin back in the cell. + unsafe { *self.state.last.borrow_mut().deref_mut() = Some(Ok(fin)) } + } + } + } + Ok(()) // Nothing to do. } } -impl BulkProducer for Output { +impl>, Q: Queue, F, E> BulkProducer for Receiver { async fn expose_items<'a>( &'a mut self, ) -> Result, Self::Error> @@ -223,28 +350,21 @@ impl BulkProducer for Output { { loop { // Try to get at least one item. - match self.state.queue.borrow_mut().expose_items() { + match self.state.queue.write().await.deref_mut().expose_items() { // No items available None => { - // But perhaps the final item has been consumed already? - match self.state.fin.borrow_mut().take() { - Some(fin) => { - // Yes, so we can return the final item. + // But perhaps the final item has been consumed already, or an error was requested? + match unsafe { self.state.last.borrow_mut().take() } { + Some(Ok(fin)) => { return Ok(Right(fin)); } + Some(Err(err)) => { + return Err(err); + } None => { - // No, so we check whether there is an error. - match self.state.err.borrow_mut().take() { - Some(err) => { - // Yes, there is an error; return it. - return Err(err); - } - None => { - // No, no error either, so we wait until something changes. - let () = self.state.notify.take().await; - // Go into the next iteration of the loop, where progress will be made. - } - } + // No last item yet, so we wait until something changes. + let () = self.state.notify_the_receiver.take().await; + // Go into the next iteration of the loop, where progress will be made. } } } @@ -256,9 +376,7 @@ impl BulkProducer for Output { // whose lifetime is limited by the current stack frame. // // We *know* that these items will have a sufficiently long lifetime, - // however, because they sit inside an Rc which has a lifetime of 'a. - // An Rc keeps its contents alive as long at least as itself. - // Thus we know that the items have a lifetime of at least 'a. + // however, because they sit inside a State which must outlive 'a. // Hence, extending the lifetime to 'a is safe. let items: &'a [Q::Item] = unsafe { extend_lifetime(items) }; return Ok(Left(items)); @@ -268,7 +386,14 @@ impl BulkProducer for Output { } async fn consider_produced(&mut self, amount: usize) -> Result<(), Self::Error> { - self.state.queue.borrow_mut().consider_dequeued(amount); + self.state + .queue + .write() + .await + .deref_mut() + .consider_dequeued(amount); + self.state.len.set(self.state.len.get() - amount); + self.state.notify_the_sender.set(()); Ok(()) } } @@ -286,3 +411,107 @@ unsafe fn extend_lifetime_mut<'shorter, 'longer, T: ?Sized>( ) -> &'longer mut T { std::mem::transmute::<&'shorter mut T, &'longer mut T>(reference) } + +#[cfg(test)] +mod tests { + use super::*; + + use futures::join; + + use ufotofu_queues::Fixed; + + #[test] + fn test_spsc_sufficient_capacity() { + let state: State<_, _, ()> = State::new(Fixed::new(99 /* capacity */)); + let (mut sender, mut receiver) = new_spsc(&state); + + pollster::block_on(async { + assert!(sender.consume(300).await.is_ok()); + assert!(sender.consume(400).await.is_ok()); + assert!(sender.consume(500).await.is_ok()); + assert!(sender.close(-17).await.is_ok()); + assert_eq!(300, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(400, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(500, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(-17, receiver.produce().await.unwrap().unwrap_right()); + }); + } + + #[test] + fn test_spsc_low_capacity() { + pollster::block_on(async { + let state: State<_, _, ()> = State::new(Fixed::new(3 /* capacity */)); + let (mut sender, mut receiver) = new_spsc(&state); + + let send_things = async { + assert!(sender.consume(300).await.is_ok()); + assert!(sender.consume(400).await.is_ok()); + assert!(sender.consume(500).await.is_ok()); + assert!(sender.close(-17).await.is_ok()); + }; + + let receive_things = async { + assert_eq!(300, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(400, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(500, receiver.produce().await.unwrap().unwrap_left()); + assert_eq!(-17, receiver.produce().await.unwrap().unwrap_right()); + }; + + join!(send_things, receive_things); + }); + } + + #[test] + fn test_spsc_immediate_final() { + pollster::block_on(async { + let state: State, i16, ()> = State::new(Fixed::new(3 /* capacity */)); + let (mut sender, mut receiver) = new_spsc(&state); + + let send_things = async { + assert!(sender.close(-17).await.is_ok()); + }; + + let receive_things = async { + assert_eq!(-17, receiver.produce().await.unwrap().unwrap_right()); + }; + + join!(send_things, receive_things); + }); + } + + #[test] + fn test_spsc_immediate_error() { + pollster::block_on(async { + let state: State, i16, i16> = State::new(Fixed::new(3 /* capacity */)); + let (mut sender, mut receiver) = new_spsc(&state); + + let send_things = async { + sender.cause_error(-17); + }; + + let receive_things = async { + assert_eq!(-17, receiver.produce().await.unwrap_err()); + }; + + join!(send_things, receive_things); + }); + } + + #[test] + fn test_spsc_slurp() { + pollster::block_on(async { + let state: State, i16, i16> = State::new(Fixed::new(3 /* capacity */)); + let (mut sender, mut receiver) = new_spsc(&state); + + let send_things = async { + sender.cause_error(-17); + }; + + let receive_things = async { + assert_eq!(-17, receiver.slurp().await.unwrap_err()); + }; + + join!(send_things, receive_things); + }); + } +} diff --git a/wb_async_utils/src/take_cell.rs b/wb_async_utils/src/take_cell.rs index 727844c..b1b78ef 100644 --- a/wb_async_utils/src/take_cell.rs +++ b/wb_async_utils/src/take_cell.rs @@ -15,6 +15,12 @@ pub struct TakeCell { parked: UnsafeCell>, // push_back to enqueue, pop_front to dequeue } +impl Default for TakeCell { + fn default() -> Self { + Self::new() + } +} + impl TakeCell { /// Creates a new, empty [`TakeCell`]. pub fn new() -> Self { @@ -42,7 +48,30 @@ impl TakeCell { unsafe { &*self.value.get() }.is_none() } + /// Returns how many tasks are currently waiting for the cell to be filled. + pub fn count_waiting(&self) -> usize { + unsafe { &*self.parked.get() }.len() + } + /// Sets the value in the cell. If the cell was empty, wakes up the oldest pending async method call that was waiting for a value in the cell. + /// + /// ``` + /// use futures::join; + /// use wb_async_utils::TakeCell; + /// + /// let cell = TakeCell::new(); + /// + /// pollster::block_on(async { + /// let waitForSetting = async { + /// assert_eq!(5, cell.take().await); + /// }; + /// let setToFive = async { + /// cell.set(5); + /// }; + /// + /// join!(waitForSetting, setToFive); + /// }); + /// ``` pub fn set(&self, value: T) { let _ = self.replace(value); } @@ -68,6 +97,17 @@ impl TakeCell { } /// Takes the current value out of the cell if there is one, waiting for one to arrive if necessary. + /// + /// ``` + /// use futures::join; + /// use wb_async_utils::TakeCell; + /// + /// let cell = TakeCell::new_with(5); + /// + /// pollster::block_on(async { + /// assert_eq!(5, cell.take().await); + /// }); + /// ``` pub async fn take(&self) -> T { TakeFuture(self).await } @@ -119,7 +159,7 @@ impl fmt::Debug for TakeCell { struct TakeFuture<'cell, T>(&'cell TakeCell); -impl<'cell, T> Future for TakeFuture<'cell, T> { +impl Future for TakeFuture<'_, T> { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {