| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873 |
- """
- app.py — FastAPI backend for tasplanning.report
- RAG pipeline:
- 1. Embed the user query via Ollama (nomic-embed-text)
- 2. Search Qdrant for the closest chunks, split by corpus/scope
- 3. Inject retrieved context into a structured prompt
- 4. Call Ollama (llama3.1:8b) and return the answer + source citations
- BYOK mode (context_only=True): skip step 4 and return the prompt so
- the browser can call its own LLM (Claude, GPT, Grok, local Ollama).
- Restart required after any change to this file:
- docker compose restart backend
- """
- import os, re, hmac, logging
- import json
- import requests
- import time
- logger = logging.getLogger(__name__)
- from typing import Optional, Literal, List, Tuple
- from fastapi import FastAPI, Query, HTTPException, Request
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import StreamingResponse
- from slowapi.middleware import SlowAPIMiddleware
- from slowapi.errors import RateLimitExceeded
- from limiter import limiter
- from fastapi.responses import JSONResponse
- from pydantic import BaseModel
- from qdrant_client import QdrantClient
- from qdrant_client.http import models as qmodels
- from collections import Counter, defaultdict
- from datetime import datetime
- from telemetry import router as telemetry_router, db, ip_hash
- # ---------------------------------------------------------------------------
- # Environment
- # ---------------------------------------------------------------------------
# Trailing slashes are stripped because request URLs are built as
# f"{OLLAMA_URL}/api/..."; "http://host:11434/" would otherwise yield "//api/...".
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://192.168.8.73:11434").rstrip("/")
QDRANT_URL = os.getenv("QDRANT_URL", "http://localhost:6333")
OLLAMA_KEEP_ALIVE = os.getenv("OLLAMA_KEEP_ALIVE", "-1")  # -1 = keep loaded forever
COLLECTION = os.getenv("QDRANT_COLLECTION", "planning_docs")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text")
CHAT_MODEL = os.getenv("CHAT_MODEL", "llama3.1:8b-instruct-q4_K_M")
# Comma-separated allow-list; blank entries (e.g. from a trailing comma) are dropped.
CORS_ORIGINS = [o.strip() for o in os.getenv("CORS_ORIGINS", "https://tasplanning.report").split(",") if o.strip()]
- # ---------------------------------------------------------------------------
- # Demo token gate (disabled by default)
- # Enable by setting DEMO_REQUIRE_TOKEN=1 and DEMO_TOKEN=<secret> in .env.
- # When enabled, every request to /ask and /admin/* must include:
- # Authorization: Bearer <DEMO_TOKEN>
- # ---------------------------------------------------------------------------
DEMO_REQUIRE_TOKEN = os.getenv("DEMO_REQUIRE_TOKEN", "0") == "1"
DEMO_TOKEN = os.getenv("DEMO_TOKEN", "")


