Skip to content

[v2] Event watcher#7091

Merged
machichima merged 18 commits intoflyteorg:v2from
machichima:event-watcher
Apr 1, 2026
Merged

[v2] Event watcher#7091
machichima merged 18 commits intoflyteorg:v2from
machichima:event-watcher

Conversation

@machichima
Copy link
Copy Markdown
Member

@machichima machichima commented Mar 25, 2026

Tracking issue

Why are the changes needed?

Previously WatchClusterEvents incorrectly return action phase rather than cluster events.

What changes were proposed in this pull request?

  • Add k8s event watcher to capture events for objects and store in AdditionalReasons in phase info
    • taskaction controller will take them out from AdditionalReasons and put into ClusterEvents in action event
  • WatchClusterEvent get from ClusterEvents

How was this patch tested?

  • Run on sandbox and ensure we can see k8s events
image

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.
  • changed: For changes in existing functionality.
  • deprecated: For soon-to-be-removed features.
  • removed: For features being removed.
  • fixed: For any bug fixed.
  • security: In case of vulnerabilities

This is important to improve the readability of release notes.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima marked this pull request as draft March 25, 2026 10:53
@github-actions github-actions bot mentioned this pull request Mar 25, 2026
3 tasks
@pingsutw pingsutw added this to the V2 GA milestone Mar 25, 2026
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima marked this pull request as ready for review March 26, 2026 01:57
Signed-off-by: machichima <nary12321@gmail.com>
go s.repo.ActionRepo().WatchActionUpdates(ctx, actionID.Run, updatesCh, errsCh)

lastPhase := action.Phase
const maxEvents = 500
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just set a reasonable number here first, normally we will not get to this vaue. We can decide whether we want to increase/decrease it later one

Signed-off-by: machichima <nary12321@gmail.com>
logger.Infof(ctx, "Received WatchClusterEvents request for: %s/%s", actionID.Run.Name, actionID.Name)

// Step 1: Send existing events from current DB state
action, err := s.repo.ActionRepo().GetAction(ctx, actionID)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we watch first then query action db? I t is likely that some updates might be lost between the time window

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2868d5e

}
// Filter to this specific action
if updated.Name != actionID.Name {
continue
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this continue here will makes the program go back to the top while loop and do the drain process again. Is there any reason for doing so?

Copy link
Copy Markdown
Member Author

@machichima machichima Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: This is to prevent the event miss. I tried to skip when the update is not from target action, we will only get partial event (see following).
I think it's a bug now and my implementation is a workaround. Will try to deal with this in follow-ups

Image

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a todo comment here: ee6efcb

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Comment on lines +93 to +100
// Revalidate this bucket is still current before writing.
w.mu.RLock()
stillCurrent := w.objectCache[objectKey] == eventInfos
w.mu.RUnlock()
if !stillCurrent {
// eventInfos being deleted/changed, we should get the newest object again
eventInfos.mu.Unlock()
continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need revalidate if we already have a lock per event?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix by not deleting eventInfo when it's empty in 7976995
Also use sync.Map instead for objectCache

Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
Signed-off-by: machichima <nary12321@gmail.com>
@machichima machichima merged commit 900685f into flyteorg:v2 Apr 1, 2026
15 of 17 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants