
feat(agents): prototype v2 event stream + continuationId + React hooks #5424

Draft
chrisraygill wants to merge 14 commits into pj/agents-sample from cg/agents-v2-event-stream-prototype

Conversation

@chrisraygill (Contributor) commented May 28, 2026

Summary

End-to-end prototype of a v2 Agents protocol plus a framework-agnostic client core (AgentSession) and thin per-framework adapters that absorb all client glue. Server emits a discriminated event union with a single opaque continuation token; the AgentSession class owns the lifecycle so pages never touch streamFlow / runFlow directly. Validated across two frameworks: all 11 React sample pages plus 3 Angular pages running against the same backend, all verified end-to-end against gemini-flash-latest.

This is exploratory work, not a merge candidate. v2 is the only API surface (no backwards-compat layer); the goal is to validate the design with running code before committing to a wire protocol.

High-level API design

Server protocol

  • AgentEvent: one discriminated union for the wire. Tagged variants on type: model-chunk, status, artifact-emitted, snapshot, interrupt, tool-error, detached, turn-end, error. Replaces the structural union of optional fields on AgentStreamChunk. Clients dispatch by type; old event.kind === 'X' pattern matching is gone.
  • continuation: structured discriminated object replaces the state vs snapshotId fork. Shape is { kind: 'snapshot', snapshotId } (server-stored agents) or { kind: 'state', state } (stateless — state is carried inline as JSON, no base64). Round-trips on every turn through AgentInit.continuation and AgentOutput.continuation, and on the snapshot / detached / turn-end events. The previous FAILED_PRECONDITION validation (which would throw when callers passed the wrong kind of init for the wrong storage mode) is gone — the server reads whichever kind is in continuation. The kind discriminator keeps the storage-mode distinction visible (snapshot tokens are URL-fit, state objects can be megabytes) without forcing client code to branch.
  • status event is application-defined. Single { type: 'status', status: <any> } event variant. The shape of status is intentionally not standardized — agents and their clients agree on their own structure (typed via the client's <TStatus> generic). Earlier prototype iterations had typed status / progress / phase variants; reverted to keep app-side flexibility.
  • In-stream interrupt events with addressable toolCallId / toolName / input / kind: 'respond' | 'restart'. Replaces post-hoc scanning of result.message.content for unresolved tool requests. Resolves on the same turn the interrupt fires; UIs don't need to re-fetch the snapshot to discover an interrupt happened.
  • In-stream tool-error events as the symmetric counterpart. Fires alongside the model-chunk that carries the failed toolResponse, carrying { toolCallId, toolName, errorText, errorCode?, details? }. Detection signal is metadata.toolError set by middleware — the stock filesystem middleware sets it. The toolResponse still flows back to the model so the conversation can continue; this event lets clients mark the matching in-flight tool call as failed.
  • detached event marks the foreground → background transition explicitly, with continuation: { kind: 'snapshot', snapshotId } so the client can start polling without a second query. Was previously inferred from the absence of further chunks.
  • snapshotId is re-exposed on AgentOutput and on the snapshot / detached / turn-end events as a server-side convenience field alongside continuation. Useful for direct /state / /abort calls and URL bookmarks without needing to pattern-match on continuation.kind.
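
For readers who want the shape in code, the event union and continuation object described above could be sketched roughly as follows. This is inferred from the bullet list, not copied from js/ai/src/agent.ts: payload fields that the list does not name (`artifact`, `message`, the `string` type of `errorCode`) are assumptions, and `describeEvent` is a hypothetical helper that only exists to show exhaustive dispatch on `type`.

```typescript
// Sketch only. Field names not quoted in the PR description are assumptions.
type AgentContinuation =
  | { kind: 'snapshot'; snapshotId: string }
  | { kind: 'state'; state: unknown };

type AgentEvent =
  | { type: 'model-chunk'; chunk: unknown }
  | { type: 'status'; status: unknown }
  | { type: 'artifact-emitted'; artifact: unknown }
  | { type: 'snapshot'; continuation: AgentContinuation; snapshotId?: string }
  | {
      type: 'interrupt';
      toolCallId: string;
      toolName: string;
      input: unknown;
      kind: 'respond' | 'restart';
    }
  | {
      type: 'tool-error';
      toolCallId: string;
      toolName: string;
      errorText: string;
      errorCode?: string;
      details?: unknown;
    }
  | {
      type: 'detached';
      continuation: { kind: 'snapshot'; snapshotId: string };
      snapshotId?: string;
    }
  | { type: 'turn-end'; continuation: AgentContinuation; snapshotId?: string }
  | { type: 'error'; message: string };

// Hypothetical helper: one branch per variant, checked for exhaustiveness.
function describeEvent(event: AgentEvent): string {
  switch (event.type) {
    case 'model-chunk':
      return 'model output';
    case 'status':
      return 'status update';
    case 'artifact-emitted':
      return 'artifact';
    case 'snapshot':
      return `snapshot (${event.continuation.kind})`;
    case 'interrupt':
      return `interrupt: ${event.toolName} (${event.kind})`;
    case 'tool-error':
      return `tool error: ${event.toolName}: ${event.errorText}`;
    case 'detached':
      return `detached: poll ${event.continuation.snapshotId}`;
    case 'turn-end':
      return `turn end (${event.continuation.kind})`;
    case 'error':
      return `error: ${event.message}`;
    default: {
      // Compile-time guard: adding a variant without a case fails here.
      const unreachable: never = event;
      return unreachable;
    }
  }
}
```

Because every variant carries a literal `type`, the compiler narrows the payload in each branch, which is the property the old optional-field chunk shape could not offer.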

Client surface (genkit/beta/client)

  • walkAgentEvent(event, handlers) dispatches over the union. Handlers are all optional: onText, onReasoning, onToolRequest, onToolResponse, onToolError, onStatus (payload unknown), onArtifact, onInterrupt, onDetached, onTurnEnd. Strongly typed per-handler with no any escape hatches except where the protocol explicitly leaves a payload open (status).
  • AgentSession<S, TStatus> is the framework-agnostic session core. Owns the entire client-side state machine: streaming chunk dispatch, structured-continuation round-trip, in-stream interrupt detection, foreground → background phase transitions with polling, snapshot rehydration, and lifecycle cleanup. Exposes a subscribe(listener) => unsubscribe observable + getState() snapshot getter + action methods (submit, abort, reset, respondToInterrupt, restartInterrupt, runVariants, continueFrom). Zero framework imports. The <TStatus> generic types the application-defined status payload per agent.
  • No continuation codec helpers. Earlier iterations had an opaque-string token (snap:<id> / state:<base64>) with browser-safe encode/decode helpers; the structured continuation makes those unnecessary — the JSON itself is the wire format.
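
The `subscribe` / `getState` contract is what lets adapters stay thin. A minimal sketch of that contract, assuming nothing about the real AgentSession internals, might look like this. `MiniSession`, `MiniState`, `appendText`, and `finish` are hypothetical names used for illustration only.

```typescript
// Hypothetical stand-in for the session core; not the PR's AgentSession.
type Listener = () => void;

interface MiniState {
  phase: 'idle' | 'streaming' | 'done';
  text: string;
}

class MiniSession {
  private state: MiniState = { phase: 'idle', text: '' };
  private listeners = new Set<Listener>();

  // Returns the same object until the next update, so snapshot-based
  // consumers can detect changes by reference comparison.
  getState(): MiniState {
    return this.state;
  }

  // Registers a listener and returns the matching unsubscribe function.
  subscribe(listener: Listener): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Replace the state object instead of mutating it, then notify.
  private update(patch: Partial<MiniState>): void {
    this.state = { ...this.state, ...patch };
    this.listeners.forEach((listener) => listener());
  }

  appendText(chunk: string): void {
    this.update({ phase: 'streaming', text: this.state.text + chunk });
  }

  finish(): void {
    this.update({ phase: 'done' });
  }
}
```

Replacing the state object on every update, rather than mutating it in place, matters for consumers such as React's `useSyncExternalStore`, which expects the snapshot getter to return a stable reference between changes.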

Framework adapters

AgentSession is the heavy lifter; each framework's adapter is a thin shim that wires the session's state observable into the framework's native reactivity model. Two adapters in this PR, validating the cross-framework design:

React (js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts):

export function useGenkitAgent<S, TStatus>(
  options
): UseGenkitAgentResult<S, TStatus> {
  const sessionRef = useRef<AgentSession<S, TStatus> | null>(null);
  if (sessionRef.current === null)
    sessionRef.current = new AgentSession<S, TStatus>(options);
  const session = sessionRef.current;
  session.setOptions(options);

  const state = useSyncExternalStore(
    useMemo(() => session.subscribe.bind(session), [session]),
    useMemo(() => session.getState.bind(session), [session]),
  );
  // …bind action methods…
}

Angular (js/testapps/agents-angular/src/app/genkit-angular/inject-genkit-agent.ts):

export function injectGenkitAgent<S, TStatus>(
  options
): GenkitAgentHandle<S, TStatus> {
  const session = new AgentSession<S, TStatus>(options);
  const state = signal(session.getState());
  const unsubscribe = session.subscribe(() => state.set(session.getState()));
  inject(DestroyRef).onDestroy(() => {
    unsubscribe();
    session.dispose();
  });
  return { state: state.asReadonly(), ...boundMethods(session) };
}

Effective adapter code (excluding JSDoc/types/imports):

| Adapter | Total LOC | Body LOC |
| --- | --- | --- |
| React useGenkitAgent | 128 | 11 |
| Angular injectGenkitAgent | 80 | 8 |
| Core AgentSession | 830 | |

The duplication between adapters is the irreducible minimum: construct session, forward state changes to framework reactivity, bind action methods, tear down on destroy. No further extraction would shrink either side. Same pattern Vue (ref() + watch), Svelte (readable(initial, set => session.subscribe(...))), or Solid (from(session)) would use.
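
The four steps named above (construct, forward, bind, tear down) can be shown without any framework at all. The sketch below is an assumption-laden illustration: `SessionLike` is a hypothetical subset of the session surface, `bindSession` is not part of the PR, and `createCounterSession` is a toy session that exists only so the example runs on its own.

```typescript
// Hypothetical subset of the session surface described in this PR.
interface SessionLike<T> {
  getState(): T;
  subscribe(listener: () => void): () => void;
  dispose(): void;
}

// `publish` is whatever the framework provides: a signal setter, a store
// `set` callback, a ref assignment, and so on.
function bindSession<T>(
  session: SessionLike<T>,
  publish: (state: T) => void
): () => void {
  publish(session.getState()); // seed the initial value
  const unsubscribe = session.subscribe(() => publish(session.getState()));
  return () => {
    // tear down on destroy
    unsubscribe();
    session.dispose();
  };
}

// Toy session used only to exercise the adapter.
function createCounterSession(): SessionLike<number> & { increment(): void } {
  let count = 0;
  let disposed = false;
  const listeners = new Set<() => void>();
  return {
    getState: () => count,
    subscribe(listener: () => void) {
      listeners.add(listener);
      return () => {
        listeners.delete(listener);
      };
    },
    dispose() {
      disposed = true;
      listeners.clear();
    },
    increment() {
      if (disposed) return;
      count++;
      listeners.forEach((listener) => listener());
    },
  };
}
```

Calling `bindSession(session, (n) => render(n))` and later invoking the returned function is the whole adapter; the action methods are bound separately and are omitted here.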

  • useGenkitStream<I, O, S, Init> is a generic React primitive for non-agent streaming flows. Makes no assumptions about chunk shape.
  • useGenkitRunFlow<I, O> wraps non-streaming runFlow for sibling endpoints (workspace listings, file reads, snapshot data). Useful when an agent page also calls a few one-shot flows — keeps everything reactive through one mental model.

Sample testapps

Two front-end testapps share the same backend (testapps/agents):

  • testapps/agents/web — React, all 11 pages migrated.
  • testapps/agents-angular — Angular, 3 representative pages ported (Weather, Banking, Background). Exists to prove the framework-agnostic core: same AgentSession, same backend, no server-side changes.

React: migrated pages

All 11 React testapp pages migrated. Each was previously a hand-rolled streamFlow loop with manual chunk dispatch, state vs snapshotId routing, interrupt detection, and (for some) background polling.

| Page | Demonstrates |
| --- | --- |
| WeatherChat | basic chat + tool calls + URL bookmark |
| ClientState | typed customState round-trip |
| BankingInterrupt | in-stream interrupt with respondToInterrupt |
| WorkspaceBuilder | side-by-side chat + artifact panel |
| BackgroundAgent | auto detach → phase: 'background' polling, server-side abort |
| BranchingChat | runVariants for "pick your variant" UIs + continueFrom |
| TaskTracker | persisted custom state across turns |
| TripPlanner | URL-restorable multi-step planning |
| SubAgentChat | composed sub-agents |
| ResearchAgent | long-running research with progress |
| CodingAgent | the kitchen sink: filesystem middleware, tool approval, ask_user interrupt, snapshot resumption, sibling workspace endpoints via useGenkitRunFlow |

Code shrinkage on the most representative pages:

| Page | Before | After | Δ |
| --- | --- | --- | --- |
| WeatherChat | 345 LOC | 181 LOC | −47% |
| BankingInterrupt | 358 LOC | 195 LOC | −46% |
| BackgroundAgent | 342 LOC | 238 LOC | −30% |
| CodingAgent | 1040 LOC | 885 LOC | −15% |

Most of the post-migration LOC is JSX + the explanatory sidebar that documents the v2 pattern. Actual hook usage is typically 5-15 lines.

End-to-end verification

Tested against the actual agents testapp with GEMINI_API_KEY against gemini-flash-latest, using Puppeteer-driven headless Chrome:

Wire format (curl):

  • Stored agent (codingAgent): SSE stream returns turn-end events with payload { snapshotId, continuation: { kind: 'snapshot', snapshotId } }. No continuationId field anywhere.
  • Stateless agent (weatherAgentStateless): final result returns continuation: { kind: 'state', state: {...} } with the full session state inline (no base64).
  • Tool-error event: drove read_file against a nonexistent path → 2 tool-error events with addressable toolCallId and clean errorText (no Tool 'X' failed: prefix in the structured field).

React (testapps/agents/web):

  • CodingAgent restored session: drove a write_file interrupt via curl, loaded /coding-agent/{sid} in browser → transcript rehydrated, approval dialog rendered with tool name + file path. Approve click → POST to /api/codingAgent returned 200, "✅ Approved" synth event + "✅ File written" tool row + follow-up model message rendered, file written to disk with correct content.
  • WeatherChat: click suggestion → tool call + tool response + model response rendered in correct order, URL pushed to /weather/{snapshotId}, no console errors.
  • ClientState (stateless agent): click suggestion → conversation works, sidebar renders the inline state JSON directly from agent.continuation.state (no decode step).
  • BankingInterrupt: in-stream interrupt → approval dialog with Approve / Deny buttons rendered.

Angular (testapps/agents-angular):

  • BackgroundAgent: submit topic with detach: true → phase transitioned to 'background' rendering snapshotId from the new derived state field → polling tick → phase transitioned to 'done' with full report rendered.

Earlier in the branch, all 11 React pages and 3 Angular pages were verified end-to-end.

Honest gaps

  • useGenkitStream is still useful but rarely needed once useGenkitAgent covers chat-shaped flows.
  • No subscribeAgent for the background phase. The session polls. With server-sent events plus Last-Event-ID resumption (suggestion S3 from the design discussion), it could subscribe instead of polling.
  • No streaming artifacts emission. Schema reserves the event types but no flow uses them yet.
  • Adapters live in the testapps, not as published @genkit-ai/react / @genkit-ai/angular packages.
  • Only React + Angular validated. Vue / Svelte / Solid would follow the same pattern but aren't proven in code yet.
  • No type inference from agent definitions (suggestion S8). Hook consumers do well, but raw streamFlow callers don't benefit.

How to run

cd js
pnpm install
pnpm -C core build && pnpm -C ai build && pnpm -C genkit build
pnpm -C plugins/express build && pnpm -C plugins/middleware build

# Server (defaults to port 3400, matching both web apps' proxy targets)
cd testapps/agents
GEMINI_API_KEY=<your-key> npx tsx src/index.ts

# React client (in another terminal)
cd testapps/agents/web
npx vite
# Open http://localhost:5173/

# Angular client (in another terminal — same backend)
cd testapps/agents-angular
npx ng serve
# Open http://localhost:4200/

Files changed (high level)

  • js/ai/src/agent.ts — schemas (AgentEvent discriminated union, structured ContinuationSchema, continuation field on AgentInit / AgentOutput / snapshot / detached / turn-end), runtime event emission, in-stream interrupt + tool-error detection with addressable refs
  • js/genkit/src/client/agent-events.ts: walkAgentEvent visitor + AgentEvent / AgentContinuation type exports
  • js/genkit/src/client/agent-session.ts — framework-agnostic AgentSession<S, TStatus> core
  • js/genkit/src/client/index.ts — exports
  • js/testapps/agents/src/* — server-side test flows updated to v2 (app-defined status payloads, structured continuation round-trip, { type: 'model-chunk', chunk }); workspace files endpoint no longer 500s on ENOENT; server default port is 3400
  • js/testapps/agents/web/src/genkit-react/: useGenkitAgent, useGenkitStream, useGenkitRunFlow
  • js/testapps/agents-angular/ — Angular testapp + injectGenkitAgent
  • js/testapps/agents/web/src/pages/ — all 11 pages rewritten to use the hooks
  • js/testapps/agents/web/vite.config.ts — alias genkit/beta/client to TS sources

Why a draft PR

To spark concrete discussion on the v2 design with running code attached, not to merge as-is. Open design questions:

  • Should walkAgentEvent live in genkit/beta/client or split into a separate @genkit-ai/client-utils so the AI SDK adapter can depend on it without pulling streamFlow?
  • Adapters: extract to @genkit-ai/react / @genkit-ai/angular immediately (and add Vue / Svelte / Solid), or keep in examples/ for a release cycle to iterate on the API?
  • Should AgentSession's resume path also sniff messages[last] for an unresolved tool request and surface it as pendingInterrupt (so restartInterrupt / respondToInterrupt work uniformly across in-stream and restored interrupts)?
  • Should useGenkitRunFlow exist as a sibling primitive, or should pages just import runFlow directly for one-shots? (Angular has no equivalent today.)
  • Should AgentInit accept an { events: AgentEvent[] } batching wrapper for network coalescing (proposal §1, deferred)?

Implements suggestions 1, 2, 4, and 6 from the design discussion as a
coherent v2 protocol pass, plus the proposed `useGenkitAgent` hook, and
migrates 3 of the 11 testapp pages to use them.

See PR description for the full design + migration rationale.

google-cla Bot commented May 28, 2026

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@gemini-code-assist gemini-code-assist Bot (Contributor) left a comment

Code Review

This pull request introduces the v2 Agents API, which unifies state continuation via an opaque continuationId and adds a typed, discriminated AgentEvent stream. It also provides client-side event-walking helpers and React hooks (useGenkitAgent, useGenkitStream) to simplify agent interaction, tool calling, interrupts, and background execution, migrating several test applications to this new pattern. The review feedback highlights critical improvements for the React hooks to prevent memory leaks and redundant network requests by properly managing and clearing background polling timeouts and guarding state rehydration. Additionally, feedback recommends enhancing server-side robustness by throwing errors on malformed continuation tokens, handling potential circular references during status serialization, and adding defensive checks when parsing event content parts.

Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts Outdated
Comment thread js/ai/src/agent.ts Outdated
Comment thread js/ai/src/agent.ts Outdated
Comment thread js/genkit/src/client/agent-events.ts
Per design discussion: the Agents API is brand new (still in beta), so
backwards compat isn't a concern. This commit makes v2 the only API.

Core (js/ai/src/agent.ts)
- AgentStreamChunkSchema is now the discriminated event union directly
  (was a structural wrapper with optional modelChunk/status/artifact/turnEnd
  fields plus an `event` field for dual-emission). Every chunk has a `type`
  discriminator; consumers do `switch (chunk.type)` for exhaustive dispatch.
- AgentInitSchema is just `{ continuationId? }`. Dropped `snapshotId` and
  `state` from the client-facing schema entirely. The FAILED_PRECONDITION
  state-vs-snapshotId fork is gone.
- AgentOutputSchema drops `snapshotId` (use `continuationId` instead).
  `state` stays as a read-only view (carries messages, custom, artifacts).
- Runtime emission updated: every site emits typed events directly. No
  more `withEvent()` wrapper. Interrupts emit one `interrupt` event per
  pending tool request with addressable refs.

Client (js/genkit/src/client/agent-events.ts)
- walkAgentEvent now dispatches the event union directly (no more
  fallback synthesis from legacy fields).
- Dropped AgentStreamChunkV2 wrapper type.

React hooks (js/testapps/agents/web/src/genkit-react/useGenkitAgent.ts)
- Now consumes AgentEvent directly from streamFlow's stream.
- Reads result.state.custom / state.messages / state.artifacts for
  reactive hook fields.

Pages — all 11 migrated:
- WeatherChat, BankingInterrupt, BackgroundAgent (already on v2)
- ClientState, TripPlanner, TaskTracker, WorkspaceBuilder, SubAgentChat,
  ResearchAgent — full migration to useGenkitAgent
- BranchingChat — kept on raw runFlow (its parallel-branch model is
  outside the hook's single-stream contract), updated to continuationId
- CodingAgent — minimal patch to use continuationId everywhere; the
  larger structure preserved (1040 LOC, kept its custom approval/question
  dialogs and middleware-specific rendering)

Server-side custom agents updated for typed events:
- research-agent.ts: sendChunk({ type: 'status', label }), sendChunk({
  type: 'progress', current, total, label }), sendChunk({
  type: 'model-chunk', chunk })
- task-agent.ts: tools self-initialize their custom state shape on first
  turn (previously relied on the client pre-seeding it via init.state,
  which no longer exists)

Verified end-to-end in browser with the API key:
- WeatherChat: streaming chat with tool calls
- BankingInterrupt: in-stream interrupt + approval resume
- BackgroundAgent: detach + auto-poll + result render
- TaskTracker: typed customState reactive update
- ResearchAgent: typed status events drive status bar

Zero page errors, zero console errors across all spot-checked pages.
@chrisraygill (Contributor, Author)

Update: v2 is now the only API (no backwards-compat baggage)

Per design feedback that the Agents API is brand new and doesn't need backwards compat, this update removes the dual-emission and v1 schema fields entirely. v2 is the only API surface now, and all 11 testapp pages are migrated.

Schema simplification

| Was | Now |
| --- | --- |
| AgentStreamChunkSchema = z.object({ modelChunk?, status?, artifact?, turnEnd?, event? }) | AgentStreamChunkSchema = z.discriminatedUnion('type', [...]) |
| AgentInitSchema = z.object({ snapshotId?, state? }) + FAILED_PRECONDITION if wrong one | AgentInitSchema = z.object({ continuationId? }) |
| AgentOutputSchema = z.object({ snapshotId, state, continuationId, ... }) | AgentOutputSchema = z.object({ continuationId, message, artifacts, state }) |
| Runtime withEvent() dual-emission wrapper | Every emission site emits AgentEvent directly |
| walkAgentEvent(chunk, handlers) with legacy-field synthesis fallback | walkAgentEvent(event, handlers) direct dispatch |

Wire format verified clean

$ curl -N -X POST .../api/weatherAgent -d '{"data":{"messages":[...]},"init":{}}'
EVENT type=model-chunk
EVENT type=model-chunk
EVENT type=model-chunk
EVENT type=turn-end
{
  "result": {
    "continuationId": "v1:57dc2276-...",
    "message": { "role": "model", "content": [...] },
    "state": { "messages": [...], "custom": {}, "artifacts": [] }
  }
}

All 11 pages migrated

| Page | Pattern | Migration approach |
| --- | --- | --- |
| WeatherChat | streaming chat + tool calls + URL restore | useGenkitAgent |
| BankingInterrupt | tool approval via respond interrupt | useGenkitAgent + respondToInterrupt |
| BackgroundAgent | detach + auto-poll + result render | useGenkitAgent (auto background phase) |
| ClientState | stateless agent, no server store | useGenkitAgent (same hook; continuationId hides the difference) |
| TripPlanner | definePromptAgent + .prompt file | useGenkitAgent |
| TaskTracker | typed custom state with mutating tools | useGenkitAgent<TaskState> |
| WorkspaceBuilder | artifacts | useGenkitAgent (artifacts reactive) |
| SubAgentChat | agents() middleware + call_agent | useGenkitAgent + delegation formatting |
| ResearchAgent | defineCustomAgent + status/progress events | useGenkitAgent (typed statusLabel + progress) |
| BranchingChat | parallel runs from same point | Kept on raw runFlow (branching is outside the hook's single-stream model), but updated to continuationId |
| CodingAgent | 1040 LOC interrupt-heavy + custom UI | Minimal patch to continuationId; preserved custom approval/question dialog logic |

Server-side adjustments (custom agents needed typed events)

// research-agent.ts before
sendChunk({ status: 'Decomposing question into sub-topics…' });
sendChunk({ status: `Researching (${i+1}/${n}): ${q}` });
sendChunk({ modelChunk: chunk });

// after — typed contract
sendChunk({ type: 'status', label: 'Decomposing question into sub-topics…' });
sendChunk({ type: 'progress', label: 'Researching', current: i+1, total: n });
sendChunk({ type: 'model-chunk', chunk });

A real defineAgent migration also surfaced a latent bug in task-agent.ts: the tools previously relied on the client pre-seeding state.custom = { tasks: [], nextId: 1 } via init.state. With v2, first-turn custom state is {} and the tools have to self-heal. One-line fix, but worth flagging as an example of how the v1 client-pre-seeds-state pattern was hiding tool bugs.
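
For illustration, the self-healing pattern could look something like the sketch below. It is not the code in task-agent.ts: the `Task` fields, `ensureTaskState`, and `addTask` are hypothetical, and only the `{ tasks: [], nextId: 1 }` default comes from the description above.

```typescript
// Hypothetical task shape; only `tasks` and `nextId` are named in the PR.
interface Task {
  id: number;
  title: string;
  done: boolean;
}

interface TaskState {
  tasks: Task[];
  nextId: number;
}

// Normalize whatever arrives on the first turn (possibly {} or undefined)
// into a complete TaskState instead of assuming the client pre-seeded it.
function ensureTaskState(custom: Partial<TaskState> | undefined): TaskState {
  const tasks = custom?.tasks;
  const nextId = custom?.nextId;
  return {
    tasks: Array.isArray(tasks) ? tasks : [],
    nextId: typeof nextId === 'number' ? nextId : 1,
  };
}

// A tool body that works on turn one and on every later turn.
function addTask(
  custom: Partial<TaskState> | undefined,
  title: string
): TaskState {
  const state = ensureTaskState(custom);
  return {
    tasks: [...state.tasks, { id: state.nextId, title, done: false }],
    nextId: state.nextId + 1,
  };
}
```

With this in place, `addTask({}, 'buy milk')` produces a one-item list with `nextId` 2, where a tool that assumed pre-seeded state would fail on `undefined`.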

Sample-app verification (all in browser with Playwright + headless Chromium)

=== WeatherChat ===
  clicked suggestion (London)
  assistant reply rendered ✅

=== BankingInterrupt ===
  interrupt dialog appeared ✅
  clicked Approve → model confirmed "transferred $500 to your savings"

=== BackgroundAgent ===
  pending UI visible: true
  background result rendered ✅ (1690 chars)

=== TaskTracker ===
  tasks visible: 1 ✅ (agent.customState reactive update)

=== ResearchAgent ===
  status/sub-questions visible ✅ (typed status events drive indicator)

errors: 0 / 0

What's still not done

  • BranchingChat is on raw runFlow not the hook. A useGenkitBranching hook (or a branchFrom option on useGenkitAgent) would be the natural extension. Scoped out for this prototype.
  • CodingAgent kept most of its original 1040 LOC. Could migrate the streaming loop to the hook but the custom approval/question dialogs and middleware-specific tool rendering would need significant rework. Worth doing in a follow-up.
  • TaskTracker tool fix (self-init on first turn) is a side-effect — confirms v1's client-pre-seeded state pattern was load-bearing for tool correctness. Worth surfacing as a Genkit docs note.

…rs, all 11 pages verified

Removes the remaining backwards-compat baggage flagged in review:

Token format
- Rename `v1:` → `snap:`, `v1s:` → `state:`. The prefix is a storage-kind
  discriminator (snap vs state), not a version. If the format ever needs
  to evolve, the server detects the new shape at decode time.

Helpers exported from `genkit/beta/client`
- `encodeSnapshotContinuation(snapshotId)`
- `encodeStateContinuation(state)`
- `continuationToSnapshotId(token)` — opaque-token-safe extraction
- Live in a new `client/continuation.ts` (no Node deps); also re-exported
  from `genkit/beta` for server-side test flows.

Snapshot data action
- `getSnapshotDataAction` now accepts either a raw snapshotId or a
  continuationId. Server decodes if it sees the `snap:` prefix; otherwise
  treats input as a raw id for backwards-friendly server-internal calls.

Hook centralizes the format
- `useGenkitAgent` exposes `agent.snapshotId` as a derived field (decoded
  from continuationId). Pages no longer compose tokens by hand or do
  `.startsWith('v1:')` / `.slice(3)` tricks.
- New `resumeFromSnapshotId` option for the common URL-param case so
  callers don't have to know about token prefixes at all.

Server test flows migrated
- `background-agent.ts`, `banking-agent.ts`, `branching-agent.ts`,
  `coding-agent.ts`, `file-store-agent.ts`, `task-agent.ts`,
  `weather-agent-stateless.ts` — all use `continuationId` +
  `continuationToSnapshotId` helper. No more `init: { snapshotId: ... }`
  or `output.snapshotId` references.
- `task-agent` test flow: stop pre-seeding the custom state via init —
  tools self-initialize their structure on first turn (fix for a latent
  bug the v1 client-pre-seeded pattern was masking).
- `weather-agent-stateless` test flow: input schema now takes
  `continuationId` instead of the full state blob.

Pages use the centralized API
- Removed manual `v1:${sid}` composition from BackgroundAgent, WeatherChat,
  TripPlanner, BranchingChat, CodingAgent.
- All use either the hook's `agent.snapshotId` / `resumeFromSnapshotId`
  OR the helpers from `genkit/beta/client` (browser-safe path).

Bug found and fixed during this pass
- Importing the helpers from `genkit/beta` (the Node package) in
  BranchingChat + CodingAgent broke ALL 11 pages at runtime with
  `Module "node:perf_hooks" has been externalized`. Caught by the
  Playwright sweep. Fix: helpers must come from `genkit/beta/client`,
  not `genkit/beta`. Updated imports + cleared vite cache.

Verified end-to-end in browser
- Drove all 11 pages with Playwright + headless Chromium against the
  live API key. WeatherChat, ClientState, BankingInterrupt,
  WorkspaceBuilder, BackgroundAgent, TaskTracker, TripPlanner,
  SubAgentChat, BranchingChat, ResearchAgent, CodingAgent — all PASS.
- Zero page errors. Two pre-existing console errors on CodingAgent
  (filesystem-state endpoint not configured) — unrelated to this PR.
@chrisraygill chrisraygill changed the title from "[Draft] Prototype v2 Agent event stream + continuationId + React hooks" to "feat(agents): prototype v2 event stream + continuationId + React hooks" on May 29, 2026
@chrisraygill (Contributor, Author)

Cleanup pass: backwards-compat baggage removed, all 11 pages verified

Token format renamed

| Was | Now |
| --- | --- |
| v1:<snapshotId> | snap:<snapshotId> |
| v1s:<base64(state)> | state:<base64(state)> |

The prefix is now a clear storage-kind discriminator. No version baggage.

Helpers exported from genkit/beta/client (browser-safe)

import {
  encodeSnapshotContinuation,
  encodeStateContinuation,
  continuationToSnapshotId,
} from 'genkit/beta/client';

Re-exported from genkit/beta for server-side test flows.
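
A browser-safe codec for this token format could be written along these lines. Only the three exported function names and the `snap:` / `state:` prefixes come from this thread; the bodies below are assumptions, and `decodeStateContinuation` is a hypothetical counterpart added so the round trip can be shown.

```typescript
// Assumed implementations; not the code in client/continuation.ts.
function encodeSnapshotContinuation(snapshotId: string): string {
  return `snap:${snapshotId}`;
}

function encodeStateContinuation(state: unknown): string {
  // UTF-8 encode first so non-ASCII characters survive btoa.
  const bytes = new TextEncoder().encode(JSON.stringify(state));
  let binary = '';
  bytes.forEach((b) => {
    binary += String.fromCharCode(b);
  });
  return `state:${btoa(binary)}`;
}

// Hypothetical inverse of encodeStateContinuation.
function decodeStateContinuation(token: string): unknown {
  if (!token.startsWith('state:')) {
    throw new Error('not a state continuation');
  }
  const binary = atob(token.slice('state:'.length));
  const bytes = Uint8Array.from(binary, (c) => c.charCodeAt(0));
  return JSON.parse(new TextDecoder().decode(bytes));
}

// Returns the snapshot id for snap: tokens and undefined for anything else.
function continuationToSnapshotId(token: string): string | undefined {
  return token.startsWith('snap:') ? token.slice('snap:'.length) : undefined;
}
```

The sketch uses only `TextEncoder`, `TextDecoder`, `btoa`, and `atob`, which are available as globals in browsers and in current Node releases, so it avoids the Node-only imports that caused the bundling failure described below.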

Hook centralizes the format

Page code no longer composes tokens by hand. The hook exposes a derived agent.snapshotId and accepts resumeFromSnapshotId (alias for URL-param case):

// Was
const resumeFromContinuation = urlSnapshotId ? `v1:${urlSnapshotId}` : undefined;
const sid = agent.continuationId?.startsWith('v1:')
  ? agent.continuationId.slice(3)
  : undefined;

// Now
const agent = useGenkitAgent({ url, resumeFromSnapshotId: urlSnapshotId });
const sid = agent.snapshotId;

getSnapshotDataAction accepts either form

// Server decodes if it sees the `snap:` prefix; otherwise treats input as
// a raw snapshotId for direct server-internal API callers.

Server test flows migrated

All seven server-side test flows (background-agent, banking-agent, branching-agent, coding-agent, file-store-agent, task-agent, weather-agent-stateless) now use continuationId + continuationToSnapshotId helper. Zero references to init: { snapshotId } or output.snapshotId.

Real bug found and fixed by the verification sweep

Importing helpers from genkit/beta (the Node package) in BranchingChat + CodingAgent broke all 11 pages at runtime with Module "node:perf_hooks" has been externalized. Caught by the Playwright sweep, not by typecheck. Fix: helpers must come from genkit/beta/client (the browser-safe path).

Worth noting as a Genkit packaging concern: nothing prevents genkit/beta imports from leaking Node-only code into browser bundles. A real @genkit-ai/react package would want stricter exports.

Browser verification — all 11 pages

| Page | Result |
| --- | --- |
| WeatherChat | ✅ PASS |
| ClientState (stateless) | ✅ PASS |
| BankingInterrupt | ✅ PASS |
| WorkspaceBuilder | ✅ PASS |
| BackgroundAgent | ✅ PASS |
| TaskTracker | ✅ PASS |
| TripPlanner | ✅ PASS |
| SubAgentChat | ✅ PASS |
| BranchingChat | ✅ PASS |
| ResearchAgent | ✅ PASS |
| CodingAgent | ⚠️ 2 console errors (pre-existing: filesystem-state endpoint not configured; unrelated to this PR) |

Zero page errors across all 11. Driven with Playwright + headless Chromium against the live API key.

CI status

  • commitlint: PR title updated to feat(agents): prototype v2 event stream + continuationId + React hooks (conventional format)
  • format/lint: npm run format run; copyright headers updated
  • build: server test flow build errors fixed (the snapshotId/state field references I missed in v1)
  • cla/google: expected (first contribution; not signing on personal account)

…fix tests

Server-internal callers (tests, sub-agent middleware, direct `.run()`
invocations) want raw snapshotId ergonomics. The wire format still
treats `continuationId` as canonical for clients; `snapshotId` is now a
documented server-side convenience field on both AgentInit and
AgentOutput. This is not v1 backwards compat — it's recognizing the
same value has two valid representations.

- AgentInit accepts either `continuationId` or `snapshotId`. If both,
  continuationId wins. No FAILED_PRECONDITION fork.
- AgentOutput exposes `snapshotId` alongside `continuationId` for
  server-stored agents.
- agent_test.ts: 2 obsolete tests deleted (testing the removed
  FAILED_PRECONDITION behavior); 2 tests updated to use continuationId
  / `chunk.type === 'turn-end'` / `chunk.type === 'model-chunk'` to
  match the v2 discriminated-union chunk shape.

All 54 tests pass locally.
@chrisraygill chrisraygill force-pushed the cg/agents-v2-event-stream-prototype branch 2 times, most recently from 307b34d to 4f89065 on May 29, 2026 02:48
…med token rejection, defensive part

Addresses the gemini-code-assist[bot] review on PR 5424:

useGenkitAgent.ts (6 HIGH-priority comments):
- Add `pollTimeoutRef` + `pollGenRef` for the background-poll lifecycle.
  `clearActivePoll()` cancels any pending timer and bumps the generation
  so an in-flight tick exits early without calling setState.
- Wire cleanup through `reset()`, `abort()`, and the unmount effect so
  the recursive setTimeout chain doesn't leak after teardown.
- Resume-from-snapshot effect now tracks which continuationIds it has
  already rehydrated this mount. URL pushes that feed back as new
  `resumeFromContinuation` values are no-ops instead of triggering
  redundant `/state` fetches mid-conversation.
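
The timer-plus-generation idea above can be sketched outside React as follows. This is a rough illustration, not the hook's code: `Poller`, `tick`, and `onDone` are hypothetical names, the refs are replaced by private fields, and error handling for a rejected tick is left out.

```typescript
// Sketch of a generation-guarded polling loop. Names are illustrative.
class Poller {
  private timer: ReturnType<typeof setTimeout> | null = null;
  private generation = 0;

  constructor(
    private readonly tick: () => Promise<boolean>, // resolves true when done
    private readonly onDone: () => void,
    private readonly intervalMs: number
  ) {}

  start(): void {
    this.clear(); // never run two chains at once
    const gen = this.generation;
    const run = async (): Promise<void> => {
      const done = await this.tick();
      // A clear() during the await bumped the generation: exit without
      // touching state or scheduling another tick.
      if (gen !== this.generation) return;
      if (done) {
        this.timer = null;
        this.onDone();
        return;
      }
      this.timer = setTimeout(run, this.intervalMs);
    };
    this.timer = setTimeout(run, this.intervalMs);
  }

  // Cancels the pending timer and invalidates any in-flight tick.
  clear(): void {
    if (this.timer !== null) clearTimeout(this.timer);
    this.timer = null;
    this.generation++;
  }
}
```

Clearing the timer alone is not enough, because a tick that is already awaiting the network would still resolve and schedule the next one; the generation check closes that gap.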

agent.ts:
- Throw `INVALID_ARGUMENT` when `init.continuationId` is malformed
  instead of silently starting a fresh session. Silent fall-through
  was masking typo/corruption bugs in callers.
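
Combined with the earlier rule that `continuationId` wins over `snapshotId`, the init handling might be sketched like this. It assumes the `snap:` / `state:` string tokens in use at this point in the branch; `StatusError`, `AgentInitLike`, `ResolvedInit`, and `resolveInit` are hypothetical names, and the real server presumably uses its own status-coded error type.

```typescript
// Hypothetical error type standing in for a status-coded framework error.
class StatusError extends Error {
  constructor(
    readonly status: string,
    message: string
  ) {
    super(message);
  }
}

interface AgentInitLike {
  continuationId?: string;
  snapshotId?: string;
}

type ResolvedInit =
  | { kind: 'fresh' }
  | { kind: 'snapshot'; snapshotId: string }
  | { kind: 'state'; encodedState: string };

function resolveInit(init: AgentInitLike): ResolvedInit {
  const token = init.continuationId;
  if (token !== undefined) {
    // continuationId takes precedence when both fields are present.
    if (token.startsWith('snap:') && token.length > 'snap:'.length) {
      return { kind: 'snapshot', snapshotId: token.slice('snap:'.length) };
    }
    if (token.startsWith('state:') && token.length > 'state:'.length) {
      return { kind: 'state', encodedState: token.slice('state:'.length) };
    }
    // Fail loudly rather than silently starting a fresh session.
    throw new StatusError(
      'INVALID_ARGUMENT',
      `malformed continuationId: ${token}`
    );
  }
  if (init.snapshotId !== undefined) {
    return { kind: 'snapshot', snapshotId: init.snapshotId };
  }
  return { kind: 'fresh' };
}
```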

agent-events.ts:
- Defensive `if (!part) continue;` in `walkAgentEvent` for the model
  chunk content loop. Cheap insurance against adapters emitting holes.
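
A rough sketch of a visitor with that guard in place. The event and part shapes here are simplified assumptions (the real `AgentEvent` union has more variants and fields), and the visitor method names other than the general `on…` convention are hypothetical.

```typescript
// Assumed, reduced shapes for illustration only.
type Part = { text?: string; toolRequest?: { name: string; ref?: string } };

type AgentEvent =
  | { type: 'model-chunk'; content: Array<Part | undefined | null> }
  | { type: 'turn-end' }
  | { type: 'error'; message: string };

interface AgentEventVisitor {
  onText?(text: string): void;
  onToolRequest?(name: string): void;
  onTurnEnd?(): void;
  onError?(message: string): void;
}

function walkAgentEvent(event: AgentEvent, visitor: AgentEventVisitor): void {
  switch (event.type) {
    case 'model-chunk':
      for (const part of event.content) {
        if (!part) continue; // tolerate holes emitted by an adapter
        if (part.text !== undefined) visitor.onText?.(part.text);
        if (part.toolRequest) visitor.onToolRequest?.(part.toolRequest.name);
      }
      break;
    case 'turn-end':
      visitor.onTurnEnd?.();
      break;
    case 'error':
      visitor.onError?.(event.message);
      break;
  }
}
```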

(The third MEDIUM comment about JSON.stringify on circular status objects
referenced code in the dual-emission `withEvent()` wrapper that was
already removed in the v2 cleanup pass — no action needed.)

All 54 agent tests pass. All 11 sample pages verified end-to-end.
…import them

Goal: a React app working with Genkit agents shouldn't need to reach for
`runFlow` or `streamFlow` directly. The hook surface now covers every
pattern the testapp pages were using raw.

useGenkitAgent additions:
- `agent.abort()` now returns Promise<void> and, when in the background
  phase, calls the server's `/abort` endpoint in addition to cancelling
  local polling. Server-side work actually stops; pages don't need to
  call `runFlow` against the abort URL by hand.
- `agent.runVariants(input, count)` fires N parallel runs from the
  current continuation and returns an `AgentVariant[]` with each
  branch's continuationId / snapshotId / message / state.
- `agent.continueFrom(snapshotIdOrContinuation)` advances the agent to
  a chosen branch, fetching its `/state` snapshot and rehydrating
  messages / customState / artifacts reactively.
- Added `phaseRef` + `snapshotIdRef` so `abort` reads current values
  without re-creating itself.

New hook: useGenkitRunFlow
- Generic React wrapper over `runFlow` for non-streaming sibling
  endpoints (workspace listings, file reads, snapshot fetches, etc.).
- Exposes `{ data, status, error, run, reset }`.
- Memoized so consumer-side useCallback / useEffect deps stay stable.

Page migrations:
- BackgroundAgent: drop the `runFlow` import entirely; abort is just
  `agent.abort()`.
- BranchingChat: rewrite to use the hook — `agent.runVariants(input, 2)`
  + `agent.continueFrom(variant.snapshotId)`. The page is now a pure
  consumer of useGenkitAgent.
- CodingAgent: the 3 `runFlow` calls (workspace tree, file content,
  snapshot restore) go through `useGenkitRunFlow`. Caught and fixed an
  infinite-render bug — depending on the full hook result instead of
  its stable `.run` reference caused 80k+ console errors per page load
  before the fix.

The one remaining `streamFlow` import in CodingAgent is intentionally
deferred — its custom approval/interrupt rendering predates the hook
and migrating it fully would be a substantial restructure of a 1040 LOC
file. The streaming work itself is identical to what `useGenkitAgent`
does; this is purely about the page's rendering scaffolding.

All 54 agent tests pass. All 11 sample pages verified end-to-end in
browser (Playwright + headless Chromium, live API key). Zero page
errors. CodingAgent has 1 pre-existing console error (filesystem-state
endpoint connection refused on initial load before server is fully
ready) — same as before this PR.
CodingAgent was the last page still importing `streamFlow` directly.
Replace its hand-rolled stream loop, restore-from-URL fetcher, and
interrupt-detection routine with the v2 hook surface. Page now derives
displayed messages from `agent.messages` with synthetic system events
interleaved (approvals, denials, ask_user answers, errors). 1057 → 884 LOC.

Verifying the migration end-to-end with headless Chrome surfaced two
useGenkitAgent bugs that affected every page using URL resumption or
streaming-text rendering:

1. Resume-from-snapshot stuck under React StrictMode. The hook added the
   continuation token to `rehydratedRef` BEFORE the snapshot fetch
   resolved. StrictMode's first effect pass got canceled (cleanup ran),
   and the second pass saw the token already in the ref and skipped the
   fetch entirely — phase never left 'idle', messages never loaded.
   Move the `add` to AFTER the fetch commits (or after the error path),
   so a canceled first run leaves the second run free to do the work.

2. Streaming text not cleared at end of turn. After the final model
   message is committed to `messages`, `streamingText`/`streamingReasoning`
   retained their last values, causing ChatUI to render the same text
   twice (once as the committed message, once as a live streaming line
   with a cursor). Clear both before transitioning out of 'streaming'.
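
The ordering fix in item 1 can be sketched outside React like this. All names except `rehydratedRef` are hypothetical, and whether a cancelled error path should mark the token is an assumption of this sketch rather than something the commit states.

```typescript
// Hypothetical stand-in for the effect body; `rehydrated` plays rehydratedRef.current.
type Snapshot = { messages: string[] };

async function rehydrateOnce(
  token: string,
  rehydrated: Set<string>,
  fetchSnapshot: (token: string) => Promise<Snapshot>,
  isCancelled: () => boolean, // flipped by the effect's cleanup
  commit: (snapshot: Snapshot) => void
): Promise<void> {
  if (rehydrated.has(token)) return; // already rehydrated during this mount
  try {
    const snapshot = await fetchSnapshot(token);
    if (isCancelled()) return; // cancelled pass: leave the token unmarked
    commit(snapshot);
    rehydrated.add(token); // mark only after the fetch has committed
  } catch {
    // Assumption: a failed, non-cancelled fetch is marked so it is not retried in a loop.
    if (!isCancelled()) rehydrated.add(token);
  }
}
```

Under StrictMode the first pass is cancelled before its fetch resolves, so the set is still empty when the second pass runs and the second pass performs the fetch.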

For the CodingAgent approval/answer handlers, submit via
`agent.submit({ resume: ... })` directly rather than the
`restartInterrupt`/`respondToInterrupt` helpers, because restored
interrupts don't surface as `agent.pendingInterrupt` (the hook's
restoration path doesn't populate it). The page has the
toolName/ref/input locally from its restore-sniff effect, so passing
them explicitly works in both fresh-stream and restored cases.
…ix repo setup quirks

Hook ergonomics:
- `useGenkitAgent` and `useGenkitRunFlow` now route all option reads
  through a latest-options ref, so `submit` / `abort` / `reset` /
  `respondToInterrupt` / `restartInterrupt` / `runVariants` /
  `continueFrom` / `run` all have permanently stable identity for the
  hook's lifetime. Empty dependency arrays on every `useCallback`.
- Consumers can now include any of these in a `useEffect` /
  `useCallback` dep array without thinking about whether the parent
  memoized the option object. A missed memo on `headers` (or just
  passing `options` inline) previously made `run` change every render
  — exactly the trap that caused the 81k-error re-render loop in
  CodingAgent earlier in this branch.
- `restartInterrupt` / `respondToInterrupt` read the latest
  `pendingInterrupt` via a ref too, so they no longer change identity
  every time an interrupt arrives.
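
A framework-free analog of the latest-options ref, as a sketch only. `createStableRunner`, `RunOptions`, and `update` are hypothetical names; in the hooks the holder would be a `useRef` refreshed on each render and `run` would be wrapped in a `useCallback` with an empty dependency array.

```typescript
// Hypothetical model of "read options through a ref, keep the callback identity fixed".
interface RunOptions {
  url: string;
  headers?: Record<string, string>;
}

function createStableRunner<R>(initial: RunOptions, impl: (opts: RunOptions) => R) {
  const latest = { current: initial }; // in React this would be a useRef
  return {
    /** Called on every render with whatever options the parent passed. */
    update(next: RunOptions): void {
      latest.current = next;
    },
    /** Created once; reads options at call time instead of capturing them. */
    run: (): R => impl(latest.current),
  };
}
```

Because `run` never closes over a particular options object, an unmemoized `headers` literal in the parent changes what `run` reads but not what `run` is.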

Tool-call duplication fix:
- Clear `agent.toolCalls` when a turn ends, alongside the existing
  `streamingText` / `streamingReasoning` clear. The canonical
  representation of each tool invocation lives in `agent.messages`
  after the turn commits; leaving the in-flight buffer populated made
  pages that render `toolCalls` next to `messages` (WeatherChat)
  re-emit the same tool row twice. Pages that want live tool activity
  should gate on `phase === 'streaming'`.

Repo setup quirks (caught while verifying end-to-end):
- Server defaults to port 3400, matching Vite's default proxy target.
  Was 8080 — every page using `/api/...` 500'd through the proxy
  unless `PORT=3400` was set explicitly.
- `listWorkspaceFiles` returns `{ files: [] }` on ENOENT instead of
  throwing. The web app polls this on every CodingAgent load, and a
  fresh checkout (where the workspace dir doesn't exist yet) shouldn't
  crash the request.
- CodingAgent now uses relative `/api/...` paths instead of absolute
  `http://localhost:8080`, so it goes through the same Vite proxy
  every other page uses.
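
The ENOENT handling for `listWorkspaceFiles` might look roughly like the following. The signature is an assumption, and the synchronous `readdirSync` is used only to keep the sketch short; the sample's real handler may be async and return richer entries.

```typescript
import { readdirSync } from 'node:fs';

// Hypothetical signature for illustration.
function listWorkspaceFiles(workspaceDir: string): { files: string[] } {
  try {
    return { files: readdirSync(workspaceDir) };
  } catch (err) {
    // A fresh checkout has no workspace directory yet: report "no files", not a failure.
    if ((err as NodeJS.ErrnoException).code === 'ENOENT') return { files: [] };
    throw err; // permission errors and similar still surface to the caller
  }
}
```
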
Rounds out the tool-call lifecycle symmetry. Interrupts already had a
typed in-stream event with an addressable `toolCallId`; tool failures
did not — clients had to string-match the toolResponse output (e.g.
`Tool 'X' failed: ...`) to know a tool errored.

Now:

- **Schema**: `tool-error` event variant on `AgentStreamChunkSchema`,
  carrying `{ toolCallId, toolName, errorText, errorCode?, details? }`.
  Mirrors the shape of `interrupt`.
- **Runtime emission**: agent runtime inspects every toolResponse part
  flowing through the model-chunk stream and emits a `tool-error`
  event alongside the chunk when a failure is detected. Detection is
  in priority order:
    1. `metadata.toolError` set by middleware that caught a thrown error
    2. Output shape `{ error: string }` or `{ status: 'error', message }`
    3. Output is a string starting with `Tool '<name>' failed:` (the
       format genkit's stock middleware produces)
  Tool errors still flow back to the model as a toolResponse so the
  conversation can continue — this event is purely additive.
- **Visitor**: `walkAgentEvent` dispatches via `onToolError` with the
  same payload.
- **Hook**: `useGenkitAgent` now transitions the matching `ToolCall`
  entry in `agent.toolCalls` to `state: 'error'` and exposes
  `errorText` / `errorCode`. Wins over a concurrent `onToolResponse`
  for the same toolCallId (the typed event carries the structured
  info). WeatherChat's existing `state === 'error'` branch
  (`❌ {name} failed`) now lights up.
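
The three-step detection order could be sketched as below. The part and event shapes are assumptions for illustration; in particular, mapping `toolCallId` to the response's `ref` and the `code` field on `metadata.toolError` are guesses, not confirmed by the commit.

```typescript
// Assumed shapes; only the field names listed in the commit message are known.
interface ToolResponsePart {
  toolResponse: { name: string; ref?: string; output?: unknown };
  metadata?: { toolError?: { message: string; code?: string } };
}

interface ToolErrorEvent {
  type: 'tool-error';
  toolCallId: string;
  toolName: string;
  errorText: string;
  errorCode?: string;
}

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null;
}

function detectToolError(part: ToolResponsePart): ToolErrorEvent | null {
  const { name, ref, output } = part.toolResponse;
  const base = { type: 'tool-error' as const, toolCallId: ref ?? '', toolName: name };

  // 1. Explicit flag set by middleware that caught a thrown error.
  const flagged = part.metadata?.toolError;
  if (flagged) return { ...base, errorText: flagged.message, errorCode: flagged.code };

  // 2. Error-shaped output objects.
  if (isRecord(output)) {
    if (typeof output.error === 'string') return { ...base, errorText: output.error };
    if (output.status === 'error' && typeof output.message === 'string') {
      return { ...base, errorText: output.message };
    }
  }

  // 3. String produced by the stock middleware: "Tool '<name>' failed: ...".
  if (typeof output === 'string') {
    const prefix = `Tool '${name}' failed:`;
    if (output.startsWith(prefix)) {
      return { ...base, errorText: output.slice(prefix.length).trim() };
    }
  }
  return null; // an ordinary tool response
}
```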

Verified end-to-end: drove CodingAgent with a request that fails
`read_file` against a nonexistent path — got 2 `tool-error` events in
the stream (one for read_file, one for list_files), each with the
correct addressable toolCallId and stripped error text.
…cross-framework design

Extract a framework-agnostic `AgentSession` class from `useGenkitAgent`
into `genkit/beta/client`. The class owns the entire client-side state
machine — streaming chunk dispatch, continuation token round-trip,
in-stream interrupt detection, foreground/background phase
transitions with polling, snapshot rehydration, and lifecycle
cleanup. Framework adapters subscribe to its state observable and
forward action calls.

This turns the React hook into a 128-LOC wrapper around the 830-LOC
core (was a 716-LOC monolith), and lets us validate the design holds
up under a totally different framework's reactivity model.

To validate, a new `testapps/agents-angular` exercises the same
backend the React testapp uses (`testapps/agents`, no server-side
changes). The Angular adapter (`injectGenkitAgent`) is 80 LOC,
using Angular signals + `DestroyRef`:

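  ```ts
  import { DestroyRef, inject, signal } from '@angular/core';
  import { AgentSession } from 'genkit/beta/client';

  // `boundMethods` (not shown) is assumed to return the session's action
  // methods bound to the instance.
  export function injectGenkitAgent<S>(
    options: ConstructorParameters<typeof AgentSession<S>>[0]
  ) {
    const session = new AgentSession<S>(options);
    // Mirror the session's state into a signal so templates react to changes.
    const state = signal(session.getState());
    const unsubscribe = session.subscribe(() =>
      state.set(session.getState()));
    // Tear down together with the owning injection context.
    inject(DestroyRef).onDestroy(() => {
      unsubscribe();
      session.dispose();
    });
    return { state: state.asReadonly(), ...boundMethods(session) };
  }
  ```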

Three representative pages ported (WeatherChat, BankingInterrupt,
BackgroundAgent) and verified end-to-end against the React app's
backend via Puppeteer + Chromium with the live API key:

- WeatherChat: user → tool call → tool response → model response
  render in correct order
- BankingInterrupt: in-stream interrupt → approval dialog with
  Approve/Deny buttons
- BackgroundAgent: detach → phase=background (polling) → phase=done
  with 11k-char report

Zero pageerrors. Zero console errors across all three.

Effective adapter code (excluding JSDoc/types/imports) is 11 LOC for
React and 8 LOC for Angular. The duplication between the two
adapters is the irreducible minimum: construct session, forward
state changes to framework reactivity, bind action methods, tear
down on destroy. No further extraction would shrink them.

React-side StrictMode note: the dispose-on-unmount fires
prematurely under StrictMode's double-mount cycle. The wrapper
defers disposal to a microtask and checks
`session.hasActiveSubscribers()` first — if the remount has already
re-subscribed, disposal is skipped. Angular's `DestroyRef` has no
such complication.
@github-actions (bot) added the docs and config labels May 29, 2026
…upes, dead schema

Four implementation simplifications that don't change observable
behavior (except #3, which tightens the tool-error contract).

1) Consolidate snapshot rehydrate + background poll into a single
   `applySnapshot(snapshotId)` helper on `AgentSession`. Both methods
   previously did the same runFlow → state.{messages,artifacts,custom}
   → check status logic with subtle drift (the poll handled 'aborted',
   rehydrate didn't). The helper returns the snapshot's terminal
   status; callers decide their own phase semantics. `continueFrom`
   also routes through it. ~30 LOC of duplication gone.

2) Drop the StrictMode dispose dance from the React adapter. The
   trickiest 15 LOC in the whole stack — Promise.resolve().then +
   hasActiveSubscribers() check + microtask deferral — goes away.
   Replaced with a `listeners.size === 0` self-termination check
   inside `startBackgroundPoll`: when no one is subscribed, the poll
   stops on its next tick. `useSyncExternalStore` already manages
   subscribe/unsubscribe correctly, so the listener Set empties on
   unmount. AbortControllers are GC-safe. No explicit dispose needed
   from the adapter; the session lazy-init via useRef just works
   under StrictMode's double-mount. `hasActiveSubscribers()` removed
   from the public API.

3) Make `metadata.toolError` the only `tool-error` detection signal.
   Drop the two heuristic fallbacks (`{error}`/`{status:'error'}`
   shape, and the `Tool '<name>' failed:` string match) from
   `detectToolError`. Patch the stock filesystem middleware to set
   `metadata.toolError = { message }` on the response envelope so
   the runtime can find it. Detection lives at one explicit opt-in
   flag; the implementation now matches the protocol's promise of
   "no more string matching". Third-party middleware that catches
   tool throws and returns error-shaped responses without setting
   the flag won't surface as `tool-error` events — the conversation
   still proceeds (model sees the error in output) but the typed
   client signal is gone. Documented tradeoff.

   Bug caught while wiring this up: the metadata flag lives on the
   *part* envelope, not on the `toolResponse` value. The runtime
   now passes both to the detector.

4) Drop the unused `artifact-start`/`artifact-delta`/`artifact-complete`
   schema variants from `AgentStreamChunkSchema` (and the matching
   client visitor case). The schema comment had been calling them
   "reserved; runtime emits artifact-emitted for now" since they
   were introduced — three reserved-but-never-emitted variants is
   protocol noise. Add them back at the moment the emission site
   exists.
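
The self-terminating poll from item 2 can be illustrated with a miniature store. `MiniSession` and `pollTick` are hypothetical and are not the real `AgentSession`; the sketch only shows how a `subscribe` / `getState` pair shaped for `useSyncExternalStore` lets the poll stop once the listener set is empty.

```typescript
// Hypothetical miniature of the session's store surface.
type Listener = () => void;

class MiniSession<S> {
  private listeners = new Set<Listener>();

  constructor(private state: S) {}

  getState = (): S => this.state;

  /** Returns an unsubscribe function, the shape useSyncExternalStore expects. */
  subscribe = (listener: Listener): (() => void) => {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  };

  setState(next: S): void {
    this.state = next;
    this.listeners.forEach((notify) => notify());
  }

  /** One poll tick; returns false when the chain should stop rescheduling. */
  pollTick(fetchNext: () => S): boolean {
    if (this.listeners.size === 0) return false; // nobody is subscribed: self-terminate
    this.setState(fetchNext());
    return true;
  }
}
```

With this shape the adapter never calls a dispose method: unmounting removes the last listener, and the next tick notices and ends the chain.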

End-to-end verified:
- React: WeatherChat tool-call lifecycle + BankingInterrupt in-stream
  interrupt + URL push. Zero errors.
- Angular: WeatherChat + BankingInterrupt + BackgroundAgent (phase
  transitioned background → done at t=18s, 9.5KB report). Zero errors.
- tool-error event still fires via metadata signal: 2 events emitted
  in a curl test driving read_file ENOENT (one for list_files probe,
  one for read_file), each with addressable toolCallId and clean
  errorText.

Net LOC across all four: ~80 saved, plus the React adapter's
trickiest code path retired. Protocol vocabulary contracts from 12
variants to 10.
Three protocol-level changes per scratch/genkit-agents-v2-protocol.md:

1. **Drop the opaque `continuationId` string; adopt structured continuation**
   (proposal §2 Option A). `AgentInit`, `AgentOutput`, and the `snapshot` /
   `detached` / `turn-end` events now carry `continuation: { kind:
   'snapshot', snapshotId } | { kind: 'state', state }`. Stateless agents
   return the full session state inline as JSON; no base64, no opaque
   token. Delete `encodeSnapshotContinuation` / `encodeStateContinuation`
   / `decodeContinuation` / `continuationToSnapshotId` helpers and the
   browser-safe `client/continuation.ts` file — the JSON itself is the
   wire format. Clients pattern-match on `continuation.kind` when they
   care about storage mode; the convenience `snapshotId` field on
   outputs / events stays for URL-bookmark callers.

2. **Revert typed status events to a single application-defined variant.**
   Drop `StatusEventSchema` and the three `{ type: 'status' | 'progress'
   | 'phase' }` variants. Single `{ type: 'status', status: z.any() }`
   variant carrying whatever shape the agent and its client agree on.
   The client visitor collapses `onStatus` / `onProgress` / `onPhase`
   into one `onStatus(status: unknown)`. `AgentSession<S, TStatus>` and
   the React/Angular adapters gain a `<TStatus>` generic so consumers
   can type the payload at the usage site (e.g. ResearchAgent declares
   `{ label, current?, total? }`).

3. **Cleanup: drop streaming-artifact mentions** — the schema variants
   were aspirational and never landed in code; only documentation
   referred to them.
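
A small sketch of how a client might consume the structured continuation and the untyped status payload from items 1 and 2. `SessionState`, `bookmarkParam`, and `isResearchStatus` are hypothetical helpers; the union shape and the `{ label, current?, total? }` status are taken from the description above.

```typescript
// `SessionState` is a placeholder for whatever the stateless agent serializes.
type SessionState = Record<string, unknown>;

type AgentContinuation =
  | { kind: 'snapshot'; snapshotId: string }
  | { kind: 'state'; state: SessionState };

/** Only snapshot continuations are small enough to bookmark in a URL. */
function bookmarkParam(c: AgentContinuation): string | null {
  switch (c.kind) {
    case 'snapshot':
      return `snapshotId=${encodeURIComponent(c.snapshotId)}`;
    case 'state':
      return null; // inline state can be large; keep it out of the URL
  }
}

/** Application-defined status payload, narrowed at the usage site. */
type ResearchStatus = { label: string; current?: number; total?: number };

function isResearchStatus(status: unknown): status is ResearchStatus {
  return (
    typeof status === 'object' &&
    status !== null &&
    typeof (status as { label?: unknown }).label === 'string'
  );
}
```

Code that just round-trips the continuation can pass the object back untouched; only callers that care about storage mode need to branch on `kind`.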

Other implied changes from the proposal:
- `getSnapshotData` action stops accepting `snap:`-prefixed tokens. Input
  is a raw snapshotId; clients hit the `/state` endpoint with the
  derived `snapshotId` field directly.
- Tool-error detection in agent runtime no longer falls back to the
  `Tool '<name>' failed:` string heuristic. Trust `metadata.toolError`
  set by middleware only (filesystem middleware already does).

Updated:
- Core schema + runtime (agent.ts)
- Client visitor (agent-events.ts) — `AgentContinuation` type exported
- AgentSession (agent-session.ts) — `<S, TStatus>` generic, structured
  continuation throughout, `continueFrom` accepts either continuation
  or raw snapshotId
- React adapter (useGenkitAgent.ts) — `<S, TStatus>` generic
- Angular adapter (inject-genkit-agent.ts) + BackgroundAgent component
- React pages: BackgroundAgent (`agent.status?.label`), ClientState
  (renders inline state JSON directly, no decode), ResearchAgent
  (declares its status shape)
- Server testapps: research-agent emits `{ type: 'status', status: {...} }`,
  background-agent / file-store-agent / branching-agent /
  weather-agent-stateless / coding-agent / banking-agent migrate
  continuation round-trip
- Tests (agent_test.ts) — 3 round-trip assertions migrated to
  `continuation` shape

Verified end-to-end: 307 unit tests pass; React + Angular UI flows
exercised (WeatherChat, ClientState, BankingInterrupt, CodingAgent
restore+approve, BackgroundAgent); curl confirms wire payloads carry
`continuation: { kind: 'snapshot', snapshotId }` on stored agents and
`continuation: { kind: 'state', state }` inline on stateless.