diff --git a/agents-core/vision_agents/core/agents/agents.py b/agents-core/vision_agents/core/agents/agents.py index 5b7bb2d0..ab2dbd4c 100644 --- a/agents-core/vision_agents/core/agents/agents.py +++ b/agents-core/vision_agents/core/agents/agents.py @@ -6,6 +6,7 @@ import time import uuid from collections import defaultdict +from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeGuard from uuid import uuid4 @@ -51,6 +52,7 @@ set_call_context, ) from ..utils.video_forwarder import VideoForwarder +from ..utils.video_track import VideoFileTrack from . import events from .conversation import Conversation from .transcript_buffer import TranscriptBuffer @@ -61,6 +63,7 @@ from opentelemetry.trace import Tracer from opentelemetry.context import Token + if TYPE_CHECKING: from vision_agents.plugins.getstream.stream_edge_transport import StreamEdge @@ -204,12 +207,17 @@ def __init__( self._interval_task = None self._callback_executed = False self._track_tasks: Dict[str, asyncio.Task] = {} + # Track metadata: track_id -> TrackInfo self._active_video_tracks: Dict[str, TrackInfo] = {} self._video_forwarders: List[VideoForwarder] = [] self._current_video_track_id: Optional[str] = None self._connection: Optional[Connection] = None + # Optional local video track override for debugging. + # This track will play instead of any incoming video track. + self._video_track_override_path: Optional[str | Path] = None + # the outgoing audio track self._audio_track: Optional[OutputAudioTrack] = None @@ -868,6 +876,16 @@ async def say( completed=True, ) + def set_video_track_override_path(self, path: str): + if not path or not self.publish_video: + return + + self.logger.warning( + f'🎥 The video will be played from "{path}" instead of the call' + ) + # Store the local video track. 
+ self._video_track_override_path = path + async def _consume_incoming_audio(self) -> None: """Consumer that continuously processes audio from the queue.""" interval_seconds = 0.02 # 20ms target interval @@ -964,6 +982,8 @@ async def _on_track_removed( ): track = self._active_video_tracks.pop(track_id, None) if track is not None: + await track.forwarder.stop() + track.track.stop() await self._on_track_change(track_id) async def _on_track_change(self, track_id: str): @@ -1011,11 +1031,17 @@ async def _on_track_added( ): return - # Subscribe to the video track, we watch all tracks by default - track = self.edge.add_track_subscriber(track_id) - if not track: - self.logger.error(f"Failed to subscribe to {track_id}") - return + if self._video_track_override_path is not None: + # If local video track is set, we override all other video tracks with it. + # We override tracks instead of simply playing one in order to keep the same lifecycle within the call. + # Otherwise, we'd have a video going on without anybody on the call. + track = await self._get_video_track_override() + else: + # Subscribe to the video track, we watch all tracks by default + track = self.edge.add_track_subscriber(track_id) + if not track: + self.logger.error(f"Failed to subscribe to {track_id}") + return # Store track metadata forwarder = VideoForwarder( @@ -1326,6 +1352,18 @@ def _truncate_for_logging(self, obj, max_length=200): obj_str = obj_str[:max_length] + "... (truncated)" return obj_str + async def _get_video_track_override(self) -> VideoFileTrack: + """ + Create a video track override in async way if the path is set. 
+ + Returns: `VideoFileTrack` + """ + if not self._video_track_override_path: + raise ValueError("video_track_override_path is not set") + return await asyncio.to_thread( + lambda p: VideoFileTrack(p), self._video_track_override_path + ) + def _is_audio_llm(llm: LLM | VideoLLM | AudioLLM) -> TypeGuard[AudioLLM]: return isinstance(llm, AudioLLM) diff --git a/agents-core/vision_agents/core/cli/cli_runner.py b/agents-core/vision_agents/core/cli/cli_runner.py index ae283bcc..80d4f6a4 100644 --- a/agents-core/vision_agents/core/cli/cli_runner.py +++ b/agents-core/vision_agents/core/cli/cli_runner.py @@ -67,12 +67,20 @@ def cli(launcher: "AgentLauncher") -> None: default=False, help="Disable opening the demo UI", ) + @click.option( + "--video-track-override", + type=click.Path(dir_okay=False, exists=True, resolve_path=True), + default=None, + help="Optional local video track override for debugging. " + "This track will play instead of any incoming video track.", + ) def run_agent( call_type: str, call_id: Optional[str], debug: bool, log_level: str, no_demo: bool, + video_track_override: Optional[str], ) -> None: """Run the agent with the specified configuration.""" # Configure logging @@ -95,6 +103,9 @@ async def _run(): try: # Launch agent with warmup agent = await launcher.launch(call_type=call_type, call_id=call_id) + if video_track_override: + agent.set_video_track_override_path(video_track_override) + logger.info("✅ Agent warmed up and ready") # Open demo UI by default diff --git a/agents-core/vision_agents/core/utils/video_track.py b/agents-core/vision_agents/core/utils/video_track.py index a29369f5..04b95602 100644 --- a/agents-core/vision_agents/core/utils/video_track.py +++ b/agents-core/vision_agents/core/utils/video_track.py @@ -1,8 +1,15 @@ import asyncio import logging +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Optional, cast import av +import av.filter +import av.frame from aiortc import 
class VideoFileTrack(VideoStreamTrack):
    """
    A video track that reads frames from a local video file (e.g. MP4) and
    resamples them to a constant FPS using an FFmpeg filter graph
    (30 FPS by default). The file loops automatically when it ends.

    Intended for testing and debugging only.
    """

    def __init__(self, path: str | Path, fps: int = 30):
        """
        Args:
            path: Path to a local video file with at least one video stream.
            fps: Constant output frame rate (default 30).

        Raises:
            ValueError: If the file has no video stream or the stream has
                no usable ``time_base``.
        """
        super().__init__()
        self.fps = fps
        self.path = Path(path)

        self._stopped = False
        self._frame_interval = 1.0 / self.fps
        # Monotonic deadline for the next frame; initialised lazily in recv()
        # so pacing accounts for decode time instead of drifting slower than
        # the target FPS (sleeping a full interval *after* decoding would
        # add decode latency to every frame).
        self._next_frame_deadline: Optional[float] = None
        self._container = av.open(self.path)

        if not self._container.streams.video:
            raise ValueError(f"No video streams found in file: {path}")

        self._stream = self._container.streams.video[0]
        if self._stream.time_base is None:
            raise ValueError("Cannot determine time_base for the video stream")

        self._time_base = self._stream.time_base

        # Decoder iterator to read the frames sequentially.
        self._decoder = self._container.decode(self._stream)
        # Single worker: decoding is sequential and stateful, so one thread.
        self._executor = ThreadPoolExecutor(1)
        self._set_filter_graph()

    def _set_filter_graph(self) -> None:
        """Build (or rebuild) the FFmpeg graph: buffer -> fps -> buffersink.

        Called again after looping back to the start of the file so the fps
        filter's internal timestamp state is reset for the new pass.
        """
        # Safe extraction of sample_aspect_ratio (may be None for some files).
        sar = self._stream.sample_aspect_ratio
        if sar is None:
            sar_num, sar_den = 1, 1
        else:
            sar_num, sar_den = sar.numerator, sar.denominator

        # Keep references to the graph and its endpoints to avoid GC.
        self._graph = av.filter.Graph()

        # Buffer source with all required parameters.
        self._src = self._graph.add(
            "buffer",
            f"video_size={self._stream.width}x{self._stream.height}:"
            f"pix_fmt={self._stream.pix_fmt}:"
            f"time_base={self._time_base.numerator}/{self._time_base.denominator}:"
            f"pixel_aspect={sar_num}/{sar_den}",
        )

        # FPS filter resamples the stream to a constant frame rate.
        fps_filter = self._graph.add("fps", f"fps={self.fps}")

        # Buffer sink we pull filtered frames from.
        self._sink = self._graph.add("buffersink")

        # Connect graph: buffer -> fps filter -> sink
        self._src.link_to(fps_filter)
        fps_filter.link_to(self._sink)
        self._graph.configure()

    def _decode_next(self) -> av.VideoFrame:
        """Return the next decoded frame, seeking back to the start at EOF.

        Raises:
            ValueError: If the file yields no decodable frames even after
                seeking back to the start (prevents a bare ``StopIteration``
                from escaping into the executor future, which would corrupt
                the awaiting coroutine).
        """
        try:
            return next(self._decoder)
        except StopIteration:
            # Loop the video when it ends.
            self._container.seek(0)
            self._decoder = self._container.decode(self._stream)
            # Reset the filter graph too, so fps-filter timestamps restart.
            self._set_filter_graph()
            try:
                return next(self._decoder)
            except StopIteration:
                raise ValueError(
                    f'No decodable video frames in file "{self.path}"'
                ) from None

    def _next_frame(self) -> av.VideoFrame:
        """Blocking helper: decode, filter and return the next RGB frame."""
        while True:
            frame = self._decode_next()

            # Ensure frame has a time_base (required by the buffer source).
            frame.time_base = self._time_base

            # Push decoded frame into the filter graph.
            self._src.push(frame)

            # Pull a filtered frame from the buffersink.
            try:
                filtered_frame = cast(av.VideoFrame, self._sink.pull())
            except (av.ExitError, av.BlockingIOError):
                # EAGAIN: the fps filter needs more input before it can emit
                # a frame — just push the next one, no need to sleep.
                continue
            except Exception:
                logger.exception(
                    f'Failed to read a video frame from file "{self.path}"'
                )
                continue

            # Convert the filtered video frame to RGB for aiortc.
            return filtered_frame.to_rgb()

    async def recv(self) -> VideoFrame:
        """
        Async method to produce the next filtered video frame.
        Loops automatically at the end of the file.

        Raises:
            VideoTrackClosedError: If the track has been stopped.
        """
        if self._stopped:
            raise VideoTrackClosedError("Track stopped")
        loop = asyncio.get_running_loop()
        frame = await loop.run_in_executor(self._executor, self._next_frame)

        # Pace frames against a monotonic deadline so decode time does not
        # slow playback below the target FPS.
        now = loop.time()
        if self._next_frame_deadline is None:
            self._next_frame_deadline = now
        delay = self._next_frame_deadline - now
        if delay > 0:
            await asyncio.sleep(delay)
        # Advance the deadline; if we fell badly behind, re-anchor to "now"
        # instead of bursting frames to catch up.
        self._next_frame_deadline = max(
            self._next_frame_deadline + self._frame_interval, now
        )
        return frame

    def stop(self) -> None:
        """Stop the track and release decoder resources (idempotent)."""
        if self._stopped:
            return
        self._stopped = True
        # NOTE(review): shutdown(wait=False) means a _next_frame call may
        # still be running when the container is closed — PyAV tolerates
        # this with an error on the in-flight call at worst; confirm if a
        # clean join is preferred here.
        self._executor.shutdown(wait=False)
        self._container.close()
        super().stop()

    def __repr__(self):
        return f'<{self.__class__.__name__} path="{self.path}" fps={self.fps}>'