-- 0120_function_start_tasks.sql
-- start_tasks: atomically claim a batch of queued tasks for a worker and
-- build each task's handler input.
--
-- Parameters:
--   flow_slug - flow whose tasks are being claimed
--   msg_ids   - pgmq message ids the worker just read from the queue
--   worker_id - id of the claiming worker (recorded as last_worker_id)
--
-- Returns: one pgflow.step_task_record per successfully claimed task.
-- Messages whose task is no longer 'queued', or whose run/step state is
-- no longer 'started', are silently skipped — the returned set may be
-- smaller than msg_ids.
--
-- NOTE: start_tasks_update and set_vt_batch are never referenced by the
-- final SELECT; they exist purely for their side effects. Postgres still
-- executes them because data-modifying CTEs, and SELECT CTEs containing
-- volatile function calls, are evaluated even when unreferenced.
create or replace function pgflow.start_tasks(
flow_slug text,
msg_ids bigint [],
worker_id uuid
)
returns setof pgflow.step_task_record
volatile
set search_path to ''
language sql
as $$
-- tasks: the claimable subset of the requested messages — still 'queued',
-- belonging to a run that is 'started', and whose step_state is 'started'.
with tasks as (
select
task.flow_slug,
task.run_id,
task.step_slug,
task.task_index,
task.message_id
from pgflow.step_tasks as task
join pgflow.runs r on r.run_id = task.run_id
where task.flow_slug = start_tasks.flow_slug
and task.message_id = any(msg_ids)
and task.status = 'queued'
and r.status = 'started'
and exists (
select 1
from pgflow.step_states ss
where ss.run_id = task.run_id
and ss.step_slug = task.step_slug
and ss.status = 'started'
)
),
-- start_tasks_update: mark the claimed tasks as started and record the
-- worker. Side-effect-only CTE (no RETURNING); the redundant
-- status = 'queued' guard keeps the update idempotent against races.
start_tasks_update as (
update pgflow.step_tasks
set
attempts_count = attempts_count + 1,
status = 'started',
started_at = now(),
last_worker_id = worker_id
from tasks
where step_tasks.message_id = tasks.message_id
and step_tasks.flow_slug = tasks.flow_slug
and step_tasks.status = 'queued'
),
-- runs: the run-level input for each claimed task's run (used for root
-- steps' input / flow_input below).
runs as (
select
r.run_id,
r.input
from pgflow.runs r
where r.run_id in (select run_id from tasks)
),
-- deps: one row per (task, completed dependency) pair with that
-- dependency's aggregated output.
deps as (
select
st.run_id,
st.step_slug,
dep.dep_slug,
-- Read output directly from step_states (already aggregated by writers)
dep_state.output as dep_output
from tasks st
join pgflow.deps dep on dep.flow_slug = st.flow_slug and dep.step_slug = st.step_slug
join pgflow.step_states dep_state on
dep_state.run_id = st.run_id and
dep_state.step_slug = dep.dep_slug and
dep_state.status = 'completed' -- Only include completed deps (not skipped)
),
-- deps_outputs: collapse a task's dependency rows into a single jsonb
-- object keyed by dependency slug: {"dep_a": <output>, "dep_b": <output>}.
deps_outputs as (
select
d.run_id,
d.step_slug,
jsonb_object_agg(d.dep_slug, d.dep_output) as deps_output,
count(*) as dep_count
from deps d
group by d.run_id, d.step_slug
),
-- timeouts: per-message visibility-timeout delay. Step-level opt_timeout
-- overrides the flow-level default; the extra 2 seconds appears to be a
-- grace period so the worker's own timeout fires before pgmq re-delivers
-- the message — TODO confirm rationale.
timeouts as (
select
task.message_id,
task.flow_slug,
coalesce(step.opt_timeout, flow.opt_timeout) + 2 as vt_delay
from tasks task
join pgflow.flows flow on flow.flow_slug = task.flow_slug
join pgflow.steps step on step.flow_slug = task.flow_slug and step.step_slug = task.step_slug
),
-- Batch update visibility timeouts for all messages
-- (side-effect-only CTE: executed because pgflow.set_vt_batch is volatile;
-- both arrays are ordered by message_id so ids and delays stay aligned)
set_vt_batch as (
select pgflow.set_vt_batch(
start_tasks.flow_slug,
array_agg(t.message_id order by t.message_id),
array_agg(t.vt_delay order by t.message_id)
)
from timeouts t
)
select
st.flow_slug,
st.run_id,
st.step_slug,
-- ==========================================
-- INPUT CONSTRUCTION LOGIC
-- ==========================================
-- This nested CASE statement determines how to construct the input
-- for each task based on the step type (map vs non-map).
--
-- The fundamental difference:
-- - Map steps: Receive RAW array elements (e.g., just 42 or "hello")
-- - Non-map steps: Receive structured objects with named keys
-- (e.g., {"run": {...}, "dependency1": {...}})
-- ==========================================
CASE
-- -------------------- MAP STEPS --------------------
-- Map steps process arrays element-by-element.
-- Each task receives ONE element from the array at its task_index position.
WHEN step.step_type = 'map' THEN
-- Map steps get raw array elements without any wrapper object
CASE
-- ROOT MAP: Gets array from run input
-- Example: run input = [1, 2, 3]
-- task 0 gets: 1
-- task 1 gets: 2
-- task 2 gets: 3
WHEN step.deps_count = 0 THEN
-- Root map (deps_count = 0): no dependencies, reads from run input.
-- Extract the element at task_index from the run's input array.
-- Note: If run input is not an array, this will return NULL
-- and the flow will fail (validated in start_flow).
jsonb_array_element(r.input, st.task_index)
-- DEPENDENT MAP: Gets array from its single dependency
-- Example: dependency output = ["a", "b", "c"]
-- task 0 gets: "a"
-- task 1 gets: "b"
-- task 2 gets: "c"
ELSE
-- Has dependencies (should be exactly 1 for map steps).
-- Extract the element at task_index from the dependency's output array.
--
-- Why the subquery with jsonb_each?
-- - The dependency outputs a raw array: [1, 2, 3]
-- - deps_outputs aggregates it into: {"dep_name": [1, 2, 3]}
-- - We need to unwrap and get just the array value
-- - Map steps have exactly 1 dependency (enforced by add_step)
-- - So jsonb_each will return exactly 1 row
-- - We extract the 'value' which is the raw array [1, 2, 3]
-- - Then get the element at task_index from that array
(SELECT jsonb_array_element(value, st.task_index)
FROM jsonb_each(dep_out.deps_output)
LIMIT 1)
END
-- -------------------- NON-MAP STEPS --------------------
-- Regular (non-map) steps receive dependency outputs as a structured object.
-- Root steps (no dependencies) get empty object - they access flowInput via context.
-- Dependent steps get only their dependency outputs.
ELSE
-- Non-map steps get structured input with dependency keys only
-- Example for dependent step: {
-- "step1": {"output": "from_step1"},
-- "step2": {"output": "from_step2"}
-- }
-- Example for root step: {}
--
-- Note: flow_input is available separately in the returned record
-- for workers to access via context.flowInput
coalesce(dep_out.deps_output, '{}'::jsonb)
END as input,
st.message_id as msg_id,
st.task_index as task_index,
-- flow_input: Original run input for worker context
-- Only included for root non-map steps to avoid data duplication.
-- Root map steps: flowInput IS the array, useless to include
-- Dependent steps: lazy load via ctx.flowInput when needed
CASE
WHEN step.step_type != 'map' AND step.deps_count = 0
THEN r.input
ELSE NULL
END as flow_input
from tasks st
join runs r on st.run_id = r.run_id
join pgflow.steps step on
step.flow_slug = st.flow_slug and
step.step_slug = st.step_slug
-- left join: root steps have no deps_outputs row; coalesce above
-- turns the resulting NULL into '{}'.
left join deps_outputs dep_out on
dep_out.run_id = st.run_id and
dep_out.step_slug = st.step_slug
$$;