Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
import uuid
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeGuard
from uuid import uuid4

Expand Down Expand Up @@ -51,6 +52,7 @@
set_call_context,
)
from ..utils.video_forwarder import VideoForwarder
from ..utils.video_track import VideoFileTrack
from . import events
from .conversation import Conversation
from .transcript_buffer import TranscriptBuffer
Expand All @@ -61,6 +63,7 @@
from opentelemetry.trace import Tracer
from opentelemetry.context import Token


if TYPE_CHECKING:
from vision_agents.plugins.getstream.stream_edge_transport import StreamEdge

Expand Down Expand Up @@ -204,12 +207,17 @@ def __init__(
self._interval_task = None
self._callback_executed = False
self._track_tasks: Dict[str, asyncio.Task] = {}

# Track metadata: track_id -> TrackInfo
self._active_video_tracks: Dict[str, TrackInfo] = {}
self._video_forwarders: List[VideoForwarder] = []
self._current_video_track_id: Optional[str] = None
self._connection: Optional[Connection] = None

# Optional local video track override for debugging.
# This track will play instead of any incoming video track.
self._video_track_override_path: Optional[str | Path] = None

# the outgoing audio track
self._audio_track: Optional[OutputAudioTrack] = None

Expand Down Expand Up @@ -868,6 +876,16 @@ async def say(
completed=True,
)

def set_video_track_override_path(self, path: str):
if not path or not self.publish_video:
return

self.logger.warning(
f'🎥 The video will be played from "{path}" instead of the call'
)
# Store the local video track.
self._video_track_override_path = path

async def _consume_incoming_audio(self) -> None:
"""Consumer that continuously processes audio from the queue."""
interval_seconds = 0.02 # 20ms target interval
Expand Down Expand Up @@ -964,6 +982,8 @@ async def _on_track_removed(
):
track = self._active_video_tracks.pop(track_id, None)
if track is not None:
await track.forwarder.stop()
track.track.stop()
await self._on_track_change(track_id)

async def _on_track_change(self, track_id: str):
Expand Down Expand Up @@ -1011,11 +1031,17 @@ async def _on_track_added(
):
return

# Subscribe to the video track, we watch all tracks by default
track = self.edge.add_track_subscriber(track_id)
if not track:
self.logger.error(f"Failed to subscribe to {track_id}")
return
if self._video_track_override_path is not None:
# If local video track is set, we override all other video tracks with it.
# We override tracks instead of simply playing one in order to keep the same lifecycle within the call.
# Otherwise, we'd have a video going on without anybody on the call.
track = await self._get_video_track_override()
else:
# Subscribe to the video track, we watch all tracks by default
track = self.edge.add_track_subscriber(track_id)
if not track:
self.logger.error(f"Failed to subscribe to {track_id}")
return

# Store track metadata
forwarder = VideoForwarder(
Expand Down Expand Up @@ -1326,6 +1352,18 @@ def _truncate_for_logging(self, obj, max_length=200):
obj_str = obj_str[:max_length] + "... (truncated)"
return obj_str

async def _get_video_track_override(self) -> VideoFileTrack:
    """
    Create the local-file video track that replaces incoming call video.

    Construction of ``VideoFileTrack`` is handed to a worker thread through
    ``asyncio.to_thread`` so the event loop keeps running meanwhile.

    Returns:
        A new ``VideoFileTrack`` for the configured override path.

    Raises:
        ValueError: If no override path has been set.
    """
    override_path = self._video_track_override_path
    if not override_path:
        raise ValueError("video_track_override_path is not set")
    # The class is itself the callable; no wrapper lambda is required.
    return await asyncio.to_thread(VideoFileTrack, override_path)


def _is_audio_llm(llm: LLM | VideoLLM | AudioLLM) -> TypeGuard[AudioLLM]:
    """Type guard telling whether ``llm`` is an ``AudioLLM`` instance."""
    handles_audio = isinstance(llm, AudioLLM)
    return handles_audio
Expand Down
11 changes: 11 additions & 0 deletions agents-core/vision_agents/core/cli/cli_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,20 @@ def cli(launcher: "AgentLauncher") -> None:
default=False,
help="Disable opening the demo UI",
)
@click.option(
"--video-track-override",
type=click.Path(dir_okay=False, exists=True, resolve_path=True),
default=None,
help="Optional local video track override for debugging. "
"This track will play instead of any incoming video track.",
)
def run_agent(
call_type: str,
call_id: Optional[str],
debug: bool,
log_level: str,
no_demo: bool,
video_track_override: Optional[str],
) -> None:
"""Run the agent with the specified configuration."""
# Configure logging
Expand All @@ -95,6 +103,9 @@ async def _run():
try:
# Launch agent with warmup
agent = await launcher.launch(call_type=call_type, call_id=call_id)
if video_track_override:
agent.set_video_track_override_path(video_track_override)

logger.info("✅ Agent warmed up and ready")

# Open demo UI by default
Expand Down
133 changes: 133 additions & 0 deletions agents-core/vision_agents/core/utils/video_track.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional, cast

import av
import av.filter
import av.frame
from aiortc import VideoStreamTrack
from av import VideoFrame
from PIL import Image
from vision_agents.core.utils.video_queue import VideoLatestNQueue

Expand Down Expand Up @@ -88,3 +95,129 @@ def stop(self):
@property
def stopped(self) -> bool:
    """Expose the value of the private ``_stopped`` flag."""
    is_stopped = self._stopped
    return is_stopped


class VideoFileTrack(VideoStreamTrack):
    """
    A video track reading from a local video file (e.g. MP4),
    filtered to a constant FPS using FFmpeg (30 FPS by default).

    Playback restarts from the beginning when the file ends.
    Use it for testing and debugging.
    """

    def __init__(self, path: str | Path, fps: int = 30):
        """
        Open the file and prepare the decoder and the FPS filter graph.

        Args:
            path: Location of the video file.
            fps: Target constant frame rate; must be positive.

        Raises:
            ValueError: If ``fps`` is not positive, the file has no video
                stream, or the stream's time base cannot be determined.
        """
        if fps <= 0:
            raise ValueError(f"fps must be a positive number, got {fps}")

        super().__init__()
        self.fps = fps
        self.path = Path(path)

        self._stopped = False
        self._frame_interval = 1.0 / self.fps
        # Hand PyAV a plain string so str and Path inputs open the same way.
        self._container = av.open(str(path))

        try:
            if not self._container.streams.video:
                raise ValueError(f"No video streams found in file: {path}")

            self._stream = self._container.streams.video[0]
            if self._stream.time_base is None:
                raise ValueError("Cannot determine time_base for the video stream")

            self._time_base = self._stream.time_base

            # Decoder iterator to read the frames
            self._decoder = self._container.decode(self._stream)
            self._set_filter_graph()
        except Exception:
            # Do not leak the open file when validation or graph setup fails.
            self._container.close()
            raise

        # Single worker: frames are decoded one at a time, off the event loop.
        self._executor = ThreadPoolExecutor(1)

    def _set_filter_graph(self):
        """Build a fresh ``buffer -> fps -> buffersink`` filter graph."""
        # Safe extraction of sample_aspect_ratio
        sar = self._stream.sample_aspect_ratio
        if sar is None:
            sar_num, sar_den = 1, 1
        else:
            sar_num, sar_den = sar.numerator, sar.denominator

        # Build ffmpeg filter graph to resample video to fixed fps
        # Keep the reference to the graph to avoid GC
        self._graph = av.filter.Graph()

        # Buffer source with all required parameters
        self._src = self._graph.add(
            "buffer",
            f"video_size={self._stream.width}x{self._stream.height}:"
            f"pix_fmt={self._stream.pix_fmt}:"
            f"time_base={self._time_base.numerator}/{self._time_base.denominator}:"
            f"pixel_aspect={sar_num}/{sar_den}",
        )

        # Add an FPS filter
        fps_filter = self._graph.add("fps", f"fps={self.fps}")

        # Add a buffer sink
        self._sink = self._graph.add("buffersink")

        # Connect graph: buffer -> fps filter -> sink
        self._src.link_to(fps_filter)
        fps_filter.link_to(self._sink)
        self._graph.configure()

    def _rewind(self) -> av.VideoFrame:
        """Restart decoding from the beginning and return the first frame."""
        self._container.seek(0)
        self._decoder = self._container.decode(self._stream)
        # Reset the filter graph too
        self._set_filter_graph()
        try:
            return next(self._decoder)
        except StopIteration:
            # A bare StopIteration cannot be propagated through the executor
            # future, so report the empty stream as a regular error.
            raise ValueError(
                f'No decodable video frames found in file "{self.path}"'
            ) from None

    def _next_frame(self) -> av.VideoFrame:
        """
        Produce the next constant-FPS frame, converted to RGB.

        Blocking; intended to run on the executor thread. The sink is drained
        before more input is decoded so that frames duplicated by the fps
        filter are delivered instead of accumulating in the graph.
        """
        filtered_frame: Optional[av.VideoFrame] = None
        while filtered_frame is None:
            # Pull a filtered frame from the buffersink if one is ready
            try:
                filtered_frame = cast(av.VideoFrame, self._sink.pull())
                break
            except (av.ExitError, av.BlockingIOError):
                # Filter graph has nothing to output yet; feed it more input.
                pass
            except Exception:
                logger.exception(
                    'Failed to read a video frame from file "%s"', self.path
                )
                time.sleep(0.001)

            # Get the next decoded frame
            try:
                frame = next(self._decoder)
            except StopIteration:
                # Loop the video when it ends
                frame = self._rewind()

            # Ensure frame has a time_base (required by buffer source)
            frame.time_base = self._time_base

            # Push decoded frame into the filter graph
            self._src.push(frame)

        # Convert the filtered video frame to RGB for aiortc
        return filtered_frame.to_rgb()

    async def recv(self) -> VideoFrame:
        """
        Async method to produce the next filtered video frame.
        Loops automatically at the end of the file.

        Raises:
            VideoTrackClosedError: If the track has been stopped.
        """
        if self._stopped:
            raise VideoTrackClosedError("Track stopped")
        loop = asyncio.get_running_loop()
        started_at = loop.time()
        frame = await loop.run_in_executor(self._executor, self._next_frame)

        # Simulate real-time playback: wait only for what is left of the
        # frame interval so decode time does not slow the output rate down.
        remaining = self._frame_interval - (loop.time() - started_at)
        if remaining > 0:
            await asyncio.sleep(remaining)
        return frame

    def stop(self) -> None:
        """Stop the track and release the executor and the container."""
        if self._stopped:
            # Already stopped; nothing left to release.
            return
        self._stopped = True
        self._executor.shutdown(wait=False)
        self._container.close()
        super().stop()

    def __repr__(self):
        return f'<{self.__class__.__name__} path="{self.path}" fps={self.fps}>'
Loading