//! Watcher implementation for Darwin's FSEvents API //! //! The FSEvents API provides a mechanism to notify clients about directories they ought to re-scan //! in order to keep their internal data structures up-to-date with respect to the true state of //! the file system. (For example, when files or directories are created, modified, or removed.) It //! sends these notifications "in bulk", possibly notifying the client of changes to several //! directories in a single callback. //! //! For more information see the [FSEvents API reference][ref]. //! //! TODO: document event translation //! //! [ref]: https://developer.apple.com/library/mac/documentation/Darwin/Reference/FSEvents_Ref/ #![allow(non_upper_case_globals, dead_code)] use crate::event::*; use crate::{unbounded, Config, Error, EventHandler, RecursiveMode, Result, Sender, Watcher}; use fsevent_sys as fs; use fsevent_sys::core_foundation as cf; use std::collections::HashMap; use std::ffi::CStr; use std::fmt; use std::os::raw; use std::path::{Path, PathBuf}; use std::ptr; use std::sync::{Arc, Mutex}; use std::thread; bitflags::bitflags! { #[repr(C)] #[derive(Debug)] struct StreamFlags: u32 { const NONE = fs::kFSEventStreamEventFlagNone; const MUST_SCAN_SUBDIRS = fs::kFSEventStreamEventFlagMustScanSubDirs; const USER_DROPPED = fs::kFSEventStreamEventFlagUserDropped; const KERNEL_DROPPED = fs::kFSEventStreamEventFlagKernelDropped; const IDS_WRAPPED = fs::kFSEventStreamEventFlagEventIdsWrapped; const HISTORY_DONE = fs::kFSEventStreamEventFlagHistoryDone; const ROOT_CHANGED = fs::kFSEventStreamEventFlagRootChanged; const MOUNT = fs::kFSEventStreamEventFlagMount; const UNMOUNT = fs::kFSEventStreamEventFlagUnmount; const ITEM_CREATED = fs::kFSEventStreamEventFlagItemCreated; const ITEM_REMOVED = fs::kFSEventStreamEventFlagItemRemoved; const INODE_META_MOD = fs::kFSEventStreamEventFlagItemInodeMetaMod; const ITEM_RENAMED = fs::kFSEventStreamEventFlagItemRenamed; const ITEM_MODIFIED = fs::kFSEventStreamEventFlagItemModified; const FINDER_INFO_MOD = fs::kFSEventStreamEventFlagItemFinderInfoMod; const ITEM_CHANGE_OWNER = fs::kFSEventStreamEventFlagItemChangeOwner; const ITEM_XATTR_MOD = fs::kFSEventStreamEventFlagItemXattrMod; const IS_FILE = fs::kFSEventStreamEventFlagItemIsFile; const IS_DIR = fs::kFSEventStreamEventFlagItemIsDir; const IS_SYMLINK = fs::kFSEventStreamEventFlagItemIsSymlink; const OWN_EVENT = fs::kFSEventStreamEventFlagOwnEvent; const IS_HARDLINK = fs::kFSEventStreamEventFlagItemIsHardlink; const IS_LAST_HARDLINK = fs::kFSEventStreamEventFlagItemIsLastHardlink; const ITEM_CLONED = fs::kFSEventStreamEventFlagItemCloned; } } /// FSEvents-based `Watcher` implementation pub struct FsEventWatcher { paths: cf::CFMutableArrayRef, since_when: fs::FSEventStreamEventId, latency: cf::CFTimeInterval, flags: fs::FSEventStreamCreateFlags, event_handler: Arc>, runloop: Option<(cf::CFRunLoopRef, thread::JoinHandle<()>)>, recursive_info: HashMap, } impl fmt::Debug for FsEventWatcher { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("FsEventWatcher") .field("paths", &self.paths) .field("since_when", &self.since_when) .field("latency", &self.latency) .field("flags", &self.flags) .field("event_handler", &Arc::as_ptr(&self.event_handler)) .field("runloop", &self.runloop) .field("recursive_info", &self.recursive_info) .finish() } } // CFMutableArrayRef is a type alias to *mut libc::c_void, so FsEventWatcher is not Send/Sync // automatically. It's Send because the pointer is not used in other threads. unsafe impl Send for FsEventWatcher {} // It's Sync because all methods that change the mutable state use `&mut self`. unsafe impl Sync for FsEventWatcher {} fn translate_flags(flags: StreamFlags, precise: bool) -> Vec { let mut evs = Vec::new(); // «Denotes a sentinel event sent to mark the end of the "historical" events // sent as a result of specifying a `sinceWhen` value in the FSEvents.Create // call that created this event stream. After invoking the client's callback // with all the "historical" events that occurred before now, the client's // callback will be invoked with an event where the HistoryDone flag is set. // The client should ignore the path supplied in this callback.» // — https://www.mbsplugins.eu/FSEventsNextEvent.shtml // // As a result, we just stop processing here and return an empty vec, which // will ignore this completely and not emit any Events whatsoever. if flags.contains(StreamFlags::HISTORY_DONE) { return evs; } // FSEvents provides two possible hints as to why events were dropped, // however documentation on what those mean is scant, so we just pass them // through in the info attr field. The intent is clear enough, and the // additional information is provided if the user wants it. if flags.contains(StreamFlags::MUST_SCAN_SUBDIRS) { let e = Event::new(EventKind::Other).set_flag(Flag::Rescan); evs.push(if flags.contains(StreamFlags::USER_DROPPED) { e.set_info("rescan: user dropped") } else if flags.contains(StreamFlags::KERNEL_DROPPED) { e.set_info("rescan: kernel dropped") } else { e }); } // In imprecise mode, let's not even bother parsing the kind of the event // except for the above very special events. if !precise { evs.push(Event::new(EventKind::Any)); return evs; } // This is most likely a rename or a removal. We assume rename but may want // to figure out if it was a removal some way later (TODO). To denote the // special nature of the event, we add an info string. if flags.contains(StreamFlags::ROOT_CHANGED) { evs.push( Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::From))) .set_info("root changed"), ); } // A path was mounted at the event path; we treat that as a create. if flags.contains(StreamFlags::MOUNT) { evs.push(Event::new(EventKind::Create(CreateKind::Other)).set_info("mount")); } // A path was unmounted at the event path; we treat that as a remove. if flags.contains(StreamFlags::UNMOUNT) { evs.push(Event::new(EventKind::Remove(RemoveKind::Other)).set_info("mount")); } if flags.contains(StreamFlags::ITEM_CREATED) { evs.push(if flags.contains(StreamFlags::IS_DIR) { Event::new(EventKind::Create(CreateKind::Folder)) } else if flags.contains(StreamFlags::IS_FILE) { Event::new(EventKind::Create(CreateKind::File)) } else { let e = Event::new(EventKind::Create(CreateKind::Other)); if flags.contains(StreamFlags::IS_SYMLINK) { e.set_info("is: symlink") } else if flags.contains(StreamFlags::IS_HARDLINK) { e.set_info("is: hardlink") } else if flags.contains(StreamFlags::ITEM_CLONED) { e.set_info("is: clone") } else { Event::new(EventKind::Create(CreateKind::Any)) } }); } if flags.contains(StreamFlags::ITEM_REMOVED) { evs.push(if flags.contains(StreamFlags::IS_DIR) { Event::new(EventKind::Remove(RemoveKind::Folder)) } else if flags.contains(StreamFlags::IS_FILE) { Event::new(EventKind::Remove(RemoveKind::File)) } else { let e = Event::new(EventKind::Remove(RemoveKind::Other)); if flags.contains(StreamFlags::IS_SYMLINK) { e.set_info("is: symlink") } else if flags.contains(StreamFlags::IS_HARDLINK) { e.set_info("is: hardlink") } else if flags.contains(StreamFlags::ITEM_CLONED) { e.set_info("is: clone") } else { Event::new(EventKind::Remove(RemoveKind::Any)) } }); } // FSEvents provides no mechanism to associate the old and new sides of a // rename event. if flags.contains(StreamFlags::ITEM_RENAMED) { evs.push(Event::new(EventKind::Modify(ModifyKind::Name( RenameMode::Any, )))); } // This is only described as "metadata changed", but it may be that it's // only emitted for some more precise subset of events... if so, will need // amending, but for now we have an Any-shaped bucket to put it in. if flags.contains(StreamFlags::INODE_META_MOD) { evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( MetadataKind::Any, )))); } if flags.contains(StreamFlags::FINDER_INFO_MOD) { evs.push( Event::new(EventKind::Modify(ModifyKind::Metadata(MetadataKind::Other))) .set_info("meta: finder info"), ); } if flags.contains(StreamFlags::ITEM_CHANGE_OWNER) { evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( MetadataKind::Ownership, )))); } if flags.contains(StreamFlags::ITEM_XATTR_MOD) { evs.push(Event::new(EventKind::Modify(ModifyKind::Metadata( MetadataKind::Extended, )))); } // This is specifically described as a data change, which we take to mean // is a content change. if flags.contains(StreamFlags::ITEM_MODIFIED) { evs.push(Event::new(EventKind::Modify(ModifyKind::Data( DataChange::Content, )))); } if flags.contains(StreamFlags::OWN_EVENT) { for ev in &mut evs { *ev = std::mem::take(ev).set_process_id(std::process::id()); } } evs } struct StreamContextInfo { event_handler: Arc>, recursive_info: HashMap, } // Free the context when the stream created by `FSEventStreamCreate` is released. extern "C" fn release_context(info: *const libc::c_void) { // Safety: // - The [documentation] for `FSEventStreamContext` states that `release` is only // called when the stream is deallocated, so it is safe to convert `info` back into a // box and drop it. // // [docs]: https://developer.apple.com/documentation/coreservices/fseventstreamcontext?language=objc unsafe { drop(Box::from_raw( info as *const StreamContextInfo as *mut StreamContextInfo, )); } } extern "C" { /// Indicates whether the run loop is waiting for an event. fn CFRunLoopIsWaiting(runloop: cf::CFRunLoopRef) -> cf::Boolean; } impl FsEventWatcher { fn from_event_handler(event_handler: Arc>) -> Result { Ok(FsEventWatcher { paths: unsafe { cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks) }, since_when: fs::kFSEventStreamEventIdSinceNow, latency: 0.0, flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer, event_handler, runloop: None, recursive_info: HashMap::new(), }) } fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { self.stop(); let result = self.append_path(path, recursive_mode); // ignore return error: may be empty path list let _ = self.run(); result } fn unwatch_inner(&mut self, path: &Path) -> Result<()> { self.stop(); let result = self.remove_path(path); // ignore return error: may be empty path list let _ = self.run(); result } #[inline] fn is_running(&self) -> bool { self.runloop.is_some() } fn stop(&mut self) { if !self.is_running() { return; } if let Some((runloop, thread_handle)) = self.runloop.take() { unsafe { let runloop = runloop as *mut raw::c_void; while CFRunLoopIsWaiting(runloop) == 0 { thread::yield_now(); } cf::CFRunLoopStop(runloop); } // Wait for the thread to shut down. thread_handle.join().expect("thread to shut down"); } } fn remove_path(&mut self, path: &Path) -> Result<()> { let str_path = path.to_str().unwrap(); unsafe { let mut err: cf::CFErrorRef = ptr::null_mut(); let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); if cf_path.is_null() { cf::CFRelease(err as cf::CFRef); return Err(Error::watch_not_found().add_path(path.into())); } let mut to_remove = Vec::new(); for idx in 0..cf::CFArrayGetCount(self.paths) { let item = cf::CFArrayGetValueAtIndex(self.paths, idx); if cf::CFStringCompare(item, cf_path, cf::kCFCompareCaseInsensitive) == cf::kCFCompareEqualTo { to_remove.push(idx); } } cf::CFRelease(cf_path); for idx in to_remove.iter().rev() { cf::CFArrayRemoveValueAtIndex(self.paths, *idx); } } let p = if let Ok(canonicalized_path) = path.canonicalize() { canonicalized_path } else { path.to_owned() }; match self.recursive_info.remove(&p) { Some(_) => Ok(()), None => Err(Error::watch_not_found()), } } // https://github.com/thibaudgg/rb-fsevent/blob/master/ext/fsevent_watch/main.c fn append_path(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { if !path.exists() { return Err(Error::path_not_found().add_path(path.into())); } let canonical_path = path.to_path_buf().canonicalize()?; let str_path = path.to_str().unwrap(); unsafe { let mut err: cf::CFErrorRef = ptr::null_mut(); let cf_path = cf::str_path_to_cfstring_ref(str_path, &mut err); if cf_path.is_null() { // Most likely the directory was deleted, or permissions changed, // while the above code was running. cf::CFRelease(err as cf::CFRef); return Err(Error::path_not_found().add_path(path.into())); } cf::CFArrayAppendValue(self.paths, cf_path); cf::CFRelease(cf_path); } self.recursive_info .insert(canonical_path, recursive_mode.is_recursive()); Ok(()) } fn run(&mut self) -> Result<()> { if unsafe { cf::CFArrayGetCount(self.paths) } == 0 { // TODO: Reconstruct and add paths to error return Err(Error::path_not_found()); } // We need to associate the stream context with our callback in order to propagate events // to the rest of the system. This will be owned by the stream, and will be freed when the // stream is closed. This means we will leak the context if we panic before reacing // `FSEventStreamRelease`. let context = Box::into_raw(Box::new(StreamContextInfo { event_handler: self.event_handler.clone(), recursive_info: self.recursive_info.clone(), })); let stream_context = fs::FSEventStreamContext { version: 0, info: context as *mut libc::c_void, retain: None, release: Some(release_context), copy_description: None, }; let stream = unsafe { fs::FSEventStreamCreate( cf::kCFAllocatorDefault, callback, &stream_context, self.paths, self.since_when, self.latency, self.flags, ) }; // Wrapper to help send CFRef types across threads. struct CFSendWrapper(cf::CFRef); // Safety: // - According to the Apple documentation, it's safe to move `CFRef`s across threads. // https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/Multithreading/ThreadSafetySummary/ThreadSafetySummary.html unsafe impl Send for CFSendWrapper {} // move into thread let stream = CFSendWrapper(stream); // channel to pass runloop around let (rl_tx, rl_rx) = unbounded(); let thread_handle = thread::Builder::new() .name("notify-rs fsevents loop".to_string()) .spawn(move || { let _ = &stream; let stream = stream.0; unsafe { let cur_runloop = cf::CFRunLoopGetCurrent(); fs::FSEventStreamScheduleWithRunLoop( stream, cur_runloop, cf::kCFRunLoopDefaultMode, ); fs::FSEventStreamStart(stream); // the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop() rl_tx .send(CFSendWrapper(cur_runloop)) .expect("Unable to send runloop to watcher"); cf::CFRunLoopRun(); fs::FSEventStreamStop(stream); fs::FSEventStreamInvalidate(stream); fs::FSEventStreamRelease(stream); } })?; // block until runloop has been sent self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle)); Ok(()) } fn configure_raw_mode(&mut self, _config: Config, tx: Sender>) { tx.send(Ok(false)) .expect("configuration channel disconnect"); } } extern "C" fn callback( stream_ref: fs::FSEventStreamRef, info: *mut libc::c_void, num_events: libc::size_t, // size_t numEvents event_paths: *mut libc::c_void, // void *eventPaths event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] ) { unsafe { callback_impl( stream_ref, info, num_events, event_paths, event_flags, event_ids, ) } } unsafe fn callback_impl( _stream_ref: fs::FSEventStreamRef, info: *mut libc::c_void, num_events: libc::size_t, // size_t numEvents event_paths: *mut libc::c_void, // void *eventPaths event_flags: *const fs::FSEventStreamEventFlags, // const FSEventStreamEventFlags eventFlags[] _event_ids: *const fs::FSEventStreamEventId, // const FSEventStreamEventId eventIds[] ) { let event_paths = event_paths as *const *const libc::c_char; let info = info as *const StreamContextInfo; let event_handler = &(*info).event_handler; for p in 0..num_events { let path = CStr::from_ptr(*event_paths.add(p)) .to_str() .expect("Invalid UTF8 string."); let path = PathBuf::from(path); let flag = *event_flags.add(p); let flag = StreamFlags::from_bits(flag).unwrap_or_else(|| { panic!("Unable to decode StreamFlags: {}", flag); }); let mut handle_event = false; for (p, r) in &(*info).recursive_info { if path.starts_with(p) { if *r || &path == p { handle_event = true; break; } else if let Some(parent_path) = path.parent() { if parent_path == p { handle_event = true; break; } } } } if !handle_event { continue; } log::trace!("FSEvent: path = `{}`, flag = {:?}", path.display(), flag); for ev in translate_flags(flag, true).into_iter() { // TODO: precise let ev = ev.add_path(path.clone()); let mut event_handler = event_handler.lock().expect("lock not to be poisoned"); event_handler.handle_event(Ok(ev)); } } } impl Watcher for FsEventWatcher { /// Create a new watcher. fn new(event_handler: F, _config: Config) -> Result { Self::from_event_handler(Arc::new(Mutex::new(event_handler))) } fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> { self.watch_inner(path, recursive_mode) } fn unwatch(&mut self, path: &Path) -> Result<()> { self.unwatch_inner(path) } fn configure(&mut self, config: Config) -> Result { let (tx, rx) = unbounded(); self.configure_raw_mode(config, tx); rx.recv()? } fn kind() -> crate::WatcherKind { crate::WatcherKind::Fsevent } } impl Drop for FsEventWatcher { fn drop(&mut self) { self.stop(); unsafe { cf::CFRelease(self.paths); } } } #[test] fn test_fsevent_watcher_drop() { use super::*; use std::time::Duration; let dir = tempfile::tempdir().unwrap(); let (tx, rx) = std::sync::mpsc::channel(); { let mut watcher = FsEventWatcher::new(tx, Default::default()).unwrap(); watcher.watch(dir.path(), RecursiveMode::Recursive).unwrap(); thread::sleep(Duration::from_millis(2000)); println!("is running -> {}", watcher.is_running()); thread::sleep(Duration::from_millis(1000)); watcher.unwatch(dir.path()).unwrap(); println!("is running -> {}", watcher.is_running()); } thread::sleep(Duration::from_millis(1000)); for res in rx { let e = res.unwrap(); println!("debug => {:?} {:?}", e.kind, e.paths); } println!("in test: {} works", file!()); } #[test] fn test_steam_context_info_send_and_sync() { fn check_send() {} check_send::(); }