handle all the reads on the "main" watcher thread
This commit is contained in:
parent
3ce531f95d
commit
012ea3fac6
1 changed files with 75 additions and 48 deletions
|
@ -5,7 +5,7 @@ use std::{
|
|||
sync::{mpsc, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
use crossbeam_channel::{Receiver, Sender};
|
||||
use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
|
||||
use relative_path::RelativePathBuf;
|
||||
use thread_worker::WorkerHandle;
|
||||
use walkdir::WalkDir;
|
||||
|
@ -61,9 +61,25 @@ pub(crate) struct Worker {
|
|||
|
||||
impl Worker {
|
||||
pub(crate) fn start(roots: Arc<Roots>) -> Worker {
|
||||
let (worker, worker_handle) =
|
||||
thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
|
||||
// This is a pretty elaborate setup of threads & channels! It is
|
||||
// explained by the following concerns:
|
||||
|
||||
// * we need to burn a thread translating from notify's mpsc to
|
||||
// crossbeam_channel.
|
||||
// * we want to read all files from a single thread, to gurantee that
|
||||
// we always get fresher versions and never go back in time.
|
||||
// * we want to tear down everything neatly during shutdown.
|
||||
let (worker, worker_handle) = thread_worker::spawn(
|
||||
"vfs",
|
||||
128,
|
||||
// This are the channels we use to communicate with outside world.
|
||||
// If `input_receiver` is closed we need to tear ourselves down.
|
||||
// `output_sender` should not be closed unless the parent died.
|
||||
move |input_receiver, output_sender| {
|
||||
// These are `std` channels notify will send events to
|
||||
let (notify_sender, notify_receiver) = mpsc::channel();
|
||||
// These are the corresponding crossbeam channels
|
||||
let (watcher_sender, watcher_receiver) = unbounded();
|
||||
let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
|
||||
.map_err(|e| log::error!("failed to spawn notify {}", e))
|
||||
.ok();
|
||||
|
@ -72,18 +88,30 @@ impl Worker {
|
|||
watcher: Arc::new(Mutex::new(watcher)),
|
||||
sender: output_sender,
|
||||
};
|
||||
let thread = thread::spawn({
|
||||
let ctx = ctx.clone();
|
||||
move || {
|
||||
let _ = notify_receiver
|
||||
.into_iter()
|
||||
// forward relevant events only
|
||||
.try_for_each(|change| ctx.handle_debounced_event(change));
|
||||
let thread = thread::spawn(move || {
|
||||
let _ = notify_receiver
|
||||
.into_iter()
|
||||
// forward relevant events only
|
||||
.for_each(|event| convert_notify_event(event, &watcher_sender));
|
||||
});
|
||||
|
||||
loop {
|
||||
select! {
|
||||
// Received request from the caller. If this channel is
|
||||
// closed, we should shutdown everything.
|
||||
recv(input_receiver) -> t => match t {
|
||||
Err(RecvError) => break,
|
||||
Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)),
|
||||
},
|
||||
// Watcher send us changes. If **this** channel is
|
||||
// closed, the watcher has died, which indicates a bug
|
||||
// -- escalate!
|
||||
recv(watcher_receiver) -> event => match event {
|
||||
Err(RecvError) => panic!("watcher is dead"),
|
||||
Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(),
|
||||
},
|
||||
}
|
||||
});
|
||||
let res1 = input_receiver.into_iter().try_for_each(|t| match t {
|
||||
Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
|
||||
});
|
||||
}
|
||||
drop(ctx.watcher.lock().take());
|
||||
drop(ctx);
|
||||
let res2 = thread.join();
|
||||
|
@ -91,9 +119,9 @@ impl Worker {
|
|||
Ok(()) => log::info!("... Watcher terminated with ok"),
|
||||
Err(_) => log::error!("... Watcher terminated with err"),
|
||||
}
|
||||
res1.unwrap();
|
||||
res2.unwrap();
|
||||
});
|
||||
},
|
||||
);
|
||||
Worker {
|
||||
worker,
|
||||
worker_handle,
|
||||
|
@ -114,7 +142,7 @@ impl Worker {
|
|||
}
|
||||
}
|
||||
|
||||
fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Result<()> {
|
||||
fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
|
||||
let mut guard = woker.watcher.lock();
|
||||
log::debug!("loading {} ...", config.root.as_path().display());
|
||||
let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
|
||||
|
@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Res
|
|||
.collect();
|
||||
woker
|
||||
.sender
|
||||
.send(TaskResult::BulkLoadRoot { root, files })?;
|
||||
.send(TaskResult::BulkLoadRoot { root, files })
|
||||
.unwrap();
|
||||
log::debug!("... loaded {}", config.root.as_path().display());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -139,38 +167,37 @@ struct WatcherCtx {
|
|||
sender: Sender<TaskResult>,
|
||||
}
|
||||
|
||||
impl WatcherCtx {
|
||||
fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> {
|
||||
match ev {
|
||||
DebouncedEvent::NoticeWrite(_)
|
||||
| DebouncedEvent::NoticeRemove(_)
|
||||
| DebouncedEvent::Chmod(_) => {
|
||||
// ignore
|
||||
}
|
||||
DebouncedEvent::Rescan => {
|
||||
// TODO rescan all roots
|
||||
}
|
||||
DebouncedEvent::Create(path) => {
|
||||
self.handle_change(path, ChangeKind::Create)?;
|
||||
}
|
||||
DebouncedEvent::Write(path) => {
|
||||
self.handle_change(path, ChangeKind::Write)?;
|
||||
}
|
||||
DebouncedEvent::Remove(path) => {
|
||||
self.handle_change(path, ChangeKind::Remove)?;
|
||||
}
|
||||
DebouncedEvent::Rename(src, dst) => {
|
||||
self.handle_change(src, ChangeKind::Remove)?;
|
||||
self.handle_change(dst, ChangeKind::Create)?;
|
||||
}
|
||||
DebouncedEvent::Error(err, path) => {
|
||||
// TODO should we reload the file contents?
|
||||
log::warn!("watcher error \"{}\", {:?}", err, path);
|
||||
}
|
||||
fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
|
||||
match event {
|
||||
DebouncedEvent::NoticeWrite(_)
|
||||
| DebouncedEvent::NoticeRemove(_)
|
||||
| DebouncedEvent::Chmod(_) => {
|
||||
// ignore
|
||||
}
|
||||
DebouncedEvent::Rescan => {
|
||||
// TODO rescan all roots
|
||||
}
|
||||
DebouncedEvent::Create(path) => {
|
||||
sender.send((path, ChangeKind::Create)).unwrap();
|
||||
}
|
||||
DebouncedEvent::Write(path) => {
|
||||
sender.send((path, ChangeKind::Write)).unwrap();
|
||||
}
|
||||
DebouncedEvent::Remove(path) => {
|
||||
sender.send((path, ChangeKind::Remove)).unwrap();
|
||||
}
|
||||
DebouncedEvent::Rename(src, dst) => {
|
||||
sender.send((src, ChangeKind::Remove)).unwrap();
|
||||
sender.send((dst, ChangeKind::Create)).unwrap();
|
||||
}
|
||||
DebouncedEvent::Error(err, path) => {
|
||||
// TODO should we reload the file contents?
|
||||
log::warn!("watcher error \"{}\", {:?}", err, path);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl WatcherCtx {
|
||||
fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> {
|
||||
let (root, rel_path) = match self.roots.find(&path) {
|
||||
None => return Ok(()),
|
||||
|
|
Loading…
Add table
Reference in a new issue