| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524 |
- import os
- import re
- import argparse
- import fitz # PyMuPDF
- import uuid
- import json
- import requests
- from typing import List, Tuple, Optional
- from pdfminer.high_level import extract_text as pdfminer_extract_text
- from qdrant_client import QdrantClient
- from qdrant_client.http import models as qmodels
- from tqdm import tqdm
- # import ocrmypdf
- import shutil, subprocess, tempfile
- # ---------------- CLI ----------------
def get_args(argv: Optional[List[str]] = None):
    """Parse the command-line options for one ingestion run.

    Args:
        argv: Argument list to parse instead of the process command line.
            ``None`` (the default) makes argparse read ``sys.argv[1:]``,
            which is what the script entry point relies on.

    Returns:
        The ``argparse.Namespace`` holding every option below. Several
        defaults are taken from environment variables (``QDRANT_URL``,
        ``QDRANT_COLLECTION``, ``OLLAMA_URL``, ``EMBED_MODEL``,
        ``EXTRACT_MODEL``) when those are set.
    """
    p = argparse.ArgumentParser(description="Ingest PDFs -> chunks -> embeddings via Ollama -> Qdrant")

    # Locations and models
    p.add_argument("--pdf-dir", default="./pdfs", help="Folder with PDFs")
    p.add_argument("--qdrant-url", default=os.getenv("QDRANT_URL", "http://localhost:6333"))
    p.add_argument("--collection", default=os.getenv("QDRANT_COLLECTION", "planning_docs"))
    p.add_argument("--ollama-url", default=os.getenv("OLLAMA_URL", "http://192.168.8.73:11434"))
    p.add_argument("--embed-model", default=os.getenv("EMBED_MODEL", "nomic-embed-text"))

    # Chunking / batching
    p.add_argument("--chunk-words", type=int, default=400)
    p.add_argument("--overlap-words", type=int, default=60)
    p.add_argument("--batch-size", type=int, default=128)

    # Corpus labelling and re-ingest controls
    p.add_argument("--corpus", choices=["auto", "tps", "lps", "ncc", "as"], default="auto",
                   help="Corpus label for all PDFs in this run (auto = infer per file).")
    p.add_argument("--council", default=None,
                   help="Council token for LPS (e.g., 'brighton'). If omitted, inferred from filename.")
    p.add_argument("--wipe-existing", action="store_true",
                   help="Delete existing points for each file before re-ingesting.")

    # Optional structured extraction
    p.add_argument("--extract-structure", action="store_true",
                   help="Run LLM extraction on clause-start chunks and store JSON in payload.")
    p.add_argument("--extract-model", default=os.getenv("EXTRACT_MODEL", "llama3.1:8b"),
                   help="Ollama model for structured extraction.")
    p.add_argument("--extract-max-per-file", type=int, default=200,
                   help="Max extractions per file (safety cap).")
    p.add_argument("--json-retries", type=int, default=1,
                   help="Retries on bad JSON from the LLM.")

    # argv=None keeps the original behaviour (argparse falls back to sys.argv).
    return p.parse_args(argv)
- # ---------------- Helpers ----------------
# Clause-like tokens such as C2.6.1 or 13.4.2: either "C<digits>" or plain
# digits, followed by one to three ".<digits>" groups. Note that a bare decimal
# such as "3.5" also satisfies the second alternative.
_CLAUSE_RX = re.compile(r"\b(?:C\d+(?:\.\d+){1,3}|\d+(?:\.\d+){1,3})\b")
# Australian Standard references such as "AS 1684.2-2010" (case-insensitive).
# Group 1: the standard number (2-4 digits with an optional ".<part>").
# Group 2: an optional four-digit year after an optional ":" or "-" separator.
_AS_RX = re.compile(r"\bAS\s*([0-9]{2,4}(?:\.[0-9]+)?)\s*[:\-]?\s*(\d{4})?", re.IGNORECASE)
def make_point_id(source_file: str, page_no: int, chunk_index: int) -> str:
    """Return a deterministic UUIDv5 string identifying one chunk.

    The id is derived from ``"<source_file>:<page_no>:<chunk_index>"`` inside
    a fixed namespace, so the same file/page/chunk always maps to the same id.
    """
    namespace = uuid.uuid5(uuid.NAMESPACE_DNS, "modulos.ai/ingest")
    key = ":".join((str(source_file), str(page_no), str(chunk_index)))
    return str(uuid.uuid5(namespace, key))
