core::comm: Modernize constructors to use new

This commit is contained in:
Brian Anderson 2013-04-16 23:45:29 -07:00
parent bc60d84507
commit decd3901d5
32 changed files with 80 additions and 68 deletions

View file

@ -236,7 +236,7 @@ Instead we can use a `SharedChan`, a type that allows a single
use core::comm::{stream, SharedChan};
let (port, chan) = stream();
let chan = SharedChan(chan);
let chan = SharedChan::new(chan);
for uint::range(0, 3) |init_val| {
// Create a new channel handle to distribute to the child task

View file

@ -73,7 +73,7 @@ pub fn run(lib_path: ~str,
writeclose(pipe_in.out, input);
let p = comm::PortSet();
let p = comm::PortSet::new();
let ch = p.chan();
do task::spawn_sched(task::SingleThreaded) || {
let errput = readclose(pipe_err.in);

View file

@ -19,6 +19,7 @@ use option::{Option, Some, None};
use uint;
use unstable;
use vec;
use unstable::Exclusive;
use pipes::{recv, try_recv, wait_many, peek, PacketHeader};
@ -218,13 +219,14 @@ pub struct PortSet<T> {
mut ports: ~[Port<T>],
}
pub fn PortSet<T: Owned>() -> PortSet<T>{
PortSet {
ports: ~[]
}
}
pub impl<T: Owned> PortSet<T> {
fn new() -> PortSet<T> {
PortSet {
ports: ~[]
}
}
fn add(&self, port: Port<T>) {
self.ports.push(port)
}
@ -279,12 +281,21 @@ impl<T: Owned> Peekable<T> for PortSet<T> {
}
/// A channel that can be shared between many senders.
pub type SharedChan<T> = unstable::Exclusive<Chan<T>>;
pub struct SharedChan<T> {
ch: Exclusive<Chan<T>>
}
impl<T: Owned> SharedChan<T> {
/// Converts a `chan` into a `shared_chan`.
pub fn new(c: Chan<T>) -> SharedChan<T> {
SharedChan { ch: unstable::exclusive(c) }
}
}
impl<T: Owned> GenericChan<T> for SharedChan<T> {
fn send(&self, x: T) {
let mut xx = Some(x);
do self.with_imm |chan| {
do self.ch.with_imm |chan| {
let mut x = None;
x <-> xx;
chan.send(x.unwrap())
@ -295,7 +306,7 @@ impl<T: Owned> GenericChan<T> for SharedChan<T> {
impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
fn try_send(&self, x: T) -> bool {
let mut xx = Some(x);
do self.with_imm |chan| {
do self.ch.with_imm |chan| {
let mut x = None;
x <-> xx;
chan.try_send(x.unwrap())
@ -303,9 +314,10 @@ impl<T: Owned> GenericSmartChan<T> for SharedChan<T> {
}
}
/// Converts a `chan` into a `shared_chan`.
pub fn SharedChan<T:Owned>(c: Chan<T>) -> SharedChan<T> {
unstable::exclusive(c)
impl<T: Owned> ::clone::Clone for SharedChan<T> {
fn clone(&self) -> SharedChan<T> {
SharedChan { ch: self.ch.clone() }
}
}
/*proto! oneshot (

View file

@ -405,7 +405,7 @@ pub fn program_output(prog: &str, args: &[~str]) -> ProgramOutput {
// or the other. FIXME (#2625): Surely there's a much more
// clever way to do this.
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
let ch_clone = ch.clone();
do task::spawn_sched(task::SingleThreaded) {
let errput = readclose(pipe_err.in);

View file

@ -657,7 +657,7 @@ fn test_cant_dup_task_builder() {
#[test] #[ignore(cfg(windows))]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
let (po, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
do spawn_unlinked {
let ch = ch.clone();
do spawn_unlinked {
@ -881,7 +881,7 @@ fn test_spawn_sched_no_threads() {
#[test]
fn test_spawn_sched() {
let (po, ch) = stream::<()>();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
fn f(i: int, ch: SharedChan<()>) {
let parent_sched_id = unsafe { rt::rust_get_sched_id() };

View file

@ -69,7 +69,7 @@ fn create_global_service() -> ~WeakTaskService {
debug!("creating global weak task service");
let (port, chan) = stream::<ServiceMsg>();
let port = Cell(port);
let chan = SharedChan(chan);
let chan = SharedChan::new(chan);
let chan_clone = chan.clone();
do task().unlinked().spawn {

View file

@ -307,7 +307,7 @@ bug and need to present an error.
pub fn monitor(+f: ~fn(diagnostic::Emitter)) {
use core::comm::*;
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
let ch_capture = ch.clone();
match do task::try || {
let ch = ch_capture.clone();

View file

@ -69,7 +69,7 @@ fn run<T>(owner: SrvOwner<T>, source: ~str, parse: Parser) -> T {
}
let srv_ = Srv {
ch: SharedChan(ch)
ch: SharedChan::new(ch)
};
let res = owner(srv_.clone());

View file

@ -232,7 +232,7 @@ fn write_file(path: &Path, s: ~str) {
pub fn future_writer_factory(
) -> (WriterFactory, Port<(doc::Page, ~str)>) {
let (markdown_po, markdown_ch) = stream();
let markdown_ch = SharedChan(markdown_ch);
let markdown_ch = SharedChan::new(markdown_ch);
let writer_factory: WriterFactory = |page| {
let (writer_po, writer_ch) = comm::stream();
let markdown_ch = markdown_ch.clone();

View file

@ -50,7 +50,7 @@ pub fn run(
let (result_port, result_chan) = stream();
let (page_port, page_chan) = stream();
let page_chan = SharedChan(page_chan);
let page_chan = SharedChan::new(page_chan);
do task::spawn {
result_chan.send(make_doc_from_pages(&page_port));
};

View file

@ -499,7 +499,7 @@ mod tests {
let (p, c) = comm::stream();
do task::spawn() || {
let p = comm::PortSet();
let p = comm::PortSet::new();
c.send(p.chan());
let arc_v = p.recv();

View file

@ -113,7 +113,7 @@ enum IpGetAddrErr {
pub fn get_addr(node: &str, iotask: &iotask)
-> result::Result<~[IpAddr], IpGetAddrErr> {
let (output_po, output_ch) = stream();
let mut output_ch = Some(SharedChan(output_ch));
let mut output_ch = Some(SharedChan::new(output_ch));
do str::as_buf(node) |node_ptr, len| {
let output_ch = output_ch.swap_unwrap();
debug!("slice len %?", len);

View file

@ -150,16 +150,16 @@ pub fn connect(input_ip: ip::IpAddr, port: uint,
-> result::Result<TcpSocket, TcpConnectErrData> {
unsafe {
let (result_po, result_ch) = stream::<ConnAttempt>();
let result_ch = SharedChan(result_ch);
let result_ch = SharedChan::new(result_ch);
let (closed_signal_po, closed_signal_ch) = stream::<()>();
let closed_signal_ch = SharedChan(closed_signal_ch);
let closed_signal_ch = SharedChan::new(closed_signal_ch);
let conn_data = ConnectReqData {
result_ch: result_ch,
closed_signal_ch: closed_signal_ch
};
let conn_data_ptr = ptr::addr_of(&conn_data);
let (reader_po, reader_ch) = stream::<Result<~[u8], TcpErrData>>();
let reader_ch = SharedChan(reader_ch);
let reader_ch = SharedChan::new(reader_ch);
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let socket_data = @TcpSocketData {
@ -517,7 +517,7 @@ pub fn accept(new_conn: TcpNewConnection)
server_handle_ptr) as *TcpListenFcData;
let (reader_po, reader_ch) = stream::<
Result<~[u8], TcpErrData>>();
let reader_ch = SharedChan(reader_ch);
let reader_ch = SharedChan::new(reader_ch);
let iotask = &(*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) =
@ -537,7 +537,7 @@ pub fn accept(new_conn: TcpNewConnection)
(*client_socket_data_ptr).stream_handle_ptr;
let (result_po, result_ch) = stream::<Option<TcpErrData>>();
let result_ch = SharedChan(result_ch);
let result_ch = SharedChan::new(result_ch);
// UNSAFE LIBUV INTERACTION BEGIN
// .. normally this happens within the context of
@ -646,9 +646,9 @@ fn listen_common(host_ip: ip::IpAddr,
on_connect_cb: ~fn(*uv::ll::uv_tcp_t))
-> result::Result<(), TcpListenErrData> {
let (stream_closed_po, stream_closed_ch) = stream::<()>();
let stream_closed_ch = SharedChan(stream_closed_ch);
let stream_closed_ch = SharedChan::new(stream_closed_ch);
let (kill_po, kill_ch) = stream::<Option<TcpErrData>>();
let kill_ch = SharedChan(kill_ch);
let kill_ch = SharedChan::new(kill_ch);
let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(&server_stream);
let server_data: TcpListenFcData = TcpListenFcData {
@ -997,7 +997,7 @@ impl io::Writer for TcpSocketBuf {
fn tear_down_socket_data(socket_data: @TcpSocketData) {
unsafe {
let (closed_po, closed_ch) = stream::<()>();
let closed_ch = SharedChan(closed_ch);
let closed_ch = SharedChan::new(closed_ch);
let close_data = TcpSocketCloseData {
closed_ch: closed_ch
};
@ -1147,7 +1147,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData,
vec::len(raw_write_data)) ];
let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec);
let (result_po, result_ch) = stream::<TcpWriteResult>();
let result_ch = SharedChan(result_ch);
let result_ch = SharedChan::new(result_ch);
let write_data = WriteReqData {
result_ch: result_ch
};
@ -1554,7 +1554,7 @@ mod test {
let (server_result_po, server_result_ch) = stream::<~str>();
let (cont_po, cont_ch) = stream::<()>();
let cont_ch = SharedChan(cont_ch);
let cont_ch = SharedChan::new(cont_ch);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1592,7 +1592,7 @@ mod test {
let expected_resp = ~"pong";
let (cont_po, cont_ch) = stream::<()>();
let cont_ch = SharedChan(cont_ch);
let cont_ch = SharedChan::new(cont_ch);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1652,7 +1652,7 @@ mod test {
let expected_resp = ~"pong";
let (cont_po, cont_ch) = stream::<()>();
let cont_ch = SharedChan(cont_ch);
let cont_ch = SharedChan::new(cont_ch);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1717,7 +1717,7 @@ mod test {
let (server_result_po, server_result_ch) = stream::<~str>();
let (cont_po, cont_ch) = stream::<()>();
let cont_ch = SharedChan(cont_ch);
let cont_ch = SharedChan::new(cont_ch);
// server
let iotask_clone = iotask.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1764,7 +1764,7 @@ mod test {
let expected_resp = ~"A string\nwith multiple lines\n";
let (cont_po, cont_ch) = stream::<()>();
let cont_ch = SharedChan(cont_ch);
let cont_ch = SharedChan::new(cont_ch);
// server
let hl_loop_clone = hl_loop.clone();
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1813,7 +1813,7 @@ mod test {
cont_ch: SharedChan<()>,
iotask: &IoTask) -> ~str {
let (server_po, server_ch) = stream::<~str>();
let server_ch = SharedChan(server_ch);
let server_ch = SharedChan::new(server_ch);
let server_ip_addr = ip::v4::parse_addr(server_ip);
let listen_result = listen(server_ip_addr, server_port, 128,
iotask,

View file

@ -446,7 +446,7 @@ fn run_tests(opts: &TestOpts,
let mut pending = 0;
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
while pending > 0 || !remaining.is_empty() {
while pending < concurrency && !remaining.is_empty() {
@ -797,7 +797,7 @@ mod tests {
testfn: DynTestFn(|| f()),
};
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res != TrOk);
@ -815,7 +815,7 @@ mod tests {
testfn: DynTestFn(|| f()),
};
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res == TrIgnored);
@ -834,7 +834,7 @@ mod tests {
testfn: DynTestFn(|| f()),
};
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res == TrOk);
@ -852,7 +852,7 @@ mod tests {
testfn: DynTestFn(|| f()),
};
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
run_test(false, desc, ch);
let (_, res) = p.recv();
assert!(res == TrFailed);

View file

@ -43,7 +43,7 @@ pub fn delayed_send<T:Owned>(iotask: &IoTask,
ch: &Chan<T>,
val: T) {
let (timer_done_po, timer_done_ch) = stream::<()>();
let timer_done_ch = SharedChan(timer_done_ch);
let timer_done_ch = SharedChan::new(timer_done_ch);
let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(&timer);
do iotask::interact(iotask) |loop_ptr| {
@ -199,7 +199,7 @@ mod test {
#[test]
fn test_gl_timer_sleep_stress2() {
let (po, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
let hl_loop = &uv::global_loop::get();
let repeat = 20u;

View file

@ -211,7 +211,7 @@ mod test {
#[ignore]
fn test_stress_gl_uv_global_loop_high_level_global_timer() {
let (exit_po, exit_ch) = stream::<()>();
let exit_ch = SharedChan(exit_ch);
let exit_ch = SharedChan::new(exit_ch);
let cycles = 5000u;
for iter::repeat(cycles) {
let exit_ch_clone = exit_ch.clone();

View file

@ -126,7 +126,7 @@ fn run_loop(iotask_ch: &Chan<IoTask>) {
// while we dwell in the I/O loop
let iotask = IoTask{
async_handle: async_handle,
op_chan: SharedChan(msg_ch)
op_chan: SharedChan::new(msg_ch)
};
iotask_ch.send(iotask);
@ -230,7 +230,7 @@ fn impl_uv_iotask_async(iotask: &IoTask) {
let (exit_po, exit_ch) = stream::<()>();
let ah_data = AhData {
iotask: iotask.clone(),
exit_ch: SharedChan(exit_ch)
exit_ch: SharedChan::new(exit_ch)
};
let ah_data_ptr: *AhData = unsafe {
ptr::to_unsafe_ptr(&ah_data)
@ -293,7 +293,7 @@ fn test_uv_iotask_async() {
// loop lives until, at least, all of the
// impl_uv_hl_async() runs have been called, at least.
let (work_exit_po, work_exit_ch) = stream::<()>();
let work_exit_ch = SharedChan(work_exit_ch);
let work_exit_ch = SharedChan::new(work_exit_ch);
for iter::repeat(7u) {
let iotask_clone = iotask.clone();
let work_exit_ch_clone = work_exit_ch.clone();

View file

@ -1717,12 +1717,12 @@ mod test {
let kill_server_msg = ~"does a dog have buddha nature?";
let server_resp_msg = ~"mu!";
let (client_port, client_chan) = stream::<~str>();
let client_chan = SharedChan(client_chan);
let client_chan = SharedChan::new(client_chan);
let (server_port, server_chan) = stream::<~str>();
let server_chan = SharedChan(server_chan);
let server_chan = SharedChan::new(server_chan);
let (continue_port, continue_chan) = stream::<bool>();
let continue_chan = SharedChan(continue_chan);
let continue_chan = SharedChan::new(continue_chan);
let kill_server_msg_copy = copy kill_server_msg;
let server_resp_msg_copy = copy server_resp_msg;

View file

@ -58,7 +58,7 @@ fn run(args: &[~str]) {
let (from_child, to_parent) = comm::stream();
let (from_parent, to_child) = comm::stream();
let to_child = SharedChan(to_child);
let to_child = SharedChan::new(to_child);
let size = uint::from_str(args[1]).get();
let workers = uint::from_str(args[2]).get();

View file

@ -53,7 +53,7 @@ fn server(requests: PortSet<request>, responses: Chan<uint>) {
fn run(args: &[~str]) {
let (from_child, to_parent) = stream();
let (from_parent_, to_child) = stream();
let from_parent = PortSet();
let from_parent = PortSet::new();
from_parent.add(from_parent_);
let size = uint::from_str(args[1]).get();

View file

@ -137,9 +137,9 @@ fn rendezvous(nn: uint, set: ~[color]) {
// these ports will allow us to hear from the creatures
let (from_creatures, to_rendezvous) = stream::<CreatureInfo>();
let to_rendezvous = SharedChan(to_rendezvous);
let to_rendezvous = SharedChan::new(to_rendezvous);
let (from_creatures_log, to_rendezvous_log) = stream::<~str>();
let to_rendezvous_log = SharedChan(to_rendezvous_log);
let to_rendezvous_log = SharedChan::new(to_rendezvous_log);
// these channels will be passed to the creatures so they can talk to us

View file

@ -173,7 +173,7 @@ fn main() {
else { uint::from_str(args[1]).get() };
let (pport, pchan) = comm::stream();
let pchan = comm::SharedChan(pchan);
let pchan = comm::SharedChan::new(pchan);
for uint::range(0_u, size) |j| {
let cchan = pchan.clone();
do task::spawn { cchan.send(chanmb(j, size, depth)) };

View file

@ -38,7 +38,7 @@ fn fib(n: int) -> int {
} else if n <= 2 {
c.send(1);
} else {
let p = PortSet();
let p = PortSet::new();
let ch = p.chan();
task::spawn(|| pfib(ch, n - 1) );
let ch = p.chan();

View file

@ -26,7 +26,7 @@ use core::comm::*;
fn grandchild_group(num_tasks: uint) {
let (po, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
for num_tasks.times {
let ch = ch.clone();

View file

@ -60,7 +60,7 @@ mod map_reduce {
pub fn map_reduce(inputs: ~[~str]) {
let (ctrl_port, ctrl_chan) = stream();
let ctrl_chan = SharedChan(ctrl_chan);
let ctrl_chan = SharedChan::new(ctrl_chan);
// This task becomes the master control task. It spawns others
// to do the rest.

View file

@ -11,7 +11,7 @@
// xfail-fast
pub fn main() {
let po = comm::PortSet();
let po = comm::PortSet::new();
// Spawn 10 tasks each sending us back one int.
let mut i = 10;

View file

@ -32,7 +32,7 @@ fn test00() {
debug!("Creating tasks");
let po = comm::PortSet();
let po = comm::PortSet::new();
let mut i: int = 0;

View file

@ -15,7 +15,7 @@ pub fn main() { test00(); }
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet();
let p = comm::PortSet::new();
let c0 = p.chan();
let c1 = p.chan();
let c2 = p.chan();

View file

@ -23,7 +23,7 @@ fn test00_start(c: comm::Chan<int>, start: int, number_of_messages: int) {
fn test00() {
let mut r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet();
let p = comm::PortSet::new();
let number_of_messages: int = 10;
let c = p.chan();

View file

@ -22,7 +22,7 @@ fn test00_start(c: &comm::Chan<int>, number_of_messages: int) {
fn test00() {
let r: int = 0;
let mut sum: int = 0;
let p = comm::PortSet();
let p = comm::PortSet::new();
let number_of_messages: int = 10;
let ch = p.chan();

View file

@ -16,7 +16,7 @@ fn child(c: &SharedChan<~uint>, i: uint) {
pub fn main() {
let (p, ch) = stream();
let ch = SharedChan(ch);
let ch = SharedChan::new(ch);
let n = 100u;
let mut expected = 0u;
for uint::range(0u, n) |i| {

View file

@ -39,7 +39,7 @@ fn f(c: SharedChan<bool>) {
pub fn main() {
let (p, c) = stream();
let c = SharedChan(c);
let c = SharedChan::new(c);
task::spawn_unlinked(|| f(c.clone()) );
error!("hiiiiiiiii");
assert!(p.recv());