
Commit 394bbd0

turbopuffer cleanup
1 parent fa939c0 commit 394bbd0

File tree

13 files changed: +923 -309 lines changed

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+from .rag import Document, RAG
+
+__all__ = ["Document", "RAG"]
Lines changed: 99 additions & 0 deletions
@@ -0,0 +1,99 @@
+import abc
+from dataclasses import dataclass
+from pathlib import Path
+
+
+@dataclass
+class Document:
+    """A document to be indexed in the RAG system."""
+
+    text: str
+    source: str
+    metadata: dict | None = None
+
+
+class RAG(abc.ABC):
+    """
+    Abstract base class for RAG (Retrieval Augmented Generation) implementations.
+
+    The full complexities of RAG are beyond the scope of this project.
+    We ship with examples including TurboPuffer RAG with hybrid search.
+
+    The documentation explains in greater detail how to build RAG.
+    """
+
+    @abc.abstractmethod
+    async def add_documents(self, documents: list[Document]) -> int:
+        """
+        Add documents to the RAG index.
+
+        Args:
+            documents: List of documents to index.
+
+        Returns:
+            Number of chunks indexed.
+        """
+
+    async def add_directory(
+        self,
+        path: str | Path,
+        extensions: list[str] | None = None,
+    ) -> int:
+        """
+        Add all files from a directory to the RAG index.
+
+        Args:
+            path: Path to directory containing files.
+            extensions: File extensions to include (e.g., ['.md', '.txt']).
+                Defaults to ['.md', '.txt'].
+
+        Returns:
+            Total number of chunks indexed.
+        """
+        directory = Path(path)
+        if not directory.is_dir():
+            raise NotADirectoryError(f"Not a directory: {directory}")
+
+        if extensions is None:
+            extensions = [".md", ".txt"]
+
+        # Normalize extensions
+        extensions = [
+            ext.lower() if ext.startswith(".") else f".{ext.lower()}"
+            for ext in extensions
+        ]
+
+        files = [
+            f
+            for f in directory.iterdir()
+            if f.is_file() and f.suffix.lower() in extensions
+        ]
+
+        if not files:
+            return 0
+
+        documents = [
+            Document(text=f.read_text(), source=f.name) for f in files
+        ]
+
+        return await self.add_documents(documents)
+
+    @abc.abstractmethod
+    async def search(self, query: str, top_k: int = 3) -> str:
+        """
+        Search the knowledge base.
+
+        Args:
+            query: Search query.
+            top_k: Number of results to return.
+
+        Returns:
+            Formatted string with search results.
+        """
+
+    @abc.abstractmethod
+    async def clear(self) -> None:
+        """Clear all indexed documents."""
+
+    async def close(self) -> None:
+        """Close any open resources. Override if needed."""

examples/03_phone_and_rag_example/inbound_phone_and_rag_example.py

Lines changed: 27 additions & 63 deletions
@@ -28,7 +28,7 @@
 from fastapi.responses import JSONResponse
 
 from vision_agents.core import User, Agent
-from vision_agents.plugins import getstream, gemini, twilio, elevenlabs, deepgram
+from vision_agents.plugins import getstream, gemini, twilio, elevenlabs, deepgram, turbopuffer
 
 logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.INFO)
@@ -111,7 +111,7 @@ async def media_stream(websocket: WebSocket, call_id: str, token: str):
         call_registry.remove(call_id)
 
 
-async def create_rag_knowledge():
+async def create_rag_from_directory():
     """Initialize the RAG backend based on RAG_BACKEND environment variable."""
     global file_search_store, rag
 
@@ -120,74 +120,38 @@ async def create_rag_knowledge():
         return
 
     if RAG_BACKEND == "turbopuffer":
-        await _init_turbopuffer_rag()
+        logger.info(f"📚 Initializing TurboPuffer RAG from {KNOWLEDGE_DIR}")
+        rag = await turbopuffer.create_rag(
+            namespace="stream-product-knowledge-gemini",
+            knowledge_dir=KNOWLEDGE_DIR,
+            extensions=[".md"],
+        )
+        logger.info(f"✅ TurboPuffer RAG ready with {len(rag._indexed_files)} documents indexed")
     else:
-        await _init_gemini_rag()
+        logger.info(f"📚 Initializing Gemini File Search from {KNOWLEDGE_DIR}")
+        file_search_store = await gemini.create_file_search_store(
+            name="stream-product-knowledge",
+            knowledge_dir=KNOWLEDGE_DIR,
+            extensions=[".md"],
+        )
+        logger.info(f"✅ Gemini RAG ready with {len(file_search_store._uploaded_files)} documents")
 
 
-async def _init_gemini_rag():
-    """Initialize Gemini File Search RAG."""
-    global file_search_store
 
-    logger.info(f"📚 Initializing Gemini File Search from {KNOWLEDGE_DIR}")
-    file_search_store = await gemini.create_file_search_store(
-        name="stream-product-knowledge",
-        knowledge_dir=KNOWLEDGE_DIR,
-        extensions=[".md"],
-    )
-    logger.info(f"✅ Gemini RAG ready with {len(file_search_store._uploaded_files)} documents")
-
-
-async def _init_turbopuffer_rag():
-    """Initialize TurboPuffer + LangChain RAG."""
-    global rag
-
-    from rag_turbopuffer import create_rag
-
-    logger.info(f"📚 Initializing TurboPuffer RAG from {KNOWLEDGE_DIR}")
-    rag = await create_rag(
-        namespace="stream-product-knowledge-gemini",
-        knowledge_dir=KNOWLEDGE_DIR,
-        extensions=[".md"],
-    )
-    logger.info(f"✅ TurboPuffer RAG ready with {len(rag._indexed_files)} documents indexed")
-
-
-
-async def create_agent(**kwargs) -> Agent:
+async def create_agent() -> Agent:
     """Create an agent with RAG capabilities."""
-    if RAG_BACKEND == "turbopuffer":
-        return await _create_agent_turbopuffer()
-    else:
-        return await _create_agent_gemini()
-
-
-async def _create_agent_gemini() -> Agent:
-    """Create agent with Gemini File Search RAG."""
     instructions = """Read the instructions in @instructions.md"""
 
-    return Agent(
-        edge=getstream.Edge(),
-        agent_user=User(id="ai-agent", name="AI"),
-        instructions=instructions,
-        tts=elevenlabs.TTS(voice_id="FGY2WhTYpPnrIDTdsKH5"),
-        stt=deepgram.STT(eager_turn_detection=True),
-        llm=gemini.LLM("gemini-2.5-flash-lite", file_search_store=file_search_store),
-    )
-
-
-async def _create_agent_turbopuffer() -> Agent:
-    """Create agent with TurboPuffer RAG via function calling."""
-    instructions = """Read the instructions in @instructions.md"""
-
-    llm = gemini.LLM("gemini-2.5-flash-lite")
+    if RAG_BACKEND == "turbopuffer":
+        llm = gemini.LLM("gemini-2.5-flash-lite")
 
-    # Register RAG search as a callable function
-    @llm.register_function(
-        description="Search Stream's product knowledge base for detailed information about Chat, Video, Feeds, and Moderation APIs."
-    )
-    async def search_knowledge(query: str) -> str:
-        return await rag.search(query, top_k=3)
+        @llm.register_function(
+            description="Search Stream's product knowledge base for detailed information about Chat, Video, Feeds, and Moderation APIs."
+        )
+        async def search_knowledge(query: str) -> str:
+            return await rag.search(query, top_k=3)
+    else:
+        llm = gemini.LLM("gemini-2.5-flash-lite", file_search_store=file_search_store)
 
     return Agent(
         edge=getstream.Edge(),
@@ -200,6 +164,6 @@ async def search_knowledge(query: str) -> str:
 
 
 if __name__ == "__main__":
-    asyncio.run(create_rag_knowledge())
+    asyncio.run(create_rag_from_directory())
     logger.info(f"Starting with RAG_BACKEND={RAG_BACKEND}")
     uvicorn.run(app, host="localhost", port=8000)
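
Taken together, the cleanup means the example now talks to the turbopuffer plugin directly instead of going through local _init_* helpers. A rough standalone sketch of that flow outside the FastAPI app follows; the namespace and extensions are copied from the diff above, while the "knowledge" directory value, the sample query, and the script scaffolding are placeholders.

import asyncio

from vision_agents.plugins import turbopuffer


async def main() -> None:
    # Index a directory of markdown files, mirroring create_rag_from_directory() above.
    rag = await turbopuffer.create_rag(
        namespace="stream-product-knowledge-gemini",
        knowledge_dir="knowledge",  # placeholder for the example's KNOWLEDGE_DIR
        extensions=[".md"],
    )

    # The agent exposes this as the search_knowledge tool; here we call it directly.
    print(await rag.search("How does Stream moderation work?", top_k=3))

    await rag.close()


if __name__ == "__main__":
    asyncio.run(main())
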
Lines changed: 3 additions & 2 deletions
@@ -1,13 +1,14 @@
 from .gemini_llm import GeminiLLM as LLM
 from .gemini_realtime import GeminiRealtime as Realtime
-from .file_search import FileSearchStore, create_file_search_store
+from .file_search import GeminiFilesearchRAG, FileSearchStore, create_file_search_store
 from google.genai.types import ThinkingLevel, MediaResolution
 
 __all__ = [
     "Realtime",
     "LLM",
     "ThinkingLevel",
     "MediaResolution",
-    "FileSearchStore",
+    "GeminiFilesearchRAG",
+    "FileSearchStore",  # Backwards compatibility alias
     "create_file_search_store",
 ]
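
Since FileSearchStore is kept only as a backwards-compatibility alias, existing imports should keep working unchanged. A small check of that assumption is sketched below; the identity assertion presumes the alias is a plain name binding, which the comment suggests but the file_search.py diff is not shown here to confirm.

from vision_agents.plugins.gemini import FileSearchStore, GeminiFilesearchRAG

# Assumed: FileSearchStore is a plain alias for the renamed class, so old code
# that references FileSearchStore keeps resolving to the same object.
assert FileSearchStore is GeminiFilesearchRAG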
