Skip to content

Commit 4b41801

Browse files
committed
test: enable parallel pytest execution with dynamic port allocation
Move get_pid() to ManagedProcess base class Signed-off-by: Keiven Chang <[email protected]>
1 parent f315374 commit 4b41801

32 files changed

+1321
-282
lines changed

.github/workflows/container-validation-dynamo.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ jobs:
7070
docker run -w /workspace \
7171
--name ${{ env.CONTAINER_ID }}_pytest_parallel \
7272
${{ steps.define_image_tag.outputs.image_tag }} \
73-
bash -c "pytest --basetemp=/tmp/pytest-parallel --junitxml=${{ env.PYTEST_PARALLEL_XML_FILE }} -n 4 -m \"${{ env.PYTEST_MARKS }}\""
73+
bash -c "pytest --basetemp=/tmp/pytest-parallel --junitxml=${{ env.PYTEST_PARALLEL_XML_FILE }} -n auto -m \"${{ env.PYTEST_MARKS }}\""
7474
- name: Copy parallel test report from Container
7575
if: always()
7676
run: |

components/src/dynamo/frontend/main.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,19 @@ async def async_main():
265265
flags = parse_args()
266266
dump_config(flags.dump_config_to, flags)
267267

268-
# Warn if DYN_SYSTEM_PORT is set (frontend doesn't use system metrics server)
268+
# Warn and unset DYN_SYSTEM_PORT if set (frontend doesn't use system metrics server)
269+
# The frontend creates a DRT but should NOT start a system metrics server
270+
# Only backend workers should set DYN_SYSTEM_PORT
269271
if os.environ.get("DYN_SYSTEM_PORT"):
270272
logger.warning(
271273
"=" * 80 + "\n"
272274
"WARNING: DYN_SYSTEM_PORT is set but NOT used by the frontend!\n"
273275
"The frontend does not expose a system metrics server.\n"
274276
"Only backend workers should set DYN_SYSTEM_PORT.\n"
275-
"Use --http-port to configure the frontend HTTP API port.\n" + "=" * 80
277+
"Unsetting DYN_SYSTEM_PORT to prevent DRT from starting system server.\n"
278+
+ "=" * 80
276279
)
280+
os.environ.pop("DYN_SYSTEM_PORT", None)
277281

278282
# Configure Dynamo frontend HTTP service metrics prefix
279283
if flags.metrics_prefix is not None:

components/src/dynamo/sglang/publisher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ def setup_prometheus_registry(
228228
SGLang uses multiprocess architecture where metrics are stored in shared memory.
229229
MultiProcessCollector aggregates metrics from all worker processes. The Prometheus
230230
registry collects sglang:* metrics which are exposed via the metrics server endpoint
231-
(set DYN_SYSTEM_PORT to a positive value to enable, e.g., DYN_SYSTEM_PORT=8081).
231+
(typically port 8081) when DYN_SYSTEM_PORT is set to a positive value.
232232
233233
Args:
234234
engine: The SGLang engine instance.

examples/backends/sglang/launch/disagg.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ python3 -m dynamo.frontend &
5050
DYNAMO_PID=$!
5151

5252
# run prefill worker
53-
OTEL_SERVICE_NAME=dynamo-worker-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL:-8081} \
53+
OTEL_SERVICE_NAME=dynamo-worker-prefill DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
5454
python3 -m dynamo.sglang \
5555
--model-path Qwen/Qwen3-0.6B \
5656
--served-model-name Qwen/Qwen3-0.6B \
@@ -65,7 +65,7 @@ python3 -m dynamo.sglang \
6565
PREFILL_PID=$!
6666

6767
# run decode worker
68-
OTEL_SERVICE_NAME=dynamo-worker-decode DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_DECODE:-8082} \
68+
OTEL_SERVICE_NAME=dynamo-worker-decode DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
6969
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.sglang \
7070
--model-path Qwen/Qwen3-0.6B \
7171
--served-model-name Qwen/Qwen3-0.6B \

examples/backends/vllm/launch/disagg_same_gpu.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ DYNAMO_PID=$!
4848

