From 688456031017add017f8e696856aa0d1facb6614 Mon Sep 17 00:00:00 2001 From: Guy Kaplan Date: Sun, 14 Sep 2025 22:32:39 +0300 Subject: [PATCH] fix(watch): only stop relevant tasks while watching for changes. --- Cargo.lock | 1 + crates/turborepo-lib/src/engine/mod.rs | 38 +++++++++ crates/turborepo-lib/src/run/builder.rs | 11 +++ crates/turborepo-lib/src/run/mod.rs | 4 + crates/turborepo-lib/src/run/watch.rs | 104 ++++++++++++++++++++---- crates/turborepo-process/Cargo.toml | 1 + crates/turborepo-process/src/child.rs | 7 ++ crates/turborepo-process/src/command.rs | 14 ++++ crates/turborepo-process/src/lib.rs | 27 ++++++ 9 files changed, 190 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a811166db0009..31ee46e033a99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6912,6 +6912,7 @@ dependencies = [ "tracing", "tracing-test", "turbopath", + "turborepo-repository", "windows-sys 0.59.0", ] diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index d3028a3eee041..76221fd6221db 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -159,6 +159,44 @@ impl Engine { /// Creates an instance of `Engine` that only contains tasks that depend on /// tasks from a given package. This is useful for watch mode, where we /// need to re-run only a portion of the task graph. + pub fn filter_tasks(&self, tasks_to_run: &HashSet<(PackageName, String)>) -> Engine { + let new_graph = self.task_graph.filter_map( + |node_idx, node| { + if let TaskNode::Task(task) = &self.task_graph[node_idx] { + if tasks_to_run.contains(&(task.package().into(), task.task().to_string())) { + return Some(node.clone()); + } + } + None + }, + |_, _| Some(()), + ); + + let task_lookup: HashMap<_, _> = new_graph + .node_indices() + .filter_map(|index| { + let task = new_graph + .node_weight(index) + .expect("node index should be present"); + match task { + TaskNode::Root => None, + TaskNode::Task(task) => Some((task.clone(), index)), + } + }) + .collect(); + + Engine { + marker: std::marker::PhantomData, + root_index: self.root_index, + task_graph: new_graph, + task_lookup, + task_definitions: self.task_definitions.clone(), + task_locations: self.task_locations.clone(), + package_tasks: self.package_tasks.clone(), + has_non_interruptible_tasks: self.has_non_interruptible_tasks, + } + } + pub fn create_engine_for_subgraph( &self, changed_packages: &HashSet, diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index eacc02b6aec12..4029ab95619d7 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -66,6 +66,7 @@ pub struct RunBuilder { // We will then prune away any tasks that do not depend on tasks inside // this package. entrypoint_packages: Option>, + tasks_to_run: Option>, should_print_prelude_override: Option, // In query, we don't want to validate the engine. Defaults to `true` should_validate_engine: bool, @@ -106,12 +107,18 @@ impl RunBuilder { api_auth, analytics_sender: None, entrypoint_packages: None, + tasks_to_run: None, should_print_prelude_override: None, should_validate_engine: true, add_all_tasks: false, }) } + pub fn with_tasks_to_run(mut self, tasks_to_run: HashSet<(PackageName, String)>) -> Self { + self.tasks_to_run = Some(tasks_to_run); + self + } + pub fn with_entrypoint_packages(mut self, entrypoint_packages: HashSet) -> Self { self.entrypoint_packages = Some(entrypoint_packages); self @@ -560,6 +567,10 @@ impl RunBuilder { engine = engine.create_engine_for_subgraph(entrypoint_packages); } + if let Some(tasks_to_run) = &self.tasks_to_run { + engine = engine.filter_tasks(tasks_to_run); + } + if !self.opts.run_opts.parallel && self.should_validate_engine { engine .validate( diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 1ca76233273bf..1121aba5dd113 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -531,4 +531,8 @@ impl RunStopper { pub async fn stop(&self) { self.manager.stop().await; } + + pub async fn stop_tasks(&self, tasks: &HashSet<(PackageName, String)>) { + self.manager.stop_tasks(tasks).await; + } } diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 6e689acf80f5b..b084909e079ae 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -23,6 +23,12 @@ use crate::{ DaemonConnector, DaemonPaths, }; +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +struct Task { + package: PackageName, + task: String, +} + #[derive(Debug)] enum ChangedPackages { All, @@ -47,6 +53,7 @@ impl ChangedPackages { pub struct WatchClient { run: Arc, watched_packages: HashSet, + tasks: Arc>>, persistent_tasks_handle: Option, connector: DaemonConnector, base: CommandBase, @@ -155,10 +162,13 @@ impl WatchClient { custom_turbo_json_path, }; + let tasks = Arc::new(Mutex::new(HashSet::new())); + Ok(Self { base, run, watched_packages, + tasks, connector, handler, telemetry, @@ -170,6 +180,17 @@ impl WatchClient { } pub async fn start(&mut self) -> Result<(), Error> { + let initial_tasks = self + .run + .engine() + .task_ids() + .map(|task_id| Task { + package: task_id.package().into(), + task: task_id.task().to_string(), + }) + .collect(); + *self.tasks.lock().expect("poisoned lock") = initial_tasks; + let connector = self.connector.clone(); let mut client = connector.connect().await?; @@ -201,20 +222,24 @@ impl WatchClient { let some_changed_packages = { let mut changed_packages_guard = changed_packages.lock().expect("poisoned lock"); - (!changed_packages_guard.is_empty()) - .then(|| std::mem::take(changed_packages_guard.deref_mut())) + if changed_packages_guard.is_empty() { + None + } else { + Some(std::mem::take(changed_packages_guard.deref_mut())) + } }; if let Some(changed_packages) = some_changed_packages { + let tasks_to_run = self.derive_tasks_to_run(&changed_packages); // Clean up currently running tasks - if let Some(RunHandle { stopper, run_task }) = run_handle.take() { - // Shut down the tasks for the run - stopper.stop().await; - // Run should exit shortly after we stop all child tasks, wait for it to - // finish to ensure all messages are flushed. - let _ = run_task.await; + if let Some(run_handle) = run_handle.as_mut() { + let tasks_to_stop = tasks_to_run + .iter() + .map(|task| (task.package.clone(), task.task.clone())) + .collect(); + run_handle.stopper.stop_tasks(&tasks_to_stop).await; } - run_handle = Some(self.execute_run(changed_packages).await?); + run_handle = Some(self.execute_run(tasks_to_run, changed_packages).await?); } } }; @@ -234,6 +259,44 @@ impl WatchClient { } } + fn derive_tasks_to_run(&self, changed_packages: &ChangedPackages) -> HashSet { + let tasks = self.tasks.lock().expect("poisoned lock"); + match changed_packages { + ChangedPackages::All => tasks.iter().cloned().collect(), + ChangedPackages::Some(packages) => { + let mut tasks_to_run = HashSet::new(); + let mut packages_to_visit = packages.clone(); + let mut visited_packages = HashSet::new(); + + while let Some(package) = packages_to_visit.iter().next().cloned() { + packages_to_visit.remove(&package); + if visited_packages.contains(&package) { + continue; + } + visited_packages.insert(package.clone()); + + for task in tasks.iter() { + if task.package == package { + tasks_to_run.insert(task.clone()); + } + } + + for dependent in self + .run + .pkg_dep_graph() + .ancestors(&turborepo_repository::package_graph::PackageNode::Workspace( + package.clone(), + )) + { + packages_to_visit.insert(dependent.as_package_name().clone()); + } + } + + tasks_to_run + } + } + } + #[instrument(skip(changed_packages))] fn handle_change_event( changed_packages: &Mutex, @@ -287,17 +350,18 @@ impl WatchClient { /// be interrupted /// /// Returns a handle to the task running (2) - async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result { + async fn execute_run( + &mut self, + tasks_to_run: HashSet, + changed_packages: ChangedPackages, + ) -> Result { // Should we recover here? trace!("handling run with changed packages: {changed_packages:?}"); match changed_packages { - ChangedPackages::Some(packages) => { - let packages = packages - .into_iter() - .filter(|pkg| { - // If not in the watched packages set, ignore - self.watched_packages.contains(pkg) - }) + ChangedPackages::Some(_packages) => { + let packages = tasks_to_run + .iter() + .map(|task| task.package.clone()) .collect(); let mut opts = self.base.opts().clone(); @@ -318,6 +382,12 @@ impl WatchClient { let run = RunBuilder::new(new_base)? .with_entrypoint_packages(packages) + .with_tasks_to_run( + tasks_to_run + .into_iter() + .map(|task| (task.package, task.task)) + .collect(), + ) .hide_prelude() .build(&signal_handler, telemetry) .await?; diff --git a/crates/turborepo-process/Cargo.toml b/crates/turborepo-process/Cargo.toml index d4b39341cf6fa..6aff9d81adcef 100644 --- a/crates/turborepo-process/Cargo.toml +++ b/crates/turborepo-process/Cargo.toml @@ -19,6 +19,7 @@ portable-pty = "0.8.1" tokio = { workspace = true, features = ["full", "time"] } tracing.workspace = true turbopath = { workspace = true } +turborepo-repository = { workspace = true } [lints] workspace = true diff --git a/crates/turborepo-process/src/child.rs b/crates/turborepo-process/src/child.rs index 6f2424920b29e..793c69b213171 100644 --- a/crates/turborepo-process/src/child.rs +++ b/crates/turborepo-process/src/child.rs @@ -381,6 +381,7 @@ pub struct Child { stdin: Arc>>, output: Arc>>, label: String, + command: Command, } #[derive(Clone, Debug)] @@ -416,6 +417,7 @@ impl Child { pty_size: Option, ) -> io::Result { let label = command.label(); + let command_clone = command.clone(); let SpawnResult { handle: mut child, io: ChildIO { stdin, output }, @@ -468,6 +470,7 @@ impl Child { stdin: Arc::new(Mutex::new(stdin)), output: Arc::new(Mutex::new(output)), label, + command: command_clone, }) } @@ -688,6 +691,10 @@ impl Child { pub fn label(&self) -> &str { &self.label } + + pub fn command(&self) -> &Command { + &self.command + } } // Adds a trailing newline if necessary to the buffer diff --git a/crates/turborepo-process/src/command.rs b/crates/turborepo-process/src/command.rs index a8814f8229871..2327a40b7592f 100644 --- a/crates/turborepo-process/src/command.rs +++ b/crates/turborepo-process/src/command.rs @@ -106,6 +106,20 @@ impl Command { pub fn program(&self) -> &OsStr { &self.program } + + pub fn task_name(&self) -> Option<(String, String)> { + let package = self + .env + .get(&OsString::from("TURBO_PACKAGE_NAME"))? + .to_str()? + .to_string(); + let task = self + .env + .get(&OsString::from("TURBO_TASK_NAME"))? + .to_str()? + .to_string(); + Some((package, task)) + } } impl From for tokio::process::Command { diff --git a/crates/turborepo-process/src/lib.rs b/crates/turborepo-process/src/lib.rs index e3c6a9678aa90..ae77360784347 100644 --- a/crates/turborepo-process/src/lib.rs +++ b/crates/turborepo-process/src/lib.rs @@ -177,6 +177,33 @@ impl ProcessManager { pub fn set_pty_size(&self, rows: u16, cols: u16) { self.state.lock().expect("not poisoned").size = Some(PtySize { rows, cols }); } + + pub async fn stop_tasks(&self, tasks: &std::collections::HashSet<(turborepo_repository::package_graph::PackageName, String)>) { + let mut set = JoinSet::new(); + + { + let mut lock = self.state.lock().expect("not poisoned"); + lock.children.retain(|child| { + if let Some((package, task)) = child.command().task_name() { + if tasks.contains(&(package.into(), task.to_string())) { + let mut child = child.clone(); + set.spawn(async move { child.stop().await }); + false + } else { + true + } + } else { + true + } + }); + } + + debug!("waiting for {} processes to exit", set.len()); + + while let Some(out) = set.join_next().await { + trace!("process exited: {:?}", out); + } + } } impl ProcessManagerInner {