Skip to content

Commit a3a0276

Browse files
committed
fixed the inbound example
1 parent cf5037a commit a3a0276

File tree

5 files changed

+111
-136
lines changed

5 files changed

+111
-136
lines changed

examples/03_phone_and_rag_example/inbound_phone_and_rag_example.py

Lines changed: 51 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,24 @@
88
- "turbopuffer": Uses TurboPuffer + LangChain with function calling
99
1010
Flow:
11-
1. Twilio triggers webhook for phone number on /twilio/voice
12-
2. Start a bi directional stream using start.stream which goes to /twilio/media
13-
3. Start a call on Stream's edge network
14-
4. Create a participant for the phone call and join the call
15-
5. Create the AI, and have the AI join the call
16-
17-
Notes: Twilio uses ulaw audio encoding at 8kHz.
18-
19-
TODO/ to fix:
20-
- Things should prep when creating the call in the voice endpoint
21-
- Auth for stream endpoint
22-
- Ulaw audio bugs
23-
- Frankfurt connection bug
24-
- Study best practices for Gemini RAG
25-
- Study Turbopuffer Rag
26-
- Add an outbound calling example
27-
- See if there is a nicer diff approach to rag indexing
28-
- Write docs about Rag
11+
1. Twilio triggers webhook on /twilio/voice, which starts preparing the call
12+
2. Start a bi-directional stream using start.stream which goes to /twilio/media
13+
3. When media stream connects, await the prepared call and attach the phone user
14+
4. Run the agent session until the call ends
15+
16+
Notes: Twilio uses mulaw audio encoding at 8kHz.
2917
"""
3018
import asyncio
3119
import logging
3220
import os
21+
import traceback
22+
import uuid
3323
from pathlib import Path
3424

3525
import uvicorn
3626
from dotenv import load_dotenv
37-
from fastapi import Depends, FastAPI, WebSocket
38-
from getstream.video import rtc
39-
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
40-
from getstream.video.rtc.track_util import PcmData
41-
from getstream.video.rtc.tracks import SubscriptionConfig, TrackSubscriptionConfig
27+
from fastapi import Depends, FastAPI, Request, WebSocket
28+
from fastapi.responses import JSONResponse
4229

4330
from vision_agents.core import User, Agent
4431
from vision_agents.plugins import getstream, gemini, twilio, elevenlabs, deepgram
@@ -62,54 +49,69 @@
6249
app = FastAPI()
6350
call_registry = twilio.TwilioCallRegistry()
6451

65-
"""
66-
Twilio call webhook points here. Signature is validated and we start the media stream
67-
"""
52+
53+
@app.exception_handler(Exception)
54+
async def global_exception_handler(request: Request, exc: Exception):
55+
logger.error(f"Unhandled exception: {exc}\n{traceback.format_exc()}")
56+
return JSONResponse(status_code=500, content={"detail": str(exc)})
57+
58+
6859
@app.post("/twilio/voice")
6960
async def twilio_voice_webhook(
7061
_: None = Depends(twilio.verify_twilio_signature),
7162
data: twilio.CallWebhookInput = Depends(twilio.CallWebhookInput.as_form),
7263
):
73-
url = f"wss://{NGROK_URL}/twilio/media/{data.call_sid}"
74-
logger.info(f"📞 Call from {data.caller} ({data.caller_city or 'unknown location'}) forwarding to {url}")
64+
"""Twilio call webhook. Validates signature and starts the media stream."""
65+
logger.info(f"📞 Call from {data.caller} ({data.caller_city or 'unknown location'})")
66+
call_id = str(uuid.uuid4())
7567

76-
call_registry.create(data.call_sid, data)
68+
async def prepare_call():
69+
agent = await create_agent()
70+
await agent.create_user()
71+
72+
phone_number = data.from_number or "unknown"
73+
sanitized_number = phone_number.replace("+", "").replace(" ", "").replace("(", "").replace(")", "")
74+
phone_user = User(name=f"Call from {phone_number}", id=f"phone-{sanitized_number}")
75+
await agent.edge.create_user(user=phone_user)
76+
77+
stream_call = await agent.create_call("default", call_id=call_id)
78+
agent_session = await agent.join(stream_call, wait_for_participant=False)
79+
return agent, phone_user, stream_call, agent_session
80+
81+
twilio_call = call_registry.create(call_id, data, prepare=prepare_call)
82+
url = f"wss://{NGROK_URL}/twilio/media/{call_id}/{twilio_call.token}"
83+
logger.info("twilio redirect to %s", url)
7784

7885
return twilio.create_media_stream_response(url)
7986

8087

81-
"""
82-
Twilio media stream endpoint
83-
"""
84-
@app.websocket("/twilio/media/{call_sid}")
85-
async def media_stream(websocket: WebSocket, call_sid: str):
88+
@app.websocket("/twilio/media/{call_id}/{token}")
89+
async def media_stream(websocket: WebSocket, call_id: str, token: str):
8690
"""Receive real-time audio stream from Twilio."""
87-
twilio_call = call_registry.require(call_sid)
91+
twilio_call = call_registry.validate(call_id, token)
8892

89-
logger.info(f"🔗 Media stream connecting for {twilio_call.caller} from {twilio_call.caller_city or 'unknown location'}")
93+
logger.info(f"🔗 Media stream connected for {twilio_call.caller}")
9094

9195
twilio_stream = twilio.TwilioMediaStream(websocket)
9296
await twilio_stream.accept()
9397
twilio_call.twilio_stream = twilio_stream
9498

9599
try:
96-
agent = await create_agent()
97-
await agent.create_user()
98-
99-
phone_number = twilio_call.from_number or "unknown"
100-
sanitized_number = phone_number.replace("+", "").replace(" ", "").replace("(", "").replace(")", "")
101-
phone_user = User(name=f"Call from {phone_number}", id=f"phone-{sanitized_number}")
102-
await agent.edge.create_user(user=phone_user)
103-
104-
stream_call = await agent.create_call("default", call_sid)
100+
agent, phone_user, stream_call, agent_session = await twilio_call.await_prepare()
105101
twilio_call.stream_call = stream_call
106102

107-
await join_call(agent, stream_call, twilio_stream, phone_user)
103+
await twilio.attach_phone_to_call(stream_call, twilio_stream, phone_user.id)
104+
105+
with agent_session:
106+
await agent.llm.simple_response(
107+
text="Greet the caller warmly and ask what kind of app they're building. Use your knowledge base to provide relevant product recommendations."
108+
)
109+
await twilio_stream.run()
108110
finally:
109-
call_registry.remove(call_sid)
111+
call_registry.remove(call_id)
110112

111113

112-
async def startup_event():
114+
async def create_rag_knowledge():
113115
"""Initialize the RAG backend based on RAG_BACKEND environment variable."""
114116
global file_search_store, rag
115117

@@ -197,38 +199,7 @@ async def search_knowledge(query: str) -> str:
197199
)
198200

199201

200-
# =============================================================================
201-
# Call Handling
202-
# =============================================================================
203-
204-
205-
async def join_call(
206-
agent: Agent, call, twilio_stream: twilio.TwilioMediaStream, phone_user: User
207-
) -> None:
208-
"""Join a call and bridge audio between Twilio and Stream."""
209-
subscription_config = SubscriptionConfig(
210-
default=TrackSubscriptionConfig(track_types=[TrackType.TRACK_TYPE_AUDIO])
211-
)
212-
213-
connection = await rtc.join(call, phone_user.id, subscription_config=subscription_config)
214-
215-
@connection.on("audio")
216-
async def on_audio_received(pcm: PcmData):
217-
await twilio_stream.send_audio(pcm)
218-
219-
await connection.__aenter__()
220-
await connection.add_tracks(audio=twilio_stream.audio_track, video=None)
221-
222-
logger.info(f"{phone_user.name} joined the call, agent is joining next")
223-
224-
with await agent.join(call):
225-
await agent.llm.simple_response(
226-
text="Greet the caller warmly and ask what kind of app they're building. Use your knowledge base to provide relevant product recommendations."
227-
)
228-
await twilio_stream.run()
229-
230-
231202
if __name__ == "__main__":
232-
asyncio.run(startup_event())
203+
asyncio.run(create_rag_knowledge())
233204
logger.info(f"Starting with RAG_BACKEND={RAG_BACKEND}")
234205
uvicorn.run(app, host="localhost", port=8000)

examples/03_phone_and_rag_example/outbound_phone_example.py

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,10 @@
77
import uvicorn
88
from dotenv import load_dotenv
99
from fastapi import FastAPI, WebSocket
10-
from getstream.video import rtc
11-
from getstream.video.rtc import PcmData
12-
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
13-
from getstream.video.rtc.tracks import SubscriptionConfig, TrackSubscriptionConfig
1410
from twilio.rest import Client
1511

1612
from vision_agents.core import Agent, User
17-
from vision_agents.plugins import deepgram, elevenlabs, gemini, getstream, twilio, openai
13+
from vision_agents.plugins import gemini, getstream, twilio
1814

1915
load_dotenv()
2016

@@ -81,7 +77,7 @@ async def media_stream(websocket: WebSocket, call_sid: str, token: str):
8177
agent, phone_user, stream_call, agent_session = await twilio_call.await_prepare()
8278
twilio_call.stream_call = stream_call
8379

84-
await attach_phone_to_call(stream_call, twilio_stream, phone_user)
80+
await twilio.attach_phone_to_call(stream_call, twilio_stream, phone_user.id)
8581

8682
with agent_session:
8783
await agent.llm.simple_response(
@@ -92,26 +88,6 @@ async def media_stream(websocket: WebSocket, call_sid: str, token: str):
9288
call_registry.remove(call_sid)
9389

9490

95-
async def attach_phone_to_call(
96-
call, twilio_stream: twilio.TwilioMediaStream, phone_user: User
97-
) -> None:
98-
"""Join a call and bridge audio between Twilio and Stream."""
99-
subscription_config = SubscriptionConfig(
100-
default=TrackSubscriptionConfig(track_types=[TrackType.TRACK_TYPE_AUDIO])
101-
)
102-
103-
connection = await rtc.join(call, phone_user.id, subscription_config=subscription_config)
104-
105-
@connection.on("audio")
106-
async def on_audio_received(pcm: PcmData):
107-
await twilio_stream.send_audio(pcm)
108-
109-
await connection.__aenter__()
110-
await connection.add_tracks(audio=twilio_stream.audio_track, video=None)
111-
112-
logger.info(f"🔊 {phone_user.name} attached to call")
113-
114-
11591
async def run_with_server(from_number: str, to_number: str):
11692
"""Start the server and initiate the outbound call once ready."""
11793
config = uvicorn.Config(app, host="localhost", port=8000, log_level="info")

plugins/twilio/vision_agents/plugins/twilio/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from .audio import mulaw_to_pcm, pcm_to_mulaw, TWILIO_SAMPLE_RATE
44
from .call_registry import TwilioCall, TwilioCallRegistry
5-
from .media_stream import TwilioMediaStream
5+
from .media_stream import TwilioMediaStream, attach_phone_to_call
66
from .models import (
77
CallWebhookInput,
88
TwilioSignatureVerifier,
@@ -17,6 +17,7 @@
1717
"TwilioCallRegistry",
1818
"TwilioMediaStream",
1919
"TwilioSignatureVerifier",
20+
"attach_phone_to_call",
2021
"create_media_stream_response",
2122
"create_media_stream_twiml",
2223
"mulaw_to_pcm",

plugins/twilio/vision_agents/plugins/twilio/media_stream.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@
55
import logging
66
from typing import Any, Protocol
77

8+
from getstream.video import rtc
89
from getstream.video.rtc.audio_track import AudioStreamTrack
10+
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import TrackType
911
from getstream.video.rtc.track_util import PcmData
12+
from getstream.video.rtc.tracks import SubscriptionConfig, TrackSubscriptionConfig
1013

1114
from .audio import mulaw_to_pcm, pcm_to_mulaw, TWILIO_SAMPLE_RATE
1215

@@ -143,3 +146,27 @@ def is_connected(self) -> bool:
143146
return self._connected
144147

145148

149+
async def attach_phone_to_call(call, twilio_stream: TwilioMediaStream, user_id: str) -> None:
150+
"""
151+
Attach a phone user to a Stream call, bridging audio between Twilio and Stream.
152+
153+
Args:
154+
call: The Stream call to attach to.
155+
twilio_stream: The TwilioMediaStream handling the Twilio WebSocket.
156+
user_id: The user ID for the phone participant.
157+
"""
158+
subscription_config = SubscriptionConfig(
159+
default=TrackSubscriptionConfig(track_types=[TrackType.TRACK_TYPE_AUDIO])
160+
)
161+
162+
connection = await rtc.join(call, user_id, subscription_config=subscription_config)
163+
164+
@connection.on("audio")
165+
async def on_audio_received(pcm: PcmData):
166+
await twilio_stream.send_audio(pcm)
167+
168+
await connection.__aenter__()
169+
await connection.add_tracks(audio=twilio_stream.audio_track, video=None)
170+
171+
logger.info(f"Phone user {user_id} attached to call")
172+

plugins/twilio/vision_agents/plugins/twilio/models.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
from typing import Optional
66

7+
from fastapi import Form, Request
78
from fastapi.responses import Response
89
from pydantic import BaseModel, Field
910
from twilio.request_validator import RequestValidator
@@ -82,14 +83,13 @@ def auth_token(self) -> str:
8283
)
8384
return token
8485

85-
async def __call__(self, request) -> None:
86+
async def __call__(self, request: Request) -> None:
8687
"""
8788
FastAPI dependency that verifies Twilio signature.
8889
8990
Raises:
9091
HTTPException: If signature is missing or invalid.
9192
"""
92-
# Import here to avoid requiring FastAPI as a dependency
9393
from fastapi import HTTPException
9494

9595
signature = request.headers.get("X-Twilio-Signature")
@@ -174,33 +174,33 @@ async def webhook(data: CallWebhookInput = Depends(CallWebhookInput.as_form)):
174174
@classmethod
175175
def as_form(
176176
cls,
177-
CallSid: str = "",
178-
AccountSid: str = "",
179-
ApiVersion: str = "2010-04-01",
180-
CallStatus: str = "",
181-
Direction: str = "",
182-
From: str = "",
183-
Caller: str = "",
184-
CallerCity: Optional[str] = None,
185-
CallerState: Optional[str] = None,
186-
CallerZip: Optional[str] = None,
187-
CallerCountry: Optional[str] = None,
188-
FromCity: Optional[str] = None,
189-
FromState: Optional[str] = None,
190-
FromZip: Optional[str] = None,
191-
FromCountry: Optional[str] = None,
192-
To: str = "",
193-
Called: str = "",
194-
CalledCity: Optional[str] = None,
195-
CalledState: Optional[str] = None,
196-
CalledZip: Optional[str] = None,
197-
CalledCountry: Optional[str] = None,
198-
ToCity: Optional[str] = None,
199-
ToState: Optional[str] = None,
200-
ToZip: Optional[str] = None,
201-
ToCountry: Optional[str] = None,
202-
StirVerstat: Optional[str] = None,
203-
CallToken: Optional[str] = None,
177+
CallSid: str = Form(""),
178+
AccountSid: str = Form(""),
179+
ApiVersion: str = Form("2010-04-01"),
180+
CallStatus: str = Form(""),
181+
Direction: str = Form(""),
182+
From: str = Form(""),
183+
Caller: str = Form(""),
184+
CallerCity: Optional[str] = Form(None),
185+
CallerState: Optional[str] = Form(None),
186+
CallerZip: Optional[str] = Form(None),
187+
CallerCountry: Optional[str] = Form(None),
188+
FromCity: Optional[str] = Form(None),
189+
FromState: Optional[str] = Form(None),
190+
FromZip: Optional[str] = Form(None),
191+
FromCountry: Optional[str] = Form(None),
192+
To: str = Form(""),
193+
Called: str = Form(""),
194+
CalledCity: Optional[str] = Form(None),
195+
CalledState: Optional[str] = Form(None),
196+
CalledZip: Optional[str] = Form(None),
197+
CalledCountry: Optional[str] = Form(None),
198+
ToCity: Optional[str] = Form(None),
199+
ToState: Optional[str] = Form(None),
200+
ToZip: Optional[str] = Form(None),
201+
ToCountry: Optional[str] = Form(None),
202+
StirVerstat: Optional[str] = Form(None),
203+
CallToken: Optional[str] = Form(None),
204204
) -> "CallWebhookInput":
205205
"""
206206
Create CallWebhookInput from FastAPI Form fields.

0 commit comments

Comments
 (0)