import hashlib import logging import re from pathlib import Path from typing import Optional from langchain_core.documents import Document from langchain_community.document_loaders import ( PyPDFLoader, TextLoader, UnstructuredMarkdownLoader, WebBaseLoader, ) from langchain_text_splitters import RecursiveCharacterTextSplitter from .config import get_settings logger = logging.getLogger(__name__) settings = get_settings() LOADER_MAP = { ".pdf": PyPDFLoader, ".txt": TextLoader, ".md": UnstructuredMarkdownLoader } #Loaders def load_file(file_path: str) -> list[Document]: """This function auto detects the file type and loads to the langchain documents""" ext = Path(file_path).suffix.lower() loader_cls = LOADER_MAP.get(ext) if loader_cls is None: raise ValueError(f"Unsupported file type: {ext}") loader = loader_cls(file_path) docs = loader.load() logger.info(f"Loaded {len(docs)} pages from {file_path}") return docs def load_url(url: str) -> list[Document]: """Scrape a webpage and return Documents""" loader = WebBaseLoader(url) logger.info(f"Loaded data from {url}") return loader.load() #Cleaning def clean_text(text: str) -> str: text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text) # control chars text = re.sub(r"[ \t]+", " ", text) # collapse horizontal whitespace text = re.sub(r"\n{3,}", "\n\n", text) # collapse excess blank lines return text.strip() #Splitter def build_splitter() -> RecursiveCharacterTextSplitter: return RecursiveCharacterTextSplitter( chunk_size=settings.chunk_size, chunk_overlap=settings.chunk_overlap, length_function = len, separators=["\n\n", "\n", ". ", "? ", "! ", "; ", ", ", " ", ""] ) #Metadata Enrichment def _stable_hash(text:str) -> str: return hashlib.md5(text.encode()).hexdigest()[:12] def enrich_metadata( chunks: list[Document], source_id: Optional[str] = None, extra_meta: Optional[dict] = None ) -> list[Document]: """ Production enrichment: - stable doc_id from content hash (dedup-safe) - chunk_index for indexing - char_count for downstream token budget checks - prev/next chunk IDs for context stitching """ chunk_ids = [_stable_hash(c.page_content) for c in chunks] enriched = [] for i, (doc,cid) in enumerate(zip(chunks,chunk_ids)): meta = { **doc.metadata, "doc_id": cid, "chunk_index": i, "char_count": len(doc.page_content), "prev_chunk_id": chunk_ids[i-1] if i > 0 else None, "next_chunk_id": chunk_ids[i+1] if i < len(chunks) - 1 else None, "source_id": source_id or "unknown" } if extra_meta: meta.update(extra_meta) enriched.append(Document(page_content=doc.page_content,metadata=meta)) return enriched #Main pipeline def process_texts( texts: list[str], metadatas: Optional[list[dict]] = None, source_id: Optional[str] = None ) -> list[Document]: """ Full ingestion Pipeline: 1. Wrap raw strings in Documents 2. Clean_text 3. Split into Chunks 4. Filter junk chunks 5. Enrich Metadata """ splitter = build_splitter() raw_docs = [ Document(page_content=clean_text(t), metadata = m or {}) for t,m in zip(texts,metadatas or [{}]*len(texts)) ] chunks = splitter.split_documents(raw_docs) #drop tiny or near to empty chunks chunks = [ c for c in chunks if len(c.page_content.strip()) >= settings.min_chunk_size ] chunks = enrich_metadata(chunks,source_id=source_id) logger.info(f"Processed {len(texts)} texts -> {len(chunks)} chunks") return chunks def process_file(file_path: str, display_name: str | None = None) -> list[Document]: """End to end ingestion of file path. display_name overrides the temp path as source_id.""" docs = load_file(file_path) texts = [d.page_content for d in docs] metas = [d.metadata for d in docs] source = display_name if display_name else file_path return process_texts(texts, metas, source_id=source) print("[document_processor] Module ready")