Skip to content

feat(go): add background session flows via Detach#5193

Open
apascal07 wants to merge 4 commits intoap/go-session-flowfrom
ap/go-session-flow-background
Open

feat(go): add background session flows via Detach#5193
apascal07 wants to merge 4 commits intoap/go-session-flowfrom
ap/go-session-flow-background

Conversation

@apascal07
Copy link
Copy Markdown
Collaborator

@apascal07 apascal07 commented Apr 25, 2026

Builds on top of #4462. Adds SessionFlowConnection.Detach so a client can disconnect from a session flow and have it continue processing in the background, plus the snapshot lifecycle (pending / complete / canceled / error) and companion actions needed to observe and cancel that work.

Examples

Detaching a connection

Detach ends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a snapshot ID the client can use later. Send any final inputs first.

chatFlow := genkit.DefineSessionFlow(g, "chat", myFn,
    aix.WithSessionStore(store),
)

conn, _ := chatFlow.StreamBidi(ctx)
conn.SendText("draft a long report on Go's runtime")
conn.SendText("...and email it to me when done")

// Client wants to walk away. The server keeps working.
conn.Detach()

out, _ := conn.Output() // returns immediately with the pending snapshot ID
fmt.Println(out.SnapshotID)

For single-turn use, Run already takes a SessionFlowInput whose Detach field is the same wire bit, so detached single-turn is just:

out, _ := chatFlow.Run(ctx, &aix.SessionFlowInput{
    Detach:   true,
    Messages: []*ai.Message{ai.NewUserTextMessage("...")},
})

Polling status, fetching results, cancelling

Local Go callers use the store reference they already have from WithSessionStore. No indirection through the flow.

snap, _ := store.GetSnapshot(ctx, id)

switch snap.Status {
case aix.SnapshotStatusPending:   // still working; snap.PendingInputs lists the queued inputs
case aix.SnapshotStatusComplete:  // snap.State has the final state
case aix.SnapshotStatusError:     // snap.Error has the failure message
case aix.SnapshotStatusCanceled:  // someone (or some thing) cancelled it
}

store.CancelSnapshot(ctx, id) // atomic pending → canceled; no-op otherwise

For Dev UI and non-Go clients, every session flow registers {flowName}/getSnapshot and {flowName}/cancelSnapshot companion actions over the reflection API. They produce a client-facing view (WithSnapshotTransform applied; raw snapshot envelope hidden) that the local store calls bypass.

Resuming after the background run finishes

Once the snapshot has finalized to complete, resume it like any other snapshot. Pending and error snapshots are rejected.

out, _ := chatFlow.RunText(ctx, "next question",
    aix.WithSnapshotID[MyState](id),
)

Tuning for slow / fast cancel response

chatFlow := genkit.DefineSessionFlow(g, "chat", myFn,
    aix.WithSessionStore(store),
    aix.WithHeartbeatInterval[MyState](2*time.Second), // default: 10s
    aix.WithSnapshotTransform[MyState](redactPII),     // applied to the getSnapshot action + client-managed output
)

Lifecycle

A session flow runs in two modes: sync (default) and detached (opt-in).

sync     ── turn ── turn ── fn returns ──→ output on the wire

detach   ── turn ── Detach ──→ pending snapshot ID; client disconnects
                                  │
                                  └─→ pending ──→ complete | error | canceled
                                                    ↑
                                          cancelSnapshot can flip

Sync. Each turn writes a turn-end snapshot; fn-return writes an invocation-end snapshot; output is returned over the connection. Same as #4462.

Detach. When Detach=true lands, the server atomically captures the in-flight input plus everything queued behind it into FIFO PendingInputs on a single pending snapshot, returns that ID over the wire, and closes the connection. The user fn keeps running on a context decoupled from the client's, so a client disconnect doesn't cancel it. A pure detach signal (no payload) is dropped, not enqueued.

Background terminal. When fn returns, the same snapshot row is rewritten in place with one of three statuses (complete, error with Error set, or canceled) and PendingInputs is cleared. The snapshot ID never changes, so callers keep polling the same ID.

Cancel. store.CancelSnapshot(id) (or the cancelSnapshot action) atomically flips pending → canceled. A heartbeat poller in the server (default 10s, WithHeartbeatInterval) observes the flip and cancels the work context so fn returns promptly. The finalizer rechecks status before writing terminal state, so a late cancel wins over a complete that was about to land.

Resume. Pass any terminal snapshot ID via WithSnapshotID. Pending is rejected (still running). Canceled is rejected. Error is rejected and surfaces the recorded error.

No store + detach. Rejected with FAILED_PRECONDITION. Detach has nowhere durable to put the captured state.


API additions

Connection

// Detach asks the server to write a pending snapshot capturing any inputs
// already buffered, close the connection, and continue processing in the
// background. To ride a final input on the same wire message, send
// Send(&SessionFlowInput{Detach: true, Messages: ...}) directly.
func (*SessionFlowConnection) Detach() error

Flow options

// WithHeartbeatInterval sets the cadence at which a detached invocation
// polls its pending snapshot for cancellation. Default: 10s.
WithHeartbeatInterval[State](d time.Duration)

