Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/enforce-json-step-outputs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@pgflow/dsl': patch
---

Enforce JSON-compatible step outputs at .step() construction time
6 changes: 6 additions & 0 deletions .changeset/extract-flow-output-skippable-keys-3570.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@pgflow/dsl': patch
'@pgflow/edge-worker': patch
---

Make skippable leaf step keys optional in ExtractFlowOutput type
132 changes: 101 additions & 31 deletions pkgs/dsl/__tests__/types/array-method.test-d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { Flow, type StepInput, type ExtractFlowContext } from '../../src/index.js';
import {
Flow,
type StepInput,
type ExtractFlowContext,
} from '../../src/index.js';
import { describe, it, expectTypeOf } from 'vitest';

describe('.array() method type constraints', () => {
Expand All @@ -9,7 +13,10 @@ describe('.array() method type constraints', () => {
.array({ slug: 'objects' }, () => [{ id: 1 }, { id: 2 }])
.array({ slug: 'strings' }, () => ['a', 'b', 'c'])
.array({ slug: 'empty' }, () => [])
.array({ slug: 'nested' }, () => [[1, 2], [3, 4]]);
.array({ slug: 'nested' }, () => [
[1, 2],
[3, 4],
]);
});

it('should accept handlers that return Promise<Array>', () => {
Expand Down Expand Up @@ -42,12 +49,30 @@ describe('.array() method type constraints', () => {
// @ts-expect-error - should reject Promise<null>
.array({ slug: 'invalid_async_null' }, async () => null);
});

it('should reject non-JSON array element shapes', () => {
new Flow<Record<string, never>>({ slug: 'test_json' })
// @ts-expect-error - undefined element is not Json
.array({ slug: 'invalid_undefined_element' }, () => [undefined])
// @ts-expect-error - function element is not Json
.array({ slug: 'invalid_function_element' }, () => [() => 'x'])
// @ts-expect-error - symbol element is not Json
.array({ slug: 'invalid_symbol_element' }, () => [Symbol('x')])
// @ts-expect-error - object property with undefined is not Json
.array({ slug: 'invalid_object_property' }, () => [
{ id: 1, maybe: undefined as string | undefined },
]);
});
});

describe('type inference', () => {
it('should provide correct input types for dependent steps', () => {
new Flow<{ count: number }>({ slug: 'test' })
.array({ slug: 'items' }, (flowInput) => Array(flowInput.count).fill(0).map((_, i) => i))
.array({ slug: 'items' }, (flowInput) =>
Array(flowInput.count)
.fill(0)
.map((_, i) => i)
)
.step({ slug: 'process', dependsOn: ['items'] }, (deps) => {
expectTypeOf(deps).toMatchTypeOf<{
items: number[];
Expand All @@ -58,18 +83,32 @@ describe('.array() method type constraints', () => {

it('should correctly infer element types from arrays', () => {
new Flow<{ userId: string }>({ slug: 'test' })
.array({ slug: 'users' }, () => [{ id: 1, name: 'John' }, { id: 2, name: 'Jane' }])
.array({ slug: 'users' }, () => [
{ id: 1, name: 'John' },
{ id: 2, name: 'Jane' },
])
.step({ slug: 'count_users', dependsOn: ['users'] }, (deps) => {
expectTypeOf(deps.users).toEqualTypeOf<{ id: number; name: string }[]>();
expectTypeOf(deps.users[0]).toMatchTypeOf<{ id: number; name: string }>();
expectTypeOf(deps.users).toEqualTypeOf<
{ id: number; name: string }[]
>();
expectTypeOf(deps.users[0]).toMatchTypeOf<{
id: number;
name: string;
}>();
return deps.users.length;
});
});

it('should handle complex nested array types', () => {
new Flow<{ depth: number }>({ slug: 'test' })
.array({ slug: 'matrix' }, (flowInput) =>
Array(flowInput.depth).fill(0).map(() => Array(3).fill(0).map(() => ({ value: Math.random() })))
Array(flowInput.depth)
.fill(0)
.map(() =>
Array(3)
.fill(0)
.map(() => ({ value: Math.random() }))
)
)
.step({ slug: 'flatten', dependsOn: ['matrix'] }, (deps) => {
expectTypeOf(deps.matrix).toEqualTypeOf<{ value: number }[][]>();
Expand All @@ -83,12 +122,14 @@ describe('.array() method type constraints', () => {
new Flow<{ url: string }>({ slug: 'test' })
.array({ slug: 'data' }, async (flowInput) => {
// Simulate async data fetching
await new Promise(resolve => setTimeout(resolve, 1));
await new Promise((resolve) => setTimeout(resolve, 1));
return [{ url: flowInput.url, status: 200 }];
})
.step({ slug: 'validate', dependsOn: ['data'] }, (deps) => {
expectTypeOf(deps.data).toEqualTypeOf<{ url: string; status: number }[]>();
return deps.data.every(item => item.status === 200);
expectTypeOf(deps.data).toEqualTypeOf<
{ url: string; status: number }[]
>();
return deps.data.every((item) => item.status === 200);
});
});
});
Expand Down Expand Up @@ -119,34 +160,49 @@ describe('.array() method type constraints', () => {

it('should correctly type multi-dependency array steps', () => {
new Flow<{ base: number }>({ slug: 'test' })
.array({ slug: 'numbers' }, (flowInput) => [flowInput.base, flowInput.base + 1])
.array({ slug: 'numbers' }, (flowInput) => [
flowInput.base,
flowInput.base + 1,
])
.array({ slug: 'letters' }, () => ['a', 'b'])
.array({ slug: 'combined', dependsOn: ['numbers', 'letters'] }, (deps) => {
expectTypeOf(deps).toMatchTypeOf<{
numbers: number[];
letters: string[];
}>();
.array(
{ slug: 'combined', dependsOn: ['numbers', 'letters'] },
(deps) => {
expectTypeOf(deps).toMatchTypeOf<{
numbers: number[];
letters: string[];
}>();

return deps.numbers.map((num, i) => ({
number: num,
letter: deps.letters[i] || 'z'
}));
});
return deps.numbers.map((num, i) => ({
number: num,
letter: deps.letters[i] || 'z',
}));
}
);
});
});

describe('context typing', () => {
it('should provide custom context via Flow type parameter', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const flow = new Flow<{ id: number }, { api: { get: (id: number) => Promise<any> } }>({ slug: 'test' })
.array({ slug: 'fetch_data' }, (flowInput, context) => {
const flow = new Flow<
{ id: number },
{ api: { get: (id: number) => Promise<any> } }
>({ slug: 'test' }).array(
{ slug: 'fetch_data' },
(flowInput, context) => {
// No handler annotation needed! Type parameter provides context
expectTypeOf(context.api).toEqualTypeOf<{ get: (id: number) => Promise<any> }>();
expectTypeOf(context.env).toEqualTypeOf<Record<string, string | undefined>>();
expectTypeOf(context.api).toEqualTypeOf<{
get: (id: number) => Promise<any>;
}>();
expectTypeOf(context.env).toEqualTypeOf<
Record<string, string | undefined>
>();
expectTypeOf(context.shutdownSignal).toEqualTypeOf<AbortSignal>();

return [{ id: flowInput.id, data: 'mock' }];
});
}
);

// ExtractFlowContext should include FlowContext & custom resources
type FlowCtx = ExtractFlowContext<typeof flow>;
Expand All @@ -160,10 +216,15 @@ describe('.array() method type constraints', () => {
});

it('should share custom context across array and regular steps', () => {
const flow = new Flow<{ count: number }, { generator: () => number; processor: (items: number[]) => string }>({ slug: 'test' })
const flow = new Flow<
{ count: number },
{ generator: () => number; processor: (items: number[]) => string }
>({ slug: 'test' })
.array({ slug: 'items' }, (flowInput, context) => {
// All steps get the same context automatically
return Array(flowInput.count).fill(0).map(() => context.generator());
return Array(flowInput.count)
.fill(0)
.map(() => context.generator());
})
.step({ slug: 'process' }, (flowInput, context) => {
return context.processor([1, 2, 3]);
Expand All @@ -184,14 +245,23 @@ describe('.array() method type constraints', () => {
describe('handler signature validation', () => {
it('should correctly type array step handlers when using getStepDefinition', () => {
const flow = new Flow<{ size: number }>({ slug: 'test' })
.array({ slug: 'data' }, (flowInput, _context) => Array(flowInput.size).fill(0).map((_, i) => ({ index: i })))
.step({ slug: 'dependent', dependsOn: ['data'] }, (deps, _context) => deps.data.length);
.array({ slug: 'data' }, (flowInput, _context) =>
Array(flowInput.size)
.fill(0)
.map((_, i) => ({ index: i }))
)
.step(
{ slug: 'dependent', dependsOn: ['data'] },
(deps, _context) => deps.data.length
);

const arrayStep = flow.getStepDefinition('data');

// Test array step handler type - root steps receive flowInput directly (no run key)
expectTypeOf(arrayStep.handler).toBeFunction();
expectTypeOf(arrayStep.handler).parameter(0).toMatchTypeOf<{ size: number }>();
expectTypeOf(arrayStep.handler)
.parameter(0)
.toMatchTypeOf<{ size: number }>();
expectTypeOf(arrayStep.handler).returns.toMatchTypeOf<
{ index: number }[] | Promise<{ index: number }[]>
>();
Expand Down
165 changes: 165 additions & 0 deletions pkgs/dsl/__tests__/types/compatible-flow.test-d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import { describe, expectTypeOf, it } from 'vitest';
import {
Flow,
type AnyFlow,
type CompatibleFlow,
type Json,
} from '../../src/index.js';
import {
Flow as SupabaseFlow,
type SupabaseResources,
} from '../../src/platforms/supabase.js';

interface RedisClient {
get: (key: string) => Promise<string | null>;
}

type AcceptsCompatible<
F extends AnyFlow,
PR extends Record<string, unknown>,
UR extends Record<string, unknown> = Record<string, never>
> = (flow: CompatibleFlow<F, PR, UR>) => void;

const acceptCompatible = <
F extends AnyFlow,
PR extends Record<string, unknown>,
UR extends Record<string, unknown> = Record<string, never>
>(
flow: CompatibleFlow<F, PR, UR>
) => {
void flow;
};

describe('CompatibleFlow utility type', () => {
it('accepts flows that only need base FlowContext', () => {
const baseFlow = new Flow<Json>({ slug: 'base-compatible' }).step(
{ slug: 's1' },
(_input, ctx) => ({ hasSignal: !!ctx.shutdownSignal })
);

acceptCompatible<typeof baseFlow, Record<string, never>>(baseFlow);

type Result = CompatibleFlow<typeof baseFlow, Record<string, never>>;
expectTypeOf<Result>().toEqualTypeOf<typeof baseFlow>();
});

it('accepts flows requiring platform resources when provided', () => {
const platformFlow = new SupabaseFlow({ slug: 'platform-compatible' }).step(
{ slug: 'db' },
async (_input, ctx) => {
const rows = await ctx.sql`SELECT 1`;
void ctx.supabase;
return { rows: rows.length };
}
);

acceptCompatible<typeof platformFlow, SupabaseResources>(platformFlow);

type Result = CompatibleFlow<typeof platformFlow, SupabaseResources>;
expectTypeOf<Result>().toEqualTypeOf<typeof platformFlow>();
});

it('rejects flows requiring platform resources when missing', () => {
const platformFlow = new SupabaseFlow({ slug: 'platform-missing' }).step(
{ slug: 'db' },
async (_input, ctx) => {
const rows = await ctx.sql`SELECT 1`;
return { rows: rows.length };
}
);

const accept: AcceptsCompatible<
typeof platformFlow,
Record<string, never>
> = acceptCompatible;
// @ts-expect-error - platform resources are required by flow context
accept(platformFlow);

type Result = CompatibleFlow<typeof platformFlow, Record<string, never>>;
expectTypeOf<Result>().toEqualTypeOf<never>();
});

it('accepts user resources when explicitly provided', () => {
const customCtxFlow = new Flow<Json, { redis: RedisClient }>({
slug: 'user-resource-ok',
}).step({ slug: 'cache' }, async (_input, ctx) => {
const value = await ctx.redis.get('k1');
return { value };
});

acceptCompatible<
typeof customCtxFlow,
Record<string, never>,
{ redis: RedisClient }
>(customCtxFlow);

type Result = CompatibleFlow<
typeof customCtxFlow,
Record<string, never>,
{ redis: RedisClient }
>;
expectTypeOf<Result>().toEqualTypeOf<typeof customCtxFlow>();
});

it('rejects user-resource flows when user resources are omitted', () => {
const customCtxFlow = new Flow<Json, { redis: RedisClient }>({
slug: 'user-resource-missing',
}).step({ slug: 'cache' }, async (_input, ctx) => {
const value = await ctx.redis.get('k1');
return { value };
});

const accept: AcceptsCompatible<
typeof customCtxFlow,
Record<string, never>
> = acceptCompatible;
// @ts-expect-error - missing required user resources
accept(customCtxFlow);

type Result = CompatibleFlow<typeof customCtxFlow, Record<string, never>>;
expectTypeOf<Result>().toEqualTypeOf<never>();
});

it('accepts mixed platform and user resources', () => {
const mixedFlow = new SupabaseFlow<Json, { redis: RedisClient }>({
slug: 'mixed-compatible',
}).step({ slug: 'mixed' }, async (_input, ctx) => {
const rows = await ctx.sql`SELECT 1`;
const value = await ctx.redis.get('k1');
void ctx.supabase;
return { rows: rows.length, value };
});

acceptCompatible<
typeof mixedFlow,
SupabaseResources,
{ redis: RedisClient }
>(mixedFlow);

type Result = CompatibleFlow<
typeof mixedFlow,
SupabaseResources,
{ redis: RedisClient }
>;
expectTypeOf<Result>().toEqualTypeOf<typeof mixedFlow>();
});

it('is invariant to optional output keys in step outputs', () => {
const optionalOutputFlow = new SupabaseFlow({
slug: 'optional-output-flow',
})
.step({ slug: 'producer' }, (): { entryId?: string } =>
Math.random() > 0.5 ? { entryId: 'entry-1' } : {}
)
.step({ slug: 'consumer', dependsOn: ['producer'] }, (deps) => ({
hasEntry: 'entryId' in deps.producer,
}));

acceptCompatible<typeof optionalOutputFlow, SupabaseResources>(
optionalOutputFlow
);

type Result = CompatibleFlow<typeof optionalOutputFlow, SupabaseResources>;
expectTypeOf<Result>().toEqualTypeOf<typeof optionalOutputFlow>();
});
});
Loading
Loading