diff --git a/js/genkit/src/client/client.ts b/js/genkit/src/client/client.ts index 82b70308e1..3bb0b3678a 100644 --- a/js/genkit/src/client/client.ts +++ b/js/genkit/src/client/client.ts @@ -36,9 +36,10 @@ const __flowStreamDelimiter = '\n\n'; * console.log(await response.output); * ``` */ -export function streamFlow({ +export function streamFlow({ url, input, + init, streamId, headers, abortSignal, @@ -47,6 +48,8 @@ export function streamFlow({ url: string; /** Flow input. */ input?: any; + /** Initialization data for the action. */ + init?: Init; /** Stream ID to connect to. */ streamId?: string; /** A map of HTTP headers to be added to the HTTP call. */ @@ -64,6 +67,7 @@ export function streamFlow({ const operationPromise = __flowRunEnvelope({ url, input, + init, sendChunk: (c) => channel.send(c), headers: streamId ? { ...headers, 'x-genkit-stream-id': streamId } @@ -88,6 +92,7 @@ export function streamFlow({ async function __flowRunEnvelope({ url, input, + init, sendChunk, headers, abortSignal, @@ -95,6 +100,7 @@ async function __flowRunEnvelope({ }: { url: string; input: any; + init?: any; sendChunk: (chunk: any) => void; headers?: Record; abortSignal?: AbortSignal; @@ -104,6 +110,7 @@ async function __flowRunEnvelope({ method: 'POST', body: JSON.stringify({ data: input, + ...(init !== undefined && { init }), }), headers: { Accept: 'text/event-stream', @@ -177,9 +184,10 @@ async function __flowRunEnvelope({ * console.log(await response); * ``` */ -export async function runFlow({ +export async function runFlow({ url, input, + init, headers, abortSignal, }: { @@ -187,6 +195,8 @@ export async function runFlow({ url: string; /** Flow input. */ input?: any; + /** Initialization data for the action. */ + init?: Init; /** A map of HTTP headers to be added to the HTTP call. */ headers?: Record; /** Abort signal to abort the request. */ @@ -196,6 +206,7 @@ export async function runFlow({ method: 'POST', body: JSON.stringify({ data: input, + ...(init !== undefined && { init }), }), headers: { 'Content-Type': 'application/json', diff --git a/js/plugins/express/README.md b/js/plugins/express/README.md index aed75b661d..7e75085b3d 100644 --- a/js/plugins/express/README.md +++ b/js/plugins/express/README.md @@ -131,6 +131,31 @@ for await (const chunk of result.stream) { console.log(await result.output); ``` +### Initialization Data + +If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client: + +```ts +const result = await runFlow({ + url: `http://localhost:${port}/myFlow`, + input: 'say hello', + init: { sessionId: 'abc123', config: { temperature: 0.7 } }, +}); + +// Also works with streaming +const streamed = streamFlow({ + url: `http://localhost:${port}/myFlow`, + input: 'say hello', + init: { sessionId: 'abc123' }, +}); +for await (const chunk of streamed.stream) { + console.log(chunk); +} +console.log(await streamed.output); +``` + +The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side. + You can also use `startFlowServer` to quickly expose multiple flows and actions: ```ts diff --git a/js/plugins/express/src/index.ts b/js/plugins/express/src/index.ts index b7ff953b11..9dc8f0971e 100644 --- a/js/plugins/express/src/index.ts +++ b/js/plugins/express/src/index.ts @@ -47,8 +47,9 @@ export function expressHandler< I extends z.ZodTypeAny = z.ZodTypeAny, O extends z.ZodTypeAny = z.ZodTypeAny, S extends z.ZodTypeAny = z.ZodTypeAny, + Init extends z.ZodTypeAny = z.ZodTypeAny, >( - action: Action, + action: Action, opts?: { contextProvider?: ContextProvider; streamManager?: StreamManager; @@ -78,6 +79,7 @@ export function expressHandler< } const input = request.body.data as z.infer; + const init = request.body.init; let context: Record; try { @@ -129,6 +131,7 @@ export function expressHandler< streamManager, streamIdToUse, input, + init, context, response, abortController.signal @@ -138,6 +141,7 @@ export function expressHandler< const result = await action.run(input, { context, abortSignal: abortController.signal, + init, }); response.setHeader('x-genkit-trace-id', result.telemetry.traceId); response.setHeader('x-genkit-span-id', result.telemetry.spanId); @@ -168,6 +172,7 @@ async function runActionWithDurableStreaming< streamManager: StreamManager | undefined, streamId: string, input: z.infer, + init: any, context: ActionContext, response: express.Response, abortSignal: AbortSignal @@ -195,6 +200,7 @@ async function runActionWithDurableStreaming< onChunk, context, abortSignal, + init, }); if (streamManager) { taskQueue!.enqueue(() => durableStream!.done(result.result)); diff --git a/js/plugins/express/tests/express_test.ts b/js/plugins/express/tests/express_test.ts index 3dbed0c6bb..e8ef2aa5a0 100644 --- a/js/plugins/express/tests/express_test.ts +++ b/js/plugins/express/tests/express_test.ts @@ -161,6 +161,26 @@ describe('expressHandler', async () => { '/echoModelWithAuth', expressHandler(echoModel, { contextProvider }) ); + // A flow that echoes back the init data to verify it was received. + const flowWithInit = ai.defineFlow( + { + name: 'flowWithInit', + inputSchema: z.string(), + }, + async (input) => { + return `input: ${input}`; + } + ); + // Monkey-patch the run method to capture and return init data. + const originalRun = flowWithInit.run.bind(flowWithInit); + flowWithInit.run = async (input: any, options: any) => { + const result = await originalRun(input, options); + // Embed init in the result so we can verify it was passed through. + result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`; + return result; + }; + + app.post('/flowWithInit', expressHandler(flowWithInit)); app.post('/abortableFlow', expressHandler(abortableFlow)); server = app.listen(port, () => { @@ -289,6 +309,26 @@ describe('expressHandler', async () => { }); }); + it('should pass init data to the action', async () => { + const result = await runFlow({ + url: `http://localhost:${port}/flowWithInit`, + input: 'hello', + init: { sessionId: 'abc123', temperature: 0.7 }, + }); + assert.strictEqual( + result, + 'input: hello, init: {"sessionId":"abc123","temperature":0.7}' + ); + }); + + it('should pass undefined init when not provided', async () => { + const result = await runFlow({ + url: `http://localhost:${port}/flowWithInit`, + input: 'hello', + }); + assert.strictEqual(result, 'input: hello, init: undefined'); + }); + // TODO: This test is flaky, skipping until fixed. it.skip('should abort a flow with auth', async () => { const controller = new AbortController(); diff --git a/js/plugins/fetch/README.md b/js/plugins/fetch/README.md index d6aba2fca9..2400a25f87 100644 --- a/js/plugins/fetch/README.md +++ b/js/plugins/fetch/README.md @@ -159,6 +159,31 @@ for await (const chunk of result.stream) { console.log(await result.output); ``` +### Initialization Data + +If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client: + +```ts +const result = await runFlow({ + url: 'http://localhost:3780/api/myFlow', + input: 'say hello', + init: { sessionId: 'abc123', config: { temperature: 0.7 } }, +}); + +// Also works with streaming +const streamed = streamFlow({ + url: 'http://localhost:3780/api/myFlow', + input: 'say hello', + init: { sessionId: 'abc123' }, +}); +for await (const chunk of streamed.stream) { + console.log(chunk); +} +console.log(await streamed.output); +``` + +The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side. + ## API summary | Export | Description | diff --git a/js/plugins/fetch/src/index.ts b/js/plugins/fetch/src/index.ts index c87dd8c8f8..95c47a77bb 100644 --- a/js/plugins/fetch/src/index.ts +++ b/js/plugins/fetch/src/index.ts @@ -213,6 +213,7 @@ async function runActionWithDurableStreaming< streamManager: StreamManager | undefined, streamId: string, input: z.infer, + init: any, context: ActionContext, writer: WritableStreamDefaultWriter, abortSignal: AbortSignal @@ -247,6 +248,7 @@ async function runActionWithDurableStreaming< onChunk, context, abortSignal, + init, }); if (streamManager && durableStream) { @@ -344,6 +346,7 @@ async function handleActionRequest< } const input = body.data as z.infer; + const init = body.init; let context: C; try { @@ -381,6 +384,7 @@ async function handleActionRequest< options?.streamManager, streamIdToUse, input, + init, context, writer, request.signal @@ -409,6 +413,7 @@ async function handleActionRequest< const result = await action.run(input, { context, abortSignal: request.signal, + init, }); const headers: Record = { diff --git a/js/plugins/fetch/tests/web_test.ts b/js/plugins/fetch/tests/web_test.ts index 1608c0a68f..1d09144160 100644 --- a/js/plugins/fetch/tests/web_test.ts +++ b/js/plugins/fetch/tests/web_test.ts @@ -314,6 +314,59 @@ describe('fetchHandler (single action)', () => { assert.ok(json.message?.includes('not authorized')); }); + it('should pass init data to the action', async () => { + const ai = genkit({}); + const flow = ai.defineFlow( + { name: 'flowWithInit', inputSchema: z.string() }, + async (input) => `input: ${input}` + ); + // Monkey-patch the run method to capture and return init data. + const originalRun = flow.run.bind(flow); + flow.run = async (input: any, options: any) => { + const result = await originalRun(input, options); + result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`; + return result; + }; + const request = new Request('http://localhost/flowWithInit', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + data: 'hello', + init: { sessionId: 'abc123', temperature: 0.7 }, + }), + }); + const response = await fetchHandler(flow)(request); + assert.strictEqual(response.status, 200); + const json = (await response.json()) as { result: string }; + assert.strictEqual( + json.result, + 'input: hello, init: {"sessionId":"abc123","temperature":0.7}' + ); + }); + + it('should pass undefined init when not provided', async () => { + const ai = genkit({}); + const flow = ai.defineFlow( + { name: 'flowWithInit', inputSchema: z.string() }, + async (input) => `input: ${input}` + ); + const originalRun = flow.run.bind(flow); + flow.run = async (input: any, options: any) => { + const result = await originalRun(input, options); + result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`; + return result; + }; + const request = new Request('http://localhost/flowWithInit', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ data: 'hello' }), + }); + const response = await fetchHandler(flow)(request); + assert.strictEqual(response.status, 200); + const json = (await response.json()) as { result: string }; + assert.strictEqual(json.result, 'input: hello, init: undefined'); + }); + it('should set x-genkit-trace-id and x-genkit-span-id headers', async () => { const ai = genkit({}); const flow = ai.defineFlow('traceFlow', async () => 'ok'); diff --git a/js/plugins/next/README.md b/js/plugins/next/README.md index c491c67bd3..50c720bd34 100644 --- a/js/plugins/next/README.md +++ b/js/plugins/next/README.md @@ -98,6 +98,31 @@ for await (const chunk of stream) { console.log(await output); // output is a promise, must be awaited ``` +### Initialization Data + +If your flow or action accepts initialization data (defined via `initSchema`), you can pass it using the `init` option in the client: + +```ts +const result = await runFlow({ + url: '/api/myFlow', + input: 'say hello', + init: { sessionId: 'abc123', config: { temperature: 0.7 } }, +}); + +// Also works with streaming +const { stream, output } = streamFlow({ + url: '/api/myFlow', + input: 'say hello', + init: { sessionId: 'abc123' }, +}); +for await (const chunk of stream) { + console.log(chunk.output); +} +console.log(await output); +``` + +The `init` data is sent in the request body alongside `data` and is validated against the action's `initSchema` on the server side. + The sources for this package are in the main [Genkit](https://github.com/genkit-ai/genkit) repo. Please file issues and pull requests against that repo. Usage information and reference details can be found in [official Genkit documentation](https://genkit.dev/docs/get-started/). diff --git a/js/plugins/next/src/client.ts b/js/plugins/next/src/client.ts index f70378eff8..179e4537a6 100644 --- a/js/plugins/next/src/client.ts +++ b/js/plugins/next/src/client.ts @@ -31,6 +31,7 @@ export interface RequestData { url: string; headers?: Record; input?: T; + init?: any; streamId?: string; abortSignal?: AbortSignal; } diff --git a/js/plugins/next/src/index.ts b/js/plugins/next/src/index.ts index b8a94ecbbe..7b3ddbe582 100644 --- a/js/plugins/next/src/index.ts +++ b/js/plugins/next/src/index.ts @@ -155,7 +155,7 @@ function appRoute< ) { return async (req: NextRequest): Promise => { let context: C = {} as C; - const { data: input } = await req.json(); + const { data: input, init } = await req.json(); const streamId = req.headers.get('x-genkit-stream-id'); if (req.headers.get('accept') !== 'text/event-stream') { try { @@ -171,6 +171,7 @@ function appRoute< const resp = await action.run(input, { context, abortSignal: req.signal, + init, }); const response = NextResponse.json({ result: resp.result }); if (opts?.streamManager && streamId) { @@ -223,6 +224,7 @@ function appRoute< const output = action.run(input, { context, abortSignal: req.signal, + init, onChunk: (chunk) => { if (durableStream) { taskQueue.enqueue(() => durableStream!.write(chunk)); diff --git a/js/plugins/next/tests/index_test.ts b/js/plugins/next/tests/index_test.ts index e3f2b59adf..57b8832023 100644 --- a/js/plugins/next/tests/index_test.ts +++ b/js/plugins/next/tests/index_test.ts @@ -292,6 +292,75 @@ describe('appRoute', () => { ]); }); + it('passes init data to the action', async () => { + const ai = genkit({}); + const flowWithInit = ai.defineFlow( + { + name: 'flowWithInit', + inputSchema: z.string(), + outputSchema: z.string(), + }, + async (input) => `input: ${input}` + ); + // Monkey-patch the run method to capture and return init data. + const originalRun = flowWithInit.run.bind(flowWithInit); + flowWithInit.run = async (input: any, options: any) => { + const result = await originalRun(input, options); + result.result = `input: ${input}, init: ${JSON.stringify(options?.init)}`; + return result; + }; + + const route = appRoute(flowWithInit); + + // Non-streaming: init provided + const request = new NextRequest('http://localhost/api/data', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + data: 'hello', + init: { sessionId: 'abc123', temperature: 0.7 }, + }), + }); + let response = await route(request); + expect(response.status).toEqual(200); + expect(await response.json()).toEqual({ + result: 'input: hello, init: {"sessionId":"abc123","temperature":0.7}', + }); + + // Non-streaming: init not provided + const request2 = new NextRequest('http://localhost/api/data', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ data: 'hello' }), + }); + response = await route(request2); + expect(response.status).toEqual(200); + expect(await response.json()).toEqual({ + result: 'input: hello, init: undefined', + }); + + // Streaming: init provided + const request3 = new NextRequest('http://localhost/api/data', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + accept: 'text/event-stream', + }, + body: JSON.stringify({ + data: 'hello', + init: { sessionId: 'abc123' }, + }), + }); + response = await route(request3); + expect(response.status).toEqual(200); + const parsed = chunks(await response.text()); + const dataChunk = parsed.find((c) => c.type === 'data'); + expect(dataChunk).toBeDefined(); + expect((dataChunk as any).content.result).toEqual( + 'input: hello, init: {"sessionId":"abc123"}' + ); + }); + describe('durable streaming', () => { const streamingFlow = ai.defineFlow( {