auto merge of #12991 : alexcrichton/rust/sync-chan, r=brson

This commit contains an implementation of synchronous, bounded channels for
Rust, following the proposal made last January [1]. These channels are built on
mutexes, and currently focus on a working implementation rather than speed.
Receivers for sync channels have select() implemented for them, but there is
currently no select() implementation for sync senders.

Rust will continue to provide both synchronous and asynchronous channels as
part of the standard distribution; there is no intent to remove asynchronous
channels. This flavor of channels is meant to provide an alternative to
asynchronous channels because, like green tasks, asynchronous channels are not
appropriate for all situations.

[1] - https://mail.mozilla.org/pipermail/rust-dev/2014-January/007924.html
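For orientation, a minimal sketch of the new API in the pre-1.0 syntax this patch uses (`sync_channel` is re-exported from the prelude by this commit):

```
fn main() {
    // A bound of 0 creates a rendezvous channel: each send blocks
    // until a receiver is ready to take the value.
    let (tx, rx) = sync_channel(0);
    spawn(proc() {
        tx.send(1); // blocks until the recv() below picks it up
    });
    assert_eq!(rx.recv(), 1);
}
```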
bors 2014-03-24 21:56:50 -07:00
commit 1e6e98c0c2
11 changed files with 1229 additions and 115 deletions

View file

@@ -280,6 +280,7 @@ mod select;
mod oneshot;
mod stream;
mod shared;
mod sync;
// Use a power of 2 to allow LLVM to optimize to something that's not a
// division; this is hit pretty regularly.
@@ -301,8 +302,8 @@ pub struct Messages<'a, T> {
priv rx: &'a Receiver<T>
}
/// The sending-half of Rust's channel type. This half can only be owned by one
/// task
/// The sending-half of Rust's asynchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
pub struct Sender<T> {
priv inner: Flavor<T>,
priv sends: Cell<uint>,
@@ -310,6 +311,14 @@ pub struct Sender<T> {
priv marker: marker::NoShare,
}
/// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
pub struct SyncSender<T> {
priv inner: UnsafeArc<sync::Packet<T>>,
// can't share in an arc
priv marker: marker::NoShare,
}
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone, Show)]
@@ -324,10 +333,31 @@ pub enum TryRecvResult<T> {
Data(T),
}
/// This enumeration is the list of the possible outcomes for the
/// `SyncSender::try_send` method.
#[deriving(Eq, Clone, Show)]
pub enum TrySendResult<T> {
/// The data was successfully sent along the channel. This either means that
/// it was buffered in the channel, or handed off to a receiver. In either
/// case, the caller no longer has ownership of the data.
Sent,
/// The data could not be sent on the channel because it would require that
/// the caller block to send the data.
///
/// If this is a buffered channel, then the buffer is full at this time. If
/// this is not a buffered channel, then there is no receiver available to
/// acquire the data.
Full(T),
/// This channel's receiving half has disconnected, so the data could not be
/// sent. The data is returned to the caller in this case.
RecvDisconnected(T),
}
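As a sketch of how a caller might branch on these variants (a hedged example in the pre-1.0 syntax of this patch, assuming the variants can be imported from `std::comm`, as the tests below do via the prelude):

```
use std::comm::{Sent, Full, RecvDisconnected};

fn main() {
    let (tx, _rx) = sync_channel::<int>(1);
    match tx.try_send(42) {
        Sent => {}                            // buffered or handed off to a receiver
        Full(v) => { let _ = v; }             // would have blocked; value handed back
        RecvDisconnected(v) => { let _ = v; } // receiver gone; value handed back
    }
}
```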
enum Flavor<T> {
Oneshot(UnsafeArc<oneshot::Packet<T>>),
Stream(UnsafeArc<stream::Packet<T>>),
Shared(UnsafeArc<shared::Packet<T>>),
Sync(UnsafeArc<sync::Packet<T>>),
}
/// Creates a new channel, returning the sender/receiver halves. All data sent
@@ -338,6 +368,46 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
(Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
}
/// Creates a new synchronous, bounded channel.
///
/// Like asynchronous channels, the `Receiver` will block until a message
/// becomes available. These channels differ greatly in the semantics of the
/// sender from asynchronous channels, however.
///
/// This channel has an internal buffer on which messages will be queued. When
/// the internal buffer becomes full, future sends will *block* waiting for the
/// buffer to open up. Note that a buffer size of 0 is valid, in which case this
/// becomes a "rendezvous channel" where each send will not return until a recv
/// is paired with it.
///
/// As with asynchronous channels, all senders will fail in `send` if the
/// `Receiver` has been destroyed.
///
/// # Example
///
/// ```
/// let (tx, rx) = sync_channel(1);
///
/// // this returns immediately
/// tx.send(1);
///
/// spawn(proc() {
/// // this will block until the previous message has been received
/// tx.send(2);
/// });
///
/// assert_eq!(rx.recv(), 1);
/// assert_eq!(rx.recv(), 2);
/// ```
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
(SyncSender::new(a), Receiver::my_new(Sync(b)))
}
////////////////////////////////////////////////////////////////////////////////
// Sender
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Sender<T> {
fn my_new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
@@ -422,6 +492,7 @@ impl<T: Send> Sender<T> {
}
Stream(ref p) => return unsafe { (*p.get()).send(t) },
Shared(ref p) => return unsafe { (*p.get()).send(t) },
Sync(..) => unreachable!(),
};
unsafe {
@@ -453,6 +524,7 @@ impl<T: Send> Clone for Sender<T> {
unsafe { (*p.get()).clone_chan(); }
return Sender::my_new(Shared(p.clone()));
}
Sync(..) => unreachable!(),
};
unsafe {
@@ -472,10 +544,100 @@ impl<T: Send> Drop for Sender<T> {
Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); },
Sync(..) => unreachable!(),
}
}
}
////////////////////////////////////////////////////////////////////////////////
// SyncSender
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> SyncSender<T> {
fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
SyncSender { inner: inner, marker: marker::NoShare }
}
/// Sends a value on this synchronous channel.
///
/// This function will *block* until space in the internal buffer becomes
/// available or a receiver is available to hand off the message to.
///
/// Note that a successful send does *not* guarantee that the receiver will
/// ever see the data if there is a buffer on this channel. Messages may be
/// enqueued in the internal buffer for the receiver to receive at a later
/// time. If the buffer size is 0, however, it can be guaranteed that the
/// receiver has indeed received the data if this function returns success.
///
/// # Failure
///
/// Similarly to `Sender::send`, this function will fail if the
/// corresponding `Receiver` for this channel has disconnected. This
/// behavior is used to propagate failure among tasks.
///
/// If failure is not desired, you can achieve the same semantics with the
/// `SyncSender::send_opt` method, which will not fail if the receiver
/// disconnects.
pub fn send(&self, t: T) {
if self.send_opt(t).is_some() {
fail!("sending on a closed channel");
}
}
/// Sends a value on this channel, returning it back if the receiver has
/// disconnected
///
/// This method will *block* to send the value `t` on the channel, but if
/// the value could not be sent due to the receiver disconnecting, the value
/// is returned to the caller. This function is similar to `try_send`,
/// except that it will block if the channel is currently full.
///
/// # Failure
///
/// This function cannot fail.
pub fn send_opt(&self, t: T) -> Option<T> {
match unsafe { (*self.inner.get()).send(t) } {
Ok(()) => None,
Err(t) => Some(t),
}
}
/// Attempts to send a value on this channel without blocking.
///
/// This method differs semantically from `Sender::try_send` because it can
/// fail even when the receiver has not disconnected yet: if the buffer on
/// this channel is full, this function will immediately return the data back
/// to the caller.
///
/// See `SyncSender::send` for notes about guarantees of whether the
/// receiver has received the data or not if this function is successful.
///
/// # Failure
///
/// This function cannot fail.
pub fn try_send(&self, t: T) -> TrySendResult<T> {
unsafe { (*self.inner.get()).try_send(t) }
}
}
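Taken together, the three sending methods differ only in how they report a message that cannot be delivered. A small sketch of the contrast (hedged; pre-1.0 syntax, assuming `Sent` and `Full` are imported from `std::comm`):

```
use std::comm::{Sent, Full};

fn main() {
    let (tx, rx) = sync_channel::<int>(1);
    assert_eq!(tx.try_send(1), Sent);    // fits in the buffer; returns at once
    assert_eq!(tx.try_send(2), Full(2)); // buffer full: no block, data handed back
    assert_eq!(rx.recv(), 1);
    drop(rx);
    assert_eq!(tx.send_opt(3), Some(3)); // receiver gone: value returned
    // tx.send(4) would instead fail!() the sending task
}
```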
impl<T: Send> Clone for SyncSender<T> {
fn clone(&self) -> SyncSender<T> {
unsafe { (*self.inner.get()).clone_chan(); }
return SyncSender::new(self.inner.clone());
}
}
#[unsafe_destructor]
impl<T: Send> Drop for SyncSender<T> {
fn drop(&mut self) {
unsafe { (*self.inner.get()).drop_chan(); }
}
}
////////////////////////////////////////////////////////////////////////////////
// Receiver
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Receiver<T> {
fn my_new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
@@ -554,6 +716,13 @@ impl<T: Send> Receiver<T> {
Err(shared::Disconnected) => return Disconnected,
}
}
Sync(ref p) => {
match unsafe { (*p.get()).try_recv() } {
Ok(t) => return Data(t),
Err(sync::Empty) => return Empty,
Err(sync::Disconnected) => return Disconnected,
}
}
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
@@ -600,6 +769,7 @@ impl<T: Send> Receiver<T> {
Err(shared::Disconnected) => return None,
}
}
Sync(ref p) => return unsafe { (*p.get()).recv() }
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
@@ -634,6 +804,9 @@ impl<T: Send> select::Packet for Receiver<T> {
Shared(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
Sync(ref p) => {
return unsafe { (*p.get()).can_recv() };
}
};
unsafe {
mem::swap(&mut cast::transmute_mut(self).inner,
@@ -662,6 +835,9 @@ impl<T: Send> select::Packet for Receiver<T> {
Shared(ref p) => {
return unsafe { (*p.get()).start_selection(task) };
}
Sync(ref p) => {
return unsafe { (*p.get()).start_selection(task) };
}
};
task = t;
unsafe {
@@ -682,6 +858,9 @@ impl<T: Send> select::Packet for Receiver<T> {
Shared(ref p) => return unsafe {
(*p.get()).abort_selection(was_upgrade)
},
Sync(ref p) => return unsafe {
(*p.get()).abort_selection()
},
};
let mut new_port = match result { Ok(b) => return b, Err(p) => p };
was_upgrade = true;
@@ -704,6 +883,7 @@ impl<T: Send> Drop for Receiver<T> {
Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); },
Stream(ref mut p) => unsafe { (*p.get()).drop_port(); },
Shared(ref mut p) => unsafe { (*p.get()).drop_port(); },
Sync(ref mut p) => unsafe { (*p.get()).drop_port(); },
}
}
}
@@ -1243,3 +1423,517 @@ mod test {
pdone.recv();
})
}
#[cfg(test)]
mod sync_tests {
use prelude::*;
use os;
pub fn stress_factor() -> uint {
match os::getenv("RUST_TEST_STRESS") {
Some(val) => from_str::<uint>(val).unwrap(),
None => 1,
}
}
test!(fn smoke() {
let (tx, rx) = sync_channel(1);
tx.send(1);
assert_eq!(rx.recv(), 1);
})
test!(fn drop_full() {
let (tx, _rx) = sync_channel(1);
tx.send(~1);
})
test!(fn smoke_shared() {
let (tx, rx) = sync_channel(1);
tx.send(1);
assert_eq!(rx.recv(), 1);
let tx = tx.clone();
tx.send(1);
assert_eq!(rx.recv(), 1);
})
test!(fn smoke_threads() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
tx.send(1);
});
assert_eq!(rx.recv(), 1);
})
test!(fn smoke_port_gone() {
let (tx, rx) = sync_channel(0);
drop(rx);
tx.send(1);
} #[should_fail])
test!(fn smoke_shared_port_gone2() {
let (tx, rx) = sync_channel(0);
drop(rx);
let tx2 = tx.clone();
drop(tx);
tx2.send(1);
} #[should_fail])
test!(fn port_gone_concurrent() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
rx.recv();
});
loop { tx.send(1) }
} #[should_fail])
test!(fn port_gone_concurrent_shared() {
let (tx, rx) = sync_channel(0);
let tx2 = tx.clone();
spawn(proc() {
rx.recv();
});
loop {
tx.send(1);
tx2.send(1);
}
} #[should_fail])
test!(fn smoke_chan_gone() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
rx.recv();
} #[should_fail])
test!(fn smoke_chan_gone_shared() {
let (tx, rx) = sync_channel::<()>(0);
let tx2 = tx.clone();
drop(tx);
drop(tx2);
rx.recv();
} #[should_fail])
test!(fn chan_gone_concurrent() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
tx.send(1);
tx.send(1);
});
loop { rx.recv(); }
} #[should_fail])
test!(fn stress() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
for _ in range(0, 10000) { tx.send(1); }
});
for _ in range(0, 10000) {
assert_eq!(rx.recv(), 1);
}
})
test!(fn stress_shared() {
static AMT: uint = 1000;
static NTHREADS: uint = 8;
let (tx, rx) = sync_channel::<int>(0);
let (dtx, drx) = sync_channel::<()>(0);
spawn(proc() {
for _ in range(0, AMT * NTHREADS) {
assert_eq!(rx.recv(), 1);
}
match rx.try_recv() {
Data(..) => fail!(),
_ => {}
}
dtx.send(());
});
for _ in range(0, NTHREADS) {
let tx = tx.clone();
spawn(proc() {
for _ in range(0, AMT) { tx.send(1); }
});
}
drop(tx);
drx.recv();
})
test!(fn oneshot_single_thread_close_port_first() {
// Simple test of closing without sending
let (_tx, rx) = sync_channel::<int>(0);
drop(rx);
})
test!(fn oneshot_single_thread_close_chan_first() {
// Simple test of closing without sending
let (tx, _rx) = sync_channel::<int>(0);
drop(tx);
})
test!(fn oneshot_single_thread_send_port_close() {
// Testing that the sender cleans up the payload if receiver is closed
let (tx, rx) = sync_channel::<~int>(0);
drop(rx);
tx.send(~0);
} #[should_fail])
test!(fn oneshot_single_thread_recv_chan_close() {
// Receiving on a closed chan will fail
let res = task::try(proc() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
rx.recv();
});
// the task should have failed when receiving on the closed channel
assert!(res.is_err());
})
test!(fn oneshot_single_thread_send_then_recv() {
let (tx, rx) = sync_channel::<~int>(1);
tx.send(~10);
assert!(rx.recv() == ~10);
})
test!(fn oneshot_single_thread_try_send_open() {
let (tx, rx) = sync_channel::<int>(1);
assert_eq!(tx.try_send(10), Sent);
assert!(rx.recv() == 10);
})
test!(fn oneshot_single_thread_try_send_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(rx);
assert_eq!(tx.try_send(10), RecvDisconnected(10));
})
test!(fn oneshot_single_thread_try_send_closed2() {
let (tx, _rx) = sync_channel::<int>(0);
assert_eq!(tx.try_send(10), Full(10));
})
test!(fn oneshot_single_thread_try_recv_open() {
let (tx, rx) = sync_channel::<int>(1);
tx.send(10);
assert!(rx.recv_opt() == Some(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
assert!(rx.recv_opt() == None);
})
test!(fn oneshot_single_thread_peek_data() {
let (tx, rx) = sync_channel::<int>(1);
assert_eq!(rx.try_recv(), Empty);
tx.send(10);
assert_eq!(rx.try_recv(), Data(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
assert_eq!(rx.try_recv(), Disconnected);
assert_eq!(rx.try_recv(), Disconnected);
})
test!(fn oneshot_single_thread_peek_open() {
let (_tx, rx) = sync_channel::<int>(0);
assert_eq!(rx.try_recv(), Empty);
})
test!(fn oneshot_multi_task_recv_then_send() {
let (tx, rx) = sync_channel::<~int>(0);
spawn(proc() {
assert!(rx.recv() == ~10);
});
tx.send(~10);
})
test!(fn oneshot_multi_task_recv_then_close() {
let (tx, rx) = sync_channel::<~int>(0);
spawn(proc() {
drop(tx);
});
let res = task::try(proc() {
assert!(rx.recv() == ~10);
});
assert!(res.is_err());
})
test!(fn oneshot_multi_thread_close_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel::<int>(0);
spawn(proc() {
drop(rx);
});
drop(tx);
}
})
test!(fn oneshot_multi_thread_send_close_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel::<int>(0);
spawn(proc() {
drop(rx);
});
let _ = task::try(proc() {
tx.send(1);
});
}
})
test!(fn oneshot_multi_thread_recv_close_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel::<int>(0);
spawn(proc() {
let res = task::try(proc() {
rx.recv();
});
assert!(res.is_err());
});
spawn(proc() {
spawn(proc() {
drop(tx);
});
});
}
})
test!(fn oneshot_multi_thread_send_recv_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel(0);
spawn(proc() {
tx.send(~10);
});
spawn(proc() {
assert!(rx.recv() == ~10);
});
}
})
test!(fn stream_send_recv_stress() {
for _ in range(0, stress_factor()) {
let (tx, rx) = sync_channel(0);
send(tx, 0);
recv(rx, 0);
fn send(tx: SyncSender<~int>, i: int) {
if i == 10 { return }
spawn(proc() {
tx.send(~i);
send(tx, i + 1);
});
}
fn recv(rx: Receiver<~int>, i: int) {
if i == 10 { return }
spawn(proc() {
assert!(rx.recv() == ~i);
recv(rx, i + 1);
});
}
}
})
test!(fn recv_a_lot() {
// Regression test that we don't run out of stack in scheduler context
let (tx, rx) = sync_channel(10000);
for _ in range(0, 10000) { tx.send(()); }
for _ in range(0, 10000) { rx.recv(); }
})
test!(fn shared_chan_stress() {
let (tx, rx) = sync_channel(0);
let total = stress_factor() + 100;
for _ in range(0, total) {
let tx = tx.clone();
spawn(proc() {
tx.send(());
});
}
for _ in range(0, total) {
rx.recv();
}
})
test!(fn test_nested_recv_iter() {
let (tx, rx) = sync_channel::<int>(0);
let (total_tx, total_rx) = sync_channel::<int>(0);
spawn(proc() {
let mut acc = 0;
for x in rx.iter() {
acc += x;
}
total_tx.send(acc);
});
tx.send(3);
tx.send(1);
tx.send(2);
drop(tx);
assert_eq!(total_rx.recv(), 6);
})
test!(fn test_recv_iter_break() {
let (tx, rx) = sync_channel::<int>(0);
let (count_tx, count_rx) = sync_channel(0);
spawn(proc() {
let mut count = 0;
for x in rx.iter() {
if count >= 3 {
break;
} else {
count += x;
}
}
count_tx.send(count);
});
tx.send(2);
tx.send(2);
tx.send(2);
tx.try_send(2);
drop(tx);
assert_eq!(count_rx.recv(), 4);
})
test!(fn try_recv_states() {
let (tx1, rx1) = sync_channel::<int>(1);
let (tx2, rx2) = sync_channel::<()>(1);
let (tx3, rx3) = sync_channel::<()>(1);
spawn(proc() {
rx2.recv();
tx1.send(1);
tx3.send(());
rx2.recv();
drop(tx1);
tx3.send(());
});
assert_eq!(rx1.try_recv(), Empty);
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Data(1));
assert_eq!(rx1.try_recv(), Empty);
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Disconnected);
})
// This used to cause a livelock inside the Receiver destructor because the
// internal state of the shared packet was corrupted
test!(fn destroy_upgraded_shared_port_when_sender_still_active() {
let (tx, rx) = sync_channel(0);
let (tx2, rx2) = sync_channel(0);
spawn(proc() {
rx.recv(); // wait for a message
drop(rx); // then destroy the receiver
tx2.send(());
});
// make sure the other task has gone to sleep
for _ in range(0, 5000) { task::deschedule(); }
// clone the sender and send a message
let t = tx.clone();
drop(tx);
t.send(());
// wait for the child task to exit before we exit
rx2.recv();
})
test!(fn try_recvs_off_the_runtime() {
use std::rt::thread::Thread;
let (tx, rx) = sync_channel(0);
let (cdone, pdone) = channel();
let t = Thread::start(proc() {
let mut hits = 0;
while hits < 10 {
match rx.try_recv() {
Data(()) => { hits += 1; }
Empty => { Thread::yield_now(); }
Disconnected => return,
}
}
cdone.send(());
});
for _ in range(0, 10) {
tx.send(());
}
t.join();
pdone.recv();
})
test!(fn send_opt1() {
let (tx, rx) = sync_channel(0);
spawn(proc() { rx.recv(); });
assert_eq!(tx.send_opt(1), None);
})
test!(fn send_opt2() {
let (tx, rx) = sync_channel(0);
spawn(proc() { drop(rx); });
assert_eq!(tx.send_opt(1), Some(1));
})
test!(fn send_opt3() {
let (tx, rx) = sync_channel(1);
assert_eq!(tx.send_opt(1), None);
spawn(proc() { drop(rx); });
assert_eq!(tx.send_opt(1), Some(1));
})
test!(fn send_opt4() {
let (tx, rx) = sync_channel(0);
let tx2 = tx.clone();
let (done, donerx) = channel();
let done2 = done.clone();
spawn(proc() {
assert_eq!(tx.send_opt(1), Some(1));
done.send(());
});
spawn(proc() {
assert_eq!(tx2.send_opt(2), Some(2));
done2.send(());
});
drop(rx);
donerx.recv();
donerx.recv();
})
test!(fn try_send1() {
let (tx, _rx) = sync_channel(0);
assert_eq!(tx.try_send(1), Full(1));
})
test!(fn try_send2() {
let (tx, _rx) = sync_channel(1);
assert_eq!(tx.try_send(1), Sent);
assert_eq!(tx.try_send(1), Full(1));
})
test!(fn try_send3() {
let (tx, rx) = sync_channel(1);
assert_eq!(tx.try_send(1), Sent);
drop(rx);
assert_eq!(tx.try_send(1), RecvDisconnected(1));
})
test!(fn try_send4() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
for _ in range(0, 1000) { task::deschedule(); }
assert_eq!(tx.try_send(1), Sent);
});
assert_eq!(rx.recv(), 1);
})
}

View file

@@ -648,4 +648,40 @@ mod test {
tx1.send(());
rx2.recv();
})
test!(fn sync1() {
let (tx, rx) = sync_channel(1);
tx.send(1);
select! {
n = rx.recv() => { assert_eq!(n, 1); }
}
})
test!(fn sync2() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
for _ in range(0, 100) { task::deschedule() }
tx.send(1);
});
select! {
n = rx.recv() => { assert_eq!(n, 1); }
}
})
test!(fn sync3() {
let (tx1, rx1) = sync_channel(0);
let (tx2, rx2) = channel();
spawn(proc() { tx1.send(1); });
spawn(proc() { tx2.send(2); });
select! {
n = rx1.recv() => {
assert_eq!(n, 1);
assert_eq!(rx2.recv(), 2);
},
n = rx2.recv() => {
assert_eq!(n, 2);
assert_eq!(rx1.recv(), 1);
}
}
})
}

src/libstd/comm/sync.rs (new file, 485 lines)
View file

@@ -0,0 +1,485 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/// Synchronous channels/ports
///
/// This channel implementation differs significantly from the asynchronous
/// implementations found next to it (oneshot/stream/shared). This is an
/// implementation of a synchronous, bounded-buffer channel.
///
/// Each channel is created with some amount of backing buffer, and sends will
/// *block* until buffer space becomes available. A buffer size of 0 is valid,
/// which means that every successful send is paired with a successful recv.
///
/// This flavor of channels defines a new `send_opt` method, which sends a
/// message without failing the task if the message cannot be delivered.
///
/// Another major difference is that send() will *always* return the data if
/// it could not be sent, because it is deterministically known whether the
/// data was received.
///
/// Implementation-wise, it can all be summed up with "use a mutex plus some
/// logic". The mutex used here is an OS native mutex, meaning that no user
/// code runs while the mutex is held (to avoid a context switch inside the
/// critical section). This implementation shares almost all code for the
/// buffered and unbuffered cases of a synchronous channel. There are a few
/// branches for the unbuffered case, but they're mostly just relevant to
/// blocking senders.
use cast;
use container::Container;
use iter::Iterator;
use kinds::Send;
use mem;
use ops::Drop;
use option::{Some, None, Option};
use ptr::RawPtr;
use result::{Result, Ok, Err};
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use sync::atomics;
use ty::Unsafe;
use unstable::mutex::{NativeMutex, LockGuard};
use vec::Vec;
pub struct Packet<T> {
/// Only field outside of the mutex. Kept as an atomic mainly because the
/// shared channel already implements the same refcounting logic.
channels: atomics::AtomicUint,
/// The state field is protected by this mutex
lock: NativeMutex,
state: Unsafe<State<T>>,
}
struct State<T> {
disconnected: bool, // Is the channel disconnected yet?
queue: Queue, // queue of senders waiting to send data
blocker: Blocker, // currently blocked task on this channel
buf: Buffer<T>, // storage for buffered messages
cap: uint, // capacity of this channel
/// A flag recording whether a blocked sender was canceled. It transmits
/// information back to the blocked task that it must dequeue its message
/// from the buffer because the message was not received. This is only
/// relevant in the 0-buffer case. The `&'static mut` reference obviously
/// cannot be safely constructed, but it is guaranteed to point to valid
/// memory for as long as the sender is blocked.
}
/// Possible flavors of tasks who can be blocked on this channel.
enum Blocker {
BlockedSender(BlockedTask),
BlockedReceiver(BlockedTask),
NoneBlocked
}
/// Simple queue for threading tasks together. Nodes live on the stacks of the
/// blocked tasks, so this structure is only safe while those tasks remain
/// blocked (see `Queue::enqueue`).
struct Queue {
head: *mut Node,
tail: *mut Node,
}
struct Node {
task: Option<BlockedTask>,
next: *mut Node,
}
/// A simple ring-buffer
struct Buffer<T> {
buf: Vec<Option<T>>,
start: uint,
size: uint,
}
#[deriving(Show)]
pub enum Failure {
Empty,
Disconnected,
}
/// Atomically blocks the current task, placing it into `slot`, unlocking `lock`
/// in the meantime. This re-locks the mutex upon returning.
fn wait(slot: &mut Blocker, f: fn(BlockedTask) -> Blocker,
lock: &NativeMutex) {
let me: ~Task = Local::take();
me.deschedule(1, |task| {
match mem::replace(slot, f(task)) {
NoneBlocked => {}
_ => unreachable!(),
}
unsafe { lock.unlock_noguard(); }
Ok(())
});
unsafe { lock.lock_noguard(); }
}
/// Wakes up a task, dropping the lock at the correct time
fn wakeup(task: BlockedTask, guard: LockGuard) {
// We need to be careful to wake up the waiting task *outside* of the mutex
// in case it incurs a context switch.
mem::drop(guard);
task.wake().map(|t| t.reawaken());
}
impl<T: Send> Packet<T> {
pub fn new(cap: uint) -> Packet<T> {
Packet {
channels: atomics::AtomicUint::new(1),
lock: unsafe { NativeMutex::new() },
state: Unsafe::new(State {
disconnected: false,
blocker: NoneBlocked,
cap: cap,
canceled: None,
queue: Queue {
head: 0 as *mut Node,
tail: 0 as *mut Node,
},
buf: Buffer {
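// a capacity of 0 still allocates one slot, used to stage the
// rendezvous handoff between a sender and a receiver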
buf: Vec::from_fn(cap + if cap == 0 {1} else {0}, |_| None),
start: 0,
size: 0,
},
}),
}
}
// Locks this channel, returning a guard for the state and the mutable state
// itself. Care should be taken to ensure that the state does not escape the
// guard!
//
// Note that we're ok promoting an & reference to an &mut reference because
// the lock ensures that we're the only ones in the world with a pointer to
// the state.
fn lock<'a>(&'a self) -> (LockGuard<'a>, &'a mut State<T>) {
unsafe {
let guard = self.lock.lock();
(guard, &mut *self.state.get())
}
}
pub fn send(&self, t: T) -> Result<(), T> {
let (guard, state) = self.lock();
// wait for a slot to become available, and enqueue the data
while !state.disconnected && state.buf.size() == state.buf.cap() {
state.queue.enqueue(&self.lock);
}
if state.disconnected { return Err(t) }
state.buf.enqueue(t);
match mem::replace(&mut state.blocker, NoneBlocked) {
// if our capacity is 0, then we need to wait for a receiver to be
// available to take our data. After waiting, we check again to make
// sure the port didn't go away in the meantime. If it did, we need
// to hand back our data.
NoneBlocked if state.cap == 0 => {
let mut canceled = false;
assert!(state.canceled.is_none());
state.canceled = Some(unsafe { cast::transmute(&mut canceled) });
wait(&mut state.blocker, BlockedSender, &self.lock);
if canceled {Err(state.buf.dequeue())} else {Ok(())}
}
// success, we buffered some data
NoneBlocked => Ok(()),
// success, someone's about to receive our buffered data.
BlockedReceiver(task) => { wakeup(task, guard); Ok(()) }
BlockedSender(..) => unreachable!(),
}
}
pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
let (guard, state) = self.lock();
if state.disconnected {
super::RecvDisconnected(t)
} else if state.buf.size() == state.buf.cap() {
super::Full(t)
} else if state.cap == 0 {
// With capacity 0, even though we have buffer space we can't
// transfer the data unless there's a receiver waiting.
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => super::Full(t),
BlockedSender(..) => unreachable!(),
BlockedReceiver(task) => {
state.buf.enqueue(t);
wakeup(task, guard);
super::Sent
}
}
} else {
// If the buffer has some space and the capacity isn't 0, then we
// just enqueue the data for later retrieval.
assert!(state.buf.size() < state.buf.cap());
state.buf.enqueue(t);
super::Sent
}
}
// Receives a message from this channel
//
// When reading this, remember that there can only ever be one receiver at
// a time.
pub fn recv(&self) -> Option<T> {
let (guard, state) = self.lock();
// Wait for the buffer to have something in it. No need for a while loop
// because we're the only receiver.
let mut waited = false;
if !state.disconnected && state.buf.size() == 0 {
wait(&mut state.blocker, BlockedReceiver, &self.lock);
waited = true;
}
if state.disconnected && state.buf.size() == 0 { return None }
// Pick up the data, wake up our neighbors, and carry on
assert!(state.buf.size() > 0);
let ret = state.buf.dequeue();
self.wakeup_senders(waited, guard, state);
return Some(ret);
}
pub fn try_recv(&self) -> Result<T, Failure> {
let (guard, state) = self.lock();
// Easy cases first
if state.disconnected { return Err(Disconnected) }
if state.buf.size() == 0 { return Err(Empty) }
// Be sure to wake up neighbors
let ret = Ok(state.buf.dequeue());
self.wakeup_senders(false, guard, state);
return ret;
}
// Wake up pending senders after some data has been received
//
// * `waited` - flag if the receiver blocked to receive some data, or if it
// just picked up some data on the way out
// * `guard` - the lock guard that is held over this channel's lock
fn wakeup_senders(&self, waited: bool,
guard: LockGuard,
state: &mut State<T>) {
let pending_sender1: Option<BlockedTask> = state.queue.dequeue();
// If this is a no-buffer channel (cap == 0), then if we didn't wait we
// need to ACK the sender. If we waited, then the sender waking us up
// was already the ACK.
let pending_sender2 = if state.cap == 0 && !waited {
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedReceiver(..) => unreachable!(),
BlockedSender(task) => {
state.canceled.take();
Some(task)
}
}
} else {
None
};
mem::drop((state, guard));
// only outside of the lock do we wake up the pending tasks
pending_sender1.map(|t| t.wake().map(|t| t.reawaken()));
pending_sender2.map(|t| t.wake().map(|t| t.reawaken()));
}
// Prepares this shared packet for a channel clone, essentially just bumping
// a refcount.
pub fn clone_chan(&self) {
self.channels.fetch_add(1, atomics::SeqCst);
}
pub fn drop_chan(&self) {
// Only flag the channel as disconnected if we're the last channel
match self.channels.fetch_sub(1, atomics::SeqCst) {
1 => {}
_ => return
}
// Not much to do other than wake up a receiver if one's there
let (guard, state) = self.lock();
if state.disconnected { return }
state.disconnected = true;
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(task) => wakeup(task, guard),
}
}
pub fn drop_port(&self) {
let (guard, state) = self.lock();
if state.disconnected { return }
state.disconnected = true;
// If the capacity is 0, then the sender may want its data back after
// we're disconnected. Otherwise it's now our responsibility to destroy
// the buffered data. As with many other portions of this code, this
// needs to be careful to destroy the data *outside* of the lock to
// prevent deadlock.
let _data = if state.cap != 0 {
mem::replace(&mut state.buf.buf, Vec::new())
} else {
Vec::new()
};
let mut queue = mem::replace(&mut state.queue, Queue {
head: 0 as *mut Node,
tail: 0 as *mut Node,
});
let waiter = match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => None,
BlockedSender(task) => {
*state.canceled.take_unwrap() = true;
Some(task)
}
BlockedReceiver(..) => unreachable!(),
};
mem::drop((state, guard));
loop {
match queue.dequeue() {
Some(task) => { task.wake().map(|t| t.reawaken()); }
None => break,
}
}
waiter.map(|t| t.wake().map(|t| t.reawaken()));
}
////////////////////////////////////////////////////////////////////////////
// select implementation
////////////////////////////////////////////////////////////////////////////
// Returns whether a receive on this port will complete without blocking
// (i.e. data is available or the channel has disconnected).
pub fn can_recv(&self) -> bool {
let (_g, state) = self.lock();
state.disconnected || state.buf.size() > 0
}
// Attempts to start selection on this port. This either succeeds, or fails
// because data (or a disconnect) is already waiting.
pub fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>{
let (_g, state) = self.lock();
if state.disconnected || state.buf.size() > 0 {
Err(task)
} else {
match mem::replace(&mut state.blocker, BlockedReceiver(task)) {
NoneBlocked => {}
BlockedSender(..) => unreachable!(),
BlockedReceiver(..) => unreachable!(),
}
Ok(())
}
}
// Remove a previous selecting task from this port. This ensures that the
// blocked task will no longer be visible to any other threads.
//
// The return value indicates whether there's data on this port.
pub fn abort_selection(&self) -> bool {
let (_g, state) = self.lock();
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => true,
BlockedSender(task) => {
state.blocker = BlockedSender(task);
true
}
BlockedReceiver(task) => { task.trash(); false }
}
}
}
#[unsafe_destructor]
impl<T: Send> Drop for Packet<T> {
fn drop(&mut self) {
assert_eq!(self.channels.load(atomics::SeqCst), 0);
let (_g, state) = self.lock();
assert!(state.queue.dequeue().is_none());
assert!(state.canceled.is_none());
}
}
////////////////////////////////////////////////////////////////////////////////
// Buffer, a simple ring buffer backed by Vec<T>
////////////////////////////////////////////////////////////////////////////////
impl<T> Buffer<T> {
fn enqueue(&mut self, t: T) {
let pos = (self.start + self.size) % self.buf.len();
self.size += 1;
let prev = mem::replace(self.buf.get_mut(pos), Some(t));
assert!(prev.is_none());
}
fn dequeue(&mut self) -> T {
let start = self.start;
self.size -= 1;
self.start = (self.start + 1) % self.buf.len();
self.buf.get_mut(start).take_unwrap()
}
fn size(&self) -> uint { self.size }
fn cap(&self) -> uint { self.buf.len() }
}
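A quick worked trace of the ring-buffer index arithmetic, with hypothetical values:

```
// Suppose buf.len() == 3, start == 2, size == 2 (slots 2 and 0 occupied).
// enqueue(t) writes slot (2 + 2) % 3 == 1 and bumps size to 3 (buffer full).
// From the same state, dequeue() instead takes slot 2, then sets
// start = (2 + 1) % 3 == 0 and drops size to 1.
```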
////////////////////////////////////////////////////////////////////////////////
// Queue, a simple queue to enqueue tasks with (stack-allocated nodes)
////////////////////////////////////////////////////////////////////////////////
impl Queue {
fn enqueue(&mut self, lock: &NativeMutex) {
let task: ~Task = Local::take();
let mut node = Node {
task: None,
next: 0 as *mut Node,
};
task.deschedule(1, |task| {
node.task = Some(task);
if self.tail.is_null() {
self.head = &mut node as *mut Node;
self.tail = &mut node as *mut Node;
} else {
unsafe {
(*self.tail).next = &mut node as *mut Node;
self.tail = &mut node as *mut Node;
}
}
unsafe { lock.unlock_noguard(); }
Ok(())
});
unsafe { lock.lock_noguard(); }
assert!(node.next.is_null());
}
fn dequeue(&mut self) -> Option<BlockedTask> {
if self.head.is_null() {
return None
}
let node = self.head;
self.head = unsafe { (*node).next };
if self.head.is_null() {
self.tail = 0 as *mut Node;
}
unsafe {
(*node).next = 0 as *mut Node;
Some((*node).task.take_unwrap())
}
}
}

View file

@@ -1282,10 +1282,10 @@ mod test {
}
iotest!(fn binary_file() {
use rand::{Rng, task_rng};
use rand::{StdRng, Rng};
let mut bytes = [0, ..1024];
task_rng().fill_bytes(bytes);
StdRng::new().fill_bytes(bytes);
let tmpdir = tmpdir();

View file

@@ -62,7 +62,7 @@ pub use slice::{Vector, VectorVector, CloneableVector, ImmutableVector};
pub use vec::Vec;
// Reexported runtime types
pub use comm::{channel, Sender, Receiver};
pub use comm::{sync_channel, channel, SyncSender, Sender, Receiver};
pub use task::spawn;
// Reexported statics

View file

@@ -69,6 +69,7 @@ mod imp {
use iter::Iterator;
use unstable::mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT};
use mem;
#[cfg(not(test))] use ptr::RawPtr;
static mut global_args_ptr: uint = 0;
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;

View file

@@ -433,8 +433,8 @@ mod test {
#[test]
fn rng() {
use rand::{Rng, task_rng};
let mut r = task_rng();
use rand::{StdRng, Rng};
let mut r = StdRng::new();
let _ = r.next_u32();
}

View file

@@ -580,9 +580,9 @@ mod test {
fn smoke_cond() {
static mut lock: StaticNativeMutex = NATIVE_MUTEX_INIT;
unsafe {
let mut guard = lock.lock();
let guard = lock.lock();
let t = Thread::start(proc() {
let mut guard = lock.lock();
let guard = lock.lock();
guard.signal();
});
guard.wait();

View file

@@ -1355,13 +1355,8 @@ impl<T> Drop for MoveItems<T> {
#[cfg(test)]
mod tests {
use super::Vec;
use iter::{Iterator, range, Extendable};
use mem::{drop, size_of};
use ops::Drop;
use option::{Some, None};
use container::Container;
use slice::{Vector, MutableVector, ImmutableVector};
use prelude::*;
use mem::size_of;
#[test]
fn test_small_vec_struct() {

View file

@@ -51,54 +51,9 @@ impl<S:Send,R:Send> DuplexStream<S, R> {
}
}
/// An extension of `pipes::stream` that provides synchronous message sending.
pub struct SyncSender<S> { priv duplex_stream: DuplexStream<S, ()> }
/// An extension of `pipes::stream` that acknowledges each message received.
pub struct SyncReceiver<R> { priv duplex_stream: DuplexStream<(), R> }
impl<S: Send> SyncSender<S> {
pub fn send(&self, val: S) {
assert!(self.try_send(val), "SyncSender.send: receiving port closed");
}
/// Sends a message, or report if the receiver has closed the connection
/// before receiving.
pub fn try_send(&self, val: S) -> bool {
self.duplex_stream.try_send(val) && self.duplex_stream.recv_opt().is_some()
}
}
impl<R: Send> SyncReceiver<R> {
pub fn recv(&self) -> R {
self.recv_opt().expect("SyncReceiver.recv: sending channel closed")
}
pub fn recv_opt(&self) -> Option<R> {
self.duplex_stream.recv_opt().map(|val| {
self.duplex_stream.try_send(());
val
})
}
pub fn try_recv(&self) -> comm::TryRecvResult<R> {
match self.duplex_stream.try_recv() {
comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) }
state => state,
}
}
}
/// Creates a stream whose channel, upon sending a message, blocks until the
/// message is received.
pub fn rendezvous<T: Send>() -> (SyncReceiver<T>, SyncSender<T>) {
let (chan_stream, port_stream) = duplex();
(SyncReceiver { duplex_stream: port_stream },
SyncSender { duplex_stream: chan_stream })
}
#[cfg(test)]
mod test {
use comm::{duplex, rendezvous};
use comm::{duplex};
#[test]
@@ -111,56 +66,4 @@ mod test {
assert!(left.recv() == 123);
assert!(right.recv() == ~"abc");
}
#[test]
pub fn basic_rendezvous_test() {
let (port, chan) = rendezvous();
spawn(proc() {
chan.send("abc");
});
assert!(port.recv() == "abc");
}
#[test]
fn recv_a_lot() {
// Rendezvous streams should be able to handle any number of messages being sent
let (port, chan) = rendezvous();
spawn(proc() {
for _ in range(0, 10000) { chan.send(()); }
});
for _ in range(0, 10000) { port.recv(); }
}
#[test]
fn send_and_fail_and_try_recv() {
let (port, chan) = rendezvous();
spawn(proc() {
chan.duplex_stream.send(()); // Can't access this field outside this module
fail!()
});
port.recv()
}
#[test]
fn try_send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
spawn(proc() {
port.duplex_stream.recv();
fail!()
});
chan.try_send(());
}
#[test]
#[should_fail]
fn send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
spawn(proc() {
port.duplex_stream.recv();
fail!()
});
chan.send(());
}
}

View file

@@ -25,7 +25,7 @@
#[cfg(test)]
#[phase(syntax, link)] extern crate log;
pub use comm::{DuplexStream, SyncSender, SyncReceiver, rendezvous, duplex};
pub use comm::{DuplexStream, duplex};
pub use task_pool::TaskPool;
pub use future::Future;
pub use arc::{Arc, Weak};