Add support for using a jobserver with Rayon
This commit is contained in:
parent
350674b718
commit
892fed9d08
8 changed files with 172 additions and 32 deletions
|
@ -20,8 +20,8 @@ num_cpus = "1.0"
|
|||
scoped-tls = "1.0"
|
||||
log = { version = "0.4", features = ["release_max_level_info", "std"] }
|
||||
polonius-engine = "0.6.2"
|
||||
rustc-rayon = "0.1.1"
|
||||
rustc-rayon-core = "0.1.1"
|
||||
rustc-rayon = "0.1.2"
|
||||
rustc-rayon-core = "0.1.2"
|
||||
rustc_apfloat = { path = "../librustc_apfloat" }
|
||||
rustc_target = { path = "../librustc_target" }
|
||||
rustc_data_structures = { path = "../librustc_data_structures" }
|
||||
|
|
|
@ -34,7 +34,8 @@ use crate::util::profiling::SelfProfiler;
|
|||
|
||||
use rustc_target::spec::{PanicStrategy, RelroLevel, Target, TargetTriple};
|
||||
use rustc_data_structures::flock;
|
||||
use jobserver::Client;
|
||||
use rustc_data_structures::jobserver;
|
||||
use ::jobserver::Client;
|
||||
|
||||
use std;
|
||||
use std::cell::{self, Cell, RefCell};
|
||||
|
@ -1230,32 +1231,7 @@ pub fn build_session_(
|
|||
optimization_fuel,
|
||||
print_fuel_crate,
|
||||
print_fuel,
|
||||
// Note that this is unsafe because it may misinterpret file descriptors
|
||||
// on Unix as jobserver file descriptors. We hopefully execute this near
|
||||
// the beginning of the process though to ensure we don't get false
|
||||
// positives, or in other words we try to execute this before we open
|
||||
// any file descriptors ourselves.
|
||||
//
|
||||
// Pick a "reasonable maximum" if we don't otherwise have
|
||||
// a jobserver in our environment, capping out at 32 so we
|
||||
// don't take everything down by hogging the process run queue.
|
||||
// The fixed number is used to have deterministic compilation
|
||||
// across machines.
|
||||
//
|
||||
// Also note that we stick this in a global because there could be
|
||||
// multiple `Session` instances in this process, and the jobserver is
|
||||
// per-process.
|
||||
jobserver: unsafe {
|
||||
static mut GLOBAL_JOBSERVER: *mut Client = 0 as *mut _;
|
||||
static INIT: std::sync::Once = std::sync::ONCE_INIT;
|
||||
INIT.call_once(|| {
|
||||
let client = Client::from_env().unwrap_or_else(|| {
|
||||
Client::new(32).expect("failed to create jobserver")
|
||||
});
|
||||
GLOBAL_JOBSERVER = Box::into_raw(Box::new(client));
|
||||
});
|
||||
(*GLOBAL_JOBSERVER).clone()
|
||||
},
|
||||
jobserver: jobserver::client(),
|
||||
has_global_allocator: Once::new(),
|
||||
has_panic_handler: Once::new(),
|
||||
driver_lint_caps,
|
||||
|
|
|
@ -7,6 +7,7 @@ use std::{fmt, ptr};
|
|||
use rustc_data_structures::fx::FxHashSet;
|
||||
use rustc_data_structures::sync::{Lock, LockGuard, Lrc, Weak};
|
||||
use rustc_data_structures::OnDrop;
|
||||
use rustc_data_structures::jobserver;
|
||||
use syntax_pos::Span;
|
||||
|
||||
use crate::ty::tls;
|
||||
|
@ -198,7 +199,11 @@ impl<'tcx> QueryLatch<'tcx> {
|
|||
// we have to be in the `wait` call. This is ensured by the deadlock handler
|
||||
// getting the self.info lock.
|
||||
rayon_core::mark_blocked();
|
||||
jobserver::release_thread();
|
||||
waiter.condvar.wait(&mut info);
|
||||
// Release the lock before we potentially block in `acquire_thread`
|
||||
mem::drop(info);
|
||||
jobserver::acquire_thread();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -12,13 +12,15 @@ crate-type = ["dylib"]
|
|||
[dependencies]
|
||||
ena = "0.11"
|
||||
log = "0.4"
|
||||
jobserver_crate = { version = "0.1", package = "jobserver" }
|
||||
lazy_static = "1"
|
||||
rustc_cratesio_shim = { path = "../librustc_cratesio_shim" }
|
||||
serialize = { path = "../libserialize" }
|
||||
graphviz = { path = "../libgraphviz" }
|
||||
cfg-if = "0.1.2"
|
||||
stable_deref_trait = "1.0.0"
|
||||
rayon = { version = "0.1.1", package = "rustc-rayon" }
|
||||
rayon-core = { version = "0.1.1", package = "rustc-rayon-core" }
|
||||
rayon = { version = "0.1.2", package = "rustc-rayon" }
|
||||
rayon-core = { version = "0.1.2", package = "rustc-rayon-core" }
|
||||
rustc-hash = "1.0.1"
|
||||
smallvec = { version = "0.6.7", features = ["union", "may_dangle"] }
|
||||
|
||||
|
|
153
src/librustc_data_structures/jobserver.rs
Normal file
153
src/librustc_data_structures/jobserver.rs
Normal file
|
@ -0,0 +1,153 @@
|
|||
use jobserver_crate::{Client, HelperThread, Acquired};
|
||||
use lazy_static::lazy_static;
|
||||
use std::sync::{Condvar, Arc, Mutex};
|
||||
use std::mem;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct LockedProxyData {
|
||||
/// The number of free thread tokens, this may include the implicit token given to the process
|
||||
free: usize,
|
||||
|
||||
/// The number of threads waiting for a token
|
||||
waiters: usize,
|
||||
|
||||
/// The number of tokens we requested from the server
|
||||
requested: usize,
|
||||
|
||||
/// Stored tokens which will be dropped when we no longer need them
|
||||
tokens: Vec<Acquired>,
|
||||
}
|
||||
|
||||
impl LockedProxyData {
|
||||
fn request_token(&mut self, thread: &Mutex<HelperThread>) {
|
||||
self.requested += 1;
|
||||
thread.lock().unwrap().request_token();
|
||||
}
|
||||
|
||||
fn release_token(&mut self, cond_var: &Condvar) {
|
||||
if self.waiters > 0 {
|
||||
self.free += 1;
|
||||
cond_var.notify_one();
|
||||
} else {
|
||||
if self.tokens.is_empty() {
|
||||
// We are returning the implicit token
|
||||
self.free += 1;
|
||||
} else {
|
||||
// Return a real token to the server
|
||||
self.tokens.pop().unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn take_token(&mut self, thread: &Mutex<HelperThread>) -> bool {
|
||||
if self.free > 0 {
|
||||
self.free -= 1;
|
||||
self.waiters -= 1;
|
||||
|
||||
// We stole some token reqested by someone else
|
||||
// Request another one
|
||||
if self.requested + self.free < self.waiters {
|
||||
self.request_token(thread);
|
||||
}
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn new_requested_token(&mut self, token: Acquired, cond_var: &Condvar) {
|
||||
self.requested -= 1;
|
||||
|
||||
// Does anything need this token?
|
||||
if self.waiters > 0 {
|
||||
self.free += 1;
|
||||
self.tokens.push(token);
|
||||
cond_var.notify_one();
|
||||
} else {
|
||||
// Otherwise we'll just drop it
|
||||
mem::drop(token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ProxyData {
|
||||
lock: Mutex<LockedProxyData>,
|
||||
cond_var: Condvar,
|
||||
}
|
||||
|
||||
pub struct Proxy {
|
||||
thread: Mutex<HelperThread>,
|
||||
data: Arc<ProxyData>,
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
// We can only call `from_env` once per process
|
||||
|
||||
// Note that this is unsafe because it may misinterpret file descriptors
|
||||
// on Unix as jobserver file descriptors. We hopefully execute this near
|
||||
// the beginning of the process though to ensure we don't get false
|
||||
// positives, or in other words we try to execute this before we open
|
||||
// any file descriptors ourselves.
|
||||
//
|
||||
// Pick a "reasonable maximum" if we don't otherwise have
|
||||
// a jobserver in our environment, capping out at 32 so we
|
||||
// don't take everything down by hogging the process run queue.
|
||||
// The fixed number is used to have deterministic compilation
|
||||
// across machines.
|
||||
//
|
||||
// Also note that we stick this in a global because there could be
|
||||
// multiple rustc instances in this process, and the jobserver is
|
||||
// per-process.
|
||||
static ref GLOBAL_CLIENT: Client = unsafe {
|
||||
Client::from_env().unwrap_or_else(|| {
|
||||
Client::new(32).expect("failed to create jobserver")
|
||||
})
|
||||
};
|
||||
|
||||
static ref GLOBAL_PROXY: Proxy = {
|
||||
let data = Arc::new(ProxyData::default());
|
||||
|
||||
Proxy {
|
||||
data: data.clone(),
|
||||
thread: Mutex::new(client().into_helper_thread(move |token| {
|
||||
data.lock.lock().unwrap().new_requested_token(token.unwrap(), &data.cond_var);
|
||||
}).unwrap()),
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn client() -> Client {
|
||||
GLOBAL_CLIENT.clone()
|
||||
}
|
||||
|
||||
pub fn acquire_thread() {
|
||||
GLOBAL_PROXY.acquire_token();
|
||||
}
|
||||
|
||||
pub fn release_thread() {
|
||||
GLOBAL_PROXY.release_token();
|
||||
}
|
||||
|
||||
impl Proxy {
|
||||
pub fn release_token(&self) {
|
||||
self.data.lock.lock().unwrap().release_token(&self.data.cond_var);
|
||||
}
|
||||
|
||||
pub fn acquire_token(&self) {
|
||||
let mut data = self.data.lock.lock().unwrap();
|
||||
data.waiters += 1;
|
||||
if data.take_token(&self.thread) {
|
||||
return;
|
||||
}
|
||||
// Request a token for us
|
||||
data.request_token(&self.thread);
|
||||
loop {
|
||||
data = self.data.cond_var.wait(data).unwrap();
|
||||
if data.take_token(&self.thread) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -77,6 +77,7 @@ pub mod fx;
|
|||
pub mod graph;
|
||||
pub mod indexed_vec;
|
||||
pub mod interner;
|
||||
pub mod jobserver;
|
||||
pub mod obligation_forest;
|
||||
pub mod owning_ref;
|
||||
pub mod ptr_key;
|
||||
|
|
|
@ -13,7 +13,7 @@ arena = { path = "../libarena" }
|
|||
graphviz = { path = "../libgraphviz" }
|
||||
log = "0.4"
|
||||
env_logger = { version = "0.5", default-features = false }
|
||||
rustc-rayon = "0.1.1"
|
||||
rustc-rayon = "0.1.2"
|
||||
scoped-tls = "1.0"
|
||||
rustc = { path = "../librustc" }
|
||||
rustc_allocator = { path = "../librustc_allocator" }
|
||||
|
|
|
@ -17,6 +17,7 @@ use rustc_allocator as allocator;
|
|||
use rustc_borrowck as borrowck;
|
||||
use rustc_codegen_utils::codegen_backend::CodegenBackend;
|
||||
use rustc_data_structures::sync::{self, Lock};
|
||||
use rustc_data_structures::jobserver;
|
||||
use rustc_incremental;
|
||||
use rustc_metadata::creader::CrateLoader;
|
||||
use rustc_metadata::cstore::{self, CStore};
|
||||
|
@ -72,6 +73,8 @@ pub fn spawn_thread_pool<F: FnOnce(config::Options) -> R + sync::Send, R: sync::
|
|||
let gcx_ptr = &Lock::new(0);
|
||||
|
||||
let config = ThreadPoolBuilder::new()
|
||||
.acquire_thread_handler(jobserver::acquire_thread)
|
||||
.release_thread_handler(jobserver::release_thread)
|
||||
.num_threads(Session::threads_from_count(opts.debugging_opts.threads))
|
||||
.deadlock_handler(|| unsafe { ty::query::handle_deadlock() })
|
||||
.stack_size(::STACK_SIZE);
|
||||
|
|
Loading…
Add table
Reference in a new issue