diff --git a/plugins/AHavenVLMConnector/ahavenvlmconnector.yml b/plugins/AHavenVLMConnector/ahavenvlmconnector.yml index d3d8aaca..d1c66bd2 100644 --- a/plugins/AHavenVLMConnector/ahavenvlmconnector.yml +++ b/plugins/AHavenVLMConnector/ahavenvlmconnector.yml @@ -1,7 +1,7 @@ name: Haven VLM Connector # requires: PythonDepManager description: Tag videos with Vision-Language Models using any OpenAI-compatible VLM endpoint -version: 1.0.3 +version: 1.1.0 url: https://discourse.stashapp.cc/t/haven-vlm-connector/5464 exec: - python diff --git a/plugins/AHavenVLMConnector/haven_vlm_config.py b/plugins/AHavenVLMConnector/haven_vlm_config.py index 1e5bc69a..e6ac0c3b 100644 --- a/plugins/AHavenVLMConnector/haven_vlm_config.py +++ b/plugins/AHavenVLMConnector/haven_vlm_config.py @@ -13,6 +13,7 @@ # VLM Engine Configuration VLM_ENGINE_CONFIG = { "active_ai_models": ["vlm_multiplexer_model"], + "trace_logging": True, "pipelines": { "video_pipeline_dynamic": { "inputs": [ diff --git a/plugins/AHavenVLMConnector/haven_vlm_connector.py b/plugins/AHavenVLMConnector/haven_vlm_connector.py index e8110453..872b3039 100644 --- a/plugins/AHavenVLMConnector/haven_vlm_connector.py +++ b/plugins/AHavenVLMConnector/haven_vlm_connector.py @@ -18,6 +18,7 @@ try: from exit_tracker import install_exit_tracker import stashapi.log as log + install_exit_tracker(log) except ImportError as e: print(f"Warning: exit_tracker not available: {e}") @@ -28,23 +29,23 @@ # Use PythonDepManager for dependency management try: from PythonDepManager import ensure_import - + # Install and ensure all required dependencies with specific versions ensure_import( "stashapi:stashapp-tools==0.2.58", "aiohttp==3.12.13", "pydantic==2.12.5", - "vlm-engine==0.9.4", - "pyyaml==6.0.2" + "vlm-engine==1.0.0", + "pyyaml==6.0.2", ) - + # Import the dependencies after ensuring they're available import stashapi.log as log from stashapi.stashapp import StashInterface import aiohttp import pydantic import yaml - + except ImportError 
async def main() -> None:
    """Plugin entry point: create the task semaphore, run the request, print the result.

    The JSON request is read from stdin via ``read_json_input`` and handed to
    ``run`` together with a dict that ``run`` fills in; that dict is then
    printed to stdout as JSON.
    """
    global semaphore

    # Semaphore initialization logging for hypothesis A
    log.debug(
        f"[DEBUG_HYPOTHESIS_A] Initializing semaphore with limit {config.config.concurrent_task_limit}"
    )
    semaphore = asyncio.Semaphore(config.config.concurrent_task_limit)
    # Post-semaphore creation logging
    log.debug(
        f"[DEBUG_HYPOTHESIS_A] Semaphore created successfully (limit: {config.config.concurrent_task_limit})"
    )

    result: Dict[str, Any] = {}
    await run(read_json_input(), result)
    # The explicit "\n" plus print's own newline leaves a blank line after the JSON.
    print(json.dumps(result) + "\n")


def read_json_input() -> Dict[str, Any]:
    """Parse the JSON document supplied on stdin and return it."""
    return json.load(sys.stdin)
