
Commit 99617f4

fix(gemini): Add persistent hash-based deduplication for File Search
- Reuse existing stores with the same display_name instead of creating new ones
- Store content hash (SHA-256) in document custom_metadata for persistence
- Load existing hashes from API on startup to skip duplicate uploads
- Works across app restarts: same content = skipped, regardless of filename
- Update tests to use unique store names to avoid interference
1 parent: 062309a
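
The diff below calls a module-level _compute_hash helper whose definition sits outside these hunks; per the commit message it is a SHA-256 digest of the file's text. A minimal sketch of what that helper could look like, assuming it takes the already-read text and returns a hex digest (matching the _compute_hash(file_path.read_text()) call site):

import hashlib


def _compute_hash(text: str) -> str:
    # Hex SHA-256 of the file's text content. Hashing the content rather
    # than the filename is what lets deduplication skip re-uploads even
    # when the same document arrives under a new name.
    return hashlib.sha256(text.encode("utf-8")).hexdigest()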

File tree

2 files changed: +94 lines, -13 lines


plugins/gemini/tests/test_gemini_file_search.py

Lines changed: 3 additions & 2 deletions

@@ -21,8 +21,9 @@
 @pytest.fixture
 async def rag():
     """Create a RAG instance for testing, clean up after."""
-    # Use unique name to avoid conflicts
-    rag = GeminiFilesearchRAG(name="test-rag-123")
+    # Use unique name to avoid conflicts with store reuse
+    unique_name = f"test-rag-{uuid.uuid4().hex[:8]}"
+    rag = GeminiFilesearchRAG(name=unique_name)
     await rag.create()
     yield rag
     await rag.clear()
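
The fixture now uses a random suffix because stores with the same display_name are reused, so parallel test runs would otherwise share state. Not part of this commit, but the dedup behavior itself could be exercised with a hypothetical test along these lines (names assumed: pytest's built-in tmp_path fixture, the rag fixture above, and the private _upload_file coroutine from the next file, which returns False on a skip):

async def test_same_content_different_name_is_skipped(rag, tmp_path):
    """Two files with identical content should upload once, then skip."""
    doc_a = tmp_path / "a.txt"
    doc_b = tmp_path / "b.txt"  # different filename, same content
    doc_a.write_text("How the API works.")
    doc_b.write_text("How the API works.")

    assert await rag._upload_file(doc_a) is True   # uploaded and indexed
    assert await rag._upload_file(doc_b) is False  # skipped by content hash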

plugins/gemini/vision_agents/plugins/gemini/file_search.py

Lines changed: 91 additions & 11 deletions

@@ -35,10 +35,13 @@ class GeminiFilesearchRAG(RAG):
     File Search imports, chunks, and indexes your data to enable fast retrieval
     of relevant information. Search is performed by Gemini's infrastructure.
 
+    The store automatically reuses existing stores with the same name and skips
+    uploading documents that already exist (based on content hash stored in metadata).
+
     Usage:
         rag = GeminiFilesearchRAG(name="my-knowledge-base")
-        await rag.create()
-        await rag.add_directory("./knowledge")
+        await rag.create()  # Reuses existing store if found
+        await rag.add_directory("./knowledge")
 
         # Search
         results = await rag.search("How does the API work?")
@@ -67,6 +70,8 @@ def __init__(
         self._store_name: str | None = None
         self._uploaded_files: list[str] = []
         self._uploaded_hashes: set[str] = set()
+        # Content hashes of documents already in the store (loaded from the API)
+        self._existing_hashes: set[str] = set()
         self._model = model
 
         if client is not None:
@@ -91,7 +96,10 @@ def uploaded_hashes(self) -> set[str]:
 
     async def create(self) -> str:
         """
-        Create the file search store. Must be called before adding documents.
+        Create or reuse an existing file search store.
+
+        If a store with the same display_name already exists, it will be reused
+        and existing documents will be loaded for deduplication.
 
         Returns:
             The store resource name.
@@ -103,17 +111,68 @@ async def create(self) -> str:
             return self._store_name
 
         loop = asyncio.get_event_loop()
+
+        # Check if a store with this name already exists
+        existing_store = await loop.run_in_executor(
+            None, self._find_existing_store
+        )
+
+        if existing_store:
+            self._store_name = existing_store
+            await self._load_existing_documents()
+            logger.info(
+                f"Reusing existing store '{self.name}': {self._store_name} "
+                f"({len(self._existing_hashes)} documents with hashes)"
+            )
+            return self._store_name
+
+        # Create new store if none exists
         store = await loop.run_in_executor(
             None,
             lambda: self._client.file_search_stores.create(
                 config=CreateFileSearchStoreConfig(display_name=self.name)
             ),
         )
         self._store_name = store.name
-        logger.info(f"Created GeminiFilesearchRAG '{self.name}': {self._store_name}")
+        logger.info(f"Created new store '{self.name}': {self._store_name}")
         assert self._store_name is not None
         return self._store_name
 
+    def _find_existing_store(self) -> str | None:
+        """Find an existing store with the same display_name."""
+        for store in self._client.file_search_stores.list():
+            if store.display_name == self.name:
+                return store.name
+        return None
+
+    async def _load_existing_documents(self) -> None:
+        """Load existing document hashes from the store for deduplication."""
+        if not self._store_name:
+            return
+
+        loop = asyncio.get_event_loop()
+        store_name = self._store_name  # Capture for closure
+
+        def list_docs():
+            return list(
+                self._client.file_search_stores.documents.list(parent=store_name)
+            )
+
+        docs = await loop.run_in_executor(None, list_docs)
+
+        for doc in docs:
+            self._uploaded_files.append(doc.display_name)
+            # Extract content_hash from custom_metadata if present
+            if doc.custom_metadata:
+                for meta in doc.custom_metadata:
+                    if meta.key == "content_hash" and meta.string_value:
+                        self._existing_hashes.add(meta.string_value)
                        break
+
+        logger.debug(
+            f"Loaded {len(docs)} documents, {len(self._existing_hashes)} with hashes"
+        )
+
     async def _upload_file(
         self,
         file_path: str | Path,
@@ -123,6 +182,12 @@ async def _upload_file(
         """
        Upload a single file to the file search store.
 
+        Skips upload if the content hash matches a previously uploaded file
+        (checked against both in-memory session hashes and API-stored hashes).
+
+        The content hash is stored in the document's custom_metadata for
+        persistent deduplication across restarts.
+
         Args:
             file_path: Path to the file to upload.
             display_name: Optional display name (defaults to filename).
@@ -140,26 +205,39 @@ async def _upload_file(
         if not file_path.exists():
             raise FileNotFoundError(f"File not found: {file_path}")
 
+        display_name = display_name or file_path.name
+
         # Compute hash if not provided
         if content_hash is None:
             content_hash = _compute_hash(file_path.read_text())
 
-        # Skip if already uploaded
-        if content_hash in self._uploaded_hashes:
-            logger.info(f"Skipping duplicate: {display_name or file_path.name}")
+        # Check if hash already exists (from API or this session)
+        if content_hash in self._existing_hashes:
+            logger.info(f"Skipping (hash exists in store): {display_name}")
             return False
 
-        display_name = display_name or file_path.name
+        if content_hash in self._uploaded_hashes:
+            logger.info(f"Skipping (duplicate in session): {display_name}")
+            return False
 
         loop = asyncio.get_event_loop()
 
-        # Upload and wait for indexing
+        # Upload with content_hash in custom_metadata for persistent deduplication
+        # Capture variables for lambda closure
+        file_str = str(file_path)
         operation = await loop.run_in_executor(
             None,
             lambda: self._client.file_search_stores.upload_to_file_search_store(
-                file=str(file_path),
+                file=file_str,
                 file_search_store_name=store_name,
-                config={"display_name": display_name},
+                config=types.UploadToFileSearchStoreConfig(
+                    display_name=display_name,
+                    custom_metadata=[
+                        types.CustomMetadata(
+                            key="content_hash", string_value=content_hash
+                        ),
+                    ],
+                ),
             ),
         )
 
@@ -172,6 +250,7 @@ async def _upload_file(
 
         self._uploaded_files.append(display_name)
         self._uploaded_hashes.add(content_hash)
+        self._existing_hashes.add(content_hash)
         logger.info(f"Uploaded and indexed: {display_name}")
         return True
 
@@ -333,6 +412,7 @@ async def clear(self) -> None:
         self._store_name = None
         self._uploaded_files = []
         self._uploaded_hashes = set()
+        self._existing_hashes = set()
 
     async def close(self) -> None:
         """Close resources. Note: does not delete the store."""
