Bump up to new llama-stack 0.3.x index creation#61
Bump up to new llama-stack 0.3.x index creation#61tisnik merged 1 commit intolightspeed-core:mainfrom
Conversation
|
Warning Rate limit exceeded@are-ces has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 13 minutes and 59 seconds before requesting another review. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (4)
WalkthroughSwitched llama-stack integration to async APIs: replaced client with AsyncLlamaStackAsLibraryClient, removed provider_vector_db_id, added asyncio-based insert/upload flows and new async methods, updated YAML templating/config generation, and bumped llama-stack related dependencies. Changes
Sequence Diagram(s)sequenceDiagram
participant App as Document Processor
participant AsyncClient as AsyncLlamaStackClient
participant VectorAPI as Vector Store API
participant FileAPI as File Service
rect rgba(230,245,255,1)
Note over App,AsyncClient: Manual chunking (_insert_prechunked_documents)
App->>AsyncClient: initialize client
App->>AsyncClient: create vector store (vector_stores.create)
AsyncClient->>VectorAPI: vector_stores.create
VectorAPI-->>AsyncClient: store_id
App->>FileAPI: create placeholder files (files.create)
FileAPI-->>App: file_ids
App->>AsyncClient: vector_io.insert(chunks, vector_db_id)
AsyncClient->>VectorAPI: persist chunks
VectorAPI-->>AsyncClient: insert_complete
end
rect rgba(255,240,250,1)
Note over App,AsyncClient: Auto chunking (_upload_and_process_file_batches)
App->>AsyncClient: initialize client
App->>AsyncClient: create vector store (vector_stores.create)
AsyncClient->>VectorAPI: vector_stores.create
VectorAPI-->>AsyncClient: store_id
App->>FileAPI: upload files (files.create / upload)
FileAPI-->>App: file_ids
App->>AsyncClient: vector_stores.file_batches.create(file_ids)
AsyncClient->>VectorAPI: start batch processing
loop poll
App->>AsyncClient: poll batch status
AsyncClient->>VectorAPI: file_batches.get_status
VectorAPI-->>AsyncClient: status_update
end
VectorAPI-->>AsyncClient: batch_complete
AsyncClient-->>App: processing_done
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lightspeed_rag_content/document_processor.py (1)
338-343: Testtest_write_yaml_config_faiss_with_provider_vector_db_idpasses removed parameter.The
write_yaml_configmethod no longer accepts theprovider_vector_db_idparameter, but the test at line 182-184 intests/test_document_processor_llama_stack.pystill passes it as a keyword argument. This will cause aTypeErrorat runtime. Either remove this test if the feature is no longer needed, or update the test to match the new signature.
🤖 Fix all issues with AI agents
In @pyproject.toml:
- Around line 49-55: Add explicit documentation and an inline security note
describing CVE-2024-50050 and the mitigation: state that llama-stack 0.3.5 is
vulnerable to unsafe deserialization via pyzmq/pickle but our usage is safe
because we instantiate llama-stack only as an in-process library
(AsyncLlamaStackAsLibraryClient and LlamaStackAsLibraryClient) with file-based
configuration and no ZeroMQ network endpoints; place a concise comment next to
the AsyncLlamaStackAsLibraryClient/LlamaStackAsLibraryClient constructors and
add a short entry in the project security/README documenting the vulnerability,
the current dependency versions (llama-stack==0.3.5, llama-stack-client==0.3.5),
and the required constraint that any future changes must avoid enabling
network/exposed ZeroMQ endpoints or must adopt a patched version.
In @src/lightspeed_rag_content/document_processor.py:
- Around line 487-488: Run the project's Black formatter on the file and remove
the stray extra blank line causing the formatting error; reformat the file
(using the project's black invocation such as the uv tool wrapper) so the file
conforms to Black's rules and the pipeline will pass.
- Around line 199-275: The TEMPLATE string contains no {index_id} or
{vector_io_cfg} placeholders but write_yaml_config (and its format(...) call)
still sets index_id and vector_io_cfg, so remove the dead variables or restore
placeholders: either delete the unused index_id parameter from
write_yaml_config, remove creation/insertion of vector_io_cfg and stop passing
them into TEMPLATE.format in the write_yaml_config function, or alternatively
add appropriate {index_id} and {vector_io_cfg} placeholders into TEMPLATE at the
intended locations (and ensure their values are rendered correctly) so the
format call actually uses them.
- Around line 419-426: The code hardcodes provider_id as "faiss" when creating
vector stores; change it to derive the correct provider id used by _LlamaStackDB
(either "llamastack-faiss" or "llamastack-sqlite-vec") instead of the literal
"faiss". Update the vector store creation call (the block assigning vector_store
= await client.vector_stores.create(...)) to compute provider_id based on the
instance/config (e.g., inspect self.provider, self.provider_id or self.config to
detect "sqlite-vec" and map to "llamastack-sqlite-vec", otherwise use
"llamastack-faiss"), and apply the same change in the
_upload_and_process_file_batches function so both places use the computed
provider_id rather than a hardcoded value.
- Around line 466-474: The mypy error is caused by doc_groups lacking a type
annotation; annotate it as a mapping from (docs_url, title) tuples to lists of
indices (e.g. doc_groups: dict[tuple[str, str], list[int]] = {}) and, if your
codebase targets older Python, import and use typing types (from typing import
Dict, Tuple, List) and declare doc_groups: Dict[Tuple[str, str], List[int]] = {}
so mypy knows the key/value types used when grouping indices from self.documents
in the loop.
- Around line 331-332: Update the unit tests that assert the client class name
to match the async implementation: change the expected string from
"LlamaStackAsLibraryClient" to "AsyncLlamaStackAsLibraryClient" in the
assertions that check doc.client_class.__name__ (tests referencing the
DocumentProcessor instance and its client_class). Ensure both assertions that
currently assert "LlamaStackAsLibraryClient" are updated to assert
"AsyncLlamaStackAsLibraryClient" so they reflect the actual client_class
assigned in the DocumentProcessor code.
🧹 Nitpick comments (3)
src/lightspeed_rag_content/document_processor.py (3)
364-374: Outdated docstring and potential event loop concern.The docstring (line 368) still references
LlamaStackAsLibraryClientbut the code now usesAsyncLlamaStackAsLibraryClient.Additionally, calling
asyncio.run()here for initialization, then again insave()for document operations, works but creates multiple event loop lifecycles. Consider making_start_llama_stackasync and consolidating event loop management.♻️ Suggested docstring fix
def _start_llama_stack(self, cfg_file: str) -> Any: """Start llama-stack as a library and return the client. Return type is really - llama_stack.core.library_client.LlamaStackAsLibraryClient + llama_stack.core.library_client.AsyncLlamaStackAsLibraryClient But we do dynamic import, so we don't have it for static typechecking """
476-486: Consider failing or tracking failed file uploads.When file uploads fail, the error is logged but the operation continues and all chunks are inserted. This could result in chunks referencing non-existent files, breaking citations downstream. Consider either:
- Failing the entire operation if any uploads fail
- Filtering out chunks whose source file upload failed
♻️ Possible approach to track failures
upload_tasks = [upload_file(chunk_indices) for chunk_indices in doc_groups.values()] results = await asyncio.gather(*upload_tasks, return_exceptions=True) + failed_doc_keys = [] - for result in results: + for doc_key, result in zip(doc_groups.keys(), results): if isinstance(result, Exception): LOG.error("Failed to upload file: %s", result) + failed_doc_keys.append(doc_key) + + if failed_doc_keys: + # Either raise or filter out failed chunks + raise RuntimeError(f"Failed to upload {len(failed_doc_keys)} files")
543-551: Consider a more reasonable timeout for batch processing.The 24-hour (
24 * 60 * 60seconds) max wait time seems excessive. If batch processing takes anywhere near that long, there's likely a systemic issue. Consider a shorter timeout (e.g., 1-2 hours) with clearer messaging, or make it configurable.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
pyproject.tomlsrc/lightspeed_rag_content/document_processor.py
🧰 Additional context used
🪛 GitHub Actions: Black
src/lightspeed_rag_content/document_processor.py
[error] 1-1: Black formatting check failed. 1 file would be reformatted. Command: 'uv tool run black --check .'
🪛 GitHub Actions: Type checks
src/lightspeed_rag_content/document_processor.py
[error] 466-466: mypy error: Need type annotation for 'doc_groups' (hint: "doc_groups: dict[, ] = ..."). Add a type annotation for the variable.
🪛 GitHub Actions: Unit tests
src/lightspeed_rag_content/document_processor.py
[error] 1-1: Test failure: _LlamaStackDB.client_class name is 'AsyncLlamaStackAsLibraryClient' but test expects 'LlamaStackAsLibraryClient'.
[error] 1-1: Test failure: _LlamaStackDB initialization path asserts client_class name mismatch (Async vs sync client).
[error] 1-1: Test failure: write_yaml_config output does not match expected YAML fixture (image_name, APIs, providers, etc. differ).
[error] 1-1: TypeError: _LlamaStackDB.write_yaml_config() got an unexpected keyword argument 'provider_vector_db_id' in tests.
[error] 1-1: Async usage issue: tests mock client.vector_stores.create as a non-awaitable, but code awaits it, causing TypeError: object MagicMock can't be used in 'await' expression.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
- GitHub Check: build-and-push-dev
- GitHub Check: Pylinter
🔇 Additional comments (2)
src/lightspeed_rag_content/document_processor.py (2)
17-23: LGTM on new imports.The
asyncioandBytesIOimports are correctly added to support the new async operations and in-memory file handling.
71-77: Early return for llama-stack path is correct.The early return prevents setting
Settings.embed_modelfor llama-stack types, which is appropriate since llama-stack handles embeddings internally. There's some duplication with lines 79-87, but it's acceptable given the different initialization requirements.
| doc_uuid = doc_metadata["document_id"] | ||
|
|
||
| # Create filename: {uuid}::{url}::{title}.txt | ||
| filename = f"{doc_uuid}::{docs_url}::{title}" |
There was a problem hiding this comment.
how the docs_url look like? Does it contain a protocol too?
There was a problem hiding this comment.
It might, it depends on how users decide to build it (this is defined in custom_processor.py).
An example I created is: 8dc5e2fa-b3a1-4190-8c64-2ac0b535cb65::https://www.redhat.com/data/architecture/architecture-installation.txt::Installation_and_update.txt which includes the https protocol
tisnik
left a comment
There was a problem hiding this comment.
LGTM, thank you very much. TBH I was worried that much more changes would be needed.
a06f066 to
44d7880
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
tests/test_document_processor_llama_stack.py (4)
130-130: Update assertions to match the new async client class.The source code now uses
AsyncLlamaStackAsLibraryClient(line 332 in document_processor.py), but these tests still assert forLlamaStackAsLibraryClient. This is causing the pipeline failures.🔧 Proposed fix
- assert doc.client_class.__name__ == "LlamaStackAsLibraryClient" + assert doc.client_class.__name__ == "AsyncLlamaStackAsLibraryClient"Apply this same fix at both lines 130 and 156.
Also applies to: 156-156
26-75: Update YAML template to match the source implementation.The
FAISS_EXPECTEDtemplate in the test file does not match the significantly restructuredTEMPLATEin the source file (lines 199-275 of document_processor.py). The source now uses a different structure withserver,conversations_store,metadata_store,storage.backends,storage.stores, andregistered_resourcessections. This mismatch is causing pipeline failures at lines 175 and 266.📝 Recommended action
Copy the updated YAML template from the source file (lines 199-275) and adapt it for the test expectations. The new template includes:
serverconfiguration with port 8321conversations_storeandmetadata_storeconfigurationstorage.backendsandstorage.storessectionsregistered_resourcesinstead of inline resource definitions- Different provider configuration structure
Alternatively, generate a verification script to compare the actual YAML output against expected values dynamically.
179-198: Remove test for deleted parameter.The
provider_vector_db_idparameter was removed fromwrite_yaml_configsignature in the source file (line 338-343). This test is calling the method with a non-existent parameter, causing the TypeError at line 190 in the pipeline failures.🗑️ Proposed fix
Either remove this entire test method or update it to test the new signature without
provider_vector_db_id:def test_write_yaml_config_faiss_with_provider_vector_db_id( self, mocker, llama_stack_processor ): - """Test YAML configuration generation for FAISS vector store backend.""" + """Test YAML configuration generation for FAISS vector store backend with custom vector store ID.""" mock_open = mocker.patch("builtins.open", new_callable=mocker.mock_open) doc = document_processor._LlamaStackDB(llama_stack_processor["config"]) provider_id = "my_provider_id" yaml_file = "yaml_file" db_file = "db_file" - vector_store_id = "provider_vector_db_id: my_provider_vector_db_id" - doc.write_yaml_config( - provider_id, yaml_file, db_file, provider_vector_db_id=vector_store_id - ) + doc.write_yaml_config(provider_id, yaml_file, db_file)Note: If the functionality for custom vector_store_id is no longer supported, consider removing this test entirely.
1-456: Address Black formatting issues.The pipeline reports that Black formatting check failed. Run
black .to reformat the code according to the project's style guidelines.#!/bin/bash # Run Black formatter on the test file black tests/test_document_processor_llama_stack.py --check --diff
🤖 Fix all issues with AI agents
In @src/lightspeed_rag_content/document_processor.py:
- Around line 447-463: The constructed filename (variable filename used when
creating file_obj and uploaded_file via client.files.create) can include invalid
filesystem characters and the Windows-invalid ':' from the "::" delimiter;
create a small helper (e.g., _sanitize_filename) that replaces characters like
<>:"/\\|?* with a safe character (underscore) and use it when building filename,
and change the delimiter from "::" to a safe token (e.g., ";;") before assigning
file_obj.name and updating document metadata (references: filename,
file_obj.name, client.files.create, and the loop assigning
self.documents[...]["metadata"]["filename"]).
🧹 Nitpick comments (7)
src/lightspeed_rag_content/document_processor.py (7)
71-77: Early return creates inconsistent initialization.The early return for
llamastackvector stores bypasses the embedding model initialization that other code paths execute (lines 79-87), but this appears intentional since llama-stack handles embeddings differently. However, the conditional logic could be clearer.♻️ Optional refactor for clarity
Consider extracting the llamastack initialization into a separate method to make the divergent paths more explicit:
def __init__(self, config: _Config): self.config = config if config.vector_store_type.startswith("llamastack"): - if config.manual_chunking: - Settings.chunk_size = self.config.chunk_size - Settings.chunk_overlap = self.config.chunk_overlap - if config.doc_type == "markdown": - Settings.node_parser = MarkdownNodeParser() - return + self._init_llamastack_settings() + return + + self._init_standard_settings() + + def _init_llamastack_settings(self): + """Initialize settings for llamastack vector stores.""" + if self.config.manual_chunking: + Settings.chunk_size = self.config.chunk_size + Settings.chunk_overlap = self.config.chunk_overlap + if self.config.doc_type == "markdown": + Settings.node_parser = MarkdownNodeParser() + + def _init_standard_settings(self): + """Initialize settings for standard vector stores.""" if config.manual_chunking: Settings.chunk_size = self.config.chunk_size ...
332-332: Inconsistency with class name in docstring.Line 332 uses
AsyncLlamaStackAsLibraryClient, but the docstring at line 368 still referencesLlamaStackAsLibraryClient. This is a minor documentation inconsistency.📝 Update docstring
def _start_llama_stack(self, cfg_file: str) -> Any: """Start llama-stack as a library and return the client. Return type is really - llama_stack.core.library_client.LlamaStackAsLibraryClient + llama_stack.core.library_client.AsyncLlamaStackAsLibraryClient But we do dynamic import, so we don't have it for static typechecking """
407-489: Review async error handling and concurrency patterns.The
_insert_prechunked_documentsmethod usesasyncio.gatherwithreturn_exceptions=True(line 481), which is good. However, consider the following:
- Line 481-484: Errors are logged but processing continues. If file uploads fail, the subsequent
vector_io.insertwill have chunks with invalid/missingdocument_idreferences.- Line 428-465: The nested
upload_filefunction modifiesself.documentsin place while concurrent tasks may be reading it. While Python's GIL protects against corruption, the logic depends on all uploads completing before the insert operation.🔒 Recommended improvements
Issue 1: Stop processing if file uploads fail
results = await asyncio.gather(*upload_tasks, return_exceptions=True) + failed_uploads = [r for r in results if isinstance(r, Exception)] + if failed_uploads: + error_msg = f"Failed to upload {len(failed_uploads)} file(s)" + LOG.error(error_msg) + raise RuntimeError(error_msg) - for result in results: - if isinstance(result, Exception): - LOG.error("Failed to upload file: %s", result)Issue 2: Make document mutation more explicit
Consider building a new list of updated chunks instead of mutating
self.documentsin place, or add a comment explaining why in-place mutation is safe here.
491-562: Batch processing timeout is hardcoded and might be too aggressive.Line 545 sets
max_wait = 24 * 60 * 60(24 hours), which seems excessive for typical use cases but could be appropriate for large datasets. However, there's no configuration option to adjust this.Additionally, the polling interval of 2 seconds (line 549) is hardcoded.
Consider making these configurable via the
_Configclass:# Wait for batch completion - max_wait = 24 * 60 * 60 + max_wait = getattr(self.config, 'batch_max_wait_seconds', 24 * 60 * 60) start_time = time.time() while batch.status == "in_progress" and (time.time() - start_time) < max_wait: - await asyncio.sleep(2) + await asyncio.sleep(getattr(self.config, 'batch_poll_interval', 2))
544-562: Incomplete error information in batch failure message.Lines 555-561 construct an error message when batch processing fails, but the message doesn't include potentially useful debugging information like the batch ID or individual file error details that might be available in the batch object.
📊 Enhanced error reporting
if batch.status != "completed": error_msg = ( f"Batch processing failed with status '{batch.status}'. " + f"Batch ID: {batch.id}. " f"Completed {batch.file_counts.completed}/{batch.file_counts.total} files. " f"Failed: {batch.file_counts.failed}, In progress: {batch.file_counts.in_progress}" ) LOG.error(error_msg) + # Log additional batch details if available + if hasattr(batch, 'errors'): + for error in batch.errors: + LOG.error("Batch error: %s", error) raise RuntimeError(error_msg)
373-373: Using asyncio.run in a potentially async context.Line 373 uses
asyncio.run(client.initialize()), which creates a new event loop. If_start_llama_stackis ever called from an async context (which it isn't currently), this would raise aRuntimeError: asyncio.run() cannot be called from a running event loop.The same pattern is used in lines 579 and 581 in the
savemethod.If there's a possibility that these methods might be called from async contexts in the future, consider making them async:
async def _start_llama_stack(self, cfg_file: str) -> Any: """Start llama-stack as a library and return the client.""" client = self.client_class(cfg_file) await client.initialize() return client async def save(self, index: str, output_dir: str, ...) -> None: """Save in the vector database all the documents we added.""" # ... setup code ... client = await self._start_llama_stack(cfg_file) if self.config.manual_chunking: await self._insert_prechunked_documents(client, index) else: await self._upload_and_process_file_batches(client, index)Then update callers to use
asyncio.run(processor.save(...)).However, this would be a more invasive change. The current approach is acceptable if these methods are guaranteed to be called from synchronous contexts only.
563-584: Add cleanup on exception for created vector store and uploaded files.If
_insert_prechunked_documentsor_upload_and_process_file_batchesfails after creating the vector_store, the partially created resources (vector_store and uploaded files) remain in the llama-stack instance. llama-stack 0.3.5 provides OpenAI-compatible DELETE endpoints to clean these up (e.g., DELETE /v1/openai/v1/vector_stores/{vector_store_id}).Consider implementing cleanup on exception:
def save( self, index: str, output_dir: str, embedded_files: Optional[int] = None, exec_time: Optional[int] = None, ) -> None: """Save in the vector database all the documents we added.""" os.makedirs(output_dir, exist_ok=True) db_file = os.path.realpath(os.path.join(output_dir, self.db_filename)) cfg_file = os.path.join(output_dir, self.CFG_FILENAME) self.write_yaml_config(index, cfg_file, db_file) client = self._start_llama_stack(cfg_file) try: if self.config.manual_chunking: asyncio.run(self._insert_prechunked_documents(client, index)) else: asyncio.run(self._upload_and_process_file_batches(client, index)) except Exception as exc: LOG.error("Failed to insert document: %s", exc) + # Clean up: delete vector_store and uploaded files using client APIs + # llama-stack 0.3.5 supports DELETE via OpenAI-compatible vector_stores endpoint raise
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
pyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🚧 Files skipped from review as they are similar to previous changes (1)
- pyproject.toml
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (4)
_start_llama_stack(364-374)save(155-160)save(563-584)save(695-698)
🪛 GitHub Actions: Black
tests/test_document_processor_llama_stack.py
[error] 1-1: Black formatting check failed. 1 file would be reformatted. Run 'black .' to format. Command 'uv tool run black --check .' exited with code 1.
🪛 GitHub Actions: Unit tests
tests/test_document_processor_llama_stack.py
[error] 130-130: AssertionError: expected client_class name 'LlamaStackAsLibraryClient' but got 'AsyncLlamaStackAsLibraryClient'.
[error] 156-156: AssertionError: expected client_class name 'LlamaStackAsLibraryClient' but got 'AsyncLlamaStackAsLibraryClient'.
[error] 175-175: AssertionError: YAML output mismatch for write_yaml_config_faiss.
[error] 190-190: TypeError: _LlamaStackDB.write_yaml_config() got an unexpected keyword argument 'provider_vector_db_id'.
[error] 266-266: AssertionError: YAML output mismatch for sqlite vec configuration.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build-and-push-dev
- GitHub Check: Pylinter
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
🔇 Additional comments (6)
tests/test_document_processor_llama_stack.py (4)
86-93: Approve the refined mocking strategy.The conditional mock for
os.path.existsis a good improvement that provides more realistic behavior by only returning False for embeddings_model_dir checks while preserving the original behavior for other paths.
278-284: Good use of AsyncMock for async initialization.The test correctly mocks the async
initializemethod withAsyncMockand asserts it was awaited once, which aligns with the async initialization flow in the source.
377-426: Well-structured test data and async mocks.The test correctly:
- Creates structured document dictionaries matching the new format with content, mime_type, metadata, and chunk_metadata
- Mocks all async client methods (vector_stores.create, files.create, vector_io.insert, file_batches.create) using AsyncMock
- Sets up realistic return values for the mocked async operations
443-449: The async call assertions are correct and use the proper AsyncMock API.The
await_countandawait_argsattributes are the standard attributes provided byunittest.mock.AsyncMockin Python 3.8+ (your project uses Python 3.12). These correctly track awaited calls and their arguments separately from regular calls. The assertions are verifying the async call behavior properly.src/lightspeed_rag_content/document_processor.py (2)
17-17: LGTM: Required imports for async operations.The addition of
asyncioandBytesIOimports are necessary for the new async llama-stack integration and in-memory file handling.Also applies to: 23-23
199-275: Verify the YAML template schema compatibility with llama-stack 0.3.5.The YAML template structure uses top-level
conversations_store,metadata_store, and a separatestorage.backends/storage.storessection. Official llama-stack examples in public documentation show kvstore configurations nested directly within provider configs, not as separate top-level sections. Additionally, official v0.3.5 release notes and schema documentation are not publicly available (GitHub releases only go through v0.3.4). Without access to the official llama-stack 0.3.5 schema specification, confirm this template structure is compatible with the target version or provide reference documentation that validates this configuration format.
| filename = f"{doc_uuid}::{docs_url}::{title}" | ||
|
|
||
| file_obj = BytesIO("".encode("utf-8")) # Empty file for citation anchor | ||
| file_obj.name = f"{filename}.txt" | ||
|
|
||
| uploaded_file = await client.files.create( | ||
| file=file_obj, | ||
| purpose="assistants", | ||
| ) | ||
|
|
||
| # Update ALL chunks from this source document with the file_id | ||
| for chunk_idx in chunk_indices: | ||
| self.documents[chunk_idx]["metadata"]["document_id"] = uploaded_file.id | ||
| self.documents[chunk_idx]["chunk_metadata"][ | ||
| "document_id" | ||
| ] = uploaded_file.id | ||
| self.documents[chunk_idx]["metadata"]["filename"] = filename |
There was a problem hiding this comment.
Filename construction may include special characters.
Lines 447 and 511 construct filenames using the pattern f"{doc_uuid}::{docs_url}::{title}.txt". The PR objectives mention this is a workaround for a llama-stack bug with metadata. However, URLs and titles might contain characters that are problematic for filenames (e.g., /, \, ?, *, :, |, <, >, ").
The :: delimiter itself uses : which is invalid in Windows filenames. Additionally, URLs contain / and ? characters, and titles could contain various special characters, all of which could cause file system errors.
🔧 Proposed fix to sanitize filenames
import re
def _sanitize_filename(text: str) -> str:
"""Sanitize text for use in filenames by replacing invalid characters."""
# Replace problematic characters with underscores
return re.sub(r'[<>:"/\\|?*]', '_', text)
# Then in the upload_file functions:
filename = f"{doc_uuid};;{_sanitize_filename(docs_url)};;{_sanitize_filename(title)}"Note: Changed delimiter from :: to ;; to avoid Windows path issues, but any safe delimiter works.
🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 447 - 463, The
constructed filename (variable filename used when creating file_obj and
uploaded_file via client.files.create) can include invalid filesystem characters
and the Windows-invalid ':' from the "::" delimiter; create a small helper
(e.g., _sanitize_filename) that replaces characters like <>:"/\\|?* with a safe
character (underscore) and use it when building filename, and change the
delimiter from "::" to a safe token (e.g., ";;") before assigning file_obj.name
and updating document metadata (references: filename, file_obj.name,
client.files.create, and the loop assigning
self.documents[...]["metadata"]["filename"]).
191e832 to
a89102c
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @src/lightspeed_rag_content/document_processor.py:
- Around line 491-561: In _upload_and_process_file_batches replace the hardcoded
provider_id "faiss" with the configured value (use
self.config.vector_store_type) when creating vector_store so both faiss and
sqlite-vec are supported, and after gathering uploads validate that file_ids is
not empty before calling client.vector_stores.file_batches.create — if file_ids
is empty, log a clear error via LOG.error and raise a RuntimeError describing
that all file uploads failed to avoid creating an empty batch.
- Around line 407-489: The _insert_prechunked_documents implementation hardcodes
provider_id and proceeds with vector_io.insert even if file uploads failed;
update client.vector_stores.create to set "provider_id" from
self.config.vector_store_type (not literal "faiss"), and after launching
upload_file tasks (the upload_file inner function), inspect asyncio.gather
results and abort (raise or return early) if any result is an Exception instead
of merely logging; additionally validate or filter self.documents so every chunk
has a valid uploaded file id (check metadata["document_id"] and
chunk_metadata["document_id"]) before calling client.vector_io.insert (or remove
chunks lacking file ids), and ensure any failure surfaces to the caller so
insertion cannot proceed with broken citations.
🧹 Nitpick comments (2)
pyproject.toml (2)
54-55: Clarify the purpose of chardet and einops dependencies.These libraries are not imported anywhere in the codebase. If they're transitive dependencies from
llama-stack==0.3.5or related packages, consider whether they need explicit version pinning or if the dependency resolver should manage them automatically. If they're required for optional functionality or internal use by llama-stack, add a comment explaining their necessity.
49-50: Exact version pinning for llama-stack is acceptable during migration.The dependencies are pinned to
==0.3.5, which is a strict constraint but intentional during the 0.3.x migration period. Patch releases within the 0.3.x line are backward-compatible, so this approach ensures reproducibility without blocking compatible bug fixes.Once the migration stabilizes, consider relaxing the constraint to
>=0.3.5,<0.4.0to allow patch-level updates while maintaining API compatibility.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
pyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (5)
write_yaml_config(338-362)_start_llama_stack(364-374)save(155-160)save(563-584)save(695-698)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: build-and-push-dev
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
🔇 Additional comments (13)
tests/test_document_processor_llama_stack.py (7)
18-18: LGTM! Necessary import for async test support.The
AsyncMockimport is required for properly mocking async methods introduced by theAsyncLlamaStackAsLibraryClient.
113-122: LGTM! More precise mocking strategy.The selective
os.path.existsmock correctly isolates the embeddings_model_dir check while preserving original behavior for other paths, which improves test accuracy.
159-159: LGTM! Assertions correctly updated for async client.The tests now properly validate that the document processor uses
AsyncLlamaStackAsLibraryClient, aligning with the async integration changes.Also applies to: 185-185
196-209: LGTM! YAML generation test correctly updated.The test properly validates the new template format with correct parameter substitution.
315-321: LGTM! Async initialization properly tested.The test correctly mocks the async
initialize()method and verifies it's awaited, ensuring proper async client initialization.
414-469: LGTM! Test helper correctly updated for async workflow.The
_test_savehelper properly:
- Sets up sample documents with the correct structure (metadata + chunk_metadata)
- Mocks all async client methods with
AsyncMock- Verifies file system operations with appropriate assertions
- Returns the mocked client for further verification
486-492: LGTM! Save method tests properly validate async workflows.The tests correctly verify:
- Manual chunking path calls
vector_io.insertwith proper parameters- Auto chunking path awaits
file_batches.createBoth code paths are adequately covered.
Also applies to: 499-499
src/lightspeed_rag_content/document_processor.py (6)
17-17: LGTM! Necessary imports for async file handling.The
asyncioandBytesIOimports support the new async workflow and in-memory file creation for the LlamaStack integration.Also applies to: 23-23
71-77: LGTM! Proper separation for LlamaStack initialization.The early return correctly skips HuggingFaceEmbedding setup for LlamaStack, which handles embeddings internally, while still configuring chunking and markdown parsing as needed.
332-332: LGTM! Client class updated for async support.Correctly switches to
AsyncLlamaStackAsLibraryClientto enable async operations.
373-373: LGTM with note on async context limitation.The
asyncio.run(client.initialize())call correctly initializes the async client. Note that this approach works in the current synchronous context but would fail if called from an already-running async context. Consider refactoring to make the entire chain async if async callers are anticipated.
199-275: LGTM! YAML template updated for llama-stack 0.3.x.The template correctly reflects the new llama-stack configuration structure with proper sections for storage backends, providers, and registered resources.
575-581: LGTM! Save method properly orchestrates async workflows.The method correctly:
- Generates configuration
- Initializes the client
- Routes to appropriate async method based on chunking strategy
- Handles exceptions with logging
32cc3a9 to
a2fe7e1
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In @src/lightspeed_rag_content/document_processor.py:
- Around line 447-451: The current filename construction using filename =
f"{doc_uuid}::{docs_url}::{title}" is fragile because docs_url or title may
contain the delimiter; change this to a robust encoding scheme: build a small
dict with keys "uuid","url","title" (using the variables doc_uuid, docs_url,
title), serialize to JSON and percent-encode or base64-encode the result, then
set filename to the encoded string (and keep file_obj.name = f"{filename}.txt")
so the stored filename is unambiguous and safe for filesystem/URI use.
- Line 332: The docstring and code disagree about using
AsyncLlamaStackAsLibraryClient vs LlamaStackAsLibraryClient: either switch
creation to the synchronous wrapper LlamaStackAsLibraryClient so the return type
in the method docstring is correct, or update the docstring to state the method
returns AsyncLlamaStackAsLibraryClient and make initialization async-consistent
by calling asyncio.run(client.initialize()) (or awaiting client.initialize()
from async code) wherever client.initialize() is invoked; locate references to
AsyncLlamaStackAsLibraryClient, LlamaStackAsLibraryClient, and
client.initialize() in this class and apply one consistent pattern (sync wrapper
everywhere or async client with asyncio.run/await and updated docs).
- Around line 580-586: The AsyncLlamaStackAsLibraryClient instance created as
client is not being closed, causing HTTP/async resource leaks; wrap the
try/except with a finally that checks client is not None and calls its async
close method (e.g., awaitable client.aclose())—since this block is synchronous,
call it via asyncio.run(client.aclose()) (or client.close() if a sync close
exists), and swallow/Log any close errors; update the block around
_insert_prechunked_documents/_upload_and_process_file_batches to ensure
client.aclose() is always invoked in the finally.
🧹 Nitpick comments (3)
tests/test_document_processor_llama_stack.py (1)
486-492: Test validation could be more thorough.The test only checks that
vector_io.insertwas called once and verifies basic parameters. Consider validating the chunk structure more thoroughly to ensure document metadata and chunk_metadata are properly formatted.Enhanced validation
def test_save_manual_chunking(self, mocker, llama_stack_processor): """Test saving documents with manual chunking workflow.""" client = self._test_save(mocker, llama_stack_processor["config"]) - # Verify vector_io.insert was called once with the right vector_db_id - # Documents are modified during processing, so we just check it was called assert client.vector_io.insert.await_count == 1 call_kwargs = client.vector_io.insert.await_args.kwargs assert call_kwargs["vector_db_id"] == "vs_123" assert "chunks" in call_kwargs assert len(call_kwargs["chunks"]) == 2 + # Validate chunk structure + for chunk in call_kwargs["chunks"]: + assert "content" in chunk + assert "metadata" in chunk + assert "chunk_metadata" in chunk + assert "document_id" in chunk["metadata"]src/lightspeed_rag_content/document_processor.py (2)
429-466: Consider adding type hints for nested async function.The nested
upload_filefunction would benefit from explicit type hints for better code clarity and type checking, especially for thechunk_indicesparameter and return type.Add type hints
- async def upload_file(chunk_indices: list[int]) -> str: + async def upload_file(chunk_indices: list[int]) -> str: """Upload a placeholder file and update all related chunks.This is already correct. Consider adding type hints for the file_obj:
- file_obj = BytesIO("".encode("utf-8")) # Empty file for citation anchor + file_obj: BytesIO = BytesIO("".encode("utf-8")) # Empty file for citation anchor
550-554: Polling interval could be optimized.The fixed 2-second polling interval may be too aggressive for large batches or too slow for small ones. Consider implementing exponential backoff or a configurable interval.
Exponential backoff implementation
+ poll_interval = 2 # Start with 2 seconds + max_poll_interval = 30 # Cap at 30 seconds + while batch.status == "in_progress" and (time.time() - start_time) < max_wait: - await asyncio.sleep(2) + await asyncio.sleep(poll_interval) batch = await client.vector_stores.file_batches.retrieve( vector_store_id=vector_store.id, batch_id=batch.id ) + # Exponential backoff: double the interval up to max + poll_interval = min(poll_interval * 1.5, max_poll_interval)
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
pyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🚧 Files skipped from review as they are similar to previous changes (1)
- pyproject.toml
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (5)
write_yaml_config(338-362)_start_llama_stack(364-374)save(155-160)save(565-586)save(697-700)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
- GitHub Check: Pylinter
- GitHub Check: build-and-push-dev
🔇 Additional comments (13)
tests/test_document_processor_llama_stack.py (6)
18-18: LGTM!The import of
AsyncMockis necessary to properly mock asynchronous methods in the updated tests.
26-102: YAML template updated correctly.The expected YAML template now matches the new llama-stack 0.3.x format with proper structure for files, vector_io, storage backends, and registered_resources.
113-122: Mock refinement improves test isolation.The selective mocking of
os.path.existsensures that only embeddings_model_dir checks return False while preserving the original behavior for other paths, improving test accuracy.
315-321: Async initialization properly mocked and verified.The test correctly mocks the async
initializemethod withAsyncMockand verifies it was awaited.
414-469: Comprehensive async mocking for save flow.The helper properly mocks all async operations (vector_stores.create, files.create, vector_io.insert, vector_stores.file_batches.create) with appropriate return values.
159-159: No issues found.AsyncLlamaStackAsLibraryClientis available in llama-stack 0.3.5 and can be imported fromllama_stack.core.library_clientas shown in the code. The test assertions are correct.src/lightspeed_rag_content/document_processor.py (7)
17-17: LGTM!The imports for
asyncioandBytesIOare necessary for the async operations and in-memory file handling introduced in this PR.Also applies to: 23-23
199-275: YAML template updated for llama-stack 0.3.x.The template now includes the new structure with files, vector_io providers, storage backends, and registered_resources. The structure aligns with the PR objectives for 0.3.x compatibility.
338-362: LGTM!The removal of
provider_vector_db_idparameter aligns with the PR objectives stating that manual registration of separate vector indexes is no longer needed.
556-563: Error message provides good diagnostics.The error message includes detailed information about the batch status and file counts, which will be helpful for debugging failures.
71-77: The early return is correct and intentional for llamastack integration.The llamastack code path (_LlamaStackDB) does not require Settings.embed_model or Settings.llm. Instead, it initializes embeddings directly via SentenceTransformer (line 311) and uses llama_stack APIs for inference. The early return prevents unnecessary Settings initialization that would be incompatible with the llamastack integration approach. The Settings.embed_model initialization (lines 82-85) only applies to _LlamaIndexDB, which is used exclusively for "faiss" and "postgres" vector stores, not llamastack.
Likely an incorrect or invalid review comment.
373-373: No changes needed. Theasyncio.run(client.initialize())usage on line 373 is correct as-is. This private method is called only once persave()operation to initialize the client, makingasyncio.run()the appropriate pattern for bootstrapping async code from a sync context. The hypothetical concern about "multiple calls" doesn't apply to the actual codebase.
577-586: The asyncio.run() pattern here is unusual but functional. The client is initialized in one event loop (line 373) and then used in a separate event loop (lines 581/583), which is a less-than-ideal design. However, this only occurs whensave()is called from synchronous contexts (as evidenced by the codebase), so there's no runtime risk ofasyncio.run()being invoked from within a running event loop. If the llama-stack client maintains no persistent state across event loops (or properly handles it), the current implementation is acceptable. Consider documenting this pattern or refactoring to initialize the client within the same event loop as its usage if feasible.
| async def _insert_prechunked_documents(self, client: Any, index: str) -> None: | ||
| """Insert pre-chunked documents into the vector store. | ||
|
|
||
| This method uses two new llama-stack APIs (OpenAI compatible): | ||
| 1. vector_stores API: Creates a new vector store | ||
| 2. files API: Creates empty placeholder files for citation metadata | ||
| (provides document ID, URL, and title for citations) | ||
| And the vector_io API: Inserts chunks with embeddings | ||
|
|
||
| The empty files serve as citation anchors, linking chunks back to their | ||
| source documents without storing duplicate content (we don't use files downstream). | ||
| """ | ||
| provider_type = self.config.vector_store_type.split("-", 1)[1] | ||
| vector_store = await client.vector_stores.create( | ||
| name=index, | ||
| extra_body={ | ||
| "provider_id": provider_type, | ||
| "embedding_model": f"sentence-transformers/{self.model_name_or_dir}", | ||
| "embedding_dimension": self.config.embedding_dimension, | ||
| }, | ||
| ) | ||
|
|
||
| async def upload_file(chunk_indices: list[int]) -> str: | ||
| """Upload a placeholder file and update all related chunks. | ||
|
|
||
| We upload an empty file to get the correct references in OpenAI format downstream. | ||
|
|
||
| Args: | ||
| chunk_indices: List of indices in self.documents for chunks from same source | ||
|
|
||
| Returns: | ||
| File ID | ||
| """ | ||
| first_chunk = self.documents[chunk_indices[0]] | ||
| doc_metadata = first_chunk["metadata"] | ||
|
|
||
| docs_url = doc_metadata.get("docs_url", "") | ||
| title = doc_metadata.get("title", "") | ||
| doc_uuid = doc_metadata["document_id"] | ||
|
|
||
| # Create filename: {uuid}::{url}::{title}.txt | ||
| filename = f"{doc_uuid}::{docs_url}::{title}" | ||
|
|
||
| file_obj = BytesIO("".encode("utf-8")) # Empty file for citation anchor | ||
| file_obj.name = f"{filename}.txt" | ||
|
|
||
| uploaded_file = await client.files.create( | ||
| file=file_obj, | ||
| purpose="assistants", | ||
| ) | ||
|
|
||
| # Update ALL chunks from this source document with the file_id | ||
| for chunk_idx in chunk_indices: | ||
| self.documents[chunk_idx]["metadata"]["document_id"] = uploaded_file.id | ||
| self.documents[chunk_idx]["chunk_metadata"][ | ||
| "document_id" | ||
| ] = uploaded_file.id | ||
| self.documents[chunk_idx]["metadata"]["filename"] = filename | ||
|
|
||
| return str(uploaded_file.id) | ||
|
|
||
| # Group chunks by source document | ||
| doc_groups: dict[tuple[str, str], list[int]] = {} | ||
| for idx, chunk in enumerate(self.documents): | ||
| metadata = chunk["metadata"] | ||
| docs_url = metadata.get("docs_url", "") | ||
| title = metadata.get("title", "") | ||
| doc_key = (docs_url, title) | ||
| if doc_key not in doc_groups: | ||
| doc_groups[doc_key] = [] | ||
| doc_groups[doc_key].append(idx) | ||
|
|
||
| upload_tasks = [ | ||
| upload_file(chunk_indices) for chunk_indices in doc_groups.values() | ||
| ] | ||
| results = await asyncio.gather(*upload_tasks, return_exceptions=True) | ||
| for result in results: | ||
| if isinstance(result, Exception): | ||
| LOG.error("Failed to upload file: %s", result) | ||
|
|
||
| # Note: parameter is vector_db_id, not vector_store_id, will probably change soon | ||
| await client.vector_io.insert( | ||
| vector_db_id=vector_store.id, chunks=self.documents | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check if there are similar error handling patterns elsewhere
# Search for asyncio.gather with return_exceptions=True
rg -nP 'asyncio\.gather\(.*return_exceptions=True' --type=pyRepository: lightspeed-core/rag-content
Length of output: 332
🏁 Script executed:
#!/bin/bash
# Search for TODO, FIXME, or bug tracking references related to llama-stack
rg -nP '(TODO|FIXME|llama-stack|llama_stack)' --type=py -A 2Repository: lightspeed-core/rag-content
Length of output: 14661
🏁 Script executed:
#!/bin/bash
# Look at the full context of the document_processor.py file around the problematic method
wc -l src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 123
🏁 Script executed:
#!/bin/bash
# Check for any comments or docstrings that might mention the bug or workaround
rg -nP 'citation.*anchor|empty file|placeholder file' --type=py -B 2 -A 2Repository: lightspeed-core/rag-content
Length of output: 2147
🏁 Script executed:
#!/bin/bash
# Check the second asyncio.gather call around line 525
sed -n '500,540p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 1596
🏁 Script executed:
#!/bin/bash
# Look for any error handling documentation or patterns
rg -nP 'graceful|error.*handling|partial.*fail' --type=py -B 2 -A 2Repository: lightspeed-core/rag-content
Length of output: 53
🏁 Script executed:
#!/bin/bash
# Check if BytesIO is used elsewhere and how it's handled
rg -nP 'BytesIO' --type=py -B 2 -A 2Repository: lightspeed-core/rag-content
Length of output: 1358
🏁 Script executed:
#!/bin/bash
# Look for any GitHub issues, PR descriptions, or commit messages mentioning this workaround
git log --all --oneline --grep="citation\|placeholder\|empty file" 2>/dev/null | head -20Repository: lightspeed-core/rag-content
Length of output: 53
🏁 Script executed:
#!/bin/bash
# Look at the wider context around both methods to understand the design better
sed -n '407,430p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 1276
🏁 Script executed:
#!/bin/bash
# Check if there's any deliberate pattern for graceful error handling in async operations
rg -nP 'return_exceptions|LOG\.error' --type=py -B 1 -A 1 | head -40Repository: lightspeed-core/rag-content
Length of output: 2766
🏁 Script executed:
#!/bin/bash
# Check the class definition to understand the overall design intent
sed -n '276,300p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 1048
🏁 Script executed:
#!/bin/bash
# Let's look at the full context of both methods to understand the difference in error handling
sed -n '407,490p' src/lightspeed_rag_content/document_processor.py && echo "---SECOND METHOD---" && sed -n '497,545p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 5478
🏁 Script executed:
#!/bin/bash
# Check what method name the second code block belongs to
sed -n '495,505p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 516
🏁 Script executed:
#!/bin/bash
# Find the method name for the second block
sed -n '490,510p' src/lightspeed_rag_content/document_processor.py | grep -E 'async def|def 'Repository: lightspeed-core/rag-content
Length of output: 209
🏁 Script executed:
#!/bin/bash
# Get broader context to find method signature
sed -n '490,500p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 623
🏁 Script executed:
#!/bin/bash
# Check if there's any documentation or PR description explaining design decisions
git log --all --oneline -n 50 -- src/lightspeed_rag_content/document_processor.py 2>/dev/null | head -20Repository: lightspeed-core/rag-content
Length of output: 787
🏁 Script executed:
#!/bin/bash
# Search for any GitHub issues or bug references in the codebase
rg -nP 'github\.com.*issue|OLS-|JIRA|ticket' --type=pyRepository: lightspeed-core/rag-content
Length of output: 243
🏁 Script executed:
#!/bin/bash
# Check if BytesIO context manager pattern is used elsewhere in the codebase
rg -nP 'with.*BytesIO|BytesIO.*as' --type=pyRepository: lightspeed-core/rag-content
Length of output: 53
🏁 Script executed:
#!/bin/bash
# Look at the full _upload_and_process_file_batches method to see complete error handling
sed -n '490,570p' src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 3202
Review error handling inconsistency in file upload methods.
Two related async methods handle file upload failures differently:
-
_insert_prechunked_documents (lines 407-490): Logs individual file upload failures but continues processing, then calls
vector_io.insert()with all documents regardless of whether their file uploads succeeded. -
_upload_and_process_file_batches (lines 490+): Filters successful uploads and only includes those in the batch, with explicit error handling if batch processing fails.
This inconsistency means the first method may attempt to insert documents with unresolved file references. Align the error handling: either filter failed uploads before calling vector_io.insert() in _insert_prechunked_documents, or explicitly document the expected behavior when file uploads fail.
Minor note: BytesIO objects are not explicitly closed, though this is not a critical resource leak since BytesIO doesn't hold external file handles. Consider using context managers for consistency with best practices.
The empty file citation anchor workaround is already documented in the docstring (lines 410-417) and inline comments.
| # Create filename: {uuid}::{url}::{title}.txt | ||
| filename = f"{doc_uuid}::{docs_url}::{title}" | ||
|
|
||
| file_obj = BytesIO("".encode("utf-8")) # Empty file for citation anchor | ||
| file_obj.name = f"{filename}.txt" |
There was a problem hiding this comment.
Filename pattern is fragile.
The filename pattern {uuid}::{url}::{title}.txt with :: as delimiter could be problematic if URLs or titles contain ::. Consider using a more robust delimiter or encoding scheme.
Suggested improvement
Consider using URL encoding or a less common delimiter:
- # Create filename: {uuid}::{url}::{title}.txt
- filename = f"{doc_uuid}::{docs_url}::{title}"
+ # Create filename: {uuid}||{url}||{title}.txt
+ # Using || as delimiter (less likely to appear in URLs/titles)
+ filename = f"{doc_uuid}||{docs_url}||{title}"Or use a more robust approach with JSON encoding:
import urllib.parse
filename_data = {"uuid": doc_uuid, "url": docs_url, "title": title}
filename = urllib.parse.quote(json.dumps(filename_data))🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 447 - 451, The
current filename construction using filename =
f"{doc_uuid}::{docs_url}::{title}" is fragile because docs_url or title may
contain the delimiter; change this to a robust encoding scheme: build a small
dict with keys "uuid","url","title" (using the variables doc_uuid, docs_url,
title), serialize to JSON and percent-encode or base64-encode the result, then
set filename to the encoded string (and keep file_obj.name = f"{filename}.txt")
so the stored filename is unambiguous and safe for filesystem/URI use.
| async def _upload_and_process_file_batches(self, client: Any, index: str) -> None: | ||
| """Upload files and process them in batch using file_batches.""" | ||
| provider_type = self.config.vector_store_type.split("-", 1)[1] | ||
| vector_store = await client.vector_stores.create( | ||
| name=index, | ||
| extra_body={ | ||
| "provider_id": provider_type, | ||
| "embedding_model": f"sentence-transformers/{self.model_name_or_dir}", | ||
| "embedding_dimension": self.config.embedding_dimension, | ||
| }, | ||
| ) | ||
|
|
||
| async def upload_file(rag_doc: Any) -> str: | ||
| """Upload a single file to the Files API.""" | ||
| doc_metadata = rag_doc.metadata | ||
|
|
||
| docs_url = doc_metadata.get("docs_url", "") | ||
| title = doc_metadata.get("title", "") | ||
| doc_uuid = rag_doc.document_id | ||
|
|
||
| # Match working format: {uuid}::{url}::{title}.txt | ||
| filename = f"{doc_uuid}::{docs_url}::{title}" | ||
|
|
||
| file_obj = BytesIO(rag_doc.content.encode("utf-8")) | ||
| file_obj.name = f"{filename}.txt" | ||
|
|
||
| uploaded_file = await client.files.create( | ||
| file=file_obj, | ||
| purpose="assistants", | ||
| ) | ||
| return str(uploaded_file.id) | ||
|
|
||
| upload_tasks = [upload_file(doc) for doc in self.documents] | ||
| results = await asyncio.gather(*upload_tasks, return_exceptions=True) | ||
|
|
||
| file_ids = [] | ||
| for uploaded_file_id in results: | ||
| if isinstance(uploaded_file_id, Exception): | ||
| LOG.error("Failed to upload file: %s", uploaded_file_id) | ||
| else: | ||
| file_ids.append(uploaded_file_id) | ||
|
|
||
| batch = await client.vector_stores.file_batches.create( | ||
| vector_store_id=vector_store.id, | ||
| file_ids=file_ids, | ||
| chunking_strategy={ | ||
| "type": "static", | ||
| "static": { | ||
| "max_chunk_size_tokens": self.config.chunk_size, | ||
| "chunk_overlap_tokens": self.config.chunk_overlap, | ||
| }, | ||
| }, | ||
| ) | ||
|
|
||
| # Wait for batch completion | ||
| max_wait = 24 * 60 * 60 | ||
| start_time = time.time() | ||
|
|
||
| while batch.status == "in_progress" and (time.time() - start_time) < max_wait: | ||
| await asyncio.sleep(2) | ||
| batch = await client.vector_stores.file_batches.retrieve( | ||
| vector_store_id=vector_store.id, batch_id=batch.id | ||
| ) | ||
|
|
||
| if batch.status != "completed": | ||
| error_msg = ( | ||
| f"Batch processing failed with status '{batch.status}'. " | ||
| f"Completed {batch.file_counts.completed}/{batch.file_counts.total} files. " | ||
| f"Failed: {batch.file_counts.failed}, In progress: {batch.file_counts.in_progress}" | ||
| ) | ||
| LOG.error(error_msg) | ||
| raise RuntimeError(error_msg) |
There was a problem hiding this comment.
Batch processing timeout is excessive.
The 24-hour timeout (max_wait = 24 * 60 * 60) seems unreasonably long for file batch processing. Consider:
- Adding a configurable timeout parameter
- Using a more reasonable default (e.g., 1-2 hours)
- Adding progress logging during the wait
Suggested improvements
- # Wait for batch completion
- max_wait = 24 * 60 * 60
+ # Wait for batch completion (default: 2 hours)
+ max_wait = getattr(self.config, 'batch_timeout', 2 * 60 * 60)
start_time = time.time()
+ last_log_time = start_time
while batch.status == "in_progress" and (time.time() - start_time) < max_wait:
await asyncio.sleep(2)
batch = await client.vector_stores.file_batches.retrieve(
vector_store_id=vector_store.id, batch_id=batch.id
)
+ # Log progress every 60 seconds
+ current_time = time.time()
+ if current_time - last_log_time >= 60:
+ LOG.info(
+ "Batch processing in progress: %d/%d completed",
+ batch.file_counts.completed,
+ batch.file_counts.total
+ )
+ last_log_time = current_time| if self.config.manual_chunking: | ||
| client.vector_io.insert(vector_db_id=index, chunks=self.documents) | ||
| asyncio.run(self._insert_prechunked_documents(client, index)) | ||
| else: | ||
| client.tool_runtime.rag_tool.insert( | ||
| documents=self.documents, | ||
| vector_db_id=index, | ||
| chunk_size_in_tokens=self.config.chunk_size, | ||
| ) | ||
| asyncio.run(self._upload_and_process_file_batches(client, index)) | ||
| except Exception as exc: | ||
| LOG.error("Failed to insert document: %s", exc) | ||
| raise |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, let's examine the document_processor.py file around the flagged lines
grep -n "_start_llama_stack" src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 185
🏁 Script executed:
# Find the _start_llama_stack method definition
rg -A 20 "def _start_llama_stack" src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 974
🏁 Script executed:
# Check imports to understand what client type is being used
head -50 src/lightspeed_rag_content/document_processor.py | rg -i "llama|client|import"Repository: lightspeed-core/rag-content
Length of output: 1026
🏁 Script executed:
# Find all references to AsyncLlamaStackAsLibraryClient
rg "AsyncLlamaStackAsLibraryClient" src/Repository: lightspeed-core/rag-content
Length of output: 210
🏁 Script executed:
# Check if there's any cleanup pattern already in the codebase
rg -B 5 -A 5 "\.close\(\)|\.cleanup\(\)|__aenter__|__aexit__" src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 53
🏁 Script executed:
# Find where client_class is defined/set
rg -B 5 -A 5 "client_class\s*=" src/lightspeed_rag_content/document_processor.pyRepository: lightspeed-core/rag-content
Length of output: 610
🏁 Script executed:
# Look for the __init__ method to see how client_class is initialized
rg -A 30 "def __init__" src/lightspeed_rag_content/document_processor.py | head -50Repository: lightspeed-core/rag-content
Length of output: 1846
🏁 Script executed:
# Check if there are any imports from llama_stack
rg "from llama_stack|import llama_stack" src/lightspeed_rag_content/Repository: lightspeed-core/rag-content
Length of output: 171
🏁 Script executed:
# Let's check the requirements or setup.py to understand llama-stack version
fd "requirements|setup\\.py|pyproject\\.toml" . | xargs grep -l "llama.stack" 2>/dev/nullRepository: lightspeed-core/rag-content
Length of output: 87
🌐 Web query:
llama-stack LlamaStackAsLibraryClient resource cleanup asyncio
💡 Result:
Short answer: use the async context manager or explicitly close the async client (await aclose()) — and ensure any streaming generators are closed — to clean up sockets/tasks.
Why and how (concise):
-
Preferred: open the AsyncLlamaStackClient with an async context manager so resources are closed on exit.
Example:
async with AsyncLlamaStackClient(...) as client:
await client.models.list()
(client is closed automatically). [1][2] -
If not using a context manager, explicitly close the client when done:
await client.aclose() # async client
client.close() # sync client
(The library wraps httpx/aiohttp clients so explicit aclose()/close() is required for AsyncClient HTTP backends). [1][2][3] -
If you use a custom http backend or streams (SSE/async generators), also close those streams/generators (call their close()/aclose() as appropriate) to avoid leaked tasks/sockets. httpx’s async client warns about unclosed clients and requires explicit aclose() or context manager. [3]
Sources:
- Llama Stack Python client docs / repo (AsyncLlamaStackClient, context-manager and close behavior). [1]
- PyPI llama-stack-client package docs (async usage + aiohttp backend example with async with). [2]
- httpx async client docs: async clients must be closed (aclose or async with). [3]
References:
[1] GitHub — llamastack/llama-stack-client-python.
[2] PyPI — llama-stack-client.
[3] httpx async client docs (async client aexit/aclose notes).
Missing resource cleanup for AsyncLlamaStackAsLibraryClient.
The client is created and initialized but never explicitly closed, causing resource leaks. The underlying httpx/aiohttp client requires explicit cleanup via aclose() or a context manager.
Add a finally block to ensure the client is closed:
Suggested fix
client = self._start_llama_stack(cfg_file)
try:
if self.config.manual_chunking:
asyncio.run(self._insert_prechunked_documents(client, index))
else:
asyncio.run(self._upload_and_process_file_batches(client, index))
+ finally:
+ asyncio.run(client.aclose())
except Exception as exc:
LOG.error("Failed to insert document: %s", exc)
raiseCommittable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 580 - 586, The
AsyncLlamaStackAsLibraryClient instance created as client is not being closed,
causing HTTP/async resource leaks; wrap the try/except with a finally that
checks client is not None and calls its async close method (e.g., awaitable
client.aclose())—since this block is synchronous, call it via
asyncio.run(client.aclose()) (or client.close() if a sync close exists), and
swallow/Log any close errors; update the block around
_insert_prechunked_documents/_upload_and_process_file_batches to ensure
client.aclose() is always invoked in the finally.
a2fe7e1 to
b3aad52
Compare
b3aad52 to
b402f79
Compare
|
@tisnik please review the changes to the readme too, which highlight a limitation. TL;DR Also, rag db + files metadata + metadata (registry) are now saved in the same store, since we also need the files metadata to get file citations in the response. |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/lightspeed_rag_content/document_processor.py (1)
367-377: Docstring references outdated class name.The docstring mentions
LlamaStackAsLibraryClientbut the code now usesAsyncLlamaStackAsLibraryClient.📝 Suggested fix
def _start_llama_stack(self, cfg_file: str) -> Any: """Start llama-stack as a library and return the client. Return type is really - llama_stack.core.library_client.LlamaStackAsLibraryClient + llama_stack.core.library_client.AsyncLlamaStackAsLibraryClient But we do dynamic import, so we don't have it for static typechecking """
🤖 Fix all issues with AI agents
In @pyproject.toml:
- Around line 49-55: pyproject.toml adds chardet and einops but they are not
used; either remove "chardet>=5.2.0" and "einops>=0.8.1" from the dependencies
list or, if they are required for upcoming features, add a brief justification
and demonstrate usage by importing them where needed (e.g., reference chardet in
any BytesIO/encoding handling code and import einops in modules that will use
its tensor operations). Update the dependency list accordingly and add a
one-line comment or changelog note explaining why they remain if you keep them.
In @src/lightspeed_rag_content/document_processor.py:
- Around line 449-450: The filename construction using the literal delimiter
"::" (variable filename built from doc_uuid, docs_url, title) can collide with
URLs or titles containing "::"; update the logic that builds filename in
document_processor.py so components are either safely encoded (e.g., URL-encode
docs_url and title) or joined with a delimiter unlikely to appear (and
document_uuid kept raw), and ensure any downstream parser that splits filename
is updated to decode/handle the chosen encoding/delimiter; change the code that
sets filename (the line using doc_uuid, docs_url, title) accordingly and add a
comment noting the encoding/decoding expectation.
- Around line 498-502: The embedding_model string in the extra_body dict is
constructed with a hardcoded "sentence-transformers/" prefix which duplicates
the prefix when model_name_or_dir already includes it; update the construction
in the method that builds extra_body (same pattern as fixed in
_insert_prechunked_documents) to conditionally prefix only when
self.model_name_or_dir does not already start with "sentence-transformers/"
(i.e., compute embedding_model = self.model_name_or_dir if startswith else
"sentence-transformers/" + self.model_name_or_dir) and use that variable in
extra_body to avoid double-prefixing.
- Around line 425-428: The embedding_model string is always prefixed with
"sentence-transformers/", which breaks absolute local paths; inside the code
that builds the dict (where "embedding_model":
f"sentence-transformers/{self.model_name_or_dir}" is set in DocumentProcessor),
change it to conditionally apply the prefix: use self.model_name_or_dir
unchanged when it is an absolute filesystem path (use
os.path.isabs(self.model_name_or_dir)) and only prepend "sentence-transformers/"
for non-absolute model names; ensure you import os if not already present.
🧹 Nitpick comments (1)
tests/test_document_processor_llama_stack.py (1)
113-122: Mock setup for os.path.exists is functional but fragile.The substring check
"embeddings_model" in str(path)may match unintended paths. Consider using an exact path comparison or a more specific pattern.♻️ Suggested refinement
def mock_exists(path): # Return False for embeddings_model_dir, True for everything else - if "embeddings_model" in str(path): + if str(path) == "embeddings_model" or str(path).endswith("/embeddings_model"): return False return original_exists(path)
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (4)
README.mdpyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (3)
write_yaml_config(343-365)provider_type(339-341)_start_llama_stack(367-377)
🪛 LanguageTool
README.md
[style] ~218-~218: ‘exact same’ might be wordy. Consider a shorter alternative.
Context: ...mbedding model must be available at the exact same path** that was specified during databa...
(EN_WORDINESS_PREMIUM_EXACT_SAME)
🔇 Additional comments (7)
README.md (1)
208-229: Documentation clearly explains the embedding model path portability limitation.This is helpful guidance for users who need to deploy vector databases across different environments. The recommendation to use
-md ""with a HuggingFace model ID for full portability is a good workaround.tests/test_document_processor_llama_stack.py (2)
305-324: Async initialization test is correctly updated.The test properly mocks the async
initializemethod and usesassert_awaited_once()to verify it was called.
483-499: Save flow tests correctly verify async operations.The assertions properly check
await_count,await_args, and useassert_awaited_once()for the async client methods.src/lightspeed_rag_content/document_processor.py (4)
71-77: Early return for llamastack avoids unnecessary embedding model initialization.This correctly skips the HuggingFaceEmbedding setup for llama-stack backends since they handle embedding differently.
338-341: Provider type extraction is clean and well-guarded.The constructor's assertion at lines 297-300 ensures
vector_store_typeis always in the expected format before this property is accessed.
547-555: Polling loop for batch completion is well-implemented.The 24-hour timeout and 2-second polling interval are reasonable for potentially large batch operations. The error handling provides useful diagnostic information.
579-587: Multipleasyncio.run()calls withinsave()method.The method calls
asyncio.run()in_start_llama_stack()(line 376) and again at lines 582/584. While this works for CLI tooling, it would fail if called from within an existing async context. Since this appears to be designed for CLI use, this is acceptable.Ensure this code is only called from synchronous CLI entry points, not from async contexts where an event loop may already be running.
| "provider_id": self.provider_type, | ||
| "embedding_model": f"sentence-transformers/{self.model_name_or_dir}", | ||
| "embedding_dimension": self.config.embedding_dimension, | ||
| }, |
There was a problem hiding this comment.
Hardcoded sentence-transformers/ prefix may cause issues with local model paths.
When model_name_or_dir is an absolute path (e.g., /app/embeddings), prefixing it with sentence-transformers/ produces an invalid model identifier like sentence-transformers//app/embeddings.
🐛 Suggested fix
vector_store = await client.vector_stores.create(
name=index,
extra_body={
"provider_id": self.provider_type,
- "embedding_model": f"sentence-transformers/{self.model_name_or_dir}",
+ "embedding_model": self.config.model_name,
"embedding_dimension": self.config.embedding_dimension,
},
)Or conditionally apply the prefix only when needed:
embedding_model = (
self.model_name_or_dir
if os.path.isabs(self.model_name_or_dir)
else f"sentence-transformers/{self.model_name_or_dir}"
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "provider_id": self.provider_type, | |
| "embedding_model": f"sentence-transformers/{self.model_name_or_dir}", | |
| "embedding_dimension": self.config.embedding_dimension, | |
| }, | |
| "provider_id": self.provider_type, | |
| "embedding_model": self.config.model_name, | |
| "embedding_dimension": self.config.embedding_dimension, | |
| }, |
🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 425 - 428, The
embedding_model string is always prefixed with "sentence-transformers/", which
breaks absolute local paths; inside the code that builds the dict (where
"embedding_model": f"sentence-transformers/{self.model_name_or_dir}" is set in
DocumentProcessor), change it to conditionally apply the prefix: use
self.model_name_or_dir unchanged when it is an absolute filesystem path (use
os.path.isabs(self.model_name_or_dir)) and only prepend "sentence-transformers/"
for non-absolute model names; ensure you import os if not already present.
| # Create filename: {uuid}::{url}::{title}.txt | ||
| filename = f"{doc_uuid}::{docs_url}::{title}" |
There was a problem hiding this comment.
Filename delimiter :: may conflict with URLs or titles containing that pattern.
If a URL like https://example.com::8080/path or a title containing :: is encountered, downstream parsing will break. Consider using a safer delimiter or URL-encoding the components.
💡 Suggestion
Consider URL-encoding the components or using a delimiter less likely to appear in URLs/titles:
import urllib.parse
filename = f"{doc_uuid}||{urllib.parse.quote(docs_url, safe='')}||{urllib.parse.quote(title, safe='')}"🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 449 - 450, The
filename construction using the literal delimiter "::" (variable filename built
from doc_uuid, docs_url, title) can collide with URLs or titles containing "::";
update the logic that builds filename in document_processor.py so components are
either safely encoded (e.g., URL-encode docs_url and title) or joined with a
delimiter unlikely to appear (and document_uuid kept raw), and ensure any
downstream parser that splits filename is updated to decode/handle the chosen
encoding/delimiter; change the code that sets filename (the line using doc_uuid,
docs_url, title) accordingly and add a comment noting the encoding/decoding
expectation.
| extra_body={ | ||
| "provider_id": self.provider_type, | ||
| "embedding_model": f"sentence-transformers/{self.model_name_or_dir}", | ||
| "embedding_dimension": self.config.embedding_dimension, | ||
| }, |
There was a problem hiding this comment.
Same sentence-transformers/ prefix issue as in _insert_prechunked_documents.
This duplicates the problematic embedding model path construction.
See the fix suggested for _insert_prechunked_documents at line 426.
🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 498 - 502, The
embedding_model string in the extra_body dict is constructed with a hardcoded
"sentence-transformers/" prefix which duplicates the prefix when
model_name_or_dir already includes it; update the construction in the method
that builds extra_body (same pattern as fixed in _insert_prechunked_documents)
to conditionally prefix only when self.model_name_or_dir does not already start
with "sentence-transformers/" (i.e., compute embedding_model =
self.model_name_or_dir if startswith else "sentence-transformers/" +
self.model_name_or_dir) and use that variable in extra_body to avoid
double-prefixing.
| - config: | ||
| metadata_store: | ||
| table_name: files_metadata | ||
| backend: kv_default |
There was a problem hiding this comment.
| backend: kv_default | |
| backend: sql_default |
Cloning this and trying to build the image was throwing errors unless I changed this. Can you confirm?
There was a problem hiding this comment.
Image building works fine for me, I am also able to create a vector db... Not sure why you get the error, could you share the error?
b402f79 to
5e28151
Compare
5e28151 to
f899d0e
Compare
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Fix all issues with AI agents
In @src/lightspeed_rag_content/document_processor.py:
- Around line 486-493: The loop over results currently only logs exceptions and
then continues to call client.vector_io.insert, which can write chunks
referencing missing document_ids; instead, after gathering results (the results
variable), detect any Exception instances, collect their details (or
filenames/ids if available), log them, and abort by raising an error or
returning early so client.vector_io.insert(vector_db_id=vector_store.id,
chunks=self.documents) is not executed when uploads failed; update the code
paths around results, client.vector_io.insert, and self.documents to ensure
insert only runs when no upload exceptions occurred.
- Around line 423-430: The code is prepending "sentence-transformers/" to the
embedding model ID which causes duplicated prefixes or incorrect paths; update
the client.vector_stores.create call (and the other usage at the second
occurrence) to pass the embedding model exactly as provided by
self.model_name_or_dir (i.e., set "embedding_model": self.model_name_or_dir)
instead of concatenating a hardcoded "sentence-transformers/" prefix, ensuring
both the create call and the other place referencing embedding_model use the
unmodified model identifier.
In @tests/test_document_processor_llama_stack.py:
- Around line 218-222: The test calls doc.write_yaml_config(provider_id,
yaml_file, db_file) but write_yaml_config requires a fourth
files_metadata_db_file argument; define a files_metadata_db_file variable (e.g.,
files_metadata_db_file = "files_metadata_db_file") and pass it into the call so
it becomes doc.write_yaml_config(provider_id, yaml_file, db_file,
files_metadata_db_file), ensuring the test mirrors the FAISS test fix.
- Around line 196-209: The test calls write_yaml_config with only three args but
the signature now requires files_metadata_db_file; add a files_metadata_db_file
variable (e.g., files_metadata_db_file = "files_db_file") and call
doc.write_yaml_config(index_id, yaml_file, db_file, files_metadata_db_file);
also update the expected YAML payload used in the assertion
(FAISS_EXPECTED.format(...)) to include the files_metadata_db_file substitution
if the template expects it so the mocked write contents match.
- Around line 448-479: The test currently mocks os.path.realpath to return a
single value but save() now calls realpath twice; change the realpath mock to
use a side_effect that returns two values (e.g. "/cwd/out_dir/faiss_store.db"
then "/cwd/out_dir/files_metadata.db") and replace the single-call assertion
with an assertion that both calls occurred (use realpath.assert_has_calls or
check call_count == 2 and the two expected call args) so the test verifies both
realpath invocations for faiss_store.db and files_metadata.db.
- Around line 26-102: The test YAML constant FAISS_EXPECTED and the inline
expected YAML in test_write_yaml_config_sqlitevec do not match the
implementation TEMPLATE in document_processor.py: change the files provider
metadata_store backend from "kv_default" to "sql_files" to align with TEMPLATE;
update FAISS_EXPECTED (and the other inline expected YAML referenced in
test_write_yaml_config_sqlitevec) so the files provider block uses backend:
sql_files while keeping placeholders (e.g., storage_dir, table_name) unchanged.
🧹 Nitpick comments (2)
src/lightspeed_rag_content/document_processor.py (2)
548-565: 24-hour timeout for batch processing is excessive.A 24-hour (
max_wait = 24 * 60 * 60) timeout for batch processing seems extremely long. If something goes wrong, users would wait a full day before seeing an error. Consider a more reasonable default with an option to configure it.♻️ Suggested improvement
# Wait for batch completion - max_wait = 24 * 60 * 60 + max_wait = 60 * 60 # 1 hour default; adjust based on expected workloads start_time = time.time()Alternatively, consider making this configurable via the
_Configclass.
584-591: Verify error handling for async operations.The try/except block catches exceptions from
asyncio.run()but the error message says "Failed to insert document" which may be misleading for upload failures or batch processing errors.♻️ Suggested improvement for clearer error messaging
try: if self.config.manual_chunking: asyncio.run(self._insert_prechunked_documents(client, index)) else: asyncio.run(self._upload_and_process_file_batches(client, index)) except Exception as exc: - LOG.error("Failed to insert document: %s", exc) + LOG.error("Failed to process documents for vector store '%s': %s", index, exc) raise
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (4)
README.mdpyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (6)
write_yaml_config(346-366)provider_type(342-344)_start_llama_stack(368-378)save(155-160)save(567-591)save(702-705)
🪛 GitHub Actions: Unit tests
tests/test_document_processor_llama_stack.py
[error] 199-199: _LlamaStackDB.write_yaml_config() missing 1 required positional argument: 'files_metadata_db_file'
[error] 222-222: _LlamaStackDB.write_yaml_config() missing 1 required positional argument: 'files_metadata_db_file'
[error] 474-474: AssertionError: Expected 'realpath' to be called once. Called 2 times. Calls: [call('out_dir/faiss_store.db'), call('out_dir/files_metadata.db')]
[error] 485-485: AssertionError: Expected 'realpath' to be called once. Called 2 times. Calls: [call('out_dir/faiss_store.db'), call('out_dir/files_metadata.db')]
src/lightspeed_rag_content/document_processor.py
[error] 1-1: LlamaStack write_yaml_config signature mismatch: missing required files_metadata_db_file argument.
[error] 532-532: Upload failed: 'dict' object has no attribute 'metadata' while processing documents in upload step.
[error] 1-1: os.path.realpath called twice during save (once for faiss_store.db and once for files_metadata.db); tests expect a single call for faiss_store.db.
🪛 LanguageTool
README.md
[style] ~218-~218: ‘exact same’ might be wordy. Consider a shorter alternative.
Context: ...mbedding model must be available at the exact same path** that was specified during databa...
(EN_WORDINESS_PREMIUM_EXACT_SAME)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build-and-push-dev
- GitHub Check: Pylinter
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
🔇 Additional comments (9)
pyproject.toml (1)
49-55: LGTM!Dependency updates look appropriate for the llama-stack 0.3.x migration. Pinning
llama-stackandllama-stack-clientto exact version0.3.5is sensible given the breaking API changes in this release.README.md (1)
208-230: LGTM!The new documentation section clearly explains the embedding model path portability limitation and provides actionable recommendations. This is valuable guidance for users deploying vector databases across different environments.
tests/test_document_processor_llama_stack.py (1)
305-324: LGTM!The async initialization test correctly uses
AsyncMockfor theinitializemethod and properly asserts it was awaited.src/lightspeed_rag_content/document_processor.py (6)
17-23: LGTM!New imports for
asyncioandBytesIOare appropriate for the async llama-stack client integration and file upload functionality.
334-336: LGTM!Switching to
AsyncLlamaStackAsLibraryClientaligns with the llama-stack 0.3.x API changes.
341-344: LGTM!The
provider_typeproperty cleanly extracts the backend type from the vector_store_type string.
376-378: Verify nestedasyncio.runcalls don't cause issues.
_start_llama_stackusesasyncio.run(client.initialize()), and thensave()callsasyncio.run(self._insert_prechunked_documents(...))on the returned client. This works because eachasyncio.run()creates a new event loop, but nesting them or calling from an already-running loop would fail.Ensure this code path is never invoked from within an existing async context (e.g., from a web framework's async handler), as that would raise
RuntimeError: This event loop is already running.
199-278: LGTM - Updated YAML template for llama-stack 0.3.x.The TEMPLATE has been properly updated to reflect the new llama-stack 0.3.x configuration structure including:
version: 2andimage_name: starter- New
filesAPI and provider configuration- Separate
sql_filesbackend for files metadata- Updated storage configuration with
kv_defaultandsql_filesbackends
567-582: LGTM - Save method correctly calls async methods.The
savemethod properly constructs paths for bothdb_fileandfiles_metadata_db_file, passes them towrite_yaml_config, and dispatches to the appropriate async method based onmanual_chunkingsetting.
| for result in results: | ||
| if isinstance(result, Exception): | ||
| LOG.error("Failed to upload file: %s", result) | ||
|
|
||
| # Note: parameter is vector_db_id, not vector_store_id, will probably change soon | ||
| await client.vector_io.insert( | ||
| vector_db_id=vector_store.id, chunks=self.documents | ||
| ) |
There was a problem hiding this comment.
Silent failures during file uploads may lead to incomplete vector stores.
When asyncio.gather returns exceptions, they're only logged but the method continues to insert chunks. If critical files fail to upload, the downstream vector_io.insert will proceed with stale/missing document_id references in the chunks.
💡 Suggested improvement - track and report failed uploads
results = await asyncio.gather(*upload_tasks, return_exceptions=True)
+ failed_count = 0
for result in results:
if isinstance(result, Exception):
LOG.error("Failed to upload file: %s", result)
+ failed_count += 1
+
+ if failed_count > 0:
+ LOG.warning(
+ "%d/%d file uploads failed; proceeding with partial data",
+ failed_count,
+ len(upload_tasks),
+ )
# Note: parameter is vector_db_id, not vector_store_id, will probably change soon
await client.vector_io.insert(
vector_db_id=vector_store.id, chunks=self.documents
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for result in results: | |
| if isinstance(result, Exception): | |
| LOG.error("Failed to upload file: %s", result) | |
| # Note: parameter is vector_db_id, not vector_store_id, will probably change soon | |
| await client.vector_io.insert( | |
| vector_db_id=vector_store.id, chunks=self.documents | |
| ) | |
| failed_count = 0 | |
| for result in results: | |
| if isinstance(result, Exception): | |
| LOG.error("Failed to upload file: %s", result) | |
| failed_count += 1 | |
| if failed_count > 0: | |
| LOG.warning( | |
| "%d/%d file uploads failed; proceeding with partial data", | |
| failed_count, | |
| len(upload_tasks), | |
| ) | |
| # Note: parameter is vector_db_id, not vector_store_id, will probably change soon | |
| await client.vector_io.insert( | |
| vector_db_id=vector_store.id, chunks=self.documents | |
| ) |
🤖 Prompt for AI Agents
In @src/lightspeed_rag_content/document_processor.py around lines 486 - 493, The
loop over results currently only logs exceptions and then continues to call
client.vector_io.insert, which can write chunks referencing missing
document_ids; instead, after gathering results (the results variable), detect
any Exception instances, collect their details (or filenames/ids if available),
log them, and abort by raising an error or returning early so
client.vector_io.insert(vector_db_id=vector_store.id, chunks=self.documents) is
not executed when uploads failed; update the code paths around results,
client.vector_io.insert, and self.documents to ensure insert only runs when no
upload exceptions occurred.
f899d0e to
4d6d1e0
Compare
4d6d1e0 to
69b092f
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
README.md (1)
208-230: Documentation clearly explains an important portability limitation.This section effectively documents the embedding model path constraint and provides practical workarounds. The recommendation to use
-md ""with HuggingFace model IDs for portability is helpful.Minor style suggestion per static analysis: Line 218-219 could use "same path" instead of "exact same path" to reduce wordiness.
📝 Optional style improvement
-When llama-stack later consumes the vector database, it reads the embedding model -location from the kv_store. Therefore, **the embedding model must be available at the -exact same path** that was specified during database creation. +When llama-stack later consumes the vector database, it reads the embedding model +location from the kv_store. Therefore, **the embedding model must be available at the +same path** that was specified during database creation.src/lightspeed_rag_content/document_processor.py (1)
368-378: Async initialization implemented correctly.Minor documentation inconsistency: the docstring on line 372 still references
LlamaStackAsLibraryClientbut the actual class is nowAsyncLlamaStackAsLibraryClient.📝 Update docstring to match async client
def _start_llama_stack(self, cfg_file: str) -> Any: """Start llama-stack as a library and return the client. Return type is really - llama_stack.core.library_client.LlamaStackAsLibraryClient + llama_stack.core.library_client.AsyncLlamaStackAsLibraryClient But we do dynamic import, so we don't have it for static typechecking """
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (4)
README.mdpyproject.tomlsrc/lightspeed_rag_content/document_processor.pytests/test_document_processor_llama_stack.py
🧰 Additional context used
🧬 Code graph analysis (1)
tests/test_document_processor_llama_stack.py (1)
src/lightspeed_rag_content/document_processor.py (6)
write_yaml_config(346-366)provider_type(342-344)_start_llama_stack(368-378)save(155-160)save(567-591)save(702-705)
🪛 LanguageTool
README.md
[style] ~218-~218: ‘exact same’ might be wordy. Consider a shorter alternative.
Context: ...mbedding model must be available at the exact same path** that was specified during databa...
(EN_WORDINESS_PREMIUM_EXACT_SAME)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: build-and-push-dev
- GitHub Check: Pylinter
- GitHub Check: Konflux kflux-prd-rh02 / rag-tool-on-pull-request
🔇 Additional comments (15)
README.md (1)
249-255: Output file documentation updated appropriately.The addition of
files_metadata.dbdocumentation and the clarification about embedded registry metadata aligns with the implementation changes.tests/test_document_processor_llama_stack.py (7)
18-18: AsyncMock import correctly added for async testing.
26-105: FAISS_EXPECTED template correctly updated for llama-stack 0.3.x format.The expected YAML structure properly reflects the new configuration schema with storage backends, registered resources, and templated placeholders. The shields warning comment is appropriately preserved.
116-125: Selective path existence mock is functional.The approach to conditionally mock
os.path.existsworks for the current test setup. Note that using string matching ("embeddings_model" in str(path)) could be fragile if path patterns change, but this is acceptable for test code.
162-162: Client class assertion correctly updated to async variant.
199-214: YAML config test correctly updated for new signature.The test now passes
files_metadata_db_fileand verifies the updated template placeholders.
324-330: Async initialization test correctly verifies await behavior.Using
AsyncMockandassert_awaited_once()properly validates the async initialization flow.
420-515: Save flow tests comprehensively cover both chunking strategies.The test helper properly mocks async client methods (
vector_stores.create,files.create,vector_io.insert,vector_stores.file_batches.create) and the assertions verify the correct async methods are called for each chunking mode.src/lightspeed_rag_content/document_processor.py (7)
17-23: Required imports for async workflow added.
asyncioandBytesIOimports are necessary for the new async implementation.
71-77: Early return for llamastack correctly avoids duplicate embedding model setup.Llama-stack handles embeddings internally via the sentence-transformers provider, so skipping the HuggingFaceEmbedding setup is appropriate.
334-336: Client class correctly updated to async variant.
341-344: Provider type extraction property is functional.The string parsing logic correctly extracts
faissfromllamastack-faissandsqlite-vecfromllamastack-sqlite-vec.
495-565: Batch file processing implementation is well-structured.The polling mechanism with status checking and comprehensive error reporting is appropriate for batch operations. The 24-hour maximum wait time accommodates large document sets.
Note: Line 501 has the same
embedding_modelformat concern as line 427.
567-591: Save method orchestrates the async workflow.The implementation correctly sets up the output directory and config file before starting the async operations.
Note on asyncio pattern: The code calls
asyncio.run()in_start_llama_stackfor initialization, then again insave()for document processing. While this works (sequentialasyncio.run()calls are allowed), it creates separate event loops. Ensure theclientobject returned from_start_llama_stackremains usable across event loop boundaries - some async clients may not support this pattern.If issues arise with the client state, consider restructuring to use a single event loop:
async def _save_async(self, client, index): if self.config.manual_chunking: await self._insert_prechunked_documents(client, index) else: await self._upload_and_process_file_batches(client, index) # In save(): asyncio.run(self._save_async(client, index))
411-493: Async document insertion workflow implemented.The implementation correctly groups chunks by source document and uses
asyncio.gatherfor parallel file uploads. The error handling logs failures but continues processing remaining files.Potential issue on line 427: The embedding model format
f"sentence-transformers/{self.model_name_or_dir}"may produce invalid paths whenmodel_name_or_diris an absolute filesystem path (e.g.,"sentence-transformers//app/embeddings"instead of the path itself). Verify whethermodel_name_or_dircan be an absolute path and whether the prefix should be conditional or use a different configuration attribute.
| "llama-stack==0.3.5", | ||
| "llama-stack-client==0.3.5", | ||
| "aiosqlite>=0.21.0", | ||
| "sqlite-vec>=0.1.6", | ||
| "tomlkit", | ||
| "chardet>=5.2.0", | ||
| "einops>=0.8.1", |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check latest versions on PyPI
echo "=== llama-stack versions ==="
curl -s https://pypi.org/pypi/llama-stack/json | jq '.releases | keys[-5:]'
echo "=== llama-stack latest info ==="
curl -s https://pypi.org/pypi/llama-stack/json | jq '.info | {version, yanked}'
echo "=== llama-stack-client latest info ==="
curl -s https://pypi.org/pypi/llama-stack-client/json | jq '.info | {version, yanked}'
echo "=== chardet latest info ==="
curl -s https://pypi.org/pypi/chardet/json | jq '.info | {version, yanked}'
echo "=== einops latest info ==="
curl -s https://pypi.org/pypi/einops/json | jq '.info | {version, yanked}'Repository: lightspeed-core/rag-content
Length of output: 198
🌐 Web query:
llama-stack 0.3.5 latest version PyPI
💡 Result:
No — PyPI's latest llama-stack release is 0.4.0 (uploaded Jan 6, 2026). [1][2]
Sources:
[1] PyPI project page for llama-stack (release files show 0.4.0 uploaded Jan 6, 2026).
[2] Safety DB listing showing v0.3.5 (released Dec 15, 2025).
🌐 Web query:
llama-stack-client 0.3.5 latest version PyPI
💡 Result:
The latest PyPI release is llama-stack-client 0.4.0 (uploaded Jan 6, 2026). [1][2]
Sources:
- PyPI project page showing v0.4.0 and upload date. [1]
- Llama Stack organization package list on PyPI. [2]
🌐 Web query:
chardet 5.2.0 latest version security
💡 Result:
Short answer: chardet 5.2.0 (released Aug 1, 2023) is the latest PyPI release and there are no publicly reported direct security vulnerabilities for that version as of Jan 12, 2026. [1][2][3]
Sources:
- PyPI release page (shows 5.2.0, Aug 1, 2023). [1]
- Snyk package page — no known direct vulnerabilities for 5.2.0. [2]
- Safety/Safety DB summary — latest version 5.2.0 with no known security issues. [3]
If you want, I can: 1) monitor vulnerability feeds for new advisories, or 2) run a dependency/vulnerability scan on your project. Which would you prefer?
🌐 Web query:
einops 0.8.1 latest version
💡 Result:
Yes — the latest published release is einops 0.8.1 (released Feb 9, 2025). [1][2]
Sources:
[1] PyPI project page (einops) — release history and 0.8.1 upload metadata.
[2] PyPI stats / package pages showing latest version 0.8.1.
Consider upgrading llama-stack and llama-stack-client to 0.4.0.
The pyproject.toml pins llama-stack and llama-stack-client to 0.3.5, but the latest stable releases on PyPI are both 0.4.0 (released Jan 6, 2026). Review whether upgrading is feasible with the current API compatibility requirements, or explicitly document why 0.3.5 is intentionally locked to an older version.
The versions for chardet>=5.2.0 (latest 5.2.0, no known security advisories) and einops>=0.8.1 (latest release) are appropriate.
|
@Jdubrick I have updated the PR to address your issues. Now the files metadata is saved in a separate SQL store. |
Description
Updated
rag-contenttool to use the new llama-stack APIs for rag db creation.llama-stackhas been updated tov0.3.5in LCS. This means we now need to be able to use the rag-content tool to generate the rag db. Version v0.3.x introduces breaking changes with OpenAI compatibility, namely:Files(these need to be created in order to generate citations to files)vector_dbhas been renamed tovector_storeBecause of these changes, we need to rewrite the entire index creation logic and update the config to match the new llama-stack format.
There is currently a bug in llama-stack (here) that makes attributes (metadata) unusable. As a workaround, we rely on file citations (i.e. the filename) to extract the data we need downstream (URL and Title, to point to documents in the UI). A separate PR to LCS needs to be created to parse these.
NOTE: we currently unify the metadata registry and the vector index into a single
.dbfile. These could be separated but with no benefit. IMO it's better to have them together as the vector stores are tightly coupled to the registry (which stores info on the registered vector stores and files).In rag-content there are two chunking strategies:
--auto-chunkingflag, where we use default values and let llama-stack split the documents into chunksAUTO CHUNKING
For auto chunking we:
{uuid}::{url}::{title}.txtso we can extract each piece of information laterclient.vector_stores.file_batches.createto chunk the documents in batchesNOTE:
tool_runtime.rag_tool.inserthas been removed as it is deprecated.MANUAL CHUNKING
For manual chunking we:
Filesfor each document (as mentioned, we only need them for citations)client.vector_io.insertfor llama-stack to insert the chunks into the vector store and create the embeddingsType of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Use the tool to create a rag db, then use the generated db in a llama-stack 0.3.5 instance to answer queries.
Summary by CodeRabbit
Chores
New Features
Tests
Documentation
✏️ Tip: You can customize this high-level summary in your review settings.