run(json_input: Dict[str, Any], output: Dict[str, Any]) -> None: collect_incorrect_markers_and_images() output["output"] = "ok" return - + output["output"] = "ok" return + # ----------------- High Level Processing Functions ----------------- + async def tag_videos() -> None: """Tag videos with VLM analysis using improved async orchestration""" global completed_tasks, total_tasks scenes = media_handler.get_tagme_scenes() if not scenes: - log.info("No videos to tag. Have you tagged any scenes with the VLM_TagMe tag to get processed?") + log.info( + "No videos to tag. Have you tagged any scenes with the VLM_TagMe tag to get processed?" + ) return - + total_tasks = len(scenes) completed_tasks = 0 - + video_progress.clear() for scene in scenes: - video_progress[scene.get('id', 'unknown')] = 0.0 + video_progress[scene.get("id", "unknown")] = 0.0 log.progress(0.0) - - log.info(f"🚀 Starting video processing for {total_tasks} scenes with semaphore limit of {config.config.concurrent_task_limit}") + + log.info( + f"🚀 Starting video processing for {total_tasks} scenes with semaphore limit of {config.config.concurrent_task_limit}" + ) # Create tasks with proper indexing for debugging tasks = [] for i, scene in enumerate(scenes): # Pre-task creation logging for hypothesis A (semaphore deadlock) and E (signal termination) - scene_id = scene.get('id') - log.debug(f"[DEBUG_HYPOTHESIS_A] Creating task {i+1}/{total_tasks} for scene {scene_id}, semaphore limit: {config.config.concurrent_task_limit}") - + scene_id = scene.get("id") + log.debug( + f"[DEBUG_HYPOTHESIS_A] Creating task {i + 1}/{total_tasks} for scene {scene_id}, semaphore limit: {config.config.concurrent_task_limit}" + ) + task = asyncio.create_task(__tag_video_with_timing(scene, i)) tasks.append(task) - + # Use asyncio.as_completed to process results as they finish (proves concurrency) completed_task_futures = asyncio.as_completed(tasks) - + batch_start_time = asyncio.get_event_loop().time() - + for completed_task in 
completed_task_futures: try: await completed_task @@ -178,36 +196,46 @@ async def tag_videos() -> None: completed_tasks += 1 # Exception logging for hypothesis E (signal termination) error_type = type(e).__name__ - log.debug(f"[DEBUG_HYPOTHESIS_E] Task failed with exception: {error_type}: {str(e)} (Task {completed_tasks}/{total_tasks})") + log.debug( + f"[DEBUG_HYPOTHESIS_E] Task failed with exception: {error_type}: {str(e)} (Task {completed_tasks}/{total_tasks})" + ) log.error(f"❌ Task failed: {e}") total_time = asyncio.get_event_loop().time() - batch_start_time - log.info(f"🎉 All {total_tasks} videos completed in {total_time:.2f}s (avg: {total_time/total_tasks:.2f}s/video)") + log.info( + f"🎉 All {total_tasks} videos completed in {total_time:.2f}s (avg: {total_time / total_tasks:.2f}s/video)" + ) log.progress(1.0) + async def find_marker_settings() -> None: """Find optimal marker settings based on a single tagged video""" scenes = media_handler.get_tagme_scenes() if len(scenes) != 1: - log.error("Please tag exactly one scene with the VLM_TagMe tag to get processed.") + log.error( + "Please tag exactly one scene with the VLM_TagMe tag to get processed." 
async def __tag_video_with_timing(scene: Dict[str, Any], scene_index: int) -> None:
    """Run ``__tag_video`` for one scene and log how long it took.

    Args:
        scene: Scene dict; only its ``id`` is read here (``"unknown"`` when
            absent), the dict itself is passed on to ``__tag_video``.
        scene_index: Zero-based position of the scene; logged one-based.

    Raises:
        Exception: Anything raised inside the ``try`` is logged together with
            the elapsed time and then re-raised unchanged.
    """
    scene_id = scene.get("id", "unknown")
    start_time = asyncio.get_event_loop().time()

    log.info(f"🎬 Starting video {scene_index + 1}: Scene {scene_id}")

    try:
        await __tag_video(scene)
        duration = asyncio.get_event_loop().time() - start_time
        log.info(
            f"✅ Completed video {scene_index + 1} (Scene {scene_id}) in {duration:.2f}s"
        )
    except Exception as e:
        # Measure again so the failure log reports time-to-failure.
        duration = asyncio.get_event_loop().time() - start_time
        log.error(
            f"❌ Failed video {scene_index + 1} (Scene {scene_id}) after {duration:.2f}s: {e}"
        )
        raise
