This guide explains the overall architecture of pgflow, providing a navigational map to the codebase and documenting cross-cutting concepts that span multiple packages.
pgflow is a PostgreSQL-native workflow orchestration engine that replaces external control planes (like Airflow or Temporal) with a zero-deployment, database-centric approach.
Core Philosophy: Postgres is the single source of truth for all workflow definitions, state, and execution.
- Postgres = Single Source of Truth - All workflow definitions, state transitions, and task queues live in the database
- Opinionated over Configurable - Clear conventions (DAGs only, JSON serialization, topological order)
- Robust Yet Simple - ACID transactions and reliable queue processing without exotic features
- Compile-Time Safety - TypeScript type checking and SQL referential integrity prevent runtime errors
- Serverless-Ready - Stateless workers enable horizontal scaling without coordination overhead
pgflow separates concerns across three distinct layers. Each layer has a clear responsibility and knows nothing about the implementation details of other layers.
What it is: TypeScript API for defining workflows with compile-time type safety.
Location: /pkgs/dsl
Mental model: Developer writes workflow intent. Doesn't know about execution or state management.
Key Methods:
- .step({ slug, dependsOn?, ...options }, handler) - Define a step
- .array({ slug, dependsOn?, ...options }, handler) - Step returning array (semantic wrapper)
- .map({ slug, array?, ...options }, handler) - Process array elements in parallel (no dependsOn)
Map Step Modes:
- Root map - Process flow input array (no array: property)
- Dependent map - Process another step's array (with array: 'step_slug')
Comprehensive DSL Example (shows all step types and options):
import { Flow } from '@pgflow/dsl/supabase';
// Flow with all options
// The flow input is itself an array so the root map below can iterate over it
const CompleteExample = new Flow<string[]>({
  slug: 'completeExample',
  maxAttempts: 3, // Flow-level default
  timeout: 60, // Flow-level default
  baseDelay: 2, // Flow-level default
})
  // Regular step with step-level overrides
  .step(
    {
      slug: 'config',
      maxAttempts: 5, // Override flow default
      timeout: 120, // Override flow default
      startDelay: 10, // Delay before starting (step-specific)
    },
    async (input, context) => {
      // context.env - Environment variables
      // context.shutdownSignal - Graceful shutdown signal
      // context.stepTask - Current task details (run_id, step_slug, input, msg_id, task_index)
      // context.workerConfig - Worker configuration (read-only)
      // context.sql - PostgreSQL client (Supabase preset)
      // context.supabase - Supabase client (Supabase preset)
      return { apiKey: context.env.API_KEY };
    }
  )
  // Root map - processes flow input array
  .map(
    { slug: 'fetchUrls' },
    async (url, context) => {
      // Receives individual array element, not full array
      const response = await fetch(url);
      return { url, html: await response.text() };
    }
  )
  // Array step enriches fetched pages with config data
  .array(
    { slug: 'enrichedPages', dependsOn: ['fetchUrls', 'config'] },
    async (input) => {
      // Enrich each page with config data
      return input.fetchUrls.map(page => ({
        ...page,
        apiKey: input.config.apiKey
      }));
    }
  )
  // Dependent map - processes enriched array
  .map(
    { slug: 'extractLinks', array: 'enrichedPages' },
    async (enrichedPage) => {
      // Has both page data and apiKey from enrichment
      // extractLinks() is a user-defined helper (not shown here)
      return extractLinks(enrichedPage.html, enrichedPage.apiKey);
    }
  )
  // Array step - returns array (semantic, same as .step())
  .array(
    { slug: 'aggregate', dependsOn: ['extractLinks'] },
    async (input) => {
      // Receives full input object with all previous step outputs
      return input.extractLinks.flat();
    }
  )
  // Regular step depending on multiple steps
  .step(
    { slug: 'save', dependsOn: ['aggregate', 'config'] },
    async (input, context) => {
      await context.sql`
        INSERT INTO results (data) VALUES (${input.aggregate})
      `;
      return { saved: true };
    }
  );
export default CompleteExample;
Compilation:
npx pgflow compile path/to/flow.ts
# Generates migration with:
# - SELECT pgflow.create_flow(...)
# - SELECT pgflow.add_step(...) for each step
Important: See DSL package files for:
- Type inference utilities: ExtractFlowInput, ExtractFlowOutput, StepInput, StepOutput
- Handler signature details: /pkgs/dsl/src/types.ts
- Supabase preset: /pkgs/dsl/supabase/
What it is: Pure SQL implementation of workflow orchestration logic.
Location: /pkgs/core
Mental model: State machine for DAG execution. Doesn't know about DSL syntax or how handlers execute.
Schema (see /pkgs/core/src/migrations/):
- Definition tables: flows, steps, deps
- Runtime tables: runs, step_states, step_tasks, workers
- Custom types: step_task_record - Return type for start_tasks()
Core Functions (see pgflow--*.sql migration files):
- Flow Definition: create_flow, add_step, is_valid_slug
- Flow Execution: start_flow, start_flow_with_states, get_run_with_states
- Task Management: start_tasks, complete_task, fail_task
- Orchestration:
  - start_ready_steps - Starts steps whose dependencies are complete
  - cascade_complete_taskless_steps - Handles empty array propagation
  - maybe_complete_run - Completes run when all steps done, aggregates outputs
- Utilities: calculate_retry_delay, set_vt_batch, read_with_poll
Critical Cross-Cutting Concepts:
- Two-Phase Polling - Worker calls read_with_poll() then start_tasks(workerId) to prevent race conditions
- Empty Array Cascade - When initial_tasks=0, cascade_complete_taskless_steps() completes the entire dependent chain in one transaction
- Map Step initial_tasks Lifecycle:
  - Root maps: Set at flow start from input array length
  - Dependent maps: Set to NULL, resolved when parent completes
  - Empty arrays: Set to 0, triggers taskless completion
- Type Validation - complete_task() validates output types: a non-array output passed to a map step fails the entire run
- Retry Mechanism - fail_task() implements retry with exponential backoff via calculate_retry_delay()
- Run Completion - maybe_complete_run() aggregates leaf step outputs only (steps with no dependents)
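The retry mechanism listed above can be sketched in a few lines. The guide only states that fail_task() uses exponential backoff via calculate_retry_delay(), so the exact formula (base delay doubled per attempt already made), the give-up rule, and the names retryDelaySeconds and decideAfterFailure are illustrative assumptions, not the SQL implementation.

```typescript
// Hypothetical model of exponential backoff. The real logic lives in SQL
// (calculate_retry_delay) and may differ in detail.
function retryDelaySeconds(baseDelay: number, attemptsCount: number): number {
  if (baseDelay < 0 || attemptsCount < 0) {
    throw new RangeError('baseDelay and attemptsCount must be non-negative');
  }
  // Assumed formula: baseDelay * 2^attemptsCount
  return baseDelay * 2 ** attemptsCount;
}

type FailureOutcome =
  | { kind: 'retry'; delaySeconds: number }
  | { kind: 'failed' };

// attemptsCount = attempts made so far, including the one that just failed.
function decideAfterFailure(
  attemptsCount: number,
  maxAttempts: number,
  baseDelay: number
): FailureOutcome {
  if (attemptsCount >= maxAttempts) {
    return { kind: 'failed' }; // Retries exhausted (assumed rule)
  }
  return {
    kind: 'retry',
    delaySeconds: retryDelaySeconds(baseDelay, attemptsCount),
  };
}
```

Under these assumptions, a step with maxAttempts: 3 and baseDelay: 2 would retry after 4 seconds, then after 8 seconds, and be marked failed on the third failure.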
Realtime Events (emitted via realtime.send() for Client subscriptions):
- run:started, run:completed, run:failed
- step:started, step:completed, step:failed
What it is: Stateless task executor that polls queues and runs handler functions.
Location: /pkgs/edge-worker
Mental model: Execution engine. Doesn't know where tasks come from or overall workflow state.
Worker Lifecycle:
- acknowledgeStart() - Register worker with workerId in database
- Main loop:
  - sendHeartbeat() - Update status, check deprecation
  - If deprecated → exit gracefully
  - Two-phase polling: readMessages() then startTasks(workerId)
  - Execute handlers (up to maxConcurrent parallel)
  - complete_task() or fail_task()
- On shutdown: transitionToStopping(), wait for tasks, acknowledgeStop(), close SQL connection
Configuration (see FlowWorkerConfig type in /pkgs/edge-worker/src/types/):
maxConcurrent, maxPgConnections, batchSize, visibilityTimeout, maxPollSeconds, pollIntervalMs
Comprehensive Worker Example (shows all features):
import { createFlowWorker } from '@pgflow/edge-worker';
import { createClient } from '@supabase/supabase-js';
import { MyFlow } from '../../flows/my_flow.ts';
// Create Supabase client
const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
);
// Create worker with all configuration options
const worker = createFlowWorker(supabase, MyFlow, {
  // Queue configuration
  queueName: 'tasks', // Default: 'tasks'
  // Polling configuration
  maxPollSeconds: 2, // Default: 2
  pollIntervalMs: 100, // Default: 100
  batchSize: 10, // Default: 10
  visibilityTimeout: 2, // Default: 2
  // Concurrency configuration
  maxConcurrent: 10, // Default: 10
  maxPgConnections: 4, // Default: 4
  // Retry configuration (exponential backoff)
  retry: {
    strategy: 'exponential', // or 'fixed'
    delay: 1000, // Base delay in ms
    maxAttempts: 3, // Max retry attempts
  },
});
// Start worker (runs until shutdown signal)
await worker.start();
// Worker provides context to handlers:
// - context.env - Environment variables
// - context.shutdownSignal - Graceful shutdown detection
// - context.stepTask - Current task (flow_slug, run_id, step_slug, input, msg_id, task_index)
// - context.workerConfig - Configuration (read-only, frozen)
// - context.rawMessage - Full pgmq message (msg_id, read_ct, enqueued_at, vt)
// - context.sql - PostgreSQL client (Supabase)
// - context.supabase - Supabase client (Supabase)
Important: See Edge Worker package files for:
- Platform adapter abstraction: /pkgs/edge-worker/src/adapters/
- Worker states: Starting, Running, Stopping, Stopped, Deprecated
- Context types: /pkgs/edge-worker/src/core/context.ts
- Auto-restart mechanism: spawnNewEdgeFunction() in Supabase adapter
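The worker lifecycle and the worker states named above can be pictured as a small state machine. The state names come from this guide; the transition table, the class name WorkerLifecycleSketch, and the shouldPoll helper are assumptions made for illustration and may not match the actual worker code.

```typescript
type WorkerState = 'Starting' | 'Running' | 'Stopping' | 'Stopped' | 'Deprecated';

// Assumed transition table, derived from the lifecycle steps in this guide.
const ALLOWED_TRANSITIONS: Record<WorkerState, readonly WorkerState[]> = {
  Starting: ['Running'], // after acknowledgeStart()
  Running: ['Deprecated', 'Stopping'], // heartbeat sees deprecation, or shutdown
  Deprecated: ['Stopping'], // exit gracefully
  Stopping: ['Stopped'], // after in-flight tasks finish and acknowledgeStop()
  Stopped: [],
};

class WorkerLifecycleSketch {
  private state: WorkerState = 'Starting';

  get current(): WorkerState {
    return this.state;
  }

  // Only a running worker should poll for new messages.
  get shouldPoll(): boolean {
    return this.state === 'Running';
  }

  transitionTo(next: WorkerState): void {
    if (!ALLOWED_TRANSITIONS[this.state].includes(next)) {
      throw new Error(`Invalid transition: ${this.state} -> ${next}`);
    }
    this.state = next;
  }
}
```

Keeping the transitions in one table makes it cheap to reject impossible sequences, such as resuming polling after the worker has begun stopping.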
Location: /pkgs/cli
Commands:
- pgflow install [--supabase-path <path>] [-y, --yes] - Sets up pgflow
- pgflow compile <flow.ts> [--deno-json <path>] [--supabase-path <path>] - Compiles flows to SQL
See: /pkgs/cli/src/commands/ for implementation details
Location: /pkgs/client
Core API (see /pkgs/client/src/PgflowClient.ts):
- startFlow(slug, input) - Start new run
- getRun(run_id) - Retrieve and subscribe to existing run
- dispose(run_id), disposeAll() - Cleanup
FlowRun API (see /pkgs/client/src/FlowRun.ts):
- .on(event, handler) - Events: 'started', 'completed', 'failed', '*'
- .waitForStatus(status, options?) - Supports AbortSignal
- .step(slug) - Get FlowStep instance
FlowStep API (see /pkgs/client/src/FlowStep.ts):
- .on(event, handler) - Same event types as FlowRun
- .waitForStatus(status, options?) - Step-level status waiting
Realtime Subscription: Uses SupabaseBroadcastAdapter internally, subscribes via subscribeToRun(run_id)
Why: Prevents race condition where worker processes message before step_tasks record exists.
How:
- Phase 1: Worker calls read_with_poll() - reserves messages, returns msg_ids
- Phase 2: Worker calls start_tasks(flow_slug, msg_ids, workerId) - creates step_tasks, returns details
See:
- Worker implementation: /pkgs/edge-worker/src/worker/FlowWorkerLifecycle.ts
- SQL implementation: /pkgs/core/src/migrations/pgflow--*.sql (functions: read_with_poll, start_tasks)
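The two phases can be modelled with an in-memory stand-in for the queue and the step_tasks table. This is a sketch of the contract described above, not the SQL: the class InMemoryCore, its field names, and the clock passed in as a plain number are all hypothetical.

```typescript
interface QueueMessage {
  msgId: number;
  stepSlug: string;
  visibleAt: number; // Message is hidden from readers until this time
}

interface StepTask {
  msgId: number;
  flowSlug: string;
  stepSlug: string;
  workerId: string;
}

class InMemoryCore {
  private queue: QueueMessage[] = [];
  private nextId = 1;
  readonly stepTasks = new Map<number, StepTask>();

  enqueue(stepSlug: string): number {
    const msgId = this.nextId++;
    this.queue.push({ msgId, stepSlug, visibleAt: 0 });
    return msgId;
  }

  // Phase 1: reserve visible messages and return only their IDs.
  readWithPoll(now: number, visibilityTimeout: number, batchSize: number): number[] {
    const visible = this.queue
      .filter((m) => m.visibleAt <= now)
      .slice(0, batchSize);
    for (const m of visible) {
      m.visibleAt = now + visibilityTimeout; // Hide from other readers
    }
    return visible.map((m) => m.msgId);
  }

  // Phase 2: turn reserved IDs into task records owned by one worker.
  startTasks(flowSlug: string, msgIds: number[], workerId: string): StepTask[] {
    const started: StepTask[] = [];
    for (const msgId of msgIds) {
      const msg = this.queue.find((m) => m.msgId === msgId);
      if (!msg || this.stepTasks.has(msgId)) continue; // Unknown or already started
      const task: StepTask = { msgId, flowSlug, stepSlug: msg.stepSlug, workerId };
      this.stepTasks.set(msgId, task);
      started.push(task);
    }
    return started;
  }
}
```

In this model a handler only ever receives what startTasks() returns, so it cannot run against a message that has no task record behind it.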
What: When map step receives empty array, entire dependent chain completes without spawning tasks.
Why: Prevents infinite waiting on steps that will never have tasks.
How:
- DSL: Map steps compiled with step_type => 'map'
- SQL Core: complete_task() detects array parent completed with [], sets dependent map initial_tasks=0
- SQL Core: cascade_complete_taskless_steps() completes all initial_tasks=0 steps in chain (max 50 iterations)
See:
- DSL compilation: /pkgs/dsl/src/compiler/compileFlow.ts
- Cascade logic: /pkgs/core/src/migrations/ (function: cascade_complete_taskless_steps)
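The cascade can be illustrated with an in-memory model of step states. The StepState shape and the propagation rule below are simplifying assumptions (only map steps inherit a zero task count from a taskless parent); the 50-iteration cap mirrors the limit mentioned above.

```typescript
type StepKind = 'single' | 'map';

interface StepState {
  slug: string;
  stepType: StepKind;
  dependsOn: string[];
  initialTasks: number | null; // null = not yet resolved
  status: 'created' | 'completed';
}

// Completes every ready step with zero tasks, then propagates the empty
// output to dependent maps, repeating until nothing changes.
function cascadeCompleteTasklessSteps(steps: StepState[], maxIterations = 50): number {
  const bySlug = new Map<string, StepState>();
  for (const s of steps) bySlug.set(s.slug, s);

  let completed = 0;
  for (let i = 0; i < maxIterations; i++) {
    const ready = steps.filter(
      (s) =>
        s.status === 'created' &&
        s.initialTasks === 0 &&
        s.dependsOn.every((dep) => bySlug.get(dep)?.status === 'completed')
    );
    if (ready.length === 0) break;

    const emptiedMaps = new Set<string>();
    for (const s of ready) {
      s.status = 'completed';
      completed++;
      if (s.stepType === 'map') emptiedMaps.add(s.slug);
    }

    // A taskless map yields [], so maps reading from it have zero tasks too.
    for (const s of steps) {
      if (
        s.stepType === 'map' &&
        s.initialTasks === null &&
        s.dependsOn.some((dep) => emptiedMaps.has(dep))
      ) {
        s.initialTasks = 0;
      }
    }
  }
  return completed;
}
```

In this model a non-map step downstream of the chain keeps its own task count and simply becomes ready once its dependencies complete.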
Flow: SQL Core → Postgres triggers → Supabase Realtime → Client subscriptions → Application handlers
Events:
- run:started, run:completed, run:failed
- step:started, step:completed, step:failed
See:
- SQL emission: /pkgs/core/src/migrations/ (search for realtime.send())
- Client subscription: /pkgs/client/src/adapters/SupabaseBroadcastAdapter.ts
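On the receiving side, the event names listed above split naturally into a scope and a status. The sketch below parses them and routes them to handlers, including a '*' wildcard like the one the client's .on() accepts. The names parseEventType and EventRouterSketch are hypothetical, and real event payloads carry more than the name.

```typescript
type EventScope = 'run' | 'step';
type EventStatus = 'started' | 'completed' | 'failed';

interface ParsedEvent {
  scope: EventScope;
  status: EventStatus;
}

const SCOPES: readonly string[] = ['run', 'step'];
const STATUSES: readonly string[] = ['started', 'completed', 'failed'];

// Splits e.g. 'step:completed' into its parts; returns null for unknown names.
function parseEventType(eventType: string): ParsedEvent | null {
  const idx = eventType.indexOf(':');
  if (idx < 0) return null;
  const scope = eventType.slice(0, idx);
  const status = eventType.slice(idx + 1);
  if (!SCOPES.includes(scope) || !STATUSES.includes(status)) return null;
  return { scope: scope as EventScope, status: status as EventStatus };
}

type EventHandler = (event: ParsedEvent) => void;

class EventRouterSketch {
  private handlers = new Map<string, EventHandler[]>();

  on(status: EventStatus | '*', handler: EventHandler): void {
    const list = this.handlers.get(status) ?? [];
    list.push(handler);
    this.handlers.set(status, list);
  }

  // Returns how many handlers were invoked.
  emit(eventType: string): number {
    const parsed = parseEventType(eventType);
    if (parsed === null) return 0;
    const targets = [
      ...(this.handlers.get(parsed.status) ?? []),
      ...(this.handlers.get('*') ?? []),
    ];
    for (const handler of targets) handler(parsed);
    return targets.length;
  }
}
```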
Root map:
- DSL: No array: property
- SQL Core: initial_tasks set at start_flow() from input array length
Dependent map:
- DSL: Has array: 'parent_step_slug'
- SQL Core: initial_tasks NULL initially, resolved in complete_task() when parent completes
See:
- DSL implementation: /pkgs/dsl/src/flow/Flow.ts (.map() method)
- SQL resolution: /pkgs/core/src/migrations/ (function: complete_task)
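The two resolution points can be written as a pair of functions. This is a sketch under assumptions: the type MapAwareStep and both function names are hypothetical, null stands in for SQL NULL, and the value 1 for non-map steps is assumed rather than taken from this guide.

```typescript
interface MapAwareStep {
  slug: string;
  stepType: 'single' | 'map';
  array?: string; // Parent step slug; present only on dependent maps
}

// Resolution point 1: when the flow starts.
function initialTasksAtFlowStart(step: MapAwareStep, flowInput: unknown): number | null {
  if (step.stepType === 'single') return 1; // Assumption: one task per non-map step
  if (step.array !== undefined) return null; // Dependent map: unknown until parent completes
  if (!Array.isArray(flowInput)) {
    throw new TypeError(`root map '${step.slug}' requires the flow input to be an array`);
  }
  return flowInput.length; // Root map: one task per input element
}

// Resolution point 2: when the parent of a dependent map completes.
function resolveDependentMapTasks(parentOutput: unknown): number {
  if (!Array.isArray(parentOutput)) {
    // Mirrors the type validation rule: a non-array fed to a map fails the run.
    throw new TypeError('map step received a non-array output; the run fails');
  }
  return parentOutput.length; // 0 here is what triggers taskless completion
}
```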
- Slugs: [a-zA-Z_][a-zA-Z0-9_]*, 1-128 chars (cannot be 'run')
- DAG Only: No cycles or conditional edges
- Topological Order: Steps added in dependency order (FK enforced)
- JSON Serializable: All inputs/outputs must be JSON-compatible
- Immutable Data: Inputs and outputs never change after creation
Validation: is_valid_slug() function in SQL Core
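Two of the constraints above are easy to check mechanically: the slug rules and the requirement that steps are added in dependency order. The following is a client-side sketch that mirrors the stated rules; it is not the SQL is_valid_slug() function, and the names isValidSlug, StepDefinition, and findDefinitionError are hypothetical.

```typescript
const SLUG_PATTERN = /^[a-zA-Z_][a-zA-Z0-9_]*$/;

// Mirrors the stated slug rules: pattern, 1-128 chars, and not the word 'run'.
function isValidSlug(slug: string): boolean {
  return (
    slug.length >= 1 &&
    slug.length <= 128 &&
    slug !== 'run' &&
    SLUG_PATTERN.test(slug)
  );
}

interface StepDefinition {
  slug: string;
  dependsOn: string[];
}

// Returns a message for the first violation found, or null if none.
function findDefinitionError(steps: StepDefinition[]): string | null {
  const added = new Set<string>();
  for (const step of steps) {
    if (!isValidSlug(step.slug)) {
      return `invalid slug: '${step.slug}'`;
    }
    for (const dep of step.dependsOn) {
      if (!added.has(dep)) {
        return `step '${step.slug}' depends on '${dep}', which has not been added yet`;
      }
    }
    added.add(step.slug);
  }
  return null;
}
```

Requiring every dependency to be added before the step that uses it also rules out cycles, because a step can never depend on itself or on anything defined later.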
| Component | Technologies |
|---|---|
| DSL | TypeScript, JSON Schema |
| SQL Core | PostgreSQL ≥14, pgmq |
| Worker | Deno runtime, postgres.js |
| Platform | Supabase (Edge Functions, Realtime) |
| CLI | Node.js, clack |
| Client | TypeScript, @supabase/supabase-js |
| Monorepo | nx |
/pkgs/
├── core/ # SQL Core - PostgreSQL functions, migrations
├── dsl/ # DSL - Flow definition API
│ └── supabase/ # Supabase-specific preset
├── edge-worker/ # Edge Worker - Task executor
├── client/ # Client - Observation API
├── cli/ # CLI - Development tools
├── example-flows/ # Example implementations
└── website/ # Documentation (Starlight)
pgflow achieves reliable workflow orchestration through clear separation of concerns:
- DSL Layer - Developer experience and type safety
- SQL Core Layer - State management and dependency resolution
- Edge Worker Layer - Reliable task execution
- CLI - Development tooling
- Client - Application integration
Each layer solves its own class of problems without leaking concerns. The database serves as the authoritative source of truth, eliminating the need for external coordinators and enabling horizontal scaling through stateless workers.