Spaces:
Running
Running
File size: 4,253 Bytes
import hashlib
import logging
import re
from pathlib import Path
from typing import Optional
from langchain_core.documents import Document
from langchain_community.document_loaders import (
PyPDFLoader,
TextLoader,
UnstructuredMarkdownLoader,
WebBaseLoader,
)
from langchain_text_splitters import RecursiveCharacterTextSplitter
from .config import get_settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Settings are fetched once, at import time. This module reads chunk_size and
# chunk_overlap (build_splitter) and min_chunk_size (process_texts) from it.
settings = get_settings()
# Maps a lowercase file extension (leading dot included) to the loader class
# that load_file instantiates with the file path. Extensions not listed here
# are rejected by load_file with a ValueError.
LOADER_MAP = {
".pdf": PyPDFLoader,
".txt": TextLoader,
".md": UnstructuredMarkdownLoader
}
#Loaders
def load_file(file_path: str) -> list[Document]:
    """Load a file into LangChain documents, choosing the loader by extension.

    The extension is lower-cased and looked up in ``LOADER_MAP``.

    Args:
        file_path: Path of the file to load.

    Returns:
        The documents produced by the selected loader's ``load()`` call.

    Raises:
        ValueError: If the file extension has no entry in ``LOADER_MAP``.
    """
    ext = Path(file_path).suffix.lower()
    # Guard clause: reject unknown extensions before touching the file.
    if ext not in LOADER_MAP:
        raise ValueError(f"Unsupported file type: {ext}")

    docs = LOADER_MAP[ext](file_path).load()
    logger.info(f"Loaded {len(docs)} pages from {file_path}")
    return docs
def load_url(url: str) -> list[Document]:
    """Scrape a webpage and return its content as LangChain documents.

    Args:
        url: Address of the page to fetch.

    Returns:
        The documents produced by ``WebBaseLoader(url).load()``.
    """
    loader = WebBaseLoader(url)
    # Load first, log second: the success message must not be emitted when
    # the fetch raises.
    docs = loader.load()
    logger.info(f"Loaded data from {url}")
    return docs
#Cleaning
def clean_text(text: str) -> str:
    """Normalise raw extracted text before it is split into chunks.

    Steps, in order:
      1. Convert CRLF / lone CR line endings to ``\\n``.
      2. Remove ASCII control characters (tab and newline are kept).
      3. Collapse runs of spaces/tabs into a single space.
      4. Drop the space left dangling at the end of a line.
      5. Collapse three or more consecutive newlines into exactly two.
      6. Strip leading and trailing whitespace.

    Steps 1 and 4 ensure that "blank" lines containing only whitespace or
    carriage returns are still recognised as blank by step 5.

    Args:
        text: The raw text to clean.

    Returns:
        The cleaned text.
    """
    # Without this, "\r\n\r\n\r\n" never matches the \n{3,} pattern below.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", text)  # control chars
    text = re.sub(r"[ \t]+", " ", text)  # collapse horizontal whitespace
    # After the collapse above at most one space can precede a newline;
    # removing it turns whitespace-only lines into truly empty ones.
    text = text.replace(" \n", "\n")
    text = re.sub(r"\n{3,}", "\n\n", text)  # collapse excess blank lines
    return text.strip()
#Splitter
def build_splitter() -> RecursiveCharacterTextSplitter:
    """Create the text splitter configured from the module-level settings.

    Chunk size and overlap come from ``settings``; length is measured in
    characters via ``len``.

    Returns:
        A ``RecursiveCharacterTextSplitter`` ready to split documents.
    """
    # Listed from coarsest to finest: paragraph break, line break, sentence
    # enders, clause punctuation, single space, and finally the empty string.
    separator_priority = ["\n\n", "\n", ". ", "? ", "! ", "; ", ", ", " ", ""]
    splitter_options = {
        "chunk_size": settings.chunk_size,
        "chunk_overlap": settings.chunk_overlap,
        "length_function": len,
        "separators": separator_priority,
    }
    return RecursiveCharacterTextSplitter(**splitter_options)
