200 lines
4.7 KiB
Rust
200 lines
4.7 KiB
Rust
// run-pass
|
|
// compile-flags:--test
|
|
// ignore-emscripten
|
|
|
|
use std::sync::mpsc::channel;
|
|
use std::sync::mpsc::TryRecvError;
|
|
use std::sync::mpsc::RecvError;
|
|
use std::sync::mpsc::RecvTimeoutError;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::AtomicUsize;
|
|
use std::sync::atomic::Ordering;
|
|
|
|
use std::thread;
|
|
use std::time::Duration;
|
|
|
|
|
|
/// Simple thread synchronization utility
|
|
struct Barrier {
|
|
// Not using mutex/condvar for precision
|
|
shared: Arc<AtomicUsize>,
|
|
count: usize,
|
|
}
|
|
|
|
impl Barrier {
|
|
fn new(count: usize) -> Vec<Barrier> {
|
|
let shared = Arc::new(AtomicUsize::new(0));
|
|
(0..count).map(|_| Barrier { shared: shared.clone(), count: count }).collect()
|
|
}
|
|
|
|
fn new2() -> (Barrier, Barrier) {
|
|
let mut v = Barrier::new(2);
|
|
(v.pop().unwrap(), v.pop().unwrap())
|
|
}
|
|
|
|
/// Returns when `count` threads enter `wait`
|
|
fn wait(self) {
|
|
self.shared.fetch_add(1, Ordering::SeqCst);
|
|
while self.shared.load(Ordering::SeqCst) != self.count {
|
|
#[cfg(target_env = "sgx")]
|
|
thread::yield_now();
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
fn shared_close_sender_does_not_lose_messages_iter() {
|
|
let (tb, rb) = Barrier::new2();
|
|
|
|
let (tx, rx) = channel();
|
|
let _ = tx.clone(); // convert to shared
|
|
|
|
thread::spawn(move || {
|
|
tb.wait();
|
|
thread::sleep(Duration::from_micros(1));
|
|
tx.send(17).expect("send");
|
|
drop(tx);
|
|
});
|
|
|
|
let i = rx.into_iter();
|
|
rb.wait();
|
|
// Make sure it doesn't return disconnected before returning an element
|
|
assert_eq!(vec![17], i.collect::<Vec<_>>());
|
|
}
|
|
|
|
#[test]
|
|
fn shared_close_sender_does_not_lose_messages() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..10000 {
|
|
shared_close_sender_does_not_lose_messages_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
|
|
// https://github.com/rust-lang/rust/issues/39364
|
|
fn concurrent_recv_timeout_and_upgrade_iter() {
|
|
// 1 us
|
|
let sleep = Duration::new(0, 1_000);
|
|
|
|
let (a, b) = Barrier::new2();
|
|
let (tx, rx) = channel();
|
|
let th = thread::spawn(move || {
|
|
a.wait();
|
|
loop {
|
|
match rx.recv_timeout(sleep) {
|
|
Ok(_) => {
|
|
break;
|
|
},
|
|
Err(_) => {},
|
|
}
|
|
}
|
|
});
|
|
b.wait();
|
|
thread::sleep(sleep);
|
|
tx.clone().send(()).expect("send");
|
|
th.join().unwrap();
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_recv_timeout_and_upgrade() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..10000 {
|
|
concurrent_recv_timeout_and_upgrade_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
|
|
fn concurrent_writes_iter() {
|
|
const THREADS: usize = 4;
|
|
const PER_THR: usize = 100;
|
|
|
|
let mut bs = Barrier::new(THREADS + 1);
|
|
let (tx, rx) = channel();
|
|
|
|
let mut threads = Vec::new();
|
|
for j in 0..THREADS {
|
|
let tx = tx.clone();
|
|
let b = bs.pop().unwrap();
|
|
threads.push(thread::spawn(move || {
|
|
b.wait();
|
|
for i in 0..PER_THR {
|
|
tx.send(j * 1000 + i).expect("send");
|
|
}
|
|
}));
|
|
}
|
|
|
|
let b = bs.pop().unwrap();
|
|
b.wait();
|
|
|
|
let mut v: Vec<_> = rx.iter().take(THREADS * PER_THR).collect();
|
|
v.sort();
|
|
|
|
for j in 0..THREADS {
|
|
for i in 0..PER_THR {
|
|
assert_eq!(j * 1000 + i, v[j * PER_THR + i]);
|
|
}
|
|
}
|
|
|
|
for t in threads {
|
|
t.join().unwrap();
|
|
}
|
|
|
|
let one_us = Duration::new(0, 1000);
|
|
|
|
assert_eq!(TryRecvError::Empty, rx.try_recv().unwrap_err());
|
|
assert_eq!(RecvTimeoutError::Timeout, rx.recv_timeout(one_us).unwrap_err());
|
|
|
|
drop(tx);
|
|
|
|
assert_eq!(RecvError, rx.recv().unwrap_err());
|
|
assert_eq!(RecvTimeoutError::Disconnected, rx.recv_timeout(one_us).unwrap_err());
|
|
assert_eq!(TryRecvError::Disconnected, rx.try_recv().unwrap_err());
|
|
}
|
|
|
|
#[test]
|
|
fn concurrent_writes() {
|
|
with_minimum_timer_resolution(|| {
|
|
for _ in 0..100 {
|
|
concurrent_writes_iter();
|
|
}
|
|
});
|
|
}
|
|
|
|
#[cfg(windows)]
|
|
pub mod timeapi {
|
|
#![allow(non_snake_case)]
|
|
use std::ffi::c_uint;
|
|
|
|
pub const TIMERR_NOERROR: c_uint = 0;
|
|
|
|
#[link(name = "winmm")]
|
|
extern "system" {
|
|
pub fn timeBeginPeriod(uPeriod: c_uint) -> c_uint;
|
|
pub fn timeEndPeriod(uPeriod: c_uint) -> c_uint;
|
|
}
|
|
}
|
|
|
|
/// Window's minimum sleep time can be as much as 16ms.
|
|
// This function evaluates the closure with this resolution
|
|
// set as low as possible.
|
|
///
|
|
/// This takes the above test's duration from 10000*16/1000/60=2.67 minutes to ~16 seconds.
|
|
fn with_minimum_timer_resolution(f: impl Fn()) {
|
|
#[cfg(windows)]
|
|
unsafe {
|
|
let ret = timeapi::timeBeginPeriod(1);
|
|
assert_eq!(ret, timeapi::TIMERR_NOERROR);
|
|
|
|
f();
|
|
|
|
let ret = timeapi::timeEndPeriod(1);
|
|
assert_eq!(ret, timeapi::TIMERR_NOERROR);
|
|
}
|
|
|
|
#[cfg(not(windows))]
|
|
{
|
|
f();
|
|
}
|
|
}
|