From 08a77e06a8b7e76466a5c177159a5ecdf3cad31b Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Wed, 25 Jul 2012 14:05:06 -0700 Subject: [PATCH] Rewrite task-comm-NN to use pipes --- src/libcore/pipes.rs | 59 +++-------------------- src/test/run-pass/task-comm-0.rs | 22 ++++----- src/test/run-pass/task-comm-10.rs | 21 ++++----- src/test/run-pass/task-comm-11.rs | 13 +++--- src/test/run-pass/task-comm-13.rs | 11 ++--- src/test/run-pass/task-comm-14.rs | 14 +++--- src/test/run-pass/task-comm-15.rs | 12 ++--- src/test/run-pass/task-comm-16.rs | 70 ++++++++++++++-------------- src/test/run-pass/task-comm-3.rs | 16 +++---- src/test/run-pass/task-comm-4.rs | 39 ++++++++-------- src/test/run-pass/task-comm-5.rs | 9 ++-- src/test/run-pass/task-comm-6.rs | 34 +++++++------- src/test/run-pass/task-comm-7.rs | 20 ++++---- src/test/run-pass/task-comm-9.rs | 11 ++--- src/test/run-pass/trivial-message.rs | 10 ++-- 15 files changed, 151 insertions(+), 210 deletions(-) diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index 27cd8ef7caf..a999d615e31 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -30,59 +30,6 @@ macro_rules! move { // places. Once there is unary move, it can be removed. fn move(-x: T) -> T { x } -/** - -Some thoughts about fixed buffers. - -The idea is if a protocol is bounded, we will synthesize a record that -has a field for each state. Each of these states contains a packet for -the messages that are legal to be sent in that state. Then, instead of -allocating, the send code just finds a pointer to the right field and -uses that instead. - -Unforunately, this makes things kind of tricky. We need to be able to -find the buffer, which means we need to pass it around. This could -either be associated with the (send|recv)_packet classes, or with the -packet itself. We will also need some form of reference counting so we -can track who has the responsibility of freeing the buffer. - -We want to preserve the ability to do things like optimistic buffer -re-use, and skipping over to a new buffer when necessary. What I mean -is, suppose we had the typical stream protocol. It'd make sense to -amortize allocation costs by allocating a buffer with say 16 -messages. When the sender gets to the end of the buffer, it could -check if the receiver is done with the packet in slot 0. If so, it can -just reuse that one, checking if the receiver is done with the next -one in each case. If it is ever not done, it just allocates a new -buffer and skips over to that. - -Also, since protocols are in libcore, we have to do this in a way that -maintains backwards compatibility. - -buffer header and buffer. Cast as c_void when necessary. - -=== - -Okay, here are some new ideas. - -It'd be nice to keep the bounded/unbounded case as uniform as -possible. It leads to less code duplication, and less things that can -go sublty wrong. For the bounded case, we could either have a struct -with a bunch of unique pointers to pre-allocated packets, or we could -lay them out inline. Inline layout is better, if for no other reason -than that we don't have to allocate each packet -individually. Currently we pass unique packets around as unsafe -pointers, but they are actually unique pointers. We should instead use -real unsafe pointers. This makes freeing data and running destructors -trickier though. Thus, we should allocate all packets in parter of a -higher level buffer structure. Packets can maintain a pointer to their -buffer, and this is the part that gets freed. - -It might be helpful to have some idea of a semi-unique pointer (like -being partially pregnant, also like an ARC). - -*/ - enum state { empty, full, @@ -805,6 +752,12 @@ class port_set : recv { vec::push(self.ports, port) } + fn chan() -> chan { + let (ch, po) = stream(); + self.add(po); + ch + } + fn try_recv() -> option { let mut result = none; while result == none && self.ports.len() > 0 { diff --git a/src/test/run-pass/task-comm-0.rs b/src/test/run-pass/task-comm-0.rs index 532daf3651c..eb866ee7b3e 100644 --- a/src/test/run-pass/task-comm-0.rs +++ b/src/test/run-pass/task-comm-0.rs @@ -1,31 +1,29 @@ use std; -import comm; -import comm::chan; -import comm::send; +import pipes; +import pipes::chan; +import pipes::port; import task; fn main() { test05(); } fn test05_start(ch : chan) { - log(error, ch); - send(ch, 10); + ch.send(10); #error("sent 10"); - send(ch, 20); + ch.send(20); #error("sent 20"); - send(ch, 30); + ch.send(30); #error("sent 30"); } fn test05() { - let po = comm::port(); - let ch = comm::chan(po); + let (ch, po) = pipes::stream(); task::spawn(|| test05_start(ch) ); - let mut value = comm::recv(po); + let mut value = po.recv(); log(error, value); - value = comm::recv(po); + value = po.recv(); log(error, value); - value = comm::recv(po); + value = po.recv(); log(error, value); assert (value == 30); } diff --git a/src/test/run-pass/task-comm-10.rs b/src/test/run-pass/task-comm-10.rs index 1d0963a29dc..ef94a13c072 100644 --- a/src/test/run-pass/task-comm-10.rs +++ b/src/test/run-pass/task-comm-10.rs @@ -1,28 +1,27 @@ use std; import task; -import comm; +import pipes; -fn start(c: comm::chan>) { - let p = comm::port(); - comm::send(c, comm::chan(p)); +fn start(c: pipes::chan>) { + let (ch, p) = pipes::stream(); + c.send(ch); let mut a; let mut b; - a = comm::recv(p); + a = p.recv(); assert a == ~"A"; log(error, a); - b = comm::recv(p); + b = p.recv(); assert b == ~"B"; log(error, b); } fn main() { - let p = comm::port(); - let ch = comm::chan(p); + let (ch, p) = pipes::stream(); let child = task::spawn(|| start(ch) ); - let c = comm::recv(p); - comm::send(c, ~"A"); - comm::send(c, ~"B"); + let c = p.recv(); + c.send(~"A"); + c.send(~"B"); task::yield(); } diff --git a/src/test/run-pass/task-comm-11.rs b/src/test/run-pass/task-comm-11.rs index f96f9d148d0..5e1e5c5facd 100644 --- a/src/test/run-pass/task-comm-11.rs +++ b/src/test/run-pass/task-comm-11.rs @@ -1,15 +1,14 @@ use std; -import comm; +import pipes; import task; -fn start(c: comm::chan>) { - let p: comm::port = comm::port(); - comm::send(c, comm::chan(p)); +fn start(c: pipes::chan>) { + let (ch, p) = pipes::stream(); + c.send(ch); } fn main() { - let p = comm::port(); - let ch = comm::chan(p); + let (ch, p) = pipes::stream(); let child = task::spawn(|| start(ch) ); - let c = comm::recv(p); + let c = p.recv(); } diff --git a/src/test/run-pass/task-comm-13.rs b/src/test/run-pass/task-comm-13.rs index 3ded4aac5ae..88220437c43 100644 --- a/src/test/run-pass/task-comm-13.rs +++ b/src/test/run-pass/task-comm-13.rs @@ -1,17 +1,16 @@ use std; import task; -import comm; -import comm::send; +import pipes; +import pipes::send; -fn start(c: comm::chan, start: int, number_of_messages: int) { +fn start(c: pipes::chan, start: int, number_of_messages: int) { let mut i: int = 0; - while i < number_of_messages { send(c, start + i); i += 1; } + while i < number_of_messages { c.send(start + i); i += 1; } } fn main() { #debug("Check that we don't deadlock."); - let p = comm::port::(); - let ch = comm::chan(p); + let (ch, p) = pipes::stream(); task::try(|| start(ch, 0, 10) ); #debug("Joined task"); } diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index ef9c4ae5010..268b6d06dfd 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -1,15 +1,14 @@ -use std; -import comm; import task; fn main() { - let po = comm::port::(); - let ch = comm::chan(po); + let po = pipes::port_set(); // Spawn 10 tasks each sending us back one int. let mut i = 10; while (i > 0) { log(debug, i); + let (ch, p) = pipes::stream(); + po.add(p); task::spawn(|copy i| child(i, ch) ); i = i - 1; } @@ -18,17 +17,16 @@ fn main() { // anything back, so we deadlock here. i = 10; - let mut value = 0; while (i > 0) { log(debug, i); - value = comm::recv(po); + po.recv(); i = i - 1; } #debug("main thread exiting"); } -fn child(x: int, ch: comm::chan) { +fn child(x: int, ch: pipes::chan) { log(debug, x); - comm::send(ch, copy x); + ch.send(x); } diff --git a/src/test/run-pass/task-comm-15.rs b/src/test/run-pass/task-comm-15.rs index d9291fd6898..71a732490ce 100644 --- a/src/test/run-pass/task-comm-15.rs +++ b/src/test/run-pass/task-comm-15.rs @@ -1,23 +1,21 @@ // xfail-win32 use std; -import comm; import task; -fn start(c: comm::chan, i0: int) { +fn start(c: pipes::chan, i0: int) { let mut i = i0; while i > 0 { - comm::send(c, 0); + c.send(0); i = i - 1; } } fn main() { - let p = comm::port(); // Spawn a task that sends us back messages. The parent task // is likely to terminate before the child completes, so from // the child's point of view the receiver may die. We should // drop messages on the floor in this case, and not crash! - let ch = comm::chan(p); - let child = task::spawn(|| start(ch, 10) ); - let c = comm::recv(p); + let (ch, p) = pipes::stream(); + task::spawn(|| start(ch, 10)); + p.recv(); } diff --git a/src/test/run-pass/task-comm-16.rs b/src/test/run-pass/task-comm-16.rs index bc46de64cd7..2d1c72df713 100644 --- a/src/test/run-pass/task-comm-16.rs +++ b/src/test/run-pass/task-comm-16.rs @@ -1,44 +1,41 @@ // -*- rust -*- use std; -import comm; -import comm::send; -import comm::port; -import comm::recv; -import comm::chan; +import pipes; +import pipes::send; +import pipes::port; +import pipes::recv; +import pipes::chan; // Tests of ports and channels on various types fn test_rec() { type r = {val0: int, val1: u8, val2: char}; - let po = comm::port(); - let ch = chan(po); + let (ch, po) = pipes::stream(); let r0: r = {val0: 0, val1: 1u8, val2: '2'}; - send(ch, r0); + ch.send(r0); let mut r1: r; - r1 = recv(po); + r1 = po.recv(); assert (r1.val0 == 0); assert (r1.val1 == 1u8); assert (r1.val2 == '2'); } fn test_vec() { - let po = port(); - let ch = chan(po); + let (ch, po) = pipes::stream(); let v0: ~[int] = ~[0, 1, 2]; - send(ch, v0); - let v1 = recv(po); + ch.send(v0); + let v1 = po.recv(); assert (v1[0] == 0); assert (v1[1] == 1); assert (v1[2] == 2); } fn test_str() { - let po = port(); - let ch = chan(po); - let s0 = ~"test"; - send(ch, s0); - let s1 = recv(po); + let (ch, po) = pipes::stream(); + let s0 = "test"; + ch.send(s0); + let s1 = po.recv(); assert (s1[0] == 't' as u8); assert (s1[1] == 'e' as u8); assert (s1[2] == 's' as u8); @@ -47,33 +44,36 @@ fn test_str() { fn test_tag() { enum t { tag1, tag2(int), tag3(int, u8, char), } - let po = port(); - let ch = chan(po); - send(ch, tag1); - send(ch, tag2(10)); - send(ch, tag3(10, 11u8, 'A')); + let (ch, po) = pipes::stream(); + ch.send(tag1); + ch.send(tag2(10)); + ch.send(tag3(10, 11u8, 'A')); let mut t1: t; - t1 = recv(po); + t1 = po.recv(); assert (t1 == tag1); - t1 = recv(po); + t1 = po.recv(); assert (t1 == tag2(10)); - t1 = recv(po); + t1 = po.recv(); assert (t1 == tag3(10, 11u8, 'A')); } fn test_chan() { - let po = port(); - let ch = chan(po); - let po0 = port(); - let ch0 = chan(po0); - send(ch, ch0); - let ch1 = recv(po); + let (ch, po) = pipes::stream(); + let (ch0, po0) = pipes::stream(); + ch.send(ch0); + let ch1 = po.recv(); // Does the transmitted channel still work? - send(ch1, 10); + ch1.send(10); let mut i: int; - i = recv(po0); + i = po0.recv(); assert (i == 10); } -fn main() { test_rec(); test_vec(); test_str(); test_tag(); test_chan(); } +fn main() { + test_rec(); + test_vec(); + test_str(); + test_tag(); + test_chan(); +} diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index c03db442022..05c057d85fd 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -1,9 +1,9 @@ use std; import task; -import comm; -import comm::chan; -import comm::send; -import comm::recv; +import pipes; +import pipes::chan; +import pipes::send; +import pipes::recv; fn main() { #debug("===== WITHOUT THREADS ====="); test00(); } @@ -12,7 +12,7 @@ fn test00_start(ch: chan, message: int, count: int) { let mut i: int = 0; while i < count { #debug("Sending Message"); - send(ch, message + 0); + ch.send(message + 0); i = i + 1; } #debug("Ending test00_start"); @@ -24,14 +24,14 @@ fn test00() { #debug("Creating tasks"); - let po = comm::port(); - let ch = chan(po); + let po = pipes::port_set(); let mut i: int = 0; // Create and spawn tasks... let mut results = ~[]; while i < number_of_tasks { + let ch = po.chan(); do task::task().future_result(|-r| { results += ~[r]; }).spawn |copy i| { @@ -45,7 +45,7 @@ fn test00() { for results.each |r| { i = 0; while i < number_of_messages { - let value = recv(po); + let value = po.recv(); sum += value; i = i + 1; } diff --git a/src/test/run-pass/task-comm-4.rs b/src/test/run-pass/task-comm-4.rs index f7de33c63d6..9b99c1cb799 100644 --- a/src/test/run-pass/task-comm-4.rs +++ b/src/test/run-pass/task-comm-4.rs @@ -1,44 +1,43 @@ use std; -import comm; -import comm::send; +import pipes; +import pipes::send; fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::port(); - let c = comm::chan(p); - send(c, 1); - send(c, 2); - send(c, 3); - send(c, 4); - r = comm::recv(p); + let (c, p) = pipes::stream(); + c.send(1); + c.send(2); + c.send(3); + c.send(4); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); - send(c, 5); - send(c, 6); - send(c, 7); - send(c, 8); - r = comm::recv(p); + c.send(5); + c.send(6); + c.send(7); + c.send(8); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); - r = comm::recv(p); + r = p.recv(); sum += r; log(debug, r); assert (sum == 1 + 2 + 3 + 4 + 5 + 6 + 7 + 8); diff --git a/src/test/run-pass/task-comm-5.rs b/src/test/run-pass/task-comm-5.rs index a0d4f4595be..249bdaf1c9a 100644 --- a/src/test/run-pass/task-comm-5.rs +++ b/src/test/run-pass/task-comm-5.rs @@ -1,17 +1,16 @@ use std; -import comm; +import pipes; fn main() { test00(); } fn test00() { let r: int = 0; let mut sum: int = 0; - let p = comm::port(); - let c = comm::chan(p); + let (c, p) = pipes::stream(); let number_of_messages: int = 1000; let mut i: int = 0; - while i < number_of_messages { comm::send(c, i + 0); i += 1; } + while i < number_of_messages { c.send(i + 0); i += 1; } i = 0; - while i < number_of_messages { sum += comm::recv(p); i += 1; } + while i < number_of_messages { sum += p.recv(); i += 1; } assert (sum == number_of_messages * (number_of_messages - 1) / 2); } diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index b363b25e864..8bc93a78913 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -1,37 +1,37 @@ use std; -import comm; -import comm::send; -import comm::chan; -import comm::recv; +import pipes; +import pipes::send; +import pipes::chan; +import pipes::recv; fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::port(); - let c0 = chan(p); - let c1 = chan(p); - let c2 = chan(p); - let c3 = chan(p); + let p = pipes::port_set(); + let c0 = p.chan(); + let c1 = p.chan(); + let c2 = p.chan(); + let c3 = p.chan(); let number_of_messages: int = 1000; let mut i: int = 0; while i < number_of_messages { - send(c0, i + 0); - send(c1, i + 0); - send(c2, i + 0); - send(c3, i + 0); + c0.send(i + 0); + c1.send(i + 0); + c2.send(i + 0); + c3.send(i + 0); i += 1; } i = 0; while i < number_of_messages { - r = recv(p); + r = p.recv(); sum += r; - r = recv(p); + r = p.recv(); sum += r; - r = recv(p); + r = p.recv(); sum += r; - r = recv(p); + r = p.recv(); sum += r; i += 1; } diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index b8922400777..1df3de6ba1e 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -1,43 +1,45 @@ use std; import task; -import comm; fn main() { test00(); } -fn test00_start(c: comm::chan, start: int, number_of_messages: int) { +fn test00_start(c: pipes::chan, start: int, number_of_messages: int) { let mut i: int = 0; - while i < number_of_messages { comm::send(c, start + i); i += 1; } + while i < number_of_messages { c.send(start + i); i += 1; } } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::port(); + let p = pipes::port_set(); let number_of_messages: int = 10; - let c = comm::chan(p); + let c = p.chan(); do task::spawn { test00_start(c, number_of_messages * 0, number_of_messages); } + let c = p.chan(); do task::spawn { test00_start(c, number_of_messages * 1, number_of_messages); } + let c = p.chan(); do task::spawn { test00_start(c, number_of_messages * 2, number_of_messages); } + let c = p.chan(); do task::spawn { test00_start(c, number_of_messages * 3, number_of_messages); } let mut i: int = 0; while i < number_of_messages { - r = comm::recv(p); + r = p.recv(); sum += r; - r = comm::recv(p); + r = p.recv(); sum += r; - r = comm::recv(p); + r = p.recv(); sum += r; - r = comm::recv(p); + r = p.recv(); sum += r; i += 1; } diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index 964029dee20..939cfd15797 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -1,20 +1,19 @@ use std; import task; -import comm; fn main() { test00(); } -fn test00_start(c: comm::chan, number_of_messages: int) { +fn test00_start(c: pipes::chan, number_of_messages: int) { let mut i: int = 0; - while i < number_of_messages { comm::send(c, i + 0); i += 1; } + while i < number_of_messages { c.send(i + 0); i += 1; } } fn test00() { let r: int = 0; let mut sum: int = 0; - let p = comm::port(); + let p = pipes::port_set(); let number_of_messages: int = 10; - let ch = comm::chan(p); + let ch = p.chan(); let mut result = none; do task::task().future_result(|-r| { result = some(r); }).spawn { @@ -23,7 +22,7 @@ fn test00() { let mut i: int = 0; while i < number_of_messages { - sum += comm::recv(p); + sum += p.recv(); log(debug, r); i += 1; } diff --git a/src/test/run-pass/trivial-message.rs b/src/test/run-pass/trivial-message.rs index 487b440ee42..8e92e8f2020 100644 --- a/src/test/run-pass/trivial-message.rs +++ b/src/test/run-pass/trivial-message.rs @@ -1,14 +1,12 @@ -use std; -import comm::*; +import pipes::{port, chan} /* This is about the simplest program that can successfully send a message. */ fn main() { - let po = port(); - let ch = chan(po); - send(ch, 42); - let r = recv(po); + let (ch, po) = pipes::stream(); + ch.send(42); + let r = po.recv(); log(error, r); }