@@ -18,6 +18,7 @@ use crate::{
1818 commands:: CommandBase ,
1919 config:: resolve_turbo_config_path,
2020 daemon:: { proto, DaemonConnectorError , DaemonError } ,
21+ engine:: TaskNode ,
2122 get_version, opts,
2223 run:: { self , builder:: RunBuilder , scope:: target_selector:: InvalidSelectorError , Run } ,
2324 DaemonConnector , DaemonPaths ,
@@ -48,6 +49,7 @@ pub struct WatchClient {
4849 run : Arc < Run > ,
4950 watched_packages : HashSet < PackageName > ,
5051 persistent_tasks_handle : Option < RunHandle > ,
52+ active_runs : Vec < RunHandle > ,
5153 connector : DaemonConnector ,
5254 base : CommandBase ,
5355 telemetry : CommandEventBuilder ,
@@ -164,6 +166,7 @@ impl WatchClient {
164166 telemetry,
165167 experimental_write_cache,
166168 persistent_tasks_handle : None ,
169+ active_runs : Vec :: new ( ) ,
167170 ui_sender,
168171 ui_handle,
169172 } )
@@ -195,7 +198,6 @@ impl WatchClient {
195198 } ;
196199
197200 let run_fut = async {
198- let mut run_handle: Option < RunHandle > = None ;
199201 loop {
200202 notify_run. notified ( ) . await ;
201203 let some_changed_packages = {
@@ -205,16 +207,59 @@ impl WatchClient {
205207 . then ( || std:: mem:: take ( changed_packages_guard. deref_mut ( ) ) )
206208 } ;
207209
208- if let Some ( changed_packages) = some_changed_packages {
210+ if let Some ( mut changed_packages) = some_changed_packages {
209211 // Clean up currently running tasks
210- if let Some ( RunHandle { stopper, run_task } ) = run_handle. take ( ) {
211- // Shut down the tasks for the run
212- stopper. stop ( ) . await ;
213- // Run should exit shortly after we stop all child tasks, wait for it to
214- // finish to ensure all messages are flushed.
215- let _ = run_task. await ;
212+ self . active_runs . retain ( |h| !h. run_task . is_finished ( ) ) ;
213+
214+ match & mut changed_packages {
215+ ChangedPackages :: Some ( pkgs) => {
216+ let engine = self . run . engine ( ) ;
217+ let mut tasks_to_stop = HashSet :: new ( ) ;
218+
219+ for node in engine. tasks ( ) {
220+ if let TaskNode :: Task ( task_id) = node {
221+ if pkgs. contains ( & PackageName :: from ( task_id. package ( ) ) ) {
222+ tasks_to_stop. insert ( task_id. clone ( ) ) ;
223+
224+ for dependent_node in engine. transitive_dependents ( task_id)
225+ {
226+ if let TaskNode :: Task ( dependent_id) = dependent_node {
227+ tasks_to_stop. insert ( dependent_id. clone ( ) ) ;
228+ }
229+ }
230+ }
231+ }
232+ }
233+
234+ let mut impacted_packages = HashSet :: new ( ) ;
235+ for task_id in & tasks_to_stop {
236+ impacted_packages. insert ( PackageName :: from ( task_id. package ( ) ) ) ;
237+ }
238+
239+ * pkgs = impacted_packages;
240+
241+ for handle in & self . active_runs {
242+ let tasks = tasks_to_stop. clone ( ) ;
243+ handle
244+ . stopper
245+ . stop_tasks_matching ( move |child| {
246+ if let Some ( task_id) = child. task_id ( ) {
247+ return tasks. contains ( task_id) ;
248+ }
249+ false
250+ } )
251+ . await ;
252+ }
253+ }
254+ ChangedPackages :: All => {
255+ for handle in self . active_runs . drain ( ..) {
256+ handle. stopper . stop ( ) . await ;
257+ let _ = handle. run_task . await ;
258+ }
259+ }
216260 }
217- run_handle = Some ( self . execute_run ( changed_packages) . await ?) ;
261+ let new_run = self . execute_run ( changed_packages) . await ?;
262+ self . active_runs . push ( new_run) ;
218263 }
219264 }
220265 } ;
@@ -271,6 +316,10 @@ impl WatchClient {
271316 if let Some ( sender) = & self . ui_sender {
272317 sender. stop ( ) . await ;
273318 }
319+ for handle in self . active_runs . drain ( ..) {
320+ handle. stopper . stop ( ) . await ;
321+ let _ = handle. run_task . await ;
322+ }
274323 if let Some ( RunHandle { stopper, run_task } ) = self . persistent_tasks_handle . take ( ) {
275324 // Shut down the tasks for the run
276325 stopper. stop ( ) . await ;
0 commit comments