def _verify_demo_token_if_needed(request):
    """
    Enforce the optional bearer-token gate used by /ask and /admin/*.

    A no-op unless DEMO_REQUIRE_TOKEN is enabled. When it is, the request must
    carry ``Authorization: Bearer <DEMO_TOKEN>`` (scheme matched
    case-insensitively).

    Raises:
        HTTPException(401): header missing or malformed, token mismatch, or the
            gate is enabled while DEMO_TOKEN is empty.
    """
    if not DEMO_REQUIRE_TOKEN:
        return
    # Fail closed on misconfiguration: with an empty DEMO_TOKEN the header
    # "Authorization: Bearer " would otherwise compare "" == "" and pass.
    if not DEMO_TOKEN:
        logger.error("DEMO_REQUIRE_TOKEN=1 but DEMO_TOKEN is empty; rejecting request")
        raise HTTPException(status_code=401, detail="Unauthorized")
    auth = request.headers.get("Authorization", "")
    scheme, _, token = auth.partition(" ")
    if scheme.lower() != "bearer" or not token:
        raise HTTPException(status_code=401, detail="Unauthorized")
    # compare_digest runs in constant time, which prevents timing-based token guessing.
    # Compare bytes: with str arguments it raises TypeError on non-ASCII input,
    # which a client could trigger and turn into a 500.
    if not hmac.compare_digest(token.encode("utf-8"), DEMO_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Unauthorized")
- # ---------------------------------------------------------------------------
- # FastAPI app + CORS
- # ---------------------------------------------------------------------------
app = FastAPI()
# CORS: CORS_ORIGINS normally supplies an explicit allow-list, and credentials
# are allowed only in that case. If it parses to an empty list (shouldn't happen
# in production), the middleware falls back to allow_origins=["*"] together with
# an https://*.tasplanning.report origin regex, with credentials disabled.
# NOTE(review): with "*" in allow_origins every origin appears to be accepted
# already, so the regex looks redundant in the fallback. Confirm whether the
# intent was to restrict the fallback to tasplanning.report subdomains instead.
_origins = CORS_ORIGINS if CORS_ORIGINS else []
_allow_all = len(_origins) == 0
app.add_middleware(
    CORSMiddleware,
    allow_origins=_origins if not _allow_all else ["*"],
    allow_origin_regex=r"https://.*\.tasplanning\.report" if _allow_all else None,
    allow_credentials=not _allow_all,  # credentials only when origins are explicit
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization", "X-TPR-SID"],
    expose_headers=["X-TPR-SID"],  # lets the browser read the session-id header
)
# One module-level Qdrant client shared by the search/scroll helpers and endpoints below.
qc = QdrantClient(url=QDRANT_URL)
app.include_router(telemetry_router)
- # ---------------------------------------------------------------------------
- # Rate limiting (slowapi — in-memory, per IP)
- # Shared limiter instance lives in limiter.py to avoid circular imports with
- # telemetry.py, which also needs to decorate its own endpoints.
- # ---------------------------------------------------------------------------
app.state.limiter = limiter  # type: ignore
app.add_middleware(SlowAPIMiddleware)


@app.exception_handler(RateLimitExceeded)
def ratelimit_handler(request, exc):
    """Answer any slowapi RateLimitExceeded with a compact JSON 429 body."""
    payload = {"error": "rate_limited", "detail": "Too many requests"}
    return JSONResponse(content=payload, status_code=429)
- # ---------------------------------------------------------------------------
- # Feedback endpoint
- # Stores thumbs-up/down ratings alongside the query + answer for prompt tuning.
- # Fields are truncated before insert to keep the SQLite row size reasonable.
- # ---------------------------------------------------------------------------
class FeedbackBody(BaseModel):
    """Thumbs-up/down payload accepted by POST /feedback; only `verdict` is required."""
    verdict: str  # "up" or "down"; the /feedback endpoint rejects anything else with HTTP 422
    query: Optional[str] = None  # the question that was asked
    answer: Optional[str] = None  # the answer that was rated
    note: Optional[str] = None  # optional free-text from thumbs-down
    sid: Optional[str] = None  # session id from browser; the endpoint falls back to the X-TPR-SID header
    model: Optional[str] = None  # which model answered; the endpoint substitutes CHAT_MODEL when absent
    scope: Optional[str] = None  # which scope was used
    sources: Optional[list] = None  # which sources were cited; stored as a compact JSON string
@app.post("/feedback")
@limiter.limit("60/minute")
def feedback(request: Request, body: FeedbackBody):
    """
    Store a thumbs-up/down rating with its query and answer for prompt tuning.

    Every client-supplied field is length-bounded before insert to keep SQLite
    rows a reasonable size; `_trunc` logs whenever it cuts text. Database
    failures are logged and swallowed; the caller always gets {"ok": True}.

    Raises:
        HTTPException(422): verdict is not "up" or "down".
    """
    if body.verdict not in ("up", "down"):
        raise HTTPException(status_code=422, detail="verdict must be 'up' or 'down'")
    ip = request.client.host if request.client else "0.0.0.0"
    sid = body.sid or request.headers.get("X-TPR-SID") or ""
    # /ask retrieves at most a few dozen chunks, so 50 cited sources is a generous
    # cap that still stops arbitrarily large lists from being stored.
    sources = (body.sources or [])[:50]
    try:
        with db() as conn:
            conn.execute("""
                INSERT INTO feedback
                (ts, sid, ip_hash, verdict, query, answer, note, model, scope, sources_json)
                VALUES (?,?,?,?,?,?,?,?,?,?)
            """, (
                datetime.utcnow().isoformat(),
                _trunc(sid, 200, "feedback.sid"),
                ip_hash(ip),
                body.verdict,
                _trunc(body.query or "", 2000, "feedback.query"),
                _trunc(body.answer or "", 8000, "feedback.answer"),
                _trunc(body.note or "", 1000, "feedback.note"),
                _trunc(body.model or CHAT_MODEL, 200, "feedback.model"),
                _trunc(body.scope or "", 100, "feedback.scope"),
                _json_dumps(sources),
            ))
            conn.commit()
    except Exception:
        logger.exception("[feedback] telemetry insert failed")
        # Still return ok — don't surface DB errors to users
    return {"ok": True}
- # ---------------------------------------------------------------------------
- # Ollama helpers
- # ---------------------------------------------------------------------------
def slug(s: Optional[str]) -> Optional[str]:
    """
    Turn a council name into the lowercase hyphenated slug used in Qdrant payloads.

    Runs of characters outside [a-z0-9] collapse to a single "-", and leading or
    trailing hyphens are removed. Returns None for None, "" or a name that
    leaves nothing behind after normalisation.
    """
    if s:
        candidate = re.sub(r'[^a-z0-9]+', '-', s.strip().lower()).strip('-')
        if candidate:
            return candidate
    return None
def ollama_embed(text: str) -> List[float]:
    """
    Embed `text` with the Ollama embeddings API and return the vector.

    Raises:
        HTTPException(503): Ollama timed out or could not be reached.
        HTTPException(502): Ollama returned an HTTP error, the request failed in
            another way, the body was not JSON, or it lacked a non-empty
            "embedding" list.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/embeddings",
            json={"model": EMBED_MODEL, "prompt": text},
            timeout=60,
        )
        r.raise_for_status()
    except requests.Timeout:
        logger.error("Ollama embed timeout after 60s (url=%s model=%s)", OLLAMA_URL, EMBED_MODEL)
        raise HTTPException(status_code=503, detail="Embedding service timed out")
    except requests.ConnectionError:
        logger.error("Ollama embed connection error (url=%s)", OLLAMA_URL)
        raise HTTPException(status_code=503, detail="Embedding service unavailable")
    except requests.HTTPError as e:
        logger.error("Ollama embed HTTP %s: %s", e.response.status_code, e.response.text[:200])
        raise HTTPException(status_code=502, detail="Embedding service error")
    except requests.RequestException as e:
        # Any other transport failure (bad URL, redirects, broken chunked body, ...).
        logger.error("Ollama embed request failed: %s", e)
        raise HTTPException(status_code=502, detail="Embedding service error")
    try:
        data = r.json()
    except ValueError:
        logger.error("Ollama embed returned non-JSON body: %s", r.text[:200])
        raise HTTPException(status_code=502, detail="Embedding service returned unexpected response")
    embedding = data.get("embedding") if isinstance(data, dict) else None
    # An empty vector cannot be searched, so treat it as a bad response here.
    # Otherwise it would surface later as an opaque Qdrant error.
    if not embedding:
        logger.error("Ollama embed unexpected response: %s", str(data)[:200])
        raise HTTPException(status_code=502, detail="Embedding service returned unexpected response")
    return embedding
def _ollama_keep_alive():
    """
    Convert the OLLAMA_KEEP_ALIVE env string into the keep_alive value sent to Ollama.

    Integer strings such as "-1" or "300" are sent as JSON numbers; any other
    non-blank value (e.g. "10m") is sent verbatim as a string. Blank falls back
    to -1 (keep the model loaded).
    """
    raw = (OLLAMA_KEEP_ALIVE or "").strip()
    if not raw:
        return -1
    try:
        return int(raw)
    except ValueError:
        return raw


def ollama_chat(prompt: str) -> str:
    """
    Send a prompt to Ollama and return the generated text, whitespace-stripped.

    keep_alive MUST be a top-level key — putting it inside options{} causes
    Ollama to silently ignore it and unload the model between requests. Its
    value comes from OLLAMA_KEEP_ALIVE (default "-1", i.e. keep resident).
    num_ctx is fixed at 6144. Changing it between requests forces Ollama to
    reload the model (KV cache is resized), adding ~3–5 s of cold-start latency.

    Raises:
        HTTPException(503): Ollama timed out or could not be reached.
        HTTPException(502): Ollama returned an HTTP error, the request failed in
            another way, or the body was not a JSON object.
    """
    try:
        r = requests.post(
            f"{OLLAMA_URL}/api/generate",
            json={
                "model": CHAT_MODEL,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "num_ctx": 6144,  # was 8192
                    "num_predict": 512,
                    "temperature": 0.2,
                    "top_p": 0.9,
                    "repeat_penalty": 1.1,
                },
                "keep_alive": _ollama_keep_alive(),
            },
            timeout=180,
        )
        r.raise_for_status()
    except requests.Timeout:
        logger.error("Ollama chat timeout after 180s (url=%s model=%s)", OLLAMA_URL, CHAT_MODEL)
        raise HTTPException(status_code=503, detail="LLM service timed out")
    except requests.ConnectionError:
        logger.error("Ollama chat connection error (url=%s)", OLLAMA_URL)
        raise HTTPException(status_code=503, detail="LLM service unavailable")
    except requests.HTTPError as e:
        logger.error("Ollama chat HTTP %s: %s", e.response.status_code, e.response.text[:200])
        raise HTTPException(status_code=502, detail="LLM service error")
    except requests.RequestException as e:
        logger.error("Ollama chat request failed: %s", e)
        raise HTTPException(status_code=502, detail="LLM service error")
    try:
        data = r.json()
    except ValueError:
        logger.error("Ollama chat returned non-JSON body: %s", r.text[:200])
        raise HTTPException(status_code=502, detail="LLM service error")
    if not isinstance(data, dict):
        logger.error("Ollama chat unexpected response: %s", str(data)[:200])
        raise HTTPException(status_code=502, detail="LLM service error")
    # `or ""` also covers an explicit null "response" field.
    return (data.get("response") or "").strip()
def _scroll_points(collection: str, qfilter=None, include_vector: bool=False, page_size: int=200):
    """
    Yield every point of `collection` (optionally filtered), one scroll page at a time.

    Payloads are always requested; vectors only when `include_vector` is true.
    Used by /admin/export, which needs vectors and accepts any collection name.
    For payload-only scans of the default collection use _scan_points instead.
    """
    cursor = None
    while True:
        page, cursor = qc.scroll(
            collection_name=collection,
            scroll_filter=qfilter,
            offset=cursor,
            limit=page_size,
            with_payload=True,
            with_vectors=include_vector,
        )
        if not page:
            return
        yield from page
        if cursor is None:
            return
- # ---------------------------------------------------------------------------
- # Health + utility endpoints
- # ---------------------------------------------------------------------------
@app.get("/readyz")
def readyz():
    """Readiness probe; always reports ok without contacting Ollama or Qdrant."""
    status = {"ok": True}
    return status
- def _normalize(q: Optional[str]) -> str:
- """Collapse whitespace and lowercase — used for dedup tracking in ask_logs."""
- return re.sub(r"\s+", " ", (q or "").strip().lower())
- def _json_dumps(o) -> str:
- return json.dumps(o, ensure_ascii=False, separators=(",",":"))
- def _trunc(s: str, limit: int, field: str) -> str:
- """Truncate `s` to `limit` chars. Logs a warning if truncation occurs so
- data loss is visible in docker logs rather than silently discarded."""
- if len(s) > limit:
- logger.warning("telemetry field %r truncated from %d to %d chars", field, len(s), limit)
- return s[:limit]
- return s
- # ---- Councils list (prefers payload 'council', falls back to filename token) ----
@app.get("/councils")
def councils():
    """
    Return the sorted, de-duplicated council tokens seen in the collection.

    Samples at most 50 scroll pages of 100 points (~5k points). Each point
    contributes its payload "council" (trimmed, lowercased). If that is empty,
    the first "_"/"-" separated token of its lowercased source_file (with
    ".pdf" removed) is used instead.
    NOTE(review): points from non-LPS corpora without a "council" field also
    contribute a filename token here — confirm that is acceptable.
    """
    found = set()
    cursor = None
    for _page in range(50):
        batch, cursor = qc.scroll(
            collection_name=COLLECTION,
            offset=cursor,
            limit=100,
            with_payload=True,
        )
        for point in batch:
            payload = point.payload or {}
            name = (payload.get("council") or "").strip().lower()
            if not name:
                filename = (payload.get("source_file") or "").lower()
                if filename:
                    name = filename.replace(".pdf", "").split("_")[0].split("-")[0]
            if name:
                found.add(name)
        if cursor is None:
            break
    return sorted(found)
- # ---------------------------------------------------------------------------
- # Qdrant filter builders
- # _mv — exact MatchValue (keyword field, case-sensitive)
- # _mt — MatchText (full-text / substring match)
- # ---------------------------------------------------------------------------
def _mv(key: str, value: str) -> qmodels.FieldCondition:
    """Exact MatchValue condition on payload `key` (keyword field, case-sensitive)."""
    matcher = qmodels.MatchValue(value=value)
    return qmodels.FieldCondition(key=key, match=matcher)


def _mt(key: str, text: str) -> qmodels.FieldCondition:
    """MatchText condition on payload `key` (full-text / substring match)."""
    matcher = qmodels.MatchText(text=text)
    return qmodels.FieldCondition(key=key, match=matcher)
def _corpus_filter(corpus: str) -> qmodels.Filter:
    """Filter selecting one corpus tag by exact match."""
    return qmodels.Filter(must=[_mv("corpus", corpus)])


def filter_tps() -> qmodels.Filter:
    """TPS only, exact match on corpus."""
    return _corpus_filter("tps")


def filter_lps(council: str) -> qmodels.Filter:
    """
    LPS for one council: exact match on corpus and on the council slug.

    Falls back to the plain lowercased name when the council slugifies to nothing.
    """
    council_key = slug(council) or council.lower()
    return qmodels.Filter(must=[_mv("corpus", "lps"), _mv("council", council_key)])


def filter_ncc() -> qmodels.Filter:
    """NCC only, exact match on corpus."""
    return _corpus_filter("ncc")


def filter_as() -> qmodels.Filter:
    """Australian Standards only, exact match on corpus."""
    return _corpus_filter("as")
def with_source_contains(flt: Optional[qmodels.Filter], source_contains: Optional[str]) -> Optional[qmodels.Filter]:
    """
    AND a source_file substring (MatchText) condition onto an existing filter.

    Args:
        flt: filter to extend; may be None.
        source_contains: substring to require in source_file; falsy means "no-op".

    Returns:
        `flt` unchanged (which may be None) when `source_contains` is falsy;
        otherwise a new Filter whose `must` is flt's conditions plus the new one,
        with flt's `should` / `must_not` carried over. Other Filter fields are
        not copied.
    """
    if not source_contains:
        return flt
    add = _mt("source_file", source_contains)
    if flt is None:
        return qmodels.Filter(must=[add])
    existing = getattr(flt, "must", None)
    if existing is None:
        must = []
    elif isinstance(existing, (list, tuple)):
        must = list(existing)
    else:
        # A single Condition is not a sequence. list() on a pydantic model would
        # iterate its fields, so wrap it instead.
        must = [existing]
    must.append(add)
    return qmodels.Filter(
        must=must,
        should=getattr(flt, "should", None),
        must_not=getattr(flt, "must_not", None),
    )
def q_search(vec: List[float], flt: Optional[qmodels.Filter], limit: int):
    """
    Run an ANN query against the default collection and return its scored points.

    At least one result is always requested, even if `limit` is 0 or negative.
    """
    response = qc.query_points(
        collection_name=COLLECTION,
        query=vec,
        query_filter=flt,
        limit=max(1, limit),
        with_payload=True,
    )
    return response.points
def render_blocks(hits) -> Tuple[List[str], List[dict]]:
    """
    Turn Qdrant hits into prompt context blocks plus citation dicts.

    Each block reads "Source: <file> (p.<page> chunk <idx>)\\nText: <text>".
    Each source dict carries source_file, page, chunk_index and the hit score.
    Missing payloads behave like empty ones.
    """
    rendered: List[str] = []
    citations: List[dict] = []
    for hit in hits:
        payload = hit.payload or {}
        source_file = payload.get("source_file")
        page = payload.get("page")
        chunk = payload.get("chunk_index")
        label = "{} (p.{} chunk {})".format(source_file, page, chunk)
        rendered.append("Source: {}\nText: {}".format(label, payload.get("text", "")))
        citations.append({
            "source_file": source_file,
            "page": page,
            "chunk_index": chunk,
            "score": hit.score,
        })
    return rendered, citations
def combine_context(sections: List[Tuple[str, List[str]]]) -> str:
    """
    Merge (heading, blocks) sections into one context string for the prompt.

    Sections with no blocks are skipped; each kept section starts with
    "=== heading ===". All parts are separated by blank lines. Returns
    "No context found." when nothing remains.
    """
    parts: List[str] = []
    for heading, blocks in sections:
        if blocks:
            parts += [f"=== {heading} ===", *blocks]
    if not parts:
        return "No context found."
    return "\n\n".join(parts)
def _scan_points(qfilter: Optional[qmodels.Filter] = None, max_pages: int = 10000, page_size: int = 200):
    """
    Yield points from the default collection (payloads only, no vectors).

    Stops after `max_pages` scroll pages, on an empty page, or when Qdrant
    reports no further offset. Used by /admin/stats, /admin/files and
    /admin/sample. Fine for the current dataset size. If the collection grows
    very large, switch to a pre-aggregated summary (a separate Qdrant
    collection or a background job writing counts to SQLite).
    """
    cursor = None
    for _ in range(max_pages):
        batch, cursor = qc.scroll(
            collection_name=COLLECTION,
            scroll_filter=qfilter,
            offset=cursor,
            limit=page_size,
            with_payload=True,
        )
        if not batch:
            break
        yield from batch
        if cursor is None:
            break
- # ---------------------------------------------------------------------------
- # Admin endpoints — require DEMO_TOKEN when DEMO_REQUIRE_TOKEN=1
- # All endpoints are rate-limited; /export is tighter (streams full DB).
- # ---------------------------------------------------------------------------
@app.get("/admin/stats")
@limiter.limit("30/minute")
def admin_stats(request: Request, council: Optional[str] = None, corpus: Optional[str] = None):
    """
    Count points (chunks) in the default collection, grouped by corpus and council.

    Optional `council` / `corpus` query values are lowercased and applied as
    MatchText conditions. Gated by _verify_demo_token_if_needed.
    """
    _verify_demo_token_if_needed(request)
    conditions = [
        qmodels.FieldCondition(key=key, match=qmodels.MatchText(text=value.lower()))
        for key, value in (("council", council), ("corpus", corpus))
        if value
    ]
    qfilter = qmodels.Filter(must=conditions) if conditions else None

    by_corpus = Counter()
    by_council = Counter()
    total = 0
    for point in _scan_points(qfilter=qfilter):
        payload = point.payload or {}
        by_corpus[(payload.get("corpus") or "").lower()] += 1
        council_value = payload.get("council")
        if council_value:
            by_council[council_value.lower()] += 1
        total += 1
    return {
        "collection": COLLECTION,
        "total_points": total,
        "by_corpus": dict(by_corpus),
        "by_council": dict(by_council),
        "note": "Counts are points (chunks), not documents.",
    }
@app.get("/admin/files")
@limiter.limit("30/minute")
def admin_files(request: Request, council: Optional[str] = None, corpus: Optional[str] = None, contains: Optional[str] = None, limit: int = 200):
    """
    List indexed source files with point counts, most points first.

    `council` / `corpus` are lowercased and `contains` is used as-is; each
    non-empty value becomes a MatchText condition. Corpus and council come from
    the first point of each file that has them. page_count_est is the number of
    distinct page numbers seen (None if none). At most max(1, limit) rows are
    returned. Gated by _verify_demo_token_if_needed.
    """
    _verify_demo_token_if_needed(request)
    criteria = (
        ("council", council.lower() if council else None),
        ("corpus", corpus.lower() if corpus else None),
        ("source_file", contains or None),
    )
    conditions = [
        qmodels.FieldCondition(key=key, match=qmodels.MatchText(text=text))
        for key, text in criteria
        if text
    ]
    qfilter = qmodels.Filter(must=conditions) if conditions else None

    per_file = {}
    for point in _scan_points(qfilter=qfilter):
        payload = point.payload or {}
        name = (payload.get("source_file") or "").strip()
        if not name:
            continue
        entry = per_file.get(name)
        if entry is None:
            entry = per_file[name] = {"points": 0, "corpus": None, "council": None, "pages": set()}
        entry["points"] += 1
        if not entry["corpus"]:
            entry["corpus"] = payload.get("corpus")
        if not entry["council"]:
            entry["council"] = payload.get("council")
        page = payload.get("page")
        if page is not None:
            entry["pages"].add(page)

    rows = sorted(
        (
            {
                "source_file": name,
                "corpus": entry["corpus"],
                "council": entry["council"],
                "points": entry["points"],
                "page_count_est": len(entry["pages"]) or None,
            }
            for name, entry in per_file.items()
        ),
        key=lambda row: row["points"],
        reverse=True,
    )
    return rows[:max(1, limit)]
@app.get("/admin/sample")
@limiter.limit("30/minute")
def admin_sample(request: Request, council: Optional[str] = None, corpus: Optional[str] = None, n: int = 5):
    """
    Return up to max(1, n) non-empty chunks with metadata and a 400-char preview.

    Optional `council` / `corpus` values are lowercased MatchText conditions.
    Points are taken in scroll order; chunks with blank text are skipped.
    Gated by _verify_demo_token_if_needed.
    """
    _verify_demo_token_if_needed(request)
    conditions = [
        qmodels.FieldCondition(key=key, match=qmodels.MatchText(text=value.lower()))
        for key, value in (("council", council), ("corpus", corpus))
        if value
    ]
    qfilter = qmodels.Filter(must=conditions) if conditions else None

    wanted = max(1, n)
    samples = []
    for point in _scan_points(qfilter=qfilter):
        payload = point.payload or {}
        text = (payload.get("text") or "").strip()
        if not text:
            continue
        preview = text if len(text) <= 400 else text[:400] + "…"
        samples.append({
            "source_file": payload.get("source_file"),
            "corpus": payload.get("corpus"),
            "council": payload.get("council"),
            "page": payload.get("page"),
            "chunk_index": payload.get("chunk_index"),
            "preview": preview,
        })
        if len(samples) >= wanted:
            break
    return samples
@app.get("/admin/export")
@limiter.limit("5/minute")
def admin_export(
    request: Request,
    collection: str = COLLECTION,
    council: Optional[str] = None,
    corpus: Optional[str] = None,
    source_contains: Optional[str] = None,
    include_vector: bool = False,
    limit: Optional[int] = None
):
    """
    Stream points of `collection` as NDJSON (one {"id", "payload"[, "vector"]} per line).

    `council` / `corpus` (lowercased) and `source_contains` (as-is) become
    MatchText conditions. A truthy `limit` caps the number of lines.
    Gated by _verify_demo_token_if_needed.
    NOTE(review): `collection` is caller-chosen, and the token gate is off unless
    DEMO_REQUIRE_TOKEN=1, so by default any collection can be exported.
    """
    _verify_demo_token_if_needed(request)
    must = []
    if council:
        must.append(qmodels.FieldCondition(key="council", match=qmodels.MatchText(text=council.lower())))
    if corpus:
        must.append(qmodels.FieldCondition(key="corpus", match=qmodels.MatchText(text=corpus.lower())))
    if source_contains:
        must.append(qmodels.FieldCondition(key="source_file", match=qmodels.MatchText(text=source_contains)))
    qfilter = qmodels.Filter(must=must) if must else None

    def gen():
        count = 0
        for pt in _scroll_points(collection, qfilter=qfilter, include_vector=include_vector):
            obj = {
                "id": str(getattr(pt, "id", None)),
                "payload": pt.payload or {},
            }
            if include_vector:
                obj["vector"] = pt.vector
            yield json.dumps(obj, ensure_ascii=False) + "\n"
            count += 1
            if limit and count >= limit:
                break

    # The filename is built from caller-supplied values. Keep a conservative
    # alphabet so quotes, semicolons or CR/LF cannot break or inject into the
    # Content-Disposition header. The default name is unchanged.
    raw_name = f'{collection}-{corpus or "all"}-{council or "all"}.ndjson'
    filename = re.sub(r"[^A-Za-z0-9._-]+", "_", raw_name)
    headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    return StreamingResponse(gen(), media_type="application/x-ndjson", headers=headers)
- # ---------------------------------------------------------------------------
- # Section-specific format guides
- # Each section_id maps to a tightly-scoped formatting instruction injected at
- # the end of the prompt. This steers the LLM output for structured report
- # sections without changing the core RAG prompt.
- # ---------------------------------------------------------------------------
def _section_format_guide(section_id: Optional[str], section_title: str, ctx: dict) -> str:
    """
    Return strict, section-specific formatting guidance for the LLM.
    Keep these short, prescriptive, and impossible to ignore.

    Args:
        section_id: report section id, matched case-insensitively. The zoning
            ids listed below, any id starting with "code-", and "permit-overview"
            have dedicated guides; anything else (including None) gets the
            default guide.
        section_title: not used by this function.
        ctx: reads "planning_zones" (list of zone names) and "council" (display
            label); both are only interpolated into the zoning guide.

    Returns:
        The selected guide text with surrounding whitespace stripped.

    NOTE(review): the guide bodies are inserted verbatim into the LLM prompt.
    Any wording or whitespace change here changes model output.
    """
    sid = (section_id or "").lower()
    # Utility bits from context
    zones = ctx.get("planning_zones") or []
    zone_label = ", ".join(zones) if zones else "the applicable zone"
    council_label = ctx.get("council") or ""
    # ---- ZONING: one table row per clause, A and P side by side ----
    if sid in {"zoning", "zoning-41", "zoning-42", "zoning-43", "zoning-44", "zoning-441", "zoning-442"}:
        return f"""
FORMAT REQUIREMENTS (MANDATORY):
- Produce a concise preface (≤ 2 sentences) naming {zone_label}.
- Then include a Markdown table listing EACH visible clause found in CONTEXT that applies to the zone or LPS for **{council_label or 'the selected council'}**.
- One row per subclause. If an A/P pair exists (e.g., A1 / P1), include both in the same row.
- Columns (exact):
| Clause | Topic | Acceptable Solution (A) | Performance Criteria (P) | Assessment | Source |
- "Clause": the clause number (e.g., "12.3.1 A1" or "DOR-S1.7.1").
- "Topic": short label extracted from the clause heading.
- "Acceptable Solution (A)" and "Performance Criteria (P)": quote briefly—no more than 1–2 lines each.
- "Assessment": state clearly whether the proposal meets A, or relies on P. If unknown from CONTEXT, write "TBC".
- "Source": filename + page (from CONTEXT).
- Only include clauses actually present in CONTEXT; NEVER invent clause numbers or text.
- After the table, add a one-paragraph summary noting any items assessed as TBC or non-compliant.
""".strip()
    # ---- Codes overview: any section id starting with "code-" ----
    if sid.startswith("code-"):
        return """
FORMAT REQUIREMENTS:
- Start with one sentence stating which Code and why it is triggered.
- Then provide a short checklist or table of the relevant sub-clauses (A vs P), with Source for each.
- Keep to 150–250 words + table.
""".strip()
    # ---- Permit Overview (concise triggers) ----
    if sid == "permit-overview":
        return """
FORMAT REQUIREMENTS:
- Produce 3 blocks with headings:
1) "Project Context" – 3–5 bullet points (site, proposal, zone).
2) "Applicable Provisions" – bullets grouping TPS SPP, LPS (selected council), and triggered Codes.
3) "Assessment Path" – bullet list of key clauses to assess next.
- Cite specific clause numbers ONLY if present in CONTEXT (include Source).
""".strip()
    # ---- Default (no special formatting) ----
    return """
