Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 38 additions & 0 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,44 @@ impl Engine<Built> {
/// Creates an instance of `Engine` that only contains tasks that depend on
/// tasks from a given package. This is useful for watch mode, where we
/// need to re-run only a portion of the task graph.
pub fn filter_tasks(&self, tasks_to_run: &HashSet<(PackageName, String)>) -> Engine<Built> {
let new_graph = self.task_graph.filter_map(
|node_idx, node| {
if let TaskNode::Task(task) = &self.task_graph[node_idx] {
if tasks_to_run.contains(&(task.package().into(), task.task().to_string())) {
return Some(node.clone());
}
}
None
},
|_, _| Some(()),
);
Comment on lines +162 to +173
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filter_tasks() method excludes the root node from the filtered graph, which could break task graph integrity and cause execution failures.

View Details
📝 Patch Details
diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs
index 76221fd62..d34ca45fe 100644
--- a/crates/turborepo-lib/src/engine/mod.rs
+++ b/crates/turborepo-lib/src/engine/mod.rs
@@ -162,16 +162,25 @@ impl Engine<Built> {
     pub fn filter_tasks(&self, tasks_to_run: &HashSet<(PackageName, String)>) -> Engine<Built> {
         let new_graph = self.task_graph.filter_map(
             |node_idx, node| {
-                if let TaskNode::Task(task) = &self.task_graph[node_idx] {
-                    if tasks_to_run.contains(&(task.package().into(), task.task().to_string())) {
-                        return Some(node.clone());
+                match &self.task_graph[node_idx] {
+                    TaskNode::Task(task) => {
+                        if tasks_to_run.contains(&(task.package().into(), task.task().to_string())) {
+                            Some(node.clone())
+                        } else {
+                            None
+                        }
                     }
+                    TaskNode::Root => Some(node.clone()),
                 }
-                None
             },
             |_, _| Some(()),
         );
 
+        let root_index = new_graph
+            .node_indices()
+            .find(|index| new_graph[*index] == TaskNode::Root)
+            .expect("root node should be present");
+
         let task_lookup: HashMap<_, _> = new_graph
             .node_indices()
             .filter_map(|index| {
@@ -187,7 +196,7 @@ impl Engine<Built> {
 
         Engine {
             marker: std::marker::PhantomData,
-            root_index: self.root_index,
+            root_index,
             task_graph: new_graph,
             task_lookup,
             task_definitions: self.task_definitions.clone(),

Analysis

Invalid root_index in Engine.filter_tasks() causes panic when accessing filtered task graph

What fails: Engine::filter_tasks() in crates/turborepo-lib/src/engine/mod.rs excludes root node from filtered graph but reuses original root_index, causing panic when accessing engine.task_graph[engine.root_index]

How to reproduce:

let engine = engine.filter_tasks(&tasks_to_run);
// Any access to engine.task_graph[engine.root_index] will panic with "index out of bounds"

Result: Panic with "index out of bounds" when trying to access root node via root_index in filtered engine

Expected: Root node should be preserved in filtered graph and root_index should point to valid node, consistent with other filtering methods like create_engine_for_interruptible_tasks() which correctly handle root node preservation


let task_lookup: HashMap<_, _> = new_graph
.node_indices()
.filter_map(|index| {
let task = new_graph
.node_weight(index)
.expect("node index should be present");
match task {
TaskNode::Root => None,
TaskNode::Task(task) => Some((task.clone(), index)),
}
})
.collect();

Engine {
marker: std::marker::PhantomData,
root_index: self.root_index,
task_graph: new_graph,
task_lookup,
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_non_interruptible_tasks: self.has_non_interruptible_tasks,
}
}

pub fn create_engine_for_subgraph(
&self,
changed_packages: &HashSet<PackageName>,
Expand Down
11 changes: 11 additions & 0 deletions crates/turborepo-lib/src/run/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub struct RunBuilder {
// We will then prune away any tasks that do not depend on tasks inside
// this package.
entrypoint_packages: Option<HashSet<PackageName>>,
tasks_to_run: Option<HashSet<(PackageName, String)>>,
should_print_prelude_override: Option<bool>,
// In query, we don't want to validate the engine. Defaults to `true`
should_validate_engine: bool,
Expand Down Expand Up @@ -106,12 +107,18 @@ impl RunBuilder {
api_auth,
analytics_sender: None,
entrypoint_packages: None,
tasks_to_run: None,
should_print_prelude_override: None,
should_validate_engine: true,
add_all_tasks: false,
})
}

pub fn with_tasks_to_run(mut self, tasks_to_run: HashSet<(PackageName, String)>) -> Self {
self.tasks_to_run = Some(tasks_to_run);
self
}

pub fn with_entrypoint_packages(mut self, entrypoint_packages: HashSet<PackageName>) -> Self {
self.entrypoint_packages = Some(entrypoint_packages);
self
Expand Down Expand Up @@ -560,6 +567,10 @@ impl RunBuilder {
engine = engine.create_engine_for_subgraph(entrypoint_packages);
}

if let Some(tasks_to_run) = &self.tasks_to_run {
engine = engine.filter_tasks(tasks_to_run);
}

if !self.opts.run_opts.parallel && self.should_validate_engine {
engine
.validate(
Expand Down
4 changes: 4 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,4 +531,8 @@ impl RunStopper {
/// Stops every task in the current run and waits for shutdown to complete.
pub async fn stop(&self) {
// Delegates to the process manager, which owns all child processes.
self.manager.stop().await;
}

/// Stops only the given `(package, task)` pairs, leaving all other tasks
/// running. Used by watch mode to restart just the tasks affected by a
/// file change instead of tearing down the whole run.
pub async fn stop_tasks(&self, tasks: &HashSet<(PackageName, String)>) {
self.manager.stop_tasks(tasks).await;
}
}
104 changes: 87 additions & 17 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use crate::{
DaemonConnector, DaemonPaths,
};

/// Identity of a single runnable task: the workspace package that owns it
/// plus the task's name within that package. Watch mode tracks these to
/// decide which tasks to stop and re-run when packages change.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct Task {
    // Workspace package the task belongs to.
    package: PackageName,
    // Task name within the package (e.g. "build").
    task: String,
}

#[derive(Debug)]
enum ChangedPackages {
All,
Expand All @@ -47,6 +53,7 @@ impl ChangedPackages {
pub struct WatchClient {
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
tasks: Arc<Mutex<HashSet<Task>>>,
persistent_tasks_handle: Option<RunHandle>,
connector: DaemonConnector,
base: CommandBase,
Expand Down Expand Up @@ -155,10 +162,13 @@ impl WatchClient {
custom_turbo_json_path,
};

let tasks = Arc::new(Mutex::new(HashSet::new()));

Ok(Self {
base,
run,
watched_packages,
tasks,
connector,
handler,
telemetry,
Expand All @@ -170,6 +180,17 @@ impl WatchClient {
}

pub async fn start(&mut self) -> Result<(), Error> {
let initial_tasks = self
.run
.engine()
.task_ids()
.map(|task_id| Task {
package: task_id.package().into(),
task: task_id.task().to_string(),
})
.collect();
*self.tasks.lock().expect("poisoned lock") = initial_tasks;

let connector = self.connector.clone();
let mut client = connector.connect().await?;

Expand Down Expand Up @@ -201,20 +222,24 @@ impl WatchClient {
let some_changed_packages = {
let mut changed_packages_guard =
changed_packages.lock().expect("poisoned lock");
(!changed_packages_guard.is_empty())
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
if changed_packages_guard.is_empty() {
None
} else {
Some(std::mem::take(changed_packages_guard.deref_mut()))
}
};

if let Some(changed_packages) = some_changed_packages {
let tasks_to_run = self.derive_tasks_to_run(&changed_packages);
// Clean up currently running tasks
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to
// finish to ensure all messages are flushed.
let _ = run_task.await;
if let Some(run_handle) = run_handle.as_mut() {
let tasks_to_stop = tasks_to_run
.iter()
.map(|task| (task.package.clone(), task.task.clone()))
.collect();
run_handle.stopper.stop_tasks(&tasks_to_stop).await;
}
run_handle = Some(self.execute_run(changed_packages).await?);
run_handle = Some(self.execute_run(tasks_to_run, changed_packages).await?);
}
}
};
Expand All @@ -234,6 +259,44 @@ impl WatchClient {
}
}

fn derive_tasks_to_run(&self, changed_packages: &ChangedPackages) -> HashSet<Task> {
let tasks = self.tasks.lock().expect("poisoned lock");
match changed_packages {
ChangedPackages::All => tasks.iter().cloned().collect(),
ChangedPackages::Some(packages) => {
let mut tasks_to_run = HashSet::new();
let mut packages_to_visit = packages.clone();
let mut visited_packages = HashSet::new();

while let Some(package) = packages_to_visit.iter().next().cloned() {
packages_to_visit.remove(&package);
if visited_packages.contains(&package) {
continue;
}
visited_packages.insert(package.clone());

for task in tasks.iter() {
if task.package == package {
tasks_to_run.insert(task.clone());
}
}

for dependent in self
.run
.pkg_dep_graph()
.ancestors(&turborepo_repository::package_graph::PackageNode::Workspace(
package.clone(),
))
{
packages_to_visit.insert(dependent.as_package_name().clone());
}
}

tasks_to_run
}
}
}

#[instrument(skip(changed_packages))]
fn handle_change_event(
changed_packages: &Mutex<ChangedPackages>,
Expand Down Expand Up @@ -287,17 +350,18 @@ impl WatchClient {
/// be interrupted
///
/// Returns a handle to the task running (2)
async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<RunHandle, Error> {
async fn execute_run(
&mut self,
tasks_to_run: HashSet<Task>,
changed_packages: ChangedPackages,
) -> Result<RunHandle, Error> {
// Should we recover here?
trace!("handling run with changed packages: {changed_packages:?}");
match changed_packages {
ChangedPackages::Some(packages) => {
let packages = packages
.into_iter()
.filter(|pkg| {
// If not in the watched packages set, ignore
self.watched_packages.contains(pkg)
})
ChangedPackages::Some(_packages) => {
let packages = tasks_to_run
.iter()
.map(|task| task.package.clone())
.collect();

let mut opts = self.base.opts().clone();
Expand All @@ -318,6 +382,12 @@ impl WatchClient {

let run = RunBuilder::new(new_base)?
.with_entrypoint_packages(packages)
.with_tasks_to_run(
tasks_to_run
.into_iter()
.map(|task| (task.package, task.task))
.collect(),
)
.hide_prelude()
.build(&signal_handler, telemetry)
.await?;
Expand Down
1 change: 1 addition & 0 deletions crates/turborepo-process/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ portable-pty = "0.8.1"
tokio = { workspace = true, features = ["full", "time"] }
tracing.workspace = true
turbopath = { workspace = true }
turborepo-repository = { workspace = true }

[lints]
workspace = true
Expand Down
7 changes: 7 additions & 0 deletions crates/turborepo-process/src/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ pub struct Child {
stdin: Arc<Mutex<Option<ChildInput>>>,
output: Arc<Mutex<Option<ChildOutput>>>,
label: String,
command: Command,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -416,6 +417,7 @@ impl Child {
pty_size: Option<PtySize>,
) -> io::Result<Self> {
let label = command.label();
let command_clone = command.clone();
let SpawnResult {
handle: mut child,
io: ChildIO { stdin, output },
Expand Down Expand Up @@ -468,6 +470,7 @@ impl Child {
stdin: Arc::new(Mutex::new(stdin)),
output: Arc::new(Mutex::new(output)),
label,
command: command_clone,
})
}

Expand Down Expand Up @@ -688,6 +691,10 @@ impl Child {
pub fn label(&self) -> &str {
&self.label
}

/// Returns the `Command` this child was spawned from. Callers use this to
/// recover the task identity (see `Command::task_name`) when selectively
/// stopping tasks.
pub fn command(&self) -> &Command {
&self.command
}
}

// Adds a trailing newline if necessary to the buffer
Expand Down
14 changes: 14 additions & 0 deletions crates/turborepo-process/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ impl Command {
pub fn program(&self) -> &OsStr {
&self.program
}

/// Recovers the `(package, task)` identity of the task this command runs,
/// read from the `TURBO_PACKAGE_NAME` and `TURBO_TASK_NAME` entries of the
/// command's environment map.
///
/// Returns `None` when either variable is missing or not valid UTF-8.
/// NOTE(review): this only works if the spawner actually sets both
/// variables on the command — confirm against the command provider.
pub fn task_name(&self) -> Option<(String, String)> {
    // Shared lookup: fetch an env entry and require it to be UTF-8.
    let var = |name: &str| -> Option<String> {
        Some(self.env.get(&OsString::from(name))?.to_str()?.to_string())
    };
    let package = var("TURBO_PACKAGE_NAME")?;
    let task = var("TURBO_TASK_NAME")?;
    Some((package, task))
}
Comment on lines +110 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The task_name() method tries to read TURBO_PACKAGE_NAME and TURBO_TASK_NAME environment variables that are never set anywhere in the codebase, causing it to always return None and breaking the selective task stopping functionality.

View Details
📝 Patch Details
diff --git a/crates/turborepo-lib/src/task_graph/visitor/command.rs b/crates/turborepo-lib/src/task_graph/visitor/command.rs
index 0b3cb4d74..c38cbb956 100644
--- a/crates/turborepo-lib/src/task_graph/visitor/command.rs
+++ b/crates/turborepo-lib/src/task_graph/visitor/command.rs
@@ -135,6 +135,10 @@ impl<'a> CommandProvider for PackageGraphCommandProvider<'a> {
         cmd.env_clear();
         cmd.envs(environment.iter());
 
+        // Set package and task name for selective task stopping
+        cmd.env("TURBO_PACKAGE_NAME", task_id.package());
+        cmd.env("TURBO_TASK_NAME", task_id.task());
+
         // If the task has an associated proxy, then we indicate this to the underlying
         // task via an env var
         if self

Analysis

Missing environment variables breaks selective task stopping functionality

What fails: ProcessManager::stop_tasks() cannot identify tasks to stop because Command::task_name() always returns None due to missing TURBO_PACKAGE_NAME and TURBO_TASK_NAME environment variables

How to reproduce:

  1. Start turbo in watch mode with multiple tasks running
  2. Trigger a file change that should only stop specific tasks
  3. Observe that no tasks are selectively stopped due to task_name() returning None

Result: All tasks continue running instead of selective stopping, breaking the granular watch mode functionality introduced in commit 688456031

Expected: Tasks should be selectively stopped based on package and task names when files change during watch mode

Root cause: PackageGraphCommandProvider::command() in crates/turborepo-lib/src/task_graph/visitor/command.rs never sets the TURBO_PACKAGE_NAME and TURBO_TASK_NAME environment variables that Command::task_name() tries to read

}

impl From<Command> for tokio::process::Command {
Expand Down
27 changes: 27 additions & 0 deletions crates/turborepo-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,33 @@ impl ProcessManager {
pub fn set_pty_size(&self, rows: u16, cols: u16) {
self.state.lock().expect("not poisoned").size = Some(PtySize { rows, cols });
}

/// Stops — and removes from management — only the children whose task
/// identity (recovered via `Command::task_name`) appears in `tasks`.
/// Children without a recoverable identity are left running. Waits for
/// every stopped child to exit before returning.
pub async fn stop_tasks(&self, tasks: &std::collections::HashSet<(turborepo_repository::package_graph::PackageName, String)>) {
    let mut set = JoinSet::new();

    {
        // Hold the state lock only while partitioning the children; the
        // stop/wait work happens after the lock is released.
        let mut lock = self.state.lock().expect("not poisoned");
        lock.children.retain(|child| {
            if let Some((package, task)) = child.command().task_name() {
                // `task` is already an owned String returned by
                // `task_name()`; move it into the lookup key instead of
                // cloning it again with `to_string()`.
                if tasks.contains(&(package.into(), task)) {
                    let mut child = child.clone();
                    set.spawn(async move { child.stop().await });
                    return false;
                }
            }
            true
        });
    }

    debug!("waiting for {} processes to exit", set.len());

    while let Some(out) = set.join_next().await {
        trace!("process exited: {:?}", out);
    }
}
}

impl ProcessManagerInner {
Expand Down
Loading