-
Notifications
You must be signed in to change notification settings - Fork 2.1k
fix: Add fine grained interruptible task restarts in watch mode #11135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ use tokio::{ | |
| sync::{mpsc, watch}, | ||
| }; | ||
| use tracing::{debug, trace}; | ||
| use turborepo_task_id::TaskId; | ||
|
|
||
| use super::{Command, PtySize}; | ||
|
|
||
|
|
@@ -384,20 +385,33 @@ pub struct Child { | |
| } | ||
|
|
||
| #[derive(Clone, Debug)] | ||
| pub struct ChildCommandChannel(mpsc::Sender<ChildCommand>); | ||
| pub struct ChildCommandChannel { | ||
| sender: mpsc::Sender<ChildCommand>, | ||
| task_id: Option<TaskId<'static>>, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a little rough, because now the child process has to be aware of I'm also worried about the lifetime implications. The Another small, small worry I have, but a worry nonethless, is the memory overhead associated with this strategy.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm fair enough. Is there a better identifier or way of getting the "set of processes mapped to tasks" that would allow us to not have the process code care about tasks at all? That was the primary thing I wasn't confident in going into this change without being too familiar with the codebase.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After sleeping on this, I'm thinking the best way (still not great) we can go about this is if Now when a process exits, we don't need to update task_index immediately, it will get filtered. Stopping a task now looks like: What do you think? Is that enough to have another go with or do you need more detail? |
||
| } | ||
|
|
||
| impl ChildCommandChannel { | ||
| pub fn new() -> (Self, mpsc::Receiver<ChildCommand>) { | ||
| pub fn new(task_id: Option<TaskId<'static>>) -> (Self, mpsc::Receiver<ChildCommand>) { | ||
| let (tx, rx) = mpsc::channel(1); | ||
| (ChildCommandChannel(tx), rx) | ||
| ( | ||
| ChildCommandChannel { | ||
| sender: tx, | ||
| task_id, | ||
| }, | ||
| rx, | ||
| ) | ||
| } | ||
|
|
||
| pub async fn kill(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> { | ||
| self.0.send(ChildCommand::Kill).await | ||
| self.sender.send(ChildCommand::Kill).await | ||
| } | ||
|
|
||
| pub async fn stop(&self) -> Result<(), mpsc::error::SendError<ChildCommand>> { | ||
| self.0.send(ChildCommand::Stop).await | ||
| self.sender.send(ChildCommand::Stop).await | ||
| } | ||
|
|
||
| pub fn get_task_id(&self) -> Option<&TaskId<'static>> { | ||
| self.task_id.as_ref() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -416,6 +430,7 @@ impl Child { | |
| pty_size: Option<PtySize>, | ||
| ) -> io::Result<Self> { | ||
| let label = command.label(); | ||
| let task_id = command.get_task_id().cloned(); | ||
| let SpawnResult { | ||
| handle: mut child, | ||
| io: ChildIO { stdin, output }, | ||
|
|
@@ -428,7 +443,7 @@ impl Child { | |
|
|
||
| let pid = child.pid(); | ||
|
|
||
| let (command_tx, mut command_rx) = ChildCommandChannel::new(); | ||
| let (command_tx, mut command_rx) = ChildCommandChannel::new(task_id); | ||
|
|
||
| // we use a watch channel to communicate the exit code back to the | ||
| // caller. we are interested in three cases: | ||
|
|
@@ -680,6 +695,11 @@ impl Child { | |
| pub fn label(&self) -> &str { | ||
| &self.label | ||
| } | ||
|
|
||
| pub fn task_id(&self) -> Option<&TaskId<'static>> { | ||
| self.command_channel | ||
| .get_task_id() | ||
| } | ||
| } | ||
|
|
||
| // Adds a trailing newline if necessary to the buffer | ||
|
|
@@ -750,7 +770,7 @@ impl ChildStateManager { | |
| impl Child { | ||
| // Helper method for checking if child is running | ||
| fn is_running(&self) -> bool { | ||
| !self.command_channel.0.is_closed() | ||
| !self.command_channel.sender.is_closed() | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is potentially dangerous based on race conditions and dependent task behavior. What tests can we write to validate that we're not creating bugs here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example Race:
That should've been
Shutdown.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we introduce some atomics, this concern would get addressed.