Add std::comm with DuplexStream

This commit is contained in:
Eric Holk 2012-08-14 14:17:27 -07:00
parent 91622d0310
commit 924e787119
11 changed files with 251 additions and 169 deletions

View file

@ -420,12 +420,11 @@ mod tests {
let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let arc_v = arc::arc(v);
let p = port();
let c = chan(p);
let (c, p) = pipes::stream();
do task::spawn() {
let p = port();
c.send(chan(p));
let p = pipes::port_set();
c.send(p.chan());
let arc_v = p.recv();

76
src/libstd/comm.rs Normal file
View file

@ -0,0 +1,76 @@
/*!
Higher level communication abstractions.
*/
// NB: transitionary, de-mode-ing.
#[forbid(deprecated_mode)];
#[forbid(deprecated_pattern)];
// Make sure we follow the new conventions
#[forbid(non_camel_case_types)];
import pipes::{channel, recv, chan, port, selectable};
export DuplexStream;
/// An extension of `pipes::stream` that allows both sending and receiving.
struct DuplexStream<T: send, U: send> : channel<T>, recv<U>, selectable {
priv chan: chan<T>;
priv port: port<U>;
fn send(+x: T) {
self.chan.send(x)
}
fn try_send(+x: T) -> bool {
self.chan.try_send(x)
}
fn recv() -> U {
self.port.recv()
}
fn try_recv() -> option<U> {
self.port.try_recv()
}
pure fn peek() -> bool {
self.port.peek()
}
pure fn header() -> *pipes::packet_header {
self.port.header()
}
}
/// Creates a bidirectional stream.
fn DuplexStream<T: send, U: send>()
-> (DuplexStream<T, U>, DuplexStream<U, T>)
{
let (c2, p1) = pipes::stream();
let (c1, p2) = pipes::stream();
(DuplexStream {
chan: c1,
port: p1
},
DuplexStream {
chan: c2,
port: p2
})
}
#[cfg(test)]
mod test {
#[test]
fn DuplexStream1() {
let (left, right) = DuplexStream();
left.send(~"abc");
right.send(123);
assert left.recv() == 123;
assert right.recv() == ~"abc";
}
}

View file

@ -19,6 +19,7 @@ import create_uv_getaddrinfo_t = uv::ll::getaddrinfo_t;
import set_data_for_req = uv::ll::set_data_for_req;
import get_data_for_req = uv::ll::get_data_for_req;
import ll = uv::ll;
import comm = core::comm;
export ip_addr, parse_addr_err;
export format_addr;
@ -85,7 +86,7 @@ enum ip_get_addr_err {
*/
fn get_addr(++node: ~str, iotask: iotask)
-> result::result<~[ip_addr], ip_get_addr_err> unsafe {
do comm::listen |output_ch| {
do core::comm::listen |output_ch| {
do str::as_buf(node) |node_ptr, len| {
log(debug, fmt!{"slice len %?", len});
let handle = create_uv_getaddrinfo_t();

View file

@ -9,6 +9,7 @@ import future_spawn = future::spawn;
import result::*;
import libc::size_t;
import io::{Reader, Writer};
import comm = core::comm;
// tcp interfaces
export tcp_socket;
@ -120,19 +121,19 @@ enum tcp_connect_err_data {
fn connect(-input_ip: ip::ip_addr, port: uint,
iotask: iotask)
-> result::result<tcp_socket, tcp_connect_err_data> unsafe {
let result_po = comm::port::<conn_attempt>();
let closed_signal_po = comm::port::<()>();
let result_po = core::comm::port::<conn_attempt>();
let closed_signal_po = core::comm::port::<()>();
let conn_data = {
result_ch: comm::chan(result_po),
closed_signal_ch: comm::chan(closed_signal_po)
result_ch: core::comm::chan(result_po),
closed_signal_ch: core::comm::chan(closed_signal_po)
};
let conn_data_ptr = ptr::addr_of(conn_data);
let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
let reader_po = core::comm::port::<result::result<~[u8], tcp_err_data>>();
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let socket_data = @{
reader_po: reader_po,
reader_ch: comm::chan(reader_po),
reader_ch: core::comm::chan(reader_po),
stream_handle_ptr: stream_handle_ptr,
connect_req: uv::ll::connect_t(),
write_req: uv::ll::write_t(),
@ -202,7 +203,7 @@ fn connect(-input_ip: ip::ip_addr, port: uint,
// immediate connect failure.. probably a garbage
// ip or somesuch
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send((*conn_data_ptr).result_ch,
core::comm::send((*conn_data_ptr).result_ch,
conn_failure(err_data.to_tcp_err()));
uv::ll::set_data_for_uv_handle(stream_handle_ptr,
conn_data_ptr);
@ -215,18 +216,18 @@ fn connect(-input_ip: ip::ip_addr, port: uint,
_ => {
// failure to create a tcp handle
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send((*conn_data_ptr).result_ch,
core::comm::send((*conn_data_ptr).result_ch,
conn_failure(err_data.to_tcp_err()));
}
}
};
match comm::recv(result_po) {
match core::comm::recv(result_po) {
conn_success => {
log(debug, ~"tcp::connect - received success on result_po");
result::ok(tcp_socket(socket_data))
}
conn_failure(err_data) => {
comm::recv(closed_signal_po);
core::comm::recv(closed_signal_po);
log(debug, ~"tcp::connect - received failure on result_po");
// still have to free the malloc'd stream handle..
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
@ -311,8 +312,9 @@ fn write_future(sock: tcp_socket, raw_write_data: ~[u8])
* # Returns
*
* * A `result` instance that will either contain a
* `comm::port<tcp_read_result>` that the user can read (and optionally, loop
* on) from until `read_stop` is called, or a `tcp_err_data` record
* `core::comm::port<tcp_read_result>` that the user can read (and
* optionally, loop on) from until `read_stop` is called, or a
* `tcp_err_data` record
*/
fn read_start(sock: tcp_socket)
-> result::result<comm::Port<
@ -428,23 +430,23 @@ fn read_future(sock: tcp_socket, timeout_msecs: uint)
* }
* // this callback is ran when a new connection arrives
* {|new_conn, kill_ch|
* let cont_po = comm::port::<option<tcp_err_data>>();
* let cont_ch = comm::chan(cont_po);
* let cont_po = core::comm::port::<option<tcp_err_data>>();
* let cont_ch = core::comm::chan(cont_po);
* task::spawn {||
* let accept_result = net::tcp::accept(new_conn);
* if accept_result.is_err() {
* comm::send(cont_ch, result::get_err(accept_result));
* core::comm::send(cont_ch, result::get_err(accept_result));
* // fail?
* }
* else {
* let sock = result::get(accept_result);
* comm::send(cont_ch, true);
* core::comm::send(cont_ch, true);
* // do work here
* }
* };
* match comm::recv(cont_po) {
* match core::comm::recv(cont_po) {
* // shut down listen()
* some(err_data) { comm::send(kill_chan, some(err_data)) }
* some(err_data) { core::comm::send(kill_chan, some(err_data)) }
* // wait for next connection
* none {}
* }
@ -470,13 +472,13 @@ fn accept(new_conn: tcp_new_connection)
new_tcp_conn(server_handle_ptr) => {
let server_data_ptr = uv::ll::get_data_for_uv_handle(
server_handle_ptr) as *tcp_listen_fc_data;
let reader_po = comm::port::<result::result<~[u8], tcp_err_data>>();
let reader_po = core::comm::port();
let iotask = (*server_data_ptr).iotask;
let stream_handle_ptr = malloc_uv_tcp_t();
*(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t();
let client_socket_data = @{
reader_po: reader_po,
reader_ch: comm::chan(reader_po),
reader_ch: core::comm::chan(reader_po),
stream_handle_ptr : stream_handle_ptr,
connect_req : uv::ll::connect_t(),
write_req : uv::ll::write_t(),
@ -486,8 +488,8 @@ fn accept(new_conn: tcp_new_connection)
let client_stream_handle_ptr =
(*client_socket_data_ptr).stream_handle_ptr;
let result_po = comm::port::<option<tcp_err_data>>();
let result_ch = comm::chan(result_po);
let result_po = core::comm::port::<option<tcp_err_data>>();
let result_ch = core::comm::chan(result_po);
// UNSAFE LIBUV INTERACTION BEGIN
// .. normally this happens within the context of
@ -509,23 +511,23 @@ fn accept(new_conn: tcp_new_connection)
uv::ll::set_data_for_uv_handle(client_stream_handle_ptr,
client_socket_data_ptr
as *libc::c_void);
comm::send(result_ch, none);
core::comm::send(result_ch, none);
}
_ => {
log(debug, ~"failed to accept client conn");
comm::send(result_ch, some(
core::comm::send(result_ch, some(
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
}
}
}
_ => {
log(debug, ~"failed to init client stream");
comm::send(result_ch, some(
core::comm::send(result_ch, some(
uv::ll::get_last_err_data(loop_ptr).to_tcp_err()));
}
}
// UNSAFE LIBUV INTERACTION END
match comm::recv(result_po) {
match core::comm::recv(result_po) {
some(err_data) => result::err(err_data),
none => result::ok(tcp_socket(client_socket_data))
}
@ -551,8 +553,8 @@ fn accept(new_conn: tcp_new_connection)
* callback's arguments are:
* * `new_conn` - an opaque type that can be passed to
* `net::tcp::accept` in order to be converted to a `tcp_socket`.
* * `kill_ch` - channel of type `comm::chan<option<tcp_err_data>>`. this
* channel can be used to send a message to cause `listen` to begin
* * `kill_ch` - channel of type `core::comm::chan<option<tcp_err_data>>`.
* this channel can be used to send a message to cause `listen` to begin
* closing the underlying libuv data structures.
*
* # returns
@ -583,14 +585,14 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
on_establish_cb: fn~(comm::Chan<option<tcp_err_data>>),
-on_connect_cb: fn~(*uv::ll::uv_tcp_t))
-> result::result<(), tcp_listen_err_data> unsafe {
let stream_closed_po = comm::port::<()>();
let kill_po = comm::port::<option<tcp_err_data>>();
let kill_ch = comm::chan(kill_po);
let stream_closed_po = core::comm::port::<()>();
let kill_po = core::comm::port::<option<tcp_err_data>>();
let kill_ch = core::comm::chan(kill_po);
let server_stream = uv::ll::tcp_t();
let server_stream_ptr = ptr::addr_of(server_stream);
let server_data = {
server_stream_ptr: server_stream_ptr,
stream_closed_ch: comm::chan(stream_closed_po),
stream_closed_ch: core::comm::chan(stream_closed_po),
kill_ch: kill_ch,
on_connect_cb: on_connect_cb,
iotask: iotask,
@ -598,13 +600,13 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
};
let server_data_ptr = ptr::addr_of(server_data);
let setup_result = do comm::listen |setup_ch| {
let setup_result = do core::comm::listen |setup_ch| {
// this is to address a compiler warning about
// an implicit copy.. it seems that double nested
// will defeat a move sigil, as is done to the host_ip
// arg above.. this same pattern works w/o complaint in
// tcp::connect (because the iotask::interact cb isn't
// nested within a comm::listen block)
// nested within a core::comm::listen block)
let loc_ip = copy(host_ip);
do iotask::interact(iotask) |loop_ptr| {
match uv::ll::tcp_init(loop_ptr, server_stream_ptr) {
@ -632,25 +634,25 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
match uv::ll::listen(server_stream_ptr,
backlog as libc::c_int,
tcp_lfc_on_connection_cb) {
0i32 => comm::send(setup_ch, none),
0i32 => core::comm::send(setup_ch, none),
_ => {
log(debug, ~"failure to uv_listen()");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
core::comm::send(setup_ch, some(err_data));
}
}
}
_ => {
log(debug, ~"failure to uv_tcp_bind");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
core::comm::send(setup_ch, some(err_data));
}
}
}
_ => {
log(debug, ~"failure to uv_tcp_init");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(setup_ch, some(err_data));
core::comm::send(setup_ch, some(err_data));
}
}
};
@ -684,7 +686,7 @@ fn listen_common(-host_ip: ip::ip_addr, port: uint, backlog: uint,
}
none => {
on_establish_cb(kill_ch);
let kill_result = comm::recv(kill_po);
let kill_result = core::comm::recv(kill_po);
do iotask::interact(iotask) |loop_ptr| {
log(debug, fmt!{"tcp::listen post-kill recv hl interact %?",
loop_ptr});
@ -835,8 +837,8 @@ impl @tcp_socket_buf: io::Writer {
// INTERNAL API
fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
let closed_po = comm::port::<()>();
let closed_ch = comm::chan(closed_po);
let closed_po = core::comm::port::<()>();
let closed_ch = core::comm::chan(closed_po);
let close_data = {
closed_ch: closed_ch
};
@ -849,7 +851,7 @@ fn tear_down_socket_data(socket_data: @tcp_socket_data) unsafe {
close_data_ptr);
uv::ll::close(stream_handle_ptr, tcp_socket_dtor_close_cb);
};
comm::recv(closed_po);
core::comm::recv(closed_po);
log(debug, fmt!{"about to free socket_data at %?", socket_data});
rustrt::rust_uv_current_kernel_free(stream_handle_ptr
as *libc::c_void);
@ -872,7 +874,7 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
timer::recv_timeout(
iotask, timeout_msecs, result::get(rs_result))
} else {
some(comm::recv(result::get(rs_result)))
some(core::comm::recv(result::get(rs_result)))
};
log(debug, ~"tcp::read after recv_timeout");
match read_result {
@ -898,23 +900,23 @@ fn read_common_impl(socket_data: *tcp_socket_data, timeout_msecs: uint)
fn read_stop_common_impl(socket_data: *tcp_socket_data) ->
result::result<(), tcp_err_data> unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let stop_po = comm::port::<option<tcp_err_data>>();
let stop_ch = comm::chan(stop_po);
let stop_po = core::comm::port::<option<tcp_err_data>>();
let stop_ch = core::comm::chan(stop_po);
do iotask::interact((*socket_data).iotask) |loop_ptr| {
log(debug, ~"in interact cb for tcp::read_stop");
match uv::ll::read_stop(stream_handle_ptr as *uv::ll::uv_stream_t) {
0i32 => {
log(debug, ~"successfully called uv_read_stop");
comm::send(stop_ch, none);
core::comm::send(stop_ch, none);
}
_ => {
log(debug, ~"failure in calling uv_read_stop");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(stop_ch, some(err_data.to_tcp_err()));
core::comm::send(stop_ch, some(err_data.to_tcp_err()));
}
}
};
match comm::recv(stop_po) {
match core::comm::recv(stop_po) {
some(err_data) => result::err(err_data.to_tcp_err()),
none => result::ok(())
}
@ -925,8 +927,8 @@ fn read_start_common_impl(socket_data: *tcp_socket_data)
-> result::result<comm::Port<
result::result<~[u8], tcp_err_data>>, tcp_err_data> unsafe {
let stream_handle_ptr = (*socket_data).stream_handle_ptr;
let start_po = comm::port::<option<uv::ll::uv_err_data>>();
let start_ch = comm::chan(start_po);
let start_po = core::comm::port::<option<uv::ll::uv_err_data>>();
let start_ch = core::comm::chan(start_po);
log(debug, ~"in tcp::read_start before interact loop");
do iotask::interact((*socket_data).iotask) |loop_ptr| {
log(debug, fmt!{"in tcp::read_start interact cb %?", loop_ptr});
@ -935,16 +937,16 @@ fn read_start_common_impl(socket_data: *tcp_socket_data)
on_tcp_read_cb) {
0i32 => {
log(debug, ~"success doing uv_read_start");
comm::send(start_ch, none);
core::comm::send(start_ch, none);
}
_ => {
log(debug, ~"error attempting uv_read_start");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send(start_ch, some(err_data));
core::comm::send(start_ch, some(err_data));
}
}
};
match comm::recv(start_po) {
match core::comm::recv(start_po) {
some(err_data) => result::err(err_data.to_tcp_err()),
none => result::ok((*socket_data).reader_po)
}
@ -963,9 +965,9 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
vec::unsafe::to_ptr(raw_write_data),
vec::len(raw_write_data)) ];
let write_buf_vec_ptr = ptr::addr_of(write_buf_vec);
let result_po = comm::port::<tcp_write_result>();
let result_po = core::comm::port::<tcp_write_result>();
let write_data = {
result_ch: comm::chan(result_po)
result_ch: core::comm::chan(result_po)
};
let write_data_ptr = ptr::addr_of(write_data);
do iotask::interact((*socket_data_ptr).iotask) |loop_ptr| {
@ -981,7 +983,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
_ => {
log(debug, ~"error invoking uv_write()");
let err_data = uv::ll::get_last_err_data(loop_ptr);
comm::send((*write_data_ptr).result_ch,
core::comm::send((*write_data_ptr).result_ch,
tcp_write_error(err_data.to_tcp_err()));
}
}
@ -990,7 +992,7 @@ fn write_common_impl(socket_data_ptr: *tcp_socket_data,
// and waiting here for the write to complete, we should transfer
// ownership of everything to the I/O task and let it deal with the
// aftermath, so we don't have to sit here blocking.
match comm::recv(result_po) {
match core::comm::recv(result_po) {
tcp_write_success => result::ok(()),
tcp_write_error(err_data) => result::err(err_data.to_tcp_err())
}
@ -1012,7 +1014,7 @@ type tcp_listen_fc_data = {
extern fn tcp_lfc_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
let server_data_ptr = uv::ll::get_data_for_uv_handle(
handle) as *tcp_listen_fc_data;
comm::send((*server_data_ptr).stream_closed_ch, ());
core::comm::send((*server_data_ptr).stream_closed_ch, ());
}
extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
@ -1025,7 +1027,7 @@ extern fn tcp_lfc_on_connection_cb(handle: *uv::ll::uv_tcp_t,
0i32 => (*server_data_ptr).on_connect_cb(handle),
_ => {
let loop_ptr = uv::ll::get_loop_for_uv_handle(handle);
comm::send(kill_ch,
core::comm::send(kill_ch,
some(uv::ll::get_last_err_data(loop_ptr)
.to_tcp_err()));
(*server_data_ptr).active = false;
@ -1085,7 +1087,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
log(debug, fmt!{"on_tcp_read_cb: incoming err.. name %? msg %?",
err_data.err_name, err_data.err_msg});
let reader_ch = (*socket_data_ptr).reader_ch;
comm::send(reader_ch, result::err(err_data));
core::comm::send(reader_ch, result::err(err_data));
}
// do nothing .. unneeded buf
0 => (),
@ -1096,7 +1098,7 @@ extern fn on_tcp_read_cb(stream: *uv::ll::uv_stream_t,
let reader_ch = (*socket_data_ptr).reader_ch;
let buf_base = uv::ll::get_base_from_buf(buf);
let new_bytes = vec::unsafe::from_buf(buf_base, nread as uint);
comm::send(reader_ch, result::ok(new_bytes));
core::comm::send(reader_ch, result::ok(new_bytes));
}
}
uv::ll::free_base_of_buf(buf);
@ -1123,7 +1125,7 @@ extern fn tcp_socket_dtor_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
let data = uv::ll::get_data_for_uv_handle(handle)
as *tcp_socket_close_data;
let closed_ch = (*data).closed_ch;
comm::send(closed_ch, ());
core::comm::send(closed_ch, ());
log(debug, ~"tcp_socket_dtor_close_cb exiting..");
}
@ -1133,14 +1135,15 @@ extern fn tcp_write_complete_cb(write_req: *uv::ll::uv_write_t,
as *write_req_data;
if status == 0i32 {
log(debug, ~"successful write complete");
comm::send((*write_data_ptr).result_ch, tcp_write_success);
core::comm::send((*write_data_ptr).result_ch, tcp_write_success);
} else {
let stream_handle_ptr = uv::ll::get_stream_handle_from_write_req(
write_req);
let loop_ptr = uv::ll::get_loop_for_uv_handle(stream_handle_ptr);
let err_data = uv::ll::get_last_err_data(loop_ptr);
log(debug, ~"failure to write");
comm::send((*write_data_ptr).result_ch, tcp_write_error(err_data));
core::comm::send((*write_data_ptr).result_ch,
tcp_write_error(err_data));
}
}
@ -1156,7 +1159,7 @@ type connect_req_data = {
extern fn stream_error_close_cb(handle: *uv::ll::uv_tcp_t) unsafe {
let data = uv::ll::get_data_for_uv_handle(handle) as
*connect_req_data;
comm::send((*data).closed_signal_ch, ());
core::comm::send((*data).closed_signal_ch, ());
log(debug, fmt!{"exiting steam_error_close_cb for %?", handle});
}
@ -1175,7 +1178,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
match status {
0i32 => {
log(debug, ~"successful tcp connection!");
comm::send(result_ch, conn_success);
core::comm::send(result_ch, conn_success);
}
_ => {
log(debug, ~"error in tcp_connect_on_connect_cb");
@ -1183,7 +1186,7 @@ extern fn tcp_connect_on_connect_cb(connect_req_ptr: *uv::ll::uv_connect_t,
let err_data = uv::ll::get_last_err_data(loop_ptr);
log(debug, fmt!{"err_data %? %?", err_data.err_name,
err_data.err_msg});
comm::send(result_ch, conn_failure(err_data));
core::comm::send(result_ch, conn_failure(err_data));
uv::ll::set_data_for_uv_handle(tcp_stream_ptr,
conn_data_ptr);
uv::ll::close(tcp_stream_ptr, stream_error_close_cb);
@ -1279,11 +1282,11 @@ mod test {
let expected_req = ~"ping";
let expected_resp = ~"pong";
let server_result_po = comm::port::<~str>();
let server_result_ch = comm::chan(server_result_po);
let server_result_po = core::comm::port::<~str>();
let server_result_ch = core::comm::chan(server_result_po);
let cont_po = comm::port::<()>();
let cont_ch = comm::chan(cont_po);
let cont_po = core::comm::port::<()>();
let cont_ch = core::comm::chan(cont_po);
// server
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do comm::listen |server_ch| {
@ -1297,10 +1300,10 @@ mod test {
};
server_result_ch.send(actual_req);
};
comm::recv(cont_po);
core::comm::recv(cont_po);
// client
log(debug, ~"server started, firing up client..");
let actual_resp_result = do comm::listen |client_ch| {
let actual_resp_result = do core::comm::listen |client_ch| {
run_tcp_test_client(
server_ip,
server_port,
@ -1310,7 +1313,7 @@ mod test {
};
assert actual_resp_result.is_ok();
let actual_resp = actual_resp_result.get();
let actual_req = comm::recv(server_result_po);
let actual_req = core::comm::recv(server_result_po);
log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
expected_req, actual_req});
log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
@ -1325,7 +1328,7 @@ mod test {
let expected_req = ~"ping";
// client
log(debug, ~"firing up client..");
let actual_resp_result = do comm::listen |client_ch| {
let actual_resp_result = do core::comm::listen |client_ch| {
run_tcp_test_client(
server_ip,
server_port,
@ -1345,11 +1348,11 @@ mod test {
let expected_req = ~"ping";
let expected_resp = ~"pong";
let server_result_po = comm::port::<~str>();
let server_result_ch = comm::chan(server_result_po);
let server_result_po = core::comm::port::<~str>();
let server_result_ch = core::comm::chan(server_result_po);
let cont_po = comm::port::<()>();
let cont_ch = comm::chan(cont_po);
let cont_po = core::comm::port::<()>();
let cont_ch = core::comm::chan(cont_po);
// server
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do comm::listen |server_ch| {
@ -1363,7 +1366,7 @@ mod test {
};
server_result_ch.send(actual_req);
};
comm::recv(cont_po);
core::comm::recv(cont_po);
// this one should fail..
let listen_err = run_tcp_test_server_fail(
server_ip,
@ -1371,7 +1374,7 @@ mod test {
hl_loop);
// client.. just doing this so that the first server tears down
log(debug, ~"server started, firing up client..");
do comm::listen |client_ch| {
do core::comm::listen |client_ch| {
run_tcp_test_client(
server_ip,
server_port,
@ -1415,11 +1418,11 @@ mod test {
let expected_req = ~"ping";
let expected_resp = ~"pong";
let server_result_po = comm::port::<~str>();
let server_result_ch = comm::chan(server_result_po);
let server_result_po = core::comm::port::<~str>();
let server_result_ch = core::comm::chan(server_result_po);
let cont_po = comm::port::<()>();
let cont_ch = comm::chan(cont_po);
let cont_po = core::comm::port::<()>();
let cont_ch = core::comm::chan(cont_po);
// server
do task::spawn_sched(task::ManualThreads(1u)) {
let actual_req = do comm::listen |server_ch| {
@ -1433,7 +1436,7 @@ mod test {
};
server_result_ch.send(actual_req);
};
comm::recv(cont_po);
core::comm::recv(cont_po);
// client
let server_addr = ip::v4::parse_addr(server_ip);
let conn_result = connect(server_addr, server_port, iotask);
@ -1449,7 +1452,7 @@ mod test {
vec::len(resp_buf))
};
let actual_req = comm::recv(server_result_po);
let actual_req = core::comm::recv(server_result_po);
log(debug, fmt!{"REQ: expected: '%s' actual: '%s'",
expected_req, actual_req});
log(debug, fmt!{"RESP: expected: '%s' actual: '%s'",
@ -1484,7 +1487,7 @@ mod test {
|kill_ch| {
log(debug, fmt!{"establish_cb %?",
kill_ch});
comm::send(cont_ch, ());
core::comm::send(cont_ch, ());
},
// risky to run this on the loop, but some users
// will want the POWER
@ -1499,7 +1502,7 @@ mod test {
if result::is_err(accept_result) {
log(debug, ~"SERVER: error accept connection");
let err_data = result::get_err(accept_result);
comm::send(kill_ch, some(err_data));
core::comm::send(kill_ch, some(err_data));
log(debug,
~"SERVER/WORKER: send on err cont ch");
cont_ch.send(());
@ -1522,12 +1525,12 @@ mod test {
log(debug, ~"SERVER: before write");
tcp_write_single(sock, str::bytes(resp));
log(debug, ~"SERVER: after write.. die");
comm::send(kill_ch, none);
core::comm::send(kill_ch, none);
}
result::err(err_data) => {
log(debug, fmt!{"SERVER: error recvd: %s %s",
err_data.err_name, err_data.err_msg});
comm::send(kill_ch, some(err_data));
core::comm::send(kill_ch, some(err_data));
server_ch.send(~"");
}
}

View file

@ -1,7 +1,3 @@
import comm::port;
import comm::chan;
import comm::send;
import comm::recv;
import future_spawn = future::spawn;
export map, mapi, alli, any, mapi_factory;

View file

@ -52,6 +52,7 @@ mod cell;
mod sync;
mod arc;
mod comm;
// Collections

View file

@ -10,6 +10,7 @@ import result::{ok, err};
import io::WriterUtil;
import libc::size_t;
import task::TaskBuilder;
import comm = core::comm;
export test_name;
export test_fn;
@ -285,8 +286,8 @@ fn run_tests(opts: test_opts, tests: ~[test_desc],
let mut wait_idx = 0u;
let mut done_idx = 0u;
let p = comm::port();
let ch = comm::chan(p);
let p = core::comm::port();
let ch = core::comm::chan(p);
while done_idx < total {
while wait_idx < concurrency && run_idx < total {
@ -302,7 +303,7 @@ fn run_tests(opts: test_opts, tests: ~[test_desc],
run_idx += 1u;
}
let (test, result) = comm::recv(p);
let (test, result) = core::comm::recv(p);
if concurrency != 1u {
callback(te_wait(copy test));
}
@ -381,7 +382,7 @@ type test_future = {test: test_desc, wait: fn@() -> test_result};
fn run_test(+test: test_desc, monitor_ch: comm::Chan<monitor_msg>) {
if test.ignore {
comm::send(monitor_ch, (copy test, tr_ignored));
core::comm::send(monitor_ch, (copy test, tr_ignored));
return;
}
@ -419,10 +420,10 @@ mod tests {
ignore: true,
should_fail: false
};
let p = comm::port();
let ch = comm::chan(p);
let p = core::comm::port();
let ch = core::comm::chan(p);
run_test(desc, ch);
let (_, res) = comm::recv(p);
let (_, res) = core::comm::recv(p);
assert res != tr_ok;
}
@ -435,10 +436,10 @@ mod tests {
ignore: true,
should_fail: false
};
let p = comm::port();
let ch = comm::chan(p);
let p = core::comm::port();
let ch = core::comm::chan(p);
run_test(desc, ch);
let (_, res) = comm::recv(p);
let (_, res) = core::comm::recv(p);
assert res == tr_ignored;
}
@ -452,10 +453,10 @@ mod tests {
ignore: false,
should_fail: true
};
let p = comm::port();
let ch = comm::chan(p);
let p = core::comm::port();
let ch = core::comm::chan(p);
run_test(desc, ch);
let (_, res) = comm::recv(p);
let (_, res) = core::comm::recv(p);
assert res == tr_ok;
}
@ -468,10 +469,10 @@ mod tests {
ignore: false,
should_fail: true
};
let p = comm::port();
let ch = comm::chan(p);
let p = core::comm::port();
let ch = core::comm::chan(p);
run_test(desc, ch);
let (_, res) = comm::recv(p);
let (_, res) = core::comm::recv(p);
assert res == tr_failed;
}

View file

@ -3,6 +3,8 @@
import uv = uv;
import uv::iotask;
import iotask::iotask;
import comm = core::comm;
export delayed_send, sleep, recv_timeout;
/**
@ -24,8 +26,8 @@ export delayed_send, sleep, recv_timeout;
fn delayed_send<T: copy send>(iotask: iotask,
msecs: uint, ch: comm::Chan<T>, val: T) {
unsafe {
let timer_done_po = comm::port::<()>();
let timer_done_ch = comm::chan(timer_done_po);
let timer_done_po = core::comm::port::<()>();
let timer_done_ch = core::comm::chan(timer_done_po);
let timer_done_ch_ptr = ptr::addr_of(timer_done_ch);
let timer = uv::ll::timer_t();
let timer_ptr = ptr::addr_of(timer);
@ -51,11 +53,11 @@ fn delayed_send<T: copy send>(iotask: iotask,
}
};
// delayed_send_cb has been processed by libuv
comm::recv(timer_done_po);
core::comm::recv(timer_done_po);
// notify the caller immediately
comm::send(ch, copy(val));
core::comm::send(ch, copy(val));
// uv_close for this timer has been processed
comm::recv(timer_done_po);
core::comm::recv(timer_done_po);
};
}
@ -71,10 +73,10 @@ fn delayed_send<T: copy send>(iotask: iotask,
* * msecs - an amount of time, in milliseconds, for the current task to block
*/
fn sleep(iotask: iotask, msecs: uint) {
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let exit_po = core::comm::port::<()>();
let exit_ch = core::comm::chan(exit_po);
delayed_send(iotask, msecs, exit_ch, ());
comm::recv(exit_po);
core::comm::recv(exit_po);
}
/**
@ -89,7 +91,7 @@ fn sleep(iotask: iotask, msecs: uint) {
*
* * `iotask' - `uv::iotask` that the tcp request will run on
* * msecs - an mount of time, in milliseconds, to wait to receive
* * wait_port - a `comm::port<T>` to receive on
* * wait_port - a `core::comm::port<T>` to receive on
*
* # Returns
*
@ -111,7 +113,7 @@ fn recv_timeout<T: copy send>(iotask: iotask,
none
}, |right_val| {
some(*right_val)
}, &comm::select2(timeout_po, wait_po)
}, &core::comm::select2(timeout_po, wait_po)
)
}
@ -123,7 +125,7 @@ extern fn delayed_send_cb(handle: *uv::ll::uv_timer_t,
*(uv::ll::get_data_for_uv_handle(handle) as *comm::Chan<()>);
let stop_result = uv::ll::timer_stop(handle);
if (stop_result == 0i32) {
comm::send(timer_done_ch, ());
core::comm::send(timer_done_ch, ());
uv::ll::close(handle, delayed_send_close_cb);
}
else {
@ -158,8 +160,8 @@ mod test {
#[test]
fn test_gl_timer_sleep_stress2() {
let po = comm::port();
let ch = comm::chan(po);
let po = core::comm::port();
let ch = core::comm::chan(po);
let hl_loop = uv::global_loop::get();
let repeat = 20u;
@ -181,13 +183,13 @@ mod test {
for iter::repeat(times) {
sleep(hl_loop, rng.next() as uint % maxms);
}
comm::send(ch, ());
core::comm::send(ch, ());
}
}
}
for iter::repeat(repeat * spec.len()) {
comm::recv(po)
core::comm::recv(po)
}
}
@ -208,8 +210,8 @@ mod test {
task::yield();
let expected = rand::rng().gen_str(16u);
let test_po = comm::port::<str>();
let test_ch = comm::chan(test_po);
let test_po = core::comm::port::<str>();
let test_ch = core::comm::chan(test_po);
do task::spawn() {
delayed_send(hl_loop, 1u, test_ch, expected);
@ -236,8 +238,8 @@ mod test {
for iter::repeat(times as uint) {
let expected = rand::rng().gen_str(16u);
let test_po = comm::port::<~str>();
let test_ch = comm::chan(test_po);
let test_po = core::comm::port::<~str>();
let test_ch = core::comm::chan(test_po);
do task::spawn() {
delayed_send(hl_loop, 1000u, test_ch, expected);

View file

@ -7,6 +7,7 @@ import iotask = uv_iotask;
import get_gl = get;
import iotask::{iotask, spawn_iotask};
import priv::{chan_from_global_ptr, weaken_task};
import comm = core::comm;
import comm::{Port, Chan, port, chan, select2, listen};
import task::TaskBuilder;
import either::{Left, Right};
@ -110,7 +111,7 @@ mod test {
let exit_ch_ptr = ll::get_data_for_uv_handle(
timer_ptr as *libc::c_void) as *comm::Chan<bool>;
let exit_ch = *exit_ch_ptr;
comm::send(exit_ch, true);
core::comm::send(exit_ch, true);
log(debug, fmt!{"EXIT_CH_PTR simple_timer_close_cb exit_ch_ptr: %?",
exit_ch_ptr});
}
@ -129,8 +130,8 @@ mod test {
}
fn impl_uv_hl_simple_timer(iotask: iotask) unsafe {
let exit_po = comm::port::<bool>();
let exit_ch = comm::chan(exit_po);
let exit_po = core::comm::port::<bool>();
let exit_ch = core::comm::chan(exit_po);
let exit_ch_ptr = ptr::addr_of(exit_ch);
log(debug, fmt!{"EXIT_CH_PTR newly created exit_ch_ptr: %?",
exit_ch_ptr});
@ -155,7 +156,7 @@ mod test {
fail ~"failure on ll::timer_init()";
}
};
comm::recv(exit_po);
core::comm::recv(exit_po);
log(debug, ~"global_loop timer test: msg recv on exit_po, done..");
}
@ -166,10 +167,10 @@ mod test {
let exit_ch = comm::chan(exit_po);
task::spawn_sched(task::ManualThreads(1u), || {
impl_uv_hl_simple_timer(hl_loop);
comm::send(exit_ch, ());
core::comm::send(exit_ch, ());
});
impl_uv_hl_simple_timer(hl_loop);
comm::recv(exit_po);
core::comm::recv(exit_po);
}
// keeping this test ignored until some kind of stress-test-harness
@ -178,17 +179,17 @@ mod test {
#[ignore]
fn test_stress_gl_uv_global_loop_high_level_global_timer() unsafe {
let hl_loop = get_gl();
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let exit_po = core::comm::port::<()>();
let exit_ch = core::comm::chan(exit_po);
let cycles = 5000u;
for iter::repeat(cycles) {
task::spawn_sched(task::ManualThreads(1u), || {
impl_uv_hl_simple_timer(hl_loop);
comm::send(exit_ch, ());
core::comm::send(exit_ch, ());
});
};
for iter::repeat(cycles) {
comm::recv(exit_po);
core::comm::recv(exit_po);
};
log(debug, ~"test_stress_gl_uv_global_loop_high_level_global_timer"+
~" exiting sucessfully!");

View file

@ -12,6 +12,7 @@ export exit;
import libc::c_void;
import ptr::addr_of;
import comm = core::comm;
import comm::{Port, port, Chan, chan, listen};
import task::TaskBuilder;
import ll = uv_ll;
@ -171,7 +172,7 @@ mod test {
log(debug, fmt!{"async_close_cb handle %?", handle});
let exit_ch = (*(ll::get_data_for_uv_handle(handle)
as *ah_data)).exit_ch;
comm::send(exit_ch, ());
core::comm::send(exit_ch, ());
}
extern fn async_handle_cb(handle: *ll::uv_async_t, status: libc::c_int)
unsafe {
@ -185,8 +186,8 @@ mod test {
fn impl_uv_iotask_async(iotask: iotask) unsafe {
let async_handle = ll::async_t();
let ah_ptr = ptr::addr_of(async_handle);
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let exit_po = core::comm::port::<()>();
let exit_ch = core::comm::chan(exit_po);
let ah_data = {
iotask: iotask,
exit_ch: exit_ch
@ -197,7 +198,7 @@ mod test {
ll::set_data_for_uv_handle(ah_ptr, ah_data_ptr as *libc::c_void);
ll::async_send(ah_ptr);
};
comm::recv(exit_po);
core::comm::recv(exit_po);
}
// this fn documents the bear minimum neccesary to roll your own
@ -209,7 +210,7 @@ mod test {
run_loop(iotask_ch);
exit_ch.send(());
};
return comm::recv(iotask_port);
return core::comm::recv(iotask_port);
}
extern fn lifetime_handle_close(handle: *libc::c_void) unsafe {
@ -224,8 +225,8 @@ mod test {
#[test]
fn test_uv_iotask_async() unsafe {
let exit_po = comm::port::<()>();
let exit_ch = comm::chan(exit_po);
let exit_po = core::comm::port::<()>();
let exit_ch = core::comm::chan(exit_po);
let iotask = spawn_test_loop(exit_ch);
// using this handle to manage the lifetime of the high_level_loop,
@ -234,20 +235,20 @@ mod test {
// under race-condition type situations.. this ensures that the loop
// lives until, at least, all of the impl_uv_hl_async() runs have been
// called, at least.
let work_exit_po = comm::port::<()>();
let work_exit_ch = comm::chan(work_exit_po);
let work_exit_po = core::comm::port::<()>();
let work_exit_ch = core::comm::chan(work_exit_po);
for iter::repeat(7u) {
do task::spawn_sched(task::ManualThreads(1u)) {
impl_uv_iotask_async(iotask);
comm::send(work_exit_ch, ());
core::comm::send(work_exit_ch, ());
};
};
for iter::repeat(7u) {
comm::recv(work_exit_po);
core::comm::recv(work_exit_po);
};
log(debug, ~"sending teardown_loop msg..");
exit(iotask);
comm::recv(exit_po);
core::comm::recv(exit_po);
log(debug, ~"after recv on exit_po.. exiting..");
}
}

View file

@ -22,6 +22,7 @@
import libc::size_t;
import ptr::assimilate;
import comm = core::comm;
// libuv struct mappings
type uv_ip4_addr = {
@ -1046,7 +1047,7 @@ mod test {
let bytes = vec::unsafe::from_buf(buf_base, buf_len as uint);
let read_chan = *((*client_data).read_chan);
let msg_from_server = str::from_bytes(bytes);
comm::send(read_chan, msg_from_server);
core::comm::send(read_chan, msg_from_server);
close(stream as *libc::c_void, after_close_cb)
}
else if (nread == -1) {
@ -1231,7 +1232,7 @@ mod test {
log(debug, ~"SERVER: sending response to client");
read_stop(client_stream_ptr);
let server_chan = *((*client_data).server_chan);
comm::send(server_chan, request_str);
core::comm::send(server_chan, request_str);
let write_result = write(
write_req,
client_stream_ptr as *libc::c_void,
@ -1346,7 +1347,7 @@ mod test {
async_handle as *libc::c_void) as *async_handle_data;
let continue_chan = *((*data).continue_chan);
let should_continue = status == 0i32;
comm::send(continue_chan, should_continue);
core::comm::send(continue_chan, should_continue);
close(async_handle as *libc::c_void, async_close_cb);
}
@ -1460,13 +1461,13 @@ mod test {
let port = 8887;
let kill_server_msg = ~"does a dog have buddha nature?";
let server_resp_msg = ~"mu!";
let client_port = comm::port::<~str>();
let client_chan = comm::chan::<~str>(client_port);
let server_port = comm::port::<~str>();
let server_chan = comm::chan::<~str>(server_port);
let client_port = core::comm::port::<~str>();
let client_chan = core::comm::chan::<~str>(client_port);
let server_port = core::comm::port::<~str>();
let server_chan = core::comm::chan::<~str>(server_port);
let continue_port = comm::port::<bool>();
let continue_chan = comm::chan::<bool>(continue_port);
let continue_port = core::comm::port::<bool>();
let continue_chan = core::comm::chan::<bool>(continue_port);
let continue_chan_ptr = ptr::addr_of(continue_chan);
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1479,7 +1480,7 @@ mod test {
// block until the server up is.. possibly a race?
log(debug, ~"before receiving on server continue_port");
comm::recv(continue_port);
core::comm::recv(continue_port);
log(debug, ~"received on continue port, set up tcp client");
do task::spawn_sched(task::ManualThreads(1u)) {
@ -1488,8 +1489,8 @@ mod test {
ptr::addr_of(client_chan));
};
let msg_from_client = comm::recv(server_port);
let msg_from_server = comm::recv(client_port);
let msg_from_client = core::comm::recv(server_port);
let msg_from_server = core::comm::recv(client_port);
assert str::contains(msg_from_client, kill_server_msg);
assert str::contains(msg_from_server, server_resp_msg);