
Commit 39e71c2

Refactor RAG pipeline and scraper: update pipeline methods for consistency, enhance throttling in scraper, and improve vector store distance handling; add unit tests for pipeline indexing and markdown conversion.
1 parent d0ccf9e commit 39e71c2

File tree

8 files changed: 173 additions, 47 deletions

Makefile

Lines changed: 16 additions & 16 deletions
@@ -188,19 +188,19 @@ scrape-list:
 scrape-list-parallel:
     python -m RAGnificent -o $(OUTPUT_DIR) --links-file $(LINKS_FILE) --parallel --max-workers $(WORKERS)

-# Run the complete RAG uv pipeline with a single URL
-.PHONY: rag-uv pipeline
-rag-uv pipeline:
+# Run the complete RAG pipeline with a single URL
+.PHONY: rag_pipeline
+rag_pipeline:
 ifeq ($(URL),)
-    @echo "Error: URL is required. Use 'make rag-uv pipeline URL=https://example.com'"
+    @echo "Error: URL is required. Use 'make rag_pipeline URL=https://example.com'"
     @exit 1
 endif
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline = uv pipeline(collection_name='$(COLLECTION)'); uv pipeline.run_uv pipeline(url='$(URL)', run_extract=True, run_chunk=True, run_embed=True, run_store=True)"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; pipeline = Pipeline(collection_name='$(COLLECTION)'); pipeline.run_pipeline(url='$(URL)', run_extract=True, run_chunk=True, run_embed=True, run_store=True)"

-# Run the complete RAG uv pipeline with a list of URLs
-.PHONY: rag-uv pipeline-list
-rag-uv pipeline-list:
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline = uv pipeline(collection_name='$(COLLECTION)'); uv pipeline.run_uv pipeline(links_file='$(LINKS_FILE)', run_extract=True, run_chunk=True, run_embed=True, run_store=True)"
+# Run the complete RAG pipeline with a list of URLs
+.PHONY: rag_pipeline_list
+rag_pipeline_list:
+    python -c "from RAGnificent.rag.pipeline import Pipeline; pipeline = Pipeline(collection_name='$(COLLECTION)'); pipeline.run_pipeline(links_file='$(LINKS_FILE)', run_extract=True, run_chunk=True, run_embed=True, run_store=True)"

 # Extract content from a URL
 .PHONY: extract
@@ -209,27 +209,27 @@ ifeq ($(URL),)
     @echo "Error: URL is required. Use 'make extract URL=https://example.com'"
     @exit 1
 endif
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline().extract_content(url='$(URL)', output_file='$(RAW_DOCUMENTS)', output_format='$(FORMAT)')"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; Pipeline().extract_content(url='$(URL)', output_file='$(RAW_DOCUMENTS)', output_format='$(FORMAT)')"

 # Extract content from a list of URLs
 .PHONY: extract-list
 extract-list:
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline().extract_content(links_file='$(LINKS_FILE)', output_file='$(RAW_DOCUMENTS)', output_format='$(FORMAT)')"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; Pipeline().extract_content(links_file='$(LINKS_FILE)', output_file='$(RAW_DOCUMENTS)', output_format='$(FORMAT)')"

 # Chunk documents
 .PHONY: chunk
 chunk:
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline().chunk_documents('$(RAW_DOCUMENTS)', '$(DOCUMENT_CHUNKS)')"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; Pipeline().chunk_documents('$(RAW_DOCUMENTS)', '$(DOCUMENT_CHUNKS)')"

 # Embed chunks
 .PHONY: embed
 embed:
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline().embed_chunks('$(DOCUMENT_CHUNKS)', '$(EMBEDDED_CHUNKS)')"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; Pipeline().embed_chunks('$(DOCUMENT_CHUNKS)', '$(EMBEDDED_CHUNKS)')"

 # Store chunks in vector database
 .PHONY: store
 store:
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; uv pipeline(collection_name='$(COLLECTION)').store_chunks('$(EMBEDDED_CHUNKS)')"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; Pipeline(collection_name='$(COLLECTION)').store_chunks('$(EMBEDDED_CHUNKS)')"

 # Search the vector database
 .PHONY: search
@@ -247,7 +247,7 @@ ifeq ($(QUERY),)
     @echo "Error: QUERY is required. Use 'make query QUERY=\"your query\"'"
     @exit 1
 endif
-    python -c "from RAGnificent.rag.uv pipeline import uv pipeline; response = uv pipeline(collection_name='$(COLLECTION)').query_with_context('$(QUERY)', $(LIMIT)); print(f'Response: {response[\"response\"]}\n\nSources:\n' + '\n'.join([f'- {r[\"source_url\"]}' for r in response['context']]))"
+    python -c "from RAGnificent.rag.pipeline import Pipeline; response = Pipeline(collection_name='$(COLLECTION)').query_with_context('$(QUERY)', $(LIMIT)); print(f'Response: {response[\"response\"]}\n\nSources:\n' + '\n'.join([f'- {r[\"source_url\"]}' for r in response['context']]))"

 # Run the demo for all output formats
 .PHONY: run-demo
@@ -262,7 +262,7 @@ run-hello:
 # Run the RAG uv pipeline example
 .PHONY: run-rag-example
 run-rag-example:
-    python examples/rag_uv pipeline_example.py
+    python examples/rag_pipeline_example.py

 # Visualize Qdrant data
 .PHONY: view-qdrant
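
For anyone who prefers a script to the inline python -c one-liners, the renamed target reduces to roughly the following. This is a sketch built only from the Pipeline API exactly as invoked in the Makefile above; the collection name is a placeholder:

    # Equivalent of: make rag_pipeline URL=https://example.com COLLECTION=docs
    from RAGnificent.rag.pipeline import Pipeline

    pipeline = Pipeline(collection_name="docs")
    pipeline.run_pipeline(
        url="https://example.com",
        run_extract=True,  # scrape the page
        run_chunk=True,    # split into chunks
        run_embed=True,    # embed the chunks
        run_store=True,    # store vectors in the collection
    )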

RAGnificent/core/scraper.py

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@
 import requests
 from bs4 import BeautifulSoup, Tag
-from bs4 import BeautifulSoup, Tag
 from RAGnificent.core.cache import RequestCache
 from RAGnificent.core.logging import get_logger
 from RAGnificent.core.security import redact_sensitive_data
@@ -216,7 +215,8 @@ def _cache_response(self, url: str, content: str) -> None:
     def _fetch_with_retries(self, url: str) -> str:
         for attempt in range(self.max_retries):
             try:
-                self.throttler.throttle()
+                # Use domain-aware throttling
+                self.throttler.throttle(url)
                 response = self.session.get(url, timeout=self.timeout)
                 response.raise_for_status()

@@ -429,7 +429,6 @@ def convert_to_markdown(self, html_content: str, url: str = "") -> str:
         ]:
             if element_markdown := self._get_element_markdown(element, base_url):
                 markdown_content += element_markdown + "\n\n"
-                markdown_content += element_markdown + "\n\n"

         logger.info("Conversion to Markdown completed.")
         return markdown_content.strip()
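
The switch from throttle() to throttle(url) means the throttler can rate-limit per domain rather than globally. The throttler implementation itself is not part of this diff; a minimal, hypothetical sketch of what a domain-aware version could look like:

    import time
    from urllib.parse import urlparse

    class DomainThrottler:
        """Hypothetical per-domain throttler: each host gets its own clock."""

        def __init__(self, requests_per_second: float = 1.0):
            self.min_interval = 1.0 / requests_per_second
            self.last_request: dict[str, float] = {}

        def throttle(self, url: str) -> None:
            # Key the rate limit on the host, not the whole process
            domain = urlparse(url).netloc
            elapsed = time.monotonic() - self.last_request.get(domain, 0.0)
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_request[domain] = time.monotonic()

Keying on netloc keeps one slow site from stalling scrapes of every other host.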

RAGnificent/core/security.py

Lines changed: 28 additions & 1 deletion
@@ -69,15 +69,17 @@ def wrapper(*args, **kwargs):
 class ThrottledSession:
     """Session with built-in throttling for HTTP requests."""

-    def __init__(self, requests_per_second: float = 1.0):
+    def __init__(self, requests_per_second: float = 1.0, default_timeout: float = 30.0):
         """
         Initialize throttled session.

         Args:
             requests_per_second: Maximum number of requests per second
+            default_timeout: Default network timeout in seconds if none is provided
         """
         self.min_interval = 1.0 / requests_per_second
         self.last_request_time = 0
+        self.default_timeout = default_timeout

     def request(self, method: str, url: str, **kwargs) -> Any:
         """
@@ -102,9 +104,34 @@ def request(self, method: str, url: str, **kwargs) -> Any:
             time.sleep(wait_time)

         self.last_request_time = time.time()
+        # Ensure a sane default timeout is always applied
+        kwargs.setdefault("timeout", self.default_timeout)
         return requests.request(method, url, **kwargs)


+def validate_file_access(path: str) -> bool:
+    """
+    Validate that a file path exists and is safely readable.
+
+    Args:
+        path: Absolute or relative path to a file
+
+    Returns:
+        bool: True if the file exists, is a regular file, and is readable
+    """
+    import os
+
+    try:
+        # Resolve symlinks and ensure it points to a regular file
+        real_path = os.path.realpath(path)
+        if not os.path.isfile(real_path):
+            return False
+        # Check read permission
+        return os.access(real_path, os.R_OK)
+    except Exception:
+        return False
+
+
 def redact_sensitive_data(
     text: str, patterns: Optional[List[Tuple[str, str]]] = None
 ) -> str:
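
A quick usage sketch for both changes (hypothetical call sites; the file path is a placeholder). Note that an explicit timeout= passed by the caller still wins over the default, since setdefault only fills in a missing key:

    from RAGnificent.core.security import ThrottledSession, validate_file_access

    session = ThrottledSession(requests_per_second=2.0, default_timeout=10.0)
    # No timeout given here, so timeout=10.0 is injected automatically
    resp = session.request("GET", "https://example.com")

    # Refuse to read anything that is not an existing, readable regular file
    if validate_file_access("data/links.txt"):
        with open("data/links.txt", encoding="utf-8") as f:
            links = f.read().splitlines()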

RAGnificent/rag/pipeline.py

Lines changed: 35 additions & 25 deletions
@@ -372,20 +372,19 @@ def _execute_index_step(self, config: Dict[str, Any]) -> Dict[str, Any]:
                 content, source_url=str(md_file)
             )

-            # Generate embeddings and store
-            for chunk in chunks:
-                embedding = self.embedding_service.generate_embeddings(
-                    [chunk["content"]]
-                )[0]
-                self.vector_store.store_documents(
-                    [
-                        {
-                            "content": chunk["content"],
-                            "metadata": chunk["metadata"],
-                            "embedding": embedding,
-                        }
-                    ]
-                )
+            # Generate embeddings for all chunks and store
+            chunk_dicts = [
+                {
+                    "content": chunk.content,
+                    "metadata": chunk.metadata,
+                    "source_url": chunk.source_url,
+                    "id": chunk.id,
+                }
+                for chunk in chunks
+            ]
+
+            embedded_chunks = self.embedding_service.embed_chunks(chunk_dicts)
+            self.vector_store.store_documents(embedded_chunks)

             indexed_count += len(chunks)

@@ -435,7 +434,7 @@ def _process_single_url(
         Document dictionary or None if extraction failed
     """
     try:
-        from ragnificent_rs import OutputFormat
+        from RAGnificent.ragnificent_rs import OutputFormat

         # Convert string to OutputFormat enum if it's not already an enum
         output_format_enum = output_format
@@ -1099,16 +1098,27 @@ def query_with_context(
         ]

         try:
-            # Generate response
-            completion = openai.chat.completions.create(
-                model=model,
-                messages=messages,
-                temperature=temperature,
-                max_tokens=self.config.openai.max_tokens,
-                timeout=self.config.openai.request_timeout,
-            )
-
-            response = completion.choices[0].message.content
+            # Generate response (support both legacy and client-style APIs)
+            try:
+                from openai import OpenAI  # type: ignore
+
+                client = OpenAI()
+                completion = client.chat.completions.create(
+                    model=model,
+                    messages=messages,
+                    temperature=temperature,
+                    max_tokens=self.config.openai.max_tokens,
+                )
+                response = completion.choices[0].message.content
+            except Exception:  # Fallback to legacy if client API unavailable
+                completion = openai.chat.completions.create(
+                    model=model,
+                    messages=messages,
+                    temperature=temperature,
+                    max_tokens=self.config.openai.max_tokens,
+                    timeout=self.config.openai.request_timeout,
+                )
+                response = completion.choices[0].message.content

             return {
                 "query": query,

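The nested try/except in query_with_context is a version-compatibility shim: it first attempts the client-style API (the OpenAI() class introduced in openai 1.0, which reads OPENAI_API_KEY from the environment) and falls back to the module-level call the code used before. Stripped of pipeline plumbing, the pattern looks roughly like this; the model name and parameter defaults are placeholders:

    import openai

    def chat(messages, model="gpt-4o-mini", temperature=0.0, max_tokens=512):
        try:
            from openai import OpenAI  # client-style API (openai>=1.0)

            client = OpenAI()  # picks up OPENAI_API_KEY automatically
            completion = client.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
        except Exception:
            # Fall back to the module-level namespace used by the old code path
            completion = openai.chat.completions.create(
                model=model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
            )
        return completion.choices[0].message.content
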
RAGnificent/rag/vector_store.py

Lines changed: 12 additions & 1 deletion
@@ -195,9 +195,20 @@ def initialize_collection(
         logger.info(
             f"Creating collection '{collection_name}' with vector size {vector_size}"
         )
+        # Map distance string to Qdrant enum
+        distance_map = {
+            "Cosine": models.Distance.COSINE,
+            "cosine": models.Distance.COSINE,
+            "Euclid": models.Distance.EUCLID,
+            "euclid": models.Distance.EUCLID,
+            "Dot": models.Distance.DOT,
+            "dot": models.Distance.DOT,
+        }
+        dist_enum = distance_map.get(distance, models.Distance.COSINE)
+
         client.create_collection(
             collection_name=collection_name,
-            vectors_config=models.VectorParams(size=vector_size, distance=distance),
+            vectors_config=models.VectorParams(size=vector_size, distance=dist_enum),
         )

         logger.info(f"Successfully created collection '{collection_name}'")
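
With the mapping in place, initialize_collection accepts the distance as a plain string in either case and silently falls back to cosine for anything unrecognized. The same normalization can be written more compactly; a sketch of the idea against qdrant_client directly, where the collection name and the 384-dim vector size are placeholders:

    from qdrant_client import QdrantClient, models

    def to_distance(name: str) -> models.Distance:
        # Case-insensitive string -> enum, defaulting to cosine
        return {
            "cosine": models.Distance.COSINE,
            "euclid": models.Distance.EUCLID,
            "dot": models.Distance.DOT,
        }.get(name.lower(), models.Distance.COSINE)

    client = QdrantClient(":memory:")  # in-memory instance for illustration
    client.create_collection(
        collection_name="docs",
        vectors_config=models.VectorParams(size=384, distance=to_distance("Cosine")),
    )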

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,6 @@ dependencies = [
     "sentry-sdk[fastapi]>=2.29.1",
     "rich>=14.0.0",
     "bleach>=6.2.0",
-    "code2flow>=2.5.1",
     "responses>=0.25.7",
 ]

@@ -29,6 +28,7 @@ dev = [
     "mypy>=1.15.0",
     "isort>=6.0.1",
     "sourcery>=1.36.0",
+    "code2flow>=2.5.1",
 ]
 test = [
     "pytest>=8.3.5",
Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+import os
+from pathlib import Path
+
+import numpy as np
+import pytest
+
+
+def test_execute_index_step_embeds_and_stores(tmp_path, monkeypatch):
+    # Arrange: create a simple markdown file in a temp data dir
+    data_dir = tmp_path / "data"
+    data_dir.mkdir(parents=True, exist_ok=True)
+    md_file = data_dir / "doc.md"
+    md_file.write_text("# Title\n\nHello world paragraph.", encoding="utf-8")
+
+    # Dummy vector store that just records documents
+    class DummyVS:
+        def __init__(self, *args, **kwargs):
+            self.stored = []
+
+        def store_documents(self, docs, **kwargs):
+            # Ensure embeddings present
+            for d in docs:
+                assert "embedding" in d
+                emb = d["embedding"]
+                # Accept numpy array or list
+                assert isinstance(emb, (list, np.ndarray))
+            self.stored.extend(docs)
+            return True
+
+        def count_documents(self):
+            return len(self.stored)
+
+    # Dummy search object used by pipeline
+    class DummySearch:
+        def __init__(self, *args, **kwargs):
+            pass
+
+    import RAGnificent.rag.pipeline as pl
+
+    # Patch to avoid real Qdrant and search init
+    monkeypatch.setattr(pl, "get_vector_store", lambda *a, **k: DummyVS())
+    monkeypatch.setattr(pl, "get_search", lambda *a, **k: DummySearch())
+
+    # Act: construct pipeline with simple embedder to avoid heavy models
+    pipeline = pl.Pipeline(
+        data_dir=str(data_dir),
+        embedding_model_type="simpler",
+        continue_on_error=True,
+    )
+
+    result = pipeline._execute_index_step({"input_dir": "."})
+
+    # Assert
+    assert isinstance(result, dict)
+    assert result.get("indexed_documents", 0) >= 1
Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+import pytest
+
+from RAGnificent.core.scraper import MarkdownScraper
+
+
+def test_markdown_conversion_no_duplicate_elements():
+    html = """
+    <html>
+      <head><title>Page</title></head>
+      <body>
+        <main>
+          <h1>Header</h1>
+          <p>One</p>
+          <p>Two</p>
+        </main>
+      </body>
+    </html>
+    """
+    s = MarkdownScraper()
+    md = s.convert_to_markdown(html, url="https://example.com")
+    # Expect title once and paragraphs once each
+    assert md.count("# Header") == 1
+    assert md.count("One") == 1
+    assert md.count("Two") == 1
