Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1d757c8
added pipeline functions for updater
carowa292 Apr 24, 2026
ad52c5e
prepare loaders for updater
kymeyer Apr 28, 2026
920a5d0
use store.py functions
kymeyer Apr 28, 2026
276726b
use get_documents and switch to "source"-identifier
kymeyer May 4, 2026
08c3c81
Update for doc-specific hashing
kymeyer May 6, 2026
2ef59c2
simplify loading pdf in one document
carowa292 May 6, 2026
505b272
adapt get_documents() to the needs of process_loaders
carowa292 May 7, 2026
afebe20
remove combined-hash
kymeyer May 11, 2026
3be4a20
type-safety improvements / comments
kymeyer May 11, 2026
7a60321
changed naming: path to source
carowa292 May 13, 2026
05f6081
Fix running the import from UI
denkv May 18, 2026
a393fc3
Merge branch 'develop' into feature/updater
denkv May 18, 2026
19448a2
Merge branch 'feature/updater' of https://github.com/learn2rag/config…
denkv May 18, 2026
90f7dbe
Refactor calling of the modules from the main entrypoint
denkv May 18, 2026
3a2bfd1
Add repeatedly scheduled import running with `apscheduler`
denkv May 18, 2026
0f9e197
Fix the first import not starting immediately
denkv May 18, 2026
a01f3d6
Fix the look of an input
denkv May 18, 2026
5a2280f
Add the import interval input in the interface
denkv May 18, 2026
faa6d02
Fix typing
denkv May 18, 2026
42207c1
Use a default interval value when missing
denkv May 18, 2026
8d8cf99
switch to single mode for pdf import
kymeyer May 18, 2026
e04c6af
update version and remove unused import
kymeyer May 18, 2026
b3eb455
Do not specify the pipeline port in the first steps guide
denkv May 18, 2026
d7644ba
Add missing interval in the first steps guide
denkv May 18, 2026
ef71113
Run the continuous pipeline from the first steps guide
denkv May 18, 2026
badb7ea
Change the interface for starting pipelines
denkv May 18, 2026
078b252
Merge branch 'feature/updater' of https://github.com/learn2rag/config…
denkv May 18, 2026
87446a9
Normalize logging config
denkv May 19, 2026
4eed21b
Add status log
denkv May 19, 2026
58c7b96
Display the last status log message
denkv May 19, 2026
c1a5acb
Fixme
denkv May 19, 2026
1611c36
exclude main.py from mypy test
hannred May 19, 2026
7440ff2
exclude main.py from mypy test
hannred May 19, 2026
a156c52
Fix immediately starting the import when pipeline is configured
denkv May 19, 2026
6f391f2
Fix input label
denkv May 19, 2026
a6de58d
update main importer script +tests +typesafe
kymeyer May 20, 2026
5290ee3
Merge branch 'develop' into feature/updater
denkv May 21, 2026
854cd30
adapted generate from path to source
carowa292 May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions learn2rag/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@
import os
import pathlib
import sys
from datetime import datetime, timedelta
from types import TracebackType
from typing import Any, Unpack

import yaml
# TODO: apscheduler 4.x would come with py.typed
# https://github.com/agronholm/apscheduler/issues/648#issuecomment-1195304357
from apscheduler.schedulers.blocking import BlockingScheduler # type: ignore[import-untyped]
from apscheduler.triggers.interval import IntervalTrigger # type: ignore[import-untyped]
from pydantic import TypeAdapter


class LauncherArgumentParser(argparse.ArgumentParser):
def __init__(self) -> None:
super().__init__()
self.add_argument('module', type=str, nargs='?', default='learn2rag.ui')
self.add_argument('--logging-config', type=pathlib.Path)
self.add_argument('--schedule-interval', type=TypeAdapter(timedelta).validate_python)