// WithSnapshotTransform rewrites snapshot state before it is surfaced via
// GetSnapshot or as client-managed output. Typical use: PII redaction.
WithSnapshotTransform[State](transform SnapshotTransform[State])

Snapshot lifecycle

type SnapshotStatus string
const (
    SnapshotStatusPending  SnapshotStatus = "pending"  // detached invocation still running
    SnapshotStatusComplete SnapshotStatus = "complete" // settled state
    SnapshotStatusCanceled SnapshotStatus = "canceled" // cancelled via CancelSnapshot
    SnapshotStatusError    SnapshotStatus = "error"    // invocation failed; Error is populated
)

const SnapshotEventDetach SnapshotEvent = "detach"

SessionSnapshot gains: UpdatedAt, Status, Error, StartingTurnIndex, PendingInputs. TurnEnd gains TurnIndex.

Companion action wire types (auto-registered per flow)

// {flowName}/getSnapshot
type GetSnapshotRequest  struct { SnapshotID string }
type GetSnapshotResponse[State any] struct {
    SnapshotID        string
    CreatedAt, UpdatedAt time.Time
    Status            SnapshotStatus
    Error             string
    StartingTurnIndex int
    PendingInputs     []*SessionFlowInput  // non-empty only when status=pending
    State             *SessionState[State] // omitted when status=pending or error
}

// {flowName}/cancelSnapshot
type CancelSnapshotRequest  struct { SnapshotID string }
type CancelSnapshotResponse struct { SnapshotID string; Status SnapshotStatus }

Store interface

type SessionStore[State any] interface {
    GetSnapshot(ctx, id) (*SessionSnapshot[State], error)
    GetSnapshotMetadata(ctx, id) (*SnapshotMetadata, error) // light projection for the heartbeat poller
    SaveSnapshot(ctx, snapshot) error
    CancelSnapshot(ctx, id) (*SnapshotMetadata, error)      // atomic pending → canceled; no-op otherwise
}

InMemorySessionStore implements all four. External stores must implement the new methods; CancelSnapshot must be atomic (transaction or CAS) so a racing finalizer cannot clobber a cancel.

Wire input

type SessionFlowInput struct {
    Detach       bool          `json:"detach,omitempty"` // new
    Messages     []*ai.Message
    ToolRestarts []*ai.Part
}

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for detached invocations in session flows, allowing clients to disconnect while the flow continues processing in the background. Key changes include new SnapshotEvent and SnapshotStatus types to manage the lifecycle of these detached invocations, a detach field in SessionFlowInput, and the addition of turnIndex to TurnEnd for better input correlation. The SessionStore interface is extended with GetSnapshotMetadata and CancelSnapshot methods, and new companion actions (getSnapshot, cancelSnapshot) are registered. The implementation includes a refactored session flow runtime with chunkRouter and detachIntake components to manage input and output streams, and new options WithSnapshotTransform and WithHeartbeatInterval for state manipulation and detached invocation monitoring. The Output method for BidiConnection and SessionFlowConnection is updated to correctly handle outputs from detached flows. Feedback highlights a potential race condition in finalizePendingSnapshot where a concurrent CancelSnapshot could be overwritten, and suggests treating empty snapshot statuses as 'complete' in the heartbeat poller for consistency.

Comment thread go/ai/exp/session_flow.go
Comment on lines +341 to +345
if meta, err := rt.cfg.store.GetSnapshotMetadata(ctx, pending.SnapshotID); err == nil && meta != nil && meta.Status == SnapshotStatusCanceled {
canceledByUser = true
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a potential race condition between this status check and the subsequent SaveSnapshot call. If CancelSnapshot is invoked by another client after GetSnapshotMetadata returns but before SaveSnapshot completes, the canceled status in the store will be overwritten by complete or error. While this check narrows the window, ensuring the "no clobber" guarantee mentioned in the PR description for all store implementations might require a conditional update mechanism in the SessionStore interface (e.g., SaveSnapshotIfStatus) or using store-specific transactions.

Comment thread go/ai/exp/session_flow.go
case SnapshotStatusCanceled:
onCancel()
return
case SnapshotStatusComplete, SnapshotStatusError:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The heartbeat poller should also treat an empty status as terminal (complete) to be consistent with how snapshots are handled elsewhere in the system (e.g., in registerSnapshotActions and loadSession). This avoids unnecessary polling if a store implementation returns an empty status for a finalized snapshot.

Suggested change
case SnapshotStatusComplete, SnapshotStatusError:
case SnapshotStatusComplete, SnapshotStatusError, "":

- Detach() takes no input. To ride a final input on the same wire
  message, callers can still call Send(&{Detach: true, Messages: ...})
  directly. Server skips empty detach signals so they don't leave a
  stray input in PendingInputs.
- Add SessionFlow.RunBackground() for single-turn background
  invocations.
- Expose SessionFlow.GetSnapshot() and SessionFlow.CancelSnapshot() as
  typed methods so callers don't have to go through ResolveActionFor.
Local Go callers already hold the store reference passed to
WithSessionStore — no need for a method on the flow that bounces through
the action layer. Tests that exercise action-layer behavior (transform,
no-store error) still call the action via ResolveActionFor; tests that
just need the cancel/get behavior call the store directly.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant