diff --git a/learn2rag/__main__.py b/learn2rag/__main__.py index e06a600..83daa2a 100644 --- a/learn2rag/__main__.py +++ b/learn2rag/__main__.py @@ -6,10 +6,16 @@ import os import pathlib import sys +from datetime import datetime, timedelta from types import TracebackType from typing import Any, Unpack import yaml +# TODO: apscheduler 4.x would come with py.typed +# https://github.com/agronholm/apscheduler/issues/648#issuecomment-1195304357 +from apscheduler.schedulers.blocking import BlockingScheduler # type: ignore[import-untyped] +from apscheduler.triggers.interval import IntervalTrigger # type: ignore[import-untyped] +from pydantic import TypeAdapter class LauncherArgumentParser(argparse.ArgumentParser): @@ -17,6 +23,7 @@ def __init__(self) -> None: super().__init__() self.add_argument('module', type=str, nargs='?', default='learn2rag.ui') self.add_argument('--logging-config', type=pathlib.Path) + self.add_argument('--schedule-interval', type=TypeAdapter(timedelta).validate_python) def excepthook(*exc_info: Unpack[tuple[type[BaseException], BaseException, TracebackType | None]]) -> None: @@ -58,12 +65,34 @@ def configure_logging(config_path: pathlib.Path, debug: bool) -> None: logging.debug('Learn2RAG launcher starting: %s, %s', args, rest) module = importlib.import_module(args.module) # TODO + module_args: tuple[Any, ...] = () + module_kwargs = {} if args.module == 'learn2rag.ollama_tool': # FIXME default config values - module.main(rest, config=config.get('OLLAMA', {'port': 11434})) + module_args = ( + rest, + ) + module_kwargs = {'config': config.get('OLLAMA', {'port': 11434})} elif args.module == 'learn2rag.importer': - module.main(module.ImporterArgumentParser().parse_args(rest)) + module_args = ( + module.ImporterArgumentParser().parse_args(rest), + ) elif args.module == 'learn2rag.ui': - module.main(config) + module_args = ( + config, + ) + + if args.schedule_interval: + scheduler = BlockingScheduler() + trigger = IntervalTrigger(seconds=args.schedule_interval.total_seconds()) + scheduler.add_job( + module.main, + trigger, + next_run_time=datetime.now(), + max_instances=1, + args=module_args, + kwargs=module_kwargs, + ) + scheduler.start() else: - module.main() + module.main(*module_args, **module_kwargs) diff --git a/learn2rag/evaluation/tools.py b/learn2rag/evaluation/tools.py index b91d0d7..b927bc1 100644 --- a/learn2rag/evaluation/tools.py +++ b/learn2rag/evaluation/tools.py @@ -74,7 +74,9 @@ def ingest_dataset_documents(dataset_name: str) -> None: 'imported_documents_file_path': documents_path, 'llm': None, } - learn2rag.pipeline.ingestion.index(user_config, opt_config) + raise NotImplementedError() + # FIXME + # learn2rag.pipeline.ingestion.index(user_config, opt_config) def read_dataset_qa(dataset_name: str, subdirectory: str, split: str | None=None) -> Any: diff --git a/learn2rag/importer/loaders/directory_loader.py b/learn2rag/importer/loaders/directory_loader.py index ec03ce4..2278459 100644 --- a/learn2rag/importer/loaders/directory_loader.py +++ b/learn2rag/importer/loaders/directory_loader.py @@ -5,7 +5,7 @@ This module handles loading documents from directories. Author: Kyrill Meyer -Version: 0.0.5 +Version: 0.0.6 Institution: IFDT Creation Date: June 10, 2025 Last Modified: Mai 05, 2026 @@ -16,7 +16,7 @@ from datetime import datetime from typing import List, Union from ..globals import stop_loading -from langchain_community.document_loaders import DirectoryLoader, PyPDFDirectoryLoader +from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader from langchain_core.documents import Document # supress pdfminer-Warnings @@ -63,7 +63,7 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b recursive = recursive.lower() == "true" - text_loader_kwargs = {"autodetect_encoding": True, "detect_language_per_element": False} + text_loader_kwargs = {"autodetect_encoding": True, "detect_language_per_element": False, "mode": "single"} loader = DirectoryLoader( path, show_progress=True, @@ -83,21 +83,25 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b "*.epub", ] ) - pdf_loader = PyPDFDirectoryLoader( + # use pypdf instead of unstructured[pdf] for better performance and stability, especially with large PDFs + pdf_loader = DirectoryLoader( path, + glob="*.pdf", + loader_cls=PyPDFLoader, # type: ignore[arg-type] + loader_kwargs={"mode": "single"}, recursive=recursive, silent_errors=silent_errors, ) - #loader = DirectoryLoader(path, show_progress=True, loader_kwargs=text_loader_kwargs, recursive=recursive, glob=["*.csv", "*.docx", "*.eml", "*.epub", "*.html", "*.json", "*.md", "*.odt", "*.pdf", "*.ppt", "*.pptx", "*.rst", "*.rtf", "*.txt", "*.tsv", "*.cls", "*.xlsx", "*.xml"]) - #external dependencies # doc - requires libreoffice # epub - requires pandoc #loader = DirectoryLoader(path, show_progress=True, silent_errors=True, recursive=False) try: - loaded_documents = loader.load() + pdf_loader.load() + other_docs = loader.load() + pdf_docs = pdf_loader.load() + loaded_documents = other_docs + pdf_docs except Exception as e: logger.error(f"Error loading documents from directory: {e}") return [] @@ -107,9 +111,14 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b logger.info("Loading process stopped by user.") break try: - # generate a unique hash for the document content if isinstance(doc, Document): - content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest() + # Hash raw file bytes so the Document carries a stable, source-level hash + # directly comparable to the value stored in Qdrant by get_documents(). + try: + with open(doc.metadata["source"], "rb") as _f: + content_hash = hashlib.sha256(_f.read()).hexdigest() + except OSError: + content_hash = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest() doc.metadata["content_hash"] = content_hash # get file metadata @@ -159,6 +168,3 @@ def load_from_directory(path: str, recursive: Union[bool, str], silent_errors: b else: logger.warning(f"No documents found in directory: {path}") return documents - - - diff --git a/learn2rag/importer/loaders/drupal_loader.py b/learn2rag/importer/loaders/drupal_loader.py index a6a731a..e274783 100644 --- a/learn2rag/importer/loaders/drupal_loader.py +++ b/learn2rag/importer/loaders/drupal_loader.py @@ -11,13 +11,14 @@ Author: Kyrill Meyer Institution: IFDT -Version: 0.0.1 +Version: 0.0.2 Creation Date: March 17, 2026 -Last Modified: March 17, 2026 +Last Modified: April 24, 2026 """ import hashlib import logging +from datetime import datetime, timezone from typing import Any, Dict, List, Optional from bs4 import BeautifulSoup from langchain_core.documents import Document @@ -77,7 +78,7 @@ def _build_session(auth_type: str, username: str, password: str, token: str) -> def _html_to_text(html: str) -> str: """Strip HTML tags and return plain text.""" soup = BeautifulSoup(html, "html.parser") - return soup.get_text(separator="\n", strip=True) + return str(soup.get_text(separator="\n", strip=True)) def _extract_page_content(attributes: Dict[str, Any], text_fields: List[str]) -> str: @@ -140,6 +141,7 @@ def load_from_drupal( text_fields: Optional[List[str]] = None, page_size: int = 50, language: str = "", + since: Optional[datetime] = None, ) -> List[Document]: """ Load documents from a Drupal instance via the JSON:API. @@ -193,6 +195,15 @@ def load_from_drupal( "page[offset]": 0, } + # Timestamp filter: only documents changed on or after `since` + if since is not None: + # Ensure the timestamp is timezone-aware and formatted as ISO 8601 for JSON:API + since_utc = since.astimezone(timezone.utc) if since.tzinfo else since.replace(tzinfo=timezone.utc) + params["filter[changed-filter][condition][path]"] = "changed" + params["filter[changed-filter][condition][operator]"] = ">=" + params["filter[changed-filter][condition][value]"] = since_utc.isoformat() + logger.info(f"DrupalLoader: applying since-filter >= {since_utc.isoformat()} for '{content_type}'") + page_count = 0 next_url: Optional[str] = endpoint @@ -301,3 +312,100 @@ def load_from_drupal( logger.info(f"DrupalLoader: finished. Total documents loaded: {len(all_documents)}") return all_documents + + +def get_all_drupal_document_ids( + base_url: str, + content_types: List[str], + auth_type: str = "none", + username: str = "", + password: str = "", + token: str = "", + page_size: int = 100, + language: str = "", +) -> List[str]: + """ + Retrieve the source URL for every current node in Drupal without loading content. + + Intended for deletion detection in the 2-pass delta import: compare the returned + set against the paths stored in Qdrant to find nodes that have been removed. + + Args: + base_url (str): Base URL of the Drupal site, e.g. ``"https://example.com"``. + content_types (list): List of content type machine names, e.g. ``["article", "page"]``. + auth_type (str): Authentication type: ``"none"``, ``"basic"``, or ``"token"``. + username (str): Username for Basic Auth. + password (str): Password for Basic Auth. + token (str): Bearer token for token auth. + page_size (int): Items per API page request (default 100; larger than load_from_drupal + default because only IDs are fetched, saving bandwidth). + language (str): Optional language filter passed via ``Accept-Language`` header. + + Returns: + List[str]: Source URLs of all current nodes, e.g. ``["https://example.com/node/42"]``. + """ + session = _build_session(auth_type, username, password, token) + if language: + session.headers.update({"Accept-Language": language}) + + endpoint_map = _discover_endpoint_map(base_url, session) + all_ids: List[str] = [] + + for content_type in content_types: + if stop_loading: + break + + resource_key = f"node--{content_type}" + if resource_key in endpoint_map: + endpoint = endpoint_map[resource_key] + else: + endpoint = f"{base_url.rstrip('/')}/jsonapi/node/{content_type}" + + # Request only nid + langcode (no content) to minimise bandwidth + params: Dict[str, Any] = { + "fields[node--{}]".format(content_type): "drupal_internal__nid,langcode", + "page[limit]": page_size, + "page[offset]": 0, + } + + next_url: Optional[str] = endpoint + page_count = 0 + + while next_url: + try: + if page_count == 0: + response = session.get(next_url, params=params, timeout=30) + else: + response = session.get(next_url, timeout=30) + response.raise_for_status() + data = response.json() + except requests.exceptions.RequestException as e: + logger.error(f"get_all_drupal_document_ids: Request failed for '{content_type}': {e}") + break + + items = data.get("data", []) + if not items: + break + + for item in items: + attributes: Dict[str, Any] = item.get("attributes", {}) + node_id: str = item.get("id", "") + drupal_id: str = str(attributes.get("drupal_internal__nid", node_id)) + source_url = f"{base_url.rstrip('/')}/node/{drupal_id}" + all_ids.append(source_url) + + links = data.get("links", {}) + next_link = links.get("next") + if next_link and isinstance(next_link, dict): + next_url = next_link.get("href") + elif next_link and isinstance(next_link, str): + next_url = next_link + else: + next_url = None + + page_count += 1 + + logger.info(f"get_all_drupal_document_ids: found {len(all_ids)} IDs so far after content type '{content_type}'") + + logger.info(f"get_all_drupal_document_ids: total {len(all_ids)} document IDs") + return all_ids diff --git a/learn2rag/importer/loaders/html_loader.py b/learn2rag/importer/loaders/html_loader.py index 064562d..6f5b434 100644 --- a/learn2rag/importer/loaders/html_loader.py +++ b/learn2rag/importer/loaders/html_loader.py @@ -6,7 +6,7 @@ Author: Kyrill Meyer Institution: IFDT -Version: 0.0.6 +Version: 0.0.7 Creation Date: July 28, 2025 Last Modified: May 05, 2026 """ @@ -87,29 +87,38 @@ def load_html_content(url: str, depth: int = 0, visited: Optional[Set[str]] = No # Use UnstructuredHTMLLoader to extract content loader = UnstructuredHTMLLoader(temp_file) page_documents = loader.load() + # Compute one hash for the entire page so all sub-documents share the same value. + # This ensures that get_documents_by_loader_id can safely deduplicate by source URL + # without ambiguity caused by different hashes for chunks of the same page. + page_hash = hashlib.sha256(response.text.encode('utf-8')).hexdigest() # Extract meta properties using BeautifulSoup soup = BeautifulSoup(response.text, "html.parser") meta_tags = {meta.get("name", meta.get("property", "")): meta.get("content", "") for meta in soup.find_all("meta") if meta.get("content")} - for doc in page_documents: + # Merge all extracted elements into a single Document per URL so that + # delta-import deduplication always works on a 1:1 source→document basis. + valid_docs = [d for d in page_documents if isinstance(d, Document)] + if not valid_docs: + logger.warning(f"No valid documents extracted from {url}") + else: if stop_loading: logger.info("Loading process stopped by user.") - break - # Generate a unique hash for the document content - if isinstance(doc, Document): - content_hash = hashlib.sha256(doc.page_content.encode('utf-8')).hexdigest() - doc.metadata["content_hash"] = content_hash else: - logger.warning(f"Document is not of type Document: {type(doc)}. Skipping.") - continue - doc.metadata["source"] = url - doc.metadata["process_date"] = datetime.now().strftime("%Y-%m-%d") - doc.metadata["process_time"] = datetime.now().strftime("%H:%M:%S") - doc.metadata["loader_type"] = "HTMLLoader" - doc.metadata["meta_properties"] = meta_tags - doc.metadata["loader_id"] = loader_id - documents.extend(page_documents) + merged_content = "\n\n".join(d.page_content for d in valid_docs) + merged_doc = Document( + page_content=merged_content, + metadata={ + "source": url, + "content_hash": page_hash, + "process_date": datetime.now().strftime("%Y-%m-%d"), + "process_time": datetime.now().strftime("%H:%M:%S"), + "loader_type": "HTMLLoader", + "meta_properties": meta_tags, + "loader_id": loader_id, + }, + ) + documents.append(merged_doc) logger.info(f"Loaded content from {url}") diff --git a/learn2rag/importer/loaders/process_loaders.py b/learn2rag/importer/loaders/process_loaders.py index 4a84f91..db48d57 100644 --- a/learn2rag/importer/loaders/process_loaders.py +++ b/learn2rag/importer/loaders/process_loaders.py @@ -6,20 +6,25 @@ Author: Kyrill Meyer Institution: IFDT -Version: 0.0.5 +Version: 0.0.7 Creation Date: June 10, 2025 -Last Modified: March 17, 2026 +Last Modified: May 4, 2026 """ +import hashlib import logging -from typing import List, Dict, Any +from typing import Dict, List, Any, TYPE_CHECKING +if TYPE_CHECKING: + from learn2rag.importer.utils.import_state import ImportState +from learn2rag.pipeline.ingestion import index +from learn2rag.pipeline.store import get_documents, delete_documents, update_documents from ..globals import stop_loading from langchain_core.documents import Document from .directory_loader import load_from_directory from .csv_loader import load_from_csv from .html_loader import load_html_content -from .sharepoint_loader import load_from_sharepoint -from .drupal_loader import load_from_drupal +from .sharepoint_loader import load_from_sharepoint, get_all_sharepoint_document_ids +from .drupal_loader import load_from_drupal, get_all_drupal_document_ids # # initialize logger @@ -60,10 +65,16 @@ def process_configuration_entries(config_entries: List[Dict[str, Any]]) -> List[ documents = load_from_directory(path, recursive=recursive, silent_errors=silent_errors, loader_id=loader_id) logger.info(f"Loaded {len(documents)} documents from {path} using {loader_type} for configuration entry with loader_id: {loader_id}.") elif loader_type == "CSVLoader": + path = entry.get("path") if not path: logger.error("Missing 'path' for 'CSVLoader' in configuration entry.") continue documents = load_from_csv(path) + # CSVLoader does not set loader_id, source or content_hash — populate them here + for doc in documents: + doc.metadata["loader_id"] = loader_id + doc.metadata["source"] = path + doc.metadata["content_hash"] = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest() logger.info(f"Loaded {len(documents)} documents from {path} using {loader_type}.") elif loader_type == "HTMLLoader": url = entry.get("url") @@ -152,3 +163,291 @@ def process_configuration_entries(config_entries: List[Dict[str, Any]]) -> List[ logger.error(f"Error processing entry {entry}: {e}") return all_documents + + +def process_delta_imports( + config_entries: List[Dict[str, Any]], + user_config: Dict[str, Any], + opt_config: Dict[str, Any], + import_state: "ImportState", +) -> None: + """ + Perform a delta import for all configured loaders. + + Dispatches between two strategies: + + - **Intelligent loaders** (DrupalLoader, SharepointLoader): 2-pass approach — + fetch all current document IDs to detect deletions, then load only changed + documents using a server-side timestamp filter. + - **Normal loaders** (DirectoryLoader, HTMLLoader, CSVLoader): full load followed + by content-hash comparison to detect additions, changes, and deletions. + + The import timestamp is only persisted on successful completion, so a failed run + will be retried in full on the next call. + + Args: + config_entries (List[Dict[str, Any]]): Loader configuration entries from the + importer config file. + user_config (Dict[str, Any]): User configuration dict (must contain + ``collection_name``). + opt_config (Dict[str, Any]): Optimisation configuration dict. + import_state (ImportState): ImportState instance for timestamp management. + """ + from datetime import datetime, timezone + + for entry in config_entries: + if stop_loading: + logger.info("Delta import stopped by user.") + break + + loader_type = entry.get("loader_type") + loader_id = entry.get("loader_id") or "" + + if not loader_type or not loader_id: + logger.error(f"process_delta_imports: entry missing loader_type or loader_id: {entry}") + continue + + try: + last_import_time = import_state.get_last_import_time(loader_id) + import_start = datetime.now(timezone.utc) + import_state.record_import_start(loader_id, import_start) + + # Retrieve existing Qdrant documents for this loader as {source: content_hash} map + existing_map: Dict[str, str] = get_documents(loader_id, user_config, opt_config) + is_initial = len(existing_map) == 0 + + logger.info( + f"Delta import '{loader_id}': is_initial={is_initial}, " + f"last_import_time={last_import_time}, existing_docs={len(existing_map)}" + ) + + # ---------------------------------------------------------------- + # INTELLIGENT LOADERS: Drupal / SharePoint + # 2-pass: (1) fetch all current IDs → detect deletions, + # (2) load only changed documents via timestamp filter + # ---------------------------------------------------------------- + if loader_type == "DrupalLoader": + base_url = entry.get("base_url") + if not base_url: + logger.error(f"DrupalLoader '{loader_id}': missing 'base_url'") + continue + base_url = str(base_url) + content_types = entry.get("content_types", []) + auth_type = str(entry.get("auth_type", "none")) + username = str(entry.get("username", "")) + password = str(entry.get("password", "")) + token = str(entry.get("token", "")) + text_fields = entry.get("text_fields") + page_size = int(entry.get("page_size", 50)) + language = str(entry.get("language", "")) + + if is_initial or last_import_time is None: + # No prior state: fall back to full load + hash comparison + logger.info(f"Drupal '{loader_id}': full load (initial={is_initial})") + all_docs = load_from_drupal( + base_url=base_url, content_types=content_types, loader_id=loader_id, + auth_type=auth_type, username=username, password=password, token=token, + text_fields=text_fields, page_size=page_size, language=language, + ) + if is_initial: + index(all_docs, user_config, opt_config) + else: + # Hash comparison: replace changed, remove deleted + _delta_by_source(all_docs, existing_map, loader_id, user_config, opt_config) + else: + # 2-pass delta + logger.info(f"Drupal '{loader_id}': 2-pass delta since {last_import_time.isoformat()}") + # Pass 1: fetch all current IDs to detect deleted documents + current_ids = set(get_all_drupal_document_ids( + base_url=base_url, content_types=content_types, + auth_type=auth_type, username=username, password=password, token=token, + page_size=page_size, language=language, + )) + deleted_paths = [p for p in existing_map if p not in current_ids] + if deleted_paths: + logger.info(f"Drupal '{loader_id}': deleting {len(deleted_paths)} removed documents") + delete_documents(loader_id, deleted_paths, user_config, opt_config) + + # Pass 2: load and index changed documents + changed_docs = load_from_drupal( + base_url=base_url, content_types=content_types, loader_id=loader_id, + auth_type=auth_type, username=username, password=password, token=token, + text_fields=text_fields, page_size=page_size, language=language, + since=last_import_time, + ) + sources_to_delete = [doc.metadata.get("source", "") for doc in changed_docs] + if sources_to_delete: + delete_documents(loader_id, sources_to_delete, user_config, opt_config) + index(changed_docs, user_config, opt_config) + logger.info(f"Drupal '{loader_id}': {len(deleted_paths)} deleted, {len(changed_docs)} updated") + + elif loader_type == "SharepointLoader": + client_id = entry.get("client_id", "") + client_secret = entry.get("client_secret", "") + document_library_id = entry.get("document_library_id", "") + folder_path = entry.get("folder_path") + folder_id = entry.get("folder_id") + recursive = entry.get("recursive", False) + auth_with_token = entry.get("auth_with_token", True) + reset_token = entry.get("reset_token", False) + tenant_id = entry.get("tenant_id", "common") + site_id = entry.get("site_id") + + if is_initial or last_import_time is None: + logger.info(f"SharePoint '{loader_id}': full load (initial={is_initial})") + all_docs = load_from_sharepoint( + client_id=client_id, client_secret=client_secret, + document_library_id=document_library_id, folder_path=folder_path, + folder_id=folder_id, recursive=recursive, auth_with_token=auth_with_token, + reset_token=reset_token, tenant_id=tenant_id, site_id=site_id, + loader_id=loader_id, + ) + if is_initial: + index(all_docs, user_config, opt_config) + else: + _delta_by_source(all_docs, existing_map, loader_id, user_config, opt_config) + else: + logger.info(f"SharePoint '{loader_id}': 2-pass delta since {last_import_time.isoformat()}") + # Pass 1: fetch all current URLs to detect deleted documents + current_ids = set(get_all_sharepoint_document_ids( + client_id=client_id, client_secret=client_secret, + document_library_id=document_library_id, folder_path=folder_path, + folder_id=folder_id, recursive=recursive, auth_with_token=auth_with_token, + reset_token=reset_token, tenant_id=tenant_id, site_id=site_id, + )) + deleted_paths = [p for p in existing_map if p not in current_ids] + if deleted_paths: + logger.info(f"SharePoint '{loader_id}': deleting {len(deleted_paths)} removed documents") + delete_documents(loader_id, deleted_paths, user_config, opt_config) + + # Pass 2: load and index changed documents + changed_docs = load_from_sharepoint( + client_id=client_id, client_secret=client_secret, + document_library_id=document_library_id, folder_path=folder_path, + folder_id=folder_id, recursive=recursive, auth_with_token=auth_with_token, + reset_token=reset_token, tenant_id=tenant_id, site_id=site_id, + loader_id=loader_id, since=last_import_time, + ) + sources_to_delete = [doc.metadata.get("source", "") for doc in changed_docs] + if sources_to_delete: + delete_documents(loader_id, sources_to_delete, user_config, opt_config) + index(changed_docs, user_config, opt_config) + logger.info(f"SharePoint '{loader_id}': {len(deleted_paths)} deleted, {len(changed_docs)} updated") + + # ---------------------------------------------------------------- + # NORMAL LOADERS: Directory / HTML / CSV — hash comparison + # ---------------------------------------------------------------- + elif loader_type == "DirectoryLoader": + path = entry.get("path") + if not path: + logger.error(f"DirectoryLoader '{loader_id}': missing 'path'") + continue + all_docs = load_from_directory( + path, + recursive=entry.get("recursive", False), + silent_errors=entry.get("silent_errors", True), + loader_id=loader_id, + ) + if is_initial: + index(all_docs, user_config, opt_config) + else: + _delta_by_source(all_docs, existing_map, loader_id, user_config, opt_config) + + elif loader_type == "HTMLLoader": + url = entry.get("url") + depth = entry.get("depth", 0) + if not url: + logger.error(f"HTMLLoader '{loader_id}': missing 'url'") + continue + all_docs = load_html_content(url, depth=depth, loader_id=loader_id) + if is_initial: + index(all_docs, user_config, opt_config) + else: + _delta_by_source(all_docs, existing_map, loader_id, user_config, opt_config) + + elif loader_type == "CSVLoader": + path = entry.get("path") + if not path: + logger.error(f"CSVLoader '{loader_id}': missing 'path'") + continue + all_docs = load_from_csv(path) + for doc in all_docs: + doc.metadata["loader_id"] = loader_id + doc.metadata["source"] = path + doc.metadata["content_hash"] = hashlib.sha256(doc.page_content.encode("utf-8")).hexdigest() + if is_initial: + index(all_docs, user_config, opt_config) + else: + _delta_by_source(all_docs, existing_map, loader_id, user_config, opt_config) + + else: + logger.error(f"process_delta_imports: unknown loader_type '{loader_type}' for loader_id '{loader_id}'") + continue + + import_state.save_success(loader_id) + logger.info(f"Delta import '{loader_id}': completed successfully.") + + except Exception as e: + logger.error(f"process_delta_imports: error processing loader '{loader_id}': {e}", exc_info=True) + + +def _delta_by_source( + all_docs: List[Document], + existing_map: Dict[str, str], + loader_id: str, + user_config: Dict[str, Any], + opt_config: Dict[str, Any], +) -> None: + """ + Hash-based delta import for normal loaders (DirectoryLoader, HTMLLoader, CSVLoader). + + Each loader guarantees exactly one Document per source (PDF pages merged, HTML + elements merged), so ``content_hash`` on that Document is the raw-file hash and + directly comparable to the value stored in Qdrant by ``get_documents()``. + + - Deletes Qdrant chunks (bulk) for sources that no longer exist in the new load. + - Calls ``update_documents`` for sources whose content_hash has changed or that + are entirely new (update_documents handles delete-then-reindex internally). + - Leaves unchanged sources untouched. + + Args: + all_docs (List[Document]): All documents returned by the loader for this run. + existing_map (Dict[str, str]): Mapping of ``{source: content_hash}``. + loader_id (str): Unique loader identifier. + user_config (Dict[str, Any]): User configuration dict (must contain + ``collection_name``). + opt_config (Dict[str, Any]): Optimisation configuration dict. + """ + # Group freshly loaded documents by source (1 source = 1 Document after loader merging) + new_docs_by_source: Dict[str, List[Document]] = {} + for doc in all_docs: + source: str = doc.metadata.get("source", "") + new_docs_by_source.setdefault(source, []).append(doc) + + # All loaders guarantee exactly one Document per source (PDF pages merged, + # HTML elements merged), so content_hash on docs[0] is directly comparable + # to the value returned by get_documents() from Qdrant. + new_hash_by_source: Dict[str, str] = { + source: docs[0].metadata.get("content_hash", "") + for source, docs in new_docs_by_source.items() + } + + # Bulk-delete sources that are no longer present in the fresh load + deleted_sources: List[str] = [s for s in existing_map if s not in new_docs_by_source] + if deleted_sources: + delete_documents(loader_id, deleted_sources, user_config, opt_config) + + # Update changed and new sources via update_documents (delete-then-reindex) + changed_docs: List[Document] = [] + for source, docs in new_docs_by_source.items(): + if existing_map.get(source) != new_hash_by_source[source]: + changed_docs.extend(docs) + + if changed_docs: + update_documents(loader_id, changed_docs, user_config, opt_config) + + changed_source_count = len(set(d.metadata.get("source", "") for d in changed_docs)) + logger.info( + f"_delta_by_source '{loader_id}': {len(deleted_sources)} deleted, " + f"{len(changed_docs)} chunks re-indexed from {changed_source_count} changed sources" + ) diff --git a/learn2rag/importer/loaders/sharepoint_loader.py b/learn2rag/importer/loaders/sharepoint_loader.py index 819b94a..05c6398 100644 --- a/learn2rag/importer/loaders/sharepoint_loader.py +++ b/learn2rag/importer/loaders/sharepoint_loader.py @@ -7,15 +7,17 @@ and Site-Specific contexts. Author: Kyrill Meyer -Version: 0.0.5 +Version: 0.0.7 Institution: IFDT Creation Date: January 14, 2026 -Last Modified Date: March 17, 2026 +Last Modified Date: May 18, 2026 """ + +import hashlib import logging -import os import tempfile import shutil +from datetime import datetime, timezone from pathlib import Path from typing import List, Optional, Any, Union from langchain_community.document_loaders import UnstructuredFileLoader, TextLoader, UnstructuredExcelLoader, PyPDFLoader @@ -31,6 +33,9 @@ def _parse_file(file_path: Path, original_item: Any, loader_id: str = "N/A") -> Parses file using the robust UnstructuredFileLoader. """ docs: List[Document] = [] + # One hash for the entire file so all chunks share the same value, + # enabling unambiguous deduplication by source URL in get_documents_by_loader_id. + file_hash = hashlib.sha256(file_path.read_bytes()).hexdigest() # Check file extension (lowercase) suffix = file_path.suffix.lower() @@ -70,7 +75,7 @@ def _parse_file(file_path: Path, original_item: Any, loader_id: str = "N/A") -> # SPECIAL HANDLING FOR PDF (using PyPDFLoader to avoid unstructured dependencies) elif suffix == ".pdf": logger.info(f"Detected PDF file: {file_path.name} - using PyPDFLoader") - loader = PyPDFLoader(str(file_path)) + loader = PyPDFLoader(str(file_path), mode="single") docs = loader.load() # SPECIAL HANDLING FOR IMAGES (Skip due to broken OCR environment) @@ -105,7 +110,8 @@ def _parse_file(file_path: Path, original_item: Any, loader_id: str = "N/A") -> "created": str(original_item.created), "modified": str(original_item.modified), "loader": "SharePointLoader", - "loader_id": loader_id + "loader_id": loader_id, + "content_hash": file_hash, }) return docs @@ -137,11 +143,13 @@ def _parse_file(file_path: Path, original_item: Any, loader_id: str = "N/A") -> for doc in docs: doc.metadata.update({ "source": original_item.web_url, - "sharepoint_id": original_item.object_id, + "document_id": original_item.object_id, "name": original_item.name, "created": str(original_item.created), "modified": str(original_item.modified), - "loader": "SharePointLoader" + "loader": "SharePointLoader", + "loader_id": loader_id, + "content_hash": file_hash, }) return docs @@ -234,7 +242,7 @@ def _list_available_drives(account: Account, search_term: Optional[str] = None) except Exception as e: logger.error(f"Error while listing available drives: {e}") -def _load_items_manual_traversal(drive: Any, folder_id: Optional[str] = None, recursive: bool = True, loader_id: str = "N/A") -> List[Document]: +def _load_items_manual_traversal(drive: Any, folder_id: Optional[str] = None, recursive: bool = True, loader_id: str = "N/A", since: Optional[datetime] = None) -> List[Document]: """ Internal helper to manually traverse and load items into Document objects. This bypasses LangChain's internal 'storage()' call which fails in App-Only context. @@ -276,6 +284,17 @@ def _process_folder_items(item_list: Any) -> None: elif item.is_file: try: + # Seit-Filter: Dateien überspringen, die vor `since` zuletzt geändert wurden + if since is not None: + item_modified = item.modified + if item_modified is not None: + # Sicherstellen, dass beide tz-aware sind + since_utc = since.astimezone(timezone.utc) if since.tzinfo else since.replace(tzinfo=timezone.utc) + item_modified_utc = item_modified.astimezone(timezone.utc) if item_modified.tzinfo else item_modified.replace(tzinfo=timezone.utc) + if item_modified_utc < since_utc: + logger.debug(f"Skipping unchanged file (modified={item_modified_utc.isoformat()}): {item.name}") + continue + # 2. Download file download_success = item.download(to_path=temp_dir) @@ -307,12 +326,150 @@ def _process_folder_items(item_list: Any) -> None: return documents +def _list_items_web_urls(drive: Any, folder_id: Optional[str] = None, recursive: bool = True) -> List[str]: + """ + Traverse SharePoint files without downloading and collect their web URLs. + + Intended for deletion detection in the 2-pass delta import: compare the returned + set against the paths stored in Qdrant to find files that have been removed. + + Args: + drive (Any): Authenticated O365 Drive object. + folder_id (Optional[str]): Object ID of the folder to start from. + Uses the drive root when ``None``. + recursive (bool): Whether to traverse sub-folders recursively (default ``True``). + + Returns: + List[str]: Web URLs of all files found, e.g. + ``["https://tenant.sharepoint.com/.../file.pdf"]``. + """ + urls: List[str] = [] + + try: + if folder_id: + folder = drive.get_item(folder_id) + else: + folder = drive.get_root_folder() + + items = folder.get_items() + except Exception as e: + logger.error(f"_list_items_web_urls: error accessing folder: {e}") + return urls + + for item in items: + try: + if item.is_folder and recursive: + sub_urls = _list_items_web_urls(drive, folder_id=item.object_id, recursive=recursive) + urls.extend(sub_urls) + elif item.is_file: + if item.web_url: + urls.append(item.web_url) + except Exception as e: + logger.warning(f"_list_items_web_urls: error processing item {getattr(item, 'name', '?')}: {e}") + + return urls + + +def get_all_sharepoint_document_ids( + client_id: str, + client_secret: str, + document_library_id: str, + folder_path: Optional[str] = None, + folder_id: Optional[str] = None, + recursive: bool = False, + auth_with_token: bool = True, + reset_token: bool = False, + tenant_id: str = "common", + site_id: Optional[str] = None, +) -> List[str]: + """ + Retrieve the web URL for every file in a SharePoint document library without loading content. + + Intended for deletion detection in the 2-pass delta import: compare the returned + set against the paths stored in Qdrant to find files that have been removed. + + Args: + client_id (str): Azure AD application (client) ID. + client_secret (str): Azure AD client secret. + document_library_id (str): GUID of the SharePoint document library (Drive ID). + folder_path (Optional[str]): Slash-separated path to a sub-folder relative to + the library root, e.g. ``"Docs/Reports"``. + folder_id (Optional[str]): Object ID of the entry-point folder; takes precedence + over ``folder_path`` when both are provided. + recursive (bool): Whether to traverse sub-folders recursively (default ``False``). + auth_with_token (bool): Use cached O365 token when available (default ``True``). + reset_token (bool): Delete the cached token before authenticating (default ``False``). + tenant_id (str): Azure AD tenant ID or ``"common"`` (default). + site_id (Optional[str]): SharePoint site ID; when provided, the library is looked + up on that specific site rather than the root site. + + Returns: + List[str]: Web URLs of all files found, e.g. + ``["https://tenant.sharepoint.com/.../file.pdf"]``. + """ + if reset_token: + reset_o365_token() + + token_path = Path.home() / ".credentials" / "o365_token.txt" + token_backend = FileSystemTokenBackend(token_path=Path.home() / ".credentials", token_filename="o365_token.txt") + + if (not auth_with_token) or (not token_path.exists()): + if tenant_id and tenant_id != "common": + _authenticate_directly_with_o365(client_id, client_secret, tenant_id) + else: + logger.error("get_all_sharepoint_document_ids: No valid authentication method available.") + return [] + + account = Account((client_id, client_secret), token_backend=token_backend) + + if not account.is_authenticated: + logger.error("get_all_sharepoint_document_ids: Authentication failed.") + return [] + + try: + if site_id: + sp = account.sharepoint() + site = sp.get_site(site_id) + storage = site.storage + else: + storage = account.storage() + + drive = storage.get_drive(document_library_id) + if drive is None: + logger.error(f"get_all_sharepoint_document_ids: Drive not found: {document_library_id}") + return [] + + # Optionaler Unterordner-Start + effective_folder_id = folder_id + if folder_path and not folder_id: + root = drive.get_root_folder() + for part in folder_path.strip("/").split("/"): + found = None + for child in root.get_items(): + if child.is_folder and child.name == part: + found = child + break + if found: + root = found + else: + logger.warning(f"get_all_sharepoint_document_ids: folder part '{part}' not found") + return [] + effective_folder_id = root.object_id + + return _list_items_web_urls(drive, folder_id=effective_folder_id, recursive=recursive) + + except Exception as e: + logger.error(f"get_all_sharepoint_document_ids: error: {e}") + return [] + + def load_from_sharepoint(client_id: str, client_secret: str, document_library_id: str, folder_path: Optional[str] = None, folder_id: Optional[str] = None, object_ids: Optional[List[str]] = None, recursive: bool = False, auth_with_token: bool = True, load_extended_metadata: bool = True, reset_token: bool = False, tenant_id: str = "common", - site_id: Optional[str] = None, loader_id: str = "N/A") -> List[Document]: + site_id: Optional[str] = None, loader_id: str = "N/A", + since: Optional[datetime] = None) -> List[Document]: """ Load documents from SharePoint and set metadata. """ @@ -372,7 +529,7 @@ def load_from_sharepoint(client_id: str, client_secret: str, document_library_id # Load documents using internal helper function # Use folder_id if provided, otherwise use Root of the Drive - loaded_docs = _load_items_manual_traversal(drive, folder_id=folder_id, recursive=recursive, loader_id=loader_id) + loaded_docs = _load_items_manual_traversal(drive, folder_id=folder_id, recursive=recursive, loader_id=loader_id, since=since) logger.info(f"Found {len(loaded_docs)} documents.") diff --git a/learn2rag/importer/main.py b/learn2rag/importer/main.py index ca727c6..e309161 100644 --- a/learn2rag/importer/main.py +++ b/learn2rag/importer/main.py @@ -6,9 +6,9 @@ Author: Kyrill Meyer Institution: IFDT -Version: 0.0.3 +Version: 0.0.4 Creation Date: June 10, 2025 -Last Modified: February 20, 2026 +Last Modified: May 20, 2026 """ import argparse @@ -22,25 +22,62 @@ from .config.config_constants import LOGS_DIR, VERSION from .utils.logging_setup import setup_logging from .utils.config_loader import load_json_config, validate_config_entry -from .loaders.process_loaders import process_configuration_entries +from .loaders.process_loaders import process_configuration_entries, process_delta_imports +from .utils.import_state import ImportState +from learn2rag.pipeline.ingestion import index logger = logging.getLogger("Learn2RAGImporter") +statusLogger = logging.getLogger('status') warnings.filterwarnings("ignore", category=SyntaxWarning, module="magic") # Suppress SyntaxWarnings from the 'magic' module class ImporterArgumentParser(argparse.ArgumentParser): + """Argument parser for the Learn2RAG importer. + + Arguments: + --config Path to the importer config JSON file. + Default: config.json bundled with the package. + --state-file Path to the import-state JSON file that persists per-loader + last-import timestamps across runs. + Default: import_state.json placed next to --config. + --delta Run a delta import instead of a full import. + Intelligent loaders (Drupal, SharePoint) fetch only documents + changed since the last run; plain loaders (Directory, HTML, CSV) + perform a SHA-256 content-hash comparison against the current + Qdrant index and only update changed documents. + On the very first run (no state file) a full import is performed + automatically. + --save-documents Write all loaded documents to loaded_documents.json in the + current working directory after a full import. + Intended for debugging and backwards compatibility only; + disabled by default. + + Environment variables (read by main(), not CLI arguments): + PIPELINE_USER_CONFIG Path to the pipeline user_config.json. + Default: learn2rag/pipeline/user_config.json + PIPELINE_OPT_CONFIG Path to the pipeline opt_config.json. + Default: learn2rag/pipeline/opt_config.json + """ + def __init__(self) -> None: super().__init__() json_config_path = importlib.resources.files("learn2rag.importer.config") / "config.json" - self.add_argument('--config', default=str(json_config_path)) + self.add_argument('--config', default=str(json_config_path), + help='path to the importer config JSON (default: bundled config.json)') + self.add_argument('--state-file', default=None, + help='path to the import state JSON file (default: import_state.json next to --config)') + self.add_argument('--delta', action='store_true', + help='perform a delta import instead of a full import') + self.add_argument('--save-documents', action='store_true', + help='write loaded documents to loaded_documents.json (debug/backwards-compat only)') def init(args: argparse.Namespace) -> None: # Display a small textual description about the app print("------------------------------------------------------------") print("Learn2RAG Importer - DataImporter for Learn2RAG.") - print(f"Version: {VERSION} | Author: IFDT (KM) | Date: February 20, 2026\n") + print(f"Version: {VERSION} | Author: IFDT (KM) | Date: May 20, 2026\n") print("https://github.com/Learn2RAG/") print("------------------------------------------------------------\n") @@ -64,39 +101,69 @@ def init(args: argparse.Namespace) -> None: #main function to run the application def main(args: argparse.Namespace) -> None: - # Load JSON configuration + statusLogger.info('Import started') try: config = load_json_config(args.config) logger.debug("Configuration loaded successfully, starting validation...") - # Validate each entry in the configuration + # load pipeline configuration for user and opt settings, needed for delta-import and indexing + user_config_path = os.environ.get("PIPELINE_USER_CONFIG", "learn2rag/pipeline/user_config.json") + opt_config_path = os.environ.get("PIPELINE_OPT_CONFIG", "learn2rag/pipeline/opt_config.json") + with open(user_config_path, "r", encoding="utf-8") as f: + user_config = json.load(f) + with open(opt_config_path, "r", encoding="utf-8") as f: + opt_config = json.load(f) + logger.debug("Pipeline configuration loaded from '%s' and '%s'.", user_config_path, opt_config_path) + + # save import state file next to the importer config if not explicitly specified + + state_file_path = args.state_file if args.state_file else str(Path(args.config).parent / "import_state.json") + import_state = ImportState(state_file_path) + + # validate configuration entries before processing to avoid partial imports and ensure all issues are caught upfront validation_errors = False - for index, entry in enumerate(config.get("loaders", []), start=1): + for entry_idx, entry in enumerate(config.get("loaders", []), start=1): try: loader_type = entry.get("loader_type", "Unknown") - logger.debug(f"Validated configuration entry {index}: {loader_type}") + logger.debug(f"Validated configuration entry {entry_idx}: {loader_type}") validate_config_entry(entry) except ValueError as e: - logger.error(f"Validation error in configuration entry {index}: {e}") + logger.error(f"Validation error in configuration entry {entry_idx}: {e}") validation_errors = True - # Process configuration entries and load documents if not validation_errors: - all_documents = process_configuration_entries(config.get("loaders", [])) - logger.debug(f"Total documents loaded: {len(all_documents)}") - - # Optional: Speichern der Dokumente in einer Datei - output_path = "loaded_documents.json" - with open(output_path, "w", encoding="utf-8") as f: - json.dump([{"metadata": doc.metadata, "content": doc.page_content} for doc in all_documents], f, ensure_ascii=False, indent=4) - - logger.info('Documents saved to: %s', output_path) + if args.delta: + # Delta-Import: Hash-/Timestamp-comparison, direct ingest in Qdrant, + logger.info("Running delta import (state file: %s)", state_file_path) + process_delta_imports( + config_entries=config.get("loaders", []), + user_config=user_config, + opt_config=opt_config, + import_state=import_state, + ) + else: + # full import: all documents load and directly ingest in Qdrant + logger.info("Running full import") + all_documents = process_configuration_entries(config.get("loaders", [])) + logger.debug(f"Total documents loaded: {len(all_documents)}") + index(all_documents, user_config, opt_config) + + # JSON-Dump für Rückwärtskompatibilität (nur mit --save-documents) + if args.save_documents: + output_path = "loaded_documents.json" + with open(output_path, "w", encoding="utf-8") as f: + json.dump([{"metadata": doc.metadata, "content": doc.page_content} for doc in all_documents], f, ensure_ascii=False, indent=4) + logger.debug('Documents saved to: %s', output_path) + + statusLogger.info('Import finished') else: logger.error("Configuration validation failed. No documents were processed.") + statusLogger.error('Import failed') sys.exit(1) except Exception as e: logger.error(f"Error loading configuration: {e}") + statusLogger.error('Import failed') sys.exit(1) diff --git a/learn2rag/importer/readme.md b/learn2rag/importer/readme.md index 8533e48..955aaa7 100644 --- a/learn2rag/importer/readme.md +++ b/learn2rag/importer/readme.md @@ -1,9 +1,9 @@ # Learn2RAG Importer -An importer for document sources used within the Learn2RAG pipeline. It reads a `config.json`, delegates loading to the appropriate loader, enriches documents with metadata, and writes results to `loaded_documents.json`. +An importer for document sources used within the Learn2RAG pipeline. It reads a `config.json`, delegates loading to the appropriate loader, enriches documents with metadata, and ingests them directly into Qdrant. **Author:** IFDT, KM -**Version:** 0.0.5 +**Version:** 0.0.9 --- @@ -48,8 +48,8 @@ graph TD G --> I H --> I I --> J[Enrich metadata] - J --> K[loaded_documents.json] - K --> L[Pipeline input] + J --> L[Qdrant] + J -.->|--save-documents| K[loaded_documents.json] classDef purple fill:#9370db,stroke:#000000,stroke-width:2px A:::purple @@ -71,9 +71,57 @@ graph TD ## Running the importer ```bash -python -m learn2rag.importer +python -m learn2rag.importer [OPTIONS] ``` +### CLI options + +| Option | Default | Description | +|---|---|---| +| `--config PATH` | bundled `config.json` | Path to the importer config JSON file | +| `--state-file PATH` | `import_state.json` next to `--config` | Path to the import-state JSON file that persists per-loader timestamps across runs | +| `--delta` | off | Run a delta import instead of a full import (see [Delta import](#delta-import)) | +| `--save-documents` | off | Write all loaded documents to `loaded_documents.json` in the working directory (debug / backwards compatibility) | + +### Environment variables + +The importer reads the pipeline configuration directly from environment variables. These must be set (or the default paths must exist) before running. + +| Variable | Default | Description | +|---|---|---| +| `PIPELINE_USER_CONFIG` | `learn2rag/pipeline/user_config.json` | Path to the pipeline `user_config.json` | +| `PIPELINE_OPT_CONFIG` | `learn2rag/pipeline/opt_config.json` | Path to the pipeline `opt_config.json` | + +### Examples + +**Full import** — loads all documents and ingests them into Qdrant: +```bash +python -m learn2rag.importer --config /data/config.json +``` + +**Delta import** — only processes new or changed documents: +```bash +python -m learn2rag.importer --config /data/config.json --delta +``` + +**Full import with debug output:** +```bash +python -m learn2rag.importer --config /data/config.json --save-documents +``` + +--- + +## Delta import + +With `--delta` the importer uses a loader-specific strategy to minimise the number of documents re-processed: + +- **Intelligent loaders** (DrupalLoader, SharepointLoader): 2-pass approach — fetch all current document IDs to detect deletions, then load only documents changed since the last successful run via a server-side timestamp filter. +- **Plain loaders** (DirectoryLoader, HTMLLoader, CSVLoader): full load followed by SHA-256 content-hash comparison against the existing Qdrant index to detect additions, changes, and deletions. + +The import timestamp for each loader is only persisted after a **successful** run. A failed run will therefore be retried in full on the next call. + +On the **very first run** (empty state file or empty Qdrant collection) a full import is performed automatically regardless of the `--delta` flag. + --- ## Configuration @@ -98,7 +146,9 @@ Edit `config/config.json` to define one or more loaders. Each entry requires at ## Output -Results are written to `loaded_documents.json` in the project root. Each document entry contains a `metadata` object and a `content` string: +By default, documents are ingested **directly into Qdrant** without writing any local file. The `loaded_documents.json` file is only written when `--save-documents` is passed (useful for debugging or backwards compatibility). + +Each document entry contains a `metadata` object and a `content` string: ```json [ @@ -422,9 +472,21 @@ where - v0.0.6 - removed permission information from metadata in directory loader - added SharePoint Loader -- v0.0.7 +- v0.0.7 - added type checks - v0.0.8 - added loader_id for all loaders - improved error handling and dependency checking for directory loader - - added drupal_loader \ No newline at end of file + - added drupal_loader +- v0.0.9 + - unified `source` field as document identifier across all loaders in metadata (directory: file path, HTML: URL, SharePoint: web URL, Drupal: node URL, CSV: file path) + - delta import now uses `get_documents` from the pipeline + - hash comparison now uses sorted chunk hashes per source for stable results + - **Breaking change:** Qdrant payload field renamed from `path` → `source`; existing collections must be deleted and re-imported +- v0.1.0 + - documents are now ingested directly into Qdrant instead of being written to `loaded_documents.json` + - added `--delta` flag to run a delta import (hash/timestamp comparison, direct Qdrant update) + - added `--state-file` flag to override the path of the per-loader import-state JSON + - added `--save-documents` flag to optionally write `loaded_documents.json` for debugging / backwards compatibility + - pipeline configuration (`user_config`, `opt_config`) is now read from `PIPELINE_USER_CONFIG` / `PIPELINE_OPT_CONFIG` environment variables + - loop variable `index` renamed to `entry_idx` to avoid shadowing the `index()` import from `learn2rag.pipeline.ingestion` \ No newline at end of file diff --git a/learn2rag/importer/tests/test_loaders.py b/learn2rag/importer/tests/test_loaders.py index 2ca604b..8b14b98 100644 --- a/learn2rag/importer/tests/test_loaders.py +++ b/learn2rag/importer/tests/test_loaders.py @@ -1,13 +1,47 @@ import json +import os +import pathlib +import shutil +import sys +import tempfile import unittest -from pathlib import Path -from typing import Any +from collections import defaultdict +from typing import Any, ClassVar, DefaultDict, List, Optional, Set from unittest.mock import patch, MagicMock +from langchain_core.documents import Document from ..loaders.directory_loader import load_from_directory from ..loaders.html_loader import load_html_content, _is_same_site +# Set RUN_INTEGRATION_TESTS=1 to run tests that require network access. +_RUN_INTEGRATION: bool = os.environ.get("RUN_INTEGRATION_TESTS", "0") == "1" + class ImporterLoadersTestCase(unittest.TestCase): + """Tests for directory and HTML loaders. Runs fully offline without Qdrant.""" + + test_path: ClassVar[str] + _temp_dir: ClassVar[Optional[str]] + + @classmethod + def setUpClass(cls) -> None: + env_path = os.environ.get("TEST_IMPORT_PATH", "") + if env_path and pathlib.Path(env_path).is_dir(): + cls.test_path = env_path + cls._temp_dir = None + else: + cls._temp_dir = tempfile.mkdtemp(prefix="learn2rag_test_") + cls.test_path = cls._temp_dir + (pathlib.Path(cls._temp_dir) / "sample.txt").write_text( + "This is a test document.\nIt has multiple lines of content.\nLine three.", + encoding="utf-8", + ) + + @classmethod + def tearDownClass(cls) -> None: + if cls._temp_dir: + shutil.rmtree(cls._temp_dir, ignore_errors=True) + + @unittest.skipUnless(_RUN_INTEGRATION, "Set RUN_INTEGRATION_TESTS=1 to run") def test_remote_url(self) -> None: docs = load_html_content('https://learn2rag.de') assert len(docs) == 1 @@ -15,36 +49,75 @@ def test_remote_url(self) -> None: assert 'source' in doc.metadata assert 'wird das Projekt von einem Konsortium führender wissenschaftlicher Institutionen und Softwareentwickler durchgeführt und durch weitreichende Unternehmensnetzwerke unterstützt' in doc.page_content - # def test_data_uri(self): - # docs = load_html_content('data:text/html;charset=utf-8,%3Cbody%3E%3Cp%3EData%20URI%20content%3C%2Fp%3E%3C%2Fbody%3E') - # assert len(docs) == 1 - # doc, = docs - # assert doc.page_content == 'Data URI content' + def test_import_directory(self) -> None: + """Loads files from test_path, prints metadata incl. content_hash, and + verifies that each source yields exactly one Document with a stable hash. + + Intentionally Qdrant-free: only the loader and hash consistency are tested. + Set env var SKIP_HASH_ASSERT=1 to print-only without assertions (debugging). + """ + skip_assert = os.environ.get("SKIP_HASH_ASSERT", "0") == "1" - # TODO: actual tests + docs: List[Document] = load_from_directory( + self.test_path, recursive=False, loader_id="test_import" + ) + + def _safe(text: str, limit: int = 500) -> str: + """Truncate and replace unencodable characters for safe terminal output.""" + encoding = sys.stdout.encoding or "utf-8" + return text[:limit].encode(encoding, errors="replace").decode(encoding) + + print(f"\n=== {len(docs)} document(s) loaded from '{self.test_path}' ===") + + by_source: DefaultDict[str, List[Document]] = defaultdict(list) + for doc in docs: + by_source[doc.metadata.get("source", "?")].append(doc) - def test_import_directory(self) -> None: - """Loads files from ./data and prints what would be passed to Qdrant.""" - path = Path(__file__).parent.resolve() / 'data' - docs = load_from_directory(str(path), recursive=True, loader_id="test_import") - print(f"\n=== {len(docs)} document(s) loaded ===") for i, doc in enumerate(docs, start=1): print(f"\n--- Document {i} ---") print(f"Metadata: {json.dumps(doc.metadata, indent=2, default=str)}") - print(f"page_content (first 500 characters):\n{doc.page_content[:500]}") - self.assertTrue(len(docs) > 0, f"No documents found in: {path}") - - + print(f"page_content (first 500 chars):\n{_safe(doc.page_content)}") + + print("\n=== Hash consistency per source ===") + for source, source_docs in sorted(by_source.items()): + hashes: Set[Optional[str]] = { + d.metadata.get("content_hash", "MISSING") for d in source_docs + } + status = "OK" if len(hashes) == 1 else "MISMATCH" + print(f"[{status}] {source} ({len(source_docs)} doc(s)) hashes={hashes}") + + self.assertGreater(len(docs), 0, f"No documents found in: {self.test_path}") + + if not skip_assert: + for source, source_docs in by_source.items(): + hashes2: Set[Optional[str]] = {d.metadata.get("content_hash") for d in source_docs} + self.assertEqual( + len(hashes2), 1, f"Hash mismatch for '{source}': {hashes2}" + ) + self.assertEqual( + len(source_docs), + 1, + f"Expected 1 Document per source, got {len(source_docs)} for '{source}'", + ) + else: + print("\n[SKIP_HASH_ASSERT=1] Hash assertion skipped.") + + +@unittest.skipUnless(_RUN_INTEGRATION, "Set RUN_INTEGRATION_TESTS=1 to run") class HtmlLoaderLearn2RagFullCrawlTestCase(unittest.TestCase): """Integration test: full site crawl of https://learn2rag.de with depth=-1.""" def test_full_site_crawl(self) -> None: """Crawls the entire learn2rag.de domain and prints all discovered pages.""" root_url = "https://learn2rag.de" - skipped: set[str] = set() + skipped: Set[str] = set() docs = load_html_content(root_url, depth=-1, loader_id="learn2rag_full", skipped=skipped) - visited_urls = {doc.metadata["source"] for doc in docs} + by_url: DefaultDict[str, List[Document]] = defaultdict(list) + for doc in docs: + by_url[doc.metadata.get("source", "?")].append(doc) + + visited_urls = set(by_url.keys()) for i, doc in enumerate(docs, start=1): print(f"\n--- Document {i}: {doc.metadata.get('source')} ---") print(f"page_content (first 300 characters):\n{doc.page_content[:300]}") @@ -55,9 +128,10 @@ def test_full_site_crawl(self) -> None: print(f" Integrated (unique pages loaded): {len(visited_urls)}") print(f" Skipped (off-site links): {len(skipped)}") print(f" Total documents (incl. duplicates): {len(docs)}") - print(f"\n Integrated URLs:") + print(f"\n Documents per URL:") for url in sorted(visited_urls): - print(f" [OK] {url}") + count = len(by_url[url]) + print(f" [{count} doc(s)] {url}") print(f"\n Skipped URLs (sample, max 20):") for url in sorted(skipped)[:20]: print(f" [--] {url}") @@ -181,3 +255,147 @@ def test_metadata_set_correctly(self, mock_get: MagicMock) -> None: self.assertEqual(doc.metadata.get("loader_id"), "test_meta") self.assertIn("content_hash", doc.metadata) self.assertEqual(doc.metadata.get("loader_type"), "HTMLLoader") + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + +def _doc(source: str, content_hash: str) -> Document: + """Create a minimal Document with source and content_hash metadata.""" + return Document( + page_content="content", + metadata={"source": source, "content_hash": content_hash}, + ) + + +_USER_CONFIG: dict[str, str] = {"collection_name": "test_collection"} +_OPT_CONFIG: dict[str, Any] = {} + +# --------------------------------------------------------------------------- +# _delta_by_source unit tests (no Qdrant required) +# --------------------------------------------------------------------------- + +class DeltaBySourceTestCase(unittest.TestCase): + """Unit tests for _delta_by_source. + + All Qdrant calls (delete_documents, update_documents) are mocked so no + running Qdrant instance is required. + """ + + def setUp(self) -> None: + from ..loaders.process_loaders import _delta_by_source # local import to avoid top-level side effects + self._delta_by_source = _delta_by_source + self._patch_delete = patch("learn2rag.importer.loaders.process_loaders.delete_documents") + self._patch_update = patch("learn2rag.importer.loaders.process_loaders.update_documents") + self.mock_delete = self._patch_delete.start() + self.mock_update = self._patch_update.start() + + def tearDown(self) -> None: + self._patch_delete.stop() + self._patch_update.stop() + + # -- no changes -------------------------------------------------------- + + def test_no_changes_no_qdrant_calls(self) -> None: + """When nothing changed, neither delete nor update should be called.""" + existing_map = {"a.txt": "hash_a", "b.txt": "hash_b"} + all_docs = [_doc("a.txt", "hash_a"), _doc("b.txt", "hash_b")] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + self.mock_delete.assert_not_called() + self.mock_update.assert_not_called() + + # -- new document ------------------------------------------------------ + + def test_new_document_is_updated(self) -> None: + """A source that does not exist in Qdrant yet must be passed to update_documents.""" + existing_map = {"a.txt": "hash_a"} + all_docs = [_doc("a.txt", "hash_a"), _doc("new.txt", "hash_new")] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + self.mock_delete.assert_not_called() + updated_sources = {d.metadata["source"] for d in self.mock_update.call_args[0][1]} + self.assertIn("new.txt", updated_sources) + self.assertNotIn("a.txt", updated_sources) + + # -- changed document -------------------------------------------------- + + def test_changed_document_is_updated(self) -> None: + """A source whose hash differs from the stored one must be re-indexed.""" + existing_map = {"a.txt": "hash_a_old"} + all_docs = [_doc("a.txt", "hash_a_new")] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + self.mock_delete.assert_not_called() + updated_sources = {d.metadata["source"] for d in self.mock_update.call_args[0][1]} + self.assertIn("a.txt", updated_sources) + + # -- deleted document -------------------------------------------------- + + def test_deleted_document_is_removed(self) -> None: + """A source present in Qdrant but absent from the fresh load must be deleted.""" + existing_map = {"a.txt": "hash_a", "gone.txt": "hash_gone"} + all_docs = [_doc("a.txt", "hash_a")] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + deleted = self.mock_delete.call_args[0][1] + self.assertIn("gone.txt", deleted) + self.mock_update.assert_not_called() + + # -- mixed scenario ---------------------------------------------------- + + def test_mixed_new_changed_deleted_unchanged(self) -> None: + """Combined scenario: new, changed, deleted and unchanged in one call.""" + existing_map = { + "unchanged.txt": "hash_u", + "changed.txt": "hash_c_old", + "deleted.txt": "hash_d", + } + all_docs = [ + _doc("unchanged.txt", "hash_u"), # unchanged — must not be touched + _doc("changed.txt", "hash_c_new"), # changed — must be re-indexed + _doc("new.txt", "hash_n"), # new — must be indexed + # deleted.txt is absent # deleted — must be removed + ] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + deleted = self.mock_delete.call_args[0][1] + self.assertIn("deleted.txt", deleted) + self.assertNotIn("unchanged.txt", deleted) + + updated_sources = {d.metadata["source"] for d in self.mock_update.call_args[0][1]} + self.assertIn("changed.txt", updated_sources) + self.assertIn("new.txt", updated_sources) + self.assertNotIn("unchanged.txt", updated_sources) + + # -- empty existing map (initial run) ---------------------------------- + + def test_initial_run_all_documents_indexed(self) -> None: + """On the first run (empty Qdrant) every document must be passed to update_documents.""" + existing_map: dict[str, str] = {} + all_docs = [_doc("a.txt", "hash_a"), _doc("b.txt", "hash_b")] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + self.mock_delete.assert_not_called() + updated_sources = {d.metadata["source"] for d in self.mock_update.call_args[0][1]} + self.assertEqual(updated_sources, {"a.txt", "b.txt"}) + + # -- empty fresh load (all documents removed) -------------------------- + + def test_all_documents_removed(self) -> None: + """If the loader returns nothing, all existing sources must be deleted.""" + existing_map = {"a.txt": "hash_a", "b.txt": "hash_b"} + all_docs: list[Document] = [] + + self._delta_by_source(all_docs, existing_map, "loader1", _USER_CONFIG, _OPT_CONFIG) + + deleted = self.mock_delete.call_args[0][1] + self.assertCountEqual(deleted, ["a.txt", "b.txt"]) + self.mock_update.assert_not_called() diff --git a/learn2rag/importer/utils/import_state.py b/learn2rag/importer/utils/import_state.py new file mode 100644 index 0000000..3356f4b --- /dev/null +++ b/learn2rag/importer/utils/import_state.py @@ -0,0 +1,128 @@ +""" +import_state.py + +Description: + Manages the persistent per-loader import state (last_import_timestamp). + The state file (import_state.json) is stored next to the importer_config.json. + + State file format: + { + "loader_id": { + "last_import_timestamp": "2026-04-27T10:00:00+00:00" + } + } + +Author: Kyrill Meyer +Institution: IFDT +Version: 0.0.1 +Creation Date: April 27, 2026 +Last Modified: April 27, 2026 +""" + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Optional + +logger = logging.getLogger("Learn2RAGImporter") + + +class ImportState: + """ + Manages import state (timestamps) per loader. + + Tracks the start timestamp of the last successful import run per loader_id. + Changes are held in memory until ``save_success()`` is called, so a failed + import does not advance the stored timestamp. + """ + + def __init__(self, state_file_path: str) -> None: + """ + Initialise ImportState and load existing state from disk if present. + + Args: + state_file_path (str): Absolute or relative path to the JSON state file + (e.g. ``/data/import_state.json``). + """ + self._path = Path(state_file_path) + self._state: Dict[str, Any] = {} + self._pending: Dict[str, datetime] = {} # in-memory only, not yet persisted + self._load() + + def _load(self) -> None: + if self._path.exists(): + try: + with self._path.open("r", encoding="utf-8") as f: + self._state = json.load(f) + logger.info("Import state loaded from %s", self._path) + except (json.JSONDecodeError, OSError) as e: + logger.warning("Could not load import state from %s: %s — starting fresh.", self._path, e) + self._state = {} + else: + self._state = {} + + def _save(self) -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + with self._path.open("w", encoding="utf-8") as f: + json.dump(self._state, f, ensure_ascii=False, indent=2) + logger.debug("Import state saved to %s", self._path) + + def get_last_import_time(self, loader_id: str) -> Optional[datetime]: + """ + Return the start timestamp of the last successful import for a loader. + + Args: + loader_id (str): Unique loader identifier as defined in the importer config. + + Returns: + Optional[datetime]: UTC-aware datetime of the last successful import start, + or ``None`` if no state exists for this loader. + """ + entry = self._state.get(loader_id) + if not entry: + return None + ts_str = entry.get("last_import_timestamp") + if not ts_str: + return None + try: + dt = datetime.fromisoformat(ts_str) + # If no timezone info is present, treat as UTC + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + except ValueError as e: + logger.warning("Invalid timestamp for loader_id=%s: %s", loader_id, e) + return None + + def record_import_start(self, loader_id: str, timestamp: datetime) -> None: + """ + Record the start time of an ongoing import in memory (not yet persisted). + + Must be called before ``save_success()`` for the same loader_id. + + Args: + loader_id (str): Unique loader identifier. + timestamp (datetime): UTC-aware datetime representing the import start time. + """ + self._pending[loader_id] = timestamp + + def save_success(self, loader_id: str) -> None: + """ + Persist the previously recorded import start timestamp to disk. + + Must be preceded by a call to ``record_import_start()`` for the same + loader_id; raises ``AssertionError`` otherwise. + + Args: + loader_id (str): Unique loader identifier. + """ + assert loader_id in self._pending, ( + f"save_success() called for loader_id='{loader_id}' without prior record_import_start()" + ) + ts = self._pending.pop(loader_id) + self._state[loader_id] = { + "last_import_timestamp": ts.isoformat() + } + self._save() + logger.info("Import state saved for loader_id=%s (start_time=%s)", loader_id, ts.isoformat()) diff --git a/learn2rag/pipeline/app.py b/learn2rag/pipeline/app.py index 52f9e1b..44ac781 100644 --- a/learn2rag/pipeline/app.py +++ b/learn2rag/pipeline/app.py @@ -202,14 +202,6 @@ async def search( return await search_authorized(user=input.user, question=input.question) - - -@app.post("/ingest") -async def ingest() -> None: - ingestion.index(user_config, opt_config) - - - @app.get("/test") async def test() -> TestResponse: - return TestResponse(message="Hello World") + return TestResponse(message="Hello World") \ No newline at end of file diff --git a/learn2rag/pipeline/generate.py b/learn2rag/pipeline/generate.py index 64e3efc..cb913a6 100644 --- a/learn2rag/pipeline/generate.py +++ b/learn2rag/pipeline/generate.py @@ -18,7 +18,7 @@ def generate(query: str, search_results: list[ScoredPoint], opt_config: dict[str assert llm is not None if hasattr(search_results, "points"): search_results = search_results.points - context = "\n\n".join([context_template.format(source=result.payload['path'], content=result.payload['content']) for result in search_results]) # type: ignore[index] + context = "\n\n".join([context_template.format(source=result.payload['source'], content=result.payload['content']) for result in search_results]) # type: ignore[index] system_message = SystemMessagePromptTemplate.from_template(opt_config["prompt"]) user_message = HumanMessagePromptTemplate.from_template("{question}") prompt = ChatPromptTemplate.from_messages([system_message, user_message]) @@ -33,7 +33,7 @@ def generate_stream(query: str, search_results: list[ScoredPoint], opt_config: d if hasattr(search_results, "points"): search_results = search_results.points - context = "\n\n".join([context_template.format(source=result.payload['path'], content=result.payload['content']) for result in search_results]) # type: ignore[index] + context = "\n\n".join([context_template.format(source=result.payload['source'], content=result.payload['content']) for result in search_results]) # type: ignore[index] system_message = SystemMessagePromptTemplate.from_template(opt_config["prompt"]) user_message = HumanMessagePromptTemplate.from_template("{question}") prompt = ChatPromptTemplate.from_messages([system_message, user_message]) diff --git a/learn2rag/pipeline/ingestion.py b/learn2rag/pipeline/ingestion.py index b4c1150..d4f5cfa 100644 --- a/learn2rag/pipeline/ingestion.py +++ b/learn2rag/pipeline/ingestion.py @@ -12,8 +12,6 @@ from .qdrant import Qdrant from qdrant_client.models import PointStruct, Filter, FieldCondition, MatchValue, SparseVector, VectorParams, MultiVectorConfig, MultiVectorComparator, Distance - -from . import json_loader from .embeddings import create_embeddings @@ -33,7 +31,7 @@ def point_exists(qdrant: Qdrant, collection_name: str, loader_id: str, path: str filter = Filter( must=[ FieldCondition(key="loader_id", match=MatchValue(value=loader_id)), - FieldCondition(key="path", match=MatchValue(value=path)), + FieldCondition(key="source", match=MatchValue(value=path)), FieldCondition(key="content_hash", match=MatchValue(value=content_hash)), FieldCondition(key="chunk_hash", match=MatchValue(value=chunk_hash)), ] @@ -97,7 +95,7 @@ def insert_multi(qdrant: Qdrant, collection_name: str, sample: dict[str, Any]) - def payload(sample: dict[str, Any]) -> dict[str, str]: return { "content": sample["page_content"], - "path": sample["metadata"]["source"], + "source": sample["metadata"]["source"], "content_hash": sample["metadata"]["content_hash"], "chunk_hash": sample["chunk_hash"], "title": sample["metadata"].get("title",""), @@ -106,29 +104,42 @@ def payload(sample: dict[str, Any]) -> dict[str, str]: "document_id": sample["metadata"].get("document_id", "") } -def index(user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: - logging.info('Loading documents') - all_documents = json_loader.json_loader(user_config['imported_documents_file_path']) - # Split documents into chunks +def ingest_batch(docs: list[Document], qdrant: Qdrant, user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: + """ + Chunk, embed, and bulk-insert a list of documents into Qdrant. + + Mirrors the behaviour of the original ``index()`` function but accepts an + already-constructed ``Qdrant`` instance instead of creating one internally. + Intended for use by ``process_delta_imports`` and other callers that manage + their own Qdrant connection. + + Points that already exist (identical ``loader_id``, ``path``, ``content_hash``, + and ``chunk_hash``) are skipped via ``point_exists()``. + + Args: + docs (list[Document]): Documents to ingest. May be a full initial load or + a filtered subset of changed documents. + qdrant (Qdrant): Authenticated Qdrant wrapper instance. + user_config (dict[str, Any]): User configuration dict (must contain + ``collection_name``). + opt_config (dict[str, Any]): Optimisation configuration dict (must contain + ``chunk_size``, ``chunk_overlap``, + ``embedding_model``, and ``search_mode``). + """ + collection_name = user_config["collection_name"] + all_documents = docs + logging.info('Splitting documents into chunks') text_splitter = RecursiveCharacterTextSplitter( chunk_size=opt_config["chunk_size"], chunk_overlap=opt_config["chunk_overlap"] ) chunks = text_splitter.split_documents(all_documents) - collection_name = user_config["collection_name"] - - # Init vector store - qdrant = Qdrant( - collection_name=collection_name, - opt_config=opt_config - ) - chunks_content = [chunk.page_content for chunk in chunks] if len(opt_config["multi_search"]) > 0 and opt_config["query_mode"] == "multi": - chunks_metadata = {} - embeddings_metadata = {} + chunks_metadata: dict[str, list[str]] = {} + embeddings_metadata: dict[str, Any] = {} for item in opt_config["multi_search"]: chunks_metadata[item] = list(get_chunks_metadata(chunks, item)) embeddings_metadata[item] = create_embeddings(chunks_metadata[item], opt_config["embedding_model"], opt_config["search_mode"]) @@ -138,8 +149,7 @@ def index(user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: else: raise TypeError(f"dense_vecs must be np.ndarray, got {type(dense_vecs)}") - chunk_hash = [hashlib.md5(chunk.page_content.encode()).hexdigest() for chunk in chunks] - # Todo: handle different vector lengths for batch encoding when using sparse vectors + chunk_hash = [hashlib.md5(chunk.page_content.encode()).hexdigest() for chunk in chunks] logging.info('Creating embeddings...') embeddings = create_embeddings(chunks_content, opt_config["embedding_model"], opt_config["search_mode"]) @@ -199,12 +209,25 @@ def index(user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: insert(qdrant, collection_name, sample) -def main() -> None: - logging.basicConfig(level=logging.INFO) - from .config import user_config, opt_config - index(user_config, opt_config) +def index(documents: list[Document], user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: + """ + Ingest a list of documents — entry point for standalone pipeline operation. + Creates a ``Qdrant`` instance internally and delegates to ``ingest_batch()``. + This function also serves as the replacement for the originally planned + ``ingest_document()`` helper: a single-document delta upsert is expressed as + ``index([doc], user_config, opt_config)`` without requiring a separate function. -if __name__ == "__main__": - logging.basicConfig(level=logging.INFO) - main() + Called by ``pipeline/main.py`` and the ``/ingest`` HTTP endpoint. For the + delta-import path (which manages its own Qdrant connection), use + ``ingest_batch()`` directly. + + Args: + documents (list[Document]): One or more documents to ingest. + user_config (dict[str, Any]): User configuration dict (must contain + ``collection_name``). + opt_config (dict[str, Any]): Optimisation configuration dict. + """ + collection_name = user_config["collection_name"] + qdrant = Qdrant(collection_name=collection_name, opt_config=opt_config) + ingest_batch(documents, qdrant, user_config, opt_config) \ No newline at end of file diff --git a/learn2rag/pipeline/json_loader.py b/learn2rag/pipeline/json_loader.py deleted file mode 100644 index 739bc60..0000000 --- a/learn2rag/pipeline/json_loader.py +++ /dev/null @@ -1,12 +0,0 @@ -from langchain_community.document_loaders import JSONLoader -from langchain_core.documents import Document - - -def json_loader(file_path: str) -> list[Document]: - loader = JSONLoader( - file_path, - jq_schema=".[]", - content_key="content", - metadata_func=lambda record, meta: record.get("metadata", {}), - ) - return loader.load() \ No newline at end of file diff --git a/learn2rag/pipeline/main.py b/learn2rag/pipeline/main.py index 6355696..ca7a199 100755 --- a/learn2rag/pipeline/main.py +++ b/learn2rag/pipeline/main.py @@ -2,9 +2,12 @@ import yaml import asyncio +from langchain_core.documents.base import Document + from . import ingestion from . import search from . import generate +from .store import delete_collection, delete_documents, get_documents, update_documents if __name__ == "__main__": @@ -15,7 +18,47 @@ from .config import user_config, opt_config - ingestion.index(user_config, opt_config) + #delete_collection(loader_id="json_test_file", user_config=user_config, opt_config=opt_config) + results = get_documents(loader_id="json_test_file", user_config=user_config, opt_config=opt_config) + + documents = [ + Document(page_content=d["content"], metadata=d["metadata"]) + for d in [ + { + "metadata": { + "source": "C:C:\\Users\\foo\\Revised Manuscript_Text categorization approach.docx", + "content_hash": "e18e509d138cf86c22df0b0dfafc5ca5b8f1e266f5e3470de68190f3ebe495b0", + "source_path": "C:\\Users\\foo", + "file_extension": "docx", + "process_date": "2025-07-28", + "process_time": "14:42:02", + "loader_type": "DirectoryLoader", + "loader_id": "json_test_file", + "title": "The title of a real document", + "summary": "This document is awesome" + }, + "content": "A brand-new Corpus-based Real-time Text Classification and Tagging Approach for Social Data..." + }, + { + "metadata": { + "source": "C:C:\\Users\\foo\\qdrant.docx", + "content_hash": "7f3b9c1a0d4e6f8b2c5a7d9e1f0b3c6d8a4e2f1c9b7d0a6e5f1c3a8b9d2e4f0", + "source_path": "C:\\Users\\foo", + "file_extension": "docx", + "process_date": "2025-07-28", + "process_time": "14:42:02", + "loader_type": "DirectoryLoader", + "loader_id": "json_test_file", + "title": "The title of a real document", + "summary": "This document is awesome" + }, + "content": "Qdrant ist eine Open-Source-Vektordatenbank..." + }, + ] +] + update_documents(loader_id="json_test_file", documents=documents, user_config=user_config, opt_config=opt_config) + ingestion.index(documents, user_config, opt_config) + if opt_config["query_mode"] == "multi": # in query_mode 'multi' different querys for each vector in the multi-vector are allowed multi_query = {"content": "What is USM AI?", "title": "What is USM AI?", "summary": "What is USM AI?", "source_path":"USU/ITSM/"} diff --git a/learn2rag/pipeline/store.py b/learn2rag/pipeline/store.py new file mode 100644 index 0000000..2ec3be7 --- /dev/null +++ b/learn2rag/pipeline/store.py @@ -0,0 +1,109 @@ +import logging +from typing import Any + +from qdrant_client.models import Filter, FieldCondition, MatchValue, FilterSelector +from langchain_core.documents.base import Document + +from learn2rag.pipeline.ingestion import index +from learn2rag.pipeline.qdrant import Qdrant + +def delete_collection(loader_id: str|None, user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: + """Delete a collection from the vector store or a subset of points based on loader_id.""" + qdrant = Qdrant(user_config["collection_name"], opt_config) + if qdrant.client.collection_exists(user_config["collection_name"]): + if loader_id is None: + logging.info('Deleting entire collection: %s', user_config["collection_name"]) + qdrant.client.delete_collection(collection_name=user_config["collection_name"]) + return + else: + # Delete points with the specified loader_id + logging.info('Deleting points with loader_id: %s from collection: %s', loader_id, user_config["collection_name"]) + qdrant.client.delete( + collection_name=user_config["collection_name"], + points_selector=FilterSelector( + filter=Filter( + must=[FieldCondition( + key="loader_id", + match=MatchValue(value=loader_id), + ), + ], + ) + ), + ) + +def delete_documents(loader_id: str, docs: list[str], user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: + """Delete documents from the vector store based on loader_id and their source. A source is the path to and the identification of one document.""" + qdrant = Qdrant(user_config["collection_name"], opt_config) + if qdrant.client.collection_exists(user_config["collection_name"]): + logging.info('Deleting documents with loader_id: %s and paths: %s', loader_id, docs) + # Delete points with the specified loader_id and paths + for path in docs: + qdrant.client.delete( + collection_name=user_config["collection_name"], + points_selector=FilterSelector( + filter=Filter( + must=[ + FieldCondition( + key="loader_id", + match=MatchValue(value=loader_id), + ), + FieldCondition( + key="source", + match=MatchValue(value=path) + ), + ], + ) + ), + ) + +def get_documents(loader_id: str, user_config: dict[str, Any], opt_config: dict[str, Any]) -> dict[str, str]: + """Retrieve documents from the vector store and return a {source: content_hash} mapping.""" + qdrant = Qdrant(user_config["collection_name"], opt_config) + path_hash_dict: dict[str, str] = {} + if qdrant.client.collection_exists(user_config["collection_name"]): + logging.info('Scrolling through collection to retrieve documents with loader_id: %s', loader_id) + filter = Filter( + must=[ + FieldCondition( + key="loader_id", + match=MatchValue(value=loader_id) + ) + ] + ) + points = [] + offset = None + + while True: + result = qdrant.client.scroll( + collection_name=user_config["collection_name"], + scroll_filter=filter, + limit=100, + offset=offset, + with_payload=["source", "content_hash"], + with_vectors=False, + ) + + points.extend(result[0]) + offset = result[1] + + if offset is None: + break + + for point in points: + if point.payload: + source = point.payload.get("source", "") + content_hash = point.payload.get("content_hash", "") + if source and source not in path_hash_dict: + path_hash_dict[source] = content_hash + + return path_hash_dict + + + +def update_documents(loader_id: str, documents: list[Document], user_config: dict[str, Any], opt_config: dict[str, Any]) -> None: + """Update documents in the vector store. This is done by deleting all chunks of the existing document based on source and loader_id, and then re-indexing the new document.""" + qdrant = Qdrant(user_config["collection_name"], opt_config) + if qdrant.client.collection_exists(user_config["collection_name"]): + logging.info('Updating documents with loader_id: %s', loader_id) + delete_documents(loader_id, docs=[doc.metadata["source"] for doc in documents], user_config=user_config, opt_config=opt_config) + index(documents, user_config, opt_config) \ No newline at end of file diff --git a/learn2rag/ui/__init__.py b/learn2rag/ui/__init__.py index bc7bd27..af15f08 100644 --- a/learn2rag/ui/__init__.py +++ b/learn2rag/ui/__init__.py @@ -383,7 +383,14 @@ def source_action(name: str) -> 'str | werkzeug.wrappers.response.Response': @app.get('/pipelines') def pipelines_list() -> 'str | werkzeug.wrappers.response.Response': + pipelines = learn2rag.data.get_all(app.instance_path, 'pipelines') + for pipeline in pipelines.values(): + try: + pipeline['status_message'] = (Path(pipeline['storage_path']) / 'logs' / 'status.log').read_text().splitlines()[-1] + except (FileNotFoundError, IndexError): + pipeline['status_message'] = '' context = { + 'pipelines': pipelines, 'projects': Project.get_all(), } template = 'pipelines_list.html' if request.headers.get('HX-Request') else 'pipelines_page.html' @@ -393,15 +400,16 @@ def pipelines_list() -> 'str | werkzeug.wrappers.response.Response': def pipeline_create() -> 'str | werkzeug.wrappers.response.Response': label = request.form['label'] data: dict[str, Any] = request.form.to_dict() - data.pop('import', None) + data.pop('now', None) data['ports'] = [int(port) for port in request.form.getlist("ports") if port] data['sources'] = request.form.getlist('sources') + data['import_schedule_interval_hours'] = float(data['import_schedule_interval_hours']) name = learn2rag.data.create_entry(app.instance_path, 'pipelines', data) flash(pgettext('flash', 'Added a new pipeline configuration: %(label)s', label=label)) - if request.form.get('import'): + if request.form.get('now'): pipeline = learn2rag.data.get_entry(app.instance_path, 'pipelines', name) assert pipeline is not None - start_pipeline(name, pipeline, 'import') + start_pipeline(name, pipeline, 'continuous') return redirect(url_for('pipelines_list')) def start_pipeline(name: str, pipeline: dict[str, Any], template_name: str) -> None: diff --git a/learn2rag/ui/templates/compose/pipelines/continuous.yml b/learn2rag/ui/templates/compose/pipelines/continuous.yml new file mode 100644 index 0000000..de540ad --- /dev/null +++ b/learn2rag/ui/templates/compose/pipelines/continuous.yml @@ -0,0 +1,208 @@ +name: continuous +label: Start +ports: + # TODO: labels in the interface currently assume a specific port order + - ui + - qdrant_http + - pipeline + - open_webui_pipelines +ui_url: '{{learn2rag_scheme}}://{{learn2rag_hostname}}:{{ports.ui}}/' +files: + - path: '{{storage_path}}/qdrant_config.yml' + content: | + log_level: INFO + service: + api_key: '{{qdrant_api_key}}' + grpc_port: null + http_port: {{ports.qdrant_http}} + host: '127.0.0.1' + telemetry_disabled: true + + - path: '{{storage_path}}/importer_config.json' + content: '{{import_config | tojson}}' + + - path: '{{storage_path}}/basic_user_config.json' + content: | + { + "collection_name": "learn2rag", + "imported_documents_file_path": "loaded_documents.json", + "llm": "{{language_model.model}}" + } + + - path: '{{storage_path}}/logging_config.yml' + content: | + version: 1 + formatters: + status: + format: "%(message)s %(asctime)s" + datefmt: "%Y-%m-%d %H:%M" + simple: + format: "%(asctime)s %(levelname)-8s %(name)s %(message)s" + colored: + class: colorlog.ColoredFormatter + format: "%(log_color)s%(asctime)s %(levelname)-8s %(name)s %(message)s" + profiling: + format: "%(created)f %(request_id)s %(activity)s %(message)s" + handlers: + display: + class: logging.StreamHandler + level: INFO + formatter: colored + stream: ext://sys.stderr + profiling_display: + class: logging.StreamHandler + formatter: profiling + stream: ext://sys.stderr + profiling_file: + class: logging.FileHandler + formatter: profiling + filename: '{{storage_path}}/logs/profiling.log' + encoding: utf-8 + errors_file: + class: logging.FileHandler + level: ERROR + formatter: simple + filename: '{{storage_path}}/logs/error.log' + encoding: utf-8 + status_file: + class: logging.FileHandler + level: INFO + formatter: status + filename: '{{storage_path}}/logs/status.log' + mode: 'w' + encoding: utf-8 + {% if debug_logging %} + debug_file: + class: logging.FileHandler + level: DEBUG + formatter: simple + filename: '{{storage_path}}/logs/debug.log' + encoding: utf-8 + {% endif %} + loggers: + profiling: + handlers: + - profiling_display + - profiling_file + propagate: no + status: + handlers: + - status_file + propagate: no + root: + level: DEBUG + handlers: + - display + - errors_file + {% if debug_logging %} + - debug_file + {% endif %} + + - path: '{{storage_path}}/logs/.keep' + content: '' + +services: + open-webui: + working_dir: '{{storage_path}}' + command: + - '{{learn2rag_path}}/services/start-open-webui{% if is_windows %}.exe{% endif %}' + - 'serve' + - '--host' + - '0.0.0.0' + - '--port' + - '{{ports.ui}}' + environment: + LEARN2RAG_PATH: '{{learn2rag_path}}' + DATA_DIR: '{{storage_path}}/open-webui-data' + DEFAULT_LOCALE: 'de' + ENABLE_OPENAI_API: 'True' + OPENAI_API_BASE_URL: http://127.0.0.1:{{ports.pipeline}} + OPENAI_API_KEY: '0p3n-w3bu!' + ENABLE_OLLAMA_API: 'False' + ENABLE_PERSISTENT_CONFIG: 'False' + ENABLE_TITLE_GENERATION: 'False' + GLOBAL_LOG_LEVEL: 'WARNING' + OFFLINE_MODE: 'True' + USER_PERMISSIONS_CHAT_FILE_UPLOAD: 'False' + USER_PERMISSIONS_CHAT_TEMPORARY_ENFORCED: 'True' + USER_PERMISSIONS_FEATURES_CODE_INTERPRETER: 'False' + USER_PERMISSIONS_FEATURES_IMAGE_GENERATION: 'False' + USER_PERMISSIONS_FEATURES_WEB_SEARCH: 'False' + #!!! {% if config.get("SIMPLE_AUTH") %} + ENABLE_SIGNUP: 'False' + WEBUI_AUTH: 'True' + WEBUI_ADMIN_EMAIL: '{{config["SIMPLE_AUTH"].get("username", "")}}' + WEBUI_ADMIN_NAME: '{{config["SIMPLE_AUTH"].get("username", "")}}' + WEBUI_ADMIN_PASSWORD: '{{config["SIMPLE_AUTH"].get("password", "")}}' + #!!! {% else %} + # https://docs.openwebui.com/getting-started/env-configuration/#webui_auth + # "turning off authentication is only possible for fresh installations without any existing users" + ENABLE_SIGNUP: 'True' + WEBUI_AUTH: 'False' + #!!! {% endif %} + WEBUI_NAME: 'Learn2RAG' + #!!! {% if config.get("TLS") %} + UVICORN_SSL_CERTFILE: '{{ config["TLS"]["CERTFILE"] }}' + UVICORN_SSL_KEYFILE: '{{ config["TLS"]["KEYFILE"] }}' + #!!! {% endif %} + healthcheck: + # TODO: We only support ['CMD', 'curl', '-f', ...] + test: ['CMD', 'curl', '-f', '{{learn2rag_scheme}}://localhost:{{ports.ui}}/health'] + qdrant: + working_dir: '{{storage_path}}' + command: + - '{{learn2rag_path}}/services/qdrant/qdrant{% if is_windows %}.exe{% endif %}' + - '--config-path' + - '{{storage_path}}/qdrant_config.yml' + # https://qdrant.tech/documentation/guides/configuration/ + environment: + QDRANT__LOG_LEVEL: 'ERROR' + QDRANT__SERVICE__HOST: '127.0.0.1' + QDRANT__SERVICE__HTTP_PORT: '{{ports.qdrant_http}}' + QDRANT__TELEMETRY_DISABLED: 'true' + import: + working_dir: '{{storage_path}}' + command: + - '{{learn2rag_path}}/configurator{% if is_windows %}.exe{% endif %}' + - '--logging-config' + - '{{storage_path}}/logging_config.yml' + - '--schedule-interval' + - 'PT{{ ((pipeline.import_schedule_interval_hours or 12) * 60) | round | int }}M' + - 'learn2rag.importer' + - '--config' + - '{{storage_path}}/importer_config.json' + environment: + LEARN2RAG_PATH: '{{learn2rag_path}}' + STORAGE_PATH: '{{storage_path}}' + QDRANT__SERVICE__HTTP_PORT: '{{ports.qdrant_http}}' + QDRANT__SERVICE__API_KEY: '{{qdrant_api_key}}' + PIPELINE_USER_CONFIG: '{{storage_path}}/basic_user_config.json' + IMPORTER_CONFIG: '{{storage_path}}/importer_config.json' + PIPELINE_OPT_CONFIG: '{{learn2rag_path}}/learn2rag/pipeline/opt_config.json' + LANGCHAIN_API_KEY: '1' + LANGCHAIN_TRACING_V2: 'false' + LLM_API_TYPE: '{{language_model.api}}' + LLM_API_URL: '{{ language_model.url }}' + LLM_API_TOKEN: '{{language_model.token}}' + LLM_API_MODEL: '{{language_model.model}}' + rag: + working_dir: '{{storage_path}}' + command: + - '{{learn2rag_path}}/configurator{% if is_windows %}.exe{% endif %}' + - 'learn2rag.pipeline' + - '--logging-config' + - '{{storage_path}}/logging_config.yml' + environment: + LEARN2RAG_PATH: '{{learn2rag_path}}' + LEARN2RAG_PIPELINE_PORT: '{{ports.pipeline}}' + QDRANT__SERVICE__HTTP_PORT: '{{ports.qdrant_http}}' + QDRANT__SERVICE__API_KEY: '{{qdrant_api_key}}' + PIPELINE_USER_CONFIG: '{{storage_path}}/basic_user_config.json' + IMPORTER_CONFIG: '{{storage_path}}/importer_config.json' + PIPELINE_OPT_CONFIG: '{{learn2rag_path}}/learn2rag/pipeline/opt_config.json' + LANGCHAIN_API_KEY: '1' + LANGCHAIN_TRACING_V2: 'false' + LLM_API_TYPE: '{{language_model.api}}' + LLM_API_URL: '{{language_model.url}}' + LLM_API_TOKEN: '{{language_model.token}}' + LLM_API_MODEL: '{{language_model.model}}' diff --git a/learn2rag/ui/templates/compose/pipelines/import.yml b/learn2rag/ui/templates/compose/pipelines/import.yml index 839f08c..534c157 100644 --- a/learn2rag/ui/templates/compose/pipelines/import.yml +++ b/learn2rag/ui/templates/compose/pipelines/import.yml @@ -1,5 +1,5 @@ name: import -label: Import +label: Start one-time import ports: # TODO: UI port is not used; labels in the interface currently assume a specific port order - ui @@ -30,23 +30,44 @@ files: content: | version: 1 formatters: + status: + format: "%(message)s %(asctime)s" + datefmt: "%Y-%m-%d %H:%M" simple: format: "%(asctime)s %(levelname)-8s %(name)s %(message)s" colored: class: colorlog.ColoredFormatter format: "%(log_color)s%(asctime)s %(levelname)-8s %(name)s %(message)s" + profiling: + format: "%(created)f %(request_id)s %(activity)s %(message)s" handlers: display: class: logging.StreamHandler level: INFO formatter: colored stream: ext://sys.stderr + profiling_display: + class: logging.StreamHandler + formatter: profiling + stream: ext://sys.stderr + profiling_file: + class: logging.FileHandler + formatter: profiling + filename: '{{storage_path}}/logs/profiling.log' + encoding: utf-8 errors_file: class: logging.FileHandler level: ERROR formatter: simple filename: '{{storage_path}}/logs/error.log' encoding: utf-8 + status_file: + class: logging.FileHandler + level: INFO + formatter: status + filename: '{{storage_path}}/logs/status.log' + mode: 'w' + encoding: utf-8 {% if debug_logging %} debug_file: class: logging.FileHandler @@ -55,6 +76,16 @@ files: filename: '{{storage_path}}/logs/debug.log' encoding: utf-8 {% endif %} + loggers: + profiling: + handlers: + - profiling_display + - profiling_file + propagate: no + status: + handlers: + - status_file + propagate: no root: level: DEBUG handlers: diff --git a/learn2rag/ui/templates/compose/pipelines/pipeline.yml b/learn2rag/ui/templates/compose/pipelines/pipeline.yml index 680d993..7eb6c05 100644 --- a/learn2rag/ui/templates/compose/pipelines/pipeline.yml +++ b/learn2rag/ui/templates/compose/pipelines/pipeline.yml @@ -1,5 +1,5 @@ name: pipeline -label: RAG +label: Start only the chat ports: # TODO: labels in the interface currently assume a specific port order - ui @@ -31,6 +31,9 @@ files: content: | version: 1 formatters: + status: + format: "%(message)s %(asctime)s" + datefmt: "%Y-%m-%d %H:%M" simple: format: "%(asctime)s %(levelname)-8s %(name)s %(message)s" colored: @@ -59,6 +62,13 @@ files: formatter: simple filename: '{{storage_path}}/logs/error.log' encoding: utf-8 + status_file: + class: logging.FileHandler + level: INFO + formatter: status + filename: '{{storage_path}}/logs/status.log' + mode: 'w' + encoding: utf-8 {% if debug_logging %} debug_file: class: logging.FileHandler @@ -73,6 +83,10 @@ files: - profiling_display - profiling_file propagate: no + status: + handlers: + - status_file + propagate: no root: level: DEBUG handlers: diff --git a/learn2rag/ui/templates/firststeps_pipelines.html b/learn2rag/ui/templates/firststeps_pipelines.html index 8167d51..5d0632e 100644 --- a/learn2rag/ui/templates/firststeps_pipelines.html +++ b/learn2rag/ui/templates/firststeps_pipelines.html @@ -12,8 +12,8 @@
|
-
+
+ {{ pipeline.status_message }}
+
|