def excepthook(*exc_info: Unpack[tuple[type[BaseException], BaseException, TracebackType | None]]) -> None:
Expand Down Expand Up @@ -58,12 +65,34 @@ def configure_logging(config_path: pathlib.Path, debug: bool) -> None:
logging.debug('Learn2RAG launcher starting: %s, %s', args, rest)
module = importlib.import_module(args.module)
# TODO
module_args: tuple[Any, ...] = ()
module_kwargs = {}
if args.module == 'learn2rag.ollama_tool':
# FIXME default config values
module.main(rest, config=config.get('OLLAMA', {'port': 11434}))
module_args = (
rest,
)
module_kwargs = {'config': config.get('OLLAMA', {'port': 11434})}
elif args.module == 'learn2rag.importer':
module.main(module.ImporterArgumentParser().parse_args(rest))
module_args = (
module.ImporterArgumentParser().parse_args(rest),
)
elif args.module == 'learn2rag.ui':
module.main(config)
module_args = (
config,
)

if args.schedule_interval:
scheduler = BlockingScheduler()
trigger = IntervalTrigger(seconds=args.schedule_interval.total_seconds())
scheduler.add_job(
module.main,
trigger,
next_run_time=datetime.now(),
max_instances=1,
args=module_args,
kwargs=module_kwargs,
)
scheduler.start()
else:
module.main()
module.main(*module_args, **module_kwargs)
4 changes: 3 additions & 1 deletion learn2rag/evaluation/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ def ingest_dataset_documents(dataset_name: str) -> None:
'imported_documents_file_path': documents_path,
'llm': None,
}
learn2rag.pipeline.ingestion.index(user_config, opt_config)
raise NotImplementedError()
# FIXME
# learn2rag.pipeline.ingestion.index(user_config, opt_config)


def read_dataset_qa(dataset_name: str, subdirectory: str, split: str | None=None) -> Any:
Expand Down
30 changes: 18 additions & 12 deletions learn2rag/importer/loaders/directory_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
This module handles loading documents from directories.

Author: Kyrill Meyer
Version: 0.0.5
Version: 0.0.6
Institution: IFDT
Creation Date: June 10, 2025
Last Modified: Mai 05, 2026
Expand All @@ -16,7 +16,7 @@
from datetime import datetime
from typing import List, Union
from ..globals import stop_loading
from langchain_community.document_loaders import DirectoryLoader, PyPDFDirectoryLoader
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_core.documents import Document

# supress pdfminer-Warnings
Expand Down Expand Up @@ -63,7 +63,7 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b
recursive = recursive.lower() == "true"


