1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions crates/turborepo-lib/src/run/mod.rs
@@ -753,4 +753,8 @@ impl RunStopper {
pub async fn stop(&self) {
self.manager.stop().await;
}

pub async fn stop_tasks_matching(&self, predicate: impl Fn(&turborepo_process::Child) -> bool) {
self.manager.stop_children_matching(predicate).await;
}
}
67 changes: 58 additions & 9 deletions crates/turborepo-lib/src/run/watch.rs
@@ -18,6 +18,7 @@ use crate::{
commands::CommandBase,
config::resolve_turbo_config_path,
daemon::{proto, DaemonConnectorError, DaemonError},
engine::TaskNode,
get_version, opts,
run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run},
DaemonConnector, DaemonPaths,
@@ -48,6 +49,7 @@ pub struct WatchClient {
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<RunHandle>,
active_runs: Vec<RunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
@@ -164,6 +166,7 @@ impl WatchClient {
telemetry,
experimental_write_cache,
persistent_tasks_handle: None,
active_runs: Vec::new(),
ui_sender,
ui_handle,
})
@@ -195,7 +198,6 @@ impl WatchClient {
};

let run_fut = async {
let mut run_handle: Option<RunHandle> = None;
loop {
notify_run.notified().await;
let some_changed_packages = {
@@ -205,16 +207,23 @@
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};

if let Some(changed_packages) = some_changed_packages {
if let Some(mut changed_packages) = some_changed_packages {
// Clean up currently running tasks
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to
// finish to ensure all messages are flushed.
let _ = run_task.await;
self.active_runs.retain(|h| !h.run_task.is_finished());

match &mut changed_packages {
ChangedPackages::Some(pkgs) => {
self.stop_impacted_tasks(pkgs).await;
}
ChangedPackages::All => {
for handle in self.active_runs.drain(..) {
handle.stopper.stop().await;
let _ = handle.run_task.await;
}
}
}
run_handle = Some(self.execute_run(changed_packages).await?);
let new_run = self.execute_run(changed_packages).await?;
self.active_runs.push(new_run);
}
}
};
@@ -266,11 +275,51 @@
Ok(())
}

async fn stop_impacted_tasks(&self, pkgs: &mut HashSet<PackageName>) {
let engine = self.run.engine();
let mut tasks_to_stop = HashSet::new();

for node in engine.tasks() {
if let TaskNode::Task(task_id) = node {
if pkgs.contains(&PackageName::from(task_id.package())) {
tasks_to_stop.insert(task_id.clone());

for dependent_node in engine.transitive_dependents(task_id) {
if let TaskNode::Task(dependent_id) = dependent_node {
tasks_to_stop.insert(dependent_id.clone());
}
}
}
}
}

let mut impacted_packages = HashSet::new();
for task_id in &tasks_to_stop {
impacted_packages.insert(PackageName::from(task_id.package()));
}

*pkgs = impacted_packages;

for handle in &self.active_runs {
let tasks = tasks_to_stop.clone();
handle
.stopper
.stop_tasks_matching(move |child| {
child.task_id().map_or(false, |id| tasks.contains(id))
})
.await;
}
}

/// Shut down any resources that run as part of watch.
pub async fn shutdown(&mut self) {
if let Some(sender) = &self.ui_sender {
sender.stop().await;
}
for handle in self.active_runs.drain(..) {
handle.stopper.stop().await;
let _ = handle.run_task.await;
}
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/task_graph/visitor/command.rs
@@ -134,6 +134,7 @@ impl<'a> CommandProvider for PackageGraphCommandProvider<'a> {
// We clear the env before populating it with variables we expect
cmd.env_clear();
cmd.envs(environment.iter());
cmd.task_id(task_id.clone().into_owned());

// If the task has an associated proxy, then we indicate this to the underlying
// task via an env var
@@ -277,6 +278,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
let program = which::which(package_manager.command())?;
let mut cmd = Command::new(&program);
cmd.current_dir(package_dir).args(args).open_stdin();
cmd.task_id(task_id.clone().into_owned());
Some(cmd)
} else if has_mfe_dependency {
tracing::debug!(
@@ -297,6 +299,7 @@ impl<'a, T: PackageInfoProvider> CommandProvider for MicroFrontendProxyProvider<
let program = package_dir.join_components(&["node_modules", ".bin", bin_name]);
let mut cmd = Command::new(program.as_std_path());
cmd.current_dir(package_dir).args(args).open_stdin();
cmd.task_id(task_id.clone().into_owned());
Some(cmd)
} else {
tracing::debug!("MicroFrontendProxyProvider::command - using Turborepo built-in proxy");
20 changes: 19 additions & 1 deletion crates/turborepo-lib/src/task_graph/visitor/exec.rs
@@ -185,6 +185,8 @@ enum ExecOutcome {
},
// Task didn't execute normally due to a shutdown being initiated by another task
Shutdown,
// Task was stopped to be restarted
Restarted,
}

enum SuccessOutcome {
@@ -259,6 +261,12 @@ impl ExecContext {
// stopped if we think we're shutting down.
self.manager.stop().await;
}
Ok(ExecOutcome::Restarted) => {
tracker.cancel();
// We need to stop dependent tasks because this task will be restarted
// in a new run.
callback.send(Err(StopExecution::DependentTasks)).ok();
}
Err(e) => {
tracker.cancel();
callback.send(Err(StopExecution::AllTasks)).ok();
@@ -455,7 +463,17 @@ impl ExecContext {
// Something else killed the child
ChildExit::KilledExternal => Err(InternalError::ExternalKill),
// The child was killed by turbo indicating a shutdown
ChildExit::Killed | ChildExit::Interrupted => Ok(ExecOutcome::Shutdown),
ChildExit::Killed | ChildExit::Interrupted => {
// We distinguish between a full shutdown and a restart based on whether the
// process manager is closing. If it is closing, it means we are shutting down
// the entire run. If it is not closing, it means we are restarting specific
// tasks.
if self.manager.is_closing() {
Contributor:
This is potentially dangerous based on race conditions and dependent task behavior. What tests can we write to validate that we're not creating bugs here?

Contributor:
Example Race:

Thread 1 (exec.rs):        Thread 2 (manager):
child exits
is_closing() → false
                           close() called
                           is_closing = true
return Restarted ❌

That should've been Shutdown.
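
To make that interleaving concrete, here is a hypothetical, self-contained repro of the same check-then-act shape (plain Rust, not turborepo code; the manager's is_closing flag is modelled as a mutex-guarded bool):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Stand-in for the manager's `is_closing` flag.
    let closing = Arc::new(Mutex::new(false));

    // Thread 1 (exec.rs side): the child has already exited; classify the exit.
    let c = Arc::clone(&closing);
    let exec = thread::spawn(move || {
        let observed = *c.lock().unwrap(); // is_closing() — may still read false
        if observed { "Shutdown" } else { "Restarted" }
    });

    // Thread 2 (manager side): close() flips the flag, possibly after the read above.
    let c = Arc::clone(&closing);
    let manager = thread::spawn(move || {
        *c.lock().unwrap() = true;
    });

    let outcome = exec.join().unwrap();
    manager.join().unwrap();

    // Depending on scheduling this prints "Restarted" even though a shutdown
    // was in flight — exactly the case the timeline above shows.
    println!("{outcome}");
}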

Contributor:
If we introduce some atomics, this concern would get addressed.
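
One possible shape for that — a minimal sketch, assuming the flag can move out of the mutex into an AtomicBool that close() sets before it signals any child (hypothetical names, not the current ProcessManager API):

use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

// Hypothetical shared flag owned by the manager and cloned into each exec context.
#[derive(Clone)]
struct ClosingFlag(Arc<AtomicBool>);

impl ClosingFlag {
    fn new() -> Self {
        ClosingFlag(Arc::new(AtomicBool::new(false)))
    }

    /// Manager side: called at the top of close(), *before* any child is killed,
    /// so an exit caused by shutdown always observes `true`.
    fn mark_closing(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    /// Exec side: called after observing ChildExit::Killed / ChildExit::Interrupted.
    fn is_closing(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

The atomic by itself doesn't change the ordering; the guarantee comes from setting the flag before any kill is issued, which makes the killed-during-shutdown path deterministic. A child that exited on its own just before close() would still be classified as Restarted, which is arguably the right answer for that case anyway.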

Ok(ExecOutcome::Shutdown)
} else {
Ok(ExecOutcome::Restarted)
}
}
}
}
}
1 change: 1 addition & 0 deletions crates/turborepo-process/Cargo.toml
@@ -19,6 +19,7 @@ portable-pty = "0.8.1"
tokio = { workspace = true, features = ["full", "time"] }
tracing.workspace = true
turbopath = { workspace = true }
turborepo-task-id = { workspace = true }

[lints]
workspace = true
34 changes: 27 additions & 7 deletions crates/turborepo-process/src/child.rs
@@ -31,6 +31,7 @@ use tokio::{
sync::{mpsc, watch},
};
use tracing::{debug, trace};
use turborepo_task_id::TaskId;

use super::{Command, PtySize};

Expand Down Expand Up @@ -384,20 +385,33 @@ pub struct Child {
}

#[derive(Clone, Debug)]
pub struct ChildCommandChannel(mpsc::Sender<ChildCommand>);
pub struct ChildCommandChannel {
sender: mpsc::Sender<ChildCommand>,
task_id: Option<TaskId<'static>>,
Contributor:
This is a little rough, because now the child process has to be aware of task_ids. Ideally these channels shouldn't be aware of "business logic", to preserve encapsulation.

I'm also worried about the lifetime implications. The TaskId is going to live the entire program, but line 433 has a .clone() that can end up referencing tasks that are non-static. That could result in panics.

Another small, small worry I have, but a worry nonetheless, is the memory overhead associated with this strategy. TaskIds aren't dropped until the Child is dropped, so it's possible to see extra memory held in high-concurrency scenarios. (I haven't taken the time to validate this, but based on my other comments, I'd love to see an architectural change where this is no longer a concern.)

Author:
Hmm fair enough. Is there a better identifier or way of getting the "set of processes mapped to tasks" that would allow us to not have the process code care about tasks at all?

That was the primary thing I wasn't confident about going into this change, since I'm not too familiar with the codebase.

Contributor:
After sleeping on this, I'm thinking the best way (still not great) we can go about this is if ProcessManager gets indices into Vec<Child>. Some pseudocoding:

#[derive(Debug)]
struct ProcessManagerInner {
    is_closing: bool,
    
    // None = inactive/dead process slot
    children: Vec<Option<Child>>,
    
    // Maps task_id -> indices in the children vec
    // We keep dead indices here and filter them on access
    task_index: HashMap<TaskId<'static>, Vec<usize>>,
    
    size: Option<PtySize>,
}

impl ProcessManagerInner {
    fn is_index_active(&self, idx: usize) -> bool {
        self.children.get(idx)
            .and_then(|opt| opt.as_ref())
            .is_some()
    }
    
    fn get_active_children_for_task(&self, task_id: &TaskId) -> Vec<Child> {
        self.task_index
            .get(task_id)
            .map(|indices| {
                indices.iter()
                    .filter_map(|&idx| {
                        self.children.get(idx)
                            .and_then(|opt| opt.as_ref())
                            .cloned()
                    })
                    .collect()
            })
            .unwrap_or_default()
    }
}

Now when a process exits, we don't need to update task_index immediately; dead indices get filtered out on access.

// Mark index as dead - just set to None
lock.children[3] = None;

Stopping a task now looks like:

pub async fn stop_tasks(&self, task_ids: &[TaskId<'static>]) {
    let children_to_stop = {
        let mut lock = self.state.lock().unwrap();
        
        let mut children = Vec::new();
        for task_id in task_ids {
            if let Some(indices) = lock.task_index.get(task_id) {
                for &idx in indices {
                    // Check if still active
                    if let Some(Some(child)) = lock.children.get(idx) {
                        children.push(child.clone());
                        // Mark as inactive
                        lock.children[idx] = None;
                    }
                }
            }
            // Remove task from index
            lock.task_index.remove(task_id);
        }
        children
    };
    
    // Stop outside the lock
    for mut child in children_to_stop {
        child.stop().await;
    }
}

What do you think? Is that enough to have another go with or do you need more detail?

}

impl ChildCommandChannel {
pub fn new() -> (Self, mpsc::Receiver<ChildCommand>) {
pub fn new(task_id: Option<TaskId<'static>>) -> (Self, mpsc::Receiver<ChildCommand>) {
let (tx, rx) = mpsc::channel(1);
(ChildCommandChannel(tx), rx)
(
ChildCommandChannel {
sender: tx,
task_id,
},
rx,
)
}

pub async fn kill(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
self.0.send(ChildCommand::Kill).await
self.sender.send(ChildCommand::Kill).await
}

pub async fn stop(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> {
self.0.send(ChildCommand::Stop).await
self.sender.send(ChildCommand::Stop).await
}

pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
self.task_id.as_ref()
}
}

@@ -416,6 +430,7 @@ impl Child {
pty_size: Option<PtySize>,
) -> io::Result<Self> {
let label = command.label();
let task_id = command.get_task_id().cloned();
let SpawnResult {
handle: mut child,
io: ChildIO { stdin, output },
@@ -428,7 +443,7 @@

let pid = child.pid();

let (command_tx, mut command_rx) = ChildCommandChannel::new();
let (command_tx, mut command_rx) = ChildCommandChannel::new(task_id);

// we use a watch channel to communicate the exit code back to the
// caller. we are interested in three cases:
@@ -680,6 +695,11 @@ impl Child {
pub fn label(&self) -> &str {
&self.label
}

pub fn task_id(&self) -> Option<&TaskId<'static>> {
self.command_channel
.get_task_id()
}
}

// Adds a trailing newline if necessary to the buffer
@@ -750,7 +770,7 @@ impl ChildStateManager {
impl Child {
// Helper method for checking if child is running
fn is_running(&self) -> bool {
!self.command_channel.0.is_closed()
!self.command_channel.sender.is_closed()
}
}

14 changes: 14 additions & 0 deletions crates/turborepo-process/src/command.rs
@@ -6,6 +6,7 @@ use std::{

use itertools::Itertools;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_task_id::TaskId;

/// A command builder that can be used to build both regular
/// child processes and ones spawned hooked up to a PTY
@@ -17,6 +18,7 @@ pub struct Command {
env: BTreeMap<OsString, OsString>,
open_stdin: bool,
env_clear: bool,
task_id: Option<TaskId<'static>>,
}

impl Command {
@@ -29,6 +31,7 @@
env: BTreeMap::new(),
open_stdin: false,
env_clear: false,
task_id: None,
}
}

@@ -106,6 +109,15 @@ impl Command {
pub fn program(&self) -> &OsStr {
&self.program
}

pub fn task_id(&mut self, task_id: TaskId<'static>) -> &mut Self {
self.task_id = Some(task_id);
self
}

pub fn get_task_id(&self) -> Option<&TaskId<'static>> {
self.task_id.as_ref()
}
}

impl From<Command> for tokio::process::Command {
Expand All @@ -117,6 +129,7 @@ impl From<Command> for tokio::process::Command {
env,
open_stdin,
env_clear,
task_id: _,
} = value;

let mut cmd = tokio::process::Command::new(program);
@@ -149,6 +162,7 @@ impl From<Command> for portable_pty::CommandBuilder {
cwd,
env,
env_clear,
task_id: _,
..
} = value;
let mut cmd = portable_pty::CommandBuilder::new(program);