def clean_text(s: str) -> str:
    """Drop soft hyphens (U+00AD) and collapse each whitespace run to one space.

    Leading and trailing whitespace is removed as a side effect of the
    split/join round trip.
    """
    without_soft_hyphens = s.replace("\u00ad", "")
    tokens = without_soft_hyphens.split()
    return " ".join(tokens)
def extract_text_pymupdf(pdf_path: str) -> List[Tuple[int, str]]:
    """Return list of (page_number, text) with 1-based page numbers.

    Extraction is best-effort, like the other extractors in this file:
    a PDF that PyMuPDF cannot open yields ``[]`` (so the caller can move on
    to its next fallback), and a page whose text cannot be read contributes
    an empty string. The document handle is always closed.
    """
    try:
        doc = fitz.open(pdf_path)
    except Exception:
        # Unopenable/corrupt file: report "nothing extracted" instead of raising.
        return []

    pages: List[Tuple[int, str]] = []
    try:
        for i, page in enumerate(doc):
            try:
                txt = page.get_text("text")
            except Exception:
                txt = ""
            pages.append((i + 1, txt or ""))
    except Exception:
        # Page iteration itself failed part-way; keep whatever was gathered.
        pass
    finally:
        doc.close()
    return pages
def fallback_pdfminer(pdf_path: str) -> str:
    """Extract the whole document's text with pdfminer.

    Returns an empty string when pdfminer raises or produces nothing.
    """
    try:
        extracted = pdfminer_extract_text(pdf_path)
    except Exception:
        return ""
    return extracted if extracted else ""
def _run_repair_tool(cmd: List[str], out_path: str, ok_codes: Tuple[int, ...] = (0,)) -> bool:
    """Run one external repair command; True if it left a non-empty output file."""
    try:
        result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError:
        # Binary vanished or is not executable: treat as "tool failed".
        return False
    return (result.returncode in ok_codes
            and os.path.exists(out_path)
            and os.path.getsize(out_path) > 0)


def try_repair_pdf(pdf_path: str) -> Optional[str]:
    """Try to rewrite/repair the PDF via qpdf or ghostscript.

    qpdf is tried first (if on PATH), then ghostscript (``gs`` or
    ``ghostscript``). Each tool writes into a fresh ``pdf_fix_*`` temp
    directory.

    Returns:
        Path of the repaired PDF inside that temp directory, or ``None`` when
        no tool is available or none produced a non-empty file. On success the
        caller owns the returned file's directory and should remove it when
        done; on failure the directory is removed here.
    """
    tmpdir = tempfile.mkdtemp(prefix="pdf_fix_")

    qpdf_bin = shutil.which("qpdf")
    if qpdf_bin:
        out1 = os.path.join(tmpdir, "qpdf_fixed.pdf")
        # qpdf exits with 3 when it succeeded but emitted warnings, which is the
        # usual outcome for a damaged file it managed to repair.
        if _run_repair_tool([qpdf_bin, "--linearize", pdf_path, out1], out1, ok_codes=(0, 3)):
            return out1

    gs_bin = shutil.which("gs") or shutil.which("ghostscript")
    if gs_bin:
        out2 = os.path.join(tmpdir, "gs_fixed.pdf")
        cmd = [
            gs_bin, "-dSAFER", "-dBATCH", "-dNOPAUSE",
            "-sDEVICE=pdfwrite", "-dCompatibilityLevel=1.6",
            f"-sOutputFile={out2}", pdf_path,
        ]
        if _run_repair_tool(cmd, out2):
            return out2

    # Nothing usable was produced: do not leave the empty temp dir behind.
    shutil.rmtree(tmpdir, ignore_errors=True)
    return None