4949
# run decode worker with metrics on port 8081
5050
# --enforce-eager is added for quick deployment. for production use, need to remove this flag
51-
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT:-8081} \
51+
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT1:-8081} \
5252
CUDA_VISIBLE_DEVICES=0 \
5353
python3 -m dynamo.vllm \
5454
--model Qwen/Qwen3-0.6B \
@@ -66,7 +66,7 @@ echo "Waiting for decode worker to initialize..."
6666
sleep 10
6767

6868
# run prefill worker with metrics on port 8082 (foreground)
69-
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT_PREFILL:-8082} \
69+
DYN_SYSTEM_PORT=${DYN_SYSTEM_PORT2:-8082} \
7070
DYN_VLLM_KV_EVENT_PORT=20081 \
7171
VLLM_NIXL_SIDE_CHANNEL_PORT=20097 \
7272
CUDA_VISIBLE_DEVICES=0 \

lib/runtime/src/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub struct RuntimeConfig {
101101
/// Set to 0 to bind to a random available port
102102
/// Set to a positive port number (e.g. 8081) to bind to a specific port
103103
/// Set this at runtime with environment variable DYN_SYSTEM_PORT
104+
/// TODO: Change type from i16 to u16 to support full port range (0-65535)
104105
#[builder(default = "DEFAULT_SYSTEM_PORT")]
105106
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
106107
pub system_port: i16,

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ markers = [
220220
"model: model id used by a test or parameter",
221221
"custom_build: marks tests that require custom builds or special setup (e.g., MoE models)",
222222
"k8s: marks tests as requiring Kubernetes",
223-
"fault_tolerance: marks tests as fault tolerance tests"
223+
"fault_tolerance: marks tests as fault tolerance tests",
224+
"requires_hf_token: marks tests that require HuggingFace authentication token for gated models"
224225
]
225226

226227
# Linting/formatting

tests/conftest.py

Lines changed: 100 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,5 @@
11
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
22
# SPDX-License-Identifier: Apache-2.0
3-
#
4-
# Licensed under the Apache License, Version 2.0 (the "License");
5-
# you may not use this file except in compliance with the License.
6-
# You may obtain a copy of the License at
7-
#
8-
# http://www.apache.org/licenses/LICENSE-2.0
9-
#
10-
# Unless required by applicable law or agreed to in writing, software
11-
# distributed under the License is distributed on an "AS IS" BASIS,
12-
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
# See the License for the specific language governing permissions and
14-
# limitations under the License.
153

164
import logging
175
import os
@@ -25,6 +13,14 @@
2513

2614
from tests.utils.constants import TEST_MODELS
2715
from tests.utils.managed_process import ManagedProcess
16+
from tests.utils.port_utils import (
17+
allocate_free_port,
18+
allocate_free_ports,
19+
free_port,
20+
free_ports,
21+
)
22+
23+
_logger = logging.getLogger(__name__)
2824

2925

3026
def pytest_configure(config):
@@ -227,44 +223,124 @@ def pytest_collection_modifyitems(config, items):
227223

228224

229225
class EtcdServer(ManagedProcess):
230-
def __init__(self, request, port=2379, timeout=300):
226+
def __init__(self, request, port=None, timeout=300):
227+
# Allocate free ports if not specified
228+
use_random_port = port is None
229+
if use_random_port:
230+
# Need two ports: client port and peer port for parallel execution
231+
# Start from 2380 (etcd default 2379 + 1)
232+
port, peer_port = allocate_free_ports(2, 2380)
233+
else:
234+
peer_port = None
235+
236+
self.port = port
237+
self.peer_port = peer_port # Store for cleanup
231238
port_string = str(port)
232239
etcd_env = os.environ.copy()
233240
etcd_env["ALLOW_NONE_AUTHENTICATION"] = "yes"
234241
data_dir = tempfile.mkdtemp(prefix="etcd_")
242+
235243
command = [
236244
"etcd",
237245
"--listen-client-urls",
238246
f"http://0.0.0.0:{port_string}",
239247
"--advertise-client-urls",
240248
f"http://0.0.0.0:{port_string}",
241-
"--data-dir",
242-
data_dir,
243249
]
250+
251+
# Add peer port configuration only for random ports (parallel execution)
252+
if peer_port is not None:
253+
peer_port_string = str(peer_port)
254+
command.extend(
255+
[
256+
"--listen-peer-urls",
257+
f"http://0.0.0.0:{peer_port_string}",
258+
"--initial-advertise-peer-urls",
259+
f"http://localhost:{peer_port_string}",
260+
"--initial-cluster",
261+
f"default=http://localhost:{peer_port_string}",
262+
]
263+
)
264+
265+
command.extend(
266+
[
267+
"--data-dir",
268+
data_dir,
269+
]
270+
)
244271
super().__init__(
245272
env=etcd_env,
246273
command=command,
247274
timeout=timeout,
248275
display_output=False,
276+
terminate_existing=not use_random_port, # Disabled for parallel test execution with random ports
249277
health_check_ports=[port],
250278
data_dir=data_dir,
251279
log_dir=request.node.name,
252280
)
253281

282+
def __exit__(self, exc_type, exc_val, exc_tb):
283+
"""Release allocated ports when server exits."""
284+
ports_to_release = []
285+
try:
286+
# Release allocated ports BEFORE calling parent __exit__
287+
if hasattr(self, "port") and self.port is not None:
288+
ports_to_release.append(self.port)
289+
if hasattr(self, "peer_port") and self.peer_port is not None:
290+
ports_to_release.append(self.peer_port)
291+
292+
if ports_to_release:
293+
free_ports(ports_to_release)
294+
except Exception:
295+
# Silently continue if port release fails
296+
pass
297+
finally:
298+
# Always call parent __exit__ to terminate the process
299+
return super().__exit__(exc_type, exc_val, exc_tb)
300+
254301

255302
class NatsServer(ManagedProcess):
256-
def __init__(self, request, port=4222, timeout=300):
303+
def __init__(self, request, port=None, timeout=300):
304+
# Allocate a free port if not specified
305+
use_random_port = port is None
306+
if use_random_port:
307+
# Start from 4223 (nats-server default 4222 + 1)
308+
port = allocate_free_port(4223)
309+
310+
self.port = port
257311
data_dir = tempfile.mkdtemp(prefix="nats_")
258-
command = ["nats-server", "-js", "--trace", "--store_dir", data_dir]
312+
command = [
313+
"nats-server",
314+
"-js",
315+
"--trace",
316+
"--store_dir",
317+
data_dir,
318+
"-p",
319+
str(port),
320+
]
259321
super().__init__(
260322
command=command,
261323
timeout=timeout,
262324
display_output=False,
325+
terminate_existing=not use_random_port, # Disabled for parallel test execution with random ports
263326
data_dir=data_dir,
264327
health_check_ports=[port],
265328
log_dir=request.node.name,
266329
)
267330

331+
def __exit__(self, exc_type, exc_val, exc_tb):
332+
"""Release allocated port when server exits."""
333+
try:
334+
# Release allocated port BEFORE calling parent __exit__
335+
if hasattr(self, "port") and self.port is not None:
336+
free_port(self.port)
337+
except Exception:
338+
# Silently continue if port release fails
339+
pass
340+
finally:
341+
# Always call parent __exit__ to terminate the process
342+
return super().__exit__(exc_type, exc_val, exc_tb)
343+
268344

269345
class SharedManagedProcess:
270346
"""Base class for ManagedProcess with file-based reference counting for multi-process sharing."""
@@ -393,6 +469,13 @@ def _create_server(self) -> ManagedProcess:
393469

394470
@pytest.fixture()
395471
def runtime_services(request):
472+
"""Provide NATS and Etcd servers with dynamically allocated ports.
473+
474+
Returns a tuple of (nats_process, etcd_process) where each has a .port attribute.
475+
Tests should set NATS_SERVER and ETCD_ENDPOINTS environment variables in their
476+
subprocess environments using these ports.
477+
"""
478+
# Port cleanup is now handled in NatsServer and EtcdServer __exit__ methods
396479
with NatsServer(request) as nats_process:
397480
with EtcdServer(request) as etcd_process:
398481
yield nats_process, etcd_process

0 commit comments

Comments
 (0)