FORMAT REQUIREMENTS:
- Use concise Markdown with short paragraphs and bullets as needed.
- Cite briefly (filename + page) when quoting a control.
""".strip()
- # ---------------------------------------------------------------------------
- # /ask — core RAG endpoint
- # ---------------------------------------------------------------------------
class AskBody(BaseModel):
    """
    Request body for POST /ask.

    Different frontends send the question under different keys. ask_post uses
    the first truthy value of `query`, `question`, `q`, `prompt` (in that order).
    """
    # accept multiple keys from different frontends
    query: Optional[str] = None
    question: Optional[str] = None
    q: Optional[str] = None
    prompt: Optional[str] = None
    top_k: int = 10  # overall chunk budget; do_ask clamps it to 1..30
    council: Optional[str] = None  # council name; do_ask slugifies it to pick that council's LPS
    include_ncc: bool = False  # also search the National Construction Code corpus
    include_standards: bool = False  # also search the Australian Standards corpus
    source_contains: Optional[str] = None  # source_file substring ANDed onto every search
    scope: Literal['state_plus_local','local_only','state_only','any'] = 'state_plus_local'
    section_id: Optional[str] = None  # selects a section-specific format guide for the prompt
    # BYOK mode: return context blocks without calling Ollama.
    # The browser then calls its own LLM with the returned context + prompt.
    context_only: bool = False
- def _allowed(p: dict, scope: str, cslug: Optional[str]) -> bool:
- """
- Secondary guardrail applied after the Qdrant vector search.
- Qdrant filters are the primary gate; this catches any edge-case leakage
- (e.g. MatchText returning a partial match across corpora).
- """
- corp = (p.get("corpus") or "").lower()
- council = (p.get("council") or "").lower()
- if scope == "local_only":
- return corp == "lps" and cslug and council == cslug
- if scope == "state_only":
- return corp == "tps"
- if scope == "state_plus_local":
- return corp == "tps" or (corp == "lps" and cslug and council == cslug)
- return True
def do_ask(
    query: str,
    top_k: int = 10,
    council: Optional[str] = None,
    include_ncc: bool = False,
    include_standards: bool = False,
    source_contains: Optional[str] = None,
    scope: str = "state_plus_local",
    section_id: Optional[str] = None,
    context_only: bool = False,
):
    """
    Run the RAG pipeline for one question.

    Embeds `query`, searches each applicable corpus separately in Qdrant and
    builds the structured prompt. It then either answers with Ollama or, in BYOK
    mode, returns the prompt for the browser to send to its own LLM.

    Args:
        query: the user's question (inserted verbatim into the prompt).
        top_k: overall chunk budget, clamped to 1..30 and split across corpora.
        council: council name; slugified to select that council's LPS.
        include_ncc: also search the NCC corpus.
        include_standards: also search the Australian Standards corpus.
        source_contains: optional source_file substring ANDed onto every search.
        scope: "state_only", "local_only", "state_plus_local" or "any".
        section_id: selects a section-specific format guide.
        context_only: BYOK mode — skip Ollama and return the prompt instead.

    Returns:
        {"answer", "sources"} normally; in BYOK mode
        {"context_only", "context", "prompt", "sources", "sections"}.

    Raises:
        HTTPException: propagated from ollama_embed / ollama_chat.
    """
    top_k = max(1, min(top_k, 30))  # clamp: at least 1, at most 30
    vec = ollama_embed(query)
    cslug = slug(council) if council else None

    # Entries are (heading, qdrant_filter, kind, corpus). Each entry is searched
    # independently so the chunk budget can be controlled per corpus. This stops
    # TPS from drowning out LPS results or vice versa. `kind` drives both the
    # budget split and which post-search guardrail applies.
    scopes: List[Tuple[str, qmodels.Filter, str, str]] = []
    if scope in ("state_only", "state_plus_local", "any"):
        scopes.append(("Tasmanian Planning Scheme (SPP)", filter_tps(), "spp", "tps"))
    if scope in ("local_only", "state_plus_local", "any") and cslug:
        scopes.append((f"Local Provisions Schedule — {cslug}", filter_lps(cslug), "lps", "lps"))
    if include_ncc:
        scopes.append(("National Construction Code (NCC)", filter_ncc(), "extra", "ncc"))
    if include_standards:
        scopes.append(("Australian Standards (AS)", filter_as(), "extra", "as"))

    # Divide top_k across scopes: SPP and LPS each get ~1/3 (minimum 3). The
    # remainder is split evenly across any extra corpora (NCC, AS).
    # NOTE(review): with no extra corpora the remainder is not retrieved at all,
    # and the minimum of 3 can exceed a small top_k. Confirm this is intended.
    kinds = [kind for _, _, kind, _ in scopes]
    per_spp = max(3, top_k // 3) if "spp" in kinds else 0
    per_lps = max(3, top_k // 3) if "lps" in kinds else 0
    remaining = max(1, top_k - (per_spp + per_lps))
    extra_scopes = kinds.count("extra")
    per_extra = max(1, remaining // extra_scopes) if extra_scopes else 0
    limits = {"spp": per_spp, "lps": per_lps, "extra": per_extra}

    sections: List[Tuple[str, List[str]]] = []
    all_sources: List[dict] = []
    for name, flt, kind, corpus in scopes:
        lim = limits[kind]
        if lim <= 0:
            continue
        # Apply additional filename filter if requested (AND)
        hits = q_search(vec, with_source_contains(flt, source_contains), lim)
        if kind == "extra":
            # NCC/AS are opted into explicitly, independent of `scope`. _allowed
            # only admits TPS/LPS (except for scope "any"), so running it here
            # silently dropped every NCC/AS hit. Guard on the expected corpus instead.
            hits = [h for h in hits if ((h.payload or {}).get("corpus") or "").lower() == corpus]
        else:
            # Guardrail: drop any hit that violates scope/council
            hits = [h for h in hits if _allowed(h.payload or {}, scope, cslug)]
        blocks, sources = render_blocks(hits)
        sections.append((name, blocks))
        all_sources.extend(sources)

    context = combine_context(sections)
    format_guide = _section_format_guide(
        section_id,
        section_title="(auto)",
        ctx={
            "council": council,  # from do_ask parameter
            "planning_zones": [],  # populate if you have zone detection
        }
    )
    prompt = f"""