def extract_with_pypdfium2(pdf_path: str) -> List[Tuple[int, str]]:
    """Optional fallback using PyPDFium2 if available.

    Returns a list of (page_number, text) with 1-based page numbers, or ``[]``
    when pypdfium2 is not installed or any step of the extraction fails
    (partial results are discarded). Pages, text pages and the document are
    closed even when an error occurs.
    """
    try:
        import pypdfium2 as pdfium
    except Exception:
        return []

    pages: List[Tuple[int, str]] = []
    doc = None
    try:
        doc = pdfium.PdfDocument(pdf_path)
        for i in range(len(doc)):
            page = doc.get_page(i)
            try:
                textpage = page.get_textpage()
                try:
                    txt = textpage.get_text_bounded()
                finally:
                    textpage.close()
            finally:
                page.close()
            pages.append((i + 1, txt or ""))
    except Exception:
        return []
    finally:
        # Children are closed above, so the parent document can be released.
        if doc is not None:
            try:
                doc.close()
            except Exception:
                pass
    return pages
def pagewise_or_fallback(pdf_path: str) -> List[Tuple[int, str]]:
    """
    Robust text extraction:
    1) PyMuPDF pagewise
    2) pdfminer whole-doc
    3) PyPDFium2 pagewise (if installed)
    4) repair via qpdf/ghostscript and retry 1-3
    5) (optional) OCR via ocrmypdf and retry 1-3

    Returns list of (page_number, text). Page numbers are 1-based for the
    pagewise extractors; a whole-document pdfminer result is returned as a
    single entry with page number 0. A pagewise result only counts as usable
    when it holds more than 50 characters in total.

    If nothing usable is found: returns ``[]`` when OCR was attempted,
    otherwise the last (sparse) PyMuPDF result so the caller sees whatever
    little text there was.
    """
    OCR_IF_NEEDED = True  # set False to disable the automatic OCR fallback

    def nonempty(pages):
        return sum(len(t) for _, t in pages) > 50

    sparse: List[Tuple[int, str]] = []  # last PyMuPDF result, even if too small

    def extract_chain(path):
        """Run steps 1-3 on *path*; return usable pages or []."""
        nonlocal sparse
        sparse = extract_text_pymupdf(path)
        if nonempty(sparse):
            return sparse
        whole = fallback_pdfminer(path)
        if whole.strip():
            return [(0, clean_text(whole))]
        pf = extract_with_pypdfium2(path)
        if nonempty(pf):
            return pf
        return []

    def run_ocr(mode_flag, out_path, jobs):
        """Run ocrmypdf on the input PDF; True if it exited 0 and wrote out_path."""
        cmd = [
            "ocrmypdf",
            mode_flag,
            "--rotate-pages",
            "--jobs", jobs,
            "--optimize", "3",
            "--output-type", "pdf",
            "-l", "eng",
            pdf_path, out_path,
        ]
        r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return r.returncode == 0 and os.path.exists(out_path)

    # 1-3) direct extraction
    pages = extract_chain(pdf_path)
    if pages:
        return pages

    # 4) repair & retry
    fixed = try_repair_pdf(pdf_path)
    if fixed:
        try:
            pages = extract_chain(fixed)
        finally:
            # The repaired copy lives in its own "pdf_fix_*" temp dir; remove
            # it once the text is in memory (guarded so nothing else is deleted).
            fix_dir = os.path.dirname(fixed)
            if os.path.basename(fix_dir).startswith("pdf_fix_"):
                shutil.rmtree(fix_dir, ignore_errors=True)
        if pages:
            return pages

    # 5) optional OCR
    if not (OCR_IF_NEEDED and shutil.which("ocrmypdf")):
        # final: let caller see the (near-)empty result
        return sparse

    jobs = str(max(1, (os.cpu_count() or 1)))  # e.g., cap it: min(os.cpu_count() or 1, 6)
    ocrd = tempfile.mkdtemp(prefix="pdf_ocr_")
    try:
        # First pass only OCRs pages that lack text; if that does not yield
        # usable text (e.g., corrupt text layer), force OCR of every page once.
        for mode_flag, out_name in (("--skip-text", "ocr.pdf"),
                                    ("--force-ocr", "ocr_force.pdf")):
            out_path = os.path.join(ocrd, out_name)
            if run_ocr(mode_flag, out_path, jobs):
                pages = extract_chain(out_path)
                if pages:
                    return pages
    finally:
        # Clean up temp files
        shutil.rmtree(ocrd, ignore_errors=True)

    # final: let caller see empty result
    return []
