Skip to content

Commit ad6e0e1

Browse files
committed
Add video_track_override_path to Agent to play video from local files
1 parent 210b64d commit ad6e0e1

File tree

2 files changed

+147
-5
lines changed

2 files changed

+147
-5
lines changed

agents-core/vision_agents/core/agents/agents.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import time
77
import uuid
88
from collections import defaultdict
9+
from pathlib import Path
910
from typing import TYPE_CHECKING, Any, Dict, List, Optional, TypeGuard
1011
from uuid import uuid4
1112

@@ -51,6 +52,7 @@
5152
set_call_context,
5253
)
5354
from ..utils.video_forwarder import VideoForwarder
55+
from ..utils.video_track import VideoFileTrack
5456
from . import events
5557
from .conversation import Conversation
5658
from .transcript_buffer import TranscriptBuffer
@@ -125,6 +127,7 @@ def __init__(
125127
options: Optional[AgentOptions] = None,
126128
tracer: Tracer = trace.get_tracer("agents"),
127129
profiler: Optional[Profiler] = None,
130+
video_track_override_path: Optional[str | Path] = None,
128131
):
129132
self._pending_turn: Optional[LLMTurn] = None
130133
self.participants: Optional[ParticipantsState] = None
@@ -204,12 +207,23 @@ def __init__(
204207
self._interval_task = None
205208
self._callback_executed = False
206209
self._track_tasks: Dict[str, asyncio.Task] = {}
210+
207211
# Track metadata: track_id -> TrackInfo
208212
self._active_video_tracks: Dict[str, TrackInfo] = {}
209213
self._video_forwarders: List[VideoForwarder] = []
210214
self._current_video_track_id: Optional[str] = None
211215
self._connection: Optional[Connection] = None
212216

217+
# Optional local video track override for debugging.
218+
# This track will play instead of any incoming video track.
219+
self._video_track_override: Optional[VideoFileTrack] = None
220+
if video_track_override_path:
221+
logger.warning(
222+
f'🎥 The video will be played from "{video_track_override_path}" instead of the call'
223+
)
224+
# Store the local video track.
225+
self._video_track_override = VideoFileTrack(video_track_override_path)
226+
213227
# the outgoing audio track
214228
self._audio_track: Optional[OutputAudioTrack] = None
215229

@@ -1011,11 +1025,17 @@ async def _on_track_added(
10111025
):
10121026
return
10131027

1014-
# Subscribe to the video track, we watch all tracks by default
1015-
track = self.edge.add_track_subscriber(track_id)
1016-
if not track:
1017-
self.logger.error(f"Failed to subscribe to {track_id}")
1018-
return
1028+
if self._video_track_override is not None:
1029+
# If local video track is set, we override all other video tracks with it.
1030+
# We override tracks instead of simply playing one in order to keep the same lifecycle within the call.
1031+
# Otherwise, we'd have a video going on without anybody on the call.
1032+
track = self._video_track_override
1033+
else:
1034+
# Subscribe to the video track, we watch all tracks by default
1035+
track = self.edge.add_track_subscriber(track_id)
1036+
if not track:
1037+
self.logger.error(f"Failed to subscribe to {track_id}")
1038+
return
10191039

10201040
# Store track metadata
10211041
forwarder = VideoForwarder(

agents-core/vision_agents/core/utils/video_track.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
import asyncio
22
import logging
3+
import time
4+
from concurrent.futures import ThreadPoolExecutor
5+
from pathlib import Path
6+
from typing import Optional, cast
37

48
import av
9+
import av.filter
10+
import av.frame
511
from aiortc import VideoStreamTrack
12+
from av import VideoFrame
613
from PIL import Image
714
from vision_agents.core.utils.video_queue import VideoLatestNQueue
815

@@ -88,3 +95,118 @@ def stop(self):
8895
@property
8996
def stopped(self) -> bool:
9097
return self._stopped
98+
99+
100+
class VideoFileTrack(VideoStreamTrack):
101+
"""
102+
A video track reading from a local MP4 file,
103+
filtered to a constant FPS using FFmpeg (30 FPS by default).
104+
105+
Use it for testing and debugging.
106+
"""
107+
108+
def __init__(self, path: str | Path, fps: int = 30):
109+
super().__init__()
110+
self.fps = fps
111+
self.path = Path(path)
112+
113+
self._stopped = False
114+
self._container = av.open(path)
115+
self._stream = self._container.streams.video[0]
116+
if self._stream.time_base is None:
117+
raise ValueError("Cannot determine time_base for the video stream")
118+
119+
self._time_base = self._stream.time_base
120+
121+
# Decoder iterator to read the frames
122+
self._decoder = self._container.decode(self._stream)
123+
self._executor = ThreadPoolExecutor(1)
124+
self._set_filter_graph()
125+
126+
def _set_filter_graph(self):
127+
# Safe extraction of sample_aspect_ratio
128+
sar = self._stream.sample_aspect_ratio
129+
if sar is None:
130+
sar_num, sar_den = 1, 1
131+
else:
132+
sar_num, sar_den = sar.numerator, sar.denominator
133+
134+
# Build ffmpeg filter graph to resample video to fixed fps
135+
# Keep the reference to the graph to avoid GC
136+
self._graph = av.filter.Graph()
137+
# Buffer source with all required parameters
138+
139+
self._src = self._graph.add(
140+
"buffer",
141+
f"video_size={self._stream.width}x{self._stream.height}:"
142+
f"pix_fmt={self._stream.pix_fmt}:"
143+
f"time_base={self._time_base.numerator}/{self._time_base.denominator}:"
144+
f"pixel_aspect={sar_num}/{sar_den}",
145+
)
146+
147+
# Add an FPS filter
148+
fps_filter = self._graph.add("fps", f"fps={self.fps}")
149+
150+
# Add a buffer sink
151+
self._sink = self._graph.add("buffersink")
152+
153+
# Connect graph: buffer -> fps filter -> sink
154+
self._src.link_to(fps_filter)
155+
fps_filter.link_to(self._sink)
156+
self._graph.configure()
157+
158+
def _next_frame(self) -> av.VideoFrame:
159+
filtered_frame: Optional[av.VideoFrame] = None
160+
while filtered_frame is None:
161+
# Get the next decoded frame
162+
try:
163+
frame = next(self._decoder)
164+
except StopIteration:
165+
# Loop the video when it ends
166+
self._container.seek(0)
167+
self._decoder = self._container.decode(self._stream)
168+
# Reset the filter graph too
169+
self._set_filter_graph()
170+
frame = next(self._decoder)
171+
172+
# Ensure frame has a time_base (required by buffer source)
173+
frame.time_base = self._time_base
174+
175+
# Push decoded frame into the filter graph
176+
self._src.push(frame)
177+
178+
# Pull filtered frame from buffersink
179+
try:
180+
filtered_frame = cast(av.VideoFrame, self._sink.pull())
181+
except (av.ExitError, av.BlockingIOError):
182+
# Filter graph is not ready to output yet
183+
time.sleep(0.001)
184+
continue
185+
except Exception:
186+
logger.exception("Failed to read a frame from video file")
187+
continue
188+
189+
# Convert the filtered video frame to RGB for aiortc
190+
new_frame = filtered_frame.to_rgb()
191+
192+
return new_frame
193+
194+
async def recv(self) -> VideoFrame:
195+
"""
196+
Async method to produce the next filtered video frame.
197+
Loops automatically at the end of the file.
198+
"""
199+
if self._stopped:
200+
raise VideoTrackClosedError("Track stopped")
201+
loop = asyncio.get_running_loop()
202+
frame = await loop.run_in_executor(self._executor, self._next_frame)
203+
# Sleep between frames to let other coroutines to run
204+
await asyncio.sleep(float(frame.time_base))
205+
return frame
206+
207+
def stop(self) -> None:
208+
self._stopped = True
209+
self._executor.shutdown(wait=False)
210+
211+
def __repr__(self):
212+
return f'<{self.__class__.__name__} path="{self.path}" fps={self.fps}>'

0 commit comments

Comments
 (0)