From 1e009aca3874ed30fccb2c9e587220e4df024c25 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Fri, 21 Nov 2025 16:56:42 -0800 Subject: [PATCH 01/15] get vllm agg working --- tests/conftest.py | 11 +- .../deploy/SCENARIO_ARCHITECTURE.md | 233 ++++++++++++++++++ tests/fault_tolerance/deploy/client.py | 136 ++++++++-- .../fault_tolerance/deploy/client_factory.py | 1 + tests/fault_tolerance/deploy/conftest.py | 13 + tests/fault_tolerance/deploy/legacy_client.py | 2 +- tests/fault_tolerance/deploy/parse_results.py | 31 ++- tests/fault_tolerance/deploy/scenarios.py | 110 +++++++-- .../fault_tolerance/deploy/test_deployment.py | 142 ++++++++--- tests/utils/managed_deployment.py | 196 +++++++++++---- 10 files changed, 748 insertions(+), 127 deletions(-) create mode 100644 tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md diff --git a/tests/conftest.py b/tests/conftest.py index e4a4b562a6..f315d2dafd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,6 +19,7 @@ import tempfile from pathlib import Path from typing import Optional +from datetime import datetime import pytest from filelock import FileLock @@ -191,10 +192,14 @@ def predownload_tokenizers(pytestconfig): @pytest.fixture(autouse=True) def logger(request): - log_path = os.path.join(request.node.name, "test.log.txt") + timestamp = datetime.now().strftime("%m-%d-%Y_%H-%M-%S") + log_dir = f"{request.node.name}_{timestamp}" + request.node.log_dir = log_dir + log_path = os.path.join(log_dir, "test.log.txt") + logger = logging.getLogger() - shutil.rmtree(request.node.name, ignore_errors=True) - os.makedirs(request.node.name, exist_ok=True) + shutil.rmtree(log_dir, ignore_errors=True) + os.makedirs(log_dir, exist_ok=True) handler = logging.FileHandler(log_path, mode="w") formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT) handler.setFormatter(formatter) diff --git a/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md b/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md new file mode 100644 index 0000000000..3d63e997f0 --- /dev/null +++ b/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md @@ -0,0 +1,233 @@ +# Fault Tolerance Test Architecture + +This document explains the relationship between **scenarios**, **deployment specs**, **load**, and **failures** in the fault tolerance testing framework. + +## Core Components + +### Scenario +Top-level test configuration combining: +- **Deployment Spec**: Kubernetes deployment (workers, replicas, TP/DP config) +- **Load**: Client load generation (clients, requests, tokens, thresholds) +- **Failures**: List of failures to inject (timing, pod/process targets) +- **Model**: Optional model identifier +- **Backend**: Backend type (vllm, sglang, trtllm) +- **Checkers**: Validation checkers for post-test validation + +### Deployment Spec +Kubernetes deployment configuration loaded from YAML files. Defines worker types, replica counts (data parallelism), tensor parallel size, and backend-specific arguments. + +### Load +Client load configuration: number of clients (default: 10), requests per client (default: 150), input/output token lengths (default: 100), retry settings, client type ("aiperf" or "legacy"), and success threshold (default: 90.0%). + +### Failure +Fault injection definition: `time` (seconds relative to previous failure), `pod_name` (target pod/service), `command` ("delete_pod" or process name), `signal` (SIGKILL, SIGINT, etc.), and `replicas` (number to affect, default: 1). + +## Execution Flow + +### Test Execution Sequence + +1. **Deployment**: The deployment spec is deployed to Kubernetes +2. **Client Launch**: Client processes are started (in parallel) +3. **Load Generation**: Clients send requests while failures are injected +4. **Failure Injection**: Failures are injected sequentially (see below) +5. **Teardown**: Deployment is cleaned up +6. **Validation**: Checkers validate test results + +### Parallel vs Sequential Execution + +#### Can They Be Run in Parallel? + +**Clients run in parallel**: Multiple client processes are spawned concurrently using `multiprocessing.Process`. Each client: +- Selects a frontend pod using round-robin +- Sets up port forwarding +- Generates load independently + +**Failures are injected sequentially**: The `_inject_failures()` function processes failures one at a time: + +```python +def _inject_failures(failures, logger, deployment: ManagedDeployment): + affected_pods = {} + + for failure in failures: + time.sleep(failure.time) # Wait before injecting + # ... inject failure ... + + return affected_pods +``` + +**Load and failures run concurrently**: Clients generate load while failures are being injected. The test uses a context manager pattern: + +```python +with _clients(...): # Clients run in background + # Inject failures while clients are running + affected_pods = _inject_failures(scenario.failures, logger, deployment) +``` + +### Multiple Failures Execution + +**Failures are executed sequentially**, not in parallel. Each failure has a `time` field that specifies how many seconds to wait **after the previous failure** before injecting the current one. + +**Example**: +```python +failures = [ + Failure(time=30, pod_name="VllmDecodeWorker", command="delete_pod"), # After 30s + Failure(time=60, pod_name="Frontend", command="delete_pod"), # After 60s more (90s total) + Failure(time=30, pod_name="VllmPrefillWorker", command="SIGKILL"), # After 30s more (120s total) +] +``` + +**Execution timeline**: +- T=0s: Test starts, clients begin sending requests +- T=30s: First failure injected (delete decode worker pod) +- T=90s: Second failure injected (delete frontend pod) +- T=120s: Third failure injected (kill prefill worker process) + +**Note**: The `time` field is relative to the **previous** failure, not absolute time from test start. + +## Success Criteria + +### Test Success Definition + +A scenario is considered successful if: + +1. **No exceptions raised**: The test completes without raising unhandled exceptions +2. **Validation checkers pass**: All checkers in `scenario.checkers` pass their validation +3. **Success rate threshold met**: The success rate (successful requests / total requests) meets or exceeds `scenario.load.success_threshold` (default: 90.0%) + +### Validation Stages + +Validation happens in **two stages**: + +#### Stage 1: Scenario Verification +Checkers verify that the test scenario executed correctly: +- **PodDeletionChecker**: Verifies pods were actually deleted (via K8s events) +- **ProcessTerminationChecker**: Verifies processes were terminated +- **ContainerRestartChecker**: Verifies containers restarted after failures + +These checkers ensure the failures were actually injected, not just that the code ran. + +#### Stage 2: Results Validation +Checkers verify system behavior based on parsed results: +- **SuccessRateChecker**: Verifies success rate meets threshold +- **RecoveryTimeChecker**: Verifies recovery time is within acceptable bounds +- **NoFailuresChecker**: Verifies no unexpected failures occurred + +### Success Criteria by Component + +| Component | Success Criteria | +|-----------|------------------| +| **Individual Failure** | Not evaluated separately - failures are part of the scenario | +| **Whole Scenario** | All checkers pass AND success rate ≥ threshold AND no exceptions | +| **Test Execution** | No unhandled exceptions during test execution | + +### Failure Handling + +- **If a checker fails**: An `AssertionError` is raised, causing the test to fail +- **If success rate is below threshold**: The `SuccessRateChecker` raises an `AssertionError` +- **If parsing fails**: The test continues but validation is skipped (warning logged) +- **If validation errors occur** (non-assertion exceptions): The test continues but validation is skipped (warning logged) + +### Example Success Flow + +``` +1. Test starts → Deploy deployment spec +2. Clients launch → Generate load in parallel +3. Failures injected → Sequentially at specified times +4. Test completes → Clients finish, deployment torn down +5. Results parsed → Extract metrics from client logs +6. Validation runs: + ✓ PodDeletionChecker: Pod was deleted (K8s events confirm) + ✓ SuccessRateChecker: 95% success rate ≥ 90% threshold + ✓ RecoveryTimeChecker: Recovery time 45s < 60s limit +7. Test passes ✓ +``` + +## Relationship Summary + +``` +Scenario +├── Deployment Spec (what to deploy) +│ ├── Worker types and counts +│ ├── Resource requirements +│ └── Backend configuration +│ +├── Load (how to generate traffic) +│ ├── Client count and concurrency +│ ├── Request parameters +│ └── Success thresholds +│ +└── Failures (what faults to inject) + ├── Failure 1 (time=30s) + ├── Failure 2 (time=60s) + └── Failure 3 (time=30s) + └── Executed sequentially +``` + +## Defined Scenarios + +Scenarios are automatically generated from the Cartesian product of deployments and failures. The naming convention is: `{deployment_name}-{failure_name}`. + +### Deployment Configurations + +**Standard Deployments** (for vllm, sglang, trtllm backends): +- **Aggregated (agg)**: `{backend}-agg-tp-{1|2|4}-dp-{1|2}` +- **Disaggregated (disagg)**: `{backend}-disagg-prefill-tp-{1|2|4}-decode-tp-{1|2|4}-dp-{1}` + +**MoE Deployments** (vllm only): +- `vllm-moe-agg-tp-1-dp-2` +- `vllm-moe-disagg-tp-1-dp-2` + +**Total Standard Deployments**: 3 backends × 2 types × 4 configs = 24 (some disagg configs skipped when TP>1 and DP>1) + +### Failure Types + +**Common Failures** (all backends): +- `none`: No failure injection (baseline test) +- `frontend`: Terminate frontend process (SIGINT to `dynamo.frontend`) +- `frontend_pod`: Delete frontend pod +- `decode_worker`: Terminate decode worker process (SIGKILL to `dynamo.{backend}`) +- `decode_worker_pod`: Delete decode worker pod +- `prefill_worker`: Terminate prefill worker process (SIGKILL to `dynamo.{backend}`) - disagg only +- `prefill_worker_pod`: Delete prefill worker pod - disagg only + +**vLLM-Specific Failures**: +- `vllm_decode_engine_core`: Kill VLLM::EngineCore process in decode worker +- `vllm_prefill_engine_core`: Kill VLLM::EngineCore process in prefill worker - disagg only + +**SGLang-Specific Failures**: +- `sglang_decode_scheduler`: Kill sglang::scheduler process in decode worker +- `sglang_decode_detokenizer`: Kill sglang::detokenizer process in decode worker +- `sglang_prefill_scheduler`: Kill sglang::scheduler process in prefill worker - disagg only +- `sglang_prefill_detokenizer`: Kill sglang::detokenizer process in prefill worker - disagg only + +**Token Overflow Scenarios**: +- `vllm_agg_token_overflow_2x` +- `vllm_disagg_token_overflow_2x` +- `trtllm_agg_token_overflow_2x` +- `trtllm_disagg_token_overflow_2x` +- `sglang_agg_token_overflow_2x` +- `sglang_disagg_token_overflow_2x` + +### Example Scenario Names + +- `vllm-agg-tp-1-dp-1-frontend_pod`: vLLM aggregated, TP=1, DP=1, delete frontend pod +- `sglang-disagg-prefill-tp-2-decode-tp-2-dp-1-decode_worker`: SGLang disaggregated, TP=2, delete decode worker pod +- `trtllm-agg-tp-4-dp-1-none`: TRT-LLM aggregated, TP=4, DP=1, no failures (baseline) +- `vllm-moe-agg-tp-1-dp-2-decode_worker_pod`: vLLM MoE aggregated, delete decode worker pod + +### Total Scenario Count + +Approximately **200+ scenarios** generated from: +- 24 standard deployments × ~7-9 failures per deployment +- 2 MoE deployments × ~7 failures +- 6 token overflow scenarios + +## Key Takeaways + +1. **Scenarios combine deployment, load, and failures** into a single test configuration +2. **Clients run in parallel**, but **failures are injected sequentially** +3. **Load and failures run concurrently** - failures are injected while clients are generating load +4. **Success is evaluated at the scenario level**, not per failure +5. **Success requires**: No exceptions + All checkers pass + Success rate ≥ threshold +6. **Multiple failures use relative timing** - each failure's `time` is relative to the previous one + diff --git a/tests/fault_tolerance/deploy/client.py b/tests/fault_tolerance/deploy/client.py index 03432c0d38..b457788954 100644 --- a/tests/fault_tolerance/deploy/client.py +++ b/tests/fault_tolerance/deploy/client.py @@ -18,6 +18,7 @@ import json import logging import os +import signal import subprocess import time from pathlib import Path @@ -60,7 +61,7 @@ def get_frontend_port( Returns: Tuple of (pod_name, local_port, pod_instance) or (None, None, None) if failed """ - pods = managed_deployment.get_pods(managed_deployment.frontend_service_name) + pods = managed_deployment.get_pods([managed_deployment.frontend_service_name]) port = 0 pod_name = None @@ -270,6 +271,7 @@ def run_aiperf( logger: logging.Logger, max_retries: int = 1, retry_delay: float = 1, + continuous_load: bool = False, ) -> bool: """ Execute AI-Perf with specified parameters. @@ -280,13 +282,14 @@ def run_aiperf( model: Model name pod_name: Selected pod name for logging port: Local port number - requests_per_client: Number of requests to send + requests_per_client: Number of requests to send (used if continuous load not enabled) input_token_length: Input token count output_token_length: Output token count output_dir: Directory for AI-Perf artifacts logger: Logger instance max_retries: Maximum number of retry attempts (default: 1) retry_delay: Delay in seconds between retries (default: 1) + continous_load: If True, use continuous load instead of fixed request count Returns: True if successful, False otherwise @@ -315,8 +318,6 @@ def run_aiperf( # Enable streaming for TTFT and ITL metrics "--streaming", # Request parameters - "--request-count", - str(requests_per_client), # Required: how many requests "--concurrency", "1", # Optional: we set to 1 for sequential # Token configuration @@ -338,8 +339,18 @@ def run_aiperf( "100", # For reproducible results ] - # Calculate timeout (same as legacy would for all requests) - timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes + # Add request parameters based on continuous load mode + if continuous_load: + # Use benchmark duration for continuous load + cmd.extend(["--benchmark-duration", "1800"]) # 30 minutes for continuous load + logger.info("Using continuous load with duration: 30 minutes") + # Set a very long timeout for duration-based tests + timeout = 1860 # 31 minutes default for duration-based tests (30 minutes + 1 minute buffer) + else: + # Normal mode - use requests_per_client + cmd.extend(["--request-count", str(requests_per_client)]) + # Calculate timeout (same as legacy would for all requests) + timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes # Log execution logger.info(f"Starting AI-Perf for Pod {pod_name} Local Port {port}") @@ -354,15 +365,20 @@ def run_aiperf( logger.info(f"Command: {' '.join(cmd)}") # Retry logic for fault tolerance - retry FULL request count until success - - max_attempts = max_retries if max_retries > 0 else 1 + # Note: For continuous load, we only run once and expect SIGINT to stop it + max_attempts = 1 if continuous_load else (max_retries if max_retries > 0 else 1) success = False all_results = [] for attempt in range(max_attempts): - logger.info( - f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests" - ) + if continuous_load: + logger.info( + "AI-Perf continuous load (will run until interrupted by SIGINT)" + ) + else: + logger.info( + f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests" + ) # Update output directory for this attempt attempt_dir = output_dir / f"attempt_{attempt}" @@ -374,13 +390,17 @@ def run_aiperf( cmd_attempt[artifact_dir_idx] = str(attempt_dir) try: - result = subprocess.run( - cmd_attempt, - capture_output=True, - text=True, - timeout=timeout, - stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension - ) + if continuous_load: + result = run_continuous_load_process(cmd_attempt, logger, timeout) + else: + # Normal mode - use subprocess.run + result = subprocess.run( + cmd_attempt, + capture_output=True, + text=True, + timeout=timeout, + stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension + ) # Save logs for this attempt with open(attempt_dir / "genai_perf.log", "w") as f: @@ -398,6 +418,7 @@ def run_aiperf( } ) + # Even with continous load, with SIGINT, aiperf should return 0 and create the profile_export_aiperf.json file if result.returncode == 0: # AI-Perf returns 0 even if all requests failed, so we need to check the output json_path = attempt_dir / "profile_export_aiperf.json" @@ -412,6 +433,19 @@ def run_aiperf( ) if success: break # Success - exit the retry loop + ## TODO: bug with aiperf git+https://github.com/ai-dynamo/aiperf.git@4d3fa29403c8f75da22a14f1f7b3aeb27db9288f + ## where sending a SIGINT on Mac can sometimes have an error code of -9 (SIGABRT) which results in profile_export_aiperf.json not being created + elif result.returncode == -9 and continuous_load: + logger.warning( + f""" + Attempt {attempt + 1} failed with return code {result.returncode} + This is a known bug with aiperf on Mac where sending a SIGINT can sometimes have an error code of -9 (SIGABRT) + which results in profile_export_aiperf.json not being created + """ + ) + logger.debug( + f"Stderr: {result.stderr[:500] if result.stderr else 'No stderr'}" + ) else: logger.warning( f"Attempt {attempt + 1} failed with return code {result.returncode}" @@ -423,20 +457,75 @@ def run_aiperf( logger.error(f"Error in attempt {attempt + 1}: {str(e)}") all_results.append({"attempt": attempt + 1, "error": str(e)}) - # Sleep before next attempt (if not the last attempt) - if not success and attempt < max_attempts - 1: + # Sleep before next attempt (if not the last attempt and not continuous load) + if not success and attempt < max_attempts - 1 and not continuous_load: time.sleep(retry_delay) - if success: + if success and not continuous_load: logger.info( f"AI-Perf successfully completed all {requests_per_client} requests for {pod_name}" ) + elif success and continuous_load: + logger.info( + f"AI-Perf sustained continuous load for {pod_name} and existed succesfully" + ) else: logger.error(f"AI-Perf failed all {max_attempts} attempts for {pod_name}") return success +def run_continuous_load_process( + cmd_attempt: List[str], + logger: logging.Logger, + timeout: int, +) -> subprocess.CompletedProcess: + """ + Run aiperf in continuous load mode and return the result. + + Handles SIGINT and timeout when running with subprocess.Popen. + """ + proc = subprocess.Popen( + cmd_attempt, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + stdin=subprocess.DEVNULL, + ) + + # Set up signal handler to forward SIGINT to subprocess + def signal_handler(signum, frame): + logger.info(f"Received signal {signum}, forwarding to aiperf subprocess") + try: + proc.send_signal(signal.SIGINT) + except ProcessLookupError: + pass # Process already terminated + + signal.signal(signal.SIGINT, signal_handler) + + try: + stdout, stderr = proc.communicate(timeout=timeout) + returncode = proc.returncode + except subprocess.TimeoutExpired: + logger.warning(f"AI-Perf subprocess timed out after {timeout}s") + proc.kill() + stdout, stderr = proc.communicate() + returncode = proc.returncode + except KeyboardInterrupt: + logger.info("Received KeyboardInterrupt, sending SIGINT to aiperf subprocess") + proc.send_signal(signal.SIGINT) + try: + stdout, stderr = proc.communicate(timeout=30) # Give it time to clean up + returncode = proc.returncode + except subprocess.TimeoutExpired: + logger.warning("Subprocess didn't terminate gracefully, killing it") + proc.kill() + stdout, stderr = proc.communicate() + returncode = proc.returncode + + return subprocess.CompletedProcess(cmd_attempt, returncode, stdout, stderr) + + def log_summary_metrics( output_dir: Path, logger: logging.Logger, pod_name: str, port: int ) -> None: @@ -513,6 +602,7 @@ def client( output_token_length: int, max_retries: int, retry_delay: float = 1, + continuous_load: bool = False, ): """ Generate load using AI-Perf for fault tolerance testing. @@ -527,11 +617,12 @@ def client( model: Model name log_dir: Directory for output logs and AI-Perf artifacts index: Client index used for round-robin pod selection - requests_per_client: Number of requests to generate + requests_per_client: Number of requests to generate (used if continuous load not enabled) input_token_length: Number of input tokens per request output_token_length: Number of output tokens per request max_retries: Maximum retry attempts for AI-Perf execution retry_delay: Delay in seconds between retry attempts + continuous_load: If True, use continuous load instead of fixed request count """ logger = logging.getLogger(f"CLIENT: {index}") logging.getLogger("httpx").setLevel(logging.WARNING) @@ -578,6 +669,7 @@ def client( logger=logger, max_retries=max_retries, retry_delay=retry_delay, + continuous_load=continuous_load, ) if not success: diff --git a/tests/fault_tolerance/deploy/client_factory.py b/tests/fault_tolerance/deploy/client_factory.py index 936122f082..d8f8e3f99f 100644 --- a/tests/fault_tolerance/deploy/client_factory.py +++ b/tests/fault_tolerance/deploy/client_factory.py @@ -42,6 +42,7 @@ def get_client_function(client_type: str) -> Callable: output_token_length, max_retries, retry_delay_or_rate, # Differs between implementations + continuous_load, ) Raises: diff --git a/tests/fault_tolerance/deploy/conftest.py b/tests/fault_tolerance/deploy/conftest.py index 70545b9526..74f463a9b6 100644 --- a/tests/fault_tolerance/deploy/conftest.py +++ b/tests/fault_tolerance/deploy/conftest.py @@ -35,6 +35,13 @@ def pytest_addoption(parser): help="Include tests that require custom builds (e.g., MoE models). " "By default, these tests are excluded.", ) + parser.addoption( + "--skip-restart-services", + action="store_true", + default=False, + help="Skip restarting NATS and etcd services before deployment. " + "By default, these services are restarted.", + ) def pytest_generate_tests(metafunc): @@ -109,3 +116,9 @@ def namespace(request): def client_type(request): """Get client type from command line or use scenario default.""" return request.config.getoption("--client-type") + + +@pytest.fixture +def skip_restart_services(request): + """Get skip restart services flag from command line.""" + return request.config.getoption("--skip-restart-services") diff --git a/tests/fault_tolerance/deploy/legacy_client.py b/tests/fault_tolerance/deploy/legacy_client.py index 5cb4df4557..3200e48850 100644 --- a/tests/fault_tolerance/deploy/legacy_client.py +++ b/tests/fault_tolerance/deploy/legacy_client.py @@ -228,7 +228,7 @@ def client( for i in range(requests_per_client): # Get available pods pods = managed_deployment.get_pods( - managed_deployment.frontend_service_name + [managed_deployment.frontend_service_name] ) port = 0 pod_name = None diff --git a/tests/fault_tolerance/deploy/parse_results.py b/tests/fault_tolerance/deploy/parse_results.py index 66bc967e9b..00c1839468 100644 --- a/tests/fault_tolerance/deploy/parse_results.py +++ b/tests/fault_tolerance/deploy/parse_results.py @@ -341,6 +341,7 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]: Returns: Dictionary with aggregated metrics and client count """ + logger = logging.getLogger(__name__) all_metrics: Dict[str, Any] = { "total_requests": 0, "successful_requests": 0, @@ -382,22 +383,28 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]: with open(profile_json) as f: client_metrics = json.load(f) - # AI-Perf format has "records" dictionary at the top level + # AI-Perf format can have "records" dictionary or metrics at top level + # Try records first (older format), then fall back to top level (newer format) records = client_metrics.get("records", {}) - # Extract successful request count - request_count_record = records.get("request_count", {}) + # Extract successful request count - check both locations + request_count_record = records.get( + "request_count" + ) or client_metrics.get("request_count", {}) successful_count = ( int(request_count_record.get("avg", 0)) - if request_count_record + if request_count_record and isinstance(request_count_record, dict) else 0 ) - # Extract error request count - error_request_count_record = records.get("error_request_count", {}) + # Extract error request count - check both locations + error_request_count_record = records.get( + "error_request_count" + ) or client_metrics.get("error_request_count", {}) error_request_count = ( int(error_request_count_record.get("avg", 0)) if error_request_count_record + and isinstance(error_request_count_record, dict) else 0 ) @@ -418,9 +425,17 @@ def parse_aiperf_client_results(log_dir: str) -> Dict[str, Any]: # Sum up actual error counts from each error type error_count = sum(error.get("count", 0) for error in error_summary) - # Check if test was cancelled + # Log if test was cancelled (expected for continuous load mode) if client_metrics.get("was_cancelled", False): - error_count = request_count # Mark all as failed if cancelled + logger.info( + f"AI-Perf client {item} was cancelled - anticipated if running with continuous load mode. " + f"Completed {request_count} requests before cancellation." + ) + + # Note: If test was cancelled (was_cancelled=True), we still count the requests + # that were successfully completed before cancellation. The request_count + # represents successful requests, and error_count represents actual errors. + # We don't mark cancelled requests as failed - they were just interrupted. # Validate data consistency if request_count < error_count: diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 0dc93e384c..8442d1a7d6 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -155,14 +155,19 @@ class Load: overflow_request_count: int = 15 # Number of overflow requests normal_request_count: int = 15 # Number of normal requests after overflow + continuous_load: bool = ( + False # If True, use continuous load instead of fixed request count + ) + @dataclass class Failure: time: int - pod_name: str + service_names: list[str] command: str signal: str = "SIGINT" replicas: int = 1 + end_condition: Optional[str] = None # End condition for failure (e.g., "dgd_ready") @dataclass @@ -182,7 +187,7 @@ def __init__( ): super().__init__( time=time, - pod_name="Client", + service_names=["Client"], command="token_overflow", ) self.max_seq_len = max_seq_len @@ -206,7 +211,7 @@ class Scenario: # Helper functions to create deployment specs -def _create_deployment_spec(backend: str, yaml_path: str) -> DeploymentInfo: +def _create_deployment_info(backend: str, yaml_path: str) -> DeploymentInfo: """Create a deployment spec with backend information. Args: @@ -240,7 +245,9 @@ def _set_replicas(deployment_spec, backend, deploy_type, replicas): spec[WORKER_MAP[backend]["prefill"]].replicas = replicas -def _set_tensor_parallel(deployment_spec, backend, deploy_type, tp_size): +def _set_tensor_parallel( + deployment_spec: DeploymentSpec, backend: str, deploy_type: str, tp_size: int +): """Set tensor parallel size for worker components.""" spec = deployment_spec["spec"] @@ -308,7 +315,7 @@ def _create_deployments_for_backend(backend: str) -> Dict[str, DeploymentInfo]: scenario_name = "-".join(name_parts) # Create and configure the deployment - deployment = _create_deployment_spec(backend, yaml_files[deploy_type]) + deployment = _create_deployment_info(backend, yaml_files[deploy_type]) if tp_size > 1: _set_tensor_parallel(deployment, backend, deploy_type, tp_size) if dp_replicas > 1: @@ -397,34 +404,41 @@ def _create_backend_failures(backend, deploy_type="disagg"): process_name = f"dynamo.{backend}" failures = { - "frontend": [Failure(30, "Frontend", "dynamo.frontend")], - "frontend_pod": [Failure(30, "Frontend", "delete_pod")], - "decode_worker": [Failure(30, decode_worker, process_name, "SIGKILL")], - "decode_worker_pod": [Failure(30, decode_worker, "delete_pod")], - "prefill_worker": [Failure(30, prefill_worker, process_name, "SIGKILL")], - "prefill_worker_pod": [Failure(30, prefill_worker, "delete_pod")], + "frontend": [Failure(30, ["Frontend"], "dynamo.frontend")], + "frontend_pod": [Failure(30, ["Frontend"], "delete_pod")], + "decode_worker": [Failure(30, [decode_worker], process_name, "SIGKILL")], + "decode_worker_pod": [Failure(30, [decode_worker], "delete_pod")], + "prefill_worker": [Failure(30, [prefill_worker], process_name, "SIGKILL")], + "prefill_worker_pod": [Failure(30, [prefill_worker], "delete_pod")], "none": [], } if backend == "vllm": failures["vllm_decode_engine_core"] = [ - Failure(30, decode_worker, "VLLM::EngineCore", "SIGKILL") + Failure(30, [decode_worker], "VLLM::EngineCore", "SIGKILL") ] failures["vllm_prefill_engine_core"] = [ - Failure(30, prefill_worker, "VLLM::EngineCore", "SIGKILL") + Failure(30, [prefill_worker], "VLLM::EngineCore", "SIGKILL") ] elif backend == "sglang": failures["sglang_decode_scheduler"] = [ - Failure(30, decode_worker, "sglang::scheduler", "SIGKILL") + Failure(30, [decode_worker], "sglang::scheduler", "SIGKILL") ] failures["sglang_decode_detokenizer"] = [ - Failure(30, decode_worker, "sglang::detokenizer", "SIGKILL") + Failure(30, [decode_worker], "sglang::detokenizer", "SIGKILL") ] failures["sglang_prefill_scheduler"] = [ - Failure(30, prefill_worker, "sglang::scheduler", "SIGKILL") + Failure(30, [prefill_worker], "sglang::scheduler", "SIGKILL") ] failures["sglang_prefill_detokenizer"] = [ - Failure(30, prefill_worker, "sglang::detokenizer", "SIGKILL") + Failure(30, [prefill_worker], "sglang::detokenizer", "SIGKILL") + ] + elif backend == "trtllm": + failures["trtllm_decode_engine_core"] = [ + Failure(30, [decode_worker], "TRTLLM::EngineCore", "SIGKILL") + ] + failures["trtllm_prefill_engine_core"] = [ + Failure(30, [prefill_worker], "TRTLLM::EngineCore", "SIGKILL") ] return failures @@ -533,7 +547,7 @@ def create_legacy_load( # Populate Scenarios -scenarios = {} +scenarios: dict[str, Scenario] = {} # Map of backend+deploy_type to failure definitions backend_failure_map = {} @@ -729,5 +743,65 @@ def add_token_overflow_scenarios(): ) +def add_rolling_upgrade_scenarios(): + for backend in ["vllm", "sglang", "trtllm"]: + for worker_mode in ["agg", "disagg"]: + yaml_files = { + "agg": f"examples/backends/{backend}/deploy/agg.yaml", + "disagg": f"examples/backends/{backend}/deploy/disagg.yaml", + } + deployment_info = _create_deployment_info(backend, yaml_files[worker_mode]) + deployment_spec: DeploymentSpec = deployment_info["spec"] + + service_names: list[str] = [] + + ## TODO: vllm disagg has both decode workers restart at same time but the prefill does one at a time, it also looks like the test exits when the first prefill worker is ready but the second is recreated + ## (Bug in operator?) + ## TODO: maybe add a bit of buffer time after the rolling upgrade is completed (after the DGD is ready again) + + # setting replicas to 2 so we have availability of 1 replica at a time + if worker_mode == "agg" and backend == "trtllm": + service_names.append(WORKER_MAP[backend]["decode_agg"]) + else: + service_names.append(WORKER_MAP[backend]["decode"]) + + if worker_mode == "disagg": + service_names.append(WORKER_MAP[backend]["prefill"]) + + for service_name in service_names: + deployment_spec.set_service_replicas(service_name, 2) + + load = Load( + clients=10, + input_token_length=100, + output_token_length=100, + max_retries=1, + client_type="aiperf", + max_request_rate=1.0, + success_threshold=100.0, + continuous_load=True, + ) + + scenario_name = f"{backend}-{worker_mode}-rolling-upgrade" + model = "Qwen/Qwen3-0.6B" + + failure = Failure( + time=30, + service_names=service_names, + command="rolling_upgrade", + end_condition="dgd_ready", # Wait for DGD to be ready before stopping clients + ) + scenarios[scenario_name] = Scenario( + deployment=deployment_info["spec"], + load=load, + failures=[failure], + model=model, + backend=backend, + ) + + # Add the token overflow scenarios add_token_overflow_scenarios() + +# Add the rolling upgrade scenarios +add_rolling_upgrade_scenarios() diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index caeb9039a2..dcbd3f92ff 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -3,12 +3,16 @@ import logging import multiprocessing +import os import re +import signal import time from contextlib import contextmanager from typing import Any +from multiprocessing.context import SpawnProcess import pytest +from kr8s.objects import Pod from tests.fault_tolerance.deploy.base_checker import ValidationContext from tests.fault_tolerance.deploy.client_factory import get_client_function @@ -17,11 +21,13 @@ from tests.fault_tolerance.deploy.scenarios import ( OVERFLOW_SUFFIX, RECOVERY_SUFFIX, + Failure, Load, + Scenario, TokenOverflowFailure, scenarios, ) -from tests.utils.managed_deployment import ManagedDeployment +from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment @pytest.fixture @@ -55,18 +61,18 @@ def scenario(scenario_name, client_type): @contextmanager def _clients( - logger, - request, - deployment_spec, - namespace, - model, + logger: logging.Logger, + log_dir: str, + deployment_spec: DeploymentSpec, + namespace: str, + model: str, load_config: Load, ): """Start client processes using factory pattern for client selection. Args: logger: Logger instance - request: Pytest request fixture + log_dir: Log directory for output logs and client logs/artifacts deployment_spec: Deployment specification namespace: Kubernetes namespace model: Model name to test @@ -79,7 +85,7 @@ def _clients( f"Starting {load_config.clients} clients using '{load_config.client_type}' client" ) - procs = [] + procs: list[SpawnProcess] = [] ctx = multiprocessing.get_context("spawn") # Determine retry_delay_or_rate based on client type @@ -90,6 +96,9 @@ def _clients( # AI-Perf client uses retry_delay between attempts (default 5s) retry_delay_or_rate = 5 + # Check if this is a continuous load test (rolling upgrade scenarios) + continuous_load = getattr(load_config, "continuous_load", False) + # Check if this is a mixed token test (overflow + recovery) # If mixed_token_test is True, run two phases; otherwise run normally if hasattr(load_config, "mixed_token_test") and load_config.mixed_token_test: @@ -108,13 +117,14 @@ def _clients( deployment_spec, namespace, model, - request.node.name + OVERFLOW_SUFFIX, + f"{log_dir}{OVERFLOW_SUFFIX}", i, load_config.overflow_request_count, # 15 overflow requests load_config.overflow_token_length, # 2x max_seq_len tokens load_config.output_token_length, load_config.max_retries, retry_delay_or_rate, + continuous_load, ), ) proc_overflow.start() @@ -128,7 +138,7 @@ def _clients( logger.info("Overflow requests completed. Starting recovery phase...") # Second phase: Send normal requests to test recovery - procs_recovery = [] + procs_recovery: list[SpawnProcess] = [] for i in range(load_config.clients): proc_normal = ctx.Process( target=client_func, @@ -136,7 +146,7 @@ def _clients( deployment_spec, namespace, model, - request.node.name + RECOVERY_SUFFIX, + f"{log_dir}{RECOVERY_SUFFIX}", i, load_config.normal_request_count, # 15 normal requests load_config.input_token_length, # Normal token count @@ -161,13 +171,14 @@ def _clients( deployment_spec, namespace, model, - request.node.name, + log_dir, i, load_config.requests_per_client, load_config.input_token_length, load_config.output_token_length, load_config.max_retries, retry_delay_or_rate, + continuous_load, # Pass continuous_load flag ), ) ) @@ -182,13 +193,38 @@ def _clients( logger.debug(f"{proc} joined") -def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa: F811 - """Inject failures and return info about affected pods. - Returns: - Dict mapping failure info to list of affected pod names - Example: {"VllmDecodeWorker:delete_pod": ["pod-abc123", "pod-xyz789"]} +def _terminate_client_processes( + client_procs: list[SpawnProcess], + logger: logging.Logger, +): """ + Terminate client processes. + """ + # Send SIGINT to client processes to stop continuous load + if client_procs: + logger.info(f"Sending SIGINT to {len(client_procs)} client processes...") + for proc in client_procs: + if proc.is_alive(): + try: + logger.debug(f"Sending SIGINT to client process {proc.pid}") + os.kill(proc.pid, signal.SIGINT) + except ProcessLookupError: + logger.debug(f"Process {proc.pid} already terminated") + except Exception as e: + logger.warning(f"Failed to send SIGINT to process {proc.pid}: {e}") + logger.info( + "SIGINT sent to all client processes, waiting for graceful shutdown..." + ) + else: + logger.warning("No client processes provided to terminate") + + +async def _inject_failures( + failures: list[Failure], + logger: logging.Logger, + deployment: ManagedDeployment, +): # noqa: F811 affected_pods: dict[str, list] = {} for failure in failures: @@ -201,17 +237,19 @@ def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa: # This is just logging for visibility continue - pods = deployment.get_pods(failure.pod_name)[failure.pod_name] - - num_pods = len(pods) + service_pod_dict = deployment.get_pods(failure.service_names) + pods: list[Pod] = [] + for service_name in failure.service_names: + pods.extend(service_pod_dict[service_name]) if not pods: + logger.warning(f"No pods found for {failure.service_names}") continue replicas = failure.replicas if not replicas: - replicas = num_pods + replicas = len(pods) logger.info(f"Injecting failure for: {failure}") @@ -221,7 +259,7 @@ def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa: affected_pods[failure_key] = [] for x in range(replicas): - pod = pods[x % num_pods] + pod = pods[x % len(pods)] # Capture the exact pod name before we kill it pod_name = pod.name @@ -242,6 +280,36 @@ def _inject_failures(failures, logger, deployment: ManagedDeployment): # noqa: ) process.kill(failure.signal) + if failure.command == "rolling_upgrade": + await deployment.trigger_rolling_upgrade(failure.service_names) + + # Need to wait for the deployment to be unready so we know the rolling upgrade has started + await deployment.wait_for_unready(timeout=60, log_interval=10) + else: + ## TODO: need to refactor this for service_names being a list + for x in range(replicas): + pod = pods[x % len(pods)] + + if failure.command == "delete_pod": + deployment.get_pod_manifest_logs_metrics( + failure.service_names, pod, ".before_delete" + ) + pod.delete(force=True) # force means no graceful termination + else: + processes = deployment.get_processes(pod) + for process in processes: + if failure.command in process.command: + logger.info( + f"Terminating {failure.service_names} Pid {process.pid} Command {process.command}" + ) + process.kill(failure.signal) + + # Wait until DGD is ready (this means the rolling upgrade is complete) + if failure.end_condition == "dgd_ready": + logger.info("Waiting for DGD to be ready") + await deployment._wait_for_ready(timeout=1800) # 30 minute timeout + logger.info("DGD is ready") + return affected_pods @@ -270,6 +338,9 @@ def validation_context(request, scenario): # noqa: F811 yield context # Test receives this and populates it + # Get log_dir from request.node if available (set by test), otherwise use node.name + base_log_dir = getattr(request.node, "log_dir", request.node.name) + # Determine log paths based on whether this is a mixed token test log_paths = [] test_name = request.node.name @@ -277,8 +348,8 @@ def validation_context(request, scenario): # noqa: F811 if hasattr(scenario.load, "mixed_token_test") and scenario.load.mixed_token_test: # For mixed token tests, we have separate overflow and recovery directories - overflow_dir = f"{request.node.name}{OVERFLOW_SUFFIX}" - recovery_dir = f"{request.node.name}{RECOVERY_SUFFIX}" + overflow_dir = f"{base_log_dir}{OVERFLOW_SUFFIX}" + recovery_dir = f"{base_log_dir}{RECOVERY_SUFFIX}" log_paths = [overflow_dir, recovery_dir] logging.info("Mixed token test detected. Looking for results in:") @@ -286,7 +357,7 @@ def validation_context(request, scenario): # noqa: F811 logging.info(f" - Recovery phase: {recovery_dir}") else: # Standard test with single directory - log_paths = [request.node.name] + log_paths = [base_log_dir] # Use factory to auto-detect and parse results try: @@ -445,11 +516,12 @@ def results_summary(): @pytest.mark.slow @pytest.mark.filterwarnings("ignore::DeprecationWarning") async def test_fault_scenario( - scenario, # noqa: F811 + scenario: Scenario, # noqa: F811 request, - image, - namespace, + image: str, + namespace: str, validation_context, # noqa: F811 # Shared context for passing data to validation + skip_restart_services: bool, ): """ Test dynamo serve deployments with injected failures @@ -498,8 +570,9 @@ async def test_fault_scenario( async with ManagedDeployment( namespace=namespace, - log_dir=request.node.name, + log_dir=request.node.log_dir, deployment_spec=scenario.deployment, + skip_restart_services=skip_restart_services, ) as deployment: # Populate shared context for validation validation_context["deployment"] = deployment @@ -507,14 +580,17 @@ async def test_fault_scenario( with _clients( logger, - request, + request.node.log_dir, scenario.deployment, namespace, model, scenario.load, # Pass entire Load config object - ): + ) as client_procs: # Inject failures and capture which pods were affected affected_pods = _inject_failures(scenario.failures, logger, deployment) - validation_context["affected_pods"] = affected_pods - logger.info(f"Affected pods during test: {affected_pods}") + + await _inject_failures(scenario.failures, logger, deployment) + + if scenario.load.continuous_load: + _terminate_client_processes(client_procs, logger) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 8dd008a61a..47d9a9a685 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -5,6 +5,7 @@ import logging import os import re +import secrets import shlex import time from dataclasses import dataclass, field @@ -14,8 +15,7 @@ import kubernetes import requests import yaml -from kr8s.objects import Pod as kr8s_Pod -from kr8s.objects import Service as kr8s_Service +from kr8s.objects import Pod, Service from kubernetes_asyncio import client, config @@ -314,8 +314,42 @@ def get_logging_config(self) -> dict: return {"jsonl_enabled": jsonl_enabled, "log_level": log_level} + def set_service_env_var(self, service_name: str, name: str, value: str): + """ + Set an environment variable for a specific service + """ + # Check service exists + if service_name not in self._deployment_spec["spec"]["services"]: + raise ValueError(f"Service '{service_name}' not found in deployment spec") + + service = self._deployment_spec["spec"]["services"][service_name] + if "envs" not in service: + service["envs"] = [] + + # if env var already exists, update it + for env in service["envs"]: + if env["name"] == name: + env["value"] = value + return + + # if env var does not exist, add it + service["envs"].append({"name": name, "value": value}) + + def get_service_env_vars(self, service_name: str) -> list[dict]: + """ + Get all environment variables for a specific service + + Returns: + List of environment variable dicts (e.g., [{"name": "VAR", "value": "val"}]) + """ + # Check service exists + if service_name not in self._deployment_spec["spec"]["services"]: + raise ValueError(f"Service '{service_name}' not found in deployment spec") + + return self._deployment_spec["spec"]["services"][service_name].get("envs", []) + @property - def services(self) -> list: + def services(self) -> list[ServiceSpec]: """List of ServiceSpec objects""" return [ ServiceSpec(svc, spec) @@ -384,6 +418,16 @@ def add_arg_to_service(self, service_name: str, arg_name: str, arg_value: str): # Add new argument args_list.extend([arg_name, arg_value]) + def set_service_replicas(self, service_name: str, replicas: int): + """ + Set the number of replicas for a specific service + """ + # Check service exists + if service_name not in self._deployment_spec["spec"]["services"]: + raise ValueError(f"Service '{service_name}' not found in deployment spec") + + self._deployment_spec["spec"]["services"][service_name]["replicas"] = replicas + def save(self, out_file: str): """Save updated deployment to file""" with open(out_file, "w") as f: @@ -391,7 +435,7 @@ def save(self, out_file: str): class PodProcess: - def __init__(self, pod: kr8s_Pod, line: str): + def __init__(self, pod: Pod, line: str): self.pid = int(re.split(r"\s+", line)[1]) self.command = " ".join( re.split(r"\s+", line)[10:] @@ -440,8 +484,9 @@ class ManagedDeployment: deployment_spec: DeploymentSpec namespace: str frontend_service_name: Optional[str] = "Frontend" + skip_restart_services: bool = False - _custom_api: Optional[Any] = None + _custom_api: Optional[client.CustomObjectsApi] = None _core_api: Optional[Any] = None _in_cluster: bool = False _logger: logging.Logger = logging.getLogger() @@ -511,6 +556,17 @@ async def _restart_stateful(self, name, label): self._logger.info(f"Restarted {name} {label}") + async def wait_for_unready(self, timeout: int = 1800, sleep=1, log_interval=60): + """ + Wait for the custom resource to be unready. + + Args: + timeout: Maximum time to wait in seconds, default to 30 mins (image pulling can take a while) + """ + return await self._wait_for_condition( + timeout, sleep, log_interval, False, "pending" + ) + async def _wait_for_ready(self, timeout: int = 1800, sleep=1, log_interval=60): """ Wait for the custom resource to be ready. @@ -518,9 +574,23 @@ async def _wait_for_ready(self, timeout: int = 1800, sleep=1, log_interval=60): Args: timeout: Maximum time to wait in seconds, default to 30 mins (image pulling can take a while) """ + return await self._wait_for_condition( + timeout, sleep, log_interval, True, "successful" + ) + + async def _wait_for_condition( + self, + timeout: int = 1800, + sleep=1, + log_interval=60, + ready_condition_val: bool = True, + state_val: str = "successful", + ): start_time = time.time() - self._logger.info(f"Waiting for Deployment {self._deployment_name}") + self._logger.info( + f"Waiting for Deployment {self._deployment_name} to have Ready condition {ready_condition_val} and state {state_val}" + ) attempt = 0 @@ -542,25 +612,28 @@ async def _wait_for_ready(self, timeout: int = 1800, sleep=1, log_interval=60): conditions = status_obj.get("conditions", []) current_state = status_obj.get("state", "unknown") - ready_condition = False + observed_ready_condition_val = "" for condition in conditions: - if ( - condition.get("type") == "Ready" - and condition.get("status") == "True" - ): - ready_condition = True - break + if condition.get("type") == "Ready": + observed_ready_condition_val = condition.get("status") + if observed_ready_condition_val == str(ready_condition_val): + break - state_successful = status_obj.get("state") == "successful" + observed_state_val = status_obj.get("state") - if ready_condition and state_successful: + if ( + observed_ready_condition_val == str(ready_condition_val) + and observed_state_val == state_val + ): self._logger.info(f"Current deployment state: {current_state}") self._logger.info(f"Current conditions: {conditions}") self._logger.info( f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s" ) - self._logger.info(f"Deployment {self._deployment_name} is ready") + self._logger.info( + f"Deployment {self._deployment_name} has Ready condition {ready_condition_val} and state {state_val}" + ) return True else: if attempt % log_interval == 0: @@ -570,7 +643,7 @@ async def _wait_for_ready(self, timeout: int = 1800, sleep=1, log_interval=60): f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s" ) self._logger.info( - f"Deployment not ready yet - Ready condition: {ready_condition}, State successful: {state_successful}" + f"Deployment has Ready condition {observed_ready_condition_val} and state {observed_state_val}, desired condition {ready_condition_val} and state {state_val}" ) except kubernetes.client.rest.ApiException as e: @@ -633,7 +706,40 @@ async def _create_deployment(self): ) raise - def get_processes(self, pod) -> list: + async def trigger_rolling_upgrade(self, service_names: list[str]): + """ + Triggers a rolling update for a list of services + This is a dummy update - sets an env var on the service + """ + + for service_name in service_names: + self.deployment_spec.set_service_env_var( + service_name, "TEST_ROLLING_UPDATE_TRIGGER", secrets.token_hex(8) + ) + + updated_envs = self.deployment_spec.get_service_env_vars(service_name) + + try: + patch_body = { + "spec": {"services": {service_name: {"envs": updated_envs}}} + } + + await self._custom_api.patch_namespaced_custom_object( + group="nvidia.com", + version="v1alpha1", + namespace=self.namespace, + plural="dynamographdeployments", + name=self._deployment_name, + body=patch_body, + _content_type="application/merge-patch+json", + ) + except kubernetes.client.rest.ApiException as e: + self._logger.info( + f"Failed to patch deployment {self._deployment_name}: {e}" + ) + raise + + def get_processes(self, pod: Pod) -> list[PodProcess]: """Get list of processes in the given pod""" result = pod.exec(["ps", "-aux"]) lines = result.stdout.decode().splitlines() @@ -646,38 +752,34 @@ def get_service(self, service_name=None): service_name = "" full_service_name = f"{self._deployment_name}-{service_name.lower()}" - return kr8s_Service.get(full_service_name, namespace=self.namespace) + return Service.get(full_service_name, namespace=self.namespace) - def get_pods(self, service_name=None): - result = {} + def get_pods(self, service_names: list[str] | None = None) -> dict[str, list[Pod]]: + result: dict[str, list[Pod]] = {} - service_list = [] + if not service_names: + service_names = [service.name for service in self.deployment_spec.services] - if not service_name: - service_list = [service.name for service in self.deployment_spec.services] - else: - service_list = [service_name] - - for service in service_list: + for service_name in service_names: # List pods for this service using the selector label # nvidia.com/selector: deployment-name-service label_selector = ( - f"nvidia.com/selector={self._deployment_name}-{service.lower()}" + f"nvidia.com/selector={self._deployment_name}-{service_name.lower()}" ) - pods = [] + pods: list[Pod] = [] for pod in kr8s.get( "pods", namespace=self.namespace, label_selector=label_selector ): pods.append(pod) - result[service] = pods + result[service_name] = pods return result - def get_pod_logs(self, service, pod, suffix=""): - directory = os.path.join(self.log_dir, service) + def get_pod_manifest_logs_metrics(self, service_name: str, pod: Pod, suffix=""): + directory = os.path.join(self.log_dir, service_name) os.makedirs(directory, exist_ok=True) try: @@ -699,16 +801,20 @@ def get_pod_logs(self, service, pod, suffix=""): except Exception as e: self._logger.debug(e) - self._get_pod_metrics(pod, service, suffix) + self._get_pod_metrics(pod, service_name, suffix) def _get_service_logs(self, service_name=None, suffix=""): - service_pods = self.get_pods(service_name) + service_names = None + if service_name: + service_names = [service_name] + + service_pods = self.get_pods(service_names) for service, pods in service_pods.items(): - for i, pod in enumerate(pods): - self.get_pod_logs(service, pod, suffix) + for pod in pods: + self.get_pod_manifest_logs_metrics(service, pod, suffix) - def _get_pod_metrics(self, pod, service_name, suffix=""): + def _get_pod_metrics(self, pod: Pod, service_name: str, suffix=""): directory = os.path.join(self.log_dir, service_name) os.makedirs(directory, exist_ok=True) port = None @@ -761,7 +867,9 @@ async def _delete_deployment(self): if e.status != 404: # Ignore if already deleted raise - def port_forward(self, pod, remote_port, max_connection_attempts=3): + def port_forward( + self, pod: Pod, remote_port: int, max_connection_attempts: int = 3 + ): """Attempt to connect to a pod and return the port-forward object on success. Note: Port forwards run in background threads. When pods are terminated, @@ -866,9 +974,13 @@ async def __aenter__(self): self._deployment_name = self.deployment_spec.name logging.getLogger("httpx").setLevel(logging.WARNING) await self._init_kubernetes() - await self._delete_deployment() - await self._restart_etcd() - await self._restart_nats() + + # Run delete deployment and service restarts in parallel + tasks = [self._delete_deployment()] + if not self.skip_restart_services: + tasks.extend([self._restart_etcd(), self._restart_nats()]) + await asyncio.gather(*tasks) + await self._create_deployment() await self._wait_for_ready() From ad9703612b55c11cded643c4e323bb103c14d938 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Fri, 21 Nov 2025 16:57:18 -0800 Subject: [PATCH 02/15] remove readme --- .../deploy/SCENARIO_ARCHITECTURE.md | 233 ------------------ 1 file changed, 233 deletions(-) delete mode 100644 tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md diff --git a/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md b/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md deleted file mode 100644 index 3d63e997f0..0000000000 --- a/tests/fault_tolerance/deploy/SCENARIO_ARCHITECTURE.md +++ /dev/null @@ -1,233 +0,0 @@ -# Fault Tolerance Test Architecture - -This document explains the relationship between **scenarios**, **deployment specs**, **load**, and **failures** in the fault tolerance testing framework. - -## Core Components - -### Scenario -Top-level test configuration combining: -- **Deployment Spec**: Kubernetes deployment (workers, replicas, TP/DP config) -- **Load**: Client load generation (clients, requests, tokens, thresholds) -- **Failures**: List of failures to inject (timing, pod/process targets) -- **Model**: Optional model identifier -- **Backend**: Backend type (vllm, sglang, trtllm) -- **Checkers**: Validation checkers for post-test validation - -### Deployment Spec -Kubernetes deployment configuration loaded from YAML files. Defines worker types, replica counts (data parallelism), tensor parallel size, and backend-specific arguments. - -### Load -Client load configuration: number of clients (default: 10), requests per client (default: 150), input/output token lengths (default: 100), retry settings, client type ("aiperf" or "legacy"), and success threshold (default: 90.0%). - -### Failure -Fault injection definition: `time` (seconds relative to previous failure), `pod_name` (target pod/service), `command` ("delete_pod" or process name), `signal` (SIGKILL, SIGINT, etc.), and `replicas` (number to affect, default: 1). - -## Execution Flow - -### Test Execution Sequence - -1. **Deployment**: The deployment spec is deployed to Kubernetes -2. **Client Launch**: Client processes are started (in parallel) -3. **Load Generation**: Clients send requests while failures are injected -4. **Failure Injection**: Failures are injected sequentially (see below) -5. **Teardown**: Deployment is cleaned up -6. **Validation**: Checkers validate test results - -### Parallel vs Sequential Execution - -#### Can They Be Run in Parallel? - -**Clients run in parallel**: Multiple client processes are spawned concurrently using `multiprocessing.Process`. Each client: -- Selects a frontend pod using round-robin -- Sets up port forwarding -- Generates load independently - -**Failures are injected sequentially**: The `_inject_failures()` function processes failures one at a time: - -```python -def _inject_failures(failures, logger, deployment: ManagedDeployment): - affected_pods = {} - - for failure in failures: - time.sleep(failure.time) # Wait before injecting - # ... inject failure ... - - return affected_pods -``` - -**Load and failures run concurrently**: Clients generate load while failures are being injected. The test uses a context manager pattern: - -```python -with _clients(...): # Clients run in background - # Inject failures while clients are running - affected_pods = _inject_failures(scenario.failures, logger, deployment) -``` - -### Multiple Failures Execution - -**Failures are executed sequentially**, not in parallel. Each failure has a `time` field that specifies how many seconds to wait **after the previous failure** before injecting the current one. - -**Example**: -```python -failures = [ - Failure(time=30, pod_name="VllmDecodeWorker", command="delete_pod"), # After 30s - Failure(time=60, pod_name="Frontend", command="delete_pod"), # After 60s more (90s total) - Failure(time=30, pod_name="VllmPrefillWorker", command="SIGKILL"), # After 30s more (120s total) -] -``` - -**Execution timeline**: -- T=0s: Test starts, clients begin sending requests -- T=30s: First failure injected (delete decode worker pod) -- T=90s: Second failure injected (delete frontend pod) -- T=120s: Third failure injected (kill prefill worker process) - -**Note**: The `time` field is relative to the **previous** failure, not absolute time from test start. - -## Success Criteria - -### Test Success Definition - -A scenario is considered successful if: - -1. **No exceptions raised**: The test completes without raising unhandled exceptions -2. **Validation checkers pass**: All checkers in `scenario.checkers` pass their validation -3. **Success rate threshold met**: The success rate (successful requests / total requests) meets or exceeds `scenario.load.success_threshold` (default: 90.0%) - -### Validation Stages - -Validation happens in **two stages**: - -#### Stage 1: Scenario Verification -Checkers verify that the test scenario executed correctly: -- **PodDeletionChecker**: Verifies pods were actually deleted (via K8s events) -- **ProcessTerminationChecker**: Verifies processes were terminated -- **ContainerRestartChecker**: Verifies containers restarted after failures - -These checkers ensure the failures were actually injected, not just that the code ran. - -#### Stage 2: Results Validation -Checkers verify system behavior based on parsed results: -- **SuccessRateChecker**: Verifies success rate meets threshold -- **RecoveryTimeChecker**: Verifies recovery time is within acceptable bounds -- **NoFailuresChecker**: Verifies no unexpected failures occurred - -### Success Criteria by Component - -| Component | Success Criteria | -|-----------|------------------| -| **Individual Failure** | Not evaluated separately - failures are part of the scenario | -| **Whole Scenario** | All checkers pass AND success rate ≥ threshold AND no exceptions | -| **Test Execution** | No unhandled exceptions during test execution | - -### Failure Handling - -- **If a checker fails**: An `AssertionError` is raised, causing the test to fail -- **If success rate is below threshold**: The `SuccessRateChecker` raises an `AssertionError` -- **If parsing fails**: The test continues but validation is skipped (warning logged) -- **If validation errors occur** (non-assertion exceptions): The test continues but validation is skipped (warning logged) - -### Example Success Flow - -``` -1. Test starts → Deploy deployment spec -2. Clients launch → Generate load in parallel -3. Failures injected → Sequentially at specified times -4. Test completes → Clients finish, deployment torn down -5. Results parsed → Extract metrics from client logs -6. Validation runs: - ✓ PodDeletionChecker: Pod was deleted (K8s events confirm) - ✓ SuccessRateChecker: 95% success rate ≥ 90% threshold - ✓ RecoveryTimeChecker: Recovery time 45s < 60s limit -7. Test passes ✓ -``` - -## Relationship Summary - -``` -Scenario -├── Deployment Spec (what to deploy) -│ ├── Worker types and counts -│ ├── Resource requirements -│ └── Backend configuration -│ -├── Load (how to generate traffic) -│ ├── Client count and concurrency -│ ├── Request parameters -│ └── Success thresholds -│ -└── Failures (what faults to inject) - ├── Failure 1 (time=30s) - ├── Failure 2 (time=60s) - └── Failure 3 (time=30s) - └── Executed sequentially -``` - -## Defined Scenarios - -Scenarios are automatically generated from the Cartesian product of deployments and failures. The naming convention is: `{deployment_name}-{failure_name}`. - -### Deployment Configurations - -**Standard Deployments** (for vllm, sglang, trtllm backends): -- **Aggregated (agg)**: `{backend}-agg-tp-{1|2|4}-dp-{1|2}` -- **Disaggregated (disagg)**: `{backend}-disagg-prefill-tp-{1|2|4}-decode-tp-{1|2|4}-dp-{1}` - -**MoE Deployments** (vllm only): -- `vllm-moe-agg-tp-1-dp-2` -- `vllm-moe-disagg-tp-1-dp-2` - -**Total Standard Deployments**: 3 backends × 2 types × 4 configs = 24 (some disagg configs skipped when TP>1 and DP>1) - -### Failure Types - -**Common Failures** (all backends): -- `none`: No failure injection (baseline test) -- `frontend`: Terminate frontend process (SIGINT to `dynamo.frontend`) -- `frontend_pod`: Delete frontend pod -- `decode_worker`: Terminate decode worker process (SIGKILL to `dynamo.{backend}`) -- `decode_worker_pod`: Delete decode worker pod -- `prefill_worker`: Terminate prefill worker process (SIGKILL to `dynamo.{backend}`) - disagg only -- `prefill_worker_pod`: Delete prefill worker pod - disagg only - -**vLLM-Specific Failures**: -- `vllm_decode_engine_core`: Kill VLLM::EngineCore process in decode worker -- `vllm_prefill_engine_core`: Kill VLLM::EngineCore process in prefill worker - disagg only - -**SGLang-Specific Failures**: -- `sglang_decode_scheduler`: Kill sglang::scheduler process in decode worker -- `sglang_decode_detokenizer`: Kill sglang::detokenizer process in decode worker -- `sglang_prefill_scheduler`: Kill sglang::scheduler process in prefill worker - disagg only -- `sglang_prefill_detokenizer`: Kill sglang::detokenizer process in prefill worker - disagg only - -**Token Overflow Scenarios**: -- `vllm_agg_token_overflow_2x` -- `vllm_disagg_token_overflow_2x` -- `trtllm_agg_token_overflow_2x` -- `trtllm_disagg_token_overflow_2x` -- `sglang_agg_token_overflow_2x` -- `sglang_disagg_token_overflow_2x` - -### Example Scenario Names - -- `vllm-agg-tp-1-dp-1-frontend_pod`: vLLM aggregated, TP=1, DP=1, delete frontend pod -- `sglang-disagg-prefill-tp-2-decode-tp-2-dp-1-decode_worker`: SGLang disaggregated, TP=2, delete decode worker pod -- `trtllm-agg-tp-4-dp-1-none`: TRT-LLM aggregated, TP=4, DP=1, no failures (baseline) -- `vllm-moe-agg-tp-1-dp-2-decode_worker_pod`: vLLM MoE aggregated, delete decode worker pod - -### Total Scenario Count - -Approximately **200+ scenarios** generated from: -- 24 standard deployments × ~7-9 failures per deployment -- 2 MoE deployments × ~7 failures -- 6 token overflow scenarios - -## Key Takeaways - -1. **Scenarios combine deployment, load, and failures** into a single test configuration -2. **Clients run in parallel**, but **failures are injected sequentially** -3. **Load and failures run concurrently** - failures are injected while clients are generating load -4. **Success is evaluated at the scenario level**, not per failure -5. **Success requires**: No exceptions + All checkers pass + Success rate ≥ threshold -6. **Multiple failures use relative timing** - each failure's `time` is relative to the previous one - From 3dc86c57b394fc103fd4ee258d94113e9e3b1d72 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 12:18:30 -0500 Subject: [PATCH 03/15] patch once for multi-service rolling upgrades --- tests/utils/managed_deployment.py | 42 +++++++++++++++++-------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 47d9a9a685..24887b2efa 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -712,32 +712,36 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): This is a dummy update - sets an env var on the service """ + if not service_names: + raise ValueError( + "service_names cannot be empty for trigger_rolling_upgrade" + ) + + patch_body = {"spec": {"services": {}}} + for service_name in service_names: self.deployment_spec.set_service_env_var( service_name, "TEST_ROLLING_UPDATE_TRIGGER", secrets.token_hex(8) ) updated_envs = self.deployment_spec.get_service_env_vars(service_name) + patch_body["spec"]["services"][service_name] = {"envs": updated_envs} - try: - patch_body = { - "spec": {"services": {service_name: {"envs": updated_envs}}} - } - - await self._custom_api.patch_namespaced_custom_object( - group="nvidia.com", - version="v1alpha1", - namespace=self.namespace, - plural="dynamographdeployments", - name=self._deployment_name, - body=patch_body, - _content_type="application/merge-patch+json", - ) - except kubernetes.client.rest.ApiException as e: - self._logger.info( - f"Failed to patch deployment {self._deployment_name}: {e}" - ) - raise + try: + await self._custom_api.patch_namespaced_custom_object( + group="nvidia.com", + version="v1alpha1", + namespace=self.namespace, + plural="dynamographdeployments", + name=self._deployment_name, + body=patch_body, + _content_type="application/merge-patch+json", + ) + except kubernetes.client.rest.ApiException as e: + self._logger.info( + f"Failed to patch deployment {self._deployment_name}: {e}" + ) + raise def get_processes(self, pod: Pod) -> list[PodProcess]: """Get list of processes in the given pod""" From 2ed1a05969a0a9b39ba4bb1a82d0b6acf4730aaf Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 13:00:21 -0500 Subject: [PATCH 04/15] make linter happy and refactor for traditional failure scenarios --- tests/fault_tolerance/deploy/scenarios.py | 15 +++---- .../fault_tolerance/deploy/test_deployment.py | 44 +++++++++---------- tests/utils/managed_deployment.py | 29 ++++++------ 3 files changed, 43 insertions(+), 45 deletions(-) diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 8442d1a7d6..8a0ff2f7ab 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -18,7 +18,7 @@ from enum import Enum, auto from typing import TYPE_CHECKING, Dict, List, Optional, Pattern -from typing_extensions import TypedDict +from typing_extensions import Required, TypedDict from tests.utils.managed_deployment import DeploymentSpec @@ -54,8 +54,8 @@ class DeploymentInfo(TypedDict, total=False): is_moe: Optional flag indicating if this is a Mixture-of-Experts model """ - spec: DeploymentSpec - backend: str + spec: Required[DeploymentSpec] + backend: Required[str] model: str is_moe: bool @@ -166,8 +166,9 @@ class Failure: service_names: list[str] command: str signal: str = "SIGINT" - replicas: int = 1 - end_condition: Optional[str] = None # End condition for failure (e.g., "dgd_ready") + + # End condition for failure (e.g., "dgd_ready") + end_condition: Optional[str] = None @dataclass @@ -246,7 +247,7 @@ def _set_replicas(deployment_spec, backend, deploy_type, replicas): def _set_tensor_parallel( - deployment_spec: DeploymentSpec, backend: str, deploy_type: str, tp_size: int + deployment_spec: DeploymentInfo, backend: str, deploy_type: str, tp_size: int ): """Set tensor parallel size for worker components.""" spec = deployment_spec["spec"] @@ -755,8 +756,6 @@ def add_rolling_upgrade_scenarios(): service_names: list[str] = [] - ## TODO: vllm disagg has both decode workers restart at same time but the prefill does one at a time, it also looks like the test exits when the first prefill worker is ready but the second is recreated - ## (Bug in operator?) ## TODO: maybe add a bit of buffer time after the rolling upgrade is completed (after the DGD is ready again) # setting replicas to 2 so we have availability of 1 replica at a time diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index dcbd3f92ff..0ac6957908 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -207,8 +207,11 @@ def _terminate_client_processes( for proc in client_procs: if proc.is_alive(): try: - logger.debug(f"Sending SIGINT to client process {proc.pid}") - os.kill(proc.pid, signal.SIGINT) + if proc.pid is not None: + logger.debug(f"Sending SIGINT to client process {proc.pid}") + os.kill(proc.pid, signal.SIGINT) + else: + raise ValueError(f"Process {proc} has no PID") except ProcessLookupError: logger.debug(f"Process {proc.pid} already terminated") except Exception as e: @@ -246,11 +249,6 @@ async def _inject_failures( logger.warning(f"No pods found for {failure.service_names}") continue - replicas = failure.replicas - - if not replicas: - replicas = len(pods) - logger.info(f"Injecting failure for: {failure}") # Track which pods were affected by this failure @@ -286,23 +284,21 @@ async def _inject_failures( # Need to wait for the deployment to be unready so we know the rolling upgrade has started await deployment.wait_for_unready(timeout=60, log_interval=10) else: - ## TODO: need to refactor this for service_names being a list - for x in range(replicas): - pod = pods[x % len(pods)] - - if failure.command == "delete_pod": - deployment.get_pod_manifest_logs_metrics( - failure.service_names, pod, ".before_delete" - ) - pod.delete(force=True) # force means no graceful termination - else: - processes = deployment.get_processes(pod) - for process in processes: - if failure.command in process.command: - logger.info( - f"Terminating {failure.service_names} Pid {process.pid} Command {process.command}" - ) - process.kill(failure.signal) + for service_name, pods in service_pod_dict.items(): + for pod in pods: + if failure.command == "delete_pod": + deployment.get_pod_manifest_logs_metrics( + service_name, pod, ".before_delete" + ) + pod.delete(force=True) # force means no graceful termination + else: + processes = deployment.get_processes(pod) + for process in processes: + if failure.command in process.command: + logger.info( + f"Terminating {failure.service_names} Pid {process.pid} Command {process.command}" + ) + process.kill(failure.signal) # Wait until DGD is ready (this means the rolling upgrade is complete) if failure.end_condition == "dgd_ready": diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 24887b2efa..638f55f406 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -12,11 +12,11 @@ from typing import Any, List, Optional import kr8s -import kubernetes import requests import yaml from kr8s.objects import Pod, Service from kubernetes_asyncio import client, config +from kubernetes_asyncio.client import exceptions def _get_workspace_dir() -> str: @@ -483,7 +483,9 @@ class ManagedDeployment: log_dir: str deployment_spec: DeploymentSpec namespace: str - frontend_service_name: Optional[str] = "Frontend" + # TODO: this should be determined by the deployment_spec + # the service containing component_type: Frontend determines what is actually the frontend service + frontend_service_name: str = "Frontend" skip_restart_services: bool = False _custom_api: Optional[client.CustomObjectsApi] = None @@ -502,7 +504,7 @@ async def _init_kubernetes(self): """Initialize kubernetes client""" try: # Try in-cluster config first (for pods with service accounts) - await config.load_incluster_config() + config.load_incluster_config() self._in_cluster = True except Exception: # Fallback to kube config file (for local development) @@ -598,7 +600,7 @@ async def _wait_for_condition( try: attempt += 1 assert self._custom_api is not None, "Kubernetes API not initialized" - status = await self._custom_api.get_namespaced_custom_object( + status = await self._custom_api.get_namespaced_custom_object( # type: ignore[awaitable-is-not-coroutine] group="nvidia.com", version="v1alpha1", namespace=self.namespace, @@ -608,9 +610,9 @@ async def _wait_for_condition( # Check both conditions: # 1. Ready condition is True # 2. State is successful - status_obj = status.get("status", {}) - conditions = status_obj.get("conditions", []) - current_state = status_obj.get("state", "unknown") + status_obj = status.get("status", {}) # type: ignore[attr-defined] + conditions = status_obj.get("conditions", []) # type: ignore[attr-defined] + current_state = status_obj.get("state", "unknown") # type: ignore[attr-defined] observed_ready_condition_val = "" for condition in conditions: @@ -619,7 +621,7 @@ async def _wait_for_condition( if observed_ready_condition_val == str(ready_condition_val): break - observed_state_val = status_obj.get("state") + observed_state_val = status_obj.get("state") # type: ignore[attr-defined] if ( observed_ready_condition_val == str(ready_condition_val) @@ -646,7 +648,7 @@ async def _wait_for_condition( f"Deployment has Ready condition {observed_ready_condition_val} and state {observed_state_val}, desired condition {ready_condition_val} and state {state_val}" ) - except kubernetes.client.rest.ApiException as e: + except exceptions.ApiException as e: self._logger.info( f"API Exception while checking deployment status: {e}" ) @@ -697,7 +699,7 @@ async def _create_deployment(self): ) self._logger.info(self.deployment_spec.spec()) self._logger.info(f"Deployment Started {self._deployment_name}") - except kubernetes.client.rest.ApiException as e: + except exceptions.ApiException as e: if e.status == 409: # Already exists self._logger.info(f"Deployment {self._deployment_name} already exists") else: @@ -728,6 +730,7 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): patch_body["spec"]["services"][service_name] = {"envs": updated_envs} try: + assert self._custom_api is not None, "Kubernetes API not initialized" await self._custom_api.patch_namespaced_custom_object( group="nvidia.com", version="v1alpha1", @@ -737,7 +740,7 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): body=patch_body, _content_type="application/merge-patch+json", ) - except kubernetes.client.rest.ApiException as e: + except exceptions.ApiException as e: self._logger.info( f"Failed to patch deployment {self._deployment_name}: {e}" ) @@ -776,7 +779,7 @@ def get_pods(self, service_names: list[str] | None = None) -> dict[str, list[Pod for pod in kr8s.get( "pods", namespace=self.namespace, label_selector=label_selector ): - pods.append(pod) + pods.append(pod) # type: ignore[arg-type] result[service_name] = pods @@ -867,7 +870,7 @@ async def _delete_deployment(self): plural="dynamographdeployments", name=self._deployment_name, ) - except client.exceptions.ApiException as e: + except exceptions.ApiException as e: if e.status != 404: # Ignore if already deleted raise From 032c6bf33d59629c1b2fa8abfe5ff88027418592 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 13:09:56 -0500 Subject: [PATCH 05/15] add continous_load field to legacy client --- tests/fault_tolerance/deploy/legacy_client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/fault_tolerance/deploy/legacy_client.py b/tests/fault_tolerance/deploy/legacy_client.py index 3200e48850..668145838c 100644 --- a/tests/fault_tolerance/deploy/legacy_client.py +++ b/tests/fault_tolerance/deploy/legacy_client.py @@ -192,6 +192,7 @@ def client( max_retries, max_request_rate, retry_delay=1, + continuous_load=False, ): """Legacy custom client for fault tolerance testing. @@ -211,7 +212,11 @@ def client( max_retries: Maximum retry attempts per request max_request_rate: Maximum requests per second (for rate limiting) retry_delay: Delay in seconds between retries + continuous_load: If True, use continuous load instead of fixed request count """ + if continuous_load: + raise ValueError("Continuous load is not supported for legacy client") + logger = logging.getLogger(f"CLIENT: {index}") logging.getLogger("httpx").setLevel(logging.WARNING) From ef51e5e6ea991b025be927cc70e394d8140e0399 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 13:22:50 -0500 Subject: [PATCH 06/15] unify subprocess launch and signal handling --- tests/fault_tolerance/deploy/client.py | 29 +++++++------------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/tests/fault_tolerance/deploy/client.py b/tests/fault_tolerance/deploy/client.py index b457788954..e2e09b6fb1 100644 --- a/tests/fault_tolerance/deploy/client.py +++ b/tests/fault_tolerance/deploy/client.py @@ -289,7 +289,7 @@ def run_aiperf( logger: Logger instance max_retries: Maximum number of retry attempts (default: 1) retry_delay: Delay in seconds between retries (default: 1) - continous_load: If True, use continuous load instead of fixed request count + continuous_load: If True, use continuous load instead of fixed request count Returns: True if successful, False otherwise @@ -339,17 +339,12 @@ def run_aiperf( "100", # For reproducible results ] - # Add request parameters based on continuous load mode if continuous_load: - # Use benchmark duration for continuous load cmd.extend(["--benchmark-duration", "1800"]) # 30 minutes for continuous load logger.info("Using continuous load with duration: 30 minutes") - # Set a very long timeout for duration-based tests timeout = 1860 # 31 minutes default for duration-based tests (30 minutes + 1 minute buffer) else: - # Normal mode - use requests_per_client cmd.extend(["--request-count", str(requests_per_client)]) - # Calculate timeout (same as legacy would for all requests) timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes # Log execution @@ -390,17 +385,7 @@ def run_aiperf( cmd_attempt[artifact_dir_idx] = str(attempt_dir) try: - if continuous_load: - result = run_continuous_load_process(cmd_attempt, logger, timeout) - else: - # Normal mode - use subprocess.run - result = subprocess.run( - cmd_attempt, - capture_output=True, - text=True, - timeout=timeout, - stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension - ) + result = run_aiperf_with_signal_handling(cmd_attempt, logger, timeout) # Save logs for this attempt with open(attempt_dir / "genai_perf.log", "w") as f: @@ -418,7 +403,7 @@ def run_aiperf( } ) - # Even with continous load, with SIGINT, aiperf should return 0 and create the profile_export_aiperf.json file + # Even with continuous load, with SIGINT, aiperf should return 0 and create the profile_export_aiperf.json file if result.returncode == 0: # AI-Perf returns 0 even if all requests failed, so we need to check the output json_path = attempt_dir / "profile_export_aiperf.json" @@ -475,15 +460,17 @@ def run_aiperf( return success -def run_continuous_load_process( +def run_aiperf_with_signal_handling( cmd_attempt: List[str], logger: logging.Logger, timeout: int, ) -> subprocess.CompletedProcess: """ - Run aiperf in continuous load mode and return the result. + Run aiperf with signal handling for graceful shutdown. - Handles SIGINT and timeout when running with subprocess.Popen. + Handles SIGINT forwarding and timeout when running with subprocess.Popen. + This ensures that Ctrl-C and SIGINT are properly forwarded to the subprocess + so it can clean up gracefully and write results files. """ proc = subprocess.Popen( cmd_attempt, From b406cf287b70142bb8c3ac2996c50ec7e218942b Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 14:13:00 -0500 Subject: [PATCH 07/15] make failures more generic --- tests/fault_tolerance/deploy/scenarios.py | 205 ++++++++++++++++-- .../fault_tolerance/deploy/test_deployment.py | 79 +------ tests/utils/managed_deployment.py | 19 +- 3 files changed, 202 insertions(+), 101 deletions(-) diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 8a0ff2f7ab..746889a932 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -13,14 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import re from dataclasses import dataclass, field +from abc import ABC, abstractmethod from enum import Enum, auto from typing import TYPE_CHECKING, Dict, List, Optional, Pattern from typing_extensions import Required, TypedDict -from tests.utils.managed_deployment import DeploymentSpec +from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment if TYPE_CHECKING: from tests.fault_tolerance.deploy.base_checker import BaseChecker @@ -161,14 +163,131 @@ class Load: @dataclass -class Failure: +class Failure(ABC): + """Base class for all failure types.""" + + # time to wait in seconds before the failure is injected time: int + + # names of DGD services to inject the failure into the corresponding pods for service_names: list[str] - command: str - signal: str = "SIGINT" - # End condition for failure (e.g., "dgd_ready") - end_condition: Optional[str] = None + @abstractmethod + async def execute( + self, deployment: ManagedDeployment, logger: logging.Logger + ) -> list[str]: + """Execute the failure injection. + + Args: + deployment: The managed deployment to inject the failure into + logger: Logger instance for logging failure injection + + Returns: List of affected pod names + """ + pass + + @abstractmethod + def get_failure_key(self) -> str: + """Get the failure key for the failure.""" + pass + + +@dataclass +class RollingUpgradeFailure(Failure): + """Failure type for triggering rolling upgrades.""" + + async def execute( + self, deployment: ManagedDeployment, logger: logging.Logger + ) -> None: + """Execute rolling upgrade failure injection.""" + await deployment.trigger_rolling_upgrade(self.service_names) + + # Need to wait for the deployment to be unready so we know the rolling upgrade has started + await deployment.wait_for_unready(timeout=60, log_interval=10) + + await deployment._wait_for_ready(timeout=1800) # 30 minute timeout + + return await deployment.get_pod_names(self.service_names) + + def get_failure_key(self) -> str: + """Get the failure key for the rolling upgrade failure.""" + return f"rolling_upgrade:{','.join(self.service_names)}" + + +@dataclass +class DeletePodFailure(Failure): + """Failure type for deleting pods.""" + + async def execute( + self, deployment: ManagedDeployment, logger: logging.Logger + ) -> None: + """Execute pod deletion failure injection.""" + service_pod_dict = deployment.get_pods(self.service_names) + pod_names: list[str] = [] + for service_name, pods in service_pod_dict.items(): + for pod in pods: + deployment.get_pod_manifest_logs_metrics( + service_name, pod, ".before_delete" + ) + pod.delete(force=True) # force means no graceful termination + pod_names.append(pod.name) + + def get_failure_key(self) -> str: + """Get the failure key for the delete pod failure.""" + return f"delete_pod:{','.join(self.service_names)}" + + +class TerminateProcessFailure(Failure): + """Failure type for terminating specific processes by name.""" + + def __init__( + self, + time: int, + service_names: list[str], + signal: str = "SIGINT", + process_name: str = "", + ): + """Initialize TerminateProcessFailure. + + Args: + time: Time to wait in seconds before the failure is injected + service_names: Names of DGD services to inject the failure into + signal: Signal to send (default: "SIGINT") + process_name: Name of the process to terminate (required) + end_condition: End condition for failure (e.g., "dgd_ready") + """ + super().__init__( + time=time, + service_names=service_names, + ) + if not process_name or not signal: + raise ValueError( + "process_name and signal are required for TerminateProcessFailure" + ) + self.process_name = process_name + self.signal = signal + + async def execute( + self, deployment: ManagedDeployment, logger: logging.Logger + ) -> None: + """Execute process termination failure injection.""" + service_pod_dict = deployment.get_pods(self.service_names) + pod_names: list[str] = [] + for service_name, pods in service_pod_dict.items(): + for pod in pods: + processes = deployment.get_processes(pod) + for process in processes: + if self.process_name in process.command: + logger.info( + f"Terminating {service_name} pod {pod} Pid {process.pid} Command {process.command}" + ) + process.kill(self.signal) + pod_names.append(pod.name) + return pod_names + + def get_failure_key(self) -> str: + """Get the failure key for the terminate process failure.""" + return f"terminate_process:{','.join(self.service_names)}:{self.process_name}:{self.signal}" @dataclass @@ -189,12 +308,24 @@ def __init__( super().__init__( time=time, service_names=["Client"], - command="token_overflow", ) self.max_seq_len = max_seq_len self.overflow_multiplier = overflow_multiplier self.overflow_token_count = int(max_seq_len * overflow_multiplier) + async def execute( + self, deployment: ManagedDeployment, logger: logging.Logger + ) -> None: + """Token overflow is handled client-side, so this is a no-op.""" + # The actual overflow is handled by the client configuration + # which uses the input_token_length from the Load config + # This is just a placeholder for the abstract method + return [] + + def get_failure_key(self) -> str: + """Get the failure key for the token overflow failure.""" + return f"token_overflow:{self.overflow_token_count}" + @dataclass class Scenario: @@ -405,41 +536,69 @@ def _create_backend_failures(backend, deploy_type="disagg"): process_name = f"dynamo.{backend}" failures = { - "frontend": [Failure(30, ["Frontend"], "dynamo.frontend")], - "frontend_pod": [Failure(30, ["Frontend"], "delete_pod")], - "decode_worker": [Failure(30, [decode_worker], process_name, "SIGKILL")], - "decode_worker_pod": [Failure(30, [decode_worker], "delete_pod")], - "prefill_worker": [Failure(30, [prefill_worker], process_name, "SIGKILL")], - "prefill_worker_pod": [Failure(30, [prefill_worker], "delete_pod")], + "frontend": [ + TerminateProcessFailure( + 30, ["Frontend"], "SIGINT", process_name="dynamo.frontend" + ) + ], + "frontend_pod": [DeletePodFailure(30, ["Frontend"])], + "decode_worker": [ + TerminateProcessFailure( + 30, [decode_worker], "SIGKILL", process_name=process_name + ) + ], + "decode_worker_pod": [DeletePodFailure(30, [decode_worker])], + "prefill_worker": [ + TerminateProcessFailure( + 30, [prefill_worker], "SIGKILL", process_name=process_name + ) + ], + "prefill_worker_pod": [DeletePodFailure(30, [prefill_worker])], "none": [], } if backend == "vllm": failures["vllm_decode_engine_core"] = [ - Failure(30, [decode_worker], "VLLM::EngineCore", "SIGKILL") + TerminateProcessFailure( + 30, [decode_worker], "SIGKILL", process_name="VLLM::EngineCore" + ) ] failures["vllm_prefill_engine_core"] = [ - Failure(30, [prefill_worker], "VLLM::EngineCore", "SIGKILL") + TerminateProcessFailure( + 30, [prefill_worker], "SIGKILL", process_name="VLLM::EngineCore" + ) ] elif backend == "sglang": failures["sglang_decode_scheduler"] = [ - Failure(30, [decode_worker], "sglang::scheduler", "SIGKILL") + TerminateProcessFailure( + 30, [decode_worker], "SIGKILL", process_name="sglang::scheduler" + ) ] failures["sglang_decode_detokenizer"] = [ - Failure(30, [decode_worker], "sglang::detokenizer", "SIGKILL") + TerminateProcessFailure( + 30, [decode_worker], "SIGKILL", process_name="sglang::detokenizer" + ) ] failures["sglang_prefill_scheduler"] = [ - Failure(30, [prefill_worker], "sglang::scheduler", "SIGKILL") + TerminateProcessFailure( + 30, [prefill_worker], "SIGKILL", process_name="sglang::scheduler" + ) ] failures["sglang_prefill_detokenizer"] = [ - Failure(30, [prefill_worker], "sglang::detokenizer", "SIGKILL") + TerminateProcessFailure( + 30, [prefill_worker], "SIGKILL", process_name="sglang::detokenizer" + ) ] elif backend == "trtllm": failures["trtllm_decode_engine_core"] = [ - Failure(30, [decode_worker], "TRTLLM::EngineCore", "SIGKILL") + TerminateProcessFailure( + 30, [decode_worker], "SIGKILL", process_name="TRTLLM::EngineCore" + ) ] failures["trtllm_prefill_engine_core"] = [ - Failure(30, [prefill_worker], "TRTLLM::EngineCore", "SIGKILL") + TerminateProcessFailure( + 30, [prefill_worker], "SIGKILL", process_name="TRTLLM::EngineCore" + ) ] return failures @@ -784,11 +943,9 @@ def add_rolling_upgrade_scenarios(): scenario_name = f"{backend}-{worker_mode}-rolling-upgrade" model = "Qwen/Qwen3-0.6B" - failure = Failure( + failure = RollingUpgradeFailure( time=30, service_names=service_names, - command="rolling_upgrade", - end_condition="dgd_ready", # Wait for DGD to be ready before stopping clients ) scenarios[scenario_name] = Scenario( deployment=deployment_info["spec"], diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index 0ac6957908..04137cacaa 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -12,7 +12,6 @@ from multiprocessing.context import SpawnProcess import pytest -from kr8s.objects import Pod from tests.fault_tolerance.deploy.base_checker import ValidationContext from tests.fault_tolerance.deploy.client_factory import get_client_function @@ -24,7 +23,6 @@ Failure, Load, Scenario, - TokenOverflowFailure, scenarios, ) from tests.utils.managed_deployment import DeploymentSpec, ManagedDeployment @@ -227,84 +225,15 @@ async def _inject_failures( failures: list[Failure], logger: logging.Logger, deployment: ManagedDeployment, -): # noqa: F811 +) -> dict[str, list]: # noqa: F811 affected_pods: dict[str, list] = {} for failure in failures: time.sleep(failure.time) - # Handle TokenOverflowFailure differently - it's a client-side injection - if isinstance(failure, TokenOverflowFailure): - # The actual overflow is handled by the client configuration - # which uses the input_token_length from the Load config - # This is just logging for visibility - continue - - service_pod_dict = deployment.get_pods(failure.service_names) - pods: list[Pod] = [] - for service_name in failure.service_names: - pods.extend(service_pod_dict[service_name]) - - if not pods: - logger.warning(f"No pods found for {failure.service_names}") - continue - logger.info(f"Injecting failure for: {failure}") - # Track which pods were affected by this failure - failure_key = f"{failure.pod_name}:{failure.command}" - if failure_key not in affected_pods: - affected_pods[failure_key] = [] - - for x in range(replicas): - pod = pods[x % len(pods)] - - # Capture the exact pod name before we kill it - pod_name = pod.name - affected_pods[failure_key].append(pod_name) - - logger.info(f"Target pod for failure: {pod_name}") - - if failure.command == "delete_pod": - deployment.get_pod_logs(failure.pod_name, pod, ".before_delete") - logger.info(f"Deleting pod: {pod_name}") - pod.delete(force=True) - else: - processes = deployment.get_processes(pod) - for process in processes: - if failure.command in process.command: - logger.info( - f"Terminating {failure.pod_name} Pid {process.pid} Command {process.command} in pod {pod_name}" - ) - process.kill(failure.signal) - - if failure.command == "rolling_upgrade": - await deployment.trigger_rolling_upgrade(failure.service_names) - - # Need to wait for the deployment to be unready so we know the rolling upgrade has started - await deployment.wait_for_unready(timeout=60, log_interval=10) - else: - for service_name, pods in service_pod_dict.items(): - for pod in pods: - if failure.command == "delete_pod": - deployment.get_pod_manifest_logs_metrics( - service_name, pod, ".before_delete" - ) - pod.delete(force=True) # force means no graceful termination - else: - processes = deployment.get_processes(pod) - for process in processes: - if failure.command in process.command: - logger.info( - f"Terminating {failure.service_names} Pid {process.pid} Command {process.command}" - ) - process.kill(failure.signal) - - # Wait until DGD is ready (this means the rolling upgrade is complete) - if failure.end_condition == "dgd_ready": - logger.info("Waiting for DGD to be ready") - await deployment._wait_for_ready(timeout=1800) # 30 minute timeout - logger.info("DGD is ready") + affected_pods[failure.get_failure_key()] = await failure.execute(deployment, logger) return affected_pods @@ -583,10 +512,8 @@ async def test_fault_scenario( scenario.load, # Pass entire Load config object ) as client_procs: # Inject failures and capture which pods were affected - affected_pods = _inject_failures(scenario.failures, logger, deployment) + affected_pods = await _inject_failures(scenario.failures, logger, deployment) logger.info(f"Affected pods during test: {affected_pods}") - await _inject_failures(scenario.failures, logger, deployment) - if scenario.load.continuous_load: _terminate_client_processes(client_procs, logger) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 638f55f406..c47bdd5729 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -489,7 +489,7 @@ class ManagedDeployment: skip_restart_services: bool = False _custom_api: Optional[client.CustomObjectsApi] = None - _core_api: Optional[Any] = None + _core_api: Optional[client.CoreV1Api] = None _in_cluster: bool = False _logger: logging.Logger = logging.getLogger() _port_forward: Optional[Any] = None @@ -745,6 +745,23 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): f"Failed to patch deployment {self._deployment_name}: {e}" ) raise + + async def get_pod_names(self, service_names: list[str] | None = None) -> list[str]: + if not service_names: + service_names = [service.name for service in self.deployment_spec.services] + + pod_names: list[str] = [] + + for service_name in service_names: + label_selector = ( + f"nvidia.com/selector={self._deployment_name}-{service_name.lower()}" + ) + assert self._core_api is not None, "Kubernetes API not initialized" + pods: client.V1PodList = await self._core_api.list_namespaced_pod(self.namespace, label_selector=label_selector) + for pod in pods.items: + pod_names.append(pod.metadata.name) + + return pod_names def get_processes(self, pod: Pod) -> list[PodProcess]: """Get list of processes in the given pod""" From 642b4f51eb748e84d98c929a18b788bf17c630fd Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 24 Nov 2025 16:04:13 -0500 Subject: [PATCH 08/15] small nits --- tests/fault_tolerance/deploy/scenarios.py | 7 +++++-- tests/utils/managed_deployment.py | 18 ++++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 746889a932..021d4ebb6a 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -16,6 +16,7 @@ import logging import re from dataclasses import dataclass, field +import time from abc import ABC, abstractmethod from enum import Enum, auto from typing import TYPE_CHECKING, Dict, List, Optional, Pattern @@ -207,6 +208,10 @@ async def execute( await deployment._wait_for_ready(timeout=1800) # 30 minute timeout + time.sleep( + self.time + ) # have some requests processed after the rolling upgrade has completed + return await deployment.get_pod_names(self.service_names) def get_failure_key(self) -> str: @@ -915,8 +920,6 @@ def add_rolling_upgrade_scenarios(): service_names: list[str] = [] - ## TODO: maybe add a bit of buffer time after the rolling upgrade is completed (after the DGD is ready again) - # setting replicas to 2 so we have availability of 1 replica at a time if worker_mode == "agg" and backend == "trtllm": service_names.append(WORKER_MAP[backend]["decode_agg"]) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index c47bdd5729..35fdc67500 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -585,13 +585,13 @@ async def _wait_for_condition( timeout: int = 1800, sleep=1, log_interval=60, - ready_condition_val: bool = True, - state_val: str = "successful", + desired_ready_condition_val: bool = True, + desired_state_val: str = "successful", ): start_time = time.time() self._logger.info( - f"Waiting for Deployment {self._deployment_name} to have Ready condition {ready_condition_val} and state {state_val}" + f"Waiting for Deployment {self._deployment_name} to have Ready condition {desired_ready_condition_val} and state {desired_state_val}" ) attempt = 0 @@ -618,14 +618,16 @@ async def _wait_for_condition( for condition in conditions: if condition.get("type") == "Ready": observed_ready_condition_val = condition.get("status") - if observed_ready_condition_val == str(ready_condition_val): + if observed_ready_condition_val == str( + desired_ready_condition_val + ): break observed_state_val = status_obj.get("state") # type: ignore[attr-defined] if ( - observed_ready_condition_val == str(ready_condition_val) - and observed_state_val == state_val + observed_ready_condition_val == str(desired_ready_condition_val) + and observed_state_val == desired_state_val ): self._logger.info(f"Current deployment state: {current_state}") self._logger.info(f"Current conditions: {conditions}") @@ -634,7 +636,7 @@ async def _wait_for_condition( ) self._logger.info( - f"Deployment {self._deployment_name} has Ready condition {ready_condition_val} and state {state_val}" + f"Deployment {self._deployment_name} has Ready condition {desired_ready_condition_val} and state {desired_state_val}" ) return True else: @@ -645,7 +647,7 @@ async def _wait_for_condition( f"Elapsed time: {time.time() - start_time:.1f}s / {timeout}s" ) self._logger.info( - f"Deployment has Ready condition {observed_ready_condition_val} and state {observed_state_val}, desired condition {ready_condition_val} and state {state_val}" + f"Deployment has Ready condition {observed_ready_condition_val} and state {observed_state_val}, desired condition {desired_ready_condition_val} and state {desired_state_val}" ) except exceptions.ApiException as e: From 70c61fb194dea1e53c02e4ca022a508157b6ab0c Mon Sep 17 00:00:00 2001 From: tmontfort Date: Tue, 25 Nov 2025 11:33:49 -0500 Subject: [PATCH 09/15] small nits --- tests/fault_tolerance/deploy/client.py | 29 +++++------ tests/fault_tolerance/deploy/scenarios.py | 4 +- .../fault_tolerance/deploy/test_deployment.py | 4 +- tests/utils/managed_deployment.py | 48 +++++++++++-------- 4 files changed, 42 insertions(+), 43 deletions(-) diff --git a/tests/fault_tolerance/deploy/client.py b/tests/fault_tolerance/deploy/client.py index e2e09b6fb1..6440a04fbe 100644 --- a/tests/fault_tolerance/deploy/client.py +++ b/tests/fault_tolerance/deploy/client.py @@ -363,7 +363,6 @@ def run_aiperf( # Note: For continuous load, we only run once and expect SIGINT to stop it max_attempts = 1 if continuous_load else (max_retries if max_retries > 0 else 1) success = False - all_results = [] for attempt in range(max_attempts): if continuous_load: @@ -394,16 +393,6 @@ def run_aiperf( f.write("\n\n=== STDERR ===\n") f.write(result.stderr) - all_results.append( - { - "attempt": attempt + 1, - "returncode": result.returncode, - "stdout": result.stdout, - "stderr": result.stderr, - } - ) - - # Even with continuous load, with SIGINT, aiperf should return 0 and create the profile_export_aiperf.json file if result.returncode == 0: # AI-Perf returns 0 even if all requests failed, so we need to check the output json_path = attempt_dir / "profile_export_aiperf.json" @@ -440,7 +429,6 @@ def run_aiperf( ) except Exception as e: logger.error(f"Error in attempt {attempt + 1}: {str(e)}") - all_results.append({"attempt": attempt + 1, "error": str(e)}) # Sleep before next attempt (if not the last attempt and not continuous load) if not success and attempt < max_attempts - 1 and not continuous_load: @@ -468,9 +456,9 @@ def run_aiperf_with_signal_handling( """ Run aiperf with signal handling for graceful shutdown. - Handles SIGINT forwarding and timeout when running with subprocess.Popen. - This ensures that Ctrl-C and SIGINT are properly forwarded to the subprocess - so it can clean up gracefully and write results files. + Handles SIGINT and SIGTERM forwarding and timeout when running with subprocess.Popen. + This ensures that Ctrl-C (SIGINT) and graceful termination signals (SIGTERM) + are properly forwarded to the subprocess so it can clean up gracefully and write results files. """ proc = subprocess.Popen( cmd_attempt, @@ -480,15 +468,20 @@ def run_aiperf_with_signal_handling( stdin=subprocess.DEVNULL, ) - # Set up signal handler to forward SIGINT to subprocess def signal_handler(signum, frame): - logger.info(f"Received signal {signum}, forwarding to aiperf subprocess") + signal_names = { + signal.SIGINT: "SIGINT", + signal.SIGTERM: "SIGTERM", + } + signal_name = signal_names.get(signum, f"signal {signum}") + logger.info(f"Received {signal_name}, forwarding to aiperf subprocess") try: - proc.send_signal(signal.SIGINT) + proc.send_signal(signum) except ProcessLookupError: pass # Process already terminated signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) try: stdout, stderr = proc.communicate(timeout=timeout) diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 021d4ebb6a..c7128be888 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio import logging import re from dataclasses import dataclass, field -import time from abc import ABC, abstractmethod from enum import Enum, auto from typing import TYPE_CHECKING, Dict, List, Optional, Pattern @@ -208,7 +208,7 @@ async def execute( await deployment._wait_for_ready(timeout=1800) # 30 minute timeout - time.sleep( + await asyncio.sleep( self.time ) # have some requests processed after the rolling upgrade has completed diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index 04137cacaa..cb88d77bcb 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -1,12 +1,12 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import asyncio import logging import multiprocessing import os import re import signal -import time from contextlib import contextmanager from typing import Any from multiprocessing.context import SpawnProcess @@ -229,7 +229,7 @@ async def _inject_failures( affected_pods: dict[str, list] = {} for failure in failures: - time.sleep(failure.time) + await asyncio.sleep(failure.time) logger.info(f"Injecting failure for: {failure}") diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 35fdc67500..ac84f4559e 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -57,6 +57,15 @@ def image(self) -> Optional[str]: except KeyError: return None + @property + def envs(self) -> list[dict[str, str]]: + """Environment variables for the service""" + return self._spec.get("envs", []) + + @envs.setter + def envs(self, value: list[dict[str, str]]): + self._spec["envs"] = value + @image.setter def image(self, value: str): if "extraPodSpec" not in self._spec: @@ -318,13 +327,9 @@ def set_service_env_var(self, service_name: str, name: str, value: str): """ Set an environment variable for a specific service """ - # Check service exists - if service_name not in self._deployment_spec["spec"]["services"]: - raise ValueError(f"Service '{service_name}' not found in deployment spec") - - service = self._deployment_spec["spec"]["services"][service_name] - if "envs" not in service: - service["envs"] = [] + service = self.get_service(service_name) + if service.envs is None: + service.envs = [] # if env var already exists, update it for env in service["envs"]: @@ -342,11 +347,8 @@ def get_service_env_vars(self, service_name: str) -> list[dict]: Returns: List of environment variable dicts (e.g., [{"name": "VAR", "value": "val"}]) """ - # Check service exists - if service_name not in self._deployment_spec["spec"]["services"]: - raise ValueError(f"Service '{service_name}' not found in deployment spec") - - return self._deployment_spec["spec"]["services"][service_name].get("envs", []) + service = self.get_service(service_name) + return service.envs @property def services(self) -> list[ServiceSpec]: @@ -374,11 +376,7 @@ def add_arg_to_service(self, service_name: str, arg_name: str, arg_value: str): arg_name: Argument name (e.g., "--max-model-len", "--max-seq-len") arg_value: Argument value (e.g., "1024") """ - # Get the service - if service_name not in self._deployment_spec["spec"]["services"]: - raise ValueError(f"Service '{service_name}' not found in deployment spec") - - service = self._deployment_spec["spec"]["services"][service_name] + service = self.get_service(service_name) # Ensure args list exists if "extraPodSpec" not in service: @@ -418,15 +416,23 @@ def add_arg_to_service(self, service_name: str, arg_name: str, arg_value: str): # Add new argument args_list.extend([arg_name, arg_value]) - def set_service_replicas(self, service_name: str, replicas: int): + def get_service(self, service_name: str) -> ServiceSpec: """ - Set the number of replicas for a specific service + Get a specific service from the deployment spec """ - # Check service exists if service_name not in self._deployment_spec["spec"]["services"]: raise ValueError(f"Service '{service_name}' not found in deployment spec") - self._deployment_spec["spec"]["services"][service_name]["replicas"] = replicas + return ServiceSpec( + service_name, self._deployment_spec["spec"]["services"][service_name] + ) + + def set_service_replicas(self, service_name: str, replicas: int): + """ + Set the number of replicas for a specific service + """ + service = self.get_service(service_name) + service.replicas = replicas def save(self, out_file: str): """Save updated deployment to file""" From 147b68abd96de96524ca7fb51f1938ebc7973d71 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Tue, 25 Nov 2025 11:40:51 -0500 Subject: [PATCH 10/15] skip-service-restart --- tests/fault_tolerance/deploy/conftest.py | 6 +++--- tests/fault_tolerance/deploy/test_deployment.py | 4 ++-- tests/utils/managed_deployment.py | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/fault_tolerance/deploy/conftest.py b/tests/fault_tolerance/deploy/conftest.py index 74f463a9b6..2fb85fb5ad 100644 --- a/tests/fault_tolerance/deploy/conftest.py +++ b/tests/fault_tolerance/deploy/conftest.py @@ -36,7 +36,7 @@ def pytest_addoption(parser): "By default, these tests are excluded.", ) parser.addoption( - "--skip-restart-services", + "--skip-service-restart", action="store_true", default=False, help="Skip restarting NATS and etcd services before deployment. " @@ -119,6 +119,6 @@ def client_type(request): @pytest.fixture -def skip_restart_services(request): +def skip_service_restart(request): """Get skip restart services flag from command line.""" - return request.config.getoption("--skip-restart-services") + return request.config.getoption("--skip-service-restart") diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index cb88d77bcb..56ccf5c2fc 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -446,7 +446,7 @@ async def test_fault_scenario( image: str, namespace: str, validation_context, # noqa: F811 # Shared context for passing data to validation - skip_restart_services: bool, + skip_service_restart: bool, ): """ Test dynamo serve deployments with injected failures @@ -497,7 +497,7 @@ async def test_fault_scenario( namespace=namespace, log_dir=request.node.log_dir, deployment_spec=scenario.deployment, - skip_restart_services=skip_restart_services, + skip_service_restart=skip_service_restart, ) as deployment: # Populate shared context for validation validation_context["deployment"] = deployment diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index ac84f4559e..37bf50b2d4 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -492,7 +492,7 @@ class ManagedDeployment: # TODO: this should be determined by the deployment_spec # the service containing component_type: Frontend determines what is actually the frontend service frontend_service_name: str = "Frontend" - skip_restart_services: bool = False + skip_service_restart: bool = False _custom_api: Optional[client.CustomObjectsApi] = None _core_api: Optional[client.CoreV1Api] = None @@ -1009,7 +1009,7 @@ async def __aenter__(self): # Run delete deployment and service restarts in parallel tasks = [self._delete_deployment()] - if not self.skip_restart_services: + if not self.skip_service_restart: tasks.extend([self._restart_etcd(), self._restart_nats()]) await asyncio.gather(*tasks) From 8af7913386d13fb92cbbc23ba39391b1a53851aa Mon Sep 17 00:00:00 2001 From: tmontfort Date: Tue, 25 Nov 2025 12:23:19 -0500 Subject: [PATCH 11/15] small fix --- tests/utils/managed_deployment.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 37bf50b2d4..2a5fbfb73c 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -332,13 +332,13 @@ def set_service_env_var(self, service_name: str, name: str, value: str): service.envs = [] # if env var already exists, update it - for env in service["envs"]: + for env in service.envs: if env["name"] == name: env["value"] = value return # if env var does not exist, add it - service["envs"].append({"name": name, "value": value}) + service.envs.append({"name": name, "value": value}) def get_service_env_vars(self, service_name: str) -> list[dict]: """ @@ -377,23 +377,24 @@ def add_arg_to_service(self, service_name: str, arg_name: str, arg_value: str): arg_value: Argument value (e.g., "1024") """ service = self.get_service(service_name) + service_spec = service._spec # Ensure args list exists - if "extraPodSpec" not in service: - service["extraPodSpec"] = {"mainContainer": {}} - if "mainContainer" not in service["extraPodSpec"]: - service["extraPodSpec"]["mainContainer"] = {} - if "args" not in service["extraPodSpec"]["mainContainer"]: - service["extraPodSpec"]["mainContainer"]["args"] = [] + if "extraPodSpec" not in service_spec: + service_spec["extraPodSpec"] = {"mainContainer": {}} + if "mainContainer" not in service_spec["extraPodSpec"]: + service_spec["extraPodSpec"]["mainContainer"] = {} + if "args" not in service_spec["extraPodSpec"]["mainContainer"]: + service_spec["extraPodSpec"]["mainContainer"]["args"] = [] - args_list = service["extraPodSpec"]["mainContainer"]["args"] + args_list = service_spec["extraPodSpec"]["mainContainer"]["args"] # Convert to list if needed (sometimes it's a single string) if isinstance(args_list, str): import shlex args_list = shlex.split(args_list) - service["extraPodSpec"]["mainContainer"]["args"] = args_list + service_spec["extraPodSpec"]["mainContainer"]["args"] = args_list # Find existing argument arg_index = None From 540f2742cb265c1b10573d861564d3d7d5c14bde Mon Sep 17 00:00:00 2001 From: tmontfort Date: Tue, 2 Dec 2025 14:20:13 -0800 Subject: [PATCH 12/15] return pod names --- tests/fault_tolerance/deploy/scenarios.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index c7128be888..5e354c5965 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -16,8 +16,8 @@ import asyncio import logging import re -from dataclasses import dataclass, field from abc import ABC, abstractmethod +from dataclasses import dataclass, field from enum import Enum, auto from typing import TYPE_CHECKING, Dict, List, Optional, Pattern @@ -213,7 +213,7 @@ async def execute( ) # have some requests processed after the rolling upgrade has completed return await deployment.get_pod_names(self.service_names) - + def get_failure_key(self) -> str: """Get the failure key for the rolling upgrade failure.""" return f"rolling_upgrade:{','.join(self.service_names)}" @@ -237,6 +237,8 @@ async def execute( pod.delete(force=True) # force means no graceful termination pod_names.append(pod.name) + return pod_names + def get_failure_key(self) -> str: """Get the failure key for the delete pod failure.""" return f"delete_pod:{','.join(self.service_names)}" @@ -288,8 +290,9 @@ async def execute( ) process.kill(self.signal) pod_names.append(pod.name) + return pod_names - + def get_failure_key(self) -> str: """Get the failure key for the terminate process failure.""" return f"terminate_process:{','.join(self.service_names)}:{self.process_name}:{self.signal}" From 9b268c94cebb9bf58e38b6e0f349aca1dfd94d96 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Thu, 4 Dec 2025 16:29:48 -0800 Subject: [PATCH 13/15] fix rolling upgrade test --- tests/utils/managed_deployment.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index 2a5fbfb73c..e040875a83 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -328,17 +328,18 @@ def set_service_env_var(self, service_name: str, name: str, value: str): Set an environment variable for a specific service """ service = self.get_service(service_name) - if service.envs is None: - service.envs = [] + envs = service.envs if service.envs is not None else [] # if env var already exists, update it - for env in service.envs: + for env in envs: if env["name"] == name: env["value"] = value + service.envs = envs # Save back to trigger the setter return # if env var does not exist, add it - service.envs.append({"name": name, "value": value}) + envs.append({"name": name, "value": value}) + service.envs = envs # Save back to trigger the setter def get_service_env_vars(self, service_name: str) -> list[dict]: """ @@ -754,7 +755,7 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): f"Failed to patch deployment {self._deployment_name}: {e}" ) raise - + async def get_pod_names(self, service_names: list[str] | None = None) -> list[str]: if not service_names: service_names = [service.name for service in self.deployment_spec.services] @@ -766,10 +767,12 @@ async def get_pod_names(self, service_names: list[str] | None = None) -> list[st f"nvidia.com/selector={self._deployment_name}-{service_name.lower()}" ) assert self._core_api is not None, "Kubernetes API not initialized" - pods: client.V1PodList = await self._core_api.list_namespaced_pod(self.namespace, label_selector=label_selector) + pods: client.V1PodList = await self._core_api.list_namespaced_pod( + self.namespace, label_selector=label_selector + ) for pod in pods.items: pod_names.append(pod.metadata.name) - + return pod_names def get_processes(self, pod: Pod) -> list[PodProcess]: From 3de77cd53b4a506e0266de2a08ea38bc2e9f2b3e Mon Sep 17 00:00:00 2001 From: tmontfort Date: Thu, 4 Dec 2025 16:37:33 -0800 Subject: [PATCH 14/15] revert to request.node.name for log_dir --- tests/conftest.py | 11 +++------ tests/fault_tolerance/deploy/client.py | 1 + .../fault_tolerance/deploy/test_deployment.py | 24 +++++++++---------- 3 files changed, 16 insertions(+), 20 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index f315d2dafd..e4a4b562a6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,6 @@ import tempfile from pathlib import Path from typing import Optional -from datetime import datetime import pytest from filelock import FileLock @@ -192,14 +191,10 @@ def predownload_tokenizers(pytestconfig): @pytest.fixture(autouse=True) def logger(request): - timestamp = datetime.now().strftime("%m-%d-%Y_%H-%M-%S") - log_dir = f"{request.node.name}_{timestamp}" - request.node.log_dir = log_dir - log_path = os.path.join(log_dir, "test.log.txt") - + log_path = os.path.join(request.node.name, "test.log.txt") logger = logging.getLogger() - shutil.rmtree(log_dir, ignore_errors=True) - os.makedirs(log_dir, exist_ok=True) + shutil.rmtree(request.node.name, ignore_errors=True) + os.makedirs(request.node.name, exist_ok=True) handler = logging.FileHandler(log_path, mode="w") formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT) handler.setFormatter(formatter) diff --git a/tests/fault_tolerance/deploy/client.py b/tests/fault_tolerance/deploy/client.py index 6440a04fbe..8a69cc4f7e 100644 --- a/tests/fault_tolerance/deploy/client.py +++ b/tests/fault_tolerance/deploy/client.py @@ -448,6 +448,7 @@ def run_aiperf( return success +# TODO: use file redirection and wait() instead of pipes and communicate def run_aiperf_with_signal_handling( cmd_attempt: List[str], logger: logging.Logger, diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index 56ccf5c2fc..efc448396a 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -8,8 +8,8 @@ import re import signal from contextlib import contextmanager -from typing import Any from multiprocessing.context import SpawnProcess +from typing import Any import pytest @@ -191,7 +191,6 @@ def _clients( logger.debug(f"{proc} joined") - def _terminate_client_processes( client_procs: list[SpawnProcess], logger: logging.Logger, @@ -233,7 +232,9 @@ async def _inject_failures( logger.info(f"Injecting failure for: {failure}") - affected_pods[failure.get_failure_key()] = await failure.execute(deployment, logger) + affected_pods[failure.get_failure_key()] = await failure.execute( + deployment, logger + ) return affected_pods @@ -263,9 +264,6 @@ def validation_context(request, scenario): # noqa: F811 yield context # Test receives this and populates it - # Get log_dir from request.node if available (set by test), otherwise use node.name - base_log_dir = getattr(request.node, "log_dir", request.node.name) - # Determine log paths based on whether this is a mixed token test log_paths = [] test_name = request.node.name @@ -273,8 +271,8 @@ def validation_context(request, scenario): # noqa: F811 if hasattr(scenario.load, "mixed_token_test") and scenario.load.mixed_token_test: # For mixed token tests, we have separate overflow and recovery directories - overflow_dir = f"{base_log_dir}{OVERFLOW_SUFFIX}" - recovery_dir = f"{base_log_dir}{RECOVERY_SUFFIX}" + overflow_dir = f"{request.node.name}{OVERFLOW_SUFFIX}" + recovery_dir = f"{request.node.name}{RECOVERY_SUFFIX}" log_paths = [overflow_dir, recovery_dir] logging.info("Mixed token test detected. Looking for results in:") @@ -282,7 +280,7 @@ def validation_context(request, scenario): # noqa: F811 logging.info(f" - Recovery phase: {recovery_dir}") else: # Standard test with single directory - log_paths = [base_log_dir] + log_paths = [request.node.name] # Use factory to auto-detect and parse results try: @@ -495,7 +493,7 @@ async def test_fault_scenario( async with ManagedDeployment( namespace=namespace, - log_dir=request.node.log_dir, + log_dir=request.node.name, deployment_spec=scenario.deployment, skip_service_restart=skip_service_restart, ) as deployment: @@ -505,14 +503,16 @@ async def test_fault_scenario( with _clients( logger, - request.node.log_dir, + request.node.name, scenario.deployment, namespace, model, scenario.load, # Pass entire Load config object ) as client_procs: # Inject failures and capture which pods were affected - affected_pods = await _inject_failures(scenario.failures, logger, deployment) + affected_pods = await _inject_failures( + scenario.failures, logger, deployment + ) logger.info(f"Affected pods during test: {affected_pods}") if scenario.load.continuous_load: From 3b0dc3e3d943946035ebe8ab385a5765a7eeea59 Mon Sep 17 00:00:00 2001 From: tmontfort Date: Mon, 8 Dec 2025 20:09:19 -0800 Subject: [PATCH 15/15] fix mypy errors --- tests/fault_tolerance/deploy/client.py | 3 ++- tests/fault_tolerance/deploy/scenarios.py | 8 ++++---- .../fault_tolerance/deploy/test_deployment.py | 3 ++- tests/utils/managed_deployment.py | 18 +++++++++--------- 4 files changed, 17 insertions(+), 15 deletions(-) diff --git a/tests/fault_tolerance/deploy/client.py b/tests/fault_tolerance/deploy/client.py index 8a69cc4f7e..89dd7daec7 100644 --- a/tests/fault_tolerance/deploy/client.py +++ b/tests/fault_tolerance/deploy/client.py @@ -25,6 +25,7 @@ from typing import Any, Dict, List, Optional, Tuple import requests +from kr8s.objects import Pod from tests.utils.managed_deployment import ManagedDeployment @@ -45,7 +46,7 @@ def get_frontend_port( deployment_spec: Any, pod_ports: Dict[str, Any], logger: logging.Logger, -) -> Tuple[Optional[str], Optional[int], Optional[str]]: +) -> Tuple[Optional[str], Optional[int], Optional[Pod]]: """ Select a frontend pod using round-robin and setup port forwarding. diff --git a/tests/fault_tolerance/deploy/scenarios.py b/tests/fault_tolerance/deploy/scenarios.py index 5e354c5965..817f28394d 100644 --- a/tests/fault_tolerance/deploy/scenarios.py +++ b/tests/fault_tolerance/deploy/scenarios.py @@ -199,7 +199,7 @@ class RollingUpgradeFailure(Failure): async def execute( self, deployment: ManagedDeployment, logger: logging.Logger - ) -> None: + ) -> list[str]: """Execute rolling upgrade failure injection.""" await deployment.trigger_rolling_upgrade(self.service_names) @@ -225,7 +225,7 @@ class DeletePodFailure(Failure): async def execute( self, deployment: ManagedDeployment, logger: logging.Logger - ) -> None: + ) -> list[str]: """Execute pod deletion failure injection.""" service_pod_dict = deployment.get_pods(self.service_names) pod_names: list[str] = [] @@ -276,7 +276,7 @@ def __init__( async def execute( self, deployment: ManagedDeployment, logger: logging.Logger - ) -> None: + ) -> list[str]: """Execute process termination failure injection.""" service_pod_dict = deployment.get_pods(self.service_names) pod_names: list[str] = [] @@ -323,7 +323,7 @@ def __init__( async def execute( self, deployment: ManagedDeployment, logger: logging.Logger - ) -> None: + ) -> list[str]: """Token overflow is handled client-side, so this is a no-op.""" # The actual overflow is handled by the client configuration # which uses the input_token_length from the Load config diff --git a/tests/fault_tolerance/deploy/test_deployment.py b/tests/fault_tolerance/deploy/test_deployment.py index efc448396a..8fe12dba20 100644 --- a/tests/fault_tolerance/deploy/test_deployment.py +++ b/tests/fault_tolerance/deploy/test_deployment.py @@ -9,7 +9,7 @@ import signal from contextlib import contextmanager from multiprocessing.context import SpawnProcess -from typing import Any +from typing import Any, Optional import pytest @@ -463,6 +463,7 @@ async def test_fault_scenario( if image: scenario.deployment.set_image(image) + model: Optional[str] = None if scenario.model: scenario.deployment.set_model(scenario.model) model = scenario.model diff --git a/tests/utils/managed_deployment.py b/tests/utils/managed_deployment.py index e040875a83..5ee541833d 100644 --- a/tests/utils/managed_deployment.py +++ b/tests/utils/managed_deployment.py @@ -57,6 +57,14 @@ def image(self) -> Optional[str]: except KeyError: return None + @image.setter + def image(self, value: str): + if "extraPodSpec" not in self._spec: + self._spec["extraPodSpec"] = {"mainContainer": {}} + if "mainContainer" not in self._spec["extraPodSpec"]: + self._spec["extraPodSpec"]["mainContainer"] = {} + self._spec["extraPodSpec"]["mainContainer"]["image"] = value + @property def envs(self) -> list[dict[str, str]]: """Environment variables for the service""" @@ -66,14 +74,6 @@ def envs(self) -> list[dict[str, str]]: def envs(self, value: list[dict[str, str]]): self._spec["envs"] = value - @image.setter - def image(self, value: str): - if "extraPodSpec" not in self._spec: - self._spec["extraPodSpec"] = {"mainContainer": {}} - if "mainContainer" not in self._spec["extraPodSpec"]: - self._spec["extraPodSpec"]["mainContainer"] = {} - self._spec["extraPodSpec"]["mainContainer"]["image"] = value - # ----- Replicas ----- @property def replicas(self) -> int: @@ -729,7 +729,7 @@ async def trigger_rolling_upgrade(self, service_names: list[str]): "service_names cannot be empty for trigger_rolling_upgrade" ) - patch_body = {"spec": {"services": {}}} + patch_body: dict[str, Any] = {"spec": {"services": {}}} for service_name in service_names: self.deployment_spec.set_service_env_var(