def chunk_words(text: str, chunk_size=400, overlap=60) -> List[str]:
    """Split *text* into overlapping windows of whitespace-separated words.

    Args:
        text: Text to split; words are the result of ``str.split()``.
        chunk_size: Maximum number of words per chunk (must be >= 1).
        overlap: Number of words shared between consecutive chunks. When it
            is not smaller than ``chunk_size`` the window still advances by
            one word per chunk.

    Returns:
        The chunks in order, each joined with single spaces. Windowing stops
        as soon as a window reaches the end of the text, so no trailing chunk
        is emitted that would consist solely of words already contained in
        the previous chunk. Empty text yields ``[]``.

    Raises:
        ValueError: if ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")

    words = text.split()
    total = len(words)
    step = max(1, chunk_size - overlap)

    chunks = []
    for start in range(0, total, step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= total:
            # This window already covers the tail; a further window would only
            # repeat its last `overlap` words.
            break
    return chunks
def ollama_embed(ollama_url: str, model: str, text: str) -> List[float]:
    """Embed *text* with an Ollama model via ``POST /api/embeddings``.

    Args:
        ollama_url: Base URL of the Ollama server; a trailing slash is
            tolerated.
        model: Name of the embedding model.
        text: Text to embed (sent as the ``prompt`` field).

    Returns:
        The embedding vector from the response's ``embedding`` field.

    Raises:
        requests.HTTPError: on a non-2xx response.
        RuntimeError: if the response carries no (or an empty) embedding.
    """
    # Avoid "//api/..." when the configured base URL ends with a slash.
    endpoint = f"{ollama_url.rstrip('/')}/api/embeddings"
    r = requests.post(endpoint,
                      json={"model": model, "prompt": text},
                      timeout=60)
    r.raise_for_status()
    data = r.json()
    embedding = data.get("embedding") if isinstance(data, dict) else None
    if not embedding:
        raise RuntimeError(f"Ollama embeddings error: {data}")
    return embedding
def determine_dim(ollama_url: str, model: str) -> int:
    """Return the embedding dimension of *model* by embedding the probe text "test"."""
    probe_vector = ollama_embed(ollama_url, model, "test")
    dimension = len(probe_vector)
    return dimension
def ensure_collection(qc: QdrantClient, name: str, dim: int):
    """Create the Qdrant collection *name* (cosine distance, *dim*-sized vectors).

    Creation is best-effort: any failure is reported on stdout and otherwise
    ignored, on the assumption that the collection already exists. A later
    upsert will fail loudly if the existing collection's vector size differs.
    """
    try:
        qc.create_collection(
            collection_name=name,
            vectors_config=qmodels.VectorParams(size=dim, distance=qmodels.Distance.COSINE)
        )
    except Exception as exc:
        # Usually "already exists"; print the reason so that connection or
        # authentication problems are not silently hidden.
        print(f"[qdrant] collection '{name}' not created (assuming it already exists): {exc}")
    else:
        print(f"[qdrant] created collection '{name}' size={dim}")
def ensure_payload_indexes(qc: QdrantClient, collection: str):
    """Request a KEYWORD payload index on each filterable payload field.

    Every index is created independently and best-effort: an error for one
    field is ignored and the remaining fields are still attempted.
    """
    keyword_fields = ("corpus", "council", "source_file", "clauses", "clause_id", "doc_type", "title")
    for field_name in keyword_fields:
        try:
            qc.create_payload_index(
                collection_name=collection,
                field_name=field_name,
                field_schema=qmodels.PayloadSchemaType.KEYWORD,
            )
        except Exception:
            # Best-effort: move on to the next field.
            continue
def delete_points_for_file(qc: QdrantClient, collection: str, source_file: str):
    """Delete every point whose ``source_file`` payload equals *source_file*.

    An exact-value match is used on purpose: a text match would behave as a
    substring/full-text search and could also remove points that belong to
    other files whose names merely contain *source_file*.
    """
    filt = qmodels.Filter(must=[
        qmodels.FieldCondition(key="source_file", match=qmodels.MatchValue(value=source_file))
    ])
    qc.delete(collection_name=collection, points_selector=qmodels.FilterSelector(filter=filt))
    print(f"[qdrant] wiped existing points for {source_file}")
- # -------- Structured extraction (optional) --------
def _parse_json_object(raw: str) -> Optional[dict]:
    """Return the JSON object embedded in *raw* (first '{' to last '}'), or None."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        obj = json.loads(raw[start:end + 1])
    except ValueError:
        return None
    return obj if isinstance(obj, dict) else None


