From ea744c389b8b679bb290b2b811f0ab422413ea35 Mon Sep 17 00:00:00 2001
From: Greg Soltis
Date: Tue, 30 Apr 2024 13:43:45 -0700
Subject: [PATCH] fix(Turborepo): Make package discovery async, and apply a debouncer (#8058)

### Description

- move the debouncer to its own module so that it is reusable
- apply the debouncer to package discovery
- make package discovery async so it doesn't block the file watching channel

### Testing Instructions

Existing test suite.

Fixes #3455
Closes TURBO-2909

---------

Co-authored-by: Greg Soltis
Co-authored-by: Alexander Lyon
---
 crates/turborepo-filewatch/src/debouncer.rs    | 127 ++++++++
 .../turborepo-filewatch/src/hash_watcher.rs    | 124 +-------
 crates/turborepo-filewatch/src/lib.rs          |   1 +
 .../src/package_watcher.rs                     | 274 ++++++++++++------
 4 files changed, 332 insertions(+), 194 deletions(-)
 create mode 100644 crates/turborepo-filewatch/src/debouncer.rs

diff --git a/crates/turborepo-filewatch/src/debouncer.rs b/crates/turborepo-filewatch/src/debouncer.rs
new file mode 100644
index 00000000000000..0f169b2d715b63
--- /dev/null
+++ b/crates/turborepo-filewatch/src/debouncer.rs
@@ -0,0 +1,127 @@
+use std::{fmt::Debug, sync::Mutex, time::Duration};
+
+use tokio::{select, sync, time::Instant};
+use tracing::trace;
+
+pub(crate) struct Debouncer {
+    bump: sync::Notify,
+    serial: Mutex<Option<usize>>,
+    timeout: Duration,
+}
+
+const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);
+
+impl Default for Debouncer {
+    fn default() -> Self {
+        Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
+    }
+}
+
+impl Debug for Debouncer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let serial = { self.serial.lock().expect("lock is valid") };
+        f.debug_struct("Debouncer")
+            .field("is_pending", &serial.is_some())
+            .field("timeout", &self.timeout)
+            .finish()
+    }
+}
+
+impl Debouncer {
+    pub(crate) fn new(timeout: Duration) -> Self {
+        let bump = sync::Notify::new();
+        let serial = Mutex::new(Some(0));
+        Self {
+            bump,
+            serial,
+            timeout,
+        }
+    }
+
+    pub(crate) fn bump(&self) -> bool {
+        let mut serial = self.serial.lock().expect("lock is valid");
+        match *serial {
+            None => false,
+            Some(previous) => {
+                *serial = Some(previous + 1);
+                self.bump.notify_one();
+                true
+            }
+        }
+    }
+
+    pub(crate) async fn debounce(&self) {
+        let mut serial = {
+            self.serial
+                .lock()
+                .expect("lock is valid")
+                .expect("only this thread sets the value to None")
+        };
+        let mut deadline = Instant::now() + self.timeout;
+        loop {
+            let timeout = tokio::time::sleep_until(deadline);
+            select! {
+                _ = self.bump.notified() => {
+                    trace!("debouncer notified");
+                    // reset timeout
+                    let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
+                    if current_serial == serial {
+                        // we timed out between the serial update and the notification.
+                        // ignore this notification, we've already bumped the timer
+                        continue;
+                    } else {
+                        serial = current_serial;
+                        deadline = Instant::now() + self.timeout;
+                    }
+                }
+                _ = timeout => {
+                    // check if serial is still valid. It's possible a bump came in before the timeout,
+                    // but we haven't been notified yet.
+                    let mut current_serial_opt = self.serial.lock().expect("lock is valid");
+                    let current_serial = current_serial_opt.expect("only this thread sets the value to None");
+                    if current_serial == serial {
+                        // if the serial is what we last observed, and the timer expired, we timed out.
+                        // we're done. Mark that we won't accept any more bumps and return
+                        *current_serial_opt = None;
+                        return;
+                    } else {
+                        serial = current_serial;
+                        deadline = Instant::now() + self.timeout;
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        sync::Arc,
+        time::{Duration, Instant},
+    };
+
+    use crate::debouncer::Debouncer;
+
+    #[tokio::test]
+    async fn test_debouncer() {
+        let debouncer = Arc::new(Debouncer::new(Duration::from_millis(10)));
+        let debouncer_copy = debouncer.clone();
+        let handle = tokio::task::spawn(async move {
+            debouncer_copy.debounce().await;
+        });
+        for _ in 0..10 {
+            // assert that we can continue bumping it past the original timeout
+            tokio::time::sleep(Duration::from_millis(2)).await;
+            assert!(debouncer.bump());
+        }
+        let start = Instant::now();
+        handle.await.unwrap();
+        let end = Instant::now();
+        // give some wiggle room to account for race conditions, but assert that we
+        // didn't immediately complete after the last bump
+        assert!(end - start > Duration::from_millis(5));
+        // we shouldn't be able to bump it after its timeout has run out
+        assert!(!debouncer.bump());
+    }
+}
diff --git a/crates/turborepo-filewatch/src/hash_watcher.rs b/crates/turborepo-filewatch/src/hash_watcher.rs
index 923dff51767eba..c312ab8eac664e 100644
--- a/crates/turborepo-filewatch/src/hash_watcher.rs
+++ b/crates/turborepo-filewatch/src/hash_watcher.rs
@@ -2,7 +2,7 @@ use std::{
     collections::{HashMap, HashSet},
     sync::{
         atomic::{AtomicUsize, Ordering},
-        Arc, Mutex,
+        Arc,
     },
     time::Duration,
 };
@@ -12,15 +12,17 @@ use radix_trie::{Trie, TrieCommon};
 use thiserror::Error;
 use tokio::{
     select,
-    sync::{self, broadcast, mpsc, oneshot, watch},
-    time::Instant,
+    sync::{broadcast, mpsc, oneshot, watch},
 };
 use tracing::{debug, trace};
 use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf};
 use turborepo_repository::discovery::DiscoveryResponse;
 use turborepo_scm::{package_deps::GitHashes, Error as SCMError, SCM};

-use crate::{globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError, OptionalWatch};
+use crate::{
+    debouncer::Debouncer, globwatcher::GlobSet, package_watcher::DiscoveryData, NotifyError,
+    OptionalWatch,
+};

 pub struct HashWatcher {
     _exit_tx: oneshot::Sender<()>,
@@ -125,92 +127,11 @@ enum Query {
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
 struct Version(usize);

-struct HashDebouncer {
-    bump: sync::Notify,
-    serial: Mutex<Option<usize>>,
-    timeout: Duration,
-}
-
-const DEFAULT_DEBOUNCE_TIMEOUT: Duration = Duration::from_millis(10);
-
-impl Default for HashDebouncer {
-    fn default() -> Self {
-        Self::new(DEFAULT_DEBOUNCE_TIMEOUT)
-    }
-}
-
-impl HashDebouncer {
-    fn new(timeout: Duration) -> Self {
-        let bump = sync::Notify::new();
-        let serial = Mutex::new(Some(0));
-        Self {
-            bump,
-            serial,
-            timeout,
-        }
-    }
-
-    fn bump(&self) -> bool {
-        let mut serial = self.serial.lock().expect("lock is valid");
-        match *serial {
-            None => false,
-            Some(previous) => {
-                *serial = Some(previous + 1);
-                self.bump.notify_one();
-                true
-            }
-        }
-    }
-
-    async fn debounce(&self) {
-        let mut serial = {
-            self.serial
-                .lock()
-                .expect("lock is valid")
-                .expect("only this thread sets the value to None")
-        };
-        let mut deadline = Instant::now() + self.timeout;
-        loop {
-            let timeout = tokio::time::sleep_until(deadline);
-            select! {
-                _ = self.bump.notified() => {
-                    trace!("debouncer notified");
-                    // reset timeout
-                    let current_serial = self.serial.lock().expect("lock is valid").expect("only this thread sets the value to None");
-                    if current_serial == serial {
-                        // we timed out between the serial update and the notification.
-                        // ignore this notification, we've already bumped the timer
-                        continue;
-                    } else {
-                        serial = current_serial;
-                        deadline = Instant::now() + self.timeout;
-                    }
-                }
-                _ = timeout => {
-                    // check if serial is still valid. It's possible a bump came in before the timeout,
-                    // but we haven't been notified yet.
-                    let mut current_serial_opt = self.serial.lock().expect("lock is valid");
-                    let current_serial = current_serial_opt.expect("only this thread sets the value to None");
-                    if current_serial == serial {
-                        // if the serial is what we last observed, and the timer expired, we timed out.
-                        // we're done. Mark that we won't accept any more bumps and return
-                        *current_serial_opt = None;
-                        return;
-                    } else {
-                        serial = current_serial;
-                        deadline = Instant::now() + self.timeout;
-                    }
-                }
-            }
-        }
-    }
-}
-
 enum HashState {
     Hashes(GitHashes),
     Pending(
         Version,
-        Arc<HashDebouncer>,
+        Arc<Debouncer>,
         Vec<oneshot::Sender<Result<GitHashes, HashError>>>,
     ),
     Unavailable(String),
@@ -545,16 +466,16 @@ impl Subscriber {
         spec: &HashSpec,
         hash_update_tx: &mpsc::Sender<HashUpdate>,
         immediate: bool,
-    ) -> (Version, Arc<HashDebouncer>) {
+    ) -> (Version, Arc<Debouncer>) {
         let version = Version(self.next_version.fetch_add(1, Ordering::SeqCst));
         let tx = hash_update_tx.clone();
         let spec = spec.clone();
         let repo_root = self.repo_root.clone();
         let scm = self.scm.clone();
         let debouncer = if immediate {
-            HashDebouncer::new(Duration::from_millis(0))
+            Debouncer::new(Duration::from_millis(0))
         } else {
-            HashDebouncer::default()
+            Debouncer::default()
         };
         let debouncer = Arc::new(debouncer);
         let debouncer_copy = debouncer.clone();
@@ -702,7 +623,6 @@ impl Subscriber {
 mod tests {
     use std::{
         assert_matches::assert_matches,
-        sync::Arc,
         time::{Duration, Instant},
     };

@@ -717,7 +637,7 @@ mod tests {
     use crate::{
         cookies::CookieWriter,
         globwatcher::GlobSet,
-        hash_watcher::{HashDebouncer, HashSpec, HashWatcher},
+        hash_watcher::{HashSpec, HashWatcher},
         package_watcher::PackageWatcher,
         FileSystemWatcher,
     };
@@ -1114,28 +1034,6 @@ mod tests {
         assert!(result.is_empty());
     }

-    #[tokio::test]
-    async fn test_debouncer() {
-        let debouncer = Arc::new(HashDebouncer::new(Duration::from_millis(10)));
-        let debouncer_copy = debouncer.clone();
-        let handle = tokio::task::spawn(async move {
-            debouncer_copy.debounce().await;
-        });
-        for _ in 0..10 {
-            // assert that we can continue bumping it past the original timeout
-            tokio::time::sleep(Duration::from_millis(2)).await;
-            assert!(debouncer.bump());
-        }
-        let start = Instant::now();
-        handle.await.unwrap();
-        let end = Instant::now();
-        // give some wiggle room to account for race conditions, but assert that we
-        // didn't immediately complete after the last bump
-        assert!(end - start > Duration::from_millis(5));
-        // we shouldn't be able to bump it after it's run out it's timeout
-        assert!(!debouncer.bump());
-    }
-
     #[tokio::test]
     #[tracing_test::traced_test]
     async fn test_basic_file_changes_with_inputs() {
diff --git a/crates/turborepo-filewatch/src/lib.rs b/crates/turborepo-filewatch/src/lib.rs
index 6fba08f2ca9b51..3f7565341c1e5a 100644
--- a/crates/turborepo-filewatch/src/lib.rs
+++ b/crates/turborepo-filewatch/src/lib.rs
@@ -35,6 +35,7 @@ use {
 };

 pub mod cookies;
+mod debouncer;
 #[cfg(target_os = "macos")]
 mod fsevent;
 pub mod globwatcher;
diff --git a/crates/turborepo-filewatch/src/package_watcher.rs b/crates/turborepo-filewatch/src/package_watcher.rs
index 758176e482d14f..da35e9387410aa 100644
--- a/crates/turborepo-filewatch/src/package_watcher.rs
+++ b/crates/turborepo-filewatch/src/package_watcher.rs
@@ -1,16 +1,24 @@
 //! This module hosts the `PackageWatcher` type, which is used to watch the
 //! filesystem for changes to packages.

-use std::{collections::HashMap, path::Path};
+use std::{
+    collections::HashMap,
+    path::Path,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    time::Duration,
+};

 use futures::FutureExt;
 use notify::Event;
 use thiserror::Error;
 use tokio::{
-    join,
+    join, select,
     sync::{
         broadcast::{self, error::RecvError},
-        oneshot, watch,
+        mpsc, oneshot, watch,
     },
 };
 use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf};
@@ -24,6 +32,7 @@ use turborepo_repository::{

 use crate::{
     cookies::{CookieRegister, CookieWriter, CookiedOptionalWatch},
+    debouncer::Debouncer,
     optional_watch::OptionalWatch,
     NotifyError,
 };
@@ -49,6 +58,17 @@ pub enum PackageWatchError {
 // making a change.
 pub(crate) type DiscoveryData = Result<DiscoveryResponse, String>;

+// Version is a type that exists to stamp an asynchronous package discovery
+// computation with a version so that we can ignore completion of outdated
+// discovery computations.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+struct Version(usize);
+
+struct DiscoveryResult {
+    version: Version,
+    state: PackageState,
+}
+
 /// Watches the filesystem for changes to packages and package managers.
 pub struct PackageWatcher {
     // _exit_ch exists to trigger a close on the receiver when an instance
@@ -123,13 +143,14 @@ struct Subscriber {
     package_discovery_tx: watch::Sender<Option<DiscoveryData>>,
     package_discovery_lazy: CookiedOptionalWatch<DiscoveryData, ()>,
     cookie_tx: CookieRegister,
+    next_version: AtomicUsize,
 }

 /// PackageWatcher state. We either don't have a valid package manager,
 /// don't have valid globs, or we have both a package manager and globs
 /// and some maybe-empty set of workspaces.
 #[derive(Debug)]
-enum State {
+enum PackageState {
     NoPackageManager(String),
     InvalidGlobs(String),
     ValidWorkspaces {
@@ -139,6 +160,15 @@
     },
 }

+#[derive(Debug)]
+enum State {
+    Pending {
+        debouncer: Arc<Debouncer>,
+        version: Version,
+    },
+    Ready(PackageState),
+}
+
 // Because our package manager detection is coupled with the workspace globs, we
 // need to recheck all workspaces any time any of these files change. A change
 // in any of these might result in a different package manager being detected,
@@ -173,9 +203,37 @@ impl Subscriber {
             package_discovery_tx,
             package_discovery_lazy,
             cookie_tx,
+            next_version: AtomicUsize::new(0),
         })
     }

+    fn queue_rediscovery(
+        &self,
+        immediate: bool,
+        package_state_tx: mpsc::Sender<DiscoveryResult>,
+    ) -> (Version, Arc<Debouncer>) {
+        // Every time we're queuing rediscovery, we know our state is no longer valid,
+        // so reset it for any downstream consumers.
+        self.reset_discovery_data();
+        let version = Version(self.next_version.fetch_add(1, Ordering::SeqCst));
+        let debouncer = if immediate {
+            Debouncer::new(Duration::from_millis(0))
+        } else {
+            Debouncer::default()
+        };
+        let debouncer = Arc::new(debouncer);
+        let debouncer_copy = debouncer.clone();
+        let repo_root = self.repo_root.clone();
+        tokio::task::spawn(async move {
+            debouncer_copy.debounce().await;
+            let state = discover_packages(repo_root).await;
+            let _ = package_state_tx
+                .send(DiscoveryResult { version, state })
+                .await;
+        });
+        (version, debouncer)
+    }
+
     async fn watch_process(
         mut self,
         mut recv: OptionalWatch<broadcast::Receiver<Result<Event, NotifyError>>>,
@@ -186,32 +244,56 @@ impl Subscriber {
             Err(e) => return PackageWatcherProcessError::Filewatching(e),
         };

-        // state represents our current understanding of the underlying filesystem, and
-        // is expected to be mutated in place by handle_file_event. Both
-        // rediscover_everything and handle_file_event are responsible for
-        // broadcasting updates to state.
-        let mut state = self.rediscover_and_write_state().await;
+        let (package_state_tx, mut package_state_rx) = mpsc::channel::<DiscoveryResult>(256);
+
+        let (version, debouncer) = self.queue_rediscovery(true, package_state_tx.clone());
+
+        // state represents the current state of this process, and is expected to be
+        // updated in place by the various handler functions.
+        let mut state = State::Pending { debouncer, version };

         tracing::debug!("package watcher ready {:?}", state);
         loop {
-            let file_event = recv.recv().await;
-            match file_event {
-                Ok(Ok(event)) => self.handle_file_event(&mut state, &event).await,
-                // if we get an error, we need to re-discover the packages
-                Ok(Err(_)) => state = self.rediscover_and_write_state().await,
-                Err(e @ RecvError::Closed) => {
-                    return PackageWatcherProcessError::FilewatchingClosed(e)
-                }
-                // if we end up lagging, warn and rediscover packages
-                Err(RecvError::Lagged(count)) => {
-                    tracing::warn!("lagged behind {count} processing file watching events");
-                    state = self.rediscover_and_write_state().await;
+            select! {
+                Some(discovery_result) = package_state_rx.recv() => {
+                    self.handle_discovery_result(discovery_result, &mut state);
+                },
+                file_event = recv.recv() => {
+                    match file_event {
+                        Ok(Ok(event)) => self.handle_file_event(&mut state, &event, &package_state_tx).await,
+                        // if we get an error, we need to re-discover the packages
+                        Ok(Err(_)) => self.bump_or_queue_rediscovery(&mut state, &package_state_tx),
+                        Err(e @ RecvError::Closed) => {
+                            return PackageWatcherProcessError::FilewatchingClosed(e)
+                        }
+                        // if we end up lagging, warn and rediscover packages
+                        Err(RecvError::Lagged(count)) => {
+                            tracing::warn!("lagged behind {count} processing file watching events");
+                            self.bump_or_queue_rediscovery(&mut state, &package_state_tx);
+                        }
+                    }
                 }
             }
             tracing::trace!("package watcher state: {:?}", state);
         }
     }

+    fn handle_discovery_result(&self, package_result: DiscoveryResult, state: &mut State) {
+        if let State::Pending { version, .. } = state {
+            // If this response matches an outstanding rediscovery request, write out the
+            // corresponding state to downstream consumers and update our state
+            // accordingly.
+            //
+            // Note that depending on events that we received since this request was queued,
+            // we may have a higher version number, at which point we would
+            // ignore this update, as we know it is stale.
+            if package_result.version == *version {
+                self.write_state(&package_result.state);
+                *state = State::Ready(package_result.state);
+            }
+        }
+    }
+
     async fn watch(
         self,
         exit_rx: oneshot::Receiver<()>,
@@ -243,7 +325,12 @@ impl Subscriber {
             .any(|invalidation_path| path.eq(invalidation_path as &AbsoluteSystemPath))
     }

-    async fn handle_file_event(&mut self, state: &mut State, file_event: &Event) {
+    async fn handle_file_event(
+        &mut self,
+        state: &mut State,
+        file_event: &Event,
+        package_state_tx: &mpsc::Sender<DiscoveryResult>,
+    ) {
         tracing::trace!("file event: {:?} {:?}", file_event.kind, file_event.paths);

         if file_event
@@ -252,10 +339,12 @@ impl Subscriber {
             .any(|path| self.path_invalidates_everything(path))
         {
             // root package.json changed, rediscover everything
-            *state = self.rediscover_and_write_state().await;
+            //*state = self.rediscover_and_write_state().await;
+            self.bump_or_queue_rediscovery(state, package_state_tx);
         } else {
             tracing::trace!("handling non-root package.json change");
-            self.handle_workspace_changes(state, file_event).await;
+            self.handle_workspace_changes(state, file_event, package_state_tx)
+                .await;
         }

         tracing::trace!("updating the cookies");
@@ -271,14 +360,48 @@ impl Subscriber {
         );
     }

+    fn bump_or_queue_rediscovery(
+        &self,
+        state: &mut State,
+        package_state_tx: &mpsc::Sender<DiscoveryResult>,
+    ) {
+        if let State::Pending { debouncer, .. } = state {
+            if debouncer.bump() {
+                // We successfully bumped the debouncer, which was already pending,
+                // so a new discovery will happen shortly.
+                return;
+            }
+        }
+        // We either failed to bump the debouncer, or we don't have a rediscovery
+        // queued, but we need one.
+        let (version, debouncer) = self.queue_rediscovery(false, package_state_tx.clone());
+        *state = State::Pending { debouncer, version }
+    }
+
     // checks if the file event contains any changes to package.json files, or
     // directories that would map to a workspace.
-    async fn handle_workspace_changes(&mut self, state: &mut State, file_event: &Event) {
+    async fn handle_workspace_changes(
+        &mut self,
+        state: &mut State,
+        file_event: &Event,
+        package_state_tx: &mpsc::Sender<DiscoveryResult>,
+    ) {
+        let package_state = match state {
+            State::Pending { .. } => {
+                // We can't assess this event until we have a valid package manager. To be safe,
+                // bump or queue another rediscovery, since this could race with the discovery
+                // in progress.
+                self.bump_or_queue_rediscovery(state, package_state_tx);
+                return;
+            }
+            State::Ready(package_state) => package_state,
+        };
+
         // If we don't have a valid package manager and workspace globs, nothing to be
         // done here
-        let State::ValidWorkspaces {
+        let PackageState::ValidWorkspaces {
             filter, workspaces, ..
-        } = state
+        } = package_state
         else {
             return;
         };
@@ -347,7 +470,7 @@ impl Subscriber {
         }

         if changed {
-            self.write_state(state);
+            self.write_state(package_state);
         }
     }

@@ -362,59 +485,9 @@ impl Subscriber {
         });
     }

-    async fn rediscover_and_write_state(&mut self) -> State {
-        // If we're rediscovering the package manager, clear all data
-        self.reset_discovery_data();
-        let state = self.rediscover().await;
-        self.write_state(&state);
-        state
-    }
-
-    async fn rediscover(&self) -> State {
-        // If we're rediscovering everything, we need to rediscover the package manager.
-        // It may have changed if a lockfile changed or package.json changed.
-        let discovery =
-            match LocalPackageDiscoveryBuilder::new(self.repo_root.clone(), None, None).build() {
-                Ok(discovery) => discovery,
-                Err(e) => return State::NoPackageManager(e.to_string()),
-            };
-        let initial_discovery = match discovery.discover_packages().await {
-            Ok(discovery) => discovery,
-            // If we failed the discovery, that's fine, we've reset the values, leave them as None
-            Err(e) => {
-                tracing::debug!("failed to rediscover packages: {}", e);
-                return State::NoPackageManager(e.to_string());
-            }
-        };
-
-        tracing::debug!("rediscovered packages: {:?}", initial_discovery);
-        let filter = match initial_discovery
-            .package_manager
-            .get_workspace_globs(&self.repo_root)
-        {
-            Ok(filter) => filter,
-            Err(e) => {
-                // If the globs are invalid, leave everything set to None
-                tracing::debug!("failed to get workspace globs: {}", e);
-                return State::InvalidGlobs(e.to_string());
-            }
-        };
-
-        let workspaces = initial_discovery
-            .workspaces
-            .into_iter()
-            .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
-            .collect::<HashMap<_, _>>();
-        State::ValidWorkspaces {
-            package_manager: initial_discovery.package_manager,
-            filter,
-            workspaces,
-        }
-    }
-
-    fn write_state(&self, state: &State) {
+    fn write_state(&self, state: &PackageState) {
         match state {
-            State::NoPackageManager(e) | State::InvalidGlobs(e) => {
+            PackageState::NoPackageManager(e) | PackageState::InvalidGlobs(e) => {
                 self.package_discovery_tx.send_if_modified(|existing| {
                     let error_msg = e.to_string();
                     match existing {
@@ -426,7 +499,7 @@ impl Subscriber {
                     }
                 });
             }
-            State::ValidWorkspaces {
+            PackageState::ValidWorkspaces {
                 package_manager,
                 workspaces,
                 ..
@@ -443,6 +516,45 @@ impl Subscriber {
     }
 }

+async fn discover_packages(repo_root: AbsoluteSystemPathBuf) -> PackageState {
+    // If we're rediscovering everything, we need to rediscover the package manager.
+    // It may have changed if a lockfile changed or package.json changed.
+    let discovery = match LocalPackageDiscoveryBuilder::new(repo_root.clone(), None, None).build() {
+        Ok(discovery) => discovery,
+        Err(e) => return PackageState::NoPackageManager(e.to_string()),
+    };
+    let initial_discovery = match discovery.discover_packages().await {
+        Ok(discovery) => discovery,
+        Err(e) => {
+            tracing::debug!("failed to rediscover packages: {}", e);
+            return PackageState::NoPackageManager(e.to_string());
+        }
+    };
+
+    tracing::debug!("rediscovered packages: {:?}", initial_discovery);
+    let filter = match initial_discovery
+        .package_manager
+        .get_workspace_globs(&repo_root)
+    {
+        Ok(filter) => filter,
+        Err(e) => {
+            tracing::debug!("failed to get workspace globs: {}", e);
+            return PackageState::InvalidGlobs(e.to_string());
+        }
+    };
+
+    let workspaces = initial_discovery
+        .workspaces
+        .into_iter()
+        .map(|p| (p.package_json.parent().expect("non-root").to_owned(), p))
+        .collect::<HashMap<_, _>>();
+    PackageState::ValidWorkspaces {
+        package_manager: initial_discovery.package_manager,
+        filter,
+        workspaces,
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::time::Duration;
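
Usage sketch: the coalescing pattern this patch applies in `queue_rediscovery` and `bump_or_queue_rediscovery` can be condensed as below. This is a minimal illustration, not code from the patch: it omits the `Version` stamping and the result channel, assumes it lives inside the `turborepo-filewatch` crate (where the `pub(crate)` `Debouncer` is visible) with a Tokio runtime available, and the helper names `schedule_discovery` and `on_invalidation` are hypothetical.

```rust
use std::sync::Arc;

use crate::debouncer::Debouncer;

/// Run `work` once the debouncer's quiet period elapses. Returns the
/// debouncer so later events can try to extend the window via bump().
fn schedule_discovery<F, Fut>(work: F) -> Arc<Debouncer>
where
    F: FnOnce() -> Fut + Send + 'static,
    Fut: std::future::Future<Output = ()> + Send + 'static,
{
    let debouncer = Arc::new(Debouncer::default());
    let debouncer_copy = debouncer.clone();
    tokio::task::spawn(async move {
        // Completes only after no bump() has arrived for the full timeout.
        debouncer_copy.debounce().await;
        work().await;
    });
    debouncer
}

fn on_invalidation(pending: &mut Option<Arc<Debouncer>>) {
    // Fold this event into the in-flight window if one exists; bump()
    // returns false once debounce() has already fired.
    if let Some(debouncer) = pending {
        if debouncer.bump() {
            return;
        }
    }
    // Otherwise start a new window, mirroring how bump_or_queue_rediscovery
    // falls back to queue_rediscovery.
    *pending = Some(schedule_discovery(|| async {
        // e.g. discover_packages(repo_root).await, with the result stamped
        // with a Version and sent over an mpsc channel, as in the patch.
    }));
}
```

The load-bearing design choice is that `bump()` returns `false` once `debounce()` has fired (the serial is set to `None`), so an event that races with a completing discovery always queues a fresh rediscovery instead of being silently dropped.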