feat(go): add background session flows via Detach#5193
feat(go): add background session flows via Detach#5193apascal07 wants to merge 4 commits intoap/go-session-flowfrom
Detach#5193Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces support for detached invocations in session flows, allowing clients to disconnect while the flow continues processing in the background. Key changes include new SnapshotEvent and SnapshotStatus types to manage the lifecycle of these detached invocations, a detach field in SessionFlowInput, and the addition of turnIndex to TurnEnd for better input correlation. The SessionStore interface is extended with GetSnapshotMetadata and CancelSnapshot methods, and new companion actions (getSnapshot, cancelSnapshot) are registered. The implementation includes a refactored session flow runtime with chunkRouter and detachIntake components to manage input and output streams, and new options WithSnapshotTransform and WithHeartbeatInterval for state manipulation and detached invocation monitoring. The Output method for BidiConnection and SessionFlowConnection is updated to correctly handle outputs from detached flows. Feedback highlights a potential race condition in finalizePendingSnapshot where a concurrent CancelSnapshot could be overwritten, and suggests treating empty snapshot statuses as 'complete' in the heartbeat poller for consistency.
| if meta, err := rt.cfg.store.GetSnapshotMetadata(ctx, pending.SnapshotID); err == nil && meta != nil && meta.Status == SnapshotStatusCanceled { | ||
| canceledByUser = true | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
There is a potential race condition between this status check and the subsequent SaveSnapshot call. If CancelSnapshot is invoked by another client after GetSnapshotMetadata returns but before SaveSnapshot completes, the canceled status in the store will be overwritten by complete or error. While this check narrows the window, ensuring the "no clobber" guarantee mentioned in the PR description for all store implementations might require a conditional update mechanism in the SessionStore interface (e.g., SaveSnapshotIfStatus) or using store-specific transactions.
| case SnapshotStatusCanceled: | ||
| onCancel() | ||
| return | ||
| case SnapshotStatusComplete, SnapshotStatusError: |
There was a problem hiding this comment.
The heartbeat poller should also treat an empty status as terminal (complete) to be consistent with how snapshots are handled elsewhere in the system (e.g., in registerSnapshotActions and loadSession). This avoids unnecessary polling if a store implementation returns an empty status for a finalized snapshot.
| case SnapshotStatusComplete, SnapshotStatusError: | |
| case SnapshotStatusComplete, SnapshotStatusError, "": |
- Detach() takes no input. To ride a final input on the same wire
message, callers can still call Send(&{Detach: true, Messages: ...})
directly. Server skips empty detach signals so they don't leave a
stray input in PendingInputs.
- Add SessionFlow.RunBackground() for single-turn background
invocations.
- Expose SessionFlow.GetSnapshot() and SessionFlow.CancelSnapshot() as
typed methods so callers don't have to go through ResolveActionFor.
Local Go callers already hold the store reference passed to WithSessionStore — no need for a method on the flow that bounces through the action layer. Tests that exercise action-layer behavior (transform, no-store error) still call the action via ResolveActionFor; tests that just need the cancel/get behavior call the store directly.
Builds on top of #4462. Adds
SessionFlowConnection.Detachso a client can disconnect from a session flow and have it continue processing in the background, plus the snapshot lifecycle (pending/complete/canceled/error) and companion actions needed to observe and cancel that work.Examples
Detaching a connection
Detachends the input stream and asks the server to take ownership of the rest of the work. The connection closes promptly with a snapshot ID the client can use later. Send any final inputs first.For single-turn use,
Runalready takes aSessionFlowInputwhoseDetachfield is the same wire bit, so detached single-turn is just:Polling status, fetching results, cancelling
Local Go callers use the
storereference they already have fromWithSessionStore. No indirection through the flow.For Dev UI and non-Go clients, every session flow registers
{flowName}/getSnapshotand{flowName}/cancelSnapshotcompanion actions over the reflection API. They produce a client-facing view (WithSnapshotTransformapplied; raw snapshot envelope hidden) that the local store calls bypass.Resuming after the background run finishes
Once the snapshot has finalized to
complete, resume it like any other snapshot. Pending and error snapshots are rejected.Tuning for slow / fast cancel response
Lifecycle
A session flow runs in two modes: sync (default) and detached (opt-in).
Sync. Each turn writes a turn-end snapshot;
fn-return writes an invocation-end snapshot; output is returned over the connection. Same as #4462.Detach. When
Detach=truelands, the server atomically captures the in-flight input plus everything queued behind it into FIFOPendingInputson a single pending snapshot, returns that ID over the wire, and closes the connection. The userfnkeeps running on a context decoupled from the client's, so a client disconnect doesn't cancel it. A pure detach signal (no payload) is dropped, not enqueued.Background terminal. When
fnreturns, the same snapshot row is rewritten in place with one of three statuses (complete,errorwithErrorset, orcanceled) andPendingInputsis cleared. The snapshot ID never changes, so callers keep polling the same ID.Cancel.
store.CancelSnapshot(id)(or thecancelSnapshotaction) atomically flips pending → canceled. A heartbeat poller in the server (default 10s,WithHeartbeatInterval) observes the flip and cancels the work context sofnreturns promptly. The finalizer rechecks status before writing terminal state, so a late cancel wins over acompletethat was about to land.Resume. Pass any terminal snapshot ID via
WithSnapshotID. Pending is rejected (still running). Canceled is rejected. Error is rejected and surfaces the recorded error.No store + detach. Rejected with
FAILED_PRECONDITION. Detach has nowhere durable to put the captured state.API additions
Connection
Flow options
Snapshot lifecycle
SessionSnapshotgains:UpdatedAt,Status,Error,StartingTurnIndex,PendingInputs.TurnEndgainsTurnIndex.Companion action wire types (auto-registered per flow)
Store interface
InMemorySessionStoreimplements all four. External stores must implement the new methods;CancelSnapshotmust be atomic (transaction or CAS) so a racing finalizer cannot clobber a cancel.Wire input