"""Ingest a folder of PDFs into Qdrant.

Pipeline: extract text (with several fallbacks) -> split into overlapping
word chunks -> embed each chunk through an Ollama server -> upsert the
vectors and metadata into a Qdrant collection.
"""
import os
import re
import argparse
import fitz  # PyMuPDF
import uuid
import json
import requests
from typing import List, Tuple, Optional
from pdfminer.high_level import extract_text as pdfminer_extract_text
from qdrant_client import QdrantClient
from qdrant_client.http import models as qmodels
from tqdm import tqdm
# import ocrmypdf
import shutil
import subprocess
import tempfile


# ---------------- CLI ----------------
def get_args():
    """Parse and return the command-line arguments for an ingest run."""
    p = argparse.ArgumentParser(description="Ingest PDFs -> chunks -> embeddings via Ollama -> Qdrant")
    p.add_argument("--pdf-dir", default="./pdfs", help="Folder with PDFs")
    p.add_argument("--qdrant-url", default=os.getenv("QDRANT_URL", "http://localhost:6333"))
    p.add_argument("--collection", default=os.getenv("QDRANT_COLLECTION", "planning_docs"))
    p.add_argument("--ollama-url", default=os.getenv("OLLAMA_URL", "http://192.168.8.73:11434"))
    p.add_argument("--embed-model", default=os.getenv("EMBED_MODEL", "nomic-embed-text"))
    p.add_argument("--chunk-words", type=int, default=400)
    p.add_argument("--overlap-words", type=int, default=60)
    p.add_argument("--batch-size", type=int, default=128)

    # New controls
    p.add_argument("--corpus", choices=["auto", "tps", "lps", "ncc", "as"], default="auto",
                   help="Corpus label for all PDFs in this run (auto = infer per file).")
    p.add_argument("--council", default=None,
                   help="Council token for LPS (e.g., 'brighton'). If omitted, inferred from filename.")
    p.add_argument("--wipe-existing", action="store_true",
                   help="Delete existing points for each file before re-ingesting.")

    # Optional structured extraction
    p.add_argument("--extract-structure", action="store_true",
                   help="Run LLM extraction on clause-start chunks and store JSON in payload.")
    p.add_argument("--extract-model", default=os.getenv("EXTRACT_MODEL", "llama3.1:8b"),
                   help="Ollama model for structured extraction.")
    p.add_argument("--extract-max-per-file", type=int, default=200,
                   help="Max extractions per file (safety cap).")
    p.add_argument("--json-retries", type=int, default=1,
                   help="Retries on bad JSON from the LLM.")
    return p.parse_args()


# ---------------- Helpers ----------------
# Clause-like tokens such as C2.6.1 or 13.4.2
_CLAUSE_RX = re.compile(r"\b(?:C\d+(?:\.\d+){1,3}|\d+(?:\.\d+){1,3})\b")
_AS_RX = re.compile(r"\bAS\s*([0-9]{2,4}(?:\.[0-9]+)?)\s*[:\-]?\s*(\d{4})?", re.IGNORECASE)
# A 19xx/20xx year that is not glued to another digit or letter. Unlike \b,
# this treats "_" as a separator, so "ncc_2019_vol1" yields 2019.
_YEAR_RX = re.compile(r"(?<![0-9a-z])(20\d{2}|19\d{2})(?![0-9a-z])")
# Fixed namespace so point IDs are stable across runs (re-ingest overwrites).
_POINT_ID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_DNS, "modulos.ai/ingest")
# A document needs more than this many extracted characters to count as usable.
_MIN_TEXT_CHARS = 50


def make_point_id(source_file: str, page_no: int, chunk_index: int) -> str:
    """Return a deterministic UUID string for (file, page, chunk index)."""
    return str(uuid.uuid5(_POINT_ID_NAMESPACE, f"{source_file}:{page_no}:{chunk_index}"))


def clean_text(s: str) -> str:
    """Drop soft hyphens (U+00AD) and collapse all whitespace runs to single spaces."""
    return " ".join(s.replace("\u00ad", "").split())