You are an expert Tasmanian planning and building compliance assistant with deep knowledge of the Tasmanian Planning Scheme structure.
## AUTHORITY ORDER — always apply in this sequence:
1. State Planning Provisions (SPP) — the statewide baseline. Cite clause numbers exactly.
2. Local Provisions Schedule (LPS) for the selected council — overrides SPP where it differs.
3. National Construction Code (NCC) — building controls only, keep separate from planning.
4. Australian Standards — only when directly referenced by a clause in CONTEXT.
## STRICT RULES:
- Use ONLY information present in CONTEXT below. Never invent clause numbers, standards, or measurements.
- If CONTEXT does not contain enough information to answer, say: "The provided context does not cover this — check the TPSO viewer directly at tpso.planning.tas.gov.au"
- Every specific standard or requirement you state MUST include its source: (filename, p.N)
- Quote clause text briefly (1–2 lines max) then explain in plain English.
- Distinguish clearly between Acceptable Solutions (A) and Performance Criteria (P).
## OUTPUT FORMAT:
- Use Markdown: ## for main headings, ### for sub-headings, **bold** for clause numbers.
- For setbacks, parking rates, or multiple standards: use a Markdown table with columns: Clause | Requirement | A or P | Source
- End every response with a ## Sources section listing each cited document and page.
- Keep answers concise but complete — do not pad or repeat information.
- Professional planning language; avoid informal phrasing.
## CONTEXT (retrieved from Tasmanian Planning Scheme documents):
{context}
{format_guide}
## QUESTION:
{query}
## ANSWER:
""".strip()

    # BYOK mode: skip Ollama and return the context + prompt so the
    # browser can call its own LLM provider (Claude, GPT, Grok, etc.)
    if context_only:
        return {
            "context_only": True,
            "context": context,
            "prompt": prompt,
            "sources": all_sources,
            # Include the raw section blocks so the browser can inspect them
            "sections": [
                {"heading": name, "blocks": blocks}
                for name, blocks in sections
            ]
        }
    answer = ollama_chat(prompt)
    return {"answer": answer, "sources": all_sources}
def _record_ask(
    request: Request,
    query: str,
    scope: str,
    latency_ms: int,
    out: Optional[dict],
    ok: bool,
    label: str,
) -> None:
    """
    Best-effort insert of one /ask call into the ask_logs telemetry table.

    Shared by ask_get and ask_post so both log identically. Any failure is
    logged with a traceback and swallowed, because telemetry must never break
    the response.

    Args:
        request: incoming request (client IP, X-TPR-SID header, sid cookie).
        query: question text as passed to do_ask (stored truncated).
        scope: retrieval scope passed to do_ask.
        latency_ms: wall-clock milliseconds spent in do_ask.
        out: do_ask's result, or None when do_ask raised.
        ok: False when do_ask raised; stored in the `ok` column.
        label: "ask_get" / "ask_post", used in log and truncation messages.
    """
    try:
        ip = request.client.host if request.client else "0.0.0.0"
        sid = request.headers.get("X-TPR-SID") or request.cookies.get("sid") or ""
        # do_ask also searches the TPS corpus for scope "any", so it counts as TPS-enabled.
        allow_tps = scope in ("state_only", "state_plus_local", "any")
        result = out or {}
        topk = [
            {"id": f"{s.get('source_file')}#p{s.get('page')}", "score": s.get("score")}
            for s in (result.get("sources") or [])
        ]
        stored_query = _trunc(query, 2000, f"{label}.query")
        with db() as conn:
            conn.execute("""
                INSERT INTO ask_logs
                (ts, sid, ip_hash, query, normalized, scope, allow_tps, latency_ms,
                 model, ok, topk_json, tokens_in, tokens_out, answer)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """, (
                datetime.utcnow().isoformat(),
                _trunc(sid, 200, f"{label}.sid"),
                ip_hash(ip),
                stored_query,
                _normalize(stored_query),
                scope,
                int(allow_tps),
                latency_ms,
                # NOTE(review): logged even in BYOK (context_only) mode, where
                # Ollama is not called.
                CHAT_MODEL,
                int(ok),
                _json_dumps(topk),
                0,
                0,
                _trunc(result.get("answer") or "", 8000, f"{label}.answer"),
            ))
            conn.commit()
    except Exception:
        logger.exception("[telemetry] %s insert failed", label)


def _ms_since(started: float) -> int:
    """Milliseconds elapsed since a time.perf_counter() reading."""
    return int((time.perf_counter() - started) * 1000)


@app.get("/ask")
@limiter.limit("20/minute")
def ask_get(
    request: Request,
    query: str = Query(..., description="User question"),
    top_k: int = 10,
    council: Optional[str] = None,
    include_ncc: bool = False,
    include_standards: bool = False,
    source_contains: Optional[str] = None,
    scope: Literal["state_plus_local", "local_only", "state_only", "any"] = "state_plus_local",
    section_id: Optional[str] = None,
    context_only: bool = False,
):
    """
    GET variant of /ask; parameters mirror AskBody.

    `scope` is validated against the same values as POST (FastAPI returns 422
    otherwise). A blank or whitespace-only query is rejected with 422 like POST.
    Every call, successful or not, is recorded in ask_logs.
    """
    _verify_demo_token_if_needed(request)
    qtxt = query.strip()
    if not qtxt:
        raise HTTPException(status_code=422, detail="Missing query/question")
    started = time.perf_counter()
    try:
        out = do_ask(qtxt, top_k, council, include_ncc, include_standards, source_contains, scope, section_id, context_only)
    except Exception:
        _record_ask(request, qtxt, scope, _ms_since(started), None, False, "ask_get")
        raise
    _record_ask(request, qtxt, scope, _ms_since(started), out, True, "ask_get")
    return out


@app.post("/ask")
@limiter.limit("20/minute")
def ask_post(request: Request, body: AskBody):
    """
    POST /ask; the question is the first truthy of query/question/q/prompt.

    Raises HTTPException(422) when no question text is supplied. Every call,
    successful or not, is recorded in ask_logs.
    """
    _verify_demo_token_if_needed(request)
    qtxt = (body.query or body.question or body.q or body.prompt or "").strip()
    if not qtxt:
        raise HTTPException(status_code=422, detail="Missing query/question")
    started = time.perf_counter()
    try:
        out = do_ask(
            query=qtxt,
            top_k=body.top_k,
            council=body.council,
            include_ncc=body.include_ncc,
            include_standards=body.include_standards,
            source_contains=body.source_contains,
            scope=body.scope,
            section_id=body.section_id,
            context_only=body.context_only,
        )
    except Exception:
        _record_ask(request, qtxt, body.scope, _ms_since(started), None, False, "ask_post")
        raise
    _record_ask(request, qtxt, body.scope, _ms_since(started), out, True, "ask_post")
    return out
|