Skip to content

Commit 84c189c

Browse files
authored
feat: add conditional flow execution and retry handling (#600)
# Add conditional flow execution and retry handling This PR introduces conditional flow execution and retry handling capabilities to the PGFlow system. It adds: 1. Conditional step execution with `if` and `ifNot` patterns that can: - Skip steps when conditions are not met - Optionally cascade skips to dependent steps - Fail the run when required conditions aren't met 2. Retry exhaustion handling that can: - Skip steps after retries are exhausted - Cascade skips to dependent steps - Configure different behavior per step The implementation includes comprehensive test coverage for: - Basic conditional execution with required and forbidden patterns - Skip cascading behavior to dependent steps - Non-cascading skips where downstream steps can still run - Failure handling when conditions aren't met - Multiple retry attempts before skipping - Combined condition and failure handling scenarios These features enable more robust workflow definitions with better handling of optional steps, error conditions, and dependencies between steps.
1 parent 320a106 commit 84c189c

3 files changed

Lines changed: 1076 additions & 0 deletions

File tree

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import type { postgres } from '../../sql.ts';
2+
import { compileFlow } from '@pgflow/dsl';
3+
import type { AnyFlow } from '@pgflow/dsl';
4+
5+
// ============= Test Helpers for Conditional Flow Integration Tests =============
6+
7+
/**
8+
* Compiles a Flow and executes SQL statements to create it in the database.
9+
*/
10+
export const createFlowInDb = async (sql: postgres.Sql, flow: AnyFlow) => {
11+
const statements = compileFlow(flow);
12+
for (const stmt of statements) {
13+
await sql.unsafe(stmt);
14+
}
15+
};
16+
17+
/**
18+
* Extended step state info including skip details
19+
*/
20+
export interface StepStateWithSkip {
21+
step_slug: string;
22+
status: string;
23+
skip_reason: string | null;
24+
skipped_at: string | null;
25+
}
26+
27+
/**
28+
* Get step states with skip information
29+
*/
30+
export const getStepStatesWithSkip = async (
31+
sql: postgres.Sql,
32+
runId: string
33+
): Promise<StepStateWithSkip[]> => {
34+
return await sql<StepStateWithSkip[]>`
35+
SELECT step_slug, status, skip_reason, skipped_at
36+
FROM pgflow.step_states
37+
WHERE run_id = ${runId}
38+
ORDER BY step_slug;
39+
`;
40+
};
41+
42+
/**
43+
* Extended task info including error details
44+
*/
45+
export interface TaskWithError {
46+
step_slug: string;
47+
status: string;
48+
error_message: string | null;
49+
attempts_count: number;
50+
}
51+
52+
/**
53+
* Get step tasks with error information
54+
*/
55+
export const getStepTasksWithError = async (
56+
sql: postgres.Sql,
57+
runId: string
58+
): Promise<TaskWithError[]> => {
59+
return await sql<TaskWithError[]>`
60+
SELECT step_slug, status, error_message, attempts_count
61+
FROM pgflow.step_tasks
62+
WHERE run_id = ${runId}
63+
ORDER BY step_slug, task_index;
64+
`;
65+
};

0 commit comments

Comments
 (0)