def llm_extract_structured(ollama_url: str, model: str, corpus: str, text: str,
                           retries: int = 1) -> Optional[dict]:
    """Ask an Ollama model to extract structured clause fields from *text*.

    Args:
        ollama_url: Base URL of the Ollama server (trailing slash tolerated).
        model: Generation model to use.
        corpus: Corpus label ("tps", "lps", "ncc", "as"); used as the
            template's ``doc_type`` and to name the document type in the prompt.
        text: The excerpt to analyse.
        retries: Extra attempts after a reply that is not a usable JSON object.

    Returns:
        The parsed dict when the model returns a JSON object with a non-empty
        ``doc_type``; ``None`` after all attempts failed.

    Raises:
        requests.HTTPError: on a non-2xx response from Ollama.
    """
    schema_hint = {
        "tps": "Tasmanian Planning Scheme (SPP)",
        "lps": "Local Provisions Schedule (LPS)",
        "ncc": "National Construction Code",
        "as": "Australian Standard"
    }.get(corpus, "Regulatory Document")
    template = {
        "doc_type": corpus,
        "clause_id": "",
        "title": "",
        "purpose": "",
        "application_scope": "",
        "performance_criteria": [],
        "acceptable_solutions": [],
        "exemptions": "",
        "cross_refs": [],
        "version": "",
        "volume": "",
        "year": ""
    }
    prompt = "\n".join([
        "You extract structured fields from regulatory clauses. Return ONLY valid JSON matching this template:",
        json.dumps(template, ensure_ascii=False, indent=2),
        f"Source document type: {schema_hint}",
        "Guidance:",
        "- Prefer exact clause IDs and titles if present.",
        "- If a field is absent, keep it empty or [].",
        "- Use short faithful fragments; no speculation.",
        "- Only include cross_refs explicitly present.",
        "EXCERPT:",
        text,
    ]).strip()

    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        # Constrain the reply to JSON and keep the model loaded between calls;
        # both are top-level request fields, not sampling options.
        "format": "json",
        "keep_alive": "5m",
        "options": {
            "temperature": 0.1,
            "num_ctx": 4096,
            "num_predict": 400,
        },
    }
    endpoint = f"{ollama_url.rstrip('/')}/api/generate"

    for _ in range(max(1, retries + 1)):
        r = requests.post(endpoint, json=request_body, timeout=120)
        r.raise_for_status()
        resp = r.json().get("response") or ""
        # Tolerates code fences or stray prose around the object.
        obj = _parse_json_object(resp)
        if obj and obj.get("doc_type"):
            return obj
    return None
