import os, re import json import requests import time from typing import Optional, Literal, List, Tuple from fastapi import FastAPI, Query, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import StreamingResponse from slowapi.middleware import SlowAPIMiddleware from slowapi import Limiter from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from fastapi.responses import JSONResponse from pydantic import BaseModel from qdrant_client import QdrantClient from qdrant_client.http import models as qmodels from collections import Counter, defaultdict from datetime import datetime from telemetry import router as telemetry_router, db, ip_hash # ---- Environment ---- OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.8.73:11434") QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333") OLLAMA_KEEP_ALIVE = -1 COLLECTION = os.getenv("QDRANT_COLLECTION", "planning_docs") EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text") CHAT_MODEL = os.getenv("CHAT_MODEL", "llama3.1:8b") CORS_ORIGINS = [o.strip() for o in os.getenv("CORS_ORIGINS", "https://tasplanning.report").split(",") if o.strip()] # ---- DEMO TOKEN ---- DEMO_REQUIRE_TOKEN = os.getenv("DEMO_REQUIRE_TOKEN", "0") == "1" DEMO_TOKEN = os.getenv("DEMO_TOKEN", "") def _verify_demo_token_if_needed(request): if not DEMO_REQUIRE_TOKEN: return auth = request.headers.get("Authorization", "") if not (auth.startswith("Bearer ") and auth.split(" ",1)[1] == DEMO_TOKEN): raise HTTPException(status_code=401, detail="Unauthorized") # ---- FAST API ---- app = FastAPI() # Allowed origins — always include your frontend explicitly _origins = CORS_ORIGINS if CORS_ORIGINS else [] _allow_all = len(_origins) == 0 app.add_middleware( CORSMiddleware, allow_origins=_origins if not _allow_all else ["*"], allow_origin_regex=r"https://.*\.tasplanning\.report" if _allow_all else None, allow_credentials=not _allow_all, # credentials only when origins are explicit allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["Content-Type", "Authorization", "X-TPR-SID"], expose_headers=["X-TPR-SID"], ) qc = QdrantClient(url=QDRANT_URL) app.include_router(telemetry_router) # ---- SLOW API ---- limiter = Limiter(key_func=get_remote_address) app.state.limiter = limiter # type: ignore app.add_middleware(SlowAPIMiddleware) @app.exception_handler(RateLimitExceeded) def ratelimit_handler(request, exc): return JSONResponse(status_code=429, content={"error":"rate_limited","detail":"Too many requests"}) # ---- Ollama helpers ---- def slug(s: Optional[str]) -> Optional[str]: if not s: return None return re.sub(r'[^a-z0-9]+', '-', s.strip().lower()).strip('-') or None def ollama_embed(text: str) -> List[float]: r = requests.post( f"{OLLAMA_URL}/api/embeddings", json={"model": EMBED_MODEL, "prompt": text}, timeout=60 ) r.raise_for_status() data = r.json() if "embedding" not in data: raise RuntimeError(f"Ollama embeddings error: {data}") return data["embedding"] def ollama_chat(prompt: str) -> str: r = requests.post( f"{OLLAMA_URL}/api/generate", json={ "model": CHAT_MODEL, "prompt": prompt, "stream": False, "options": { "num_ctx": 8192, "num_predict": 512, "temperature": 0.2, "top_p": 0.9, "repeat_penalty": 1.1, }, "keep_alive": OLLAMA_KEEP_ALIVE # ← moved outside options, uses env var }, timeout=180 ) r.raise_for_status() data = r.json() return data.get("response", "").strip() def _scroll_points(collection: str, qfilter=None, include_vector: bool=False, page_size: int=200): offset = None while True: points, offset = qc.scroll( collection_name=collection, limit=page_size, with_payload=True, with_vectors=include_vector, offset=offset, scroll_filter=qfilter ) if not points: break for pt in points: yield pt if offset is None: break # ---- Health ---- @app.get("/readyz") def readyz(): return {"ok": True} def _normalize(q: Optional[str]) -> str: return re.sub(r"\s+", " ", (q or "").strip().lower()) def _json_dumps(o) -> str: return json.dumps(o, ensure_ascii=False, separators=(",",":")) # ---- Councils list (prefers payload 'council', falls back to filename token) ---- @app.get("/councils") def councils(): councils = set() offset = None # sample up to ~5k points (50 * 100) for _ in range(50): points, offset = qc.scroll( collection_name=COLLECTION, limit=100, with_payload=True, offset=offset ) for pt in points: p = pt.payload or {} token = (p.get("council") or "").strip().lower() if not token: sf = (p.get("source_file") or "").lower() if sf: token = sf.replace(".pdf", "").split("_")[0].split("-")[0] if token: councils.add(token) if offset is None: break return sorted(councils) # ---- Filter builders ---- def _mv(key: str, value: str) -> qmodels.FieldCondition: return qmodels.FieldCondition(key=key, match=qmodels.MatchValue(value=value)) def _mt(key: str, text: str) -> qmodels.FieldCondition: return qmodels.FieldCondition(key=key, match=qmodels.MatchText(text=text)) def filter_tps() -> qmodels.Filter: """TPS only, exact match on corpus.""" return qmodels.Filter(must=[_mv("corpus", "tps")]) def filter_lps(council: str) -> qmodels.Filter: """ LPS for a specific council (slug), exact match on both fields. """ cslug = slug(council) or council.lower() return qmodels.Filter(must=[_mv("corpus", "lps"), _mv("council", cslug)]) def filter_ncc() -> qmodels.Filter: return qmodels.Filter(must=[_mv("corpus", "ncc")]) def filter_as() -> qmodels.Filter: return qmodels.Filter(must=[_mv("corpus", "as")]) def with_source_contains(flt: Optional[qmodels.Filter], source_contains: Optional[str]) -> qmodels.Filter: if not source_contains: return flt add = _mt("source_file", source_contains) if flt: # preserve existing must/should/must_not and AND the filename condition must = list(getattr(flt, "must", []) or []) must.append(add) return qmodels.Filter( must=must, should=getattr(flt, "should", None), must_not=getattr(flt, "must_not", None), ) return qmodels.Filter(must=[add]) def q_search(vec: List[float], flt: Optional[qmodels.Filter], limit: int): results = qc.query_points( collection_name=COLLECTION, query=vec, limit=max(1, limit), query_filter=flt, with_payload=True, ) return results.points def render_blocks(hits) -> Tuple[List[str], List[dict]]: blocks, sources = [], [] for h in hits: p = h.payload or {} src = f"{p.get('source_file')} (p.{p.get('page')} chunk {p.get('chunk_index')})" snippet = p.get("text", "") blocks.append(f"Source: {src}\nText: {snippet}") sources.append({ "source_file": p.get("source_file"), "page": p.get("page"), "chunk_index": p.get("chunk_index"), "score": h.score }) return blocks, sources def combine_context(sections: List[Tuple[str, List[str]]]) -> str: out = [] for heading, blocks in sections: if not blocks: continue out.append(f"=== {heading} ===") out.extend(blocks) return "\n\n".join(out) if out else "No context found." def _scan_points(qfilter: Optional[qmodels.Filter] = None, max_pages: int = 10000, page_size: int = 200): """ Iterate through ALL points (filtered if qfilter given). For your current dataset this is fine; if it grows huge later we'll switch to a stored summary or a background job. """ offset = None pages = 0 while pages < max_pages: points, offset = qc.scroll( collection_name=COLLECTION, limit=page_size, with_payload=True, offset=offset, scroll_filter=qfilter ) if not points: break for pt in points: yield pt pages += 1 if offset is None: break @app.get("/admin/stats") def admin_stats(council: Optional[str] = None, corpus: Optional[str] = None): must = [] if council: must.append(qmodels.FieldCondition(key="council", match=qmodels.MatchText(text=council.lower()))) if corpus: must.append(qmodels.FieldCondition(key="corpus", match=qmodels.MatchText(text=corpus.lower()))) qfilter = qmodels.Filter(must=must) if must else None corp = Counter() councils = Counter() total = 0 for pt in _scan_points(qfilter=qfilter): p = pt.payload or {} corp[(p.get("corpus") or "").lower()] += 1 if p.get("council"): councils[(p.get("council") or "").lower()] += 1 total += 1 return { "collection": COLLECTION, "total_points": total, "by_corpus": dict(corp), "by_council": dict(councils), "note": "Counts are points (chunks), not documents.", } @app.get("/admin/files") def admin_files(council: Optional[str] = None, corpus: Optional[str] = None, contains: Optional[str] = None, limit: int = 200): must = [] if council: must.append(qmodels.FieldCondition(key="council", match=qmodels.MatchText(text=council.lower()))) if corpus: must.append(qmodels.FieldCondition(key="corpus", match=qmodels.MatchText(text=corpus.lower()))) if contains: must.append(qmodels.FieldCondition(key="source_file", match=qmodels.MatchText(text=contains))) qfilter = qmodels.Filter(must=must) if must else None files = defaultdict(lambda: {"points": 0, "corpus": None, "council": None, "pages": set()}) for pt in _scan_points(qfilter=qfilter): p = pt.payload or {} f = (p.get("source_file") or "").strip() if not f: continue rec = files[f] rec["points"] += 1 rec["corpus"] = rec["corpus"] or p.get("corpus") rec["council"] = rec["council"] or p.get("council") if p.get("page") is not None: rec["pages"].add(p["page"]) # shape for output out = [] for f, rec in files.items(): out.append({ "source_file": f, "corpus": rec["corpus"], "council": rec["council"], "points": rec["points"], "page_count_est": len(rec["pages"]) if rec["pages"] else None, }) # sort by points desc, limit out.sort(key=lambda x: x["points"], reverse=True) return out[:max(1, limit)] @app.get("/admin/sample") def admin_sample(council: Optional[str] = None, corpus: Optional[str] = None, n: int = 5): must = [] if council: must.append(qmodels.FieldCondition(key="council", match=qmodels.MatchText(text=council.lower()))) if corpus: must.append(qmodels.FieldCondition(key="corpus", match=qmodels.MatchText(text=corpus.lower()))) qfilter = qmodels.Filter(must=must) if must else None samples = [] for pt in _scan_points(qfilter=qfilter): p = pt.payload or {} txt = (p.get("text") or "").strip() if not txt: continue samples.append({ "source_file": p.get("source_file"), "corpus": p.get("corpus"), "council": p.get("council"), "page": p.get("page"), "chunk_index": p.get("chunk_index"), "preview": (txt[:400] + "…") if len(txt) > 400 else txt }) if len(samples) >= max(1, n): break return samples @app.get("/admin/export") def admin_export( collection: str = COLLECTION, council: Optional[str] = None, corpus: Optional[str] = None, source_contains: Optional[str] = None, include_vector: bool = False, limit: Optional[int] = None ): must = [] if council: must.append(qmodels.FieldCondition(key="council", match=qmodels.MatchText(text=council.lower()))) if corpus: must.append(qmodels.FieldCondition(key="corpus", match=qmodels.MatchText(text=corpus.lower()))) if source_contains: must.append(qmodels.FieldCondition(key="source_file", match=qmodels.MatchText(text=source_contains))) qfilter = qmodels.Filter(must=must) if must else None def gen(): count = 0 for pt in _scroll_points(collection, qfilter=qfilter, include_vector=include_vector): obj = { "id": str(getattr(pt, "id", None)), "payload": pt.payload or {}, } if include_vector: obj["vector"] = pt.vector yield json.dumps(obj, ensure_ascii=False) + "\n" count += 1 if limit and count >= limit: break filename = f'{collection}-{corpus or "all"}-{council or "all"}.ndjson' headers = {"Content-Disposition": f'attachment; filename="{filename}"'} return StreamingResponse(gen(), media_type="application/x-ndjson", headers=headers) def _section_format_guide(section_id: Optional[str], section_title: str, ctx: dict) -> str: """ Return strict, section-specific formatting guidance for the LLM. Keep these short, prescriptive, and impossible to ignore. """ sid = (section_id or "").lower() # Utility bits from context zones = ctx.get("planning_zones") or [] zone_label = ", ".join(zones) if zones else "the applicable zone" council_label = ctx.get("council") or "" # ---- ZONING (tables of clauses like your sample) ---- if sid in {"zoning", "zoning-41", "zoning-42", "zoning-43", "zoning-44", "zoning-441", "zoning-442"}: return f""" FORMAT REQUIREMENTS (MANDATORY): - Produce a concise preface (≤ 2 sentences) naming {zone_label}. - Then include a Markdown table listing EACH visible clause found in CONTEXT that applies to the zone or LPS for **{council_label or 'the selected council'}**. - One row per subclause. If an A/P pair exists (e.g., A1 / P1), include both in the same row. - Columns (exact): | Clause | Topic | Acceptable Solution (A) | Performance Criteria (P) | Assessment | Source | - "Clause": the clause number (e.g., "12.3.1 A1" or "DOR-S1.7.1"). - "Topic": short label extracted from the clause heading. - "Acceptable Solution (A)" and "Performance Criteria (P)": quote briefly—no more than 1–2 lines each. - "Assessment": state clearly whether the proposal meets A, or relies on P. If unknown from CONTEXT, write "TBC". - "Source": filename + page (from CONTEXT). - Only include clauses actually present in CONTEXT; NEVER invent clause numbers or text. - After the table, add a one-paragraph summary noting any items assessed as TBC or non-compliant. """.strip() # ---- Codes overview list/table (optional future) ---- if sid.startswith("code-"): return """ FORMAT REQUIREMENTS: - Start with one sentence stating which Code and why it is triggered. - Then provide a short checklist or table of the relevant sub-clauses (A vs P), with Source for each. - Keep to 150–250 words + table. """.strip() # ---- Permit Overview (concise triggers) ---- if sid == "permit-overview": return """ FORMAT REQUIREMENTS: - Produce 3 blocks with headings: 1) "Project Context" – 3–5 bullet points (site, proposal, zone). 2) "Applicable Provisions" – bullets grouping TPS SPP, LPS (selected council), and triggered Codes. 3) "Assessment Path" – bullet list of key clauses to assess next. - Cite specific clause numbers ONLY if present in CONTEXT (include Source). """.strip() # ---- Default (no special formatting) ---- return """ FORMAT REQUIREMENTS: - Use concise Markdown with short paragraphs and bullets as needed. - Cite briefly (filename + page) when quoting a control. """.strip() # ---- Ask (GET + POST) ---- class AskBody(BaseModel): # accept multiple keys from different frontends query: Optional[str] = None question: Optional[str] = None q: Optional[str] = None prompt: Optional[str] = None top_k: int = 10 council: Optional[str] = None include_ncc: bool = False include_standards: bool = False source_contains: Optional[str] = None scope: Literal['state_plus_local','local_only','state_only','any'] = 'state_plus_local' section_id: Optional[str] = None # BYOK mode: return context blocks without calling Ollama. # The browser then calls its own LLM with the returned context + prompt. context_only: bool = False def _allowed(p: dict, scope: str, cslug: Optional[str]) -> bool: corp = (p.get("corpus") or "").lower() council = (p.get("council") or "").lower() if scope == "local_only": return corp == "lps" and cslug and council == cslug if scope == "state_only": return corp == "tps" if scope == "state_plus_local": return corp == "tps" or (corp == "lps" and cslug and council == cslug) return True def do_ask( query: str, top_k: int = 10, council: Optional[str] = None, include_ncc: bool = False, include_standards: bool = False, source_contains: Optional[str] = None, scope: str = "state_plus_local", section_id: Optional[str] = None, context_only: bool = False, ): vec = ollama_embed(query) cslug = slug(council) if council else None # Build allowed scopes based on scope param scopes: List[Tuple[str, qmodels.Filter]] = [] if scope in ("state_only", "state_plus_local", "any"): scopes.append(("Tasmanian Planning Scheme (SPP)", filter_tps())) if scope in ("local_only", "state_plus_local", "any") and cslug: scopes.append((f"Local Provisions Schedule — {cslug}", filter_lps(cslug))) if include_ncc: scopes.append(("National Construction Code (NCC)", filter_ncc())) if include_standards: scopes.append(("Australian Standards (AS)", filter_as())) # Apply additional filename filter if requested (AND) scopes = [(name, with_source_contains(flt, source_contains)) for name, flt in scopes] # Allocate limits per scope per_spp = max(3, top_k // 3) if any(n.startswith("Tasmanian Planning Scheme") for n, _ in scopes) else 0 per_lps = max(3, top_k // 3) if any(n.startswith("Local Provisions Schedule") for n, _ in scopes) else 0 remaining = max(1, top_k - (per_spp + per_lps)) extra_scopes = sum(1 for n, _ in scopes if not (n.startswith("Tasmanian Planning Scheme") or n.startswith("Local Provisions Schedule"))) per_extra = max(1, remaining // max(1, extra_scopes)) if extra_scopes else 0 limits: List[int] = [] for name, _ in scopes: if name.startswith("Tasmanian Planning Scheme"): limits.append(per_spp) elif name.startswith("Local Provisions Schedule"): limits.append(per_lps) else: limits.append(per_extra) sections: List[Tuple[str, List[str]]] = [] all_sources: List[dict] = [] for (name, flt), lim in zip(scopes, limits): if lim <= 0: continue hits = q_search(vec, flt, lim) # Guardrail: drop any hit that violates scope/council hits = [h for h in hits if _allowed(h.payload or {}, scope, cslug)] blocks, sources = render_blocks(hits) sections.append((name, blocks)) all_sources.extend(sources) context = combine_context(sections) #format_guide = _section_format_guide(section_id, section_title="(auto)", ctx={}) format_guide = _section_format_guide( section_id, section_title="(auto)", ctx={ "council": council, # from do_ask parameter "planning_zones": [], # populate if you have zone detection } ) prompt = f""" You are a careful planning and building compliance assistant. ALWAYS follow this order of authority when forming an answer: 1) Tasmanian Planning Scheme — State Planning Provisions (SPP). Use as the base rule-set. 2) Local Provisions Schedule (LPS) for the selected council. If an LPS provision modifies or overrides an SPP control, apply the LPS outcome. 3) (Optional) National Construction Code (NCC) — building control (separate to planning). 4) (Optional) Australian Standards — cite when directly relevant. Use ONLY the information in CONTEXT. If a clause/section is visible, quote it briefly and always cite the source file + page. If something is not supported by the provided CONTEXT, say you don't know. CONTEXT: {context} SECTION FORMAT GUIDANCE: {format_guide} QUESTION: {query} Answer: """.strip() # BYOK mode: skip Ollama and return the context + prompt so the # browser can call its own LLM provider (Claude, GPT, Grok, etc.) if context_only: return { "context_only": True, "context": context, "prompt": prompt, "sources": all_sources, # Include the raw section blocks so the browser can inspect them "sections": [ {"heading": name, "blocks": blocks} for name, blocks in sections ] } answer = ollama_chat(prompt) return {"answer": answer, "sources": all_sources} @app.get("/ask") @limiter.limit("20/minute") def ask_get( request: Request, query: str = Query(..., description="User question"), top_k: int = 10, council: Optional[str] = None, include_ncc: bool = False, include_standards: bool = False, source_contains: Optional[str] = None, scope: str = "state_plus_local", section_id: Optional[str] = None, context_only: bool = False, ): _verify_demo_token_if_needed(request) started = time.perf_counter() out = do_ask(query, top_k, council, include_ncc, include_standards, source_contains, scope, section_id, context_only) latency_ms = int((time.perf_counter() - started) * 1000) # Telemetry insert try: ip = request.client.host if request.client else "0.0.0.0" sid = request.headers.get("X-TPR-SID") or request.cookies.get("sid") or "" allow_tps = scope in ("state_only", "state_plus_local") topk = [{"id": f"{s.get('source_file')}#p{s.get('page')}", "score": s.get("score")} for s in (out.get("sources") or [])] with db() as conn: conn.execute(""" INSERT INTO ask_logs (ts, sid, ip_hash, query, normalized, allow_tps, latency_ms, model, ok, topk_json, tokens_in, tokens_out) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) """, ( datetime.utcnow().isoformat(), sid, ip_hash(ip), query, _normalize(query), int(allow_tps), latency_ms, CHAT_MODEL, 1, _json_dumps(topk), 0, 0 )) conn.commit() except Exception as e: # Don't break the request if logging fails print("[telemetry] ask_get insert failed:", e) return out @app.post("/ask") @limiter.limit("20/minute") def ask_post(request: Request, body: AskBody): _verify_demo_token_if_needed(request) qtxt = (body.query or body.question or body.q or body.prompt or "").strip() if not qtxt: raise HTTPException(status_code=422, detail="Missing query/question") started = time.perf_counter() out = do_ask( query=qtxt, top_k=body.top_k, council=body.council, include_ncc=body.include_ncc, include_standards=body.include_standards, source_contains=body.source_contains, scope=body.scope, section_id=body.section_id, context_only=body.context_only, ) latency_ms = int((time.perf_counter() - started) * 1000) # Telemetry insert try: ip = request.client.host if request.client else "0.0.0.0" sid = request.headers.get("X-TPR-SID") or request.cookies.get("sid") or "" allow_tps = body.scope in ("state_only", "state_plus_local") topk = [{"id": f"{s.get('source_file')}#p{s.get('page')}", "score": s.get("score")} for s in (out.get("sources") or [])] with db() as conn: conn.execute(""" INSERT INTO ask_logs (ts, sid, ip_hash, query, normalized, allow_tps, latency_ms, model, ok, topk_json, tokens_in, tokens_out) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) """, ( datetime.utcnow().isoformat(), sid, ip_hash(ip), qtxt, _normalize(qtxt), int(allow_tps), latency_ms, CHAT_MODEL, 1, _json_dumps(topk), 0, 0 )) conn.commit() except Exception as e: print("[telemetry] ask_post insert failed:", e) return out