Auto merge of #128219 - connortsui20:rwlock-downgrade, r=tgross35
Rwlock downgrade

Tracking Issue: #128203

This PR adds a `downgrade` method for `RwLock` / `RwLockWriteGuard` on all currently supported platforms.

Outstanding questions:

- [x] ~~Does the `futex.rs` change affect performance at all? It doesn't seem like it will but we can't be certain until we bench it...~~
- [x] ~~Should the SOLID platform implementation [be ported over](https://github.com/rust-lang/rust/pull/128219#discussion_r1693470090) to the `queue.rs` implementation to allow it to support downgrades?~~
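For orientation, here is a minimal, nightly-only usage sketch of the API this PR adds. It mirrors the doc example added in the diff below; the `rwlock_downgrade` feature gate and the `RwLockWriteGuard::downgrade` signature are taken from the diff itself.

```rust
#![feature(rwlock_downgrade)]

use std::sync::{RwLock, RwLockWriteGuard};

fn main() {
    let lock = RwLock::new(0);

    // Acquire the lock in write mode and update the protected value.
    let mut write_guard = lock.write().unwrap();
    *write_guard = 1;

    // Atomically switch to read mode: no other writer can get in between
    // the downgrade and the read below.
    let read_guard = RwLockWriteGuard::downgrade(write_guard);
    assert_eq!(*read_guard, 1);
}
```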
Commit e83c45a98b
7 changed files with 676 additions and 275 deletions
@ -4,10 +4,10 @@ mod tests;
|
|||
use crate::cell::UnsafeCell;
|
||||
use crate::fmt;
|
||||
use crate::marker::PhantomData;
|
||||
use crate::mem::ManuallyDrop;
|
||||
use crate::mem::{ManuallyDrop, forget};
|
||||
use crate::ops::{Deref, DerefMut};
|
||||
use crate::ptr::NonNull;
|
||||
use crate::sync::{LockResult, TryLockError, TryLockResult, poison};
|
||||
use crate::sync::{LockResult, PoisonError, TryLockError, TryLockResult, poison};
|
||||
use crate::sys::sync as sys;
|
||||
|
||||
/// A reader-writer lock
|
||||
|
@ -574,8 +574,12 @@ impl<T> From<T> for RwLock<T> {
|
|||
|
||||
impl<'rwlock, T: ?Sized> RwLockReadGuard<'rwlock, T> {
|
||||
/// Creates a new instance of `RwLockReadGuard<T>` from a `RwLock<T>`.
|
||||
// SAFETY: if and only if `lock.inner.read()` (or `lock.inner.try_read()`) has been
|
||||
// successfully called from the same thread before instantiating this object.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This function is safe if and only if the same thread has successfully and safely called
|
||||
/// `lock.inner.read()`, `lock.inner.try_read()`, or `lock.inner.downgrade()` before
|
||||
/// instantiating this object.
|
||||
unsafe fn new(lock: &'rwlock RwLock<T>) -> LockResult<RwLockReadGuard<'rwlock, T>> {
|
||||
poison::map_result(lock.poison.borrow(), |()| RwLockReadGuard {
|
||||
data: unsafe { NonNull::new_unchecked(lock.data.get()) },
|
||||
|
@ -957,6 +961,68 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
|
|||
None => Err(orig),
|
||||
}
|
||||
}
|
||||
|
||||
/// Downgrades a write-locked `RwLockWriteGuard` into a read-locked [`RwLockReadGuard`].
|
||||
///
|
||||
/// This method will atomically change the state of the [`RwLock`] from exclusive mode into
|
||||
/// shared mode. This means that it is impossible for a writing thread to get in between a
|
||||
/// thread calling `downgrade` and the same thread reading whatever it wrote while it had the
|
||||
/// [`RwLock`] in write mode.
|
||||
///
|
||||
/// Note that since we have the `RwLockWriteGuard`, we know that the [`RwLock`] is already
|
||||
/// locked for writing, so this method cannot fail.
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```
|
||||
/// #![feature(rwlock_downgrade)]
|
||||
/// use std::sync::{Arc, RwLock, RwLockWriteGuard};
|
||||
///
|
||||
/// // The inner value starts as 0.
|
||||
/// let rw = Arc::new(RwLock::new(0));
|
||||
///
|
||||
/// // Put the lock in write mode.
|
||||
/// let mut main_write_guard = rw.write().unwrap();
|
||||
///
|
||||
/// let evil = rw.clone();
|
||||
/// let handle = std::thread::spawn(move || {
|
||||
/// // This will not return until the main thread drops the `main_read_guard`.
|
||||
/// let mut evil_guard = evil.write().unwrap();
|
||||
///
|
||||
/// assert_eq!(*evil_guard, 1);
|
||||
/// *evil_guard = 2;
|
||||
/// });
|
||||
///
|
||||
/// // After spawning the writer thread, set the inner value to 1.
|
||||
/// *main_write_guard = 1;
|
||||
///
|
||||
/// // Atomically downgrade the write guard into a read guard.
|
||||
/// let main_read_guard = RwLockWriteGuard::downgrade(main_write_guard);
|
||||
///
|
||||
/// // Since `downgrade` is atomic, the writer thread cannot have set the inner value to 2.
|
||||
/// assert_eq!(*main_read_guard, 1, "`downgrade` was not atomic");
|
||||
///
|
||||
/// // Clean up everything now
|
||||
/// drop(main_read_guard);
|
||||
/// handle.join().unwrap();
|
||||
///
|
||||
/// let final_check = rw.read().unwrap();
|
||||
/// assert_eq!(*final_check, 2);
|
||||
/// ```
|
||||
#[unstable(feature = "rwlock_downgrade", issue = "128203")]
|
||||
pub fn downgrade(s: Self) -> RwLockReadGuard<'a, T> {
|
||||
let lock = s.lock;
|
||||
|
||||
// We don't want to call the destructor since that calls `write_unlock`.
|
||||
forget(s);
|
||||
|
||||
// SAFETY: We take ownership of a write guard, so we must already have the `RwLock` in write
|
||||
// mode, satisfying the `downgrade` contract.
|
||||
unsafe { lock.inner.downgrade() };
|
||||
|
||||
// SAFETY: We have just successfully called `downgrade`, so we fulfill the safety contract.
|
||||
unsafe { RwLockReadGuard::new(lock).unwrap_or_else(PoisonError::into_inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: ?Sized> MappedRwLockWriteGuard<'a, T> {
|
||||
|
|
|
@ -501,3 +501,108 @@ fn panic_while_mapping_write_unlocked_poison() {
|
|||
|
||||
drop(lock);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_downgrade_basic() {
|
||||
let r = RwLock::new(());
|
||||
|
||||
let write_guard = r.write().unwrap();
|
||||
let _read_guard = RwLockWriteGuard::downgrade(write_guard);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_downgrade_observe() {
|
||||
// Taken from the test `test_rwlock_downgrade` from:
|
||||
// https://github.com/Amanieu/parking_lot/blob/master/src/rwlock.rs
|
||||
|
||||
const W: usize = 20;
|
||||
const N: usize = 100;
|
||||
|
||||
// This test spawns `W` writer threads, where each will increment a counter `N` times, ensuring
|
||||
// that the value they wrote has not changed after downgrading.
|
||||
|
||||
let rw = Arc::new(RwLock::new(0));
|
||||
|
||||
// Spawn the writers that will do `W * N` operations and checks.
|
||||
let handles: Vec<_> = (0..W)
|
||||
.map(|_| {
|
||||
let rw = rw.clone();
|
||||
thread::spawn(move || {
|
||||
for _ in 0..N {
|
||||
// Increment the counter.
|
||||
let mut write_guard = rw.write().unwrap();
|
||||
*write_guard += 1;
|
||||
let cur_val = *write_guard;
|
||||
|
||||
// Downgrade the lock to read mode, where the value protected cannot be modified.
|
||||
let read_guard = RwLockWriteGuard::downgrade(write_guard);
|
||||
assert_eq!(cur_val, *read_guard);
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(*rw.read().unwrap(), W * N);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_downgrade_atomic() {
|
||||
const NEW_VALUE: i32 = -1;
|
||||
|
||||
// This test checks that `downgrade` is atomic, meaning as soon as a write lock has been
|
||||
// downgraded, the lock must be in read mode and no other threads can take the write lock to
|
||||
// modify the protected value.
|
||||
|
||||
// `W` is the number of evil writer threads.
|
||||
const W: usize = 20;
|
||||
let rwlock = Arc::new(RwLock::new(0));
|
||||
|
||||
// Spawns many evil writer threads that will try to write to the locked value before the
|
||||
// initial writer (who has the exclusive lock) can read after it downgrades.
|
||||
// If the `RwLock` behaves correctly, then the initial writer should read the value it wrote
|
||||
// itself as no other thread should be able to mutate the protected value.
|
||||
|
||||
// Put the lock in write mode, causing all future threads trying to access it to go to sleep.
|
||||
let mut main_write_guard = rwlock.write().unwrap();
|
||||
|
||||
// Spawn all of the evil writer threads. They will each increment the protected value by 1.
|
||||
let handles: Vec<_> = (0..W)
|
||||
.map(|_| {
|
||||
let rwlock = rwlock.clone();
|
||||
thread::spawn(move || {
|
||||
// Will go to sleep since the main thread initially has the write lock.
|
||||
let mut evil_guard = rwlock.write().unwrap();
|
||||
*evil_guard += 1;
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
// Wait for a good amount of time so that evil threads go to sleep.
|
||||
// Note: this is not strictly necessary...
|
||||
let eternity = crate::time::Duration::from_millis(42);
|
||||
thread::sleep(eternity);
|
||||
|
||||
// Once everyone is asleep, set the value to `NEW_VALUE`.
|
||||
*main_write_guard = NEW_VALUE;
|
||||
|
||||
// Atomically downgrade the write guard into a read guard.
|
||||
let main_read_guard = RwLockWriteGuard::downgrade(main_write_guard);
|
||||
|
||||
// If the above is not atomic, then it would be possible for an evil thread to get in front of
|
||||
// this read and change the value to be non-negative.
|
||||
assert_eq!(*main_read_guard, NEW_VALUE, "`downgrade` was not atomic");
|
||||
|
||||
// Drop the main read guard and allow the evil writer threads to start incrementing.
|
||||
drop(main_read_guard);
|
||||
|
||||
for handle in handles {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
let final_check = rwlock.read().unwrap();
|
||||
assert_eq!(*final_check, W as i32 + NEW_VALUE);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ pub struct RwLock {
|
|||
const READ_LOCKED: Primitive = 1;
|
||||
const MASK: Primitive = (1 << 30) - 1;
|
||||
const WRITE_LOCKED: Primitive = MASK;
|
||||
const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED); // READ_LOCKED - WRITE_LOCKED
|
||||
const MAX_READERS: Primitive = MASK - 1;
|
||||
const READERS_WAITING: Primitive = 1 << 30;
|
||||
const WRITERS_WAITING: Primitive = 1 << 31;
|
||||
|
@ -53,6 +54,24 @@ fn is_read_lockable(state: Primitive) -> bool {
|
|||
state & MASK < MAX_READERS && !has_readers_waiting(state) && !has_writers_waiting(state)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_read_lockable_after_wakeup(state: Primitive) -> bool {
|
||||
// We make a special case for checking if we can read-lock _after_ a reader thread that went to
|
||||
// sleep has been woken up by a call to `downgrade`.
|
||||
//
|
||||
// `downgrade` will wake up all readers and place the lock in read mode. Thus, there should be
|
||||
// no readers waiting and the lock should be read-locked (not write-locked or unlocked).
|
||||
//
|
||||
// Note that we do not check if any writers are waiting. This is because a call to `downgrade`
|
||||
// implies that the caller wants other readers to read the value protected by the lock. If we
|
||||
// did not allow readers to acquire the lock before writers after a `downgrade`, then only the
|
||||
// original writer would be able to read the value, thus defeating the purpose of `downgrade`.
|
||||
state & MASK < MAX_READERS
|
||||
&& !has_readers_waiting(state)
|
||||
&& !is_write_locked(state)
|
||||
&& !is_unlocked(state)
|
||||
}
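As a rough illustration of the policy described in the comments above, the following standalone sketch re-expresses the wakeup check in a simplified form (the constants mirror the ones defined in this file) and shows which states a reader woken by `downgrade` will accept:

```rust
// Simplified, standalone re-expression of the wakeup check above; not the std internals.
type Primitive = u32;

const READ_LOCKED: Primitive = 1;
const MASK: Primitive = (1 << 30) - 1;
const WRITE_LOCKED: Primitive = MASK;
const MAX_READERS: Primitive = MASK - 1;
const READERS_WAITING: Primitive = 1 << 30;
const WRITERS_WAITING: Primitive = 1 << 31;

fn read_lockable_after_wakeup(state: Primitive) -> bool {
    let readers = state & MASK;
    // Must already be read-locked (not unlocked, not write-locked), have room for
    // another reader, and have no readers still marked as waiting.
    readers > 0 && readers < MAX_READERS && state & READERS_WAITING == 0
}

fn main() {
    // After a downgrade, the lock holds one reader; a waiting writer does not block us.
    assert!(read_lockable_after_wakeup(READ_LOCKED | WRITERS_WAITING));
    // A write-locked or unlocked state is rejected.
    assert!(!read_lockable_after_wakeup(WRITE_LOCKED));
    assert!(!read_lockable_after_wakeup(0));
}
```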
|
||||
|
||||
#[inline]
|
||||
fn has_reached_max_readers(state: Primitive) -> bool {
|
||||
state & MASK == MAX_READERS
|
||||
|
@ -84,6 +103,9 @@ impl RwLock {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// The `RwLock` must be read-locked (N readers) in order to call this.
|
||||
#[inline]
|
||||
pub unsafe fn read_unlock(&self) {
|
||||
let state = self.state.fetch_sub(READ_LOCKED, Release) - READ_LOCKED;
|
||||
|
@ -100,11 +122,13 @@ impl RwLock {
|
|||
|
||||
#[cold]
|
||||
fn read_contended(&self) {
|
||||
let mut has_slept = false;
|
||||
let mut state = self.spin_read();
|
||||
|
||||
loop {
|
||||
// If we can lock it, lock it.
|
||||
if is_read_lockable(state) {
|
||||
// If we have just been woken up, first check for a `downgrade` call.
|
||||
// Otherwise, if we can read-lock it, lock it.
|
||||
if (has_slept && is_read_lockable_after_wakeup(state)) || is_read_lockable(state) {
|
||||
match self.state.compare_exchange_weak(state, state + READ_LOCKED, Acquire, Relaxed)
|
||||
{
|
||||
Ok(_) => return, // Locked!
|
||||
|
@ -116,9 +140,7 @@ impl RwLock {
|
|||
}
|
||||
|
||||
// Check for overflow.
|
||||
if has_reached_max_readers(state) {
|
||||
panic!("too many active read locks on RwLock");
|
||||
}
|
||||
assert!(!has_reached_max_readers(state), "too many active read locks on RwLock");
|
||||
|
||||
// Make sure the readers waiting bit is set before we go to sleep.
|
||||
if !has_readers_waiting(state) {
|
||||
|
@ -132,6 +154,7 @@ impl RwLock {
|
|||
|
||||
// Wait for the state to change.
|
||||
futex_wait(&self.state, state | READERS_WAITING, None);
|
||||
has_slept = true;
|
||||
|
||||
// Spin again after waking up.
|
||||
state = self.spin_read();
|
||||
|
@ -152,6 +175,9 @@ impl RwLock {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// The `RwLock` must be write-locked (single writer) in order to call this.
|
||||
#[inline]
|
||||
pub unsafe fn write_unlock(&self) {
|
||||
let state = self.state.fetch_sub(WRITE_LOCKED, Release) - WRITE_LOCKED;
|
||||
|
@ -163,6 +189,22 @@ impl RwLock {
|
|||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// The `RwLock` must be write-locked (single writer) in order to call this.
|
||||
#[inline]
|
||||
pub unsafe fn downgrade(&self) {
|
||||
// Removes all write bits and adds a single read bit.
|
||||
let state = self.state.fetch_add(DOWNGRADE, Release);
|
||||
debug_assert!(is_write_locked(state), "RwLock must be write locked to call `downgrade`");
|
||||
|
||||
if has_readers_waiting(state) {
|
||||
// Since we had the exclusive lock, nobody else can unset this bit.
|
||||
self.state.fetch_sub(READERS_WAITING, Relaxed);
|
||||
futex_wake_all(&self.state);
|
||||
}
|
||||
}
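The `fetch_add(DOWNGRADE, Release)` above leans on wrapping arithmetic. This small standalone sketch (reusing the constants from this file) checks that adding the wrapping difference `READ_LOCKED - WRITE_LOCKED` turns a write-locked state into a state with exactly one reader while leaving the waiter bits untouched:

```rust
type Primitive = u32;

const READ_LOCKED: Primitive = 1;
const MASK: Primitive = (1 << 30) - 1;
const WRITE_LOCKED: Primitive = MASK;
const WRITERS_WAITING: Primitive = 1 << 31;
// Same wrapping difference as the `DOWNGRADE` constant above.
const DOWNGRADE: Primitive = READ_LOCKED.wrapping_sub(WRITE_LOCKED);

fn main() {
    // A write-locked state with another writer waiting.
    let state: Primitive = WRITE_LOCKED | WRITERS_WAITING;

    // What `state.fetch_add(DOWNGRADE, Release)` leaves behind.
    let downgraded = state.wrapping_add(DOWNGRADE);

    // The lock is now held by exactly one reader and the waiting bit survives.
    assert_eq!(downgraded & MASK, READ_LOCKED);
    assert_ne!(downgraded & WRITERS_WAITING, 0);
}
```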
|
||||
|
||||
#[cold]
|
||||
fn write_contended(&self) {
|
||||
let mut state = self.spin_write();
|
||||
|
|
|
@ -62,4 +62,9 @@ impl RwLock {
|
|||
pub unsafe fn write_unlock(&self) {
|
||||
assert_eq!(self.mode.replace(0), -1);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn downgrade(&self) {
|
||||
assert_eq!(self.mode.replace(1), -1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,37 +1,38 @@
|
|||
//! Efficient read-write locking without `pthread_rwlock_t`.
|
||||
//!
|
||||
//! The readers-writer lock provided by the `pthread` library has a number of
|
||||
//! problems which make it a suboptimal choice for `std`:
|
||||
//! The readers-writer lock provided by the `pthread` library has a number of problems which make it
|
||||
//! a suboptimal choice for `std`:
|
||||
//!
|
||||
//! * It is non-movable, so it needs to be allocated (lazily, to make the
|
||||
//! constructor `const`).
|
||||
//! * `pthread` is an external library, meaning the fast path of acquiring an
|
||||
//! uncontended lock cannot be inlined.
|
||||
//! * Some platforms (at least glibc before version 2.25) have buggy implementations
|
||||
//! that can easily lead to undefined behavior in safe Rust code when not properly
|
||||
//! guarded against.
|
||||
//! * It is non-movable, so it needs to be allocated (lazily, to make the constructor `const`).
|
||||
//! * `pthread` is an external library, meaning the fast path of acquiring an uncontended lock
|
||||
//! cannot be inlined.
|
||||
//! * Some platforms (at least glibc before version 2.25) have buggy implementations that can easily
|
||||
//! lead to undefined behaviour in safe Rust code when not properly guarded against.
|
||||
//! * On some platforms (e.g. macOS), the lock is very slow.
|
||||
//!
|
||||
//! Therefore, we implement our own `RwLock`! Naively, one might reach for a
|
||||
//! spinlock, but those [can be quite problematic] when the lock is contended.
|
||||
//! Instead, this readers-writer lock copies its implementation strategy from
|
||||
//! the Windows [SRWLOCK] and the [usync] library. Spinning is still used for the
|
||||
//! fast path, but it is bounded: after spinning fails, threads will locklessly
|
||||
//! add an information structure containing a [`Thread`] handle into a queue of
|
||||
//! waiters associated with the lock. The lock owner, upon releasing the lock,
|
||||
//! will scan through the queue and wake up threads as appropriate, which will
|
||||
//! then again try to acquire the lock. The resulting [`RwLock`] is:
|
||||
//! Therefore, we implement our own [`RwLock`]! Naively, one might reach for a spinlock, but those
|
||||
//! can be quite [problematic] when the lock is contended.
|
||||
//!
|
||||
//! * adaptive, since it spins before doing any heavywheight parking operations
|
||||
//! * allocation-free, modulo the per-thread [`Thread`] handle, which is
|
||||
//! allocated regardless when using threads created by `std`
|
||||
//! Instead, this [`RwLock`] copies its implementation strategy from the Windows [SRWLOCK] and the
|
||||
//! [usync] library implementations.
|
||||
//!
|
||||
//! Spinning is still used for the fast path, but it is bounded: after spinning fails, threads will
|
||||
//! locklessly add an information structure ([`Node`]) containing a [`Thread`] handle into a queue
|
||||
//! of waiters associated with the lock. The lock owner, upon releasing the lock, will scan through
|
||||
//! the queue and wake up threads as appropriate, and the newly-awoken threads will then try to
|
||||
//! acquire the lock themselves.
|
||||
//!
|
||||
//! The resulting [`RwLock`] is:
|
||||
//!
|
||||
//! * adaptive, since it spins before doing any heavyweight parking operations
|
||||
//! * allocation-free, modulo the per-thread [`Thread`] handle, which is allocated anyways when
|
||||
//! using threads created by `std`
|
||||
//! * writer-preferring, even if some readers may still slip through
|
||||
//! * unfair, which reduces context-switching and thus drastically improves
|
||||
//! performance
|
||||
//! * unfair, which reduces context-switching and thus drastically improves performance
|
||||
//!
|
||||
//! and also quite fast in most cases.
|
||||
//!
|
||||
//! [can be quite problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
|
||||
//! [problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
|
||||
//! [SRWLOCK]: https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks
|
||||
//! [usync]: https://crates.io/crates/usync
|
||||
//!
|
||||
|
@ -39,33 +40,37 @@
|
|||
//!
|
||||
//! ## State
|
||||
//!
|
||||
//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
|
||||
//! to indicate the meaning of the remaining bits:
|
||||
//! A single [`AtomicPtr`] is used as the state variable. The lowest four bits are used to indicate the
|
||||
//! meaning of the remaining bits:
|
||||
//!
|
||||
//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
|
||||
//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
|
||||
//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
|
||||
//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
|
||||
//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
|
||||
//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
|
||||
//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
|
||||
//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | [`DOWNGRADED`] | Remaining | |
|
||||
//! |------------|:-----------|:-----------------|:---------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
|
||||
//! | 0 | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
|
||||
//! | 1 | 0 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
|
||||
//! | 1 | 0 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
|
||||
//! | 0 | 1 | * | 0 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
|
||||
//! | 1 | 1 | * | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
|
||||
//!
|
||||
//! ## Waiter queue
|
||||
//! ## Waiter Queue
|
||||
//!
|
||||
//! When threads are waiting on the lock (`QUEUE` is set), the lock state
|
||||
//! points to a queue of waiters, which is implemented as a linked list of
|
||||
//! nodes stored on the stack to avoid memory allocation. To enable lockless
|
||||
//! enqueuing of new nodes to the queue, the linked list is single-linked upon
|
||||
//! creation. Since when the lock is read-locked, the lock count is stored in
|
||||
//! the last link of the queue, threads have to traverse the queue to find the
|
||||
//! last element upon releasing the lock. To avoid having to traverse the whole
|
||||
//! list again and again, a pointer to the found tail is cached in the (current)
|
||||
//! first element of the queue.
|
||||
//! When threads are waiting on the lock (the `QUEUE` bit is set), the lock state points to a queue
|
||||
//! of waiters, which is implemented as a linked list of nodes stored on the stack to avoid memory
|
||||
//! allocation.
|
||||
//!
|
||||
//! Also, while the lock is unfair for performance reasons, it is still best to
|
||||
//! wake the tail node first, which requires backlinks to previous nodes to be
|
||||
//! created. This is done at the same time as finding the tail, and thus a set
|
||||
//! tail field indicates the remaining portion of the queue is initialized.
|
||||
//! To enable lock-free enqueuing of new nodes to the queue, the linked list is singly-linked upon
|
||||
//! creation.
|
||||
//!
|
||||
//! When the lock is read-locked, the lock count (number of readers) is stored in the last link of
|
||||
//! the queue. Threads have to traverse the queue to find the last element upon releasing the lock.
|
||||
//! To avoid having to traverse the entire list every time we want to access the reader count, a
|
||||
//! pointer to the found tail is cached in the (current) first element of the queue.
|
||||
//!
|
||||
//! Also, while the lock is unfair for performance reasons, it is still best to wake the tail node
|
||||
//! first (FIFO ordering). Since we always pop nodes off the tail of the queue, we must store
|
||||
//! backlinks to previous nodes so that we can update the `tail` field of the (current) first
|
||||
//! element of the queue. Adding backlinks is done at the same time as finding the tail (via the
|
||||
//! function [`find_tail_and_add_backlinks`]), and thus encountering a set tail field on a node
|
||||
//! indicates that all following nodes in the queue are initialized.
|
||||
//!
|
||||
//! TLDR: Here's a diagram of what the queue looks like:
|
||||
//!
|
||||
|
@ -89,21 +94,21 @@
|
|||
//! 3. All nodes preceding this node must have a correct, non-null `next` field.
|
||||
//! 4. All nodes following this node must have a correct, non-null `prev` field.
|
||||
//!
|
||||
//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads
|
||||
//! try to set both after enqueuing themselves to eagerly add backlinks to the
|
||||
//! queue, which drastically improves performance, and after unlocking the lock
|
||||
//! to wake the next waiter(s). This is done atomically at the same time as the
|
||||
//! enqueuing/unlocking operation. The thread releasing the `QUEUE_LOCK` bit
|
||||
//! will check the state of the lock and wake up waiters as appropriate. This
|
||||
//! guarantees forward-progress even if the unlocking thread could not acquire
|
||||
//! the queue lock.
|
||||
//! Access to the queue is controlled by the `QUEUE_LOCKED` bit. Threads will try to set this bit
|
||||
//! in two cases: one is when a thread enqueues itself and eagerly adds backlinks to the queue
|
||||
//! (which drastically improves performance), and the other is after a thread unlocks the lock to
|
||||
//! wake up the next waiter(s).
|
||||
//!
|
||||
//! ## Memory orderings
|
||||
//! `QUEUE_LOCKED` is set atomically at the same time as the enqueuing/unlocking operations. The
|
||||
//! thread releasing the `QUEUE_LOCKED` bit will check the state of the lock (in particular, whether
|
||||
//! a downgrade was requested using the [`DOWNGRADED`] bit) and wake up waiters as appropriate. This
|
||||
//! guarantees forward progress even if the unlocking thread could not acquire the queue lock.
|
||||
//!
|
||||
//! To properly synchronize changes to the data protected by the lock, the lock
|
||||
//! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively.
|
||||
//! To propagate the initialization of nodes, changes to the queue lock are also
|
||||
//! performed using these orderings.
|
||||
//! ## Memory Orderings
|
||||
//!
|
||||
//! To properly synchronize changes to the data protected by the lock, the lock is acquired and
|
||||
//! released with [`Acquire`] and [`Release`] ordering, respectively. To propagate the
|
||||
//! initialization of nodes, changes to the queue lock are also performed using these orderings.
|
||||
|
||||
#![forbid(unsafe_op_in_unsafe_fn)]
|
||||
|
||||
|
@ -115,26 +120,30 @@ use crate::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
|
|||
use crate::sync::atomic::{AtomicBool, AtomicPtr};
|
||||
use crate::thread::{self, Thread, ThreadId};
|
||||
|
||||
// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the
|
||||
// locking operation will be retried.
|
||||
// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
|
||||
const SPIN_COUNT: usize = 7;
|
||||
|
||||
type State = *mut ();
|
||||
/// The atomic lock state.
|
||||
type AtomicState = AtomicPtr<()>;
|
||||
/// The inner lock state.
|
||||
type State = *mut ();
|
||||
|
||||
const UNLOCKED: State = without_provenance_mut(0);
|
||||
const LOCKED: usize = 1;
|
||||
const QUEUED: usize = 2;
|
||||
const QUEUE_LOCKED: usize = 4;
|
||||
const SINGLE: usize = 8;
|
||||
const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
|
||||
const LOCKED: usize = 1 << 0;
|
||||
const QUEUED: usize = 1 << 1;
|
||||
const QUEUE_LOCKED: usize = 1 << 2;
|
||||
const DOWNGRADED: usize = 1 << 3;
|
||||
const SINGLE: usize = 1 << 4;
|
||||
const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
|
||||
const NODE_MASK: usize = !STATE;
|
||||
|
||||
/// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the locking operation
|
||||
/// will be retried.
|
||||
///
|
||||
/// In other words, `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
|
||||
const SPIN_COUNT: usize = 7;
|
||||
|
||||
/// Marks the state as write-locked, if possible.
|
||||
#[inline]
|
||||
fn write_lock(state: State) -> Option<State> {
|
||||
let state = state.wrapping_byte_add(LOCKED);
|
||||
if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
|
||||
if state.addr() & LOCKED == 0 { Some(state.map_addr(|addr| addr | LOCKED)) } else { None }
|
||||
}
|
||||
|
||||
/// Marks the state as read-locked, if possible.
|
||||
|
@ -147,13 +156,32 @@ fn read_lock(state: State) -> Option<State> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Masks the state, assuming it points to a queue node.
|
||||
/// Converts a `State` into a `Node` by masking out the bottom bits of the state, assuming that the
|
||||
/// state points to a queue node.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// The state must contain a valid pointer to a queue node.
|
||||
#[inline]
|
||||
unsafe fn to_node(state: State) -> NonNull<Node> {
|
||||
unsafe { NonNull::new_unchecked(state.mask(MASK)).cast() }
|
||||
unsafe { NonNull::new_unchecked(state.mask(NODE_MASK)).cast() }
|
||||
}
|
||||
|
||||
/// The representation of a thread waiting on the lock queue.
|
||||
///
|
||||
/// We initialize these `Node`s on thread execution stacks to avoid allocation.
|
||||
///
|
||||
/// Note that we need an alignment of 16 to ensure that the last 4 bits of any
|
||||
/// pointers to `Node`s are always zeroed (for the bit flags described in the
|
||||
/// module-level documentation).
|
||||
#[repr(align(16))]
|
||||
struct Node {
|
||||
next: AtomicLink,
|
||||
prev: AtomicLink,
|
||||
tail: AtomicLink,
|
||||
write: bool,
|
||||
thread: OnceCell<Thread>,
|
||||
completed: AtomicBool,
|
||||
}
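To make the alignment requirement concrete, here is a rough sketch of the pointer-tagging scheme, mirroring the `STATE`/`NODE_MASK` constants above. It assumes a recent toolchain where the strict-provenance pointer methods `addr` and `map_addr` are stable, and the `Node` stand-in here is not the real queue node:

```rust
const LOCKED: usize = 1 << 0;
const QUEUED: usize = 1 << 1;
const QUEUE_LOCKED: usize = 1 << 2;
const DOWNGRADED: usize = 1 << 3;
const STATE: usize = DOWNGRADED | QUEUE_LOCKED | QUEUED | LOCKED;
const NODE_MASK: usize = !STATE;

// Stand-in for the real queue node; align(16) guarantees the low four bits are zero.
#[repr(align(16))]
struct Node {
    write: bool,
}

fn main() {
    let node = Node { write: true };
    let node_ptr: *const Node = &node;

    // Tag the pointer: the lock is write-locked and has queued waiters.
    let state: *mut () = node_ptr.map_addr(|addr| addr | QUEUED | LOCKED) as *mut ();

    // The flag bits can be inspected...
    assert_eq!(state.addr() & STATE, QUEUED | LOCKED);
    // ...and masking them off recovers the original node pointer.
    let recovered = state.map_addr(|addr| addr & NODE_MASK) as *const Node;
    assert_eq!(recovered, node_ptr);
    assert!(unsafe { (*recovered).write });
}
```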
|
||||
|
||||
/// An atomic node pointer with relaxed operations.
|
||||
|
@ -173,16 +201,6 @@ impl AtomicLink {
|
|||
}
|
||||
}
|
||||
|
||||
#[repr(align(8))]
|
||||
struct Node {
|
||||
next: AtomicLink,
|
||||
prev: AtomicLink,
|
||||
tail: AtomicLink,
|
||||
write: bool,
|
||||
thread: OnceCell<Thread>,
|
||||
completed: AtomicBool,
|
||||
}
|
||||
|
||||
impl Node {
|
||||
/// Creates a new queue node.
|
||||
fn new(write: bool) -> Node {
|
||||
|
@ -198,17 +216,17 @@ impl Node {
|
|||
|
||||
/// Prepare this node for waiting.
|
||||
fn prepare(&mut self) {
|
||||
// Fall back to creating an unnamed `Thread` handle to allow locking in
|
||||
// TLS destructors.
|
||||
// Fall back to creating an unnamed `Thread` handle to allow locking in TLS destructors.
|
||||
self.thread.get_or_init(|| {
|
||||
thread::try_current().unwrap_or_else(|| Thread::new_unnamed(ThreadId::new()))
|
||||
});
|
||||
self.completed = AtomicBool::new(false);
|
||||
}
|
||||
|
||||
/// Wait until this node is marked as completed.
|
||||
/// Wait until this node is marked as [`complete`](Node::complete)d by another thread.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// May only be called from the thread that created the node.
|
||||
unsafe fn wait(&self) {
|
||||
while !self.completed.load(Acquire) {
|
||||
|
@ -218,51 +236,48 @@ impl Node {
|
|||
}
|
||||
}
|
||||
|
||||
/// Atomically mark this node as completed. The node may not outlive this call.
|
||||
unsafe fn complete(this: NonNull<Node>) {
|
||||
// Since the node may be destroyed immediately after the completed flag
|
||||
// is set, clone the thread handle before that.
|
||||
let thread = unsafe { this.as_ref().thread.get().unwrap().clone() };
|
||||
/// Atomically mark this node as completed.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// `node` must point to a valid `Node`, and the node may not outlive this call.
|
||||
unsafe fn complete(node: NonNull<Node>) {
|
||||
// Since the node may be destroyed immediately after the completed flag is set, clone the
|
||||
// thread handle before that.
|
||||
let thread = unsafe { node.as_ref().thread.get().unwrap().clone() };
|
||||
unsafe {
|
||||
this.as_ref().completed.store(true, Release);
|
||||
node.as_ref().completed.store(true, Release);
|
||||
}
|
||||
thread.unpark();
|
||||
}
|
||||
}
|
||||
|
||||
struct PanicGuard;
|
||||
|
||||
impl Drop for PanicGuard {
|
||||
fn drop(&mut self) {
|
||||
rtabort!("tried to drop node in intrusive list.");
|
||||
}
|
||||
}
|
||||
|
||||
/// Add backlinks to the queue, returning the tail.
|
||||
/// Traverse the queue and find the tail, adding backlinks to the queue while traversing.
|
||||
///
|
||||
/// May be called from multiple threads at the same time, while the queue is not
|
||||
/// This may be called from multiple threads at the same time as long as the queue is not being
|
||||
/// modified (this happens when unlocking multiple readers).
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// * `head` must point to a node in a valid queue.
|
||||
/// * `head` must be or be in front of the head of the queue at the time of the
|
||||
/// last removal.
|
||||
/// * The part of the queue starting with `head` must not be modified during this
|
||||
/// call.
|
||||
unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> {
|
||||
/// * `head` must be in front of the previous head node that was used to perform the last removal.
|
||||
/// * The part of the queue starting with `head` must not be modified during this call.
|
||||
unsafe fn find_tail_and_add_backlinks(head: NonNull<Node>) -> NonNull<Node> {
|
||||
let mut current = head;
|
||||
|
||||
// Traverse the queue until we find a node that has a set `tail`.
|
||||
let tail = loop {
|
||||
let c = unsafe { current.as_ref() };
|
||||
match c.tail.get() {
|
||||
Some(tail) => break tail,
|
||||
// SAFETY:
|
||||
// All `next` fields before the first node with a `set` tail are
|
||||
// non-null and valid (invariant 3).
|
||||
None => unsafe {
|
||||
let next = c.next.get().unwrap_unchecked();
|
||||
next.as_ref().prev.set(Some(current));
|
||||
current = next;
|
||||
},
|
||||
if let Some(tail) = c.tail.get() {
|
||||
break tail;
|
||||
}
|
||||
|
||||
// SAFETY: All `next` fields before the first node with a set `tail` are non-null and valid
|
||||
// (by Invariant 3).
|
||||
unsafe {
|
||||
let next = c.next.get().unwrap_unchecked();
|
||||
next.as_ref().prev.set(Some(current));
|
||||
current = next;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -272,6 +287,38 @@ unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> {
|
|||
}
|
||||
}
|
||||
|
||||
/// [`complete`](Node::complete)s all threads in the queue ending with `tail`.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// * `tail` must be a valid tail of a fully linked queue.
|
||||
/// * The current thread must have exclusive access to that queue.
|
||||
unsafe fn complete_all(tail: NonNull<Node>) {
|
||||
let mut current = tail;
|
||||
|
||||
// Traverse backwards through the queue (FIFO) and `complete` all of the nodes.
|
||||
loop {
|
||||
let prev = unsafe { current.as_ref().prev.get() };
|
||||
unsafe {
|
||||
Node::complete(current);
|
||||
}
|
||||
match prev {
|
||||
Some(prev) => current = prev,
|
||||
None => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A type to guard against the unwinds of stacks that nodes are located on due to panics.
|
||||
struct PanicGuard;
|
||||
|
||||
impl Drop for PanicGuard {
|
||||
fn drop(&mut self) {
|
||||
rtabort!("tried to drop node in intrusive list.");
|
||||
}
|
||||
}
|
||||
|
||||
/// The public inner `RwLock` type.
|
||||
pub struct RwLock {
|
||||
state: AtomicState,
|
||||
}
|
||||
|
@ -296,11 +343,10 @@ impl RwLock {
|
|||
|
||||
#[inline]
|
||||
pub fn try_write(&self) -> bool {
|
||||
// Atomically set the `LOCKED` bit. This is lowered to a single atomic
|
||||
// instruction on most modern processors (e.g. "lock bts" on x86 and
|
||||
// "ldseta" on modern AArch64), and therefore is more efficient than
|
||||
// `fetch_update(lock(true))`, which can spuriously fail if a new node
|
||||
// is appended to the queue.
|
||||
// Atomically set the `LOCKED` bit. This is lowered to a single atomic instruction on most
|
||||
// modern processors (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
|
||||
// is more efficient than `fetch_update(lock(true))`, which can spuriously fail if a new
|
||||
// node is appended to the queue.
|
||||
self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
|
||||
}
|
||||
|
||||
|
@ -313,88 +359,97 @@ impl RwLock {
|
|||
|
||||
#[cold]
|
||||
fn lock_contended(&self, write: bool) {
|
||||
let update = if write { write_lock } else { read_lock };
|
||||
let mut node = Node::new(write);
|
||||
let mut state = self.state.load(Relaxed);
|
||||
let mut count = 0;
|
||||
let update_fn = if write { write_lock } else { read_lock };
|
||||
|
||||
loop {
|
||||
if let Some(next) = update(state) {
|
||||
// Optimistically update the state.
|
||||
if let Some(next) = update_fn(state) {
|
||||
// The lock is available, try locking it.
|
||||
match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) {
|
||||
Ok(_) => return,
|
||||
Err(new) => state = new,
|
||||
}
|
||||
continue;
|
||||
} else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
|
||||
// If the lock is not available and no threads are queued, spin
|
||||
// for a while, using exponential backoff to decrease cache
|
||||
// contention.
|
||||
// If the lock is not available and no threads are queued, optimistically spin for a
|
||||
// while, using exponential backoff to decrease cache contention.
|
||||
for _ in 0..(1 << count) {
|
||||
spin_loop();
|
||||
}
|
||||
state = self.state.load(Relaxed);
|
||||
count += 1;
|
||||
} else {
|
||||
// Fall back to parking. First, prepare the node.
|
||||
node.prepare();
|
||||
|
||||
// If there are threads queued, set the `next` field to a
|
||||
// pointer to the next node in the queue. Otherwise set it to
|
||||
// the lock count if the state is read-locked or to zero if it
|
||||
// is write-locked.
|
||||
node.next.0 = AtomicPtr::new(state.mask(MASK).cast());
|
||||
node.prev = AtomicLink::new(None);
|
||||
let mut next = ptr::from_ref(&node)
|
||||
.map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
|
||||
as State;
|
||||
|
||||
if state.addr() & QUEUED == 0 {
|
||||
// If this is the first node in the queue, set the tail field to
|
||||
// the node itself to ensure there is a current `tail` field in
|
||||
// the queue (invariants 1 and 2). This needs to use `set` to
|
||||
// avoid invalidating the new pointer.
|
||||
node.tail.set(Some(NonNull::from(&node)));
|
||||
} else {
|
||||
// Otherwise, the tail of the queue is not known.
|
||||
node.tail.set(None);
|
||||
// Try locking the queue to eagerly add backlinks.
|
||||
next = next.map_addr(|addr| addr | QUEUE_LOCKED);
|
||||
}
|
||||
|
||||
// Register the node, using release ordering to propagate our
|
||||
// changes to the waking thread.
|
||||
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
|
||||
// The state has changed, just try again.
|
||||
state = new;
|
||||
continue;
|
||||
}
|
||||
|
||||
// The node is registered, so the structure must not be
|
||||
// mutably accessed or destroyed while other threads may
|
||||
// be accessing it. Guard against unwinds using a panic
|
||||
// guard that aborts when dropped.
|
||||
let guard = PanicGuard;
|
||||
|
||||
// If the current thread locked the queue, unlock it again,
|
||||
// linking it in the process.
|
||||
if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
|
||||
unsafe {
|
||||
self.unlock_queue(next);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until the node is removed from the queue.
|
||||
// SAFETY: the node was created by the current thread.
|
||||
unsafe {
|
||||
node.wait();
|
||||
}
|
||||
|
||||
// The node was removed from the queue, disarm the guard.
|
||||
mem::forget(guard);
|
||||
|
||||
// Reload the state and try again.
|
||||
state = self.state.load(Relaxed);
|
||||
count = 0;
|
||||
continue;
|
||||
}
|
||||
// The optimistic paths did not succeed, so fall back to parking the thread.
|
||||
|
||||
// First, prepare the node.
|
||||
node.prepare();
|
||||
|
||||
// If there are threads queued, this will set the `next` field to be a pointer to the
|
||||
// first node in the queue.
|
||||
// If the state is read-locked, this will set `next` to the lock count.
|
||||
// If it is write-locked, it will set `next` to zero.
|
||||
node.next.0 = AtomicPtr::new(state.mask(NODE_MASK).cast());
|
||||
node.prev = AtomicLink::new(None);
|
||||
|
||||
// Set the `QUEUED` bit and preserve the `LOCKED` and `DOWNGRADED` bit.
|
||||
let mut next = ptr::from_ref(&node)
|
||||
.map_addr(|addr| addr | QUEUED | (state.addr() & (DOWNGRADED | LOCKED)))
|
||||
as State;
|
||||
|
||||
let mut is_queue_locked = false;
|
||||
if state.addr() & QUEUED == 0 {
|
||||
// If this is the first node in the queue, set the `tail` field to the node itself
|
||||
// to ensure there is a valid `tail` field in the queue (Invariants 1 & 2).
|
||||
// This needs to use `set` to avoid invalidating the new pointer.
|
||||
node.tail.set(Some(NonNull::from(&node)));
|
||||
} else {
|
||||
// Otherwise, the tail of the queue is not known.
|
||||
node.tail.set(None);
|
||||
|
||||
// Try locking the queue to eagerly add backlinks.
|
||||
next = next.map_addr(|addr| addr | QUEUE_LOCKED);
|
||||
|
||||
// Track if we changed the `QUEUE_LOCKED` bit from off to on.
|
||||
is_queue_locked = state.addr() & QUEUE_LOCKED == 0;
|
||||
}
|
||||
|
||||
// Register the node, using release ordering to propagate our changes to the waking
|
||||
// thread.
|
||||
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
|
||||
// The state has changed, just try again.
|
||||
state = new;
|
||||
continue;
|
||||
}
|
||||
// The node has been registered, so the structure must not be mutably accessed or
|
||||
// destroyed while other threads may be accessing it.
|
||||
|
||||
// Guard against unwinds using a `PanicGuard` that aborts when dropped.
|
||||
let guard = PanicGuard;
|
||||
|
||||
// If the current thread locked the queue, unlock it to eagerly add backlinks.
|
||||
if is_queue_locked {
|
||||
// SAFETY: This thread set the `QUEUE_LOCKED` bit above.
|
||||
unsafe {
|
||||
self.unlock_queue(next);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until the node is removed from the queue.
|
||||
// SAFETY: the node was created by the current thread.
|
||||
unsafe {
|
||||
node.wait();
|
||||
}
|
||||
|
||||
// The node was removed from the queue, disarm the guard.
|
||||
mem::forget(guard);
|
||||
|
||||
// Reload the state and try again.
|
||||
state = self.state.load(Relaxed);
|
||||
count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -402,39 +457,51 @@ impl RwLock {
|
|||
pub unsafe fn read_unlock(&self) {
|
||||
match self.state.fetch_update(Release, Acquire, |state| {
|
||||
if state.addr() & QUEUED == 0 {
|
||||
// If there are no threads queued, simply decrement the reader count.
|
||||
let count = state.addr() - (SINGLE | LOCKED);
|
||||
Some(if count > 0 { without_provenance_mut(count | LOCKED) } else { UNLOCKED })
|
||||
} else if state.addr() & DOWNGRADED != 0 {
|
||||
// This thread used to have exclusive access, but requested a downgrade. This has
|
||||
// not been completed yet, so we still have exclusive access.
|
||||
// Retract the downgrade request and unlock, but leave waking up new threads to the
|
||||
// thread that already holds the queue lock.
|
||||
Some(state.mask(!(DOWNGRADED | LOCKED)))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}) {
|
||||
Ok(_) => {}
|
||||
// There are waiters queued and the lock count was moved to the
|
||||
// tail of the queue.
|
||||
// There are waiters queued and the lock count was moved to the tail of the queue.
|
||||
Err(state) => unsafe { self.read_unlock_contended(state) },
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// * There must be threads queued on the lock.
|
||||
/// * `state` must be a pointer to a node in a valid queue.
|
||||
/// * There cannot be a `downgrade` in progress.
|
||||
#[cold]
|
||||
unsafe fn read_unlock_contended(&self, state: State) {
|
||||
// The state was observed with acquire ordering above, so the current
|
||||
// thread will observe all node initializations.
|
||||
|
||||
// SAFETY:
|
||||
// Because new read-locks cannot be acquired while threads are queued,
|
||||
// all queue-lock owners will observe the set `LOCKED` bit. Because they
|
||||
// do not modify the queue while there is a lock owner, the queue will
|
||||
// not be removed from here.
|
||||
let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() };
|
||||
// The state was observed with acquire ordering above, so the current thread will have
|
||||
// observed all node initializations.
|
||||
// We also know that no threads can be modifying the queue starting at `state`: because new
|
||||
// read-locks cannot be acquired while there are any threads queued on the lock, all
|
||||
// queue-lock owners will observe a set `LOCKED` bit in `self.state` and will not modify
|
||||
// the queue. The other case that a thread could modify the queue is if a downgrade is in
|
||||
// progress (removal of the entire queue), but since that is part of this function's safety
|
||||
// contract, we can guarantee that no other threads can modify the queue.
|
||||
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)).as_ref() };
|
||||
|
||||
// The lock count is stored in the `next` field of `tail`.
|
||||
// Decrement it, making sure to observe all changes made to the queue
|
||||
// by the other lock owners by using acquire-release ordering.
|
||||
// Decrement it, making sure to observe all changes made to the queue by the other lock
|
||||
// owners by using acquire-release ordering.
|
||||
let was_last = tail.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0;
|
||||
if was_last {
|
||||
// SAFETY:
|
||||
// Other threads cannot read-lock while threads are queued. Also,
|
||||
// the `LOCKED` bit is still set, so there are no writers. Therefore,
|
||||
// the current thread exclusively owns the lock.
|
||||
// SAFETY: Other threads cannot read-lock while threads are queued. Also, the `LOCKED`
|
||||
// bit is still set, so there are no writers. Thus the current thread exclusively owns
|
||||
// this lock, even though it is a reader.
|
||||
unsafe { self.unlock_contended(state) }
|
||||
}
|
||||
}
|
||||
|
@ -444,49 +511,143 @@ impl RwLock {
|
|||
if let Err(state) =
|
||||
self.state.compare_exchange(without_provenance_mut(LOCKED), UNLOCKED, Release, Relaxed)
|
||||
{
|
||||
// SAFETY:
|
||||
// Since other threads cannot acquire the lock, the state can only
|
||||
// have changed because there are threads queued on the lock.
|
||||
// SAFETY: Since other threads cannot acquire the lock, the state can only have changed
|
||||
// because there are threads queued on the lock.
|
||||
unsafe { self.unlock_contended(state) }
|
||||
}
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
///
|
||||
/// * The lock must be exclusively owned by this thread.
|
||||
/// * There must be threads queued on the lock.
|
||||
/// * `state` must be a pointer to a node in a valid queue.
|
||||
/// * There cannot be a `downgrade` in progress.
|
||||
#[cold]
|
||||
unsafe fn unlock_contended(&self, mut state: State) {
|
||||
unsafe fn unlock_contended(&self, state: State) {
|
||||
debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED);
|
||||
|
||||
let mut current = state;
|
||||
|
||||
// We want to atomically release the lock and try to acquire the queue lock.
|
||||
loop {
|
||||
// First check if the queue lock is already held.
|
||||
if current.addr() & QUEUE_LOCKED != 0 {
|
||||
// Another thread holds the queue lock, so let them wake up waiters for us.
|
||||
let next = current.mask(!LOCKED);
|
||||
match self.state.compare_exchange_weak(current, next, Release, Relaxed) {
|
||||
Ok(_) => return,
|
||||
Err(new) => {
|
||||
current = new;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically release the lock and try to acquire the queue lock.
|
||||
let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
|
||||
match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
|
||||
// The queue lock was acquired. Release it, waking up the next
|
||||
// waiter in the process.
|
||||
Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
|
||||
return self.unlock_queue(next);
|
||||
},
|
||||
// Another thread already holds the queue lock, leave waking up
|
||||
// waiters to it.
|
||||
Ok(_) => return,
|
||||
Err(new) => state = new,
|
||||
let next = current.map_addr(|addr| (addr & !LOCKED) | QUEUE_LOCKED);
|
||||
match self.state.compare_exchange_weak(current, next, AcqRel, Relaxed) {
|
||||
// Now that we have the queue lock, we can wake up the next waiter.
|
||||
Ok(_) => {
|
||||
// SAFETY: This thread just acquired the queue lock, and this function's safety
|
||||
// contract requires that there are threads already queued on the lock.
|
||||
unsafe { self.unlock_queue(next) };
|
||||
return;
|
||||
}
|
||||
Err(new) => current = new,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Unlocks the queue. If the lock is unlocked, wakes up the next eligible
|
||||
/// thread(s).
|
||||
/// # Safety
|
||||
///
|
||||
/// * The lock must be write-locked by this thread.
|
||||
#[inline]
|
||||
pub unsafe fn downgrade(&self) {
|
||||
// Optimistically change the state from write-locked with a single writer and no waiters to
|
||||
// read-locked with a single reader and no waiters.
|
||||
if let Err(state) = self.state.compare_exchange(
|
||||
without_provenance_mut(LOCKED),
|
||||
without_provenance_mut(SINGLE | LOCKED),
|
||||
Release,
|
||||
Relaxed,
|
||||
) {
|
||||
// SAFETY: The only way the state can have changed is if there are threads queued.
|
||||
// Wake all of them up.
|
||||
unsafe { self.downgrade_slow(state) }
|
||||
}
|
||||
}
|
||||
|
||||
/// Downgrades the lock from write-locked to read-locked in the case that there are threads
|
||||
/// waiting on the wait queue.
|
||||
///
|
||||
/// This function will either wake up all of the waiters on the wait queue or designate the
|
||||
/// current holder of the queue lock to wake up all of the waiters instead. Once the waiters
|
||||
/// wake up, they will continue in the execution loop of `lock_contended`.
|
||||
///
|
||||
/// # Safety
|
||||
/// The queue lock must be held by the current thread.
|
||||
///
|
||||
/// * The lock must be write-locked by this thread.
|
||||
/// * `state` must be a pointer to a node in a valid queue.
|
||||
/// * There must be threads queued on the lock.
|
||||
#[cold]
|
||||
unsafe fn downgrade_slow(&self, mut state: State) {
|
||||
debug_assert_eq!(state.addr() & (DOWNGRADED | QUEUED | LOCKED), QUEUED | LOCKED);
|
||||
|
||||
// Attempt to wake up all waiters by taking ownership of the entire waiter queue.
|
||||
loop {
|
||||
if state.addr() & QUEUE_LOCKED != 0 {
|
||||
// Another thread already holds the queue lock. Tell it to wake up all waiters.
|
||||
// If the other thread succeeds in waking up waiters before we release our lock, the
|
||||
// effect will be just the same as if we had changed the state below.
|
||||
// Otherwise, the `DOWNGRADED` bit will still be set, meaning that when this thread
|
||||
// calls `read_unlock` later (because it holds a read lock and must unlock
|
||||
// eventually), it will realize that the lock is still exclusively locked and act
|
||||
// accordingly.
|
||||
let next = state.map_addr(|addr| addr | DOWNGRADED);
|
||||
match self.state.compare_exchange_weak(state, next, Release, Relaxed) {
|
||||
Ok(_) => return,
|
||||
Err(new) => state = new,
|
||||
}
|
||||
} else {
|
||||
// Grab the entire queue by swapping the `state` with a single reader.
|
||||
let next = ptr::without_provenance_mut(SINGLE | LOCKED);
|
||||
if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
|
||||
state = new;
|
||||
continue;
|
||||
}
|
||||
|
||||
// SAFETY: We have full ownership of this queue now, so nobody else can modify it.
|
||||
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
|
||||
|
||||
// Wake up all waiters.
|
||||
// SAFETY: `tail` was just computed, meaning the whole queue is linked, and we have
|
||||
// full ownership of the queue, so we have exclusive access.
|
||||
unsafe { complete_all(tail) };
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Unlocks the queue. Wakes up all threads if a downgrade was requested, otherwise wakes up the
|
||||
/// next eligible thread(s) if the lock is unlocked.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// * The queue lock must be held by the current thread.
|
||||
/// * `state` must be a pointer to a node in a valid queue.
|
||||
/// * There must be threads queued on the lock.
|
||||
unsafe fn unlock_queue(&self, mut state: State) {
|
||||
debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
|
||||
|
||||
loop {
|
||||
let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) };
|
||||
// SAFETY: Since we have the queue lock, nobody else can be modifying the queue.
|
||||
let tail = unsafe { find_tail_and_add_backlinks(to_node(state)) };
|
||||
|
||||
if state.addr() & LOCKED == LOCKED {
|
||||
// Another thread has locked the lock. Leave waking up waiters
|
||||
// to them by releasing the queue lock.
|
||||
if state.addr() & (DOWNGRADED | LOCKED) == LOCKED {
|
||||
// Another thread has locked the lock and no downgrade was requested.
|
||||
// Leave waking up waiters to them by releasing the queue lock.
|
||||
match self.state.compare_exchange_weak(
|
||||
state,
|
||||
state.mask(!QUEUE_LOCKED),
|
||||
|
@ -501,53 +662,63 @@ impl RwLock {
|
|||
}
|
||||
}
|
||||
|
||||
let is_writer = unsafe { tail.as_ref().write };
|
||||
if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
|
||||
// `tail` is a writer and there is a node before `tail`.
|
||||
// Split off `tail`.
|
||||
// Since we hold the queue lock and downgrades cannot be requested if the lock is
|
||||
// already read-locked, we have exclusive control over the queue here and can make
|
||||
// modifications.
|
||||
|
||||
// There are no set `tail` links before the node pointed to by
|
||||
// `state`, so the first non-null tail field will be current
|
||||
// (invariant 2). Invariant 4 is fullfilled since `find_tail`
|
||||
// was called on this node, which ensures all backlinks are set.
|
||||
let downgrade = state.addr() & DOWNGRADED != 0;
|
||||
let is_writer = unsafe { tail.as_ref().write };
|
||||
if !downgrade
|
||||
&& is_writer
|
||||
&& let Some(prev) = unsafe { tail.as_ref().prev.get() }
|
||||
{
|
||||
// If we are not downgrading and the next thread is a writer, only wake up that
|
||||
// writing thread.
|
||||
|
||||
// Split off `tail`.
|
||||
// There are no set `tail` links before the node pointed to by `state`, so the first
|
||||
// non-null tail field will be current (Invariant 2).
|
||||
// We also fulfill Invariant 4 since `find_tail` was called on this node, which
|
||||
// ensures all backlinks are set.
|
||||
unsafe {
|
||||
to_node(state).as_ref().tail.set(Some(prev));
|
||||
}
|
||||
|
||||
// Release the queue lock. Doing this by subtraction is more
|
||||
// efficient on modern processors since it is a single instruction
|
||||
// instead of an update loop, which will fail if new threads are
|
||||
// added to the list.
|
||||
self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
|
||||
|
||||
// The tail was split off and the lock released. Mark the node as
|
||||
// completed.
|
||||
unsafe {
|
||||
return Node::complete(tail);
|
||||
}
|
||||
} else {
|
||||
// The next waiter is a reader or the queue only consists of one
|
||||
// waiter. Just wake all threads.
|
||||
|
||||
// The lock cannot be locked (checked above), so mark it as
|
||||
// unlocked to reset the queue.
|
||||
if let Err(new) =
|
||||
self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire)
|
||||
{
|
||||
// Try to release the queue lock. We need to check the state again since another
|
||||
// thread might have acquired the lock and requested a downgrade.
|
||||
let next = state.mask(!QUEUE_LOCKED);
|
||||
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
|
||||
// Undo the tail modification above, so that we can find the tail again above.
|
||||
// As mentioned above, we have exclusive control over the queue, so no other
|
||||
// thread could have noticed the change.
|
||||
unsafe {
|
||||
to_node(state).as_ref().tail.set(Some(tail));
|
||||
}
|
||||
state = new;
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut current = tail;
|
||||
loop {
|
||||
let prev = unsafe { current.as_ref().prev.get() };
|
||||
unsafe {
|
||||
Node::complete(current);
|
||||
}
|
||||
match prev {
|
||||
Some(prev) => current = prev,
|
||||
None => return,
|
||||
}
|
||||
// The tail was split off and the lock was released. Mark the node as completed.
|
||||
unsafe {
|
||||
return Node::complete(tail);
|
||||
}
|
||||
} else {
|
||||
// We are either downgrading, the next waiter is a reader, or the queue only
|
||||
// consists of one waiter. In any case, just wake all threads.
|
||||
|
||||
// Clear the queue.
|
||||
let next =
|
||||
if downgrade { ptr::without_provenance_mut(SINGLE | LOCKED) } else { UNLOCKED };
|
||||
if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Acquire) {
|
||||
state = new;
|
||||
continue;
|
||||
}
|
||||
|
||||
// SAFETY: we computed `tail` above, and no new nodes can have been added since
|
||||
// (otherwise the CAS above would have failed).
|
||||
// Thus we have complete control over the whole queue.
|
||||
unsafe {
|
||||
return complete_all(tail);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,6 +79,12 @@ impl RwLock {
|
|||
let rwl = self.raw();
|
||||
expect_success_aborting(unsafe { abi::rwl_unl_rwl(rwl) }, &"rwl_unl_rwl");
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn downgrade(&self) {
|
||||
// The SOLID platform does not support the `downgrade` operation for reader writer locks, so
|
||||
// this function is simply a no-op as only 1 reader can read: the original writer.
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for RwLock {
|
||||
|
|
|
@ -41,4 +41,10 @@ impl RwLock {
|
|||
pub unsafe fn write_unlock(&self) {
|
||||
unsafe { self.inner.unlock() };
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn downgrade(&self) {
|
||||
// Since there is no difference between read-locked and write-locked on this platform, this
|
||||
// function is simply a no-op as only 1 reader can read: the original writer.
|
||||
}
|
||||
}
|
||||
|
|