From b70ba3672f5606463c5d19720f6a2d800408cf5d Mon Sep 17 00:00:00 2001 From: Nicholas Yang Date: Tue, 30 Apr 2024 14:39:05 -0400 Subject: [PATCH] fix(turborepo): Watch mode not responding to changes (#8057) ### Description When I converted to a `watch` channel, I ended up introducing an annoying bug. Basically, with a `watch` channel you can't tell whether the current value has already been read. So if you send a rediscover, then on subsequent events it's impossible to determine whether the rediscover has already been seen (so you should send a new event) or has not been seen yet (so you should send nothing). I fixed this by reverting to the lock version, and addressed @gsoltis's comments by using a tokio Mutex, which yields to the runtime when grabbing the lock. ### Testing Instructions Validated that we get the right change events. Also validated that with a regular Mutex and no `yield_now` we get a deadlock, while with a tokio Mutex and no `yield_now` we don't deadlock. Closes TURBO-2907 --- crates/turborepo-lib/src/run/watch.rs | 47 ++++++++++++++------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index cf024ad53c253..a82a424f0b8b7 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -1,12 +1,12 @@ -use std::collections::HashSet; +use std::{cell::RefCell, collections::HashSet, sync::Arc}; use futures::StreamExt; use miette::{Diagnostic, SourceSpan}; use thiserror::Error; use tokio::{ select, - sync::watch, - task::{yield_now, JoinHandle}, + sync::{Mutex, Notify}, + task::JoinHandle, }; use turborepo_repository::package_graph::PackageName; use turborepo_telemetry::events::command::CommandEventBuilder; @@ -22,8 +22,7 @@ use crate::{ DaemonConnector, DaemonPaths, }; -#[derive(Clone)] -pub enum ChangedPackages { +enum ChangedPackages { All, Some(HashSet<PackageName>), } @@ -89,10 +88,6 @@ pub enum Error { SignalInterrupt, #[error("package change error")] 
PackageChange(#[from] tonic::Status), - #[error("changed packages channel closed, cannot receive new changes")] - ChangedPackagesRecv(#[from] watch::error::RecvError), - #[error("changed packages channel closed, cannot send new changes")] - ChangedPackagesSend(#[from] watch::error::SendError<ChangedPackages>), } impl WatchClient { @@ -140,12 +135,18 @@ impl WatchClient { let signal_subscriber = self.handler.subscribe().ok_or(Error::NoSignalHandler)?; - let (changed_pkgs_tx, mut changed_pkgs_rx) = watch::channel(ChangedPackages::default()); + // We explicitly use a tokio::sync::Mutex here to avoid deadlocks. + // If we used a std::sync::Mutex, we could deadlock by spinning the lock + // and not yielding back to the tokio runtime. + let changed_packages = Mutex::new(RefCell::new(ChangedPackages::default())); + let notify_run = Arc::new(Notify::new()); + let notify_event = notify_run.clone(); let event_fut = async { while let Some(event) = events.next().await { let event = event?; - Self::handle_change_event(&changed_pkgs_tx, event.event.unwrap()).await?; + Self::handle_change_event(&changed_packages, event.event.unwrap()).await?; + notify_event.notify_one(); } Err(Error::ConnectionClosed) @@ -153,10 +154,12 @@ impl WatchClient { let run_fut = async { loop { - changed_pkgs_rx.changed().await?; - let changed_pkgs = { changed_pkgs_rx.borrow_and_update().clone() }; - - self.execute_run(changed_pkgs).await?; + notify_run.notified().await; + let changed_packages_guard = changed_packages.lock().await; + if !changed_packages_guard.borrow().is_empty() { + let changed_packages = changed_packages_guard.take(); + self.execute_run(changed_packages).await?; + } } }; @@ -177,7 +180,7 @@ impl WatchClient { } async fn handle_change_event( - changed_packages_tx: &watch::Sender<ChangedPackages>, + changed_packages: &Mutex<RefCell<ChangedPackages>>, event: proto::package_change_event::Event, ) -> Result<(), Error> { // Should we recover here? 
@@ -187,17 +190,17 @@ impl WatchClient { }) => { let package_name = PackageName::from(package_name); - changed_packages_tx.send_if_modified(|changed_pkgs| match changed_pkgs { - ChangedPackages::All => false, + match changed_packages.lock().await.get_mut() { + ChangedPackages::All => { + // If we've already changed all packages, ignore + } ChangedPackages::Some(ref mut pkgs) => { pkgs.insert(package_name); - - true } - }); + } } proto::package_change_event::Event::RediscoverPackages(_) => { - changed_packages_tx.send(ChangedPackages::All)?; + *changed_packages.lock().await.get_mut() = ChangedPackages::All; } proto::package_change_event::Event::Error(proto::PackageChangeError { message }) => { return Err(DaemonError::Unavailable(message).into());