def progress_cb(p: int) -> None:
    """Record this scene's progress, log throughput figures, report overall progress.

    ``scene_id`` is captured from the enclosing ``__tag_video`` call.

    Args:
        p: Progress of the current scene as a percentage; it is divided by
            100 before being stored in ``video_progress``.
    """
    global video_progress, total_tasks
    video_progress[scene_id] = p / 100.0
    # Overall progress is the mean of the per-scene fractions.
    total_prog = sum(video_progress.values()) / total_tasks

    stats = vlm_engine.vlm_engine.get_performance_stats()
    total_frames = stats.get("total_frames_processed", 0)
    elapsed_seconds = stats.get("elapsed_time", 0.0)

    log.info(f"[Throughput] total_frames: {total_frames}")
    log.info(f"[Throughput] elapsed_seconds: {elapsed_seconds:.2f}")

    # Frames per minute; reported as 0.0 until some time has elapsed.
    fpm = (total_frames / elapsed_seconds) * 60.0 if elapsed_seconds > 0 else 0.0

    log.info(f"[Throughput] calculated_fpm: {fpm:.1f}")
    # NOTE(review): the "Frame ~" value below is (p / 100) * 100, i.e. the
    # percentage again rather than a frame number - confirm what was intended.
    log.info(
        f"[Throughput] Frame ~{(p / 100) * 100:.0f}: {fpm:.1f} FPM | progress: {p}%"
    )
    log.progress(total_prog)
- processing_start_time - log.debug(f"[DEBUG_HYPOTHESIS_B] VLM processing completed for scene {scene_id} in {processing_duration:.2f}s ({len(detected_tags)} detected tags)") + log.debug( + f"[DEBUG_HYPOTHESIS_B] VLM processing completed for scene {scene_id} in {processing_duration:.2f}s ({len(detected_tags)} detected tags)" + ) if detected_tags: # Clear all existing tags and markers before adding new ones media_handler.clear_all_tags_from_video(scene) media_handler.clear_all_markers_from_video(scene_id) - + # Add tags to scene tag_ids = media_handler.get_tag_ids(list(detected_tags), create=True) media_handler.add_tags_to_video(scene_id, tag_ids) @@ -351,81 +413,91 @@ def progress_cb(p: int) -> None: # Add markers if enabled if config.config.create_markers: - media_handler.add_markers_to_video_from_dict(scene_id, video_result.tag_timespans) + media_handler.add_markers_to_video_from_dict( + scene_id, video_result.tag_timespans + ) log.info(f"Added markers to scene {scene_id}") # Remove VLM_TagMe tag from processed scene media_handler.remove_tagme_tag_from_scene(scene_id) - + # Task completion logging task_end_time = asyncio.get_event_loop().time() total_task_time = task_end_time - task_start_time - log.debug(f"[DEBUG_HYPOTHESIS_A] Task completed for scene {scene_id} in {total_task_time:.2f}s") - + log.debug( + f"[DEBUG_HYPOTHESIS_A] Task completed for scene {scene_id} in {total_task_time:.2f}s" + ) + except Exception as e: # Exception handling with detailed logging for hypothesis E exception_time = asyncio.get_event_loop().time() error_type = type(e).__name__ - log.debug(f"[DEBUG_HYPOTHESIS_E] Task exception for scene {scene_id}: {error_type}: {str(e)} at {exception_time:.3f}s") - - scene_id = scene.get('id', 'unknown') + log.debug( + f"[DEBUG_HYPOTHESIS_E] Task exception for scene {scene_id}: {error_type}: {str(e)} at {exception_time:.3f}s" + ) + + scene_id = scene.get("id", "unknown") log.error(f"Error processing video scene {scene_id}: {e}") # Add error tag to 
async def __find_marker_settings(scene: Dict[str, Any]) -> None:
    """Compute marker settings for a scene from the markers it already has.

    The scene's existing markers are converted into one ``TimeFrame`` per
    primary tag name and passed to
    ``vlm_engine.find_optimal_marker_settings_async``; the result is logged
    as indented JSON. Nothing is returned.

    Args:
        scene: Scene dict. ``id`` and ``files[0]["path"]`` must be present;
            otherwise an error is logged and the function returns early.

    Any exception raised while processing is caught and logged, not re-raised.
    """
    try:
        scene_id = scene.get("id")
        if scene_id is None:
            log.error("Scene missing 'id' field")
            return

        files = scene.get("files", [])
        if not files:
            log.error(f"Scene {scene_id} has no files")
            return

        # The path is only validated here; it is not used further below.
        scene_file = files[0].get("path")
        if scene_file is None:
            log.error(f"Scene {scene_id} file has no path")
            return

        # Get existing markers for the scene
        existing_markers = media_handler.get_scene_markers(scene_id)

        # Convert markers to desired timespan format.
        # NOTE(review): keyed by tag name, so a later marker with the same
        # primary tag replaces an earlier one - confirm this is intended.
        desired_timespan_data = {}
        for marker in existing_markers:
            tag_name = marker["primary_tag"]["name"]
            start = marker["seconds"]
            # Fall back to a one-second span both when "end_seconds" is absent
            # and when it is present but None; dict.get's default only covers
            # the former, which would put None into TimeFrame.end.
            end = marker.get("end_seconds")
            if end is None:
                end = start + 1
            desired_timespan_data[tag_name] = TimeFrame(
                start=start,
                end=end,
                total_confidence=1.0,
            )

        # Find optimal settings
        optimal_settings = await vlm_engine.find_optimal_marker_settings_async(
            existing_json={},  # No existing JSON data
            desired_timespan_data=desired_timespan_data,
        )

        # Output results
        log.info(f"Optimal marker settings found for scene {scene_id}:")
        log.info(json.dumps(optimal_settings, indent=2))

    except Exception as e:
        scene_id = scene.get("id", "unknown")
        log.error(f"Error finding marker settings for scene {scene_id}: {e}")
