feat: add LocalWaker type, ContextBuilder type, and LocalWake trait.

This commit is contained in:
Tomás Vallotton 2023-11-30 18:11:02 -03:00
parent 314384b5fb
commit 60a08196b6
4 changed files with 504 additions and 25 deletions

View file

@ -135,6 +135,7 @@
#![feature(iter_next_chunk)]
#![feature(iter_repeat_n)]
#![feature(layout_for_ptr)]
#![feature(local_waker)]
#![feature(maybe_uninit_slice)]
#![feature(maybe_uninit_uninit_array)]
#![feature(maybe_uninit_uninit_array_transpose)]

View file

@ -7,8 +7,9 @@
//! `#[cfg(target_has_atomic = "ptr")]`.
use core::mem::ManuallyDrop;
use core::task::{RawWaker, RawWakerVTable, Waker};
use core::task::{LocalWaker, RawWaker, RawWakerVTable, Waker};
use crate::rc::Rc;
use crate::sync::Arc;
/// The implementation of waking a task on an executor.
@ -152,3 +153,165 @@ fn raw_waker<W: Wake + Send + Sync + 'static>(waker: Arc<W>) -> RawWaker {
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}
/// An analogous trait to `Wake` but used to construct a `LocalWaker`. This API
/// works in exactly the same way as `Wake`, except that it uses an `Rc` instead
/// of an `Arc`, and the result is a `LocalWaker` instead of a `Waker`.
///
/// The benefits of using `LocalWaker` over `Waker` are that it allows the local waker
/// to hold data that does not implement `Send` and `Sync`. Additionally, it saves calls
/// to `Arc::clone`, which requires atomic synchronization.
///
/// # Examples
///
/// A
///
/// This is a simplified example of a `spawn` and a `block_on` function. The `spawn` function
/// is used to push new tasks onto the run queue, while the block on function will remove them
/// and poll them. When a task is woken, it will put itself back on the run queue to be polled by the executor.
///
/// **Note:** A real world example would interlieve poll calls with calls to an io reactor to wait for events instead
/// of spinning on a loop.
///
/// ```rust
/// use std::task::{LocalWake, ContextBuilder, LocalWaker};
/// use std::future::Future;
/// use std::pin::Pin;
/// use std::rc::Rc;
/// use std::cell::RefCell;
/// use std::collections::VecDeque;
///
///
/// thread_local! {
/// // A queue containing all tasks ready to do progress
/// static RUN_QUEUE: RefCell<VecDeque<Rc<Task>>> = RefCell::default();
/// }
///
/// type BoxedFuture = Pin<Box<dyn Future<Output = ()>>>;
///
/// struct Task(RefCell<BoxedFuture>);
///
/// impl LocalWake for Task {
/// fn wake(self: Rc<Self>) {
/// RUN_QUEUE.with_borrow_mut(|queue| {
/// queue.push_back(self)
/// })
/// }
/// }
///
/// fn spawn<F>(future: F)
/// where
/// F: Future<Output=()> + 'static + Send + Sync
/// {
/// let task = Rc::new(Box::pin(future));
/// RUN_QUEUE.with_borrow_mut(|queue| {
/// queue.push_back(task)
/// });
/// }
///
/// fn block_on<F>(future: F)
/// where
/// F: Future<Output=()> + 'static + Sync + Send
/// {
/// spawn(future);
/// loop {
/// let Some(task) = RUN_QUEUE.with_borrow_mut(|queue|queue.pop_front()) else {
/// // we exit, since there are no more tasks remaining on the queue
/// return;
/// };
/// // cast the Rc<Task> into a `LocalWaker`
/// let waker: LocalWaker = task.into();
/// // Build the context using `ContextBuilder`
/// let mut cx = ContextBuilder::new()
/// .local_waker(&waker)
/// .build();
///
/// // Poll the task
/// task.0
/// .borrow_mut()
/// .as_mut()
/// .poll(&mut cx);
/// }
/// }
/// ```
///
#[unstable(feature = "local_waker", issue = "none")]
pub trait LocalWake {
/// Wake this task.
#[unstable(feature = "local_waker", issue = "none")]
fn wake(self: Rc<Self>);
/// Wake this task without consuming the local waker.
///
/// If an executor supports a cheaper way to wake without consuming the
/// waker, it should override this method. By default, it clones the
/// [`Rc`] and calls [`wake`] on the clone.
///
/// [`wake`]: Rc::wake
#[unstable(feature = "local_waker", issue = "none")]
fn wake_by_ref(self: &Rc<Self>) {
self.clone().wake();
}
}
#[unstable(feature = "local_waker", issue = "none")]
impl<W: LocalWake + 'static> From<Rc<W>> for LocalWaker {
/// Use a `Wake`-able type as a `LocalWaker`.
///
/// No heap allocations or atomic operations are used for this conversion.
fn from(waker: Rc<W>) -> LocalWaker {
// SAFETY: This is safe because raw_waker safely constructs
// a RawWaker from Rc<W>.
unsafe { LocalWaker::from_raw(local_raw_waker(waker)) }
}
}
#[allow(ineffective_unstable_trait_impl)]
#[unstable(feature = "local_waker", issue = "none")]
impl<W: LocalWake + 'static> From<Rc<W>> for RawWaker {
/// Use a `Wake`-able type as a `RawWaker`.
///
/// No heap allocations or atomic operations are used for this conversion.
fn from(waker: Rc<W>) -> RawWaker {
local_raw_waker(waker)
}
}
// NB: This private function for constructing a RawWaker is used, rather than
// inlining this into the `From<Rc<W>> for RawWaker` impl, to ensure that
// the safety of `From<Rc<W>> for Waker` does not depend on the correct
// trait dispatch - instead both impls call this function directly and
// explicitly.
#[inline(always)]
fn local_raw_waker<W: LocalWake + 'static>(waker: Rc<W>) -> RawWaker {
// Increment the reference count of the Rc to clone it.
unsafe fn clone_waker<W: LocalWake + 'static>(waker: *const ()) -> RawWaker {
unsafe { Rc::increment_strong_count(waker as *const W) };
RawWaker::new(
waker as *const (),
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}
// Wake by value, moving the Rc into the LocalWake::wake function
unsafe fn wake<W: LocalWake + 'static>(waker: *const ()) {
let waker = unsafe { Rc::from_raw(waker as *const W) };
<W as LocalWake>::wake(waker);
}
// Wake by reference, wrap the waker in ManuallyDrop to avoid dropping it
unsafe fn wake_by_ref<W: LocalWake + 'static>(waker: *const ()) {
let waker = unsafe { ManuallyDrop::new(Rc::from_raw(waker as *const W)) };
<W as LocalWake>::wake_by_ref(&waker);
}
// Decrement the reference count of the Rc on drop
unsafe fn drop_waker<W: LocalWake + 'static>(waker: *const ()) {
unsafe { Rc::decrement_strong_count(waker as *const W) };
}
RawWaker::new(
Rc::into_raw(waker) as *const (),
&RawWakerVTable::new(clone_waker::<W>, wake::<W>, wake_by_ref::<W>, drop_waker::<W>),
)
}