- # ---------------- Classification & metadata ----------------
def infer_corpus_and_meta(fname: str, forced_corpus: str = "auto", council_cli: Optional[str] = None):
    """
    Returns (corpus, council, extra_meta)
    corpus ∈ {tps, lps, ncc, as}
    council only when LPS
    extra_meta may include ncc_volume, as_code, as_year, year

    Classification looks at the lower-cased file name with every run of
    punctuation/underscores turned into a single space, so "NCC_2022_Volume_1"
    and "ncc 2022 volume 1" are treated alike. When ``forced_corpus`` is not
    "auto" it is used as-is. For LPS the council is ``council_cli`` if given,
    else the file name's leading token (up to the first "_" or "-").
    """
    lower = fname.lower()
    base = os.path.splitext(os.path.basename(lower))[0]
    # Separator-insensitive views: "_" counts as a word character for \b and
    # breaks phrase checks such as "volume one", so normalise it away.
    norm = re.sub(r"[\W_]+", " ", lower).strip()
    norm_base = re.sub(r"[\W_]+", " ", base).strip()

    council = None
    extra = {}

    if forced_corpus != "auto":
        corpus = forced_corpus
    elif ("local" in norm and "provision" in norm) or "lps" in norm:
        corpus = "lps"
    elif ("ncc" in norm
          or "national construction code" in norm
          or any(f"volume {word}" in norm for word in ("one", "two", "three"))):
        corpus = "ncc"
    elif (re.match(r"as(?=\b|\d|\s*nzs)", norm_base)
          # "as" must be a token of its own or be followed by a number / "nzs";
          # a bare startswith("as") would also catch "assessment", "asbestos", ...
          or "australian standard" in norm
          or _AS_RX.search(fname)):
        corpus = "as"
    else:
        corpus = "tps"

    if corpus == "lps":
        council = (council_cli or base.split("_")[0].split("-")[0]).strip()

    if corpus == "ncc":
        for label, words in (("Volume One", "one|1"),
                             ("Volume Two", "two|2"),
                             ("Volume Three", "three|3")):
            if re.search(rf"\bvol(?:ume)?\s*(?:{words})\b", norm):
                extra["ncc_volume"] = label
                break
        # NOTE(review): the year hint is assumed to belong to the NCC branch
        # (the original nesting was lost) - confirm against the intended layout.
        m_year = re.search(r"\b(20\d{2}|19\d{2})\b", norm)
        if m_year:
            extra["year"] = m_year.group(1)

    if corpus == "as":
        # Second attempt treats "_" as a space so "AS_1684.2-2010" is recognised.
        m = _AS_RX.search(fname) or _AS_RX.search(fname.replace("_", " "))
        if m:
            extra["as_code"] = m.group(1)
            if m.group(2):
                extra["as_year"] = m.group(2)

    return corpus, council, extra
- # ---------------- Upsert ----------------
def upsert_batch(qc: QdrantClient, collection: str, ids, vectors, payloads):
    """Upsert one batch of points (parallel ids / vectors / payloads lists) into *collection*."""
    batch = qmodels.Batch(ids=ids, vectors=vectors, payloads=payloads)
    qc.upsert(collection_name=collection, points=batch)
