Skip to content

Commit 60154b4

Browse files
committed
test: Extend migration tests to SGLang
Signed-off-by: Jacky <[email protected]>
1 parent 2f2a13a commit 60154b4

File tree

2 files changed

+322
-1
lines changed

2 files changed

+322
-1
lines changed
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
import logging
5+
import os
6+
import shutil
7+
8+
import pytest
9+
10+
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
11+
from tests.utils.engine_process import FRONTEND_PORT
12+
from tests.utils.managed_process import ManagedProcess, terminate_process_tree
13+
from tests.utils.payloads import check_models_api
14+
15+
# Import utilities from the refactored utils module
16+
from .utils import (
17+
DynamoFrontendProcess,
18+
determine_request_receiving_worker,
19+
start_completion_request,
20+
validate_completion_response,
21+
verify_migration_occurred,
22+
)
23+
24+
logger = logging.getLogger(__name__)
25+
26+
pytestmark = [
27+
pytest.mark.sglang,
28+
pytest.mark.gpu_1,
29+
pytest.mark.e2e,
30+
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
31+
pytest.mark.pre_merge, # can be moved to nightly once stable for a week
32+
]
33+
34+
35+
class DynamoWorkerProcess(ManagedProcess):
36+
"""Process manager for Dynamo worker with SGLang backend"""
37+
38+
def __init__(self, request, worker_id: str, migration_limit: int = 3):
39+
self.worker_id = worker_id
40+
41+
command = [
42+
"python3",
43+
"-m",
44+
"dynamo.sglang",
45+
"--model-path",
46+
FAULT_TOLERANCE_MODEL_NAME,
47+
"--served-model-name",
48+
FAULT_TOLERANCE_MODEL_NAME,
49+
"--trust-remote-code",
50+
"--skip-tokenizer-init",
51+
"--mem-fraction-static",
52+
"0.45",
53+
"--context-length",
54+
"8192",
55+
"--migration-limit",
56+
str(migration_limit),
57+
]
58+
59+
# Set debug logging environment
60+
env = os.environ.copy()
61+
env["DYN_LOG"] = "debug"
62+
env["DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS"] = '["generate"]'
63+
env["DYN_SYSTEM_PORT"] = f"808{worker_id[-1]}"
64+
65+
# TODO: Have the managed process take a command name explicitly to distinguish
66+
# between processes started with the same command.
67+
log_dir = f"{request.node.name}_{worker_id}"
68+
69+
# Clean up any existing log directory from previous runs
70+
try:
71+
shutil.rmtree(log_dir)
72+
logger.info(f"Cleaned up existing log directory: {log_dir}")
73+
except FileNotFoundError:
74+
# Directory doesn't exist, which is fine
75+
pass
76+
77+
super().__init__(
78+
command=command,
79+
env=env,
80+
health_check_urls=[
81+
(f"http://localhost:{FRONTEND_PORT}/v1/models", check_models_api),
82+
(f"http://localhost:808{worker_id[-1]}/health", self.is_ready),
83+
],
84+
timeout=300,
85+
display_output=True,
86+
terminate_existing=False,
87+
stragglers=["SGLANG:EngineCore"],
88+
straggler_commands=["-m dynamo.sglang"],
89+
log_dir=log_dir,
90+
)
91+
92+
def get_pid(self):
93+
"""Get the PID of the worker process"""
94+
return self.proc.pid if self.proc else None
95+
96+
def is_ready(self, response) -> bool:
97+
"""Check the health of the worker process"""
98+
try:
99+
data = response.json()
100+
if data.get("status") == "ready":
101+
logger.info(f"{self.worker_id} status is ready")
102+
return True
103+
logger.warning(
104+
f"{self.worker_id} status is not ready: {data.get('status')}"
105+
)
106+
except ValueError:
107+
logger.warning(f"{self.worker_id} health response is not valid JSON")
108+
return False
109+
110+
111+
def test_request_migration_sglang_worker_failure(
112+
request, runtime_services, predownload_models, set_ucx_tls_no_mm
113+
):
114+
"""
115+
End-to-end test for worker fault tolerance with migration support using SGLang.
116+
117+
This test verifies that when a worker is killed during request processing,
118+
the system can handle the failure gracefully and migrate the request to
119+
another worker.
120+
"""
121+
122+
# Step 1: Start the frontend
123+
with DynamoFrontendProcess(request) as frontend:
124+
logger.info("Frontend started successfully")
125+
126+
# Step 2: Start 2 workers sequentially
127+
with DynamoWorkerProcess(request, "worker1") as worker1:
128+
logger.info(f"Worker 1 PID: {worker1.get_pid()}")
129+
130+
with DynamoWorkerProcess(request, "worker2") as worker2:
131+
logger.info(f"Worker 2 PID: {worker2.get_pid()}")
132+
133+
# Step 3: Send the request
134+
request_thread, response_list = start_completion_request()
135+
136+
# Step 4: Use polling to determine which worker received the request
137+
worker, worker_name = determine_request_receiving_worker(
138+
worker1, worker2, receiving_pattern="New Request ID: "
139+
)
140+
141+
# Step 5: Kill the worker that has the request
142+
logger.info(
143+
f"Killing {worker_name} with PID {worker.get_pid()} processing the request"
144+
)
145+
terminate_process_tree(worker.get_pid(), immediate_kill=True, timeout=0)
146+
147+
# Step 6: Validate the completion response
148+
validate_completion_response(request_thread, response_list)
149+
150+
# Step 7: Verify migration occurred
151+
verify_migration_occurred(frontend)
152+
153+
154+
@pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented")
155+
def test_request_migration_sglang_graceful_shutdown(
156+
request, runtime_services, predownload_models, set_ucx_tls_no_mm
157+
):
158+
"""
159+
End-to-end test for worker fault tolerance with graceful shutdown and migration support using SGLang.
160+
161+
This test verifies that when a worker receives a graceful shutdown signal (SIGTERM)
162+
during request processing, the system can handle the shutdown gracefully and migrate
163+
the request to another worker. Unlike the abrupt kill test, this simulates a more
164+
controlled shutdown scenario where the worker has time to clean up and notify the
165+
system about its shutdown.
166+
"""
167+
168+
# Step 1: Start the frontend
169+
with DynamoFrontendProcess(request) as frontend:
170+
logger.info("Frontend started successfully")
171+
172+
# Step 2: Start 2 workers sequentially
173+
with DynamoWorkerProcess(request, "worker1") as worker1:
174+
logger.info(f"Worker 1 PID: {worker1.get_pid()}")
175+
176+
with DynamoWorkerProcess(request, "worker2") as worker2:
177+
logger.info(f"Worker 2 PID: {worker2.get_pid()}")
178+
179+
# Step 3: Send the request
180+
request_thread, response_list = start_completion_request()
181+
182+
# Step 4: Use polling to determine which worker received the request
183+
worker, worker_name = determine_request_receiving_worker(
184+
worker1, worker2, receiving_pattern="New Request ID: "
185+
)
186+
187+
# Step 5: Gracefully shutdown the worker that has the request
188+
logger.info(
189+
f"Gracefully shutting down {worker_name} with PID {worker.get_pid()} processing the request"
190+
)
191+
terminate_process_tree(
192+
worker.get_pid(), immediate_kill=False, timeout=10
193+
)
194+
195+
# Step 6: Validate the completion response
196+
validate_completion_response(request_thread, response_list)
197+
198+
# Step 7: Verify migration occurred during graceful shutdown
199+
verify_migration_occurred(frontend)
200+
201+
202+
def test_no_request_migration_sglang_worker_failure(
203+
request, runtime_services, predownload_models, set_ucx_tls_no_mm
204+
):
205+
"""
206+
End-to-end test for worker fault tolerance with migration disabled using SGLang.
207+
208+
This test verifies that when migration is disabled (migration_limit=0) and a worker
209+
is killed during request processing, the request fails as expected without migration.
210+
This is the opposite behavior of test_request_migration_sglang_worker_failure.
211+
"""
212+
213+
# Step 1: Start the frontend
214+
with DynamoFrontendProcess(request) as frontend:
215+
logger.info("Frontend started successfully")
216+
217+
# Step 2: Start 2 workers sequentially with migration disabled
218+
with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1:
219+
logger.info(f"Worker 1 PID: {worker1.get_pid()}")
220+
221+
with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2:
222+
logger.info(f"Worker 2 PID: {worker2.get_pid()}")
223+
224+
# Step 3: Send the request
225+
request_thread, response_list = start_completion_request()
226+
227+
# Step 4: Use polling to determine which worker received the request
228+
worker, worker_name = determine_request_receiving_worker(
229+
worker1, worker2, receiving_pattern="New Request ID: "
230+
)
231+
232+
# Step 5: Kill the worker that has the request
233+
logger.info(
234+
f"Killing {worker_name} with PID {worker.get_pid()} processing the request"
235+
)
236+
terminate_process_tree(worker.get_pid(), immediate_kill=True, timeout=0)
237+
238+
# Step 6: Validate the completion response - should fail without migration
239+
try:
240+
validate_completion_response(request_thread, response_list)
241+
pytest.fail(
242+
"Request succeeded unexpectedly when migration was disabled"
243+
)
244+
except AssertionError as e:
245+
assert "Request failed with status 500: " in str(
246+
e
247+
), f"Unexpected request error message: {e}"
248+
249+
# Step 7: Verify migration did NOT occur - should fail
250+
try:
251+
verify_migration_occurred(frontend)
252+
pytest.fail(
253+
"Migration verification unexpectedly passed when migration was disabled"
254+
)
255+
except AssertionError as e:
256+
assert "'Cannot recreate stream: ...' error found in logs" in str(
257+
e
258+
), f"Unexpected migration message: {e}"
259+
260+
261+
@pytest.mark.skip(reason="SGLang graceful shutdown not yet implemented")
262+
def test_no_request_migration_sglang_graceful_shutdown(
263+
request, runtime_services, predownload_models, set_ucx_tls_no_mm
264+
):
265+
"""
266+
End-to-end test for worker fault tolerance with graceful shutdown and migration disabled using SGLang.
267+
268+
This test verifies that when migration is disabled (migration_limit=0) and a worker
269+
receives a graceful shutdown signal (SIGTERM) during request processing, the request
270+
fails as expected without migration. This is the opposite behavior of
271+
test_request_migration_sglang_graceful_shutdown.
272+
"""
273+
274+
# Step 1: Start the frontend
275+
with DynamoFrontendProcess(request) as frontend:
276+
logger.info("Frontend started successfully")
277+
278+
# Step 2: Start 2 workers sequentially with migration disabled
279+
with DynamoWorkerProcess(request, "worker1", migration_limit=0) as worker1:
280+
logger.info(f"Worker 1 PID: {worker1.get_pid()}")
281+
282+
with DynamoWorkerProcess(request, "worker2", migration_limit=0) as worker2:
283+
logger.info(f"Worker 2 PID: {worker2.get_pid()}")
284+
285+
# Step 3: Send the request
286+
request_thread, response_list = start_completion_request()
287+
288+
# Step 4: Use polling to determine which worker received the request
289+
worker, worker_name = determine_request_receiving_worker(
290+
worker1, worker2, receiving_pattern="New Request ID: "
291+
)
292+
293+
# Step 5: Gracefully shutdown the worker that has the request
294+
logger.info(
295+
f"Gracefully shutting down {worker_name} with PID {worker.get_pid()} processing the request"
296+
)
297+
terminate_process_tree(
298+
worker.get_pid(), immediate_kill=False, timeout=10
299+
)
300+
301+
# Step 6: Validate the completion response - should fail without migration
302+
try:
303+
validate_completion_response(request_thread, response_list)
304+
pytest.fail(
305+
"Request succeeded unexpectedly when migration was disabled"
306+
)
307+
except AssertionError as e:
308+
assert "Request failed with status 500: " in str(
309+
e
310+
), f"Unexpected request error message: {e}"
311+
312+
# Step 7: Verify migration did NOT occur - should fail
313+
try:
314+
verify_migration_occurred(frontend)
315+
pytest.fail(
316+
"Migration verification unexpectedly passed when migration was disabled"
317+
)
318+
except AssertionError as e:
319+
assert "'Cannot recreate stream: ...' error found in logs" in str(
320+
e
321+
), f"Unexpected migration message: {e}"

tests/fault_tolerance/migration/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def start_completion_request() -> tuple:
5757

5858
def send_request():
5959
prompt = "Tell me a long long long story about yourself?"
60-
max_tokens = 8192
60+
max_tokens = 8000
6161
timeout = 240 # Extended timeout for long request
6262

6363
payload = {

0 commit comments

Comments
 (0)