Skip to content
Merged
142 changes: 108 additions & 34 deletions tests/fault_tolerance/deploy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import json
import logging
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests
from kr8s.objects import Pod

from tests.utils.managed_deployment import ManagedDeployment

Expand All @@ -44,7 +46,7 @@ def get_frontend_port(
deployment_spec: Any,
pod_ports: Dict[str, Any],
logger: logging.Logger,
) -> Tuple[Optional[str], Optional[int], Optional[str]]:
) -> Tuple[Optional[str], Optional[int], Optional[Pod]]:
"""
Select a frontend pod using round-robin and setup port forwarding.

Expand All @@ -60,7 +62,7 @@ def get_frontend_port(
Returns:
Tuple of (pod_name, local_port, pod_instance) or (None, None, None) if failed
"""
pods = managed_deployment.get_pods(managed_deployment.frontend_service_name)
pods = managed_deployment.get_pods([managed_deployment.frontend_service_name])

port = 0
pod_name = None
Expand Down Expand Up @@ -270,6 +272,7 @@ def run_aiperf(
logger: logging.Logger,
max_retries: int = 1,
retry_delay: float = 1,
continuous_load: bool = False,
) -> bool:
"""
Execute AI-Perf with specified parameters.
Expand All @@ -280,13 +283,14 @@ def run_aiperf(
model: Model name
pod_name: Selected pod name for logging
port: Local port number
requests_per_client: Number of requests to send
requests_per_client: Number of requests to send (used if continuous load not enabled)
input_token_length: Input token count
output_token_length: Output token count
output_dir: Directory for AI-Perf artifacts
logger: Logger instance
max_retries: Maximum number of retry attempts (default: 1)
retry_delay: Delay in seconds between retries (default: 1)
continuous_load: If True, use continuous load instead of fixed request count

Returns:
True if successful, False otherwise
Expand Down Expand Up @@ -315,8 +319,6 @@ def run_aiperf(
# Enable streaming for TTFT and ITL metrics
"--streaming",
# Request parameters
"--request-count",
str(requests_per_client), # Required: how many requests
"--concurrency",
"1", # Optional: we set to 1 for sequential
# Token configuration
Expand All @@ -338,8 +340,13 @@ def run_aiperf(
"100", # For reproducible results
]

# Calculate timeout (same as legacy would for all requests)
timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes
if continuous_load:
cmd.extend(["--benchmark-duration", "1800"]) # 30 minutes for continuous load
logger.info("Using continuous load with duration: 30 minutes")
timeout = 1860 # 31 minutes default for duration-based tests (30 minutes + 1 minute buffer)
else:
cmd.extend(["--request-count", str(requests_per_client)])
timeout = max(requests_per_client * 2 + 60, 300) # At least 5 minutes

# Log execution
logger.info(f"Starting AI-Perf for Pod {pod_name} Local Port {port}")
Expand All @@ -354,15 +361,19 @@ def run_aiperf(
logger.info(f"Command: {' '.join(cmd)}")

# Retry logic for fault tolerance - retry FULL request count until success

max_attempts = max_retries if max_retries > 0 else 1
# Note: For continuous load, we only run once and expect SIGINT to stop it
max_attempts = 1 if continuous_load else (max_retries if max_retries > 0 else 1)
success = False
all_results = []

for attempt in range(max_attempts):
logger.info(
f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests"
)
if continuous_load:
logger.info(
"AI-Perf continuous load (will run until interrupted by SIGINT)"
)
else:
logger.info(
f"AI-Perf attempt {attempt + 1}/{max_attempts} with {requests_per_client} requests"
)

# Update output directory for this attempt
attempt_dir = output_dir / f"attempt_{attempt}"
Expand All @@ -374,13 +385,7 @@ def run_aiperf(
cmd_attempt[artifact_dir_idx] = str(attempt_dir)

try:
result = subprocess.run(
cmd_attempt,
capture_output=True,
text=True,
timeout=timeout,
stdin=subprocess.DEVNULL, # Prevent stdin reading which can cause process suspension
)
result = run_aiperf_with_signal_handling(cmd_attempt, logger, timeout)

# Save logs for this attempt
with open(attempt_dir / "genai_perf.log", "w") as f:
Expand All @@ -389,15 +394,6 @@ def run_aiperf(
f.write("\n\n=== STDERR ===\n")
f.write(result.stderr)

all_results.append(
{
"attempt": attempt + 1,
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
}
)

if result.returncode == 0:
# AI-Perf returns 0 even if all requests failed, so we need to check the output
json_path = attempt_dir / "profile_export_aiperf.json"
Expand All @@ -412,6 +408,19 @@ def run_aiperf(
)
if success:
break # Success - exit the retry loop
## TODO: bug with aiperf git+https://github.com/ai-dynamo/aiperf.git@4d3fa29403c8f75da22a14f1f7b3aeb27db9288f
## where sending a SIGINT on Mac can sometimes produce an exit code of -9 (SIGKILL), which results in profile_export_aiperf.json not being created
elif result.returncode == -9 and continuous_load:
logger.warning(
f"""
Attempt {attempt + 1} failed with return code {result.returncode}
This is a known bug with aiperf on Mac where sending a SIGINT can sometimes produce an exit code of -9 (SIGKILL)
which results in profile_export_aiperf.json not being created
"""
)
logger.debug(
f"Stderr: {result.stderr[:500] if result.stderr else 'No stderr'}"
)
else:
logger.warning(
f"Attempt {attempt + 1} failed with return code {result.returncode}"
Expand All @@ -421,22 +430,84 @@ def run_aiperf(
)
except Exception as e:
logger.error(f"Error in attempt {attempt + 1}: {str(e)}")
all_results.append({"attempt": attempt + 1, "error": str(e)})

# Sleep before next attempt (if not the last attempt)
if not success and attempt < max_attempts - 1:
# Sleep before next attempt (if not the last attempt and not continuous load)
if not success and attempt < max_attempts - 1 and not continuous_load:
time.sleep(retry_delay)

if success:
if success and not continuous_load:
logger.info(
f"AI-Perf successfully completed all {requests_per_client} requests for {pod_name}"
)
elif success and continuous_load:
logger.info(
f"AI-Perf sustained continuous load for {pod_name} and existed succesfully"
)
else:
logger.error(f"AI-Perf failed all {max_attempts} attempts for {pod_name}")

return success


# TODO: use file redirection and wait() instead of pipes and communicate
def run_aiperf_with_signal_handling(
    cmd_attempt: List[str],
    logger: logging.Logger,
    timeout: int,
) -> subprocess.CompletedProcess:
    """
    Run aiperf with signal handling for graceful shutdown.

    Handles SIGINT and SIGTERM forwarding and timeout when running with subprocess.Popen.
    This ensures that Ctrl-C (SIGINT) and graceful termination signals (SIGTERM)
    are properly forwarded to the subprocess so it can clean up gracefully and write
    results files.

    Args:
        cmd_attempt: Full aiperf command line to execute.
        logger: Logger instance for progress/diagnostic messages.
        timeout: Maximum seconds to wait for the subprocess before killing it.

    Returns:
        subprocess.CompletedProcess with the command, return code, stdout and stderr.
    """
    proc = subprocess.Popen(
        cmd_attempt,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        stdin=subprocess.DEVNULL,  # Prevent stdin reads that can suspend the process
    )

    def signal_handler(signum, frame):
        signal_names = {
            signal.SIGINT: "SIGINT",
            signal.SIGTERM: "SIGTERM",
        }
        signal_name = signal_names.get(signum, f"signal {signum}")
        logger.info(f"Received {signal_name}, forwarding to aiperf subprocess")
        try:
            proc.send_signal(signum)
        except ProcessLookupError:
            pass  # Process already terminated

    # Save the handlers that were in place so they can be restored afterwards.
    # Without restoration, the installed handlers (which close over this call's
    # `proc`) would leak and keep acting on a dead process for the rest of the
    # program — a bug in the original implementation.
    prev_sigint = signal.signal(signal.SIGINT, signal_handler)
    prev_sigterm = signal.signal(signal.SIGTERM, signal_handler)

    try:
        try:
            stdout, stderr = proc.communicate(timeout=timeout)
            returncode = proc.returncode
        except subprocess.TimeoutExpired:
            logger.warning(f"AI-Perf subprocess timed out after {timeout}s")
            proc.kill()
            stdout, stderr = proc.communicate()
            returncode = proc.returncode
        except KeyboardInterrupt:
            logger.info("Received KeyboardInterrupt, sending SIGINT to aiperf subprocess")
            proc.send_signal(signal.SIGINT)
            try:
                stdout, stderr = proc.communicate(timeout=30)  # Give it time to clean up
                returncode = proc.returncode
            except subprocess.TimeoutExpired:
                logger.warning("Subprocess didn't terminate gracefully, killing it")
                proc.kill()
                stdout, stderr = proc.communicate()
                returncode = proc.returncode
    finally:
        # Restore whatever handlers were active before this call.
        # signal.getsignal/signal.signal may report None for handlers not
        # installed from Python; skip restoration in that case.
        if prev_sigint is not None:
            signal.signal(signal.SIGINT, prev_sigint)
        if prev_sigterm is not None:
            signal.signal(signal.SIGTERM, prev_sigterm)

    return subprocess.CompletedProcess(cmd_attempt, returncode, stdout, stderr)


def log_summary_metrics(
output_dir: Path, logger: logging.Logger, pod_name: str, port: int
) -> None:
Expand Down Expand Up @@ -513,6 +584,7 @@ def client(
output_token_length: int,
max_retries: int,
retry_delay: float = 1,
continuous_load: bool = False,
):
"""
Generate load using AI-Perf for fault tolerance testing.
Expand All @@ -527,11 +599,12 @@ def client(
model: Model name
log_dir: Directory for output logs and AI-Perf artifacts
index: Client index used for round-robin pod selection
requests_per_client: Number of requests to generate
requests_per_client: Number of requests to generate (used if continuous load not enabled)
input_token_length: Number of input tokens per request
output_token_length: Number of output tokens per request
max_retries: Maximum retry attempts for AI-Perf execution
retry_delay: Delay in seconds between retry attempts
continuous_load: If True, use continuous load instead of fixed request count
"""
logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)
Expand Down Expand Up @@ -578,6 +651,7 @@ def client(
logger=logger,
max_retries=max_retries,
retry_delay=retry_delay,
continuous_load=continuous_load,
)

if not success:
Expand Down
1 change: 1 addition & 0 deletions tests/fault_tolerance/deploy/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def get_client_function(client_type: str) -> Callable:
output_token_length,
max_retries,
retry_delay_or_rate, # Differs between implementations
continuous_load,
)

Raises:
Expand Down
13 changes: 13 additions & 0 deletions tests/fault_tolerance/deploy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ def pytest_addoption(parser):
help="Include tests that require custom builds (e.g., MoE models). "
"By default, these tests are excluded.",
)
parser.addoption(
"--skip-service-restart",
action="store_true",
default=False,
help="Skip restarting NATS and etcd services before deployment. "
"By default, these services are restarted.",
)


def pytest_generate_tests(metafunc):
Expand Down Expand Up @@ -109,3 +116,9 @@ def namespace(request):
def client_type(request):
"""Get client type from command line or use scenario default."""
return request.config.getoption("--client-type")


@pytest.fixture
def skip_service_restart(request):
    """Return the --skip-service-restart CLI flag (True skips restarting NATS/etcd services)."""
    return request.config.getoption("--skip-service-restart")
7 changes: 6 additions & 1 deletion tests/fault_tolerance/deploy/legacy_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def client(
max_retries,
max_request_rate,
retry_delay=1,
continuous_load=False,
):
"""Legacy custom client for fault tolerance testing.

Expand All @@ -211,7 +212,11 @@ def client(
max_retries: Maximum retry attempts per request
max_request_rate: Maximum requests per second (for rate limiting)
retry_delay: Delay in seconds between retries
continuous_load: If True, use continuous load instead of fixed request count
"""
if continuous_load:
raise ValueError("Continuous load is not supported for legacy client")

logger = logging.getLogger(f"CLIENT: {index}")
logging.getLogger("httpx").setLevel(logging.WARNING)

Expand All @@ -228,7 +233,7 @@ def client(
for i in range(requests_per_client):
# Get available pods
pods = managed_deployment.get_pods(
managed_deployment.frontend_service_name
[managed_deployment.frontend_service_name]
)
port = 0
pod_name = None
Expand Down
Loading
Loading