text_loader_kwargs = {"autodetect_encoding": True, "detect_language_per_element": False}
text_loader_kwargs = {"autodetect_encoding": True, "detect_language_per_element": False, "mode": "single"}
loader = DirectoryLoader(
path,
show_progress=True,
Expand All @@ -83,21 +83,25 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b
"*.epub",
]
)
pdf_loader = PyPDFDirectoryLoader(
# use pypdf instead of unstructured[pdf] for better performance and stability, especially with large PDFs
pdf_loader = DirectoryLoader(
path,
glob="*.pdf",
loader_cls=PyPDFLoader, # type: ignore[arg-type]
loader_kwargs={"mode": "single"},
recursive=recursive,
silent_errors=silent_errors,
)

#loader = DirectoryLoader(path, show_progress=True, loader_kwargs=text_loader_kwargs, recursive=recursive, glob=["*.csv", "*.docx", "*.eml", "*.epub", "*.html", "*.json", "*.md", "*.odt", "*.pdf", "*.ppt", "*.pptx", "*.rst", "*.rtf", "*.txt", "*.tsv", "*.cls", "*.xlsx", "*.xml"])

#external dependencies
# doc - requires libreoffice
# epub - requires pandoc

#loader = DirectoryLoader(path, show_progress=True, silent_errors=True, recursive=False)
try:
loaded_documents = loader.load() + pdf_loader.load()
other_docs = loader.load()
pdf_docs = pdf_loader.load()
loaded_documents = other_docs + pdf_docs
except Exception as e:
logger.error(f"Error loading documents from directory: {e}")
return []
Expand All @@ -107,9 +111,14 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b
logger.info("Loading process stopped by user.")
break
try:
# generate a unique hash for the document content
if isinstance(doc, Document):
content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest()
# Hash raw file bytes so the Document carries a stable, source-level hash
# directly comparable to the value stored in Qdrant by get_documents().
try:
with open(doc.metadata["source"], "rb") as _f:
content_hash = hashlib.sha256(_f.read()).hexdigest()
except OSError:
content_hash = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest()
doc.metadata["content_hash"] = content_hash

# get file metadata
Expand Down Expand Up @@ -159,6 +168,3 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b
else:
logger.warning(f"No documents found in directory: {path}")
return documents



114 changes: 111 additions & 3 deletions learn2rag/importer/loaders/drupal_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@

Author: Kyrill Meyer
Institution: IFDT
Version: 0.0.1
Version: 0.0.2
Creation Date: March 17, 2026
Last Modified: March 17, 2026
Last Modified: April 24, 2026
"""

import hashlib
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from bs4 import BeautifulSoup
from langchain_core.documents import Document
Expand Down Expand Up @@ -77,7 +78,7 @@ def _build_session(auth_type: str, username: str, password: str, token: str) ->
def _html_to_text(html: str) -> str:
"""Strip HTML tags and return plain text."""
soup = BeautifulSoup(html, "html.parser")
return soup.get_text(separator="\n", strip=True)
return str(soup.get_text(separator="\n", strip=True))


def _extract_page_content(attributes: Dict[str, Any], text_fields: List[str]) -> str:
Expand Down Expand Up @@ -140,6 +141,7 @@ def load_from_drupal(
text_fields: Optional[List[str]] = None,
page_size: int = 50,
language: str = "",
since: Optional[datetime] = None,
) -> List[Document]:
"""
Load documents from a Drupal instance via the JSON:API.
Expand Down Expand Up @@ -193,6 +195,15 @@ def load_from_drupal(
"page[offset]": 0,
}

# Timestamp filter: only documents changed on or after `since`
if since is not None:
# Ensure the timestamp is timezone-aware and formatted as ISO 8601 for JSON:API
since_utc = since.astimezone(timezone.utc) if since.tzinfo else since.replace(tzinfo=timezone.utc)
params["filter[changed-filter][condition][path]"] = "changed"
params["filter[changed-filter][condition][operator]"] = ">="
params["filter[changed-filter][condition][value]"] = since_utc.isoformat()
logger.info(f"DrupalLoader: applying since-filter >= {since_utc.isoformat()} for '{content_type}'")

page_count = 0
next_url: Optional[str] = endpoint

Expand Down Expand Up @@ -301,3 +312,100 @@ def load_from_drupal(

logger.info(f"DrupalLoader: finished. Total documents loaded: {len(all_documents)}")
return all_documents


def get_all_drupal_document_ids(
base_url: str,
content_types: List[str],
auth_type: str = "none",
username: str = "",
password: str = "",
token: str = "",
page_size: int = 100,
language: str = "",
) -> List[str]:
"""
Retrieve the source URL for every current node in Drupal without loading content.

Intended for deletion detection in the 2-pass delta import: compare the returned
set against the paths stored in Qdrant to find nodes that have been removed.

Args:
base_url (str): Base URL of the Drupal site, e.g. ``"https://example.com"``.
content_types (list): List of content type machine names, e.g. ``["article", "page"]``.
auth_type (str): Authentication type: ``"none"``, ``"basic"``, or ``"token"``.
username (str): Username for Basic Auth.
password (str): Password for Basic Auth.
token (str): Bearer token for token auth.
page_size (int): Items per API page request (default 100; larger than load_from_drupal
default because only IDs are fetched, saving bandwidth).
language (str): Optional language filter passed via ``Accept-Language`` header.

Returns:
List[str]: Source URLs of all current nodes, e.g. ``["https://example.com/node/42"]``.
"""
session = _build_session(auth_type, username, password, token)
if language:
session.headers.update({"Accept-Language": language})

endpoint_map = _discover_endpoint_map(base_url, session)
all_ids: List[str] = []

for content_type in content_types:
if stop_loading:
break

resource_key = f"node--{content_type}"
if resource_key in endpoint_map:
endpoint = endpoint_map[resource_key]
else:
endpoint = f"{base_url.rstrip('/')}/jsonapi/node/{content_type}"

# Request only nid + langcode (no content) to minimise bandwidth
params: Dict[str, Any] = {
"fields[node--{}]".format(content_type): "drupal_internal__nid,langcode",
"page[limit]": page_size,
"page[offset]": 0,
}

next_url: Optional[str] = endpoint
page_count = 0

while next_url:
try:
if page_count == 0:
response = session.get(next_url, params=params, timeout=30)
else:
response = session.get(next_url, timeout=30)
response.raise_for_status()
data = response.json()
except requests.exceptions.RequestException as e:
logger.error(f"get_all_drupal_document_ids: Request failed for '{content_type}': {e}")
break

items = data.get("data", [])
if not items:
break

for item in items:
attributes: Dict[str, Any] = item.get("attributes", {})
node_id: str = item.get("id", "")
drupal_id: str = str(attributes.get("drupal_internal__nid", node_id))
source_url = f"{base_url.rstrip('/')}/node/{drupal_id}"
all_ids.append(source_url)

links = data.get("links", {})
next_link = links.get("next")
if next_link and isinstance(next_link, dict):
next_url = next_link.get("href")
elif next_link and isinstance(next_link, str):
next_url = next_link
else:
next_url = None

page_count += 1

logger.info(f"get_all_drupal_document_ids: found {len(all_ids)} IDs so far after content type '{content_type}'")

logger.info(f"get_all_drupal_document_ids: total {len(all_ids)} document IDs")
return all_ids
41 changes: 25 additions & 16 deletions learn2rag/importer/loaders/html_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Author: Kyrill Meyer
Institution: IFDT
Version: 0.0.6
Version: 0.0.7
Creation Date: July 28, 2025
Last Modified: May 05, 2026
"""
Expand Down Expand Up @@ -87,29 +87,38 @@ def load_html_content(url: str, depth: int = 0, visited: Optional[Set[str]] = No
# Use UnstructuredHTMLLoader to extract content
loader = UnstructuredHTMLLoader(temp_file)
page_documents = loader.load()
# Compute one hash for the entire page so all sub-documents share the same value.
# This ensures that get_documents_by_loader_id can safely deduplicate by source URL
# without ambiguity caused by different hashes for chunks of the same page.
page_hash = hashlib.sha256(response.text.encode('utf-8')).hexdigest()
# Extract meta properties using BeautifulSoup
soup = BeautifulSoup(response.text, "html.parser")
meta_tags = {meta.get("name", meta.get("property", "")): meta.get("content", "")
for meta in soup.find_all("meta") if meta.get("content")}

for doc in page_documents:
# Merge all extracted elements into a single Document per URL so that
# delta-import deduplication always works on a 1:1 source→document basis.
valid_docs = [d for d in page_documents if isinstance(d, Document)]
if not valid_docs:
logger.warning(f"No valid documents extracted from {url}")
else:
if stop_loading:
logger.info("Loading process stopped by user.")
break
# Generate a unique hash for the document content
if isinstance(doc, Document):
content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest()
doc.metadata["content_hash"] = content_hash
else:
logger.warning(f"Document is not of type Document: {type(doc)}. Skipping.")
continue
doc.metadata["source"] = url
doc.metadata["process_date"] = datetime.now().strftime("%Y-%m-%d")
doc.metadata["process_time"] = datetime.now().strftime("%H:%M:%S")
doc.metadata["loader_type"] = "HTMLLoader"
doc.metadata["meta_properties"] = meta_tags
doc.metadata["loader_id"] = loader_id
documents.extend(page_documents)
merged_content = "\n\n".join(d.page_content for d in valid_docs)
merged_doc = Document(
page_content=merged_content,
metadata={
"source": url,
"content_hash": page_hash,
"process_date": datetime.now().strftime("%Y-%m-%d"),
"process_time": datetime.now().strftime("%H:%M:%S"),
"loader_type": "HTMLLoader",
"meta_properties": meta_tags,
"loader_id": loader_id,
},
)
documents.append(merged_doc)

logger.info(f"Loaded content from {url}")

Expand Down
Loading
Loading