@dataclass
class TimeFrame:
    """A start/end span with an optional confidence value attached."""

    start: float
    end: float
    total_confidence: Optional[float] = None

    def to_json(self) -> str:
        """Serialize the three fields to a JSON object string."""
        payload = {
            "start": self.start,
            "end": self.end,
            "total_confidence": self.total_confidence,
        }
        return json.dumps(payload)

    def __str__(self) -> str:
        return f"TimeFrame(start={self.start}, end={self.end}, confidence={self.total_confidence})"
def _configure_logging(self) -> None:
    """Set logging levels from the ``trace_logging`` flag of the plugin config.

    Reads ``config.config.vlm_engine_config["trace_logging"]`` (default
    ``False``). When set, the root logger and the ``"logger"`` and
    ``"multiplexer_llm"`` loggers are switched to DEBUG; otherwise this
    module's logger is set to WARNING.
    """
    vlm_config = config.config.vlm_engine_config
    trace_enabled = vlm_config.get("trace_logging", False)

    if not trace_enabled:
        logger.setLevel(logging.WARNING)
        return

    # basicConfig() does nothing once the root logger has a handler, and this
    # module already calls it at import time with level=WARNING. It is kept
    # for the unconfigured case; the root level is then raised explicitly so
    # DEBUG records are not filtered out.
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logging.getLogger().setLevel(logging.DEBUG)

    # NOTE(review): "logger" is presumably the logger name used by vlm-engine
    # (the message below mentions it) - confirm against the package.
    logging.getLogger("logger").setLevel(logging.DEBUG)
    logging.getLogger("multiplexer_llm").setLevel(logging.DEBUG)
    logger.debug("Trace logging enabled for vlm-engine and multiplexer-llm")
