Skip to content

Commit c091360

Browse files
committed
fix: add fine grained interruptible task restarts in watch mode
1 parent 445df7e commit c091360

File tree

9 files changed

+171
-17
lines changed

9 files changed

+171
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/turborepo-lib/src/run/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,4 +753,8 @@ impl RunStopper {
753753
pub async fn stop(&self) {
754754
self.manager.stop().await;
755755
}
756+
757+
pub async fn stop_tasks_matching(&self, predicate: impl Fn(&turborepo_process::Child) -> bool) {
758+
self.manager.stop_children_matching(predicate).await;
759+
}
756760
}

crates/turborepo-lib/src/run/watch.rs

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::{
1818
commands::CommandBase,
1919
config::resolve_turbo_config_path,
2020
daemon::{proto, DaemonConnectorError, DaemonError},
21+
engine::TaskNode,
2122
get_version, opts,
2223
run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run},
2324
DaemonConnector, DaemonPaths,
@@ -48,6 +49,7 @@ pub struct WatchClient {
4849
run: Arc<Run>,
4950
watched_packages: HashSet<PackageName>,
5051
persistent_tasks_handle: Option<RunHandle>,
52+
active_runs: Vec<RunHandle>,
5153
connector: DaemonConnector,
5254
base: CommandBase,
5355
telemetry: CommandEventBuilder,
@@ -164,6 +166,7 @@ impl WatchClient {
164166
telemetry,
165167
experimental_write_cache,
166168
persistent_tasks_handle: None,
169+
active_runs: Vec::new(),
167170
ui_sender,
168171
ui_handle,
169172
})
@@ -195,7 +198,6 @@ impl WatchClient {
195198
};
196199

