Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support shared futures on no_std #2868

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Common utilities and extension traits for the futures-rs library.

[features]
default = ["std", "async-await", "async-await-macro"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
alloc = ["futures-core/alloc", "futures-task/alloc"]
std = ["alloc", "futures-core/std", "futures-task/std", "slab/std"]
alloc = ["futures-core/alloc", "futures-task/alloc", "slab"]
async-await = []
async-await-macro = ["async-await", "futures-macro"]
compat = ["std", "futures_01"]
Expand All @@ -37,12 +37,13 @@ futures-channel = { path = "../futures-channel", version = "=0.4.0-alpha.0", def
futures-io = { path = "../futures-io", version = "0.3.30", default-features = false, features = ["std"], optional = true }
futures-sink = { path = "../futures-sink", version = "=0.4.0-alpha.0", default-features = false, optional = true }
futures-macro = { path = "../futures-macro", version = "=0.4.0-alpha.0", default-features = false, optional = true }
slab = { version = "0.4.2", optional = true }
slab = { version = "0.4.2", default-features = false, optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project-lite = "0.2.6"
spin = { version = "0.9.8", optional = true }

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/future/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ mod remote_handle;
#[cfg(feature = "std")]
pub use self::remote_handle::{Remote, RemoteHandle};

#[cfg(feature = "std")]
#[cfg(any(feature = "std", feature = "spin"))]
adavis628 marked this conversation as resolved.
Show resolved Hide resolved
mod shared;
#[cfg(feature = "std")]
#[cfg(any(feature = "std", feature = "spin"))]
pub use self::shared::{Shared, WeakShared};

impl<T: ?Sized> FutureExt for T where T: Future {}
Expand Down Expand Up @@ -440,7 +440,7 @@ pub trait FutureExt: Future {
/// into a cloneable future. It enables a future to be polled by multiple
/// threads.
///
/// This method is only available when the `std` feature of this
/// This method is only available when the `std` or 'spin' feature of this
/// library is activated, and it is activated by default.
///
/// # Examples
Expand Down Expand Up @@ -474,7 +474,7 @@ pub trait FutureExt: Future {
/// join_handle.join().unwrap();
/// # });
/// ```
#[cfg(feature = "std")]
#[cfg(any(feature = "std", feature = "spin"))]
fn shared(self) -> Shared<Self>
where
Self: Sized,
Expand Down
37 changes: 29 additions & 8 deletions futures-util/src/future/future/shared.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use crate::task::{waker_ref, ArcWake};
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
use core::fmt;
use core::hash::Hasher;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::{Acquire, SeqCst};
use futures_core::future::{FusedFuture, Future};
use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
use std::hash::Hasher;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex, Weak};

#[cfg(feature = "std")]
type Mutex<T> = std::sync::Mutex<T>;
#[cfg(not(feature = "std"))]
type Mutex<T> = spin::Mutex<T>;

/// Future for the [`shared`](super::FutureExt::shared) method.
#[must_use = "futures do nothing unless you `.await` or poll them"]
Expand Down Expand Up @@ -204,7 +209,10 @@ where
{
/// Registers the current task to receive a wakeup when we are awoken.
fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
#[cfg(feature = "std")]
let mut wakers_guard = self.notifier.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let mut wakers_guard = self.notifier.wakers.lock();

let wakers_mut = wakers_guard.as_mut();

Expand Down Expand Up @@ -345,7 +353,11 @@ where
inner.notifier.state.store(COMPLETE, SeqCst);

// Wake all tasks and drop the slab
#[cfg(feature = "std")]
let mut wakers_guard = inner.notifier.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let mut wakers_guard = inner.notifier.wakers.lock();

let mut wakers = wakers_guard.take().unwrap();
for waker in wakers.drain().flatten() {
waker.wake();
Expand Down Expand Up @@ -375,19 +387,28 @@ where
fn drop(&mut self) {
if self.waker_key != NULL_WAKER_KEY {
if let Some(ref inner) = self.inner {
#[cfg(feature = "std")]
if let Ok(mut wakers) = inner.notifier.wakers.lock() {
if let Some(wakers) = wakers.as_mut() {
wakers.remove(self.waker_key);
}
}
#[cfg(not(feature = "std"))]
if let Some(wakers) = inner.notifier.wakers.lock().as_mut() {
wakers.remove(self.waker_key);
}
}
}
}
}

impl ArcWake for Notifier {
fn wake_by_ref(arc_self: &Arc<Self>) {
#[cfg(feature = "std")]
let wakers = &mut *arc_self.wakers.lock().unwrap();
#[cfg(not(feature = "std"))]
let wakers = &mut *arc_self.wakers.lock();

if let Some(wakers) = wakers.as_mut() {
for (_key, opt_waker) in wakers {
if let Some(waker) = opt_waker.take() {
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use self::future::CatchUnwind;
#[cfg(feature = "std")]
pub use self::future::{Remote, RemoteHandle};

#[cfg(feature = "std")]
#[cfg(any(feature = "std", feature = "spin"))]
pub use self::future::{Shared, WeakShared};

mod try_future;
Expand Down
1 change: 1 addition & 0 deletions futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ compat = ["std", "futures-util/compat"]
io-compat = ["compat", "futures-util/io-compat"]
executor = ["std", "futures-executor/std"]
thread-pool = ["executor", "futures-executor/thread-pool"]
spin = ["futures-util/spin"]

# Unstable features
# These features are outside of the normal semver guarantees and require the
Expand Down
Loading