Skip to content

Commit aa1ead7

Browse files
deployment: add retry mechanism to port-forward on simulator script (#10545)
1 parent 99daba8 commit aa1ead7

File tree

1 file changed

+144
-86
lines changed

1 file changed

+144
-86
lines changed

scripts/system_tests/sequencer_simulator2.py

Lines changed: 144 additions & 86 deletions
Original file line number | Diff line number | Diff line change
@@ -42,99 +42,156 @@ def port_forward(
4242
remote_port: int,
4343
wait_ready: bool = True,
4444
max_attempts: int = 25,
45+
max_retries: int = 5, # Retry the port-forward command itself
4546
):
46-
cmd = ["kubectl", "port-forward", pod_name, f"{local_port}:{remote_port}"]
47-
# Capture stderr to see kubectl errors
48-
process = subprocess.Popen(
49-
cmd,
50-
stdout=subprocess.PIPE,
51-
stderr=subprocess.PIPE,
52-
text=True,
53-
)
47+
"""Port-forward with retry logic for transient kubectl connection failures."""
48+
49+
def is_transient_error(error_msg: str) -> bool:
50+
"""Check if error message indicates a transient connection error."""
51+
if not error_msg:
52+
return False
53+
error_lower = error_msg.lower()
54+
return any(
55+
phrase in error_lower
56+
for phrase in [
57+
"eof",
58+
"error dialing backend",
59+
"error upgrading connection",
60+
"connection refused",
61+
]
62+
)
5463

55-
error_output_read = False
64+
for retry in range(max_retries):
65+
if retry > 0:
66+
print(
67+
f"🔄 Retrying port-forward command (attempt {retry + 1}/{max_retries})...",
68+
flush=True,
69+
)
70+
time.sleep(5) # Wait before retry
71+
72+
cmd = ["kubectl", "port-forward", pod_name, f"{local_port}:{remote_port}"]
73+
process = subprocess.Popen(
74+
cmd,
75+
stdout=subprocess.PIPE,
76+
stderr=subprocess.PIPE,
77+
text=True,
78+
)
5679

57-
def get_error_if_failed():
58-
"""Check if process failed and return error message, None if still running."""
59-
nonlocal error_output_read
60-
if error_output_read:
61-
return None # Already read the output
80+
error_output_read = False
6281

63-
if process.poll() is not None:
64-
# Process terminated, read stderr (non-blocking since process is done)
65-
error_output_read = True
66-
try:
67-
_, stderr = process.communicate(timeout=1)
68-
return stderr.strip() if stderr else "Process terminated with unknown error"
69-
except subprocess.TimeoutExpired:
70-
return "Process terminated but could not read error output"
71-
return None
72-
73-
# Give kubectl a moment to start and potentially fail
74-
time.sleep(0.5)
75-
76-
# Check if process has already failed
77-
error_msg = get_error_if_failed()
78-
if error_msg:
79-
raise RuntimeError(
80-
f"❌ Port-forward to {pod_name}:{remote_port} failed immediately.\n"
81-
f"kubectl error: {error_msg}"
82-
)
82+
def get_error_if_failed():
83+
"""Check if process failed and return error message, None if still running."""
84+
nonlocal error_output_read
85+
if error_output_read:
86+
return None
87+
88+
if process.poll() is not None:
89+
error_output_read = True
90+
try:
91+
_, stderr = process.communicate(timeout=1)
92+
return stderr.strip() if stderr else "Process terminated with unknown error"
93+
except subprocess.TimeoutExpired:
94+
return "Process terminated but could not read error output"
95+
return None
8396

84-
if not wait_ready:
85-
return
97+
# Give kubectl more time to establish connection in CI
98+
time.sleep(1.5 if retry == 0 else 0.5)
8699

87-
for attempt in range(max_attempts):
88-
# Check if process has failed
100+
# Check if process failed immediately (transient connection error)
89101
error_msg = get_error_if_failed()
90102
if error_msg:
91-
raise RuntimeError(
92-
f"❌ Port-forward to {pod_name}:{remote_port} failed.\n"
93-
f"kubectl error: {error_msg}"
94-
)
95-
96-
try:
97-
with socket.create_connection(("localhost", local_port), timeout=1):
103+
if retry < max_retries - 1 and is_transient_error(error_msg):
98104
print(
99-
f"✅ Port-forward to {pod_name}:{remote_port} is ready on localhost:{local_port}"
105+
f"⚠️ Transient kubectl connection error (will retry): {error_msg[:150]}",
106+
flush=True,
100107
)
101-
return
102-
except Exception:
103-
print(
104-
f"🔄 Port-forward to {pod_name}:{remote_port} failed, attempt: {attempt}/{max_attempts}"
105-
)
106-
time.sleep(1)
107-
108-
# Final check - if process failed, get the error
109-
error_msg = get_error_if_failed()
110-
if error_msg:
111-
raise RuntimeError(
112-
f"❌ Port-forward to {pod_name}:{remote_port} failed after {max_attempts} attempts.\n"
113-
f"kubectl error: {error_msg}"
114-
)
115-
else:
116-
# Process still running but port not ready - kill it and report
117-
process.terminate()
118-
final_error_msg = None
119-
try:
120-
process.wait(timeout=2)
121-
# Try to read any final error output
122-
if not error_output_read:
123108
try:
124-
_, stderr = process.communicate(timeout=1)
125-
if stderr:
126-
final_error_msg = stderr.strip()
127-
except subprocess.TimeoutExpired:
109+
process.kill()
110+
except:
128111
pass
129-
except subprocess.TimeoutExpired:
130-
process.kill()
131-
132-
error_details = f"\nkubectl error: {final_error_msg}" if final_error_msg else ""
133-
raise RuntimeError(
134-
f"❌ Port-forward to {pod_name}:{remote_port} failed after {max_attempts} attempts.\n"
135-
f"Port {local_port} is not accessible. Check if the pod is running and the port is correct.\n"
136-
f"Pod: {pod_name}, Local port: {local_port}, Remote port: {remote_port}{error_details}"
137-
)
112+
continue # Retry the port-forward command
113+
else:
114+
raise RuntimeError(
115+
f"❌ Port-forward to {pod_name}:{remote_port} failed after {retry + 1} attempts.\n"
116+
f"kubectl error: {error_msg}"
117+
)
118+
119+
if not wait_ready:
120+
return process
121+
122+
# Wait for port to be accessible
123+
for attempt in range(max_attempts):
124+
# Check if process has failed
125+
error_msg = get_error_if_failed()
126+
if error_msg:
127+
if retry < max_retries - 1 and is_transient_error(error_msg):
128+
print(
129+
f"⚠️ Transient error during wait (will retry): {error_msg[:150]}",
130+
flush=True,
131+
)
132+
try:
133+
process.kill()
134+
except:
135+
pass
136+
break # Break inner loop to retry outer loop
137+
else:
138+
raise RuntimeError(
139+
f"❌ Port-forward to {pod_name}:{remote_port} failed.\n"
140+
f"kubectl error: {error_msg}"
141+
)
142+
143+
try:
144+
with socket.create_connection(("localhost", local_port), timeout=1):
145+
print(
146+
f"✅ Port-forward to {pod_name}:{remote_port} is ready on localhost:{local_port}",
147+
flush=True,
148+
)
149+
return process
150+
except Exception:
151+
if attempt < max_attempts - 1:
152+
print(
153+
f"🔄 Port-forward to {pod_name}:{remote_port} not ready yet, attempt: {attempt + 1}/{max_attempts}",
154+
flush=True,
155+
)
156+
time.sleep(1)
157+
158+
# If we get here, port never became ready - retry if we have retries left
159+
if retry < max_retries - 1:
160+
print(
161+
f"⚠️ Port-forward process still running but port not accessible, retrying...",
162+
flush=True,
163+
)
164+
try:
165+
process.kill()
166+
except:
167+
pass
168+
continue
169+
else:
170+
# Final failure
171+
process.terminate()
172+
final_error_msg = None
173+
try:
174+
process.wait(timeout=2)
175+
if not error_output_read:
176+
try:
177+
_, stderr = process.communicate(timeout=1)
178+
if stderr:
179+
final_error_msg = stderr.strip()
180+
except subprocess.TimeoutExpired:
181+
pass
182+
except subprocess.TimeoutExpired:
183+
process.kill()
184+
185+
error_details = f"\nkubectl error: {final_error_msg}" if final_error_msg else ""
186+
raise RuntimeError(
187+
f"❌ Port-forward to {pod_name}:{remote_port} failed after {max_retries} retries and {max_attempts} attempts.\n"
188+
f"Port {local_port} is not accessible. Check if the pod is running and the port is correct.\n"
189+
f"Pod: {pod_name}, Local port: {local_port}, Remote port: {remote_port}{error_details}"
190+
)
191+
192+
raise RuntimeError(
193+
f"❌ Port-forward to {pod_name}:{remote_port} failed after {max_retries} retries"
194+
)
138195

139196

140197
def run_simulator(http_port: int, monitoring_port: int, sender_address: str, receiver_address: str):
@@ -155,7 +212,7 @@ def run_simulator(http_port: int, monitoring_port: int, sender_address: str, rec
155212

156213
def setup_port_forwarding(service_name: str, port: int, node_type: NodeType):
157214
pod_name = get_pod_name(get_service_label(node_type, service_name))
158-
print(f"📡 Port-forwarding {pod_name} on local port {port}...")
215+
print(f"📡 Port-forwarding {pod_name} on local port {port}...", flush=True)
159216
port_forward(pod_name, port, port)
160217

161218
return port
@@ -168,7 +225,7 @@ def main(
168225
sender_address: str,
169226
receiver_address: str,
170227
):
171-
print("🚀 Running sequencer simulator....")
228+
print("🚀 Running sequencer simulator....", flush=True)
172229

173230
try:
174231
node_type = NodeType(node_type_str)
@@ -203,15 +260,16 @@ def main(
203260
)
204261

205262
print(
206-
f"Running the simulator with http port: {http_server_port} and monitoring port: {state_sync_port}"
263+
f"Running the simulator with http port: {http_server_port} and monitoring port: {state_sync_port}",
264+
flush=True,
207265
)
208266
exit_code = run_simulator(http_server_port, state_sync_port, sender_address, receiver_address)
209267

210268
if exit_code != 0:
211-
print("❌ Sequencer simulator failed!")
269+
print("❌ Sequencer simulator failed!", flush=True)
212270
exit(exit_code)
213271
else:
214-
print("✅ Sequencer simulator completed successfully!")
272+
print("✅ Sequencer simulator completed successfully!", flush=True)
215273

216274

217275
if __name__ == "__main__":

0 commit comments

Comments
 (0)