From 09ab2560cf4b991b20ee3ee1d863faf6b24a13d6 Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Wed, 3 Dec 2025 16:13:28 -0800 Subject: [PATCH 1/2] test: Expand cancellation unit test to both NATS and TCP request plane --- .../tests/cancellation/test_cancellation.py | 6 +++ lib/bindings/python/tests/conftest.py | 38 ++++++++++++++++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/lib/bindings/python/tests/cancellation/test_cancellation.py b/lib/bindings/python/tests/cancellation/test_cancellation.py index 42d29d8930..1aff5e10ae 100644 --- a/lib/bindings/python/tests/cancellation/test_cancellation.py +++ b/lib/bindings/python/tests/cancellation/test_cancellation.py @@ -165,6 +165,7 @@ async def client(runtime, namespace): @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_client_context_cancel(temp_file_store, server, client): _, handler = server context = Context() @@ -198,6 +199,7 @@ async def test_client_context_cancel(temp_file_store, server, client): @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_client_loop_break(temp_file_store, server, client): _, handler = server stream = await client.generate("_generate_until_context_cancelled") @@ -230,6 +232,7 @@ async def test_client_loop_break(temp_file_store, server, client): @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_server_context_cancel(temp_file_store, server, client): _, handler = server stream = await client.generate("_generate_and_cancel_context") @@ -254,6 +257,7 @@ async def test_server_context_cancel(temp_file_store, server, client): @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_server_raise_cancelled(temp_file_store, server, client): _, handler = server stream = await client.generate("_generate_and_raise_cancelled") @@ -282,6 +286,7 @@ async def test_server_raise_cancelled(temp_file_store, server, client): @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_client_context_already_cancelled(temp_file_store, server, client): _, handler = server context = Context() @@ -304,6 +309,7 @@ async def test_client_context_already_cancelled(temp_file_store, server, client) @pytest.mark.forked @pytest.mark.asyncio +@pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) async def test_client_context_cancel_before_await_request( temp_file_store, server, client ): diff --git a/lib/bindings/python/tests/conftest.py b/lib/bindings/python/tests/conftest.py index 9d5f33a932..b234e405bb 100644 --- a/lib/bindings/python/tests/conftest.py +++ b/lib/bindings/python/tests/conftest.py @@ -402,8 +402,34 @@ def temp_file_store(): yield tmpdir +@pytest.fixture +def store_kv(request): + """ + KV store for runtime. Defaults to "file". + + To iterate over multiple stores in a test: + @pytest.mark.parametrize("store_kv", ["file", "etcd"], indirect=True) + async def test_example(runtime): + ... + """ + return getattr(request, "param", "file") + + +@pytest.fixture +def request_plane(request): + """ + Request plane for runtime. Defaults to "nats". + + To iterate over multiple transports in a test: + @pytest.mark.parametrize("request_plane", ["tcp", "nats"], indirect=True) + async def test_example(runtime): + ... + """ + return getattr(request, "param", "nats") + + @pytest.fixture(scope="function", autouse=False) -async def runtime(request): +async def runtime(request, store_kv, request_plane): """ Create a DistributedRuntime for testing. @@ -413,6 +439,14 @@ async def runtime(request): Without @pytest.mark.forked in isolated mode, you will get "Worker already initialized" errors when multiple tests try to create runtimes in the same process. + + The store_kv and request_plane can be customized by overriding their fixtures + or using @pytest.mark.parametrize with indirect=True: + + @pytest.mark.forked + @pytest.mark.parametrize("store_kv", ["etcd"], indirect=True) + async def test_with_etcd(runtime): + ... """ # Check if the test is marked with @pytest.mark.forked (only in isolated mode) if ENABLE_ISOLATED_ETCD_AND_NATS: @@ -435,6 +469,6 @@ async def test_my_test(runtime): ) loop = asyncio.get_running_loop() - runtime = DistributedRuntime(loop, "file", "nats") + runtime = DistributedRuntime(loop, store_kv, request_plane) yield runtime runtime.shutdown() From 52e1ecf52c23fd5bff460b8c165dc0d3253ae2ab Mon Sep 17 00:00:00 2001 From: Jacky <18255193+kthui@users.noreply.github.com> Date: Wed, 3 Dec 2025 18:27:59 -0800 Subject: [PATCH 2/2] test: Expand cancellation/migration E2E tests to both NATS and TCP request plane --- tests/conftest.py | 47 +++++++++++++++++-- .../cancellation/test_sglang.py | 5 +- .../cancellation/test_trtllm.py | 5 +- .../fault_tolerance/cancellation/test_vllm.py | 5 +- tests/fault_tolerance/cancellation/utils.py | 2 +- .../fault_tolerance/migration/test_sglang.py | 4 +- .../fault_tolerance/migration/test_trtllm.py | 4 +- tests/fault_tolerance/migration/test_vllm.py | 5 +- tests/fault_tolerance/migration/utils.py | 3 +- 9 files changed, 69 insertions(+), 11 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 43570b6e53..2e204d4912 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -412,11 +412,52 @@ def _create_server(self) -> ManagedProcess: return server +@pytest.fixture +def store_kv(request): + """ + KV store for runtime. Defaults to "etcd". + + To iterate over multiple stores in a test: + @pytest.mark.parametrize("store_kv", ["file", "etcd"], indirect=True) + def test_example(runtime_services): + ... + """ + return getattr(request, "param", "etcd") + + +@pytest.fixture +def request_plane(request): + """ + Request plane for runtime. Defaults to "nats". + + To iterate over multiple transports in a test: + @pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True) + def test_example(runtime_services): + ... + """ + return getattr(request, "param", "nats") + + @pytest.fixture() -def runtime_services(request): - with NatsServer(request) as nats_process: +def runtime_services(request, store_kv, request_plane): + """ + Start runtime services (NATS and/or etcd) based on store_kv and request_plane. + + - If store_kv != "etcd", etcd is not started (returns None) + - If request_plane != "nats", NATS is not started (returns None) + """ + if request_plane == "nats" and store_kv == "etcd": + with NatsServer(request) as nats_process: + with EtcdServer(request) as etcd_process: + yield nats_process, etcd_process + elif request_plane == "nats": + with NatsServer(request) as nats_process: + yield nats_process, None + elif store_kv == "etcd": with EtcdServer(request) as etcd_process: - yield nats_process, etcd_process + yield None, etcd_process + else: + yield None, None @pytest.fixture(scope="session") diff --git a/tests/fault_tolerance/cancellation/test_sglang.py b/tests/fault_tolerance/cancellation/test_sglang.py index 55058f01d1..44bba39b62 100644 --- a/tests/fault_tolerance/cancellation/test_sglang.py +++ b/tests/fault_tolerance/cancellation/test_sglang.py @@ -25,6 +25,7 @@ pytest.mark.sglang, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -89,8 +90,10 @@ def __init__(self, request, mode: str = "agg"): else: # agg (aggregated mode) port = "8081" - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") + env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/cancellation/test_trtllm.py b/tests/fault_tolerance/cancellation/test_trtllm.py index 87e679f40d..3ee83ddb72 100644 --- a/tests/fault_tolerance/cancellation/test_trtllm.py +++ b/tests/fault_tolerance/cancellation/test_trtllm.py @@ -26,6 +26,7 @@ pytest.mark.gpu_1, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -85,8 +86,10 @@ def __init__(self, request, mode: str = "prefill_and_decode"): else: # prefill_and_decode port = "8081" - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") + env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/cancellation/test_vllm.py b/tests/fault_tolerance/cancellation/test_vllm.py index 01bde0f314..14d2439a8a 100644 --- a/tests/fault_tolerance/cancellation/test_vllm.py +++ b/tests/fault_tolerance/cancellation/test_vllm.py @@ -25,6 +25,7 @@ pytest.mark.gpu_1, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -65,8 +66,10 @@ def __init__(self, request, is_prefill: bool = False): (f"http://localhost:{FRONTEND_PORT}/health", check_health_generate), ] - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") + env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/cancellation/utils.py b/tests/fault_tolerance/cancellation/utils.py index 18f27a3558..adc9377c81 100644 --- a/tests/fault_tolerance/cancellation/utils.py +++ b/tests/fault_tolerance/cancellation/utils.py @@ -26,8 +26,8 @@ class DynamoFrontendProcess(ManagedProcess): def __init__(self, request): command = ["python", "-m", "dynamo.frontend"] - # Set debug logging environment env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/migration/test_sglang.py b/tests/fault_tolerance/migration/test_sglang.py index e41da95b59..53f47cd31d 100644 --- a/tests/fault_tolerance/migration/test_sglang.py +++ b/tests/fault_tolerance/migration/test_sglang.py @@ -28,6 +28,7 @@ pytest.mark.gpu_1, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -56,8 +57,9 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3): str(migration_limit), ] - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/migration/test_trtllm.py b/tests/fault_tolerance/migration/test_trtllm.py index 0b0da3a8b1..1af0530743 100644 --- a/tests/fault_tolerance/migration/test_trtllm.py +++ b/tests/fault_tolerance/migration/test_trtllm.py @@ -28,6 +28,7 @@ pytest.mark.gpu_1, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -54,8 +55,9 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3): str(migration_limit), ] - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") env["DYN_LOG"] = "debug" # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy diff --git a/tests/fault_tolerance/migration/test_vllm.py b/tests/fault_tolerance/migration/test_vllm.py index 336cfb041b..d5a850770f 100644 --- a/tests/fault_tolerance/migration/test_vllm.py +++ b/tests/fault_tolerance/migration/test_vllm.py @@ -28,6 +28,7 @@ pytest.mark.gpu_1, pytest.mark.e2e, pytest.mark.model(FAULT_TOLERANCE_MODEL_NAME), + pytest.mark.parametrize("request_plane", ["nats", "tcp"], indirect=True), pytest.mark.post_merge, # post_merge to pinpoint failure commit ] @@ -53,8 +54,10 @@ def __init__(self, request, worker_id: str, migration_limit: int = 3): str(migration_limit), ] - # Set debug logging environment + # Set environment variables env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") + env["DYN_VLLM_KV_EVENT_PORT"] = f"2008{worker_id[-1]}" env["VLLM_NIXL_SIDE_CHANNEL_PORT"] = f"560{worker_id[-1]}" diff --git a/tests/fault_tolerance/migration/utils.py b/tests/fault_tolerance/migration/utils.py index 1ccd50f389..14ccc7afc9 100644 --- a/tests/fault_tolerance/migration/utils.py +++ b/tests/fault_tolerance/migration/utils.py @@ -23,13 +23,14 @@ class DynamoFrontendProcess(ManagedProcess): def __init__(self, request): command = ["python", "-m", "dynamo.frontend", "--router-mode", "round-robin"] - # Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server env = os.environ.copy() + env["DYN_REQUEST_PLANE"] = request.getfixturevalue("request_plane") # Disable canary health check - these tests expect full control over requests # sent to the workers where canary health check intermittently sends dummy # requests to workers interfering with the test process which may cause # intermittent failures env["DYN_HEALTH_CHECK_ENABLED"] = "false" + # Unset DYN_SYSTEM_PORT - frontend doesn't use system metrics server env.pop("DYN_SYSTEM_PORT", None) log_dir = f"{request.node.name}_frontend"