197200
let run_fut = async {
198-
let mut run_handle: Option<RunHandle> = None;
199201
loop {
200202
notify_run.notified().await;
201203
let some_changed_packages = {
@@ -205,16 +207,23 @@ impl WatchClient {
205207
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
206208
};
207209

208-
if let Some(changed_packages) = some_changed_packages {
210+
if let Some(mut changed_packages) = some_changed_packages {
209211
// Clean up currently running tasks
210-
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
211-
// Shut down the tasks for the run
212-
stopper.stop().await;
213-
// Run should exit shortly after we stop all child tasks, wait for it to
214-
// finish to ensure all messages are flushed.
215-
let _ = run_task.await;
212+
self.active_runs.retain(|h| !h.run_task.is_finished());
213+
214+
match &mut changed_packages {
215+
ChangedPackages::Some(pkgs) => {
216+
self.stop_impacted_tasks(pkgs).await;
217+
}
218+
ChangedPackages::All => {
219+
for handle in self.active_runs.drain(..) {
220+
handle.stopper.stop().await;
221+
let _ = handle.run_task.await;
222+
}
223+
}
216224
}
217-
run_handle = Some(self.execute_run(changed_packages).await?);
225+
let new_run = self.execute_run(changed_packages).await?;
226+
self.active_runs.push(new_run);
218227
}
219228
}
220229
};
@@ -266,11 +275,51 @@ impl WatchClient {
266275
Ok(())
267276
}
268277

278+
async fn stop_impacted_tasks(&self, pkgs: &mut HashSet<PackageName>) {
279+
let engine = self.run.engine();
280+
let mut tasks_to_stop = HashSet::new();
281+
282+
for node in engine.tasks() {
283+
if let TaskNode::Task(task_id) = node {
284+
if pkgs.contains(&PackageName::from(task_id.package())) {
285+
tasks_to_stop.insert(task_id.clone());
286+
287+
for dependent_node in engine.transitive_dependents(task_id) {
288+
if let TaskNode::Task(dependent_id) = dependent_node {
289+
tasks_to_stop.insert(dependent_id.clone());
290+
}
291+
}
292+
}
293+
}
294+
}
295+
296+
let mut impacted_packages = HashSet::new();
297+
for task_id in &tasks_to_stop {
298+
impacted_packages.insert(PackageName::from(task_id.package()));
299+
}
300+
301+
*pkgs = impacted_packages;
302+
303+
for handle in &self.active_runs {
304+
let tasks = tasks_to_stop.clone();
305+
handle
306+
.stopper
307+
.stop_tasks_matching(move |child| {
308+
child.task_id().map_or(false, |id| tasks.contains(id))
309+
})
310+
.await;
311+
}
312+
}
313+
269314
/// Shut down any resources that run as part of watch.
270315
pub async fn shutdown(&mut self) {
271316
if let Some(sender) = &self.ui_sender {
272317
sender.stop().await;
273318
}
319+
for handle in self.active_runs.drain(..) {
320+
handle.stopper.stop().await;
321+
let _ = handle.run_task.await;
322+
}
274323
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
275324
// Shut down the tasks for the run
276325
stopper.stop().await;

crates/turborepo-lib/src/task_graph/visitor/command.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ impl<'a> CommandProvider for PackageGraphCommandProvider<'a> {
134134
// We clear the env before populating it with variables we expect
135135
cmd.env_clear();
136136
cmd.envs(environment.iter());
137+
cmd.task_id(task_id.clone().into_owned());
137138

138139
// If the task has an associated proxy, then we indicate this to the underlying
139140
// task via an env var
@@ -277,6 +278,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
277278
let program = which::which(package_manager.command())?;
278279
let mut cmd = Command::new(&program);
279280
cmd.current_dir(package_dir).args(args).open_stdin();
281+
cmd.task_id(task_id.clone().into_owned());
280282
Some(cmd)
281283
} else if has_mfe_dependency {
282284
tracing::debug!(
@@ -297,6 +299,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
297299
let program = package_dir.join_components(&["node_modules", ".bin", bin_name]);
298300
let mut cmd = Command::new(program.as_std_path());
299301
cmd.current_dir(package_dir).args(args).open_stdin();
302+
cmd.task_id(task_id.clone().into_owned());
300303
Some(cmd)
301304
} else {
302305
tracing::debug!("MicroFrontendProxyProvider::command - using Turborepo built-in proxy");

crates/turborepo-lib/src/task_graph/visitor/exec.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,8 @@ enum ExecOutcome {
185185
},
186186
// Task didn't execute normally due to a shutdown being initiated by another task
187187
Shutdown,
188+
// Task was stopped to be restarted
189+
Restarted,
188190
}
189191

190192
enum SuccessOutcome {
@@ -259,6 +261,12 @@ impl ExecContext {
259261
// stopped if we think we're shutting down.
260262
self.manager.stop().await;
261263
}
264+
Ok(ExecOutcome::Restarted) => {
265+
tracker.cancel();
266+
// We need to stop dependent tasks because this task will be restarted
267+
// in a new run.
268+
callback.send(Err(StopExecution::DependentTasks)).ok();
269+
}
262270
Err(e) => {
263271
tracker.cancel();
264272
callback.send(Err(StopExecution::AllTasks)).ok();
@@ -455,7 +463,17 @@ impl ExecContext {
455463
// Something else killed the child
456464
ChildExit::KilledExternal => Err(InternalError::ExternalKill),
457465
// The child was killed by turbo indicating a shutdown
458-
ChildExit::Killed | ChildExit::Interrupted => Ok(ExecOutcome::Shutdown),
466+
ChildExit::Killed | ChildExit::Interrupted => {
467+
// We distinguish between a full shutdown and a restart based on whether the
468+
// process manager is closing. If it is closing, it means we are shutting down
469+
// the entire run. If it is not closing, it means we are restarting specific
470+
// tasks.
471+
if self.manager.is_closing() {
472+
Ok(ExecOutcome::Shutdown)
473+
} else {
474+
Ok(ExecOutcome::Restarted)
475+
}
476+
}
459477
}
460478
}
461479
}

crates/turborepo-process/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ portable-pty = "0.8.1"
1919
tokio = { workspace = true, features = ["full", "time"] }
2020
tracing.workspace = true
2121
turbopath = { workspace = true }
22+
turborepo-task-id = { workspace = true }
2223

2324
[lints]
2425
workspace = true

crates/turborepo-process/src/child.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tokio::{
3131
sync::{mpsc, watch},
3232
};
3333
use tracing::{debug, trace};
34+
use turborepo_task_id::TaskId;
3435

3536
use super::{Command, PtySize};
3637

@@ -384,20 +385,33 @@ pub struct Child {
384385
}
385386

386387
#[derive(Clone, Debug)]
387-
pub struct ChildCommandChannel(mpsc::Sender<ChildCommand>);
388+
pub struct ChildCommandChannel {
389+
sender: mpsc::Sender<ChildCommand>,
390+
task_id: Option<TaskId<'static>>,
391+
}
388392

389393
impl ChildCommandChannel {
390-
pub fn new() -> (Self, mpsc::Receiver<ChildCommand>) {
394+
pub fn new(task_id: Option<TaskId<'static>>) -> (Self, mpsc::Receiver<ChildCommand>) {
391395
let (tx, rx) = mpsc::channel(1);
392-
(ChildCommandChannel(tx), rx)
396+
(
397+
ChildCommandChannel {
398+
sender: tx,
399+
task_id,
400+
},
401+
rx,
402+
)
393403
}
394404

395405
pub async fn kill(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
396-
self.0.send(ChildCommand::Kill).await
406+
self.sender.send(ChildCommand::Kill).await
397407
}
398408

399409
pub async fn stop(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
400-
self.0.send(ChildCommand::Stop).await
410+
self.sender.send(ChildCommand::Stop).await
411+
}
412+
413+
pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
414+
self.task_id.as_ref()
401415
}
402416
}
403417

@@ -416,6 +430,7 @@ impl Child {
416430
pty_size: Option<PtySize>,
417431
) -> io::Result<Self> {
418432
let label = command.label();
433+
let task_id = command.get_task_id().cloned();
419434
let SpawnResult {
420435
handle: mut child,
421436
io: ChildIO { stdin, output },
@@ -428,7 +443,7 @@ impl Child {
428443

429444
let pid = child.pid();
430445

431-
let (command_tx, mut command_rx) = ChildCommandChannel::new();
446+
let (command_tx, mut command_rx) = ChildCommandChannel::new(task_id);
432447

433448
// we use a watch channel to communicate the exit code back to the
434449
// caller. we are interested in three cases:
@@ -680,6 +695,11 @@ impl Child {
680695
pub fn label(&self) -> &str {
681696
&self.label
682697
}
698+
699+
pub fn task_id(&self) -> Option<&TaskId<'static>> {
700+
self.command_channel
701+
.get_task_id()
702+
}
683703
}
684704

685705
// Adds a trailing newline if necessary to the buffer
@@ -750,7 +770,7 @@ impl ChildStateManager {
750770
impl Child {
751771
// Helper method for checking if child is running
752772
fn is_running(&self) -> bool {
753-
!self.command_channel.0.is_closed()
773+
!self.command_channel.sender.is_closed()
754774
}
755775
}
756776

crates/turborepo-process/src/command.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66

77
use itertools::Itertools;
88
use turbopath::AbsoluteSystemPathBuf;
9+
use turborepo_task_id::TaskId;
910

1011
/// A command builder that can be used to build both regular
1112
/// child processes and ones spawned hooked up to a PTY
@@ -17,6 +18,7 @@ pub struct Command {
1718
env: BTreeMap<OsString, OsString>,
1819
open_stdin: bool,
1920
env_clear: bool,
21+
task_id: Option<TaskId<'static>>,
2022
}
2123

2224
impl Command {
@@ -29,6 +31,7 @@ impl Command {
2931
env: BTreeMap::new(),
3032
open_stdin: false,
3133
env_clear: false,
34+
task_id: None,
3235
}
3336
}
3437

@@ -106,6 +109,15 @@ impl Command {
106109
pub fn program(&self) -> &OsStr {
107110
&self.program
108111
}
112+
113+
pub fn task_id(&mut self, task_id: TaskId<'static>) -> &mut Self {
114+
self.task_id = Some(task_id);
115+
self
116+
}
117+
118+
pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
119+
self.task_id.as_ref()
120+
}
109121
}
110122

111123
impl From<Command> for tokio::process::Command {
@@ -117,6 +129,7 @@ impl From<Command> for tokio::process::Command {
117129
env,
118130
open_stdin,
119131
env_clear,
132+
task_id: _,
120133
} = value;
121134

122135
let mut cmd = tokio::process::Command::new(program);
@@ -149,6 +162,7 @@ impl From<Command> for portable_pty::CommandBuilder {
149162
cwd,
150163
env,
151164
env_clear,
165+
task_id: _,
152166
..
153167
} = value;
154168
let mut cmd = portable_pty::CommandBuilder::new(program);

0 commit comments

Comments
 (0)