#Metadata Enrichment
def _stable_hash(text:str) -> str:
return hashlib.md5(text.encode()).hexdigest()[:12]
def enrich_metadata(
    chunks: list[Document],
    source_id: Optional[str] = None,
    extra_meta: Optional[dict] = None,
) -> list[Document]:
    """Return copies of *chunks* with bookkeeping metadata attached.

    Each returned document keeps its original metadata and gains:
      - ``doc_id``: hash of the chunk's content (identical content gives an
        identical ID)
      - ``chunk_index``: position of the chunk in *chunks*
      - ``char_count``: length of the chunk's content in characters
      - ``prev_chunk_id`` / ``next_chunk_id``: IDs of the neighbouring
        chunks, or ``None`` at either end
      - ``source_id``: the given *source_id*, or ``"unknown"`` when falsy

    Entries in *extra_meta*, when provided, are applied last and therefore
    override any of the keys above.

    Args:
        chunks: Documents to enrich; they are not modified.
        source_id: Identifier of the originating source.
        extra_meta: Additional metadata merged into every chunk.

    Returns:
        New ``Document`` objects, in the same order as *chunks*.
    """
    ids = [_stable_hash(chunk.page_content) for chunk in chunks]
    # Pad both ends with None so neighbour lookups need no bounds checks:
    # for position p, padded[p] is the previous ID and padded[p + 2] the next.
    padded = [None, *ids, None]
    overrides = extra_meta or {}

    def _with_metadata(position: int, chunk: Document) -> Document:
        metadata = dict(chunk.metadata)
        metadata.update(
            {
                "doc_id": ids[position],
                "chunk_index": position,
                "char_count": len(chunk.page_content),
                "prev_chunk_id": padded[position],
                "next_chunk_id": padded[position + 2],
                "source_id": source_id or "unknown",
            }
        )
        metadata.update(overrides)
        return Document(page_content=chunk.page_content, metadata=metadata)

    return [_with_metadata(position, chunk) for position, chunk in enumerate(chunks)]
#Main pipeline
def process_texts(
    texts: list[str],
    metadatas: Optional[list[dict]] = None,
    source_id: Optional[str] = None
) -> list[Document]:
    """Run the full ingestion pipeline over raw strings.

    Steps:
      1. Clean each text with ``clean_text`` and wrap it in a ``Document``.
      2. Split the documents into chunks with the configured splitter.
      3. Drop chunks whose stripped content is shorter than
         ``settings.min_chunk_size``.
      4. Enrich the surviving chunks via ``enrich_metadata``.

    Args:
        texts: Raw strings to ingest.
        metadatas: Optional per-text metadata, aligned with *texts*. When
            omitted or empty, every text gets empty metadata. ``None``
            entries are treated as empty metadata.
        source_id: Identifier recorded on every resulting chunk.

    Returns:
        The enriched chunks.

    Raises:
        ValueError: If *metadatas* is non-empty and its length differs from
            the length of *texts*.
    """
    if not metadatas:
        metadatas = [{}] * len(texts)
    elif len(metadatas) != len(texts):
        # zip() would silently truncate to the shorter list and drop texts
        # (or metadata) without any indication; fail loudly instead.
        raise ValueError(
            f"texts and metadatas must have the same length "
            f"(got {len(texts)} texts and {len(metadatas)} metadatas)"
        )

    splitter = build_splitter()
    raw_docs = [
        # `m or {}` gives each metadata-less document its own fresh dict.
        Document(page_content=clean_text(t), metadata=m or {})
        for t, m in zip(texts, metadatas)
    ]
    chunks = splitter.split_documents(raw_docs)

    # Drop tiny or near-empty chunks.
    chunks = [
        c for c in chunks
        if len(c.page_content.strip()) >= settings.min_chunk_size
    ]
    chunks = enrich_metadata(chunks, source_id=source_id)
    logger.info(f"Processed {len(texts)} texts -> {len(chunks)} chunks")
    return chunks
def process_file(file_path: str, display_name: str | None = None) -> list[Document]:
    """Ingest a file end to end: load it, then run the text pipeline.

    Args:
        file_path: Path of the file to load.
        display_name: Name to record as ``source_id`` instead of *file_path*
            (for example when *file_path* is a temporary upload path).

    Returns:
        The enriched chunks produced by ``process_texts``.
    """
    page_texts: list[str] = []
    page_metas: list[dict] = []
    for page in load_file(file_path):
        page_texts.append(page.page_content)
        page_metas.append(page.metadata)

    # An empty or missing display name falls back to the path itself.
    return process_texts(page_texts, page_metas, source_id=display_name or file_path)
# Import-time marker: printed to stdout once, when this module is first imported.
print("[document_processor] Module ready")