75fa9f6dec
This commit is extracted from #122036 and adds a new directive to the `compiletest` test runner, `//@ needs-threads`. This is intended to capture the need that a target must implement threading to execute a specific test, typically one that uses `std::thread`. This is primarily done for WebAssembly targets which currently do not have threads by default. This enables transitioning a lot of `//@ ignore-wasm*`-style ignores into a more self-documenting `//@ needs-threads` directive. Additionally the `wasm32-wasi-preview1-threads` target, for example, does actually have threads, but isn't tested in CI at this time. This change enables running these tests for that target, but not other wasm targets.
200 lines
4.7 KiB
Rust
200 lines
4.7 KiB
Rust
//@ run-pass
|
|
//@ compile-flags:--test
|
|
//@ needs-threads
|
|
|
|
use std::sync::mpsc::channel;
|
|
use std::sync::mpsc::TryRecvError;
|
|
use std::sync::mpsc::RecvError;
|
|
use std::sync::mpsc::RecvTimeoutError;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::AtomicUsize;
|
|
use std::sync::atomic::Ordering;
|
|
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
|
|
/// Simple thread synchronization utility
|
|
struct Barrier {
|
|
// Not using mutex/condvar for precision
|
|
shared: Arc<AtomicUsize>,
|
|
count: usize,
|
|
}
|
|
|
|
impl Barrier {
|
|
fn new(count: usize) -> Vec<Barrier> {
|
|
let shared = Arc::new(AtomicUsize::new(0));
|
|
(0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
|
|
}
|
|
|
|
fn new2() -> (Barrier, Barrier) {
|
|
let mut v = Barrier::new(2);
|
|
(v.pop().unwrap(), v.pop().unwrap())
|
|
}
|
|
|
|
/// Returns when `count` threads enter `wait`
|
|
fn wait(self) {
|
|
self.shared.fetch_add(1, Ordering::SeqCst);
|
|
while self.shared.load(Ordering::SeqCst) != self.count {
|
|
#[cfg(target_env = "sgx")]
|
|
thread::yield_now();
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
fn shared_close_sender_does_not_lose_messages_iter() {
|
|
let (tb, rb) = Barrier::new2();
|
|
|
|
let (tx, rx) = channel();
|
|
let _ = tx.clone(); // convert to shared
|
|
|
|
thread::spawn(move || {
|
|
tb.wait();
|
|
thread::sleep(Duration::from_micros(1));
|
|
tx.send(17).expect("send");
|
|
drop(tx);
|
|
});
|
|
|
|
let i = rx.into_iter();
|
|
rb.wait();
|
|
// Make sure it doesn't return disconnected before returning an element
|
|
assert_eq!(vec![17], i.collect::<Vec<_>>());
|
|
}
|
|
|
|
#[test]
|
|
fn shared_close_sender_does_not_lose_messages() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..10000 {
|
|
shared_close_sender_does_not_lose_messages_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
|
|
// https://github.com/rust-lang/rust/issues/39364
|
|
fn concurrent_recv_timeout_and_upgrade_iter() {
|
|
// 1 us
|
|
let sleep = Duration::new(0, 1_000);
|
|
|
|
let (a, b) = Barrier::new2();
|
|
let (tx, rx) = channel();
|
|
let th = thread::spawn(move || {
|
|
a.wait();
|
|
loop {
|
|
match rx.recv_timeout(sleep) {
|
|
Ok(_) => {
|
|
break;
|
|
},
|
|
Err(_) => {},
|
|
}
|
|
}
|
|
});
|
|
b.wait();
|
|
thread::sleep(sleep);
|
|
tx.clone().send(()).expect("send");
|
|
th.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_recv_timeout_and_upgrade() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..10000 {
|
|
concurrent_recv_timeout_and_upgrade_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
|
|
fn concurrent_writes_iter() {
|
|
const THREADS: usize = 4;
|
|
const PER_THR: usize = 100;
|
|
|
|
let mut bs = Barrier::new(THREADS + 1);
|
|
let (tx, rx) = channel();
|
|
|
|
let mut threads = Vec::new();
|
|
for j in 0..THREADS {
|
|
let tx = tx.clone();
|
|
let b = bs.pop().unwrap();
|
|
threads.push(thread::spawn(move || {
|
|
b.wait();
|
|
for i in 0..PER_THR {
|
|
tx.send(j * 1000 + i).expect("send");
|
|
}
|
|
}));
|
|
}
|
|
|
|
let b = bs.pop().unwrap();
|
|
b.wait();
|
|
|
|
let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
|
|
v.sort();
|
|
|
|
for j in 0..THREADS {
|
|
for i in 0..PER_THR {
|
|
assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
|
|
}
|
|
}
|
|
|
|
for t in threads {
|
|
t.join().unwrap();
|
|
}
|
|
|
|
let one_us = Duration::new(0, 1000);
|
|
|
|
assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
|
|
assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
|
|
|
|
drop(tx);
|
|
|
|
assert_eq!(RecvError, rx.recv().unwrap_err());
|
|
assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
|
|
assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_writes() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..100 {
|
|
concurrent_writes_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
pub mod timeapi {
|
|
#![allow(non_snake_case)]
|
|
use std::ffi::c_uint;
|
|
|
|
pub const TIMERR_NOERROR: c_uint = 0;
|
|
|
|
#[link(name = "winmm")]
|
|
extern "system" {
|
|
pub fn timeBeginPeriod(uPeriod: c_uint) -> c_uint;
|
|
pub fn timeEndPeriod(uPeriod: c_uint) -> c_uint;
|
|
}
|
|
}
|
|
|
|
/// Window's minimum sleep time can be as much as 16ms.
|
|
// This function evaluates the closure with this resolution
|
|
// set as low as possible.
|
|
///
|
|
/// This takes the above test's duration from 10000*16/1000/60=2.67 minutes to ~16 seconds.
|
|
fn with_minimum_timer_resolution(f: impl Fn()) {
|
|
#[cfg(windows)]
|
|
unsafe {
|
|
let ret = timeapi::timeBeginPeriod(1);
|
|
assert_eq!(ret, timeapi::TIMERR_NOERROR);
|
|
|
|
f();
|
|
|
|
let ret = timeapi::timeEndPeriod(1);
|
|
assert_eq!(ret, timeapi::TIMERR_NOERROR);
|
|
}
|
|
|
|
#[cfg(not(windows))]
|
|
{
|
|
f();
|
|
}
|
|
}
|