View file

@ -8,7 +8,7 @@ pub use self::poll::Poll;
mod wake;
#[stable(feature = "futures_api", since = "1.36.0")]
pub use self::wake::{Context, RawWaker, RawWakerVTable, Waker};
pub use self::wake::{Context, ContextBuilder, LocalWaker, RawWaker, RawWakerVTable, Waker};
mod ready;
#[stable(feature = "ready_macro", since = "1.64.0")]

View file

@ -1,5 +1,7 @@
#![stable(feature = "futures_api", since = "1.36.0")]
use crate::mem::transmute;
use crate::fmt;
use crate::marker::PhantomData;
use crate::ptr;
@ -60,6 +62,21 @@ impl RawWaker {
pub fn vtable(&self) -> &'static RawWakerVTable {
self.vtable
}
#[unstable(feature = "noop_waker", issue = "98286")]
const NOOP: RawWaker = {
const VTABLE: RawWakerVTable = RawWakerVTable::new(
// Cloning just returns a new no-op raw waker
|_| RawWaker::NOOP,
// `wake` does nothing
|_| {},
// `wake_by_ref` does nothing
|_| {},
// Dropping does nothing as we don't allocate anything
|_| {},
);
RawWaker::new(ptr::null(), &VTABLE)
};
}
/// A virtual function pointer table (vtable) that specifies the behavior
@ -177,7 +194,8 @@ impl RawWakerVTable {
#[stable(feature = "futures_api", since = "1.36.0")]
#[lang = "Context"]
pub struct Context<'a> {
waker: &'a Waker,
waker: Option<&'a Waker>,
local_waker: Option<&'a LocalWaker>,
// Ensure we future-proof against variance changes by forcing
// the lifetime to be invariant (argument-position lifetimes
// are contravariant while return-position lifetimes are
@ -195,16 +213,36 @@ impl<'a> Context<'a> {
#[must_use]
#[inline]
pub const fn from_waker(waker: &'a Waker) -> Self {
Context { waker, _marker: PhantomData, _marker2: PhantomData }
ContextBuilder::new().waker(waker).build()
}
/// Returns a reference to the [`Waker`] for the current task.
///
/// Note that if the waker does not need to be sent across threads, it
/// is preferable to call `local_waker`, which is more portable and
/// potentially more efficient.
///
/// # Panics
/// This function will panic if no `Waker` was set on the context. This happens if
/// the executor does not support working with thread safe wakers. An alternative
/// may be to call [`.local_waker()`](Context::local_waker) instead.
#[stable(feature = "futures_api", since = "1.36.0")]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
#[must_use]
#[inline]
pub const fn waker(&self) -> &'a Waker {
&self.waker
&self
.waker
.expect("no waker was set on this context, consider calling `local_waker` instead.")
}
/// Returns a reference to the [`LocalWaker`] for the current task.
#[unstable(feature = "local_waker", issue = "none")]
pub fn local_waker(&self) -> &'a LocalWaker {
// Safety:
// It is safe to transmute a `&Waker` into a `&LocalWaker` since both are a transparent
// wrapper around a local waker. Also, the Option<&Waker> here cannot be None since it is
// impossible to construct a Context without any waker set.
self.local_waker.unwrap_or_else(|| unsafe { transmute(self.waker) })
}
}
@ -215,6 +253,94 @@ impl fmt::Debug for Context<'_> {
}
}
/// A Builder used to construct a `Context` instance
/// with support for `LocalWaker`.
///
/// # Examples
/// ```
/// #![feature(local_waker)]
/// #![feature(noop_waker)]
/// use std::task::{ContextBuilder, LocalWaker, Waker};
///
/// let local_waker = LocalWaker::noop();
/// let waker = Waker::noop();
///
/// let context = ContextBuilder::default()
/// .local_waker(&local_waker)
/// .waker(&waker)
/// .build();
/// ```
#[unstable(feature = "local_waker", issue = "none")]
#[derive(Default, Debug)]
pub struct ContextBuilder<'a> {
waker: Option<&'a Waker>,
local_waker: Option<&'a LocalWaker>,
}
impl<'a> ContextBuilder<'a> {
/// Creates a new empty `ContextBuilder`.
#[inline]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
#[unstable(feature = "local_waker", issue = "none")]
pub const fn new() -> Self {
ContextBuilder { waker: None, local_waker: None }
}
/// This field is used to set the value of the waker on `Context`.
#[inline]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
#[unstable(feature = "local_waker", issue = "none")]
pub const fn waker(self, waker: &'a Waker) -> Self {
Self { waker: Some(waker), ..self }
}
/// This method is used to set the value for the local waker on `Context`.
///
/// # Examples
/// ```
/// #![feature(local_waker)]
/// #![feature(noop_waker)]
///
/// use std::task;
/// use std::pin;
/// use std::future::Future;
///
/// let local_waker = task::LocalWaker::noop();
///
/// let mut context = task::ContextBuilder::new()
/// .local_waker(&local_waker)
/// .build();
///
/// let future = pin::pin!(async { 20 });
///
/// let poll = future.poll(&mut context);
///
/// assert_eq!(poll, task::Poll::Ready(20));
/// ```
#[inline]
#[unstable(feature = "local_waker", issue = "none")]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
pub const fn local_waker(self, local_waker: &'a LocalWaker) -> Self {
Self { local_waker: Some(local_waker), ..self }
}
/// Builds the `Context`.
///
/// # Panics
/// Panics if no `Waker` or `LocalWaker` is set.
#[inline]
#[unstable(feature = "local_waker", issue = "none")]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
pub const fn build(self) -> Context<'a> {
let ContextBuilder { waker, local_waker } = self;
assert!(
waker.is_some() || local_waker.is_some(),
"at least one waker must be set with either the `local_waker` or `waker` methods on `ContextBuilder`."
);
Context { waker, local_waker, _marker: PhantomData, _marker2: PhantomData }
}
}
/// A `Waker` is a handle for waking up a task by notifying its executor that it
/// is ready to be run.
///
@ -229,7 +355,8 @@ impl fmt::Debug for Context<'_> {
/// Implements [`Clone`], [`Send`], and [`Sync`]; therefore, a waker may be invoked
/// from any thread, including ones not in any way managed by the executor. For example,
/// this might be done to wake a future when a blocking function call completes on another
/// thread.
/// thread. If the waker does not need to be moved across threads, it is better to use
/// [`LocalWaker`], which the executor may use to skip unnecessary memory synchronization.
///
/// Note that it is preferable to use `waker.clone_from(&new_waker)` instead
/// of `*waker = new_waker.clone()`, as the former will avoid cloning the waker
@ -354,25 +481,8 @@ impl Waker {
#[must_use]
#[unstable(feature = "noop_waker", issue = "98286")]
pub const fn noop() -> &'static Waker {
// Ideally all this data would be explicitly `static` because it is used by reference and
// only ever needs one copy. But `const fn`s (and `const` items) cannot refer to statics,
// even though their values can be promoted to static. (That might change; see #119618.)
// An alternative would be a `pub static NOOP: &Waker`, but associated static items are not
// currently allowed either, and making it non-associated would be unergonomic.
const VTABLE: RawWakerVTable = RawWakerVTable::new(
// Cloning just returns a new no-op raw waker
|_| RAW,
// `wake` does nothing
|_| {},
// `wake_by_ref` does nothing
|_| {},
// Dropping does nothing as we don't allocate anything
|_| {},
);
const RAW: RawWaker = RawWaker::new(ptr::null(), &VTABLE);
const WAKER_REF: &Waker = &Waker { waker: RAW };
WAKER_REF
const WAKER: &Waker = &Waker { waker: RawWaker::NOOP };
WAKER
}
/// Get a reference to the underlying [`RawWaker`].
@ -425,3 +535,208 @@ impl fmt::Debug for Waker {
.finish()
}
}
/// A `LocalWaker` is analogous to a [`Waker`], but it does not implement [`Send`] or [`Sync`].
/// This handle encapsulates a [`RawWaker`] instance, which defines the
/// executor-specific wakeup behavior.
///
/// Local wakers can be requested from a `Context` with the [`local_waker`] method.
///
/// The typical life of a `LocalWaker` is that it is constructed by an executor, wrapped in a
/// [`Context`], then passed to [`Future::poll()`]. Then, if the future chooses to return
/// [`Poll::Pending`], it must also store the waker somehow and call [`Waker::wake()`] when
/// the future should be polled again.
///
/// Implements [`Clone`], but neither [`Send`] nor [`Sync`]; therefore, a local waker may
/// not be moved to other threads. In general, when deciding to use wakers or local wakers,
/// local wakers are preferable unless the waker needs to be sent across threads. This is because
/// wakers can incur in additional cost related to memory synchronization, and not all executors
/// may support wakers.
///
/// Note that it is preferable to use `local_waker.clone_from(&new_waker)` instead
/// of `*local_waker = new_waker.clone()`, as the former will avoid cloning the waker
/// unnecessarily if the two wakers [wake the same task](Self::will_wake).
///
/// # Examples
///
/// ```
/// #![feature(local_waker)]
/// use std::future::{Future, poll_fn};
/// use std::task::Poll;
///
/// // a future that returns pending once.
/// fn yield_now() -> impl Future<Output=()> + Unpin {
/// let mut yielded = false;
/// poll_fn(move |cx| {
/// if !yielded {
/// yielded = true;
/// cx.local_waker().wake_by_ref();
/// return Poll::Pending;
/// }
/// return Poll::Ready(())
/// })
/// }
/// # async {
/// yield_now().await;
/// # };
/// ```
///
/// [`Future::poll()`]: core::future::Future::poll
/// [`Poll::Pending`]: core::task::Poll::Pending
/// [`local_waker`]: core::task::Context::local_waker
#[unstable(feature = "local_waker", issue = "none")]
#[repr(transparent)]
pub struct LocalWaker {
waker: RawWaker,
}
#[unstable(feature = "local_waker", issue = "none")]
impl Unpin for LocalWaker {}
impl LocalWaker {
/// Creates a new `LocalWaker` from [`RawWaker`].
///
/// The behavior of the returned `Waker` is undefined if the contract defined
/// in [`RawWaker`]'s and [`RawWakerVTable`]'s documentation is not upheld.
/// Therefore this method is unsafe.
#[inline]
#[must_use]
#[stable(feature = "futures_api", since = "1.36.0")]
#[rustc_const_unstable(feature = "const_waker", issue = "102012")]
pub const unsafe fn from_raw(waker: RawWaker) -> LocalWaker {
Self { waker }
}
/// Wake up the task associated with this `LocalWaker`.
///
/// As long as the executor keeps running and the task is not finished, it is
/// guaranteed that each invocation of [`wake()`](Self::wake) (or
/// [`wake_by_ref()`](Self::wake_by_ref)) will be followed by at least one
/// [`poll()`] of the task to which this `Waker` belongs. This makes
/// it possible to temporarily yield to other tasks while running potentially
/// unbounded processing loops.
///
/// Note that the above implies that multiple wake-ups may be coalesced into a
/// single [`poll()`] invocation by the runtime.
///
/// Also note that yielding to competing tasks is not guaranteed: it is the
/// executors choice which task to run and the executor may choose to run the
/// current task again.
///
/// [`poll()`]: crate::future::Future::poll
#[inline]
#[stable(feature = "futures_api", since = "1.36.0")]
pub fn wake(self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
let wake = self.waker.vtable.wake;
let data = self.waker.data;
// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);
// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `wake` and `data` requiring the user to acknowledge
// that the contract of `RawWaker` is upheld.
unsafe { (wake)(data) };
}
/// Creates a new `LocalWaker` that does nothing when `wake` is called.
///
/// This is mostly useful for writing tests that need a [`Context`] to poll
/// some futures, but are not expecting those futures to wake the waker or
/// do not need to do anything specific if it happens.
///
/// # Examples
///
/// ```
/// #![feature(local_waker)]
/// #![feature(noop_waker)]
///
/// use std::future::Future;
/// use std::task::{ContextBuilder, LocalWaker};
///
/// let mut cx = task::ContextBuilder::new()
/// .local_waker(LocalWaker::noop())
/// .build();
///
/// let mut future = Box::pin(async { 10 });
/// assert_eq!(future.as_mut().poll(&mut cx), task::Poll::Ready(10));
/// ```
#[inline]
#[must_use]
#[unstable(feature = "noop_waker", issue = "98286")]
pub const fn noop() -> &'static LocalWaker {
const WAKER: &LocalWaker = &LocalWaker { waker: RawWaker::NOOP };
WAKER
}
/// Get a reference to the underlying [`RawWaker`].
#[inline]
#[must_use]
#[unstable(feature = "waker_getters", issue = "87021")]
pub fn as_raw(&self) -> &RawWaker {
&self.waker
}
/// Returns `true` if this `LocalWaker` and another `LocalWaker` would awake the same task.
///
/// This function works on a best-effort basis, and may return false even
/// when the `Waker`s would awaken the same task. However, if this function
/// returns `true`, it is guaranteed that the `Waker`s will awaken the same task.
///
/// This function is primarily used for optimization purposes — for example,
/// this type's [`clone_from`](Self::clone_from) implementation uses it to
/// avoid cloning the waker when they would wake the same task anyway.
#[inline]
#[must_use]
#[stable(feature = "futures_api", since = "1.36.0")]
pub fn will_wake(&self, other: &LocalWaker) -> bool {
self.waker == other.waker
}
/// Wake up the task associated with this `LocalWaker` without consuming the `LocalWaker`.
///
/// This is similar to [`wake()`](Self::wake), but may be slightly less efficient in
/// the case where an owned `Waker` is available. This method should be preferred to
/// calling `waker.clone().wake()`.
#[inline]
#[stable(feature = "futures_api", since = "1.36.0")]
pub fn wake_by_ref(&self) {
// The actual wakeup call is delegated through a virtual function call
// to the implementation which is defined by the executor.
// SAFETY: see `wake`
unsafe { (self.waker.vtable.wake_by_ref)(self.waker.data) }
}
}
#[unstable(feature = "local_waker", issue = "none")]
impl Clone for LocalWaker {
#[inline]
fn clone(&self) -> Self {
LocalWaker {
// SAFETY: This is safe because `Waker::from_raw` is the only way
// to initialize `clone` and `data` requiring the user to acknowledge
// that the contract of [`RawWaker`] is upheld.
waker: unsafe { (self.waker.vtable.clone)(self.waker.data) },
}
}
#[inline]
fn clone_from(&mut self, source: &Self) {
if !self.will_wake(source) {
*self = source.clone();
}
}
}
#[stable(feature = "futures_api", since = "1.36.0")]
impl fmt::Debug for LocalWaker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let vtable_ptr = self.waker.vtable as *const RawWakerVTable;
f.debug_struct("LocalWaker")
.field("data", &self.waker.data)
.field("vtable", &vtable_ptr)
.finish()
}
}