Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
a88dfa0
feat(js): added support for bidi actions and flows
pavelgj Jan 1, 2026
993de97
fixed flow typing
pavelgj Jan 4, 2026
d00b98c
inputStream from input
pavelgj Jan 5, 2026
ad309ae
fix
pavelgj Jan 6, 2026
626da82
jsdocs
pavelgj Jan 27, 2026
8412545
jsdocs
pavelgj Jan 27, 2026
94969a7
docs: Add JSDoc comments to various action and flow types and functions.
pavelgj Jan 27, 2026
f520be1
feedback
pavelgj Jan 27, 2026
07da46b
moved defineBidiFlow to beta
pavelgj Mar 12, 2026
6f183a7
feat: Unify action configuration by integrating `Init` parameters dir…
pavelgj Mar 12, 2026
37a8796
refactor: simplify `init` option assignment and remove unnecessary ty…
pavelgj Mar 12, 2026
3c9bcb9
refactor: remove redundant type assertions for `ActionRunOptions` in …
pavelgj Mar 12, 2026
0abe3e2
Merge branch 'main' into pj/bidi-actions
pavelgj Apr 13, 2026
40dce0d
Merge branch 'main' into pj/bidi-actions
pavelgj Apr 14, 2026
3f70493
refactor: streamline bidi action API
pavelgj Apr 15, 2026
c3cbb4e
fmt
pavelgj Apr 16, 2026
47abe12
cleanup
pavelgj Apr 20, 2026
531cdc9
refactor: separate BidiActionRunOptions from ActionRunOptions and str…
pavelgj Apr 20, 2026
6857c73
feat: add initialization schema support to actions and improve stream…
pavelgj Apr 21, 2026
abbd9de
fix: update channel termination condition to correctly handle falsy v…
pavelgj Apr 21, 2026
3b6e02a
implemented reflection
pavelgj May 1, 2026
26998c2
feat: add `init` parameter to runAction for session initialization
pavelgj May 5, 2026
83f92e6
fmt
pavelgj May 5, 2026
eb7f2cb
feat: add init data support to client and express handler
pavelgj May 6, 2026
4f92ccd
Merge branch 'pj/bidi-actions' into pj/init-transport
pavelgj May 6, 2026
0ea3108
Merge branch 'pj/bidi-actions' into pj/init-transport
pavelgj May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions js/genkit/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ const __flowStreamDelimiter = '\n\n';
* console.log(await response.output);
* ```
*/
export function streamFlow<O = any, S = any>({
export function streamFlow<O = any, S = any, Init = any>({
url,
input,
init,
streamId,
headers,
abortSignal,
Expand All @@ -47,6 +48,8 @@ export function streamFlow<O = any, S = any>({
url: string;
/** Flow input. */
input?: any;
/** Initialization data for the action. */
init?: Init;
/** Stream ID to connect to. */
streamId?: string;
/** A map of HTTP headers to be added to the HTTP call. */
Expand All @@ -64,6 +67,7 @@ export function streamFlow<O = any, S = any>({
const operationPromise = __flowRunEnvelope({
url,
input,
init,
sendChunk: (c) => channel.send(c),
headers: streamId
? { ...headers, 'x-genkit-stream-id': streamId }
Expand All @@ -88,13 +92,15 @@ export function streamFlow<O = any, S = any>({
async function __flowRunEnvelope({
url,
input,
init,
sendChunk,
headers,
abortSignal,
responseCallback,
}: {
url: string;
input: any;
init?: any;
sendChunk: (chunk: any) => void;
headers?: Record<string, string>;
abortSignal?: AbortSignal;
Expand All @@ -104,6 +110,7 @@ async function __flowRunEnvelope({
method: 'POST',
body: JSON.stringify({
data: input,
...(init !== undefined && { init }),
}),
headers: {
Accept: 'text/event-stream',
Expand Down Expand Up @@ -177,16 +184,19 @@ async function __flowRunEnvelope({
* console.log(await response);
* ```
*/
export async function runFlow<O = any>({
export async function runFlow<O = any, Init = any>({
url,
input,
init,
headers,
abortSignal,
}: {
/** URL of the deployed flow. */
url: string;
/** Flow input. */
input?: any;
/** Initialization data for the action. */
init?: Init;
/** A map of HTTP headers to be added to the HTTP call. */
headers?: Record<string, string>;
/** Abort signal to abort the request. */
Expand All @@ -196,6 +206,7 @@ export async function runFlow<O = any>({
method: 'POST',
body: JSON.stringify({
data: input,
...(init !== undefined && { init }),
}),
headers: {
'Content-Type': 'application/json',
Expand Down
25 changes: 25 additions & 0 deletions js/plugins/express/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,31 @@ for await (const chunk of result.stream) {
console.log(await result.output);
```

### Initialization Data

If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client:

```ts
const result = await runFlow({
url: `http://localhost:${port}/myFlow`,
input: 'say hello',
init: { sessionId: 'abc123', config: { temperature: 0.7 } },
});

// Also works with streaming
const streamed = streamFlow({
url: `http://localhost:${port}/myFlow`,
input: 'say hello',
init: { sessionId: 'abc123' },
});
for await (const chunk of streamed.stream) {
console.log(chunk);
}
console.log(await streamed.output);
```

The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side.

You can also use `startFlowServer` to quickly expose multiple flows and actions:

```ts
Expand Down
8 changes: 7 additions & 1 deletion js/plugins/express/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ export function expressHandler<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny,
Init extends z.ZodTypeAny = z.ZodTypeAny,
>(
action: Action<I, O, S>,
action: Action<I, O, S, any, Init>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The Action type parameter for context should use the generic C instead of any. This ensures that the action is compatible with the context provided by the contextProvider. Using any here bypasses type checking for context compatibility, which could lead to runtime errors if the action expects a specific context type that the provider does not supply. Stricter type validation is preferred in this repository to ensure consistency and safety.

Suggested change
action: Action<I, O, S, any, Init>,
action: Action<I, O, S, C, Init>,
References
  1. Stricter validation is preferable to maintain consistency and prevent runtime errors.

opts?: {
contextProvider?: ContextProvider<C, I>;
streamManager?: StreamManager;
Expand Down Expand Up @@ -78,6 +79,7 @@ export function expressHandler<
}

const input = request.body.data as z.infer<I>;
const init = request.body.init;
let context: Record<string, any>;

try {
Expand Down Expand Up @@ -129,6 +131,7 @@ export function expressHandler<
streamManager,
streamIdToUse,
input,
init,
context,
response,
abortController.signal
Expand All @@ -138,6 +141,7 @@ export function expressHandler<
const result = await action.run(input, {
context,
abortSignal: abortController.signal,
init,
});
response.setHeader('x-genkit-trace-id', result.telemetry.traceId);
response.setHeader('x-genkit-span-id', result.telemetry.spanId);
Expand Down Expand Up @@ -168,6 +172,7 @@ async function runActionWithDurableStreaming<
streamManager: StreamManager | undefined,
streamId: string,
input: z.infer<I>,
init: any,
context: ActionContext,
response: express.Response,
abortSignal: AbortSignal
Expand Down Expand Up @@ -195,6 +200,7 @@ async function runActionWithDurableStreaming<
onChunk,
context,
abortSignal,
init,
});
if (streamManager) {
taskQueue!.enqueue(() => durableStream!.done(result.result));
Expand Down
40 changes: 40 additions & 0 deletions js/plugins/express/tests/express_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ describe('expressHandler', async () => {
'/echoModelWithAuth',
expressHandler(echoModel, { contextProvider })
);
// A flow that echoes back the init data to verify it was received.
const flowWithInit = ai.defineFlow(
{
name: 'flowWithInit',
inputSchema: z.string(),
},
async (input) => {
return `input: ${input}`;
}
);
// Monkey-patch the run method to capture and return init data.
const originalRun = flowWithInit.run.bind(flowWithInit);
flowWithInit.run = async (input: any, options: any) => {
const result = await originalRun(input, options);
// Embed init in the result so we can verify it was passed through.
result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`;
return result;
};

app.post('/flowWithInit', expressHandler(flowWithInit));
app.post('/abortableFlow', expressHandler(abortableFlow));

server = app.listen(port, () => {
Expand Down Expand Up @@ -289,6 +309,26 @@ describe('expressHandler', async () => {
});
});

it('should pass init data to the action', async () => {
const result = await runFlow<string>({
url: `http://localhost:${port}/flowWithInit`,
input: 'hello',
init: { sessionId: 'abc123', temperature: 0.7 },
});
assert.strictEqual(
result,
'input: hello, init: {"sessionId":"abc123","temperature":0.7}'
);
});

it('should pass undefined init when not provided', async () => {
const result = await runFlow<string>({
url: `http://localhost:${port}/flowWithInit`,
input: 'hello',
});
assert.strictEqual(result, 'input: hello, init: undefined');
});

// TODO: This test is flaky, skipping until fixed.
it.skip('should abort a flow with auth', async () => {
const controller = new AbortController();
Expand Down
25 changes: 25 additions & 0 deletions js/plugins/fetch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,31 @@ for await (const chunk of result.stream) {
console.log(await result.output);
```

### Initialization Data

If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client:

```ts
const result = await runFlow({
url: 'http://localhost:3780/api/myFlow',
input: 'say hello',
init: { sessionId: 'abc123', config: { temperature: 0.7 } },
});

// Also works with streaming
const streamed = streamFlow({
url: 'http://localhost:3780/api/myFlow',
input: 'say hello',
init: { sessionId: 'abc123' },
});
for await (const chunk of streamed.stream) {
console.log(chunk);
}
console.log(await streamed.output);
```

The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side.

## API summary

| Export | Description |
Expand Down
5 changes: 5 additions & 0 deletions js/plugins/fetch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ async function runActionWithDurableStreaming<
streamManager: StreamManager | undefined,
streamId: string,
input: z.infer<I>,
init: any,
context: ActionContext,
writer: WritableStreamDefaultWriter<Uint8Array>,
abortSignal: AbortSignal
Expand Down Expand Up @@ -247,6 +248,7 @@ async function runActionWithDurableStreaming<
onChunk,
context,
abortSignal,
init,
});

if (streamManager && durableStream) {
Expand Down Expand Up @@ -344,6 +346,7 @@ async function handleActionRequest<
}

const input = body.data as z.infer<I>;
const init = body.init;

let context: C;
try {
Expand Down Expand Up @@ -381,6 +384,7 @@ async function handleActionRequest<
options?.streamManager,
streamIdToUse,
input,
init,
context,
writer,
request.signal
Expand Down Expand Up @@ -409,6 +413,7 @@ async function handleActionRequest<
const result = await action.run(input, {
context,
abortSignal: request.signal,
init,
});

const headers: Record<string, string> = {
Expand Down
53 changes: 53 additions & 0 deletions js/plugins/fetch/tests/web_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,59 @@ describe('fetchHandler (single action)', () => {
assert.ok(json.message?.includes('not authorized'));
});

it('should pass init data to the action', async () => {
const ai = genkit({});
const flow = ai.defineFlow(
{ name: 'flowWithInit', inputSchema: z.string() },
async (input) => `input: ${input}`
);
// Monkey-patch the run method to capture and return init data.
const originalRun = flow.run.bind(flow);
flow.run = async (input: any, options: any) => {
const result = await originalRun(input, options);
result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`;
return result;
};
const request = new Request('http://localhost/flowWithInit', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
data: 'hello',
init: { sessionId: 'abc123', temperature: 0.7 },
}),
});
const response = await fetchHandler(flow)(request);
assert.strictEqual(response.status, 200);
const json = (await response.json()) as { result: string };
assert.strictEqual(
json.result,
'input: hello, init: {"sessionId":"abc123","temperature":0.7}'
);
});

it('should pass undefined init when not provided', async () => {
const ai = genkit({});
const flow = ai.defineFlow(
{ name: 'flowWithInit', inputSchema: z.string() },
async (input) => `input: ${input}`
);
const originalRun = flow.run.bind(flow);
flow.run = async (input: any, options: any) => {
const result = await originalRun(input, options);
result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`;
return result;
};
const request = new Request('http://localhost/flowWithInit', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ data: 'hello' }),
});
const response = await fetchHandler(flow)(request);
assert.strictEqual(response.status, 200);
const json = (await response.json()) as { result: string };
assert.strictEqual(json.result, 'input: hello, init: undefined');
});

it('should set x-genkit-trace-id and x-genkit-span-id headers', async () => {
const ai = genkit({});
const flow = ai.defineFlow('traceFlow', async () => 'ok');
Expand Down
25 changes: 25 additions & 0 deletions js/plugins/next/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,31 @@ for await (const chunk of stream) {
console.log(await output); // output is a promise, must be awaited
```

### Initialization Data

If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client:

```ts
const result = await runFlow<typeof myFlow>({
url: '/api/myFlow',
input: 'say hello',
init: { sessionId: 'abc123', config: { temperature: 0.7 } },
});

// Also works with streaming
const { stream, output } = streamFlow<typeof myFlow>({
url: '/api/myFlow',
input: 'say hello',
init: { sessionId: 'abc123' },
});
for await (const chunk of stream) {
console.log(chunk.output);
}
console.log(await output);
```

The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side.

The sources for this package are in the main [Genkit](https://github.com/genkit-ai/genkit) repo. Please file issues and pull requests against that repo.

Usage information and reference details can be found in [official Genkit documentation](https://genkit.dev/docs/get-started/).
Expand Down
1 change: 1 addition & 0 deletions js/plugins/next/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface RequestData<T> {
url: string;
headers?: Record<string, string>;
input?: T;
init?: any;
streamId?: string;
abortSignal?: AbortSignal;
}
Expand Down
Loading
Loading