- # ---------------- Main ingest ----------------
def ingest_folder(pdf_dir: str, ollama_url: str, embed_model: str, qdrant_url: str, collection: str,
                  chunk_words_size: int, overlap_words: int, batch_size: int,
                  forced_corpus: str, council_cli: Optional[str], wipe_existing: bool,
                  extract_structure: bool, extract_model: str, extract_max_per_file: int, json_retries: int):
    """Ingest every PDF in *pdf_dir* into a Qdrant collection.

    For each ``*.pdf`` file (processed in sorted name order): infer corpus
    metadata from the file name, optionally wipe the file's existing points,
    extract text, split each page into overlapping word chunks, embed every
    chunk through Ollama and upsert the points in batches of *batch_size*.

    Each point's payload holds ``source_file``, ``page``, ``chunk_index``,
    ``text``, ``corpus`` and ``doc_type``, plus ``council`` and the inferred
    extra metadata when present, and ``clause_id`` when a clause-like token is
    found in the chunk. With *extract_structure*, chunks containing a clause
    token are additionally sent to the LLM extractor; at most
    *extract_max_per_file* extraction calls are made per file, counting
    failed ones too, so the cap bounds the number of LLM requests.

    Files that yield no usable text are reported and skipped.
    """
    qc = QdrantClient(url=qdrant_url)
    dim = determine_dim(ollama_url, embed_model)
    ensure_collection(qc, collection, dim)
    ensure_payload_indexes(qc, collection)

    # os.listdir order is arbitrary; sort so runs are reproducible.
    pdfs = sorted(f for f in os.listdir(pdf_dir) if f.lower().endswith(".pdf"))
    if not pdfs:
        print(f"No PDFs found in {pdf_dir}")
        return

    for fname in pdfs:
        path = os.path.join(pdf_dir, fname)
        corpus, council, extra = infer_corpus_and_meta(fname, forced_corpus, council_cli)
        print(f"\nProcessing {path} -> corpus={corpus} council={council or '-'} meta={extra}")

        if wipe_existing:
            delete_points_for_file(qc, collection, fname)

        # (page_no, chunk index within that page, chunk text)
        all_chunks = []
        for page_no, txt in pagewise_or_fallback(path):
            txt = clean_text(txt)
            if not txt.strip():
                continue
            page_chunks = chunk_words(txt, chunk_size=chunk_words_size, overlap=overlap_words)
            all_chunks.extend((page_no, idx, ch) for idx, ch in enumerate(page_chunks))

        if not all_chunks:
            print(f" !! No usable text extracted from {fname}")
            continue

        ids, vectors, payloads = [], [], []
        extracts_done = 0
        for page_no, cidx, ch in tqdm(all_chunks, desc=f"Embedding {fname}", unit="chunk"):
            emb = ollama_embed(ollama_url, embed_model, ch)
            point_id = make_point_id(fname, page_no, cidx)
            meta = {
                "source_file": fname,
                "page": page_no,
                "chunk_index": cidx,
                "text": ch,
                "corpus": corpus,
                "doc_type": corpus,  # for filtering even without structured JSON
            }
            if council:
                meta["council"] = council
            # carry extra fields (NCC volume, AS code/year, year hints)
            meta.update(extra)

            # detect a likely clause ID in this chunk
            m_clause = _CLAUSE_RX.search(ch)
            if m_clause:
                meta["clause_id"] = m_clause.group(0)

            # optional structured extraction (only on clause-starty chunks)
            if extract_structure and m_clause and extracts_done < extract_max_per_file:
                # Count the attempt up front so the cap also limits failing calls.
                extracts_done += 1
                obj = llm_extract_structured(ollama_url, extract_model, corpus, ch, retries=json_retries)
                if obj:
                    meta["structured"] = obj
                    # If the model gave a title/ids, surface a few for filtering
                    if obj.get("title"):
                        meta["title"] = obj["title"]
                    if obj.get("clause_id"):
                        # The regex-detected id, when present, takes precedence.
                        meta.setdefault("clause_id", obj["clause_id"])

            ids.append(point_id)
            vectors.append(emb)
            payloads.append(meta)

            if len(ids) >= batch_size:
                upsert_batch(qc, collection, ids, vectors, payloads)
                ids, vectors, payloads = [], [], []

        # flush the final partial batch
        if ids:
            upsert_batch(qc, collection, ids, vectors, payloads)
        print(f" ✓ Ingested {len(all_chunks)} chunks from {fname}")
- # ---------------- Entrypoint ----------------
if __name__ == "__main__":
    # Parse the CLI once, then pass every option to the ingest routine by keyword.
    cli = get_args()
    ingest_options = {
        "pdf_dir": cli.pdf_dir,
        "ollama_url": cli.ollama_url,
        "embed_model": cli.embed_model,
        "qdrant_url": cli.qdrant_url,
        "collection": cli.collection,
        "chunk_words_size": cli.chunk_words,
        "overlap_words": cli.overlap_words,
        "batch_size": cli.batch_size,
        "forced_corpus": cli.corpus,
        "council_cli": cli.council,
        "wipe_existing": cli.wipe_existing,
        "extract_structure": cli.extract_structure,
        "extract_model": cli.extract_model,
        "extract_max_per_file": cli.extract_max_per_file,
        "json_retries": cli.json_retries,
    }
    ingest_folder(**ingest_options)
|