-112,24 +139,23 @@ async def initialize(self) -> None: def _create_engine_config(self) -> EngineConfig: """Create EngineConfig from the configuration""" vlm_config = config.config.vlm_engine_config - + # Create pipeline configs pipelines = {} for pipeline_name, pipeline_data in vlm_config["pipelines"].items(): models = [ PipelineModelConfig( - name=model["name"], - inputs=model["inputs"], - outputs=model["outputs"] - ) for model in pipeline_data["models"] + name=model["name"], inputs=model["inputs"], outputs=model["outputs"] + ) + for model in pipeline_data["models"] ] - + pipelines[pipeline_name] = PipelineConfig( inputs=pipeline_data["inputs"], output=pipeline_data["output"], short_name=pipeline_data["short_name"], version=pipeline_data["version"], - models=models + models=models, ) # Create model configs with new architectural changes @@ -141,17 +167,21 @@ def _create_engine_config(self) -> EngineConfig: for endpoint in model_data.get("multiplexer_endpoints", []): # Validate that max_concurrent is present if "max_concurrent" not in endpoint: - raise ValueError(f"Endpoint '{endpoint.get('name', 'unnamed')}' is missing required 'max_concurrent' parameter") - - multiplexer_endpoints.append({ - "base_url": endpoint["base_url"], - "api_key": endpoint.get("api_key", ""), - "name": endpoint["name"], - "weight": endpoint.get("weight", 5), - "is_fallback": endpoint.get("is_fallback", False), - "max_concurrent": endpoint["max_concurrent"] - }) - + raise ValueError( + f"Endpoint '{endpoint.get('name', 'unnamed')}' is missing required 'max_concurrent' parameter" + ) + + multiplexer_endpoints.append( + { + "base_url": endpoint["base_url"], + "api_key": endpoint.get("api_key", ""), + "name": endpoint["name"], + "weight": endpoint.get("weight", 5), + "is_fallback": endpoint.get("is_fallback", False), + "max_concurrent": endpoint["max_concurrent"], + } + ) + models[model_name] = ModelConfig( type=model_data["type"], model_file_name=model_data["model_file_name"], @@ -160,23 
+190,26 @@ def _create_engine_config(self) -> EngineConfig: model_identifier=model_data["model_identifier"], model_version=model_data["model_version"], use_multiplexer=model_data.get("use_multiplexer", False), - max_concurrent_requests=model_data.get("max_concurrent_requests", 10), - instance_count=model_data.get("instance_count",1), - max_batch_size=model_data.get("max_batch_size",1), + max_concurrent_requests=model_data.get( + "max_concurrent_requests", 10 + ), + instance_count=model_data.get("instance_count", 1), + max_batch_size=model_data.get("max_batch_size", 1), multiplexer_endpoints=multiplexer_endpoints, - tag_list=model_data.get("tag_list", []) + tag_list=model_data.get("tag_list", []), ) else: models[model_name] = ModelConfig( type=model_data["type"], - model_file_name=model_data["model_file_name"] + model_file_name=model_data["model_file_name"], ) return EngineConfig( active_ai_models=vlm_config["active_ai_models"], pipelines=pipelines, models=models, - category_config=vlm_config["category_config"] + category_config=vlm_config["category_config"], + loglevel="DEBUG" if vlm_config.get("trace_logging", False) else "WARNING", ) async def process_video( @@ -187,7 +220,7 @@ async def process_video( threshold: Optional[float] = None, return_confidence: Optional[bool] = None, existing_json: Optional[Dict[str, Any]] = None, - progress_callback: Optional[Callable[[int], None]] = None + progress_callback: Optional[Callable[[int], None]] = None, ) -> VideoTagInfo: """Process a video using the VLM Engine""" if not self._initialized: @@ -195,41 +228,53 @@ async def process_video( try: logger.info(f"Processing video: {video_path}") - + # Use config defaults if not provided frame_interval = frame_interval or config.config.video_frame_interval threshold = threshold or config.config.video_threshold - return_confidence = return_confidence if return_confidence is not None else config.config.video_confidence_return + return_confidence = ( + return_confidence + if 
def get_performance_stats(self) -> Dict[str, Any]:
    """Return the wrapped engine's performance statistics.

    Returns:
        Whatever ``self.engine.get_performance_stats()`` returns, or an empty
        dict when the wrapper is not initialized or has no engine.
    """
    engine_ready = self._initialized and self.engine
    if engine_ready:
        return self.engine.get_performance_stats()
    return {}
async def find_optimal_marker_settings_async(
    existing_json: Dict[str, Any], desired_timespan_data: Dict[str, TimeFrame]
) -> Dict[str, Any]:
    """Module-level shortcut that delegates to the shared ``vlm_engine`` instance.

    Args:
        existing_json: Passed through unchanged as ``existing_json``.
        desired_timespan_data: Passed through unchanged as ``desired_timespan_data``.

    Returns:
        The result of ``vlm_engine.find_optimal_marker_settings``.
    """
    settings = await vlm_engine.find_optimal_marker_settings(
        existing_json=existing_json,
        desired_timespan_data=desired_timespan_data,
    )
    return settings