-
Notifications
You must be signed in to change notification settings - Fork 737
test: add ci tests for lora (agg, router: on/off) #4817
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
27a1b4e
feat: enable kv aware routing for loras
biswapanda 42c043a
tests: add test cases for lora agg and agg router
biswapanda 0651568
fix
biswapanda 4f6394d
Merge branch 'main' into bis/dep-681-add-agg-lora-tests
biswapanda 31163f7
Merge branch 'main' into bis/dep-681-add-agg-lora-tests
biswapanda 2103b3b
add lifecycle markers
biswapanda a529108
mv to lora_utils
biswapanda 05d0db4
fix mypy issues
biswapanda d68b1eb
Merge branch 'main' into bis/dep-681-add-agg-lora-tests
biswapanda File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,274 @@ | ||
| # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. | ||
| # SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
|
|
||
| import logging | ||
| import os | ||
| import shutil | ||
| import subprocess | ||
| import tempfile | ||
| import time | ||
| from dataclasses import dataclass | ||
| from typing import Optional | ||
|
|
||
| import requests | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| # LoRA testing constants | ||
| MINIO_ENDPOINT = "http://localhost:9000" | ||
| MINIO_ACCESS_KEY = "minioadmin" | ||
| MINIO_SECRET_KEY = "minioadmin" | ||
| MINIO_BUCKET = "my-loras" | ||
| DEFAULT_LORA_REPO = "codelion/Qwen3-0.6B-accuracy-recovery-lora" | ||
| DEFAULT_LORA_NAME = "codelion/Qwen3-0.6B-accuracy-recovery-lora" | ||
|
|
||
|
|
||
| @dataclass | ||
| class MinioLoraConfig: | ||
| """Configuration for MinIO and LoRA setup""" | ||
|
|
||
| endpoint: str = MINIO_ENDPOINT | ||
| access_key: str = MINIO_ACCESS_KEY | ||
| secret_key: str = MINIO_SECRET_KEY | ||
| bucket: str = MINIO_BUCKET | ||
| lora_repo: str = DEFAULT_LORA_REPO | ||
| lora_name: str = DEFAULT_LORA_NAME | ||
| data_dir: Optional[str] = None | ||
|
|
||
| def get_s3_uri(self) -> str: | ||
| """Get the S3 URI for the LoRA adapter""" | ||
| return f"s3://{self.bucket}/{self.lora_name}" | ||
|
|
||
| def get_env_vars(self) -> dict: | ||
| """Get environment variables for AWS/MinIO access""" | ||
| return { | ||
| "AWS_ENDPOINT": self.endpoint, | ||
| "AWS_ACCESS_KEY_ID": self.access_key, | ||
| "AWS_SECRET_ACCESS_KEY": self.secret_key, | ||
| "AWS_REGION": "us-east-1", | ||
| "AWS_ALLOW_HTTP": "true", | ||
| "DYN_LORA_ENABLED": "true", | ||
| "DYN_LORA_PATH": "/tmp/dynamo_loras_minio_test", | ||
| } | ||
|
|
||
|
|
||
| class MinioService: | ||
| """Manages MinIO Docker container lifecycle for tests""" | ||
|
|
||
| CONTAINER_NAME = "dynamo-minio-test" | ||
|
|
||
| def __init__(self, config: MinioLoraConfig): | ||
| self.config = config | ||
| self._logger = logging.getLogger(self.__class__.__name__) | ||
| self._temp_dir: Optional[str] = None | ||
|
|
||
| def start(self) -> None: | ||
| """Start MinIO container""" | ||
| self._logger.info("Starting MinIO container...") | ||
|
|
||
| # Create data directory | ||
| if self.config.data_dir: | ||
| data_dir = self.config.data_dir | ||
| else: | ||
| data_dir = tempfile.mkdtemp(prefix="minio_test_") | ||
| self.config.data_dir = data_dir | ||
|
|
||
| # Stop existing container if running | ||
| self.stop() | ||
|
|
||
| # Start MinIO container | ||
| cmd = [ | ||
| "docker", | ||
| "run", | ||
| "-d", | ||
| "--name", | ||
| self.CONTAINER_NAME, | ||
| "-p", | ||
| "9000:9000", | ||
| "-p", | ||
| "9001:9001", | ||
| "-v", | ||
| f"{data_dir}:/data", | ||
| "quay.io/minio/minio", | ||
| "server", | ||
| "/data", | ||
| "--console-address", | ||
| ":9001", | ||
| ] | ||
|
|
||
| result = subprocess.run(cmd, capture_output=True, text=True) | ||
| if result.returncode != 0: | ||
| raise RuntimeError(f"Failed to start MinIO: {result.stderr}") | ||
|
|
||
| # Wait for MinIO to be ready | ||
| self._wait_for_ready() | ||
| self._logger.info("MinIO started successfully") | ||
|
|
||
| def _wait_for_ready(self, timeout: int = 30) -> None: | ||
| """Wait for MinIO to be ready""" | ||
| health_url = f"{self.config.endpoint}/minio/health/live" | ||
| start_time = time.time() | ||
|
|
||
| while time.time() - start_time < timeout: | ||
| try: | ||
| response = requests.get(health_url, timeout=2) | ||
| if response.status_code == 200: | ||
| return | ||
| except requests.RequestException: | ||
| pass | ||
| time.sleep(1) | ||
|
|
||
| raise RuntimeError(f"MinIO did not become ready within {timeout}s") | ||
|
|
||
| def stop(self) -> None: | ||
| """Stop and remove MinIO container""" | ||
| self._logger.info("Stopping MinIO container...") | ||
|
|
||
| # Stop container | ||
| subprocess.run( | ||
| ["docker", "stop", self.CONTAINER_NAME], | ||
| capture_output=True, | ||
| ) | ||
|
|
||
| # Remove container | ||
| subprocess.run( | ||
| ["docker", "rm", self.CONTAINER_NAME], | ||
| capture_output=True, | ||
| ) | ||
|
|
||
| def create_bucket(self) -> None: | ||
| """Create the S3 bucket using AWS CLI""" | ||
| env = os.environ.copy() | ||
| env.update( | ||
| { | ||
| "AWS_ACCESS_KEY_ID": self.config.access_key, | ||
| "AWS_SECRET_ACCESS_KEY": self.config.secret_key, | ||
| } | ||
| ) | ||
|
|
||
| # Check if bucket exists | ||
| result = subprocess.run( | ||
| [ | ||
| "aws", | ||
| "--endpoint-url", | ||
| self.config.endpoint, | ||
| "s3", | ||
| "ls", | ||
| f"s3://{self.config.bucket}", | ||
| ], | ||
| capture_output=True, | ||
| text=True, | ||
| env=env, | ||
| ) | ||
|
|
||
| if result.returncode != 0: | ||
| # Create bucket | ||
| self._logger.info(f"Creating bucket: {self.config.bucket}") | ||
| result = subprocess.run( | ||
| [ | ||
| "aws", | ||
| "--endpoint-url", | ||
| self.config.endpoint, | ||
| "s3", | ||
| "mb", | ||
| f"s3://{self.config.bucket}", | ||
| ], | ||
| capture_output=True, | ||
| text=True, | ||
| env=env, | ||
| ) | ||
| if result.returncode != 0: | ||
| raise RuntimeError(f"Failed to create bucket: {result.stderr}") | ||
|
|
||
| def download_lora(self) -> str: | ||
| """Download LoRA from Hugging Face Hub, returns temp directory path""" | ||
| self._temp_dir = tempfile.mkdtemp(prefix="lora_download_") | ||
| self._logger.info( | ||
| f"Downloading LoRA {self.config.lora_repo} to {self._temp_dir}" | ||
| ) | ||
|
|
||
| result = subprocess.run( | ||
| [ | ||
| "huggingface-cli", | ||
| "download", | ||
| self.config.lora_repo, | ||
| "--local-dir", | ||
| self._temp_dir, | ||
| "--local-dir-use-symlinks", | ||
| "False", | ||
| ], | ||
| capture_output=True, | ||
| text=True, | ||
| ) | ||
|
|
||
| if result.returncode != 0: | ||
| raise RuntimeError(f"Failed to download LoRA: {result.stderr}") | ||
|
|
||
| # Clean up cache directory | ||
| cache_dir = os.path.join(self._temp_dir, ".cache") | ||
| if os.path.exists(cache_dir): | ||
| shutil.rmtree(cache_dir) | ||
|
|
||
| return self._temp_dir | ||
|
|
||
| def upload_lora(self, local_path: str) -> None: | ||
| """Upload LoRA to MinIO""" | ||
| self._logger.info( | ||
| f"Uploading LoRA to s3://{self.config.bucket}/{self.config.lora_name}" | ||
| ) | ||
|
|
||
| env = os.environ.copy() | ||
| env.update( | ||
| { | ||
| "AWS_ACCESS_KEY_ID": self.config.access_key, | ||
| "AWS_SECRET_ACCESS_KEY": self.config.secret_key, | ||
| } | ||
| ) | ||
|
|
||
| result = subprocess.run( | ||
| [ | ||
| "aws", | ||
| "--endpoint-url", | ||
| self.config.endpoint, | ||
| "s3", | ||
| "sync", | ||
| local_path, | ||
| f"s3://{self.config.bucket}/{self.config.lora_name}", | ||
| "--exclude", | ||
| "*.git*", | ||
| ], | ||
| capture_output=True, | ||
| text=True, | ||
| env=env, | ||
| ) | ||
|
|
||
| if result.returncode != 0: | ||
| raise RuntimeError(f"Failed to upload LoRA: {result.stderr}") | ||
|
|
||
| def cleanup_temp(self) -> None: | ||
| """Clean up temporary directories""" | ||
| if self._temp_dir and os.path.exists(self._temp_dir): | ||
| shutil.rmtree(self._temp_dir) | ||
| self._temp_dir = None | ||
|
|
||
| if self.config.data_dir and os.path.exists(self.config.data_dir): | ||
| shutil.rmtree(self.config.data_dir, ignore_errors=True) | ||
|
|
||
|
|
||
| def load_lora_adapter( | ||
| system_port: int, lora_name: str, s3_uri: str, timeout: int = 60 | ||
| ) -> None: | ||
| """Load a LoRA adapter via the system API""" | ||
| url = f"http://localhost:{system_port}/v1/loras" | ||
| payload = {"lora_name": lora_name, "source": {"uri": s3_uri}} | ||
|
|
||
| logger.info(f"Loading LoRA adapter: {lora_name} from {s3_uri}") | ||
|
|
||
| response = requests.post(url, json=payload, timeout=timeout) | ||
| if response.status_code != 200: | ||
| raise RuntimeError( | ||
| f"Failed to load LoRA adapter: {response.status_code} - {response.text}" | ||
| ) | ||
|
|
||
| logger.info(f"LoRA adapter loaded successfully: {response.json()}") | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.