Skip to content

Commit 52e1ecf

Browse files
committed
test: Expand cancellation/migration E2E tests to both NATS and TCP request planes
1 parent 09ab256 commit 52e1ecf

File tree

9 files changed

+69
-11
lines changed

9 files changed

+69
-11
lines changed

tests/conftest.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,52 @@ def _create_server(self) -> ManagedProcess:
412412
return server
413413

414414

415+
@pytest.fixture
416+
def store_kv(request):
417+
"""
418+
KV store for runtime. Defaults to "etcd".
419+
420+
To iterate over multiple stores in a test:
421+
@pytest.mark.parametrize("store_kv", ["file", "etcd"], indirect=True)
422+
def test_example(runtime_services):
423+
...
424+
"""
425+
return getattr(request, "param", "etcd")
426+
427+
428+
@pytest.fixture
429+
def request_plane(request):
430+
"""
431+
Request plane for runtime. Defaults to "nats".
432+
433+
To iterate over multiple transports in a test:
434+
@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True)
435+
def test_example(runtime_services):
436+
...
437+
"""
438+
return getattr(request, "param", "nats")
439+
440+
415441
@pytest.fixture()
416-
def runtime_services(request):
417-
with NatsServer(request) as nats_process:
442+
def runtime_services(request, store_kv, request_plane):
443+
"""
444+
Start runtime services (NATS and/or etcd) based on store_kv and request_plane.
445+
446+
- If store_kv != "etcd", etcd is not started (the etcd slot of the yielded tuple is None)
447+
- If request_plane != "nats", NATS is not started (the NATS slot of the yielded tuple is None)
448+
"""
449+
if request_plane == "nats" and store_kv == "etcd":
450+
with NatsServer(request) as nats_process:
451+
with EtcdServer(request) as etcd_process:
452+
yield nats_process, etcd_process
453+
elif request_plane == "nats":
454+
with NatsServer(request) as nats_process:
455+
yield nats_process, None
456+
elif store_kv == "etcd":
418457
with EtcdServer(request) as etcd_process:
419-
yield nats_process, etcd_process
458+
yield None, etcd_process
459+
else:
460+
yield None, None
420461

421462

422463
@pytest.fixture(scope="session")

tests/fault_tolerance/cancellation/test_sglang.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
pytest.mark.sglang,
2626
pytest.mark.e2e,
2727
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
28+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
2829
pytest.mark.post_merge, # post_merge to pinpoint failure commit
2930
]
3031

@@ -89,8 +90,10 @@ def __init__(self, request, mode: str = "agg"):
8990
else: # agg (aggregated mode)
9091
port = "8081"
9192

92-
# Set debug logging environment
93+
# Set environment variables
9394
env = os.environ.copy()
95+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
96+
9497
env["DYN_LOG"] = "debug"
9598
# Disable canary health check - these tests expect full control over requests
9699
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/cancellation/test_trtllm.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
pytest.mark.gpu_1,
2727
pytest.mark.e2e,
2828
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
29+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
2930
pytest.mark.post_merge, # post_merge to pinpoint failure commit
3031
]
3132

@@ -85,8 +86,10 @@ def __init__(self, request, mode: str = "prefill_and_decode"):
8586
else: # prefill_and_decode
8687
port = "8081"
8788

88-
# Set debug logging environment
89+
# Set environment variables
8990
env = os.environ.copy()
91+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
92+
9093
env["DYN_LOG"] = "debug"
9194
# Disable canary health check - these tests expect full control over requests
9295
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/cancellation/test_vllm.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
pytest.mark.gpu_1,
2626
pytest.mark.e2e,
2727
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
28+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
2829
pytest.mark.post_merge, # post_merge to pinpoint failure commit
2930
]
3031

@@ -65,8 +66,10 @@ def __init__(self, request, is_prefill: bool = False):
6566
(f"http://localhost:{FRONTEND_PORT}/health", check_health_generate),
6667
]
6768

68-
# Set debug logging environment
69+
# Set environment variables
6970
env = os.environ.copy()
71+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
72+
7073
env["DYN_LOG"] = "debug"
7174
# Disable canary health check - these tests expect full control over requests
7275
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/cancellation/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ class DynamoFrontendProcess(ManagedProcess):
2626
def __init__(self, request):
2727
command = ["python", "-m", "dynamo.frontend"]
2828

29-
# Set debug logging environment
3029
env = os.environ.copy()
30+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
3131
env["DYN_LOG"] = "debug"
3232
# Disable canary health check - these tests expect full control over requests
3333
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/migration/test_sglang.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
pytest.mark.gpu_1,
2929
pytest.mark.e2e,
3030
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
31+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
3132
pytest.mark.post_merge, # post_merge to pinpoint failure commit
3233
]
3334

@@ -56,8 +57,9 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3):
5657
str(migration_limit),
5758
]
5859

59-
# Set debug logging environment
60+
# Set environment variables
6061
env = os.environ.copy()
62+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
6163
env["DYN_LOG"] = "debug"
6264
# Disable canary health check - these tests expect full control over requests
6365
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/migration/test_trtllm.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
pytest.mark.gpu_1,
2929
pytest.mark.e2e,
3030
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
31+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
3132
pytest.mark.post_merge, # post_merge to pinpoint failure commit
3233
]
3334

@@ -54,8 +55,9 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3):
5455
str(migration_limit),
5556
]
5657

57-
# Set debug logging environment
58+
# Set environment variables
5859
env = os.environ.copy()
60+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
5961
env["DYN_LOG"] = "debug"
6062
# Disable canary health check - these tests expect full control over requests
6163
# sent to the workers where canary health check intermittently sends dummy

tests/fault_tolerance/migration/test_vllm.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
pytest.mark.gpu_1,
2929
pytest.mark.e2e,
3030
pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME),
31+
pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True),
3132
pytest.mark.post_merge, # post_merge to pinpoint failure commit
3233
]
3334

@@ -53,8 +54,10 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3):
5354
str(migration_limit),
5455
]
5556

56-
# Set debug logging environment
57+
# Set environment variables
5758
env = os.environ.copy()
59+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
60+
5861
env["DYN_VLLM_KV_EVENT_PORT"] = f"2008{worker_id[-1]}"
5962
env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = f"560{worker_id[-1]}"
6063

tests/fault_tolerance/migration/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ class DynamoFrontendProcess(ManagedProcess):
2323
def __init__(self, request):
2424
command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"]
2525

26-
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
2726
env = os.environ.copy()
27+
env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane")
2828
# Disable canary health check - these tests expect full control over requests
2929
# sent to the workers where canary health check intermittently sends dummy
3030
# requests to workers interfering with the test process which may cause
3131
# intermittent failures
3232
env["DYN_HEALTH_CHECK_ENABLED"] = "false"
33+
# Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server
3334
env.pop("DYN_SYSTEM_PORT", None)
3435

3536
log_dir = f"{request.node.name}_frontend"

0 commit comments

Comments
 (0)