Revamp TaskBuilder API
This patch consolidates and cleans up the task spawning APIs: * Removes the problematic `future_result` method from `std::task::TaskBuilder`, and adds a `try_future` that both spawns the task and returns a future representing its eventual result (or failure). * Removes the public `opts` field from `TaskBuilder`, instead adding appropriate builder methods to configure the task. * Adds extension traits to libgreen and libnative that add methods to `TaskBuilder` for spawning the task as a green or native thread. Previously, there was no way to benefit from the `TaskBuilder` functionality and also set the scheduler to spawn within. With this change, all task spawning scenarios are supported through the `TaskBuilder` interface. Closes #3725. [breaking-change]
This commit is contained in:
parent
8e9e17d188
commit
a23511a65d
5 changed files with 619 additions and 420 deletions
|
@ -159,16 +159,19 @@
|
||||||
//!
|
//!
|
||||||
//! # Using a scheduler pool
|
//! # Using a scheduler pool
|
||||||
//!
|
//!
|
||||||
|
//! This library adds a `GreenTaskBuilder` trait that extends the methods
|
||||||
|
//! available on `std::task::TaskBuilder` to allow spawning a green task,
|
||||||
|
//! possibly pinned to a particular scheduler thread:
|
||||||
|
//!
|
||||||
//! ```rust
|
//! ```rust
|
||||||
//! use std::rt::task::TaskOpts;
|
//! use std::task::TaskBuilder;
|
||||||
//! use green::{SchedPool, PoolConfig};
|
//! use green::{SchedPool, PoolConfig, GreenTaskBuilder};
|
||||||
//! use green::sched::{PinnedTask, TaskFromFriend};
|
|
||||||
//!
|
//!
|
||||||
//! let config = PoolConfig::new();
|
//! let config = PoolConfig::new();
|
||||||
//! let mut pool = SchedPool::new(config);
|
//! let mut pool = SchedPool::new(config);
|
||||||
//!
|
//!
|
||||||
//! // Spawn tasks into the pool of schedulers
|
//! // Spawn tasks into the pool of schedulers
|
||||||
//! pool.spawn(TaskOpts::new(), proc() {
|
//! TaskBuilder::new().green(&mut pool).spawn(proc() {
|
||||||
//! // this code is running inside the pool of schedulers
|
//! // this code is running inside the pool of schedulers
|
||||||
//!
|
//!
|
||||||
//! spawn(proc() {
|
//! spawn(proc() {
|
||||||
|
@ -181,12 +184,9 @@
|
||||||
//! let mut handle = pool.spawn_sched();
|
//! let mut handle = pool.spawn_sched();
|
||||||
//!
|
//!
|
||||||
//! // Pin a task to the spawned scheduler
|
//! // Pin a task to the spawned scheduler
|
||||||
//! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
|
//! TaskBuilder::new().green_pinned(&mut pool, &mut handle).spawn(proc() {
|
||||||
//! handle.send(PinnedTask(task));
|
//! /* ... */
|
||||||
//!
|
//! });
|
||||||
//! // Schedule a task on this new scheduler
|
|
||||||
//! let task = pool.task(TaskOpts::new(), proc() { /* ... */ });
|
|
||||||
//! handle.send(TaskFromFriend(task));
|
|
||||||
//!
|
//!
|
||||||
//! // Handles keep schedulers alive, so be sure to drop all handles before
|
//! // Handles keep schedulers alive, so be sure to drop all handles before
|
||||||
//! // destroying the sched pool
|
//! // destroying the sched pool
|
||||||
|
@ -209,6 +209,8 @@
|
||||||
// NB this does *not* include globs, please keep it that way.
|
// NB this does *not* include globs, please keep it that way.
|
||||||
#![feature(macro_rules, phase)]
|
#![feature(macro_rules, phase)]
|
||||||
#![allow(visible_private_types)]
|
#![allow(visible_private_types)]
|
||||||
|
#![allow(deprecated)]
|
||||||
|
#![feature(default_type_params)]
|
||||||
|
|
||||||
#[cfg(test)] #[phase(plugin, link)] extern crate log;
|
#[cfg(test)] #[phase(plugin, link)] extern crate log;
|
||||||
#[cfg(test)] extern crate rustuv;
|
#[cfg(test)] extern crate rustuv;
|
||||||
|
@ -224,8 +226,9 @@ use std::rt::task::TaskOpts;
|
||||||
use std::rt;
|
use std::rt;
|
||||||
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
|
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
|
||||||
use std::sync::deque;
|
use std::sync::deque;
|
||||||
|
use std::task::{TaskBuilder, Spawner};
|
||||||
|
|
||||||
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
|
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, PinnedTask, NewNeighbor};
|
||||||
use sleeper_list::SleeperList;
|
use sleeper_list::SleeperList;
|
||||||
use stack::StackPool;
|
use stack::StackPool;
|
||||||
use task::GreenTask;
|
use task::GreenTask;
|
||||||
|
@ -444,6 +447,7 @@ impl SchedPool {
|
||||||
/// This is useful to create a task which can then be sent to a specific
|
/// This is useful to create a task which can then be sent to a specific
|
||||||
/// scheduler created by `spawn_sched` (and possibly pin it to that
|
/// scheduler created by `spawn_sched` (and possibly pin it to that
|
||||||
/// scheduler).
|
/// scheduler).
|
||||||
|
#[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"]
|
||||||
pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box<GreenTask> {
|
pub fn task(&mut self, opts: TaskOpts, f: proc():Send) -> Box<GreenTask> {
|
||||||
GreenTask::configure(&mut self.stack_pool, opts, f)
|
GreenTask::configure(&mut self.stack_pool, opts, f)
|
||||||
}
|
}
|
||||||
|
@ -454,6 +458,7 @@ impl SchedPool {
|
||||||
/// New tasks are spawned in a round-robin fashion to the schedulers in this
|
/// New tasks are spawned in a round-robin fashion to the schedulers in this
|
||||||
/// pool, but tasks can certainly migrate among schedulers once they're in
|
/// pool, but tasks can certainly migrate among schedulers once they're in
|
||||||
/// the pool.
|
/// the pool.
|
||||||
|
#[deprecated = "use the green and green_pinned methods of GreenTaskBuilder instead"]
|
||||||
pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) {
|
pub fn spawn(&mut self, opts: TaskOpts, f: proc():Send) {
|
||||||
let task = self.task(opts, f);
|
let task = self.task(opts, f);
|
||||||
|
|
||||||
|
@ -563,3 +568,54 @@ impl Drop for SchedPool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A spawner for green tasks
|
||||||
|
pub struct GreenSpawner<'a>{
|
||||||
|
pool: &'a mut SchedPool,
|
||||||
|
handle: Option<&'a mut SchedHandle>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> Spawner for GreenSpawner<'a> {
|
||||||
|
#[inline]
|
||||||
|
fn spawn(self, opts: TaskOpts, f: proc():Send) {
|
||||||
|
let GreenSpawner { pool, handle } = self;
|
||||||
|
match handle {
|
||||||
|
None => pool.spawn(opts, f),
|
||||||
|
Some(h) => h.send(PinnedTask(pool.task(opts, f)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension trait adding `green` configuration methods to `TaskBuilder`.
|
||||||
|
pub trait GreenTaskBuilder {
|
||||||
|
fn green<'a>(self, &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>>;
|
||||||
|
fn green_pinned<'a>(self, &'a mut SchedPool, &'a mut SchedHandle)
|
||||||
|
-> TaskBuilder<GreenSpawner<'a>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Spawner> GreenTaskBuilder for TaskBuilder<S> {
|
||||||
|
fn green<'a>(self, pool: &'a mut SchedPool) -> TaskBuilder<GreenSpawner<'a>> {
|
||||||
|
self.spawner(GreenSpawner {pool: pool, handle: None})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn green_pinned<'a>(self, pool: &'a mut SchedPool, handle: &'a mut SchedHandle)
|
||||||
|
-> TaskBuilder<GreenSpawner<'a>> {
|
||||||
|
self.spawner(GreenSpawner {pool: pool, handle: Some(handle)})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use std::task::TaskBuilder;
|
||||||
|
use super::{SchedPool, PoolConfig, GreenTaskBuilder};
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_green_builder() {
|
||||||
|
let mut pool = SchedPool::new(PoolConfig::new());
|
||||||
|
let res = TaskBuilder::new().green(&mut pool).try(proc() {
|
||||||
|
"Success!".to_string()
|
||||||
|
});
|
||||||
|
assert_eq!(res.ok().unwrap(), "Success!".to_string());
|
||||||
|
pool.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -32,10 +32,13 @@
|
||||||
//! ```rust
|
//! ```rust
|
||||||
//! extern crate native;
|
//! extern crate native;
|
||||||
//!
|
//!
|
||||||
|
//! use std::task::TaskBuilder;
|
||||||
|
//! use native::NativeTaskBuilder;
|
||||||
|
//!
|
||||||
//! fn main() {
|
//! fn main() {
|
||||||
//! // We're not sure whether this main function is run in 1:1 or M:N mode.
|
//! // We're not sure whether this main function is run in 1:1 or M:N mode.
|
||||||
//!
|
//!
|
||||||
//! native::task::spawn(proc() {
|
//! TaskBuilder::new().native().spawn(proc() {
|
||||||
//! // this code is guaranteed to be run on a native thread
|
//! // this code is guaranteed to be run on a native thread
|
||||||
//! });
|
//! });
|
||||||
//! }
|
//! }
|
||||||
|
@ -50,7 +53,8 @@
|
||||||
html_root_url = "http://doc.rust-lang.org/")]
|
html_root_url = "http://doc.rust-lang.org/")]
|
||||||
#![deny(unused_result, unused_must_use)]
|
#![deny(unused_result, unused_must_use)]
|
||||||
#![allow(non_camel_case_types)]
|
#![allow(non_camel_case_types)]
|
||||||
#![feature(macro_rules)]
|
#![allow(deprecated)]
|
||||||
|
#![feature(default_type_params)]
|
||||||
|
|
||||||
// NB this crate explicitly does *not* allow glob imports, please seriously
|
// NB this crate explicitly does *not* allow glob imports, please seriously
|
||||||
// consider whether they're needed before adding that feature here (the
|
// consider whether they're needed before adding that feature here (the
|
||||||
|
@ -65,6 +69,8 @@ use std::os;
|
||||||
use std::rt;
|
use std::rt;
|
||||||
use std::str;
|
use std::str;
|
||||||
|
|
||||||
|
pub use task::NativeTaskBuilder;
|
||||||
|
|
||||||
pub mod io;
|
pub mod io;
|
||||||
pub mod task;
|
pub mod task;
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ use std::rt;
|
||||||
|
|
||||||
use io;
|
use io;
|
||||||
use task;
|
use task;
|
||||||
|
use std::task::{TaskBuilder, Spawner};
|
||||||
|
|
||||||
/// Creates a new Task which is ready to execute as a 1:1 task.
|
/// Creates a new Task which is ready to execute as a 1:1 task.
|
||||||
pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
|
pub fn new(stack_bounds: (uint, uint)) -> Box<Task> {
|
||||||
|
@ -48,12 +49,14 @@ fn ops() -> Box<Ops> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a function with the default configuration
|
/// Spawns a function with the default configuration
|
||||||
|
#[deprecated = "use the native method of NativeTaskBuilder instead"]
|
||||||
pub fn spawn(f: proc():Send) {
|
pub fn spawn(f: proc():Send) {
|
||||||
spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
|
spawn_opts(TaskOpts { name: None, stack_size: None, on_exit: None }, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a new task given the configuration options and a procedure to run
|
/// Spawns a new task given the configuration options and a procedure to run
|
||||||
/// inside the task.
|
/// inside the task.
|
||||||
|
#[deprecated = "use the native method of NativeTaskBuilder instead"]
|
||||||
pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
|
pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
|
||||||
let TaskOpts { name, stack_size, on_exit } = opts;
|
let TaskOpts { name, stack_size, on_exit } = opts;
|
||||||
|
|
||||||
|
@ -95,6 +98,26 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A spawner for native tasks
|
||||||
|
pub struct NativeSpawner;
|
||||||
|
|
||||||
|
impl Spawner for NativeSpawner {
|
||||||
|
fn spawn(self, opts: TaskOpts, f: proc():Send) {
|
||||||
|
spawn_opts(opts, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An extension trait adding a `native` configuration method to `TaskBuilder`.
|
||||||
|
pub trait NativeTaskBuilder {
|
||||||
|
fn native(self) -> TaskBuilder<NativeSpawner>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S: Spawner> NativeTaskBuilder for TaskBuilder<S> {
|
||||||
|
fn native(self) -> TaskBuilder<NativeSpawner> {
|
||||||
|
self.spawner(NativeSpawner)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// This structure is the glue between channels and the 1:1 scheduling mode. This
|
// This structure is the glue between channels and the 1:1 scheduling mode. This
|
||||||
// structure is allocated once per task.
|
// structure is allocated once per task.
|
||||||
struct Ops {
|
struct Ops {
|
||||||
|
@ -259,7 +282,8 @@ mod tests {
|
||||||
use std::rt::local::Local;
|
use std::rt::local::Local;
|
||||||
use std::rt::task::{Task, TaskOpts};
|
use std::rt::task::{Task, TaskOpts};
|
||||||
use std::task;
|
use std::task;
|
||||||
use super::{spawn, spawn_opts, Ops};
|
use std::task::TaskBuilder;
|
||||||
|
use super::{spawn, spawn_opts, Ops, NativeTaskBuilder};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn smoke() {
|
fn smoke() {
|
||||||
|
@ -347,4 +371,12 @@ mod tests {
|
||||||
});
|
});
|
||||||
rx.recv();
|
rx.recv();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_native_builder() {
|
||||||
|
let res = TaskBuilder::new().native().try(proc() {
|
||||||
|
"Success!".to_string()
|
||||||
|
});
|
||||||
|
assert_eq!(res.ok().unwrap(), "Success!".to_string());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,71 +8,110 @@
|
||||||
// option. This file may not be copied, modified, or distributed
|
// option. This file may not be copied, modified, or distributed
|
||||||
// except according to those terms.
|
// except according to those terms.
|
||||||
|
|
||||||
/*!
|
//! Utilities for managing and scheduling tasks
|
||||||
* Utilities for managing and scheduling tasks
|
//!
|
||||||
*
|
//! An executing Rust program consists of a collection of lightweight tasks,
|
||||||
* An executing Rust program consists of a collection of tasks, each with their
|
//! each with their own stack. Tasks communicate with each other using channels
|
||||||
* own stack, and sole ownership of their allocated heap data. Tasks communicate
|
//! (see `std::comm`) or other forms of synchronization (see `std::sync`) that
|
||||||
* with each other using channels (see `std::comm` for more info about how
|
//! ensure data-race freedom.
|
||||||
* communication works).
|
//!
|
||||||
*
|
//! Failure in one task does immediately propagate to any others (not to parent,
|
||||||
* Failure in one task does not propagate to any others (not to parent, not to
|
//! not to child). Failure propagation is instead handled as part of task
|
||||||
* child). Failure propagation is instead handled by using the channel send()
|
//! synchronization. For example, the channel `send()` and `recv()` methods will
|
||||||
* and recv() methods which will fail if the other end has hung up already.
|
//! fail if the other end has hung up already.
|
||||||
*
|
//!
|
||||||
* Task Scheduling:
|
//! # Basic task scheduling
|
||||||
*
|
//!
|
||||||
* By default, every task is created with the same "flavor" as the calling task.
|
//! By default, every task is created with the same "flavor" as the calling task.
|
||||||
* This flavor refers to the scheduling mode, with two possibilities currently
|
//! This flavor refers to the scheduling mode, with two possibilities currently
|
||||||
* being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and
|
//! being 1:1 and M:N modes. Green (M:N) tasks are cooperatively scheduled and
|
||||||
* native (1:1) tasks are scheduled by the OS kernel.
|
//! native (1:1) tasks are scheduled by the OS kernel.
|
||||||
*
|
//!
|
||||||
* # Example
|
//! ## Example
|
||||||
*
|
//!
|
||||||
* ```rust
|
//! ```rust
|
||||||
* spawn(proc() {
|
//! spawn(proc() {
|
||||||
* println!("Hello, World!");
|
//! println!("Hello, World!");
|
||||||
* })
|
//! })
|
||||||
* ```
|
//! ```
|
||||||
*/
|
//!
|
||||||
|
//! # Advanced task scheduling
|
||||||
|
//!
|
||||||
|
//! Task spawning can also be configured to use a particular scheduler, to
|
||||||
|
//! redirect the new task's output, or to yield a `future` representing the
|
||||||
|
//! task's final result. The configuration is established using the
|
||||||
|
//! `TaskBuilder` API:
|
||||||
|
//!
|
||||||
|
//! ## Example
|
||||||
|
//!
|
||||||
|
//! ```rust
|
||||||
|
//! extern crate green;
|
||||||
|
//! extern crate native;
|
||||||
|
//!
|
||||||
|
//! use std::task::TaskBuilder;
|
||||||
|
//! use green::{SchedPool, PoolConfig, GreenTaskBuilder};
|
||||||
|
//! use native::NativeTaskBuilder;
|
||||||
|
//!
|
||||||
|
//! # fn main() {
|
||||||
|
//! // Create a green scheduler pool with the default configuration
|
||||||
|
//! let mut pool = SchedPool::new(PoolConfig::new());
|
||||||
|
//!
|
||||||
|
//! // Spawn a task in the green pool
|
||||||
|
//! let mut fut_green = TaskBuilder::new().green(&mut pool).try_future(proc() {
|
||||||
|
//! /* ... */
|
||||||
|
//! });
|
||||||
|
//!
|
||||||
|
//! // Spawn a native task
|
||||||
|
//! let mut fut_native = TaskBuilder::new().native().try_future(proc() {
|
||||||
|
//! /* ... */
|
||||||
|
//! });
|
||||||
|
//!
|
||||||
|
//! // Wait for both tasks to finish, recording their outcome
|
||||||
|
//! let res_green = fut_green.unwrap();
|
||||||
|
//! let res_native = fut_native.unwrap();
|
||||||
|
//!
|
||||||
|
//! // Shut down the green scheduler pool
|
||||||
|
//! pool.shutdown();
|
||||||
|
//! # }
|
||||||
|
//! ```
|
||||||
|
|
||||||
use any::Any;
|
use any::Any;
|
||||||
use comm::{Sender, Receiver, channel};
|
use comm::channel;
|
||||||
use io::{Writer, stdio};
|
use io::{Writer, stdio};
|
||||||
use kinds::{Send, marker};
|
use kinds::{Send, marker};
|
||||||
use option::{None, Some, Option};
|
use option::{None, Some, Option};
|
||||||
use owned::Box;
|
use owned::Box;
|
||||||
use result::{Result, Ok, Err};
|
use result::Result;
|
||||||
use rt::local::Local;
|
use rt::local::Local;
|
||||||
use rt::task;
|
use rt::task;
|
||||||
use rt::task::Task;
|
use rt::task::Task;
|
||||||
use str::{Str, SendStr, IntoMaybeOwned};
|
use str::{Str, SendStr, IntoMaybeOwned};
|
||||||
|
use sync::Future;
|
||||||
|
|
||||||
#[cfg(test)] use any::AnyRefExt;
|
/// A means of spawning a task
|
||||||
#[cfg(test)] use owned::AnyOwnExt;
|
pub trait Spawner {
|
||||||
#[cfg(test)] use result;
|
/// Spawn a task, given low-level task options.
|
||||||
#[cfg(test)] use str::StrAllocating;
|
fn spawn(self, opts: task::TaskOpts, f: proc():Send);
|
||||||
#[cfg(test)] use string::String;
|
|
||||||
|
|
||||||
/// Task configuration options
|
|
||||||
pub struct TaskOpts {
|
|
||||||
/// Enable lifecycle notifications on the given channel
|
|
||||||
pub notify_chan: Option<Sender<task::Result>>,
|
|
||||||
/// A name for the task-to-be, for identification in failure messages
|
|
||||||
pub name: Option<SendStr>,
|
|
||||||
/// The size of the stack for the spawned task
|
|
||||||
pub stack_size: Option<uint>,
|
|
||||||
/// Task-local stdout
|
|
||||||
pub stdout: Option<Box<Writer + Send>>,
|
|
||||||
/// Task-local stderr
|
|
||||||
pub stderr: Option<Box<Writer + Send>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// The default task spawner, which spawns siblings to the current task.
|
||||||
* The task builder type.
|
pub struct SiblingSpawner;
|
||||||
*
|
|
||||||
* Provides detailed control over the properties and behavior of new tasks.
|
impl Spawner for SiblingSpawner {
|
||||||
*/
|
fn spawn(self, opts: task::TaskOpts, f: proc():Send) {
|
||||||
|
// bind tb to provide type annotation
|
||||||
|
let tb: Option<Box<Task>> = Local::try_take();
|
||||||
|
match tb {
|
||||||
|
Some(t) => t.spawn_sibling(opts, f),
|
||||||
|
None => fail!("need a local task to spawn a sibling task"),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The task builder type.
|
||||||
|
///
|
||||||
|
/// Provides detailed control over the properties and behavior of new tasks.
|
||||||
|
|
||||||
// NB: Builders are designed to be single-use because they do stateful
|
// NB: Builders are designed to be single-use because they do stateful
|
||||||
// things that get weird when reusing - e.g. if you create a result future
|
// things that get weird when reusing - e.g. if you create a result future
|
||||||
// it only applies to a single task, so then you have to maintain Some
|
// it only applies to a single task, so then you have to maintain Some
|
||||||
|
@ -80,75 +119,102 @@ pub struct TaskOpts {
|
||||||
// when you try to reuse the builder to spawn a new task. We'll just
|
// when you try to reuse the builder to spawn a new task. We'll just
|
||||||
// sidestep that whole issue by making builders uncopyable and making
|
// sidestep that whole issue by making builders uncopyable and making
|
||||||
// the run function move them in.
|
// the run function move them in.
|
||||||
pub struct TaskBuilder {
|
pub struct TaskBuilder<S = SiblingSpawner> {
|
||||||
/// Options to spawn the new task with
|
// A name for the task-to-be, for identification in failure messages
|
||||||
pub opts: TaskOpts,
|
name: Option<SendStr>,
|
||||||
|
// The size of the stack for the spawned task
|
||||||
|
stack_size: Option<uint>,
|
||||||
|
// Task-local stdout
|
||||||
|
stdout: Option<Box<Writer + Send>>,
|
||||||
|
// Task-local stderr
|
||||||
|
stderr: Option<Box<Writer + Send>>,
|
||||||
|
// The mechanics of actually spawning the task (i.e.: green or native)
|
||||||
|
spawner: S,
|
||||||
|
// Optionally wrap the eventual task body
|
||||||
gen_body: Option<proc(v: proc():Send):Send -> proc():Send>,
|
gen_body: Option<proc(v: proc():Send):Send -> proc():Send>,
|
||||||
nocopy: marker::NoCopy,
|
nocopy: marker::NoCopy,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TaskBuilder {
|
impl TaskBuilder<SiblingSpawner> {
|
||||||
/// Generate the base configuration for spawning a task, off of which more
|
/// Generate the base configuration for spawning a task, off of which more
|
||||||
/// configuration methods can be chained.
|
/// configuration methods can be chained.
|
||||||
pub fn new() -> TaskBuilder {
|
pub fn new() -> TaskBuilder<SiblingSpawner> {
|
||||||
TaskBuilder {
|
TaskBuilder {
|
||||||
opts: TaskOpts::new(),
|
name: None,
|
||||||
|
stack_size: None,
|
||||||
|
stdout: None,
|
||||||
|
stderr: None,
|
||||||
|
spawner: SiblingSpawner,
|
||||||
gen_body: None,
|
gen_body: None,
|
||||||
nocopy: marker::NoCopy,
|
nocopy: marker::NoCopy,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get a future representing the exit status of the task.
|
|
||||||
///
|
|
||||||
/// Taking the value of the future will block until the child task
|
|
||||||
/// terminates. The future result return value will be created *before* the task is
|
|
||||||
/// spawned; as such, do not invoke .get() on it directly;
|
|
||||||
/// rather, store it in an outer variable/list for later use.
|
|
||||||
///
|
|
||||||
/// # Failure
|
|
||||||
/// Fails if a future_result was already set for this task.
|
|
||||||
pub fn future_result(&mut self) -> Receiver<task::Result> {
|
|
||||||
// FIXME (#3725): Once linked failure and notification are
|
|
||||||
// handled in the library, I can imagine implementing this by just
|
|
||||||
// registering an arbitrary number of task::on_exit handlers and
|
|
||||||
// sending out messages.
|
|
||||||
|
|
||||||
if self.opts.notify_chan.is_some() {
|
|
||||||
fail!("Can't set multiple future_results for one task!");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Construct the future and give it to the caller.
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
|
|
||||||
// Reconfigure self to use a notify channel.
|
|
||||||
self.opts.notify_chan = Some(tx);
|
|
||||||
|
|
||||||
rx
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<S: Spawner> TaskBuilder<S> {
|
||||||
/// Name the task-to-be. Currently the name is used for identification
|
/// Name the task-to-be. Currently the name is used for identification
|
||||||
/// only in failure messages.
|
/// only in failure messages.
|
||||||
pub fn named<S: IntoMaybeOwned<'static>>(mut self, name: S) -> TaskBuilder {
|
pub fn named<T: IntoMaybeOwned<'static>>(mut self, name: T) -> TaskBuilder<S> {
|
||||||
self.opts.name = Some(name.into_maybe_owned());
|
self.name = Some(name.into_maybe_owned());
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Set the size of the stack for the new task.
|
||||||
* Add a wrapper to the body of the spawned task.
|
pub fn stack_size(mut self, size: uint) -> TaskBuilder<S> {
|
||||||
*
|
self.stack_size = Some(size);
|
||||||
* Before the task is spawned it is passed through a 'body generator'
|
self
|
||||||
* function that may perform local setup operations as well as wrap
|
}
|
||||||
* the task body in remote setup operations. With this the behavior
|
|
||||||
* of tasks can be extended in simple ways.
|
/// Redirect task-local stdout.
|
||||||
*
|
pub fn stdout(mut self, stdout: Box<Writer + Send>) -> TaskBuilder<S> {
|
||||||
* This function augments the current body generator with a new body
|
self.stdout = Some(stdout);
|
||||||
* generator by applying the task body which results from the
|
self
|
||||||
* existing body generator to the new body generator.
|
}
|
||||||
*/
|
|
||||||
pub fn with_wrapper(mut self,
|
/// Redirect task-local stderr.
|
||||||
wrapper: proc(v: proc(): Send): Send -> proc(): Send)
|
pub fn stderr(mut self, stderr: Box<Writer + Send>) -> TaskBuilder<S> {
|
||||||
-> TaskBuilder
|
self.stderr = Some(stderr);
|
||||||
{
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the spawning mechanism for the task.
|
||||||
|
///
|
||||||
|
/// The `TaskBuilder` API configures a task to be spawned, but defers to the
|
||||||
|
/// "spawner" to actually create and spawn the task. The `spawner` method
|
||||||
|
/// should not be called directly by `TaskBuiler` clients. It is intended
|
||||||
|
/// for use by downstream crates (like `native` and `green`) that implement
|
||||||
|
/// tasks. These downstream crates then add extension methods to the
|
||||||
|
/// builder, like `.native()` and `.green(pool)`, that actually set the
|
||||||
|
/// spawner.
|
||||||
|
pub fn spawner<T: Spawner>(self, spawner: T) -> TaskBuilder<T> {
|
||||||
|
// repackage the entire TaskBuilder since its type is changing.
|
||||||
|
let TaskBuilder {
|
||||||
|
name, stack_size, stdout, stderr, spawner: _, gen_body, nocopy
|
||||||
|
} = self;
|
||||||
|
TaskBuilder {
|
||||||
|
name: name,
|
||||||
|
stack_size: stack_size,
|
||||||
|
stdout: stdout,
|
||||||
|
stderr: stderr,
|
||||||
|
spawner: spawner,
|
||||||
|
gen_body: gen_body,
|
||||||
|
nocopy: nocopy,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a wrapper to the body of the spawned task.
|
||||||
|
///
|
||||||
|
/// Before the task is spawned it is passed through a 'body generator'
|
||||||
|
/// function that may perform local setup operations as well as wrap
|
||||||
|
/// the task body in remote setup operations. With this the behavior
|
||||||
|
/// of tasks can be extended in simple ways.
|
||||||
|
///
|
||||||
|
/// This function augments the current body generator with a new body
|
||||||
|
/// generator by applying the task body which results from the
|
||||||
|
/// existing body generator to the new body generator.
|
||||||
|
#[deprecated = "this function will be removed soon"]
|
||||||
|
pub fn with_wrapper(mut self, wrapper: proc(v: proc():Send):Send -> proc():Send)
|
||||||
|
-> TaskBuilder<S> {
|
||||||
self.gen_body = match self.gen_body.take() {
|
self.gen_body = match self.gen_body.take() {
|
||||||
Some(prev) => Some(proc(body) { wrapper(prev(body)) }),
|
Some(prev) => Some(proc(body) { wrapper(prev(body)) }),
|
||||||
None => Some(wrapper)
|
None => Some(wrapper)
|
||||||
|
@ -156,90 +222,80 @@ impl TaskBuilder {
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
// Where spawning actually happens (whether yielding a future or not)
|
||||||
* Creates and executes a new child task
|
fn spawn_internal(self, f: proc():Send,
|
||||||
*
|
on_exit: Option<proc(Result<(), Box<Any + Send>>):Send>) {
|
||||||
* Sets up a new task with its own call stack and schedules it to run
|
let TaskBuilder {
|
||||||
* the provided unique closure. The task has the properties and behavior
|
name, stack_size, stdout, stderr, spawner, mut gen_body, nocopy: _
|
||||||
* specified by the task_builder.
|
} = self;
|
||||||
*/
|
let f = match gen_body.take() {
|
||||||
pub fn spawn(mut self, f: proc(): Send) {
|
|
||||||
let gen_body = self.gen_body.take();
|
|
||||||
let f = match gen_body {
|
|
||||||
Some(gen) => gen(f),
|
Some(gen) => gen(f),
|
||||||
None => f
|
None => f
|
||||||
};
|
};
|
||||||
let t: Box<Task> = match Local::try_take() {
|
|
||||||
Some(t) => t,
|
|
||||||
None => fail!("need a local task to spawn a new task"),
|
|
||||||
};
|
|
||||||
let TaskOpts { notify_chan, name, stack_size, stdout, stderr } = self.opts;
|
|
||||||
|
|
||||||
let opts = task::TaskOpts {
|
let opts = task::TaskOpts {
|
||||||
on_exit: notify_chan.map(|c| proc(r) c.send(r)),
|
on_exit: on_exit,
|
||||||
name: name,
|
name: name,
|
||||||
stack_size: stack_size,
|
stack_size: stack_size,
|
||||||
};
|
};
|
||||||
if stdout.is_some() || stderr.is_some() {
|
if stdout.is_some() || stderr.is_some() {
|
||||||
t.spawn_sibling(opts, proc() {
|
spawner.spawn(opts, proc() {
|
||||||
let _ = stdout.map(stdio::set_stdout);
|
let _ = stdout.map(stdio::set_stdout);
|
||||||
let _ = stderr.map(stdio::set_stderr);
|
let _ = stderr.map(stdio::set_stderr);
|
||||||
f();
|
f();
|
||||||
});
|
})
|
||||||
} else {
|
} else {
|
||||||
t.spawn_sibling(opts, f);
|
spawner.spawn(opts, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/// Creates and executes a new child task.
|
||||||
* Execute a function in another task and return either the return value
|
///
|
||||||
* of the function or result::err.
|
/// Sets up a new task with its own call stack and schedules it to run
|
||||||
*
|
/// the provided proc. The task has the properties and behavior
|
||||||
* # Return value
|
/// specified by the `TaskBuilder`.
|
||||||
*
|
pub fn spawn(self, f: proc():Send) {
|
||||||
* If the function executed successfully then try returns result::ok
|
self.spawn_internal(f, None)
|
||||||
* containing the value returned by the function. If the function fails
|
|
||||||
* then try returns result::err containing nil.
|
|
||||||
*
|
|
||||||
* # Failure
|
|
||||||
* Fails if a future_result was already set for this task.
|
|
||||||
*/
|
|
||||||
pub fn try<T: Send>(mut self, f: proc(): Send -> T)
|
|
||||||
-> Result<T, Box<Any + Send>> {
|
|
||||||
let (tx, rx) = channel();
|
|
||||||
|
|
||||||
let result = self.future_result();
|
|
||||||
|
|
||||||
self.spawn(proc() {
|
|
||||||
tx.send(f());
|
|
||||||
});
|
|
||||||
|
|
||||||
match result.recv() {
|
|
||||||
Ok(()) => Ok(rx.recv()),
|
|
||||||
Err(cause) => Err(cause)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Task construction */
|
/// Execute a proc in a newly-spawned task and return a future representing
|
||||||
|
/// the task's result. The task has the properties and behavior
|
||||||
|
/// specified by the `TaskBuilder`.
|
||||||
|
///
|
||||||
|
/// Taking the value of the future will block until the child task
|
||||||
|
/// terminates.
|
||||||
|
///
|
||||||
|
/// # Return value
|
||||||
|
///
|
||||||
|
/// If the child task executes successfully (without failing) then the
|
||||||
|
/// future returns `result::Ok` containing the value returned by the
|
||||||
|
/// function. If the child task fails then the future returns `result::Err`
|
||||||
|
/// containing the argument to `fail!(...)` as an `Any` trait object.
|
||||||
|
pub fn try_future<T:Send>(self, f: proc():Send -> T)
|
||||||
|
-> Future<Result<T, Box<Any + Send>>> {
|
||||||
|
// currently, the on_exit proc provided by librustrt only works for unit
|
||||||
|
// results, so we use an additional side-channel to communicate the
|
||||||
|
// result.
|
||||||
|
|
||||||
impl TaskOpts {
|
let (tx_done, rx_done) = channel(); // signal that task has exited
|
||||||
pub fn new() -> TaskOpts {
|
let (tx_retv, rx_retv) = channel(); // return value from task
|
||||||
/*!
|
|
||||||
* The default task options
|
|
||||||
*/
|
|
||||||
|
|
||||||
TaskOpts {
|
let on_exit = proc(res) { tx_done.send(res) };
|
||||||
notify_chan: None,
|
self.spawn_internal(proc() { tx_retv.send(f()) },
|
||||||
name: None,
|
Some(on_exit));
|
||||||
stack_size: None,
|
|
||||||
stdout: None,
|
Future::from_fn(proc() {
|
||||||
stderr: None,
|
rx_done.recv().map(|_| rx_retv.recv())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execute a function in a newly-spawnedtask and block until the task
|
||||||
|
/// completes or fails. Equivalent to `.try_future(f).unwrap()`.
|
||||||
|
pub fn try<T:Send>(self, f: proc():Send -> T) -> Result<T, Box<Any + Send>> {
|
||||||
|
self.try_future(f).unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Spawn convenience functions */
|
/* Convenience functions */
|
||||||
|
|
||||||
/// Creates and executes a new child task
|
/// Creates and executes a new child task
|
||||||
///
|
///
|
||||||
|
@ -251,14 +307,22 @@ pub fn spawn(f: proc(): Send) {
|
||||||
TaskBuilder::new().spawn(f)
|
TaskBuilder::new().spawn(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute a function in another task and return either the return value of
|
/// Execute a function in a newly-spawned task and return either the return
|
||||||
/// the function or an error if the task failed
|
/// value of the function or an error if the task failed.
|
||||||
///
|
///
|
||||||
/// This is equivalent to TaskBuilder::new().try
|
/// This is equivalent to `TaskBuilder::new().try`.
|
||||||
pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> {
|
pub fn try<T: Send>(f: proc(): Send -> T) -> Result<T, Box<Any + Send>> {
|
||||||
TaskBuilder::new().try(f)
|
TaskBuilder::new().try(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execute a function in another task and return a future representing the
|
||||||
|
/// task's result.
|
||||||
|
///
|
||||||
|
/// This is equivalent to `TaskBuilder::new().try_future`.
|
||||||
|
pub fn try_future<T:Send>(f: proc():Send -> T) -> Future<Result<T, Box<Any + Send>>> {
|
||||||
|
TaskBuilder::new().try_future(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Lifecycle functions */
|
/* Lifecycle functions */
|
||||||
|
|
||||||
|
@ -273,9 +337,8 @@ pub fn with_task_name<U>(blk: |Option<&str>| -> U) -> U {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Yield control to the task scheduler.
|
||||||
pub fn deschedule() {
|
pub fn deschedule() {
|
||||||
//! Yield control to the task scheduler
|
|
||||||
|
|
||||||
use rt::local::Local;
|
use rt::local::Local;
|
||||||
|
|
||||||
// FIXME(#7544): Optimize this, since we know we won't block.
|
// FIXME(#7544): Optimize this, since we know we won't block.
|
||||||
|
@ -283,16 +346,26 @@ pub fn deschedule() {
|
||||||
task.yield_now();
|
task.yield_now();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// True if the running task is currently failing (e.g. will return `true` inside a
|
||||||
|
/// destructor that is run while unwinding the stack after a call to `fail!()`).
|
||||||
pub fn failing() -> bool {
|
pub fn failing() -> bool {
|
||||||
//! True if the running task has failed
|
|
||||||
use rt::task::Task;
|
use rt::task::Task;
|
||||||
Local::borrow(None::<Task>).unwinder.unwinding()
|
Local::borrow(None::<Task>).unwinder.unwinding()
|
||||||
}
|
}
|
||||||
|
|
||||||
// The following 8 tests test the following 2^3 combinations:
|
#[cfg(test)]
|
||||||
// {un,}linked {un,}supervised failure propagation {up,down}wards.
|
mod test {
|
||||||
|
use any::{Any, AnyRefExt};
|
||||||
|
use owned::AnyOwnExt;
|
||||||
|
use result;
|
||||||
|
use result::{Ok, Err};
|
||||||
|
use str::StrAllocating;
|
||||||
|
use string::String;
|
||||||
|
use std::io::{ChanReader, ChanWriter};
|
||||||
|
use prelude::*;
|
||||||
|
use super::*;
|
||||||
|
|
||||||
// !!! These tests are dangerous. If Something is buggy, they will hang, !!!
|
// !!! These tests are dangerous. If something is buggy, they will hang, !!!
|
||||||
// !!! instead of exiting cleanly. This might wedge the buildbots. !!!
|
// !!! instead of exiting cleanly. This might wedge the buildbots. !!!
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -354,25 +427,14 @@ fn test_with_wrapper() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_future_result() {
|
fn test_try_future() {
|
||||||
let mut builder = TaskBuilder::new();
|
let result = TaskBuilder::new().try_future(proc() {});
|
||||||
let result = builder.future_result();
|
assert!(result.unwrap().is_ok());
|
||||||
builder.spawn(proc() {});
|
|
||||||
assert!(result.recv().is_ok());
|
|
||||||
|
|
||||||
let mut builder = TaskBuilder::new();
|
let result = TaskBuilder::new().try_future(proc() -> () {
|
||||||
let result = builder.future_result();
|
|
||||||
builder.spawn(proc() {
|
|
||||||
fail!();
|
fail!();
|
||||||
});
|
});
|
||||||
assert!(result.recv().is_err());
|
assert!(result.unwrap().is_err());
|
||||||
}
|
|
||||||
|
|
||||||
#[test] #[should_fail]
|
|
||||||
fn test_back_to_the_future_result() {
|
|
||||||
let mut builder = TaskBuilder::new();
|
|
||||||
builder.future_result();
|
|
||||||
builder.future_result();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -429,7 +491,6 @@ fn test_spawn_sched_childs_on_default_sched() {
|
||||||
rx.recv();
|
rx.recv();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
fn avoid_copying_the_body(spawnfn: |v: proc():Send|) {
|
fn avoid_copying_the_body(spawnfn: |v: proc():Send|) {
|
||||||
let (tx, rx) = channel::<uint>();
|
let (tx, rx) = channel::<uint>();
|
||||||
|
|
||||||
|
@ -546,3 +607,21 @@ fn test_try_fail_message_unit_struct() {
|
||||||
Err(_) | Ok(()) => fail!()
|
Err(_) | Ok(()) => fail!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_stdout() {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let mut reader = ChanReader::new(rx);
|
||||||
|
let stdout = ChanWriter::new(tx);
|
||||||
|
|
||||||
|
TaskBuilder::new().stdout(box stdout as Box<Writer + Send>).try(proc() {
|
||||||
|
print!("Hello, world!");
|
||||||
|
}).unwrap();
|
||||||
|
|
||||||
|
let output = reader.read_to_str().unwrap();
|
||||||
|
assert_eq!(output, "Hello, world!".to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: the corresponding test for stderr is in run-pass/task-stderr, due
|
||||||
|
// to the test harness apparently interfering with stderr configuration.
|
||||||
|
}
|
||||||
|
|
26
src/test/run-pass/task-stderr.rs
Normal file
26
src/test/run-pass/task-stderr.rs
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
|
||||||
|
// file at the top-level directory of this distribution and at
|
||||||
|
// http://rust-lang.org/COPYRIGHT.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
||||||
|
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
||||||
|
// option. This file may not be copied, modified, or distributed
|
||||||
|
// except according to those terms.
|
||||||
|
|
||||||
|
use std::io::{ChanReader, ChanWriter};
|
||||||
|
use std::task::build;
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
let mut reader = ChanReader::new(rx);
|
||||||
|
let stderr = ChanWriter::new(tx);
|
||||||
|
|
||||||
|
let res = build().stderr(box stderr as Box<Writer + Send>).try(proc() -> () {
|
||||||
|
fail!("Hello, world!")
|
||||||
|
});
|
||||||
|
assert!(res.is_err());
|
||||||
|
|
||||||
|
let output = reader.read_to_str().unwrap();
|
||||||
|
assert!(output.as_slice().contains("Hello, world!"));
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue