Merge remote-tracking branch 'brson/mainthread'

Conflicts:
	src/rt/rust_sched_loop.cpp
	src/rt/rust_shape.cpp
	src/rt/rust_task.cpp
This commit is contained in:
Brian Anderson 2012-04-01 00:13:59 -07:00
commit e78396850d
30 changed files with 577 additions and 260 deletions

View file

@ -50,8 +50,11 @@ RUNTIME_CS_$(1) := \
rt/rust_builtin.cpp \
rt/rust_run_program.cpp \
rt/rust_env.cpp \
rt/rust_task_thread.cpp \
rt/rust_sched_loop.cpp \
rt/rust_sched_launcher.cpp \
rt/rust_sched_driver.cpp \
rt/rust_scheduler.cpp \
rt/rust_sched_reaper.cpp \
rt/rust_task.cpp \
rt/rust_stack.cpp \
rt/rust_port.cpp \

View file

@ -389,6 +389,34 @@
fun:uv_loop_delete
}
{
lock_and_signal-probably-threadsafe-access-outside-of-lock
Helgrind:Race
fun:_ZN15lock_and_signal27lock_held_by_current_threadEv
...
}
{
lock_and_signal-probably-threadsafe-access-outside-of-lock2
Helgrind:Race
fun:_ZN15lock_and_signal6unlockEv
...
}
{
lock_and_signal-probably-threadsafe-access-outside-of-lock3
Helgrind:Race
fun:_ZN15lock_and_signal4lockEv
...
}
{
lock_and_signal-probably-threadsafe-access-outside-of-lock4
Helgrind:Race
fun:_ZN15lock_and_signal4waitEv
...
}
{
uv-async-send-does-racy-things
Helgrind:Race

View file

@ -80,21 +80,20 @@ rust_start(uintptr_t main_fn, int argc, char **argv, void* crate_map) {
rust_sched_id sched_id = kernel->create_scheduler(env->num_sched_threads);
rust_scheduler *sched = kernel->get_scheduler_by_id(sched_id);
rust_task *root_task = sched->create_task(NULL, "main");
rust_task_thread *thread = root_task->thread;
command_line_args *args
= new (kernel, "main command line args")
command_line_args(root_task, argc, argv);
DLOG(thread, dom, "startup: %d args in 0x%" PRIxPTR,
LOG(root_task, dom, "startup: %d args in 0x%" PRIxPTR,
args->argc, (uintptr_t)args->args);
for (int i = 0; i < args->argc; i++) {
DLOG(thread, dom, "startup: arg[%d] = '%s'", i, args->argv[i]);
LOG(root_task, dom, "startup: arg[%d] = '%s'", i, args->argv[i]);
}
root_task->start((spawn_fn)main_fn, NULL, args->args);
root_task = NULL;
int ret = kernel->wait_for_schedulers();
int ret = kernel->wait_for_exit();
delete args;
delete kernel;
delete srv;

View file

@ -1,7 +1,7 @@
/* Native builtins. */
#include "rust_internal.h"
#include "rust_task_thread.h"
#include "rust_sched_loop.h"
#include "rust_task.h"
#include "rust_util.h"
#include "rust_scheduler.h"
@ -22,7 +22,7 @@ extern char **environ;
extern "C" CDECL rust_str*
last_os_error() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, task, "last_os_error()");
@ -65,7 +65,7 @@ last_os_error() {
extern "C" CDECL rust_str *
rust_getcwd() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, task, "rust_getcwd()");
char cbuf[BUF_BYTES];
@ -85,7 +85,7 @@ rust_getcwd() {
#if defined(__WIN32__)
extern "C" CDECL rust_vec *
rust_env_pairs() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
size_t envc = 0;
LPTCH ch = GetEnvironmentStringsA();
LPTCH c;
@ -111,7 +111,7 @@ rust_env_pairs() {
#else
extern "C" CDECL rust_vec *
rust_env_pairs() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
#ifdef __APPLE__
char **environ = *_NSGetEnviron();
#endif
@ -133,21 +133,21 @@ refcount(intptr_t *v) {
extern "C" CDECL void
unsupervise() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
task->unsupervise();
}
extern "C" CDECL void
vec_reserve_shared(type_desc* ty, rust_vec** vp,
size_t n_elts) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
reserve_vec_exact(task, vp, n_elts * ty->size);
}
extern "C" CDECL void
str_reserve_shared(rust_vec** sp,
size_t n_elts) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
reserve_vec_exact(task, sp, n_elts + 1);
}
@ -157,7 +157,7 @@ str_reserve_shared(rust_vec** sp,
*/
extern "C" CDECL rust_vec*
vec_from_buf_shared(type_desc *ty, void *ptr, size_t count) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
size_t fill = ty->size * count;
rust_vec* v = (rust_vec*)task->kernel->malloc(fill + sizeof(rust_vec),
"vec_from_buf");
@ -168,7 +168,7 @@ vec_from_buf_shared(type_desc *ty, void *ptr, size_t count) {
extern "C" CDECL void
rust_str_push(rust_vec** sp, uint8_t byte) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
size_t fill = (*sp)->fill;
reserve_vec(task, sp, fill + 1);
(*sp)->data[fill-1] = byte;
@ -178,8 +178,8 @@ rust_str_push(rust_vec** sp, uint8_t byte) {
extern "C" CDECL void *
rand_new() {
rust_task *task = rust_task_thread::get_task();
rust_task_thread *thread = task->thread;
rust_task *task = rust_sched_loop::get_task();
rust_sched_loop *thread = task->sched_loop;
randctx *rctx = (randctx *) task->malloc(sizeof(randctx), "randctx");
if (!rctx) {
task->fail();
@ -196,7 +196,7 @@ rand_next(randctx *rctx) {
extern "C" CDECL void
rand_free(randctx *rctx) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
task->free(rctx);
}
@ -242,7 +242,7 @@ debug_abi_2(floats f) {
static void
debug_tydesc_helper(type_desc *t)
{
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, " size %" PRIdPTR ", align %" PRIdPTR
", first_param 0x%" PRIxPTR,
t->size, t->align, t->first_param);
@ -250,14 +250,14 @@ debug_tydesc_helper(type_desc *t)
extern "C" CDECL void
debug_tydesc(type_desc *t) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_tydesc");
debug_tydesc_helper(t);
}
extern "C" CDECL void
debug_opaque(type_desc *t, uint8_t *front) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_opaque");
debug_tydesc_helper(t);
// FIXME may want to actually account for alignment. `front` may not
@ -277,7 +277,7 @@ struct rust_box {
extern "C" CDECL void
debug_box(type_desc *t, rust_box *box) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_box(0x%" PRIxPTR ")", box);
debug_tydesc_helper(t);
LOG(task, stdlib, " refcount %" PRIdPTR,
@ -294,7 +294,7 @@ struct rust_tag {
extern "C" CDECL void
debug_tag(type_desc *t, rust_tag *tag) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_tag");
debug_tydesc_helper(t);
@ -312,7 +312,7 @@ struct rust_fn {
extern "C" CDECL void
debug_fn(type_desc *t, rust_fn *fn) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_fn");
debug_tydesc_helper(t);
LOG(task, stdlib, " thunk at 0x%" PRIxPTR, fn->thunk);
@ -326,7 +326,7 @@ extern "C" CDECL void *
debug_ptrcast(type_desc *from_ty,
type_desc *to_ty,
void *ptr) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, stdlib, "debug_ptrcast from");
debug_tydesc_helper(from_ty);
LOG(task, stdlib, "to");
@ -336,13 +336,13 @@ debug_ptrcast(type_desc *from_ty,
extern "C" CDECL void *
debug_get_stk_seg() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
return task->stk;
}
extern "C" CDECL rust_vec*
rust_list_files(rust_str *path) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
array_list<rust_str*> strings;
#if defined(__WIN32__)
WIN32_FIND_DATA FindFileData;
@ -443,21 +443,21 @@ precise_time_ns(uint64_t *ns) {
extern "C" CDECL rust_sched_id
rust_get_sched_id() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
return task->sched->get_id();
}
extern "C" CDECL rust_sched_id
rust_new_sched(uintptr_t threads) {
rust_task *task = rust_task_thread::get_task();
A(task->thread, threads > 0,
rust_task *task = rust_sched_loop::get_task();
A(task->sched_loop, threads > 0,
"Can't create a scheduler with no threads, silly!");
return task->kernel->create_scheduler(threads);
}
extern "C" CDECL rust_task_id
get_task_id() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
return task->id;
}
@ -468,13 +468,13 @@ new_task_common(rust_scheduler *sched, rust_task *parent) {
extern "C" CDECL rust_task*
new_task() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
return new_task_common(task->sched, task);
}
extern "C" CDECL rust_task*
rust_new_task_in_sched(rust_sched_id id) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
rust_scheduler *sched = task->kernel->get_scheduler_by_id(id);
// FIXME: What if we didn't get the scheduler?
return new_task_common(sched, task);
@ -487,7 +487,7 @@ rust_task_config_notify(rust_task *target, rust_port_id *port) {
extern "C" rust_task *
rust_get_task() {
return rust_task_thread::get_task();
return rust_sched_loop::get_task();
}
extern "C" CDECL void
@ -497,13 +497,13 @@ start_task(rust_task *target, fn_env_pair *f) {
extern "C" CDECL int
sched_threads() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
return task->sched->number_of_threads();
}
extern "C" CDECL rust_port*
new_port(size_t unit_sz) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, comm, "new_port(task=0x%" PRIxPTR " (%s), unit_sz=%d)",
(uintptr_t) task, task->name, unit_sz);
// port starts with refcount == 1
@ -512,7 +512,7 @@ new_port(size_t unit_sz) {
extern "C" CDECL void
rust_port_begin_detach(rust_port *port, uintptr_t *yield) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, comm, "rust_port_detach(0x%" PRIxPTR ")", (uintptr_t) port);
port->begin_detach(yield);
}
@ -524,7 +524,7 @@ rust_port_end_detach(rust_port *port) {
extern "C" CDECL void
del_port(rust_port *port) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, comm, "del_port(0x%" PRIxPTR ")", (uintptr_t) port);
delete port;
}
@ -542,7 +542,7 @@ get_port_id(rust_port *port) {
extern "C" CDECL uintptr_t
rust_port_id_send(type_desc *t, rust_port_id target_port_id, void *sptr) {
bool sent = false;
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG(task, comm, "rust_port_id*_send port: 0x%" PRIxPTR,
(uintptr_t) target_port_id);
@ -573,14 +573,14 @@ port_recv(uintptr_t *dptr, rust_port *port, uintptr_t *yield) {
extern "C" CDECL void
rust_port_select(rust_port **dptr, rust_port **ports,
size_t n_ports, uintptr_t *yield) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
rust_port_selector *selector = task->get_port_selector();
selector->select(task, dptr, ports, n_ports, yield);
}
extern "C" CDECL void
rust_set_exit_status(intptr_t code) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
task->kernel->set_exit_status((int)code);
}
@ -595,7 +595,7 @@ extern void log_console_off(rust_env *env);
extern "C" CDECL void
rust_log_console_off() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
log_console_off(task->kernel->env);
}
@ -606,36 +606,36 @@ rust_dbg_lock_create() {
extern "C" CDECL void
rust_dbg_lock_destroy(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
rust_task *task = rust_sched_loop::get_task();
I(task->sched_loop, lock);
delete lock;
}
extern "C" CDECL void
rust_dbg_lock_lock(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
rust_task *task = rust_sched_loop::get_task();
I(task->sched_loop, lock);
lock->lock();
}
extern "C" CDECL void
rust_dbg_lock_unlock(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
rust_task *task = rust_sched_loop::get_task();
I(task->sched_loop, lock);
lock->unlock();
}
extern "C" CDECL void
rust_dbg_lock_wait(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
rust_task *task = rust_sched_loop::get_task();
I(task->sched_loop, lock);
lock->wait();
}
extern "C" CDECL void
rust_dbg_lock_signal(lock_and_signal *lock) {
rust_task *task = rust_task_thread::get_task();
I(task->thread, lock);
rust_task *task = rust_sched_loop::get_task();
I(task->sched_loop, lock);
lock->signal();
}

View file

@ -10,7 +10,7 @@
#include "sync/lock_and_signal.h"
#include "sync/lock_free_queue.h"
struct rust_task_thread;
struct rust_sched_loop;
struct rust_task;
class rust_log;
class rust_port;
@ -110,7 +110,7 @@ struct rust_cond { };
#include "rust_srv.h"
#include "rust_log.h"
#include "rust_kernel.h"
#include "rust_task_thread.h"
#include "rust_sched_loop.h"
typedef void CDECL (glue_fn)(void *, void *,
const type_desc **, void *);

View file

@ -17,6 +17,7 @@ rust_kernel::rust_kernel(rust_srv *srv) :
max_port_id(0),
rval(0),
max_sched_id(0),
sched_reaper(this),
env(srv->env)
{
}
@ -62,6 +63,9 @@ rust_kernel::create_scheduler(size_t num_threads) {
rust_scheduler *sched;
{
scoped_lock with(sched_lock);
// If this is the first scheduler then we need to launch
// the scheduler reaper.
bool start_reaper = sched_table.empty();
id = max_sched_id++;
K(srv, id != INTPTR_MAX, "Hit the maximum scheduler id");
sched = new (this, "rust_scheduler")
@ -70,6 +74,9 @@ rust_kernel::create_scheduler(size_t num_threads) {
.insert(std::pair<rust_sched_id,
rust_scheduler*>(id, sched)).second;
A(this, is_new, "Reusing a sched id?");
if (start_reaper) {
sched_reaper.start();
}
}
sched->start_task_threads();
return id;
@ -97,12 +104,12 @@ rust_kernel::release_scheduler_id(rust_sched_id id) {
}
/*
Called on the main thread to wait for the kernel to exit. This function is
also used to join on every terminating scheduler thread, so that we can be
sure they have completely exited before the process exits. If we don't join
them then we can see valgrind errors due to un-freed pthread memory.
Called by rust_sched_reaper to join every every terminating scheduler thread,
so that we can be sure they have completely exited before the process exits.
If we don't join them then we can see valgrind errors due to un-freed pthread
memory.
*/
int
void
rust_kernel::wait_for_schedulers()
{
scoped_lock with(sched_lock);
@ -121,6 +128,12 @@ rust_kernel::wait_for_schedulers()
sched_lock.wait();
}
}
}
/* Called on the main thread to wait for the kernel to exit */
int
rust_kernel::wait_for_exit() {
sched_reaper.join();
return rval;
}

View file

@ -6,6 +6,7 @@
#include <vector>
#include "memory_region.h"
#include "rust_log.h"
#include "rust_sched_reaper.h"
struct rust_task_thread;
class rust_scheduler;
@ -46,6 +47,8 @@ private:
// A list of scheduler ids that are ready to exit
std::vector<rust_sched_id> join_list;
rust_sched_reaper sched_reaper;
public:
struct rust_env *env;
@ -66,7 +69,8 @@ public:
rust_scheduler* get_scheduler_by_id(rust_sched_id id);
// Called by a scheduler to indicate that it is terminating
void release_scheduler_id(rust_sched_id id);
int wait_for_schedulers();
void wait_for_schedulers();
int wait_for_exit();
#ifdef __WIN32__
void win32_require(LPCTSTR fn, BOOL ok);

View file

@ -40,9 +40,9 @@ log_console_off(rust_env *env) {
}
}
rust_log::rust_log(rust_srv *srv, rust_task_thread *thread) :
rust_log::rust_log(rust_srv *srv, rust_sched_loop *sched_loop) :
_srv(srv),
_thread(thread) {
_sched_loop(sched_loop) {
}
rust_log::~rust_log() {
@ -122,12 +122,12 @@ rust_log::trace_ln(rust_task *task, uint32_t level, char *message) {
#endif
char prefix[BUF_BYTES] = "";
if (_thread && _thread->name) {
if (_sched_loop && _sched_loop-.name) {
append_string(prefix, "%04" PRIxPTR ":%.10s:",
thread_id, _thread->name);
thread_id, _sched_loop->name);
} else {
append_string(prefix, "%04" PRIxPTR ":0x%08" PRIxPTR ":",
thread_id, (uintptr_t) _thread);
thread_id, (uintptr_t) _sched_loop);
}
if (task) {
if (task->name) {

View file

@ -8,18 +8,18 @@ const uint32_t log_info = 2;
const uint32_t log_debug = 3;
#define LOG(task, field, ...) \
DLOG_LVL(log_debug, task, task->thread, field, __VA_ARGS__)
DLOG_LVL(log_debug, task, task->sched_loop, field, __VA_ARGS__)
#define LOG_ERR(task, field, ...) \
DLOG_LVL(log_err, task, task->thread, field, __VA_ARGS__)
#define DLOG(thread, field, ...) \
DLOG_LVL(log_debug, NULL, thread, field, __VA_ARGS__)
#define DLOG_ERR(thread, field, ...) \
DLOG_LVL(log_err, NULL, thread, field, __VA_ARGS__)
#define LOGPTR(thread, msg, ptrval) \
DLOG_LVL(log_debug, NULL, thread, mem, "%s 0x%" PRIxPTR, msg, ptrval)
#define DLOG_LVL(lvl, task, thread, field, ...) \
DLOG_LVL(log_err, task, task->sched_loop, field, __VA_ARGS__)
#define DLOG(sched_loop, field, ...) \
DLOG_LVL(log_debug, NULL, sched_loop, field, __VA_ARGS__)
#define DLOG_ERR(sched_loop, field, ...) \
DLOG_LVL(log_err, NULL, sched_loop, field, __VA_ARGS__)
#define LOGPTR(sched_loop, msg, ptrval) \
DLOG_LVL(log_debug, NULL, sched_loop, mem, "%s 0x%" PRIxPTR, msg, ptrval)
#define DLOG_LVL(lvl, task, sched_loop, field, ...) \
do { \
rust_task_thread* _d_ = thread; \
rust_sched_loop* _d_ = sched_loop; \
if (log_rt_##field >= lvl && _d_->log_lvl >= lvl) { \
_d_->log(task, lvl, __VA_ARGS__); \
} \
@ -34,13 +34,13 @@ const uint32_t log_debug = 3;
} \
} while (0)
struct rust_task_thread;
struct rust_sched_loop;
struct rust_task;
class rust_log {
public:
rust_log(rust_srv *srv, rust_task_thread *thread);
rust_log(rust_srv *srv, rust_sched_loop *sched_loop);
virtual ~rust_log();
void trace_ln(rust_task *task, uint32_t level, char *message);
@ -49,7 +49,7 @@ public:
private:
rust_srv *_srv;
rust_task_thread *_thread;
rust_sched_loop *_sched_loop;
bool _use_labels;
void trace_ln(rust_task *task, char *message);
};

View file

@ -51,7 +51,7 @@ void rust_port::end_detach() {
// Just take the lock to make sure that the thread that signaled
// the detach_cond isn't still holding it
scoped_lock with(ref_lock);
I(task->thread, ref_count == 0);
I(task->sched_loop, ref_count == 0);
}
void rust_port::send(void *sptr) {

View file

@ -10,12 +10,12 @@ rust_port_selector::select(rust_task *task, rust_port **dptr,
rust_port **ports,
size_t n_ports, uintptr_t *yield) {
I(task->thread, this->ports == NULL);
I(task->thread, this->n_ports == 0);
I(task->thread, dptr != NULL);
I(task->thread, ports != NULL);
I(task->thread, n_ports != 0);
I(task->thread, yield != NULL);
I(task->sched_loop, this->ports == NULL);
I(task->sched_loop, this->n_ports == 0);
I(task->sched_loop, dptr != NULL);
I(task->sched_loop, ports != NULL);
I(task->sched_loop, n_ports != 0);
I(task->sched_loop, yield != NULL);
*yield = false;
size_t locks_taken = 0;
@ -27,11 +27,11 @@ rust_port_selector::select(rust_task *task, rust_port **dptr,
// message.
// Start looking for ports from a different index each time.
size_t j = isaac_rand(&task->thread->rctx);
size_t j = isaac_rand(&task->sched_loop->rctx);
for (size_t i = 0; i < n_ports; i++) {
size_t k = (i + j) % n_ports;
rust_port *port = ports[k];
I(task->thread, port != NULL);
I(task->sched_loop, port != NULL);
port->lock.lock();
locks_taken++;
@ -46,7 +46,7 @@ rust_port_selector::select(rust_task *task, rust_port **dptr,
if (!found_msg) {
this->ports = ports;
this->n_ports = n_ports;
I(task->thread, task->rendezvous_ptr == NULL);
I(task->sched_loop, task->rendezvous_ptr == NULL);
task->rendezvous_ptr = (uintptr_t*)dptr;
task->block(this, "waiting for select rendezvous");
@ -69,6 +69,8 @@ void
rust_port_selector::msg_sent_on(rust_port *port) {
rust_task *task = port->task;
port->lock.must_not_have_lock();
// Prevent two ports from trying to wake up the task
// simultaneously
scoped_lock with(rendezvous_lock);

View file

@ -0,0 +1,46 @@
#include <assert.h>
#include "rust_internal.h"
#include "rust_sched_driver.h"
#include "rust_sched_loop.h"
rust_sched_driver::rust_sched_driver(rust_sched_loop *sched_loop)
: sched_loop(sched_loop),
signalled(false) {
assert(sched_loop != NULL);
sched_loop->on_pump_loop(this);
}
/**
* Starts the main scheduler loop which performs task scheduling for this
* domain.
*
* Returns once no more tasks can be scheduled and all task ref_counts
* drop to zero.
*/
void
rust_sched_driver::start_main_loop() {
assert(sched_loop != NULL);
rust_sched_loop_state state = sched_loop_state_keep_going;
while (state != sched_loop_state_exit) {
DLOG(sched_loop, dom, "pumping scheduler");
state = sched_loop->run_single_turn();
if (state == sched_loop_state_block) {
scoped_lock with(lock);
if (!signalled) {
DLOG(sched_loop, dom, "blocking scheduler");
lock.wait();
}
signalled = false;
}
}
}
void
rust_sched_driver::signal() {
scoped_lock with(lock);
signalled = true;
lock.signal();
}

View file

@ -0,0 +1,23 @@
#ifndef RUST_SCHED_DRIVER_H
#define RUST_SCHED_DRIVER_H
#include "sync/lock_and_signal.h"
#include "rust_signal.h"
struct rust_sched_loop;
class rust_sched_driver : public rust_signal {
private:
rust_sched_loop *sched_loop;
lock_and_signal lock;
bool signalled;
public:
rust_sched_driver(rust_sched_loop *sched_loop);
void start_main_loop();
virtual void signal();
};
#endif /* RUST_SCHED_DRIVER_H */

View file

@ -0,0 +1,18 @@
#include "rust_sched_launcher.h"
#include "rust_scheduler.h"
const size_t SCHED_STACK_SIZE = 1024*100;
rust_sched_launcher::rust_sched_launcher(rust_scheduler *sched,
rust_srv *srv, int id)
: kernel(sched->kernel),
sched_loop(sched, srv, id),
driver(&sched_loop) {
}
rust_thread_sched_launcher::rust_thread_sched_launcher(rust_scheduler *sched,
rust_srv *srv, int id)
: rust_sched_launcher(sched, srv, id),
rust_thread(SCHED_STACK_SIZE) {
}

View file

@ -0,0 +1,37 @@
#ifndef RUST_SCHED_LAUNCHER_H
#define RUST_SCHED_LAUNCHER_H
#include "rust_internal.h"
#include "sync/rust_thread.h"
#include "rust_sched_driver.h"
class rust_sched_launcher : public kernel_owned<rust_sched_launcher> {
public:
rust_kernel *kernel;
private:
rust_sched_loop sched_loop;
protected:
rust_sched_driver driver;
public:
rust_sched_launcher(rust_scheduler *sched, rust_srv *srv, int id);
virtual void start() = 0;
virtual void run() = 0;
virtual void join() = 0;
rust_sched_loop *get_loop() { return &sched_loop; }
};
class rust_thread_sched_launcher
:public rust_sched_launcher,
private rust_thread {
public:
rust_thread_sched_launcher(rust_scheduler *sched, rust_srv *srv, int id);
virtual void start() { rust_thread::start(); }
virtual void run() { driver.start_main_loop(); }
virtual void join() { rust_thread::join(); }
};
#endif // RUST_SCHED_LAUNCHER_H

View file

@ -8,25 +8,24 @@
#include "rust_scheduler.h"
#ifndef _WIN32
pthread_key_t rust_task_thread::task_key;
pthread_key_t rust_sched_loop::task_key;
#else
DWORD rust_task_thread::task_key;
DWORD rust_sched_loop::task_key;
#endif
const size_t SCHED_STACK_SIZE = 1024*100;
const size_t C_STACK_SIZE = 1024*1024;
bool rust_task_thread::tls_initialized = false;
bool rust_sched_loop::tls_initialized = false;
rust_task_thread::rust_task_thread(rust_scheduler *sched,
rust_sched_loop::rust_sched_loop(rust_scheduler *sched,
rust_srv *srv,
int id) :
rust_thread(SCHED_STACK_SIZE),
_log(srv, this),
id(id),
should_exit(false),
cached_c_stack(NULL),
dead_task(NULL),
pump_signal(NULL),
kernel(sched->kernel),
sched(sched),
srv(srv),
@ -44,7 +43,8 @@ rust_task_thread::rust_task_thread(rust_scheduler *sched,
}
void
rust_task_thread::activate(rust_task *task) {
rust_sched_loop::activate(rust_task *task) {
lock.must_have_lock();
task->ctx.next = &c_context;
DLOG(this, task, "descheduling...");
lock.unlock();
@ -57,7 +57,7 @@ rust_task_thread::activate(rust_task *task) {
}
void
rust_task_thread::log(rust_task* task, uint32_t level, char const *fmt, ...) {
rust_sched_loop::log(rust_task* task, uint32_t level, char const *fmt, ...) {
char buf[BUF_BYTES];
va_list args;
va_start(args, fmt);
@ -67,14 +67,14 @@ rust_task_thread::log(rust_task* task, uint32_t level, char const *fmt, ...) {
}
void
rust_task_thread::fail() {
rust_sched_loop::fail() {
log(NULL, log_err, "domain %s @0x%" PRIxPTR " root task failed",
name, this);
kernel->fail();
}
void
rust_task_thread::kill_all_tasks() {
rust_sched_loop::kill_all_tasks() {
std::vector<rust_task*> all_tasks;
{
@ -100,7 +100,7 @@ rust_task_thread::kill_all_tasks() {
}
size_t
rust_task_thread::number_of_live_tasks() {
rust_sched_loop::number_of_live_tasks() {
return running_tasks.length() + blocked_tasks.length();
}
@ -108,7 +108,9 @@ rust_task_thread::number_of_live_tasks() {
* Delete any dead tasks.
*/
void
rust_task_thread::reap_dead_tasks() {
rust_sched_loop::reap_dead_tasks() {
lock.must_have_lock();
if (dead_task == NULL) {
return;
}
@ -126,7 +128,7 @@ rust_task_thread::reap_dead_tasks() {
}
void
rust_task_thread::release_task(rust_task *task) {
rust_sched_loop::release_task(rust_task *task) {
// Nobody should have a ref to the task at this point
I(this, task->get_ref_count() == 0);
// Now delete the task, which will require using this thread's
@ -145,7 +147,8 @@ rust_task_thread::release_task(rust_task *task) {
* Returns NULL if no tasks can be scheduled.
*/
rust_task *
rust_task_thread::schedule_task() {
rust_sched_loop::schedule_task() {
lock.must_have_lock();
I(this, this);
// FIXME: in the face of failing tasks, this is not always right.
// I(this, n_live_tasks() > 0);
@ -161,7 +164,7 @@ rust_task_thread::schedule_task() {
}
void
rust_task_thread::log_state() {
rust_sched_loop::log_state() {
if (log_rt_task < log_debug) return;
if (!running_tasks.is_empty()) {
@ -184,20 +187,31 @@ rust_task_thread::log_state() {
}
}
}
/**
* Starts the main scheduler loop which performs task scheduling for this
* domain.
*
* Returns once no more tasks can be scheduled and all task ref_counts
* drop to zero.
*/
void
rust_task_thread::start_main_loop() {
rust_sched_loop::on_pump_loop(rust_signal *signal) {
I(this, pump_signal == NULL);
I(this, signal != NULL);
pump_signal = signal;
}
void
rust_sched_loop::pump_loop() {
I(this, pump_signal != NULL);
pump_signal->signal();
}
rust_sched_loop_state
rust_sched_loop::run_single_turn() {
DLOG(this, task,
"scheduler %d resuming ...", id);
lock.lock();
DLOG(this, dom, "started domain loop %d", id);
if (!should_exit) {
A(this, dead_task == NULL,
"Tasks should only die after running");
while (!should_exit) {
DLOG(this, dom, "worker %d, number_of_live_tasks = %d",
id, number_of_live_tasks());
@ -208,12 +222,9 @@ rust_task_thread::start_main_loop() {
DLOG(this, task,
"all tasks are blocked, scheduler id %d yielding ...",
id);
lock.wait();
A(this, dead_task == NULL,
"Tasks should only die after running");
DLOG(this, task,
"scheduler %d resuming ...", id);
continue;
lock.unlock();
return sched_loop_state_block;
}
I(this, scheduled_task->running());
@ -241,25 +252,31 @@ rust_task_thread::start_main_loop() {
id);
reap_dead_tasks();
}
A(this, running_tasks.is_empty(), "Should have no running tasks");
A(this, blocked_tasks.is_empty(), "Should have no blocked tasks");
A(this, dead_task == NULL, "Should have no dead tasks");
lock.unlock();
return sched_loop_state_keep_going;
} else {
A(this, running_tasks.is_empty(), "Should have no running tasks");
A(this, blocked_tasks.is_empty(), "Should have no blocked tasks");
A(this, dead_task == NULL, "Should have no dead tasks");
DLOG(this, dom, "finished main-loop %d", id);
DLOG(this, dom, "finished main-loop %d", id);
lock.unlock();
lock.unlock();
I(this, !extra_c_stack);
if (cached_c_stack) {
destroy_stack(kernel->region(), cached_c_stack);
cached_c_stack = NULL;
I(this, !extra_c_stack);
if (cached_c_stack) {
destroy_stack(kernel->region(), cached_c_stack);
cached_c_stack = NULL;
}
sched->release_task_thread();
return sched_loop_state_exit;
}
}
rust_task *
rust_task_thread::create_task(rust_task *spawner, const char *name) {
rust_sched_loop::create_task(rust_task *spawner, const char *name) {
rust_task *task =
new (this->kernel, "rust_task")
rust_task (this, task_state_newborn,
@ -272,7 +289,7 @@ rust_task_thread::create_task(rust_task *spawner, const char *name) {
}
rust_task_list *
rust_task_thread::state_list(rust_task_state state) {
rust_sched_loop::state_list(rust_task_state state) {
switch (state) {
case task_state_running:
return &running_tasks;
@ -284,7 +301,7 @@ rust_task_thread::state_list(rust_task_state state) {
}
const char *
rust_task_thread::state_name(rust_task_state state) {
rust_sched_loop::state_name(rust_task_state state) {
switch (state) {
case task_state_newborn:
return "newborn";
@ -301,7 +318,7 @@ rust_task_thread::state_name(rust_task_state state) {
}
void
rust_task_thread::transition(rust_task *task,
rust_sched_loop::transition(rust_task *task,
rust_task_state src, rust_task_state dst,
rust_cond *cond, const char* cond_name) {
scoped_lock with(lock);
@ -324,38 +341,33 @@ rust_task_thread::transition(rust_task *task,
}
task->set_state(dst, cond, cond_name);
lock.signal();
}
void rust_task_thread::run() {
this->start_main_loop();
sched->release_task_thread();
pump_loop();
}
#ifndef _WIN32
void
rust_task_thread::init_tls() {
rust_sched_loop::init_tls() {
int result = pthread_key_create(&task_key, NULL);
assert(!result && "Couldn't create the TLS key!");
tls_initialized = true;
}
void
rust_task_thread::place_task_in_tls(rust_task *task) {
rust_sched_loop::place_task_in_tls(rust_task *task) {
int result = pthread_setspecific(task_key, task);
assert(!result && "Couldn't place the task in TLS!");
task->record_stack_limit();
}
#else
void
rust_task_thread::init_tls() {
rust_sched_loop::init_tls() {
task_key = TlsAlloc();
assert(task_key != TLS_OUT_OF_INDEXES && "Couldn't create the TLS key!");
tls_initialized = true;
}
void
rust_task_thread::place_task_in_tls(rust_task *task) {
rust_sched_loop::place_task_in_tls(rust_task *task) {
BOOL result = TlsSetValue(task_key, task);
assert(result && "Couldn't place the task in TLS!");
task->record_stack_limit();
@ -363,10 +375,11 @@ rust_task_thread::place_task_in_tls(rust_task *task) {
#endif
void
rust_task_thread::exit() {
rust_sched_loop::exit() {
scoped_lock with(lock);
DLOG(this, dom, "Requesting exit for thread %d", id);
should_exit = true;
lock.signal();
pump_loop();
}
// Before activating each task, make sure we have a C stack available.
@ -374,7 +387,7 @@ rust_task_thread::exit() {
// stack), because once we're on the Rust stack we won't have enough
// room to do the allocation
void
rust_task_thread::prepare_c_stack(rust_task *task) {
rust_sched_loop::prepare_c_stack(rust_task *task) {
I(this, !extra_c_stack);
if (!cached_c_stack && !task->have_c_stack()) {
cached_c_stack = create_stack(kernel->region(), C_STACK_SIZE);
@ -382,7 +395,7 @@ rust_task_thread::prepare_c_stack(rust_task *task) {
}
void
rust_task_thread::unprepare_c_stack() {
rust_sched_loop::unprepare_c_stack() {
if (extra_c_stack) {
destroy_stack(kernel->region(), extra_c_stack);
extra_c_stack = NULL;

View file

@ -1,17 +1,11 @@
#ifndef RUST_TASK_THREAD_H
#define RUST_TASK_THREAD_H
#ifndef RUST_SCHED_LOOP_H
#define RUST_SCHED_LOOP_H
#include "rust_internal.h"
#include "sync/rust_thread.h"
#include "rust_stack.h"
#include "rust_signal.h"
#include "context.h"
#ifndef _WIN32
#include <pthread.h>
#else
#include <windows.h>
#endif
enum rust_task_state {
task_state_newborn,
task_state_running,
@ -19,10 +13,21 @@ enum rust_task_state {
task_state_dead
};
/*
The result of every turn of the scheduler loop. Instructs the loop
driver how to proceed.
*/
enum rust_sched_loop_state {
sched_loop_state_keep_going,
sched_loop_state_block,
sched_loop_state_exit
};
struct rust_task;
typedef indexed_list<rust_task> rust_task_list;
struct rust_task_thread : public kernel_owned<rust_task_thread>,
rust_thread
struct rust_sched_loop
{
private:
@ -51,12 +56,16 @@ private:
rust_task_list blocked_tasks;
rust_task *dead_task;
rust_signal *pump_signal;
void prepare_c_stack(rust_task *task);
void unprepare_c_stack();
rust_task_list *state_list(rust_task_state state);
const char *state_name(rust_task_state state);
void pump_loop();
public:
rust_kernel *kernel;
rust_scheduler *sched;
@ -75,12 +84,13 @@ public:
randctx rctx;
// FIXME: Neither of these are used
int32_t list_index;
const char *const name;
// Only a pointer to 'name' is kept, so it must live as long as this
// domain.
rust_task_thread(rust_scheduler *sched, rust_srv *srv, int id);
rust_sched_loop(rust_scheduler *sched, rust_srv *srv, int id);
void activate(rust_task *task);
void log(rust_task *task, uint32_t level, char const *fmt, ...);
rust_log & get_log();
@ -91,7 +101,8 @@ public:
void reap_dead_tasks();
rust_task *schedule_task();
void start_main_loop();
void on_pump_loop(rust_signal *signal);
rust_sched_loop_state run_single_turn();
void log_state();
@ -103,8 +114,6 @@ public:
rust_task_state src, rust_task_state dst,
rust_cond *cond, const char* cond_name);
virtual void run();
void init_tls();
void place_task_in_tls(rust_task *task);
@ -122,7 +131,7 @@ public:
};
inline rust_log &
rust_task_thread::get_log() {
rust_sched_loop::get_log() {
return _log;
}
@ -131,7 +140,7 @@ rust_task_thread::get_log() {
#ifndef __WIN32__
inline rust_task *
rust_task_thread::get_task() {
rust_sched_loop::get_task() {
if (!tls_initialized)
return NULL;
rust_task *task = reinterpret_cast<rust_task *>
@ -143,7 +152,7 @@ rust_task_thread::get_task() {
#else
inline rust_task *
rust_task_thread::get_task() {
rust_sched_loop::get_task() {
if (!tls_initialized)
return NULL;
rust_task *task = reinterpret_cast<rust_task *>(TlsGetValue(task_key));
@ -155,7 +164,7 @@ rust_task_thread::get_task() {
// NB: Runs on the Rust stack
inline stk_seg *
rust_task_thread::borrow_c_stack() {
rust_sched_loop::borrow_c_stack() {
I(this, cached_c_stack);
stk_seg *your_stack;
if (extra_c_stack) {
@ -170,7 +179,7 @@ rust_task_thread::borrow_c_stack() {
// NB: Runs on the Rust stack
inline void
rust_task_thread::return_c_stack(stk_seg *stack) {
rust_sched_loop::return_c_stack(stk_seg *stack) {
I(this, !extra_c_stack);
if (!cached_c_stack) {
cached_c_stack = stack;
@ -191,4 +200,4 @@ rust_task_thread::return_c_stack(stk_seg *stack) {
// End:
//
#endif /* RUST_TASK_THREAD_H */
#endif /* RUST_SCHED_LOOP_H */

View file

@ -0,0 +1,15 @@
#include "rust_internal.h"
#include "rust_kernel.h"
#include "rust_sched_reaper.h"
// NB: We're using a very small stack here
const size_t STACK_SIZE = 1024*20;
rust_sched_reaper::rust_sched_reaper(rust_kernel *kernel)
: rust_thread(STACK_SIZE), kernel(kernel) {
}
void
rust_sched_reaper::run() {
kernel->wait_for_schedulers();
}

View file

@ -0,0 +1,17 @@
#ifndef RUST_SCHED_REAPER_H
#define RUST_SCHED_REAPER_H
#include "sync/rust_thread.h"
class rust_kernel;
/* Responsible for joining with rust_schedulers */
class rust_sched_reaper : public rust_thread {
private:
rust_kernel *kernel;
public:
rust_sched_reaper(rust_kernel *kernel);
virtual void run();
};
#endif /* RUST_SCHED_REAPER_H */

View file

@ -1,5 +1,6 @@
#include "rust_scheduler.h"
#include "rust_util.h"
#include "rust_sched_launcher.h"
rust_scheduler::rust_scheduler(rust_kernel *kernel,
rust_srv *srv,
@ -21,21 +22,21 @@ rust_scheduler::~rust_scheduler() {
destroy_task_threads();
}
rust_task_thread *
rust_sched_launcher *
rust_scheduler::create_task_thread(int id) {
rust_srv *srv = this->srv->clone();
rust_task_thread *thread =
new (kernel, "rust_task_thread") rust_task_thread(this, srv, id);
KLOG(kernel, kern, "created task thread: " PTR ", id: %d, index: %d",
thread, id, thread->list_index);
rust_sched_launcher *thread =
new (kernel, "rust_thread_sched_launcher")
rust_thread_sched_launcher(this, srv, id);
KLOG(kernel, kern, "created task thread: " PTR ", id: %d",
thread, id);
return thread;
}
void
rust_scheduler::destroy_task_thread(rust_task_thread *thread) {
KLOG(kernel, kern, "deleting task thread: " PTR ", name: %s, index: %d",
thread, thread->name, thread->list_index);
rust_srv *srv = thread->srv;
rust_scheduler::destroy_task_thread(rust_sched_launcher *thread) {
KLOG(kernel, kern, "deleting task thread: " PTR, thread);
rust_srv *srv = thread->get_loop()->srv;
delete thread;
delete srv;
}
@ -60,7 +61,7 @@ void
rust_scheduler::start_task_threads()
{
for(size_t i = 0; i < num_threads; ++i) {
rust_task_thread *thread = threads[i];
rust_sched_launcher *thread = threads[i];
thread->start();
}
}
@ -69,7 +70,7 @@ void
rust_scheduler::join_task_threads()
{
for(size_t i = 0; i < num_threads; ++i) {
rust_task_thread *thread = threads[i];
rust_sched_launcher *thread = threads[i];
thread->join();
}
}
@ -77,8 +78,8 @@ rust_scheduler::join_task_threads()
void
rust_scheduler::kill_all_tasks() {
for(size_t i = 0; i < num_threads; ++i) {
rust_task_thread *thread = threads[i];
thread->kill_all_tasks();
rust_sched_launcher *thread = threads[i];
thread->get_loop()->kill_all_tasks();
}
}
@ -92,8 +93,8 @@ rust_scheduler::create_task(rust_task *spawner, const char *name) {
if (cur_thread >= num_threads)
cur_thread = 0;
}
rust_task_thread *thread = threads[thread_no];
return thread->create_task(spawner, name);
rust_sched_launcher *thread = threads[thread_no];
return thread->get_loop()->create_task(spawner, name);
}
void
@ -118,7 +119,7 @@ rust_scheduler::exit() {
// scheduler will get destroyed, and our fields will cease to exist.
size_t current_num_threads = num_threads;
for(size_t i = 0; i < current_num_threads; ++i) {
threads[i]->exit();
threads[i]->get_loop()->exit();
}
}

View file

@ -3,6 +3,8 @@
#include "rust_internal.h"
class rust_sched_launcher;
class rust_scheduler : public kernel_owned<rust_scheduler> {
// FIXME: Make these private
public:
@ -17,7 +19,7 @@ private:
// When this hits zero we'll tell the threads to exit
uintptr_t live_tasks;
array_list<rust_task_thread *> threads;
array_list<rust_sched_launcher *> threads;
const size_t num_threads;
size_t cur_thread;
@ -26,8 +28,8 @@ private:
void create_task_threads();
void destroy_task_threads();
rust_task_thread *create_task_thread(int id);
void destroy_task_thread(rust_task_thread *thread);
rust_sched_launcher *create_task_thread(int id);
void destroy_task_thread(rust_sched_launcher *thread);
void exit();

View file

@ -552,7 +552,7 @@ extern "C" void
shape_cmp_type(int8_t *result, const type_desc *tydesc,
const type_desc **subtydescs, uint8_t *data_0,
uint8_t *data_1, uint8_t cmp_type) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
shape::arena arena;
// FIXME: This may well be broken when comparing two closures or objects
@ -573,7 +573,7 @@ shape_cmp_type(int8_t *result, const type_desc *tydesc,
extern "C" rust_str *
shape_log_str(const type_desc *tydesc, uint8_t *data) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
shape::arena arena;
shape::type_param *params =
@ -591,7 +591,7 @@ shape_log_str(const type_desc *tydesc, uint8_t *data) {
extern "C" void
shape_log_type(const type_desc *tydesc, uint8_t *data, uint32_t level) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
shape::arena arena;
shape::type_param *params =
@ -603,6 +603,6 @@ shape_log_type(const type_desc *tydesc, uint8_t *data, uint32_t level) {
log.walk();
task->thread->log(task, level, "%s", ss.str().c_str());
task->sched_loop->log(task, level, "%s", ss.str().c_str());
}

10
src/rt/rust_signal.h Normal file
View file

@ -0,0 +1,10 @@
#ifndef RUST_SIGNAL_H
#define RUST_SIGNAL_H
// Just an abstrict class that reperesents something that can be signalled
class rust_signal {
public:
virtual void signal() = 0;
};
#endif /* RUST_SIGNAL_H */

View file

@ -13,7 +13,7 @@
#include "rust_upcall.h"
// Tasks
rust_task::rust_task(rust_task_thread *thread, rust_task_state state,
rust_task::rust_task(rust_sched_loop *sched_loop, rust_task_state state,
rust_task *spawner, const char *name,
size_t init_stack_sz) :
ref_count(1),
@ -21,13 +21,13 @@ rust_task::rust_task(rust_task_thread *thread, rust_task_state state,
notify_enabled(false),
stk(NULL),
runtime_sp(0),
sched(thread->sched),
thread(thread),
kernel(thread->kernel),
sched(sched_loop->sched),
sched_loop(sched_loop),
kernel(sched_loop->kernel),
name(name),
list_index(-1),
rendezvous_ptr(0),
local_region(&thread->srv->local_region),
local_region(&sched_loop->srv->local_region),
boxed(&local_region),
unwinding(false),
propagate_failure(true),
@ -43,8 +43,8 @@ rust_task::rust_task(rust_task_thread *thread, rust_task_state state,
next_rust_sp(0),
supervisor(spawner)
{
LOGPTR(thread, "new task", (uintptr_t)this);
DLOG(thread, task, "sizeof(task) = %d (0x%x)",
LOGPTR(sched_loop, "new task", (uintptr_t)this);
DLOG(sched_loop, task, "sizeof(task) = %d (0x%x)",
sizeof *this, sizeof *this);
new_stack(init_stack_sz);
@ -57,7 +57,7 @@ rust_task::rust_task(rust_task_thread *thread, rust_task_state state,
void
rust_task::delete_this()
{
DLOG(thread, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d",
DLOG(sched_loop, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d",
name, (uintptr_t)this, ref_count);
// FIXME: We should do this when the task exits, not in the destructor
@ -70,10 +70,10 @@ rust_task::delete_this()
/* FIXME: tighten this up, there are some more
assertions that hold at task-lifecycle events. */
I(thread, ref_count == 0); // ||
I(sched_loop, ref_count == 0); // ||
// (ref_count == 1 && this == sched->root_task));
thread->release_task(this);
sched_loop->release_task(this);
}
struct spawn_args {
@ -125,7 +125,7 @@ cleanup_task(cleanup_args *args) {
#ifndef __WIN32__
task->conclude_failure();
#else
A(task->thread, false, "Shouldn't happen");
A(task->sched_loop, false, "Shouldn't happen");
#endif
}
}
@ -141,7 +141,7 @@ void task_start_wrapper(spawn_args *a)
// must have void return type, we can safely pass 0.
a->f(0, a->envptr, a->argptr);
} catch (rust_task *ex) {
A(task->thread, ex == task,
A(task->sched_loop, ex == task,
"Expected this task to be thrown for unwinding");
threw_exception = true;
@ -155,7 +155,7 @@ void task_start_wrapper(spawn_args *a)
}
// We should have returned any C stack by now
I(task->thread, task->c_stack == NULL);
I(task->sched_loop, task->c_stack == NULL);
rust_opaque_box* env = a->envptr;
if(env) {
@ -181,7 +181,7 @@ rust_task::start(spawn_fn spawnee_fn,
" with env 0x%" PRIxPTR " and arg 0x%" PRIxPTR,
spawnee_fn, envptr, argptr);
I(thread, stk->data != NULL);
I(sched_loop, stk->data != NULL);
char *sp = (char *)stk->end;
@ -212,6 +212,7 @@ rust_task::must_fail_from_being_killed() {
bool
rust_task::must_fail_from_being_killed_unlocked() {
kill_lock.must_have_lock();
return killed && !reentered_rust_stack;
}
@ -219,7 +220,7 @@ rust_task::must_fail_from_being_killed_unlocked() {
void
rust_task::yield(bool *killed) {
if (must_fail_from_being_killed()) {
I(thread, !blocked());
I(sched_loop, !blocked());
*killed = true;
}
@ -263,7 +264,7 @@ bool rust_task_is_unwinding(rust_task *rt) {
void
rust_task::fail() {
// See note in ::kill() regarding who should call this.
DLOG(thread, task, "task %s @0x%" PRIxPTR " failing", name, this);
DLOG(sched_loop, task, "task %s @0x%" PRIxPTR " failing", name, this);
backtrace();
unwinding = true;
#ifndef __WIN32__
@ -272,7 +273,7 @@ rust_task::fail() {
die();
conclude_failure();
// FIXME: Need unwinding on windows. This will end up aborting
thread->fail();
sched_loop->fail();
#endif
}
@ -285,14 +286,14 @@ void
rust_task::fail_parent() {
scoped_lock with(supervisor_lock);
if (supervisor) {
DLOG(thread, task,
DLOG(sched_loop, task,
"task %s @0x%" PRIxPTR
" propagating failure to supervisor %s @0x%" PRIxPTR,
name, this, supervisor->name, supervisor);
supervisor->kill();
}
if (NULL == supervisor && propagate_failure)
thread->fail();
sched_loop->fail();
}
void
@ -300,7 +301,7 @@ rust_task::unsupervise()
{
scoped_lock with(supervisor_lock);
if (supervisor) {
DLOG(thread, task,
DLOG(sched_loop, task,
"task %s @0x%" PRIxPTR
" disconnecting from supervisor %s @0x%" PRIxPTR,
name, this, supervisor->name, supervisor);
@ -365,7 +366,7 @@ rust_task::free(void *p)
void
rust_task::transition(rust_task_state src, rust_task_state dst,
rust_cond *cond, const char* cond_name) {
thread->transition(this, src, dst, cond, cond_name);
sched_loop->transition(this, src, dst, cond, cond_name);
}
void
@ -388,8 +389,8 @@ rust_task::block(rust_cond *on, const char* name) {
LOG(this, task, "Blocking on 0x%" PRIxPTR ", cond: 0x%" PRIxPTR,
(uintptr_t) on, (uintptr_t) cond);
A(thread, cond == NULL, "Cannot block an already blocked task.");
A(thread, on != NULL, "Cannot block on a NULL object.");
A(sched_loop, cond == NULL, "Cannot block an already blocked task.");
A(sched_loop, on != NULL, "Cannot block on a NULL object.");
transition(task_state_running, task_state_blocked, on, name);
@ -398,10 +399,10 @@ rust_task::block(rust_cond *on, const char* name) {
void
rust_task::wakeup(rust_cond *from) {
A(thread, cond != NULL, "Cannot wake up unblocked task.");
A(sched_loop, cond != NULL, "Cannot wake up unblocked task.");
LOG(this, task, "Blocked on 0x%" PRIxPTR " woken up on 0x%" PRIxPTR,
(uintptr_t) cond, (uintptr_t) from);
A(thread, cond == from,
A(sched_loop, cond == from,
"Cannot wake up blocked task on wrong condition.");
transition(task_state_blocked, task_state_running, NULL, "none");
@ -461,13 +462,13 @@ rust_task::get_next_stack_size(size_t min, size_t current, size_t requested) {
sz = std::max(sz, next);
LOG(this, mem, "next stack size: %" PRIdPTR, sz);
I(thread, requested <= sz);
I(sched_loop, requested <= sz);
return sz;
}
void
rust_task::free_stack(stk_seg *stk) {
LOGPTR(thread, "freeing stk segment", (uintptr_t)stk);
LOGPTR(sched_loop, "freeing stk segment", (uintptr_t)stk);
total_stack_sz -= user_stack_size(stk);
destroy_stack(&local_region, stk);
}
@ -485,7 +486,7 @@ rust_task::new_stack(size_t requested_sz) {
}
// The minimum stack size, in bytes, of a Rust stack, excluding red zone
size_t min_sz = thread->min_stack_size;
size_t min_sz = sched_loop->min_stack_size;
// Try to reuse an existing stack segment
while (stk != NULL && stk->next != NULL) {
@ -514,21 +515,21 @@ rust_task::new_stack(size_t requested_sz) {
size_t rust_stk_sz = get_next_stack_size(min_sz,
current_sz, requested_sz);
if (total_stack_sz + rust_stk_sz > thread->env->max_stack_size) {
if (total_stack_sz + rust_stk_sz > sched_loop->env->max_stack_size) {
LOG_ERR(this, task, "task %" PRIxPTR " ran out of stack", this);
fail();
}
size_t sz = rust_stk_sz + RED_ZONE_SIZE;
stk_seg *new_stk = create_stack(&local_region, sz);
LOGPTR(thread, "new stk", (uintptr_t)new_stk);
LOGPTR(sched_loop, "new stk", (uintptr_t)new_stk);
new_stk->task = this;
new_stk->next = NULL;
new_stk->prev = stk;
if (stk) {
stk->next = new_stk;
}
LOGPTR(thread, "stk end", new_stk->end);
LOGPTR(sched_loop, "stk end", new_stk->end);
stk = new_stk;
total_stack_sz += user_stack_size(new_stk);
@ -538,7 +539,7 @@ void
rust_task::cleanup_after_turn() {
// Delete any spare stack segments that were left
// behind by calls to prev_stack
I(thread, stk);
I(sched_loop, stk);
while (stk->next) {
stk_seg *new_next = stk->next->next;
free_stack(stk->next);
@ -568,7 +569,7 @@ reset_stack_limit_on_c_stack(reset_args *args) {
uintptr_t sp = args->sp;
while (!sp_in_stk_seg(sp, task->stk)) {
task->stk = task->stk->prev;
A(task->thread, task->stk != NULL,
A(task->sched_loop, task->stk != NULL,
"Failed to find the current stack");
}
task->record_stack_limit();
@ -597,10 +598,10 @@ rust_task::check_stack_canary() {
void
rust_task::delete_all_stacks() {
I(thread, !on_rust_stack());
I(sched_loop, !on_rust_stack());
// Delete all the stacks. There may be more than one if the task failed
// and no landing pads stopped to clean up.
I(thread, stk->next == NULL);
I(sched_loop, stk->next == NULL);
while (stk != NULL) {
stk_seg *prev = stk->prev;
free_stack(stk);

View file

@ -112,7 +112,7 @@ rust_task : public kernel_owned<rust_task>, rust_cond
stk_seg *stk;
uintptr_t runtime_sp; // Runtime sp while task running.
rust_scheduler *sched;
rust_task_thread *thread;
rust_sched_loop *sched_loop;
// Fields known only to the runtime.
rust_kernel *kernel;
@ -191,7 +191,7 @@ private:
public:
// Only a pointer to 'name' is kept, so it must live as long as this task.
rust_task(rust_task_thread *thread,
rust_task(rust_sched_loop *sched_loop,
rust_task_state state,
rust_task *spawner,
const char *name,
@ -312,7 +312,7 @@ rust_task::call_on_c_stack(void *args, void *fn_ptr) {
bool borrowed_a_c_stack = false;
uintptr_t sp;
if (c_stack == NULL) {
c_stack = thread->borrow_c_stack();
c_stack = sched_loop->borrow_c_stack();
next_c_sp = align_down(c_stack->end);
sp = next_c_sp;
borrowed_a_c_stack = true;
@ -335,8 +335,8 @@ inline void
rust_task::call_on_rust_stack(void *args, void *fn_ptr) {
// Too expensive to check
// I(thread, !on_rust_stack());
A(thread, get_sp_limit() != 0, "Stack must be configured");
I(thread, next_rust_sp);
A(sched_loop, get_sp_limit() != 0, "Stack must be configured");
I(sched_loop, next_rust_sp);
bool had_reentered_rust_stack = reentered_rust_stack;
reentered_rust_stack = true;
@ -358,8 +358,8 @@ inline void
rust_task::return_c_stack() {
// Too expensive to check
// I(thread, on_rust_stack());
I(thread, c_stack != NULL);
thread->return_c_stack(c_stack);
I(sched_loop, c_stack != NULL);
sched_loop->return_c_stack(c_stack);
c_stack = NULL;
next_c_sp = 0;
}
@ -368,7 +368,7 @@ rust_task::return_c_stack() {
inline void *
rust_task::next_stack(size_t stk_sz, void *args_addr, size_t args_sz) {
new_stack_fast(stk_sz + args_sz);
A(thread, stk->end - (uintptr_t)stk->data >= stk_sz + args_sz,
A(sched_loop, stk->end - (uintptr_t)stk->data >= stk_sz + args_sz,
"Did not receive enough stack");
uint8_t *new_sp = (uint8_t*)stk->end;
// Push the function arguments to the new stack
@ -407,7 +407,7 @@ new_stack_slow(new_stack_args *args);
inline void
rust_task::new_stack_fast(size_t requested_sz) {
// The minimum stack size, in bytes, of a Rust stack, excluding red zone
size_t min_sz = thread->min_stack_size;
size_t min_sz = sched_loop->min_stack_size;
// Try to reuse an existing stack segment
if (stk != NULL && stk->next != NULL) {
@ -438,8 +438,8 @@ record_sp_limit(void *limit);
inline void
rust_task::record_stack_limit() {
I(thread, stk);
A(thread,
I(sched_loop, stk);
A(sched_loop,
(uintptr_t)stk->end - RED_ZONE_SIZE
- (uintptr_t)stk->data >= LIMIT_OFFSET,
"Stack size must be greater than LIMIT_OFFSET");

View file

@ -8,7 +8,7 @@
#include "rust_cc.h"
#include "rust_internal.h"
#include "rust_task_thread.h"
#include "rust_sched_loop.h"
#include "rust_unwind.h"
#include "rust_upcall.h"
#include "rust_util.h"
@ -47,7 +47,7 @@ static void check_stack_alignment() { }
inline void
call_upcall_on_c_stack(void *args, void *fn_ptr) {
check_stack_alignment();
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
task->call_on_c_stack(args, fn_ptr);
}
@ -62,7 +62,7 @@ extern "C" void record_sp_limit(void *limit);
*/
extern "C" CDECL void
upcall_call_shim_on_c_stack(void *args, void *fn_ptr) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
// FIXME (1226) - The shim functions generated by rustc contain the
// morestack prologue, so we need to let them know they have enough
@ -85,7 +85,7 @@ upcall_call_shim_on_c_stack(void *args, void *fn_ptr) {
*/
extern "C" CDECL void
upcall_call_shim_on_rust_stack(void *args, void *fn_ptr) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
// FIXME: Because of the hack in the other function that disables the
// stack limit when entering the C stack, here we restore the stack limit
@ -116,7 +116,7 @@ struct s_fail_args {
extern "C" CDECL void
upcall_s_fail(s_fail_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
LOG_ERR(task, upcall, "upcall fail '%s', %s:%" PRIdPTR,
args->expr, args->file, args->line);
@ -142,7 +142,7 @@ struct s_malloc_args {
extern "C" CDECL void
upcall_s_malloc(s_malloc_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
LOG(task, mem, "upcall malloc(0x%" PRIxPTR ")", args->td);
@ -179,11 +179,11 @@ struct s_free_args {
extern "C" CDECL void
upcall_s_free(s_free_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
rust_task_thread *thread = task->thread;
DLOG(thread, mem,
rust_sched_loop *sched_loop = task->sched_loop;
DLOG(sched_loop, mem,
"upcall free(0x%" PRIxPTR ", is_gc=%" PRIdPTR ")",
(uintptr_t)args->ptr);
@ -225,7 +225,7 @@ struct s_shared_malloc_args {
extern "C" CDECL void
upcall_s_shared_malloc(s_shared_malloc_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
LOG(task, mem, "upcall shared_malloc(%" PRIdPTR ")", args->nbytes);
@ -253,11 +253,11 @@ struct s_shared_free_args {
extern "C" CDECL void
upcall_s_shared_free(s_shared_free_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
rust_task_thread *thread = task->thread;
DLOG(thread, mem,
rust_sched_loop *sched_loop = task->sched_loop;
DLOG(sched_loop, mem,
"upcall shared_free(0x%" PRIxPTR")",
(uintptr_t)args->ptr);
task->kernel->free(args->ptr);
@ -277,7 +277,7 @@ struct s_shared_realloc_args {
extern "C" CDECL void
upcall_s_shared_realloc(s_shared_realloc_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
args->retval = task->kernel->realloc(args->ptr, args->size);
}
@ -298,7 +298,7 @@ struct s_vec_grow_args {
extern "C" CDECL void
upcall_s_vec_grow(s_vec_grow_args *args) {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
LOG_UPCALL_ENTRY(task);
reserve_vec(task, args->vp, args->new_sz);
(*args->vp)->fill = args->new_sz;
@ -320,7 +320,7 @@ extern "C" CDECL void
upcall_s_str_concat(s_str_concat_args *args) {
rust_vec *lhs = args->lhs;
rust_vec *rhs = args->rhs;
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
size_t fill = lhs->fill + rhs->fill - 1;
rust_vec* v = (rust_vec*)task->kernel->malloc(fill + sizeof(rust_vec),
"str_concat");
@ -377,7 +377,7 @@ upcall_rust_personality(int version,
s_rust_personality_args args = {(_Unwind_Reason_Code)0,
version, actions, exception_class,
ue_header, context};
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
// The personality function is run on the stack of the
// last function that threw or landed, which is going
@ -463,7 +463,7 @@ upcall_del_stack() {
// needs to acquire the value of the stack pointer
extern "C" CDECL void
upcall_reset_stack_limit() {
rust_task *task = rust_task_thread::get_task();
rust_task *task = rust_sched_loop::get_task();
task->reset_stack_limit();
}

View file

@ -20,13 +20,13 @@ struct handle_data {
// helpers
static void*
current_kernel_malloc(size_t size, const char* tag) {
void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag);
void* ptr = rust_sched_loop::get_task()->kernel->malloc(size, tag);
return ptr;
}
static void
current_kernel_free(void* ptr) {
rust_task_thread::get_task()->kernel->free(ptr);
rust_sched_loop::get_task()->kernel->free(ptr);
}
static handle_data*

View file

@ -10,8 +10,15 @@
#include "lock_and_signal.h"
// FIXME: This is not a portable way of specifying an invalid pthread_t
#define INVALID_THREAD 0
#if defined(__WIN32__)
lock_and_signal::lock_and_signal()
#if defined(DEBUG_LOCKS)
: _holding_thread(INVALID_THREAD)
#endif
{
_event = CreateEvent(NULL, FALSE, FALSE, NULL);
@ -31,6 +38,9 @@ lock_and_signal::lock_and_signal()
#else
lock_and_signal::lock_and_signal()
#if defined(DEBUG_LOCKS)
: _holding_thread(INVALID_THREAD)
#endif
{
CHECKED(pthread_cond_init(&_cond, NULL));
CHECKED(pthread_mutex_init(&_mutex, NULL));
@ -48,14 +58,25 @@ lock_and_signal::~lock_and_signal() {
}
void lock_and_signal::lock() {
must_not_have_lock();
#if defined(__WIN32__)
EnterCriticalSection(&_cs);
#if defined(DEBUG_LOCKS)
_holding_thread = GetCurrentThreadId();
#endif
#else
CHECKED(pthread_mutex_lock(&_mutex));
#if defined(DEBUG_LOCKS)
_holding_thread = pthread_self();
#endif
#endif
}
void lock_and_signal::unlock() {
must_have_lock();
#if defined(DEBUG_LOCKS)
_holding_thread = INVALID_THREAD;
#endif
#if defined(__WIN32__)
LeaveCriticalSection(&_cs);
#else
@ -67,12 +88,24 @@ void lock_and_signal::unlock() {
* Wait indefinitely until condition is signaled.
*/
void lock_and_signal::wait() {
must_have_lock();
#if defined(DEBUG_LOCKS)
_holding_thread = INVALID_THREAD;
#endif
#if defined(__WIN32__)
LeaveCriticalSection(&_cs);
WaitForSingleObject(_event, INFINITE);
EnterCriticalSection(&_cs);
must_not_be_locked();
#if defined(DEBUG_LOCKS)
_holding_thread = GetCurrentThreadId();
#endif
#else
CHECKED(pthread_cond_wait(&_cond, &_mutex));
must_not_be_locked();
#if defined(DEBUG_LOCKS)
_holding_thread = pthread_self();
#endif
#endif
}
@ -87,6 +120,32 @@ void lock_and_signal::signal() {
#endif
}
#if defined(DEBUG_LOCKS)
bool lock_and_signal::lock_held_by_current_thread()
{
#if defined(__WIN32__)
return _holding_thread == GetCurrentThreadId();
#else
return pthread_equal(_holding_thread, pthread_self());
#endif
}
#endif
#if defined(DEBUG_LOCKS)
void lock_and_signal::must_have_lock() {
assert(lock_held_by_current_thread() && "must have lock");
}
void lock_and_signal::must_not_have_lock() {
assert(!lock_held_by_current_thread() && "must not have lock");
}
void lock_and_signal::must_not_be_locked() {
}
#else
void lock_and_signal::must_have_lock() { }
void lock_and_signal::must_not_have_lock() { }
void lock_and_signal::must_not_be_locked() { }
#endif
scoped_lock::scoped_lock(lock_and_signal &lock)
: lock(lock)
{

View file

@ -2,14 +2,30 @@
#ifndef LOCK_AND_SIGNAL_H
#define LOCK_AND_SIGNAL_H
#ifndef RUST_NDEBUG
#define DEBUG_LOCKS
#endif
class lock_and_signal {
#if defined(__WIN32__)
HANDLE _event;
CRITICAL_SECTION _cs;
#if defined(DEBUG_LOCKS)
DWORD _holding_thread;
#endif
#else
pthread_cond_t _cond;
pthread_mutex_t _mutex;
#if defined(DEBUG_LOCKS)
pthread_t _holding_thread;
#endif
#endif
#if defined(DEBUG_LOCKS)
bool lock_held_by_current_thread();
#endif
void must_not_be_locked();
public:
lock_and_signal();
@ -19,6 +35,9 @@ public:
void unlock();
void wait();
void signal();
void must_have_lock();
void must_not_have_lock();
};
class scoped_lock {

View file

@ -20,9 +20,7 @@ class rust_thread {
void start();
virtual void run() {
return;
}
virtual void run() = 0;
void join();
void detach();