
Commit f75b607

Skip empty dag run conf rows and set statement timeout (apache#50788)
Don't bother depickling the `{}` object in dag run conf. Also, set the statement timeout a little higher for the xcom migration.

Co-authored-by: Jed Cunningham <[email protected]>
1 parent fbc7156 commit f75b607
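
The "empty dag run conf" in the title is the five-byte pickle of an empty dict. A minimal sketch of why that byte string is stable enough to match in SQL, assuming Python 3.8+ (where pickle protocol 5 is the default); the hex literal is the same one the dagrun migration below filters on:

import pickle

# Pickling an empty dict always yields the same five bytes under protocol 5:
# PROTO 5, EMPTY_DICT, MEMOIZE, STOP.
blob = pickle.dumps({})
print(blob)  # b'\x80\x05}\x94.'
assert blob.hex().upper() == "80057D942E"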

File tree: 4 files changed, +52 -9 lines changed
Lines changed: 1 addition & 1 deletion

@@ -1 +1 @@
-4a44b71c7fe18f8f7c60ac1d576d29c618ee370f810fe1bb8a1894a3d77fcb0d
+f5a99cb756403f20d63b7a4d0df67f776ac43d53d9bcc75323c4b80044d409e4

airflow-core/src/airflow/migrations/versions/0042_3_0_0_add_uuid_primary_key_to_task_instance_.py

Lines changed: 3 additions & 0 deletions

@@ -206,7 +206,10 @@ def upgrade():
     op.execute(pg_uuid7_fn)

     # Migrate existing rows with UUID v7 using a timestamp-based generation
+    batch_num = 0
     while True:
+        batch_num += 1
+        print(f"processing batch {batch_num}")
         result = conn.execute(
             text(
                 """

airflow-core/src/airflow/migrations/versions/0049_3_0_0_remove_pickled_data_from_xcom_table.py

Lines changed: 17 additions & 2 deletions

@@ -28,7 +28,7 @@
 from __future__ import annotations

 import sqlalchemy as sa
-from alembic import op
+from alembic import context, op
 from sqlalchemy import text
 from sqlalchemy.dialects.mysql import LONGBLOB

@@ -77,9 +77,24 @@ def upgrade():
     condition = condition_templates.get(dialect)
     if not condition:
         raise RuntimeError(f"Unsupported dialect: {dialect}")
-
     # Key is a reserved keyword in MySQL, so we need to quote it
     quoted_key = conn.dialect.identifier_preparer.quote("key")
+    if dialect == "postgresql" and not context.is_offline_mode():
+        curr_timeout = (
+            int(
+                conn.execute(
+                    text("""
+                        SELECT setting
+                        FROM pg_settings
+                        WHERE name = 'statement_timeout'
+                    """)
+                ).scalar_one()
+            )
+            / 1000
+        )
+        if curr_timeout > 0 and curr_timeout < 1800:
+            print("setting local statement timeout to 1800s")
+            conn.execute(text("SET LOCAL statement_timeout='1800s'"))

     # Archive pickled data using the condition
     conn.execute(
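
Two details worth noting in that hunk: pg_settings reports statement_timeout in milliseconds (0 means disabled), hence the division by 1000, and SET LOCAL only lasts until the enclosing transaction ends, so the session's configured timeout comes back once the migration commits. A minimal standalone sketch of the same check, assuming SQLAlchemy 1.4+ and a placeholder DSN:

import sqlalchemy as sa

# Placeholder DSN; point at a real PostgreSQL database to try this.
engine = sa.create_engine("postgresql+psycopg2://user:pass@localhost/airflow")

with engine.begin() as conn:  # one transaction, so SET LOCAL applies throughout
    ms = int(
        conn.execute(
            sa.text("SELECT setting FROM pg_settings WHERE name = 'statement_timeout'")
        ).scalar_one()
    )
    # 0 means no timeout at all; only raise timeouts that are set but short.
    if 0 < ms / 1000 < 1800:
        conn.execute(sa.text("SET LOCAL statement_timeout = '1800s'"))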

airflow-core/src/airflow/migrations/versions/0055_3_0_0_remove_pickled_data_from_dagrun_table.py

Lines changed: 31 additions & 6 deletions

@@ -47,6 +47,17 @@
 def upgrade():
     """Apply remove pickled data from dagrun table."""
     conn = op.get_bind()
+    empty_vals = {
+        "mysql": "X'80057D942E'",
+        "postgresql": r"'\x80057D942E'",
+        "sqlite": "X'80057D942E'",
+    }
+    dialect = conn.dialect.name
+    try:
+        empty_val = empty_vals[dialect]
+    except KeyError:
+        raise RuntimeError(f"Dialect {dialect} not supported.")
+
     conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
     op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))

@@ -61,12 +72,20 @@ def upgrade():
             """)
         )
     else:
-        BATCH_SIZE = 100
+        BATCH_SIZE = 1000
         offset = 0
         while True:
+            err_count = 0
+            batch_num = offset + 1
+            print(f"converting dag run conf. batch={batch_num}")
             rows = conn.execute(
                 text(
-                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                    "SELECT id, conf "
+                    "FROM dag_run "
+                    "WHERE conf IS not NULL "
+                    f"AND conf != {empty_val} "
+                    f"ORDER BY id LIMIT {BATCH_SIZE} "
+                    f"OFFSET {offset}"
                 )
             ).fetchall()
             if not rows:
@@ -85,9 +104,11 @@ def upgrade():
                         """),
                         {"json_data": json_data, "id": row_id},
                     )
-                except Exception as e:
-                    print(f"Error converting dagrun conf to json for dagrun ID {row_id}: {e}")
+                except Exception:
+                    err_count += 1
                     continue
+            if err_count:
+                print(f"could not convert dag run conf for {err_count} records. batch={batch_num}")
             offset += BATCH_SIZE

     op.drop_column("dag_run", "conf")
@@ -112,12 +133,16 @@ def downgrade():
         )

     else:
-        BATCH_SIZE = 100
+        BATCH_SIZE = 1000
         offset = 0
         while True:
             rows = conn.execute(
                 text(
-                    f"SELECT id,conf FROM dag_run WHERE conf IS not NULL order by id LIMIT {BATCH_SIZE} OFFSET {offset}"
+                    "SELECT id,conf "
+                    "FROM dag_run "
+                    "WHERE conf IS NOT NULL "
+                    f"ORDER BY id LIMIT {BATCH_SIZE} "
+                    f"OFFSET {offset}"
                 )
             ).fetchall()
             if not rows:
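
The per-row work these batches drive (the UPDATE and the exact decoding sit in context not shown here) amounts to depickling the legacy conf blob and re-serializing it as JSON; with this commit, rows whose blob is exactly the pickled empty dict are filtered out in SQL instead of being round-tripped. That round trip in miniature, with a made-up sample blob:

import json
import pickle

# Made-up sample standing in for a legacy pickled dag_run.conf value.
blob = pickle.dumps({"key": "value", "retries": 3})

# Depickle, then emit JSON for the new conf_json column.
json_data = json.dumps(pickle.loads(blob))
print(json_data)  # {"key": "value", "retries": 3}

Because rows are updated in place rather than deleted, ORDER BY id with a growing OFFSET pages the table consistently, and skipping empty confs in SQL spares Python from depickling blobs that can only ever decode to {}.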