def extract_text_pymupdf(pdf_path: str) -> List[Tuple[int, str]]:
    """Return list of (page_number, text) with 1-based page numbers.

    Returns whatever was collected (possibly []) when the file cannot be
    opened or iterated, so callers can move on to other extractors instead
    of crashing. The document handle is always closed.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception as exc:
        print(f"  [pymupdf] cannot open {pdf_path}: {exc}")
        return []

    pages = []
    try:
        for i, page in enumerate(doc):
            try:
                txt = page.get_text("text")
            except Exception:
                txt = ""
            pages.append((i + 1, txt or ""))
    except Exception as exc:
        # Loading a page failed mid-iteration; keep the pages read so far.
        print(f"  [pymupdf] stopped reading {pdf_path}: {exc}")
    finally:
        doc.close()
    return pages


def fallback_pdfminer(pdf_path: str) -> str:
    """Return the whole document's text via pdfminer, or "" on any failure."""
    try:
        return pdfminer_extract_text(pdf_path) or ""
    except Exception:
        return ""


def try_repair_pdf(pdf_path: str) -> Optional[str]:
    """Try to rewrite/repair the PDF via qpdf or ghostscript.

    Returns the path of the rewritten file inside a fresh temp directory, or
    None when neither tool is installed or both failed. On success the caller
    owns the temp directory (``os.path.dirname`` of the result) and should
    remove it; on failure it is removed here.
    """
    tmpdir = tempfile.mkdtemp(prefix="pdf_fix_")
    fixed = None

    def produced(path: str) -> bool:
        return os.path.exists(path) and os.path.getsize(path) > 0

    qpdf_bin = shutil.which("qpdf")
    if qpdf_bin:
        out1 = os.path.join(tmpdir, "qpdf_fixed.pdf")
        r = subprocess.run([qpdf_bin, "--linearize", pdf_path, out1],
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        # qpdf exits with 3 when it emitted warnings but still wrote the
        # output, which is the usual outcome for a damaged input.
        if r.returncode in (0, 3) and produced(out1):
            fixed = out1

    if not fixed:
        gs_bin = shutil.which("gs") or shutil.which("ghostscript")
        if gs_bin:
            out2 = os.path.join(tmpdir, "gs_fixed.pdf")
            cmd = [
                gs_bin, "-dSAFER", "-dBATCH", "-dNOPAUSE",
                "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.6",
                f"-sOutputFile={out2}", pdf_path,
            ]
            r = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            if r.returncode == 0 and produced(out2):
                fixed = out2

    if not fixed:
        # Nothing usable was written; do not leave the empty directory behind.
        shutil.rmtree(tmpdir, ignore_errors=True)
    return fixed


def extract_with_pypdfium2(pdf_path: str) -> List[Tuple[int, str]]:
    """Optional fallback using PyPDFium2 if available.

    Returns (page_number, text) tuples with 1-based page numbers, or [] when
    the package is not importable or anything fails during extraction.
    """
    try:
        import pypdfium2 as pdfium
    except Exception:
        return []

    pages = []
    try:
        doc = pdfium.PdfDocument(pdf_path)
        for i in range(len(doc)):
            page = doc.get_page(i)
            textpage = page.get_textpage()
            txt = textpage.get_text_bounded()
            pages.append((i + 1, txt or ""))
            textpage.close()
            page.close()
    except Exception:
        return []
    return pages


def _has_enough_text(pages: List[Tuple[int, str]]) -> bool:
    """True when the pages together hold more than _MIN_TEXT_CHARS characters."""
    return sum(len(t) for _, t in pages) > _MIN_TEXT_CHARS


def _extract_text_layer(pdf_path: str, use_pdfium: bool = True):
    """Run PyMuPDF, then pdfminer, then (optionally) PyPDFium2 on one file.

    Returns ``(usable, pymupdf_pages)``: ``usable`` is the first result with
    enough text, or [] if none qualified; ``pymupdf_pages`` is the raw PyMuPDF
    output, kept so the caller can return it as a last resort.
    """
    mu_pages = extract_text_pymupdf(pdf_path)
    if _has_enough_text(mu_pages):
        return mu_pages, mu_pages

    whole = fallback_pdfminer(pdf_path)
    if whole.strip():
        # pdfminer gives no page boundaries; page number 0 marks "whole document".
        return [(0, clean_text(whole))], mu_pages

    if use_pdfium:
        pf = extract_with_pypdfium2(pdf_path)
        if _has_enough_text(pf):
            return pf, mu_pages

    return [], mu_pages


def _ocr_and_extract(pdf_path: str) -> List[Tuple[int, str]]:
    """OCR the PDF with the ocrmypdf CLI and extract text from the result.

    First pass only OCRs pages lacking text (--skip-text); if that yields
    nothing usable, a second pass forces OCR on every page. Returns [] when
    both passes fail. The temp directory is always removed.
    """
    jobs = str(max(1, (os.cpu_count() or 1)))  # e.g., cap it: min(os.cpu_count() or 1, 6)
    ocr_dir = tempfile.mkdtemp(prefix="pdf_ocr_")
    try:
        for mode, out_name in (("--skip-text", "ocr.pdf"), ("--force-ocr", "ocr_force.pdf")):
            out_path = os.path.join(ocr_dir, out_name)
            r = subprocess.run(
                [
                    "ocrmypdf", mode, "--rotate-pages",
                    "--jobs", jobs,
                    # The output is deleted right after text extraction, so
                    # spending time on file-size optimisation is pointless.
                    "--optimize", "0",
                    "--output-type", "pdf",
                    "-l", "eng",
                    pdf_path, out_path,
                ],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
            if r.returncode == 0 and os.path.exists(out_path):
                pages, _ = _extract_text_layer(out_path, use_pdfium=False)
                if pages:
                    return pages
        return []
    finally:
        shutil.rmtree(ocr_dir, ignore_errors=True)


def pagewise_or_fallback(pdf_path: str, ocr_if_needed: bool = True) -> List[Tuple[int, str]]:
    """
    Robust text extraction:
      1) PyMuPDF pagewise
      2) pdfminer whole-doc (returned as a single entry with page number 0)
      3) PyPDFium2 pagewise (if installed)
      4) repair via qpdf/ghostscript and retry 1-3
      5) OCR via the ocrmypdf CLI (if ``ocr_if_needed`` and it is installed),
         then retry 1-2 on the OCR output

    Returns list of (page_number, text). When OCR was attempted and failed the
    result is []; when OCR was not attempted the sparse PyMuPDF pages from the
    last attempt are returned so the caller can decide what to do with them.
    """
    usable, sparse = _extract_text_layer(pdf_path)
    if usable:
        return usable

    # 4) repair & retry
    fixed = try_repair_pdf(pdf_path)
    if fixed:
        try:
            usable, sparse = _extract_text_layer(fixed)
        finally:
            # The repaired copy lives in its own temp directory; remove it.
            shutil.rmtree(os.path.dirname(fixed), ignore_errors=True)
        if usable:
            return usable

    # 5) optional OCR
    if ocr_if_needed and shutil.which("ocrmypdf"):
        return _ocr_and_extract(pdf_path)

    # final: let caller see the (near-)empty result
    return sparse


def chunk_words(text: str, chunk_size=400, overlap=60) -> List[str]:
    """Split text into chunks of up to ``chunk_size`` words, overlapping by ``overlap``.

    The window advances by ``chunk_size - overlap`` words (at least 1). Chunking
    stops as soon as a window reaches the end of the text, so no trailing chunk
    is emitted that is wholly contained in the previous one.
    """
    words = text.split()
    step = max(1, chunk_size - overlap)
    chunks = []
    for start in range(0, len(words), step):
        window = words[start:start + chunk_size]
        if window:
            chunks.append(" ".join(window))
        if start + chunk_size >= len(words):
            break
    return chunks


def ollama_embed(ollama_url: str, model: str, text: str) -> List[float]:
    """Return the embedding vector for ``text`` from Ollama's /api/embeddings.

    Raises requests.HTTPError on a non-2xx response and RuntimeError when the
    response body carries no "embedding" field.
    """
    r = requests.post(
        f"{ollama_url.rstrip('/')}/api/embeddings",
        json={"model": model, "prompt": text},
        timeout=60,
    )
    r.raise_for_status()
    data = r.json()
    if "embedding" not in data:
        raise RuntimeError(f"Ollama embeddings error: {data}")
    return data["embedding"]


def determine_dim(ollama_url: str, model: str) -> int:
    """Return the embedding dimension by embedding a short probe string."""
    emb = ollama_embed(ollama_url, model, "test")
    return len(emb)


def ensure_collection(qc: QdrantClient, name: str, dim: int):
    """Create the collection with cosine distance; tolerate a failure to create.

    Creation is best-effort: a failure is reported and otherwise ignored on the
    assumption that the collection already exists.
    """
    try:
        qc.create_collection(
            collection_name=name,
            vectors_config=qmodels.VectorParams(size=dim, distance=qmodels.Distance.COSINE)
        )
        print(f"[qdrant] created collection '{name}' size={dim}")
    except Exception as exc:
        # already exists or compatible — upsert will fail loudly if size mismatches
        print(f"[qdrant] collection '{name}' not created (assuming it already exists): "
              f"{type(exc).__name__}")


def ensure_payload_indexes(qc: QdrantClient, collection: str):
    """Create keyword payload indexes on the fields used for filtering (best-effort)."""
    for field in ("corpus", "council", "source_file", "clauses", "clause_id", "doc_type", "title"):
        try:
            qc.create_payload_index(
                collection_name=collection,
                field_name=field,
                field_schema=qmodels.PayloadSchemaType.KEYWORD
            )
        except Exception as exc:
            print(f"[qdrant] payload index on '{field}' not created: {type(exc).__name__}")


def delete_points_for_file(qc: QdrantClient, collection: str, source_file: str):
    """Delete every point whose ``source_file`` payload equals ``source_file`` exactly."""
    # MatchValue is an exact match; MatchText is a full-text/substring
    # condition and could also hit other files whose name contains this one.
    filt = qmodels.Filter(must=[
        qmodels.FieldCondition(key="source_file", match=qmodels.MatchValue(value=source_file))
    ])
    qc.delete(collection_name=collection, points_selector=qmodels.FilterSelector(filter=filt))
    print(f"[qdrant] wiped existing points for {source_file}")


# -------- Structured extraction (optional) --------
def _parse_json_object(raw: str) -> Optional[dict]:
    """Parse the outermost {...} span of ``raw`` as a JSON object, or return None."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        obj = json.loads(raw[start:end + 1])
    except ValueError:
        return None
    return obj if isinstance(obj, dict) else None


def llm_extract_structured(ollama_url: str, model: str, corpus: str, text: str,
                           retries: int = 1) -> Optional[dict]:
    """Ask an Ollama model to fill a fixed JSON template from a clause excerpt.

    Makes up to ``retries + 1`` attempts. Returns the parsed dict when the
    model's answer is a JSON object with a non-empty "doc_type", else None.
    Request failures are reported and count as a failed attempt, so this
    optional step never aborts the ingest.
    """
    schema_hint = {
        "tps": "Tasmanian Planning Scheme (SPP)",
        "lps": "Local Provisions Schedule (LPS)",
        "ncc": "National Construction Code",
        "as": "Australian Standard"
    }.get(corpus, "Regulatory Document")

    template = {
        "doc_type": corpus,
        "clause_id": "",
        "title": "",
        "purpose": "",
        "application_scope": "",
        "performance_criteria": [],
        "acceptable_solutions": [],
        "exemptions": "",
        "cross_refs": [],
        "version": "",
        "volume": "",
        "year": ""
    }

    prompt = (
        "You extract structured fields from regulatory clauses.\n"
        f"Document type: {schema_hint}\n"
        "Return ONLY valid JSON matching this template:\n"
        f"{json.dumps(template, ensure_ascii=False, indent=2)}\n"
        "\n"
        "Guidance:\n"
        "- Prefer exact clause IDs and titles if present.\n"
        "- If a field is absent, keep it empty or [].\n"
        "- Use short faithful fragments; no speculation.\n"
        "- Only include cross_refs explicitly present.\n"
        "\n"
        "EXCERPT:\n"
        f"{text}"
    ).strip()

    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Both are top-level request fields in Ollama's API, not model options.
        "format": "json",
        "keep_alive": "5m",
        "options": {
            "temperature": 0.1,
            "num_ctx": 4096,
            "num_predict": 400,
        },
    }

    for _ in range(max(1, retries + 1)):
        try:
            r = requests.post(f"{ollama_url.rstrip('/')}/api/generate",
                              json=request_body, timeout=120)
            r.raise_for_status()
            resp = r.json().get("response", "") or ""
        except (requests.RequestException, ValueError) as exc:
            print(f"  [extract] request failed: {exc}")
            continue
        # Tolerates answers wrapped in a Markdown code fence or extra prose.
        obj = _parse_json_object(resp)
        if obj and obj.get("doc_type"):
            return obj
    return None


# ---------------- Classification & metadata ----------------
def infer_corpus_and_meta(fname: str, forced_corpus: str = "auto", council_cli: Optional[str] = None):
    """
    Returns (corpus, council, extra_meta)
      corpus ∈ {tps, lps, ncc, as} (or ``forced_corpus`` when it is not "auto")
      council only when LPS
      extra_meta may include ncc_volume, as_code, as_year, year
    """
    lower = fname.lower()
    base = os.path.splitext(os.path.basename(lower))[0]
    council = None
    extra = {}

    if forced_corpus != "auto":
        corpus = forced_corpus
    elif ("local" in lower and "provision" in lower) or "lps" in lower:
        corpus = "lps"
    elif ("ncc" in lower or "national construction code" in lower
          or "volume one" in lower or "volume two" in lower or "volume three" in lower):
        corpus = "ncc"
    elif (re.match(r"as(?![a-z])", base)  # "as1684", "as_3000", but not "assessment"
          or "australian standard" in lower or _AS_RX.search(fname)):
        corpus = "as"
    else:
        corpus = "tps"

    if corpus == "lps":
        # Default council token: leading part of the filename before "_" or "-".
        council = (council_cli or base.split("_")[0].split("-")[0]).strip()

    if corpus == "ncc":
        volumes = (("one", "1", "Volume One"), ("two", "2", "Volume Two"), ("three", "3", "Volume Three"))
        for word, digit, label in volumes:
            if (f"volume {word}" in lower or f"vol {digit}" in lower or f"vol{digit}" in lower
                    or re.search(rf"\bvolume\s*{digit}\b", lower)):
                extra["ncc_volume"] = label
                break

    m_year = _YEAR_RX.search(lower)
    if m_year:
        extra["year"] = m_year.group(1)

    if corpus == "as":
        m = _AS_RX.search(fname)
        if m:
            extra["as_code"] = m.group(1)
            if m.group(2):
                extra["as_year"] = m.group(2)

    return corpus, council, extra


# ---------------- Upsert ----------------
def upsert_batch(qc: QdrantClient, collection: str, ids, vectors, payloads):
    """Upsert one batch of points (parallel lists of ids, vectors, payloads)."""
    qc.upsert(
        collection_name=collection,
        points=qmodels.Batch(ids=ids, vectors=vectors, payloads=payloads)
    )


# ---------------- Main ingest ----------------
def ingest_folder(pdf_dir: str, ollama_url: str, embed_model: str, qdrant_url: str, collection: str,
                  chunk_words_size: int, overlap_words: int, batch_size: int,
                  forced_corpus: str, council_cli: Optional[str],
                  wipe_existing: bool, extract_structure: bool, extract_model: str,
                  extract_max_per_file: int, json_retries: int):
    """Ingest every ``*.pdf`` directly inside ``pdf_dir`` into the Qdrant collection.

    For each file: classify it, extract and chunk its text, embed each chunk
    via Ollama, optionally run structured LLM extraction on chunks containing
    a clause-like token, and upsert in batches of ``batch_size``. Files are
    processed in sorted name order. With ``wipe_existing``, a file's old points
    are deleted only once new chunks are ready to replace them.
    """
    qc = QdrantClient(url=qdrant_url)
    dim = determine_dim(ollama_url, embed_model)
    ensure_collection(qc, collection, dim)
    ensure_payload_indexes(qc, collection)

    pdfs = sorted(f for f in os.listdir(pdf_dir) if f.lower().endswith(".pdf"))
    if not pdfs:
        print(f"No PDFs found in {pdf_dir}")
        return

    for fname in pdfs:
        path = os.path.join(pdf_dir, fname)
        corpus, council, extra = infer_corpus_and_meta(fname, forced_corpus, council_cli)
        print(f"\nProcessing {path} -> corpus={corpus} council={council or '-'} meta={extra}")

        pages = pagewise_or_fallback(path)
        all_chunks = []
        for page_no, txt in pages:
            txt = clean_text(txt)
            if not txt.strip():
                continue
            chunks = chunk_words(txt, chunk_size=chunk_words_size, overlap=overlap_words)
            all_chunks.extend((page_no, idx, ch) for idx, ch in enumerate(chunks))

        if not all_chunks:
            print(f" !! No usable text extracted from {fname}")
            continue

        # Wipe only now, so a failed extraction does not destroy existing data.
        if wipe_existing:
            delete_points_for_file(qc, collection, fname)

        ids, vectors, payloads = [], [], []
        extracts_done = 0

        for page_no, cidx, ch in tqdm(all_chunks, desc=f"Embedding {fname}", unit="chunk"):
            emb = ollama_embed(ollama_url, embed_model, ch)
            point_id = make_point_id(fname, page_no, cidx)

            meta = {
                "source_file": fname,
                "page": page_no,
                "chunk_index": cidx,
                "text": ch,
                "corpus": corpus,
                "doc_type": corpus,  # for filtering even without structured JSON
            }
            if council:
                meta["council"] = council
            # carry extra fields (NCC volume, AS code/year, year hints)
            meta.update(extra)

            # detect a likely clause ID in this chunk
            m_clause = _CLAUSE_RX.search(ch)
            if m_clause:
                meta["clause_id"] = m_clause.group(0)

            # optional structured extraction (only on clause-starty chunks)
            if extract_structure and m_clause and extracts_done < extract_max_per_file:
                obj = llm_extract_structured(ollama_url, extract_model, corpus, ch,
                                             retries=json_retries)
                if obj:
                    meta["structured"] = obj
                    # Surface the title for filtering. The regex-detected
                    # clause_id set above is kept; the model's own value stays
                    # available under meta["structured"].
                    if obj.get("title"):
                        meta["title"] = obj["title"]
                    extracts_done += 1

            ids.append(point_id)
            vectors.append(emb)
            payloads.append(meta)

            if len(ids) >= batch_size:
                upsert_batch(qc, collection, ids, vectors, payloads)
                ids, vectors, payloads = [], [], []

        if ids:
            upsert_batch(qc, collection, ids, vectors, payloads)

        print(f" ✓ Ingested {len(all_chunks)} chunks from {fname}")


# ---------------- Entrypoint ----------------
if __name__ == "__main__":
    args = get_args()
    ingest_folder(
        pdf_dir=args.pdf_dir,
        ollama_url=args.ollama_url,
        embed_model=args.embed_model,
        qdrant_url=args.qdrant_url,
        collection=args.collection,
        chunk_words_size=args.chunk_words,
        overlap_words=args.overlap_words,
        batch_size=args.batch_size,
        forced_corpus=args.corpus,
        council_cli=args.council,
        wipe_existing=args.wipe_existing,
        extract_structure=args.extract_structure,
        extract_model=args.extract_model,
        extract_max_per_file=args.extract_max_per_file,
        json_retries=args.json_retries,
    )