|
1 | 1 | # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. |
2 | 2 | # SPDX-License-Identifier: Apache-2.0 |
3 | | -# |
4 | | -# Licensed under the Apache License, Version 2.0 (the "License"); |
5 | | -# you may not use this file except in compliance with the License. |
6 | | -# You may obtain a copy of the License at |
7 | | -# |
8 | | -# http://www.apache.org/licenses/LICENSE-2.0 |
9 | | -# |
10 | | -# Unless required by applicable law or agreed to in writing, software |
11 | | -# distributed under the License is distributed on an "AS IS" BASIS, |
12 | | -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13 | | -# See the License for the specific language governing permissions and |
14 | | -# limitations under the License. |
15 | 3 |
|
16 | 4 | import logging |
17 | 5 | import os |
|
26 | 14 |
|
27 | 15 | from tests.utils.constants import TEST_MODELS |
28 | 16 | from tests.utils.managed_process import ManagedProcess |
| 17 | +from tests.utils.port_utils import ( |
| 18 | + allocate_free_port, |
| 19 | + allocate_free_ports, |
| 20 | + free_port, |
| 21 | + free_ports, |
| 22 | +) |
| 23 | + |
| 24 | +_logger = logging.getLogger(__name__) |
29 | 25 |
|
30 | 26 |
|
31 | 27 | def pytest_configure(config): |
@@ -249,43 +245,121 @@ def pytest_runtestloop(session): |
249 | 245 |
|
class EtcdServer(ManagedProcess):
    """Run an ``etcd`` server as a managed test process.

    Args:
        request: pytest ``request`` object; ``request.node.name`` is used as
            the log directory.
        port: Client port to listen on. ``0`` or ``None`` requests a client
            port and a peer port from ``allocate_free_ports`` so that several
            instances can run side by side.
        timeout: Forwarded unchanged to ``ManagedProcess``.

    The chosen ports are exposed as ``self.port`` and ``self.peer_port``
    (``peer_port`` is ``None`` when a fixed client port was supplied).
    """

    def __init__(self, request, port=2379, timeout=300):
        # Treat both 0 and None as "pick a port for me"; without the None
        # check the literal string "None" would end up in the etcd URLs.
        use_random_port = port is None or port == 0
        if use_random_port:
            # Need two ports: client port and peer port for parallel execution.
            # 2380 is the etcd default client port (2379) + 1.
            port, peer_port = allocate_free_ports(2, 2380)
        else:
            peer_port = None

        self.port = port
        self.peer_port = peer_port  # Stored so __exit__ can release it

        etcd_env = os.environ.copy()
        etcd_env["ALLOW_NONE_AUTHENTICATION"] = "yes"
        data_dir = tempfile.mkdtemp(prefix="etcd_")

        client_url = f"http://0.0.0.0:{port}"
        command = [
            "etcd",
            "--listen-client-urls",
            client_url,
            "--advertise-client-urls",
            client_url,
        ]

        # Peer port configuration is only added for random ports (parallel
        # execution); with a fixed client port etcd keeps its own defaults.
        if peer_port is not None:
            command.extend(
                [
                    "--listen-peer-urls",
                    f"http://0.0.0.0:{peer_port}",
                    "--initial-advertise-peer-urls",
                    f"http://localhost:{peer_port}",
                    "--initial-cluster",
                    f"default=http://localhost:{peer_port}",
                ]
            )

        command.extend(["--data-dir", data_dir])

        super().__init__(
            env=etcd_env,
            command=command,
            timeout=timeout,
            display_output=False,
            # Disabled for parallel test execution with random ports
            terminate_existing=not use_random_port,
            health_check_ports=[port],
            data_dir=data_dir,
            log_dir=request.node.name,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the server, then release its ports.

        The parent ``__exit__`` runs first so the ports are only handed back
        once the process has been asked to stop; the release still happens if
        the parent raises. No ``return`` is placed in ``finally`` so an
        in-flight exception is never silently discarded.
        """
        try:
            return super().__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._release_ports()

    def _release_ports(self):
        """Best-effort release of the client and peer ports via ``free_ports``."""
        candidates = (getattr(self, "port", None), getattr(self, "peer_port", None))
        ports_to_release = [p for p in candidates if p is not None]
        if not ports_to_release:
            return
        try:
            free_ports(ports_to_release)
        except Exception as e:
            # Deliberately non-fatal: failing to release a port must not mask
            # the outcome of the test or of process shutdown.
            _logger.warning("Failed to release EtcdServer port: %s", e)
| 320 | + |
275 | 321 |
|
class NatsServer(ManagedProcess):
    """Run a ``nats-server`` (JetStream enabled) as a managed test process.

    Args:
        request: pytest ``request`` object; ``request.node.name`` is used as
            the log directory.
        port: Port passed to ``nats-server -p``. ``0`` or ``None`` requests a
            port from ``allocate_free_port`` so that several instances can
            run side by side.
        timeout: Forwarded unchanged to ``ManagedProcess``.

    The chosen port is exposed as ``self.port``.
    """

    def __init__(self, request, port=4222, timeout=300):
        # Treat both 0 and None as "pick a port for me"; without the None
        # check the literal string "None" would be passed to ``-p``.
        use_random_port = port is None or port == 0
        if use_random_port:
            # 4223 is the nats-server default port (4222) + 1.
            port = allocate_free_port(4223)

        self.port = port
        data_dir = tempfile.mkdtemp(prefix="nats_")
        command = [
            "nats-server",
            "-js",
            "--trace",
            "--store_dir",
            data_dir,
            "-p",
            str(port),
        ]
        super().__init__(
            command=command,
            timeout=timeout,
            display_output=False,
            # Disabled for parallel test execution with random ports
            terminate_existing=not use_random_port,
            data_dir=data_dir,
            health_check_ports=[port],
            log_dir=request.node.name,
        )

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the server, then release its port.

        The parent ``__exit__`` runs first so the port is only handed back
        once the process has been asked to stop; the release still happens if
        the parent raises. No ``return`` is placed in ``finally`` so an
        in-flight exception is never silently discarded.
        """
        try:
            return super().__exit__(exc_type, exc_val, exc_tb)
        finally:
            self._release_port()

    def _release_port(self):
        """Best-effort release of ``self.port`` via ``free_port``."""
        port = getattr(self, "port", None)
        if port is None:
            return
        try:
            free_port(port)
        except Exception as e:
            # Deliberately non-fatal: failing to release a port must not mask
            # the outcome of the test or of process shutdown.
            _logger.warning("Failed to release NatsServer port: %s", e)
| 362 | + |
289 | 363 |
|
290 | 364 | class SharedManagedProcess: |
291 | 365 | """Base class for ManagedProcess with file-based reference counting for multi-process sharing.""" |
@@ -414,11 +488,45 @@ def _create_server(self) -> ManagedProcess: |
414 | 488 |
|
@pytest.fixture()
def runtime_services(request):
    """Provide NATS and Etcd servers for a single test.

    Both servers are constructed without a ``port`` argument, so they use the
    defaults declared by ``NatsServer`` and ``EtcdServer`` rather than
    dynamically allocated ports.

    Yields a tuple of (nats_process, etcd_process); each has a ``.port``
    attribute that tests can use to build NATS_SERVER / ETCD_ENDPOINTS values
    for their subprocess environments.
    """
    # Port cleanup is handled in the NatsServer and EtcdServer __exit__ methods.
    # Context managers exit right-to-left, so etcd is stopped before NATS.
    with NatsServer(request) as nats_process, EtcdServer(request) as etcd_process:
        yield nats_process, etcd_process
420 | 501 |
|
421 | 502 |
|
@pytest.fixture()
def runtime_services_dynamic_ports(request):
    """Provide NATS and Etcd servers on dynamically allocated ports.

    Passes ``port=0`` to both servers so they allocate free ports, and points
    the NATS_SERVER and ETCD_ENDPOINTS environment variables at those ports
    for the duration of the test. On teardown each variable is restored to
    the value it had before the fixture ran (or removed if it was unset).

    Yields a tuple of (nats_process, etcd_process); each has a ``.port``
    attribute.
    """
    # Port cleanup is handled in the NatsServer and EtcdServer __exit__ methods.
    with NatsServer(request, port=0) as nats_process:
        with EtcdServer(request, port=0) as etcd_process:
            # Remember pre-existing values so teardown does not clobber a
            # NATS_SERVER / ETCD_ENDPOINTS configured outside this fixture.
            saved_env = {
                name: os.environ.get(name)
                for name in ("NATS_SERVER", "ETCD_ENDPOINTS")
            }

            # Set environment variables for the dynamic ports.
            # xdist (parallel execution) will launch isolated tests in a new
            # process, so no need to worry about environment pollution.
            os.environ["NATS_SERVER"] = f"nats://localhost:{nats_process.port}"
            os.environ["ETCD_ENDPOINTS"] = f"http://localhost:{etcd_process.port}"

            try:
                yield nats_process, etcd_process
            finally:
                # Restore the environment even if the generator is closed
                # abnormally instead of being resumed for normal teardown.
                for name, previous in saved_env.items():
                    if previous is None:
                        os.environ.pop(name, None)
                    else:
                        os.environ[name] = previous
| 528 | + |
| 529 | + |
422 | 530 | @pytest.fixture(scope="session") |
423 | 531 | def runtime_services_session(request, tmp_path_factory): |
424 | 532 | """Session-scoped fixture that provides shared NATS and etcd instances for all tests. |
|
0 commit comments