"""
telemetry.py — SQLite telemetry store + /telemetry event endpoint

Shared between app.py (ask_logs, feedback inserts) and the /telemetry route
(browser-side events: page_view, byok_call, etc.).

Database layout
---------------
ask_logs — one row per /ask call (query, latency, model, answer, …)
feedback — thumbs up/down ratings from users
events   — generic browser-side events (page views, BYOK calls, errors)

The DB file path comes from the TPR_DB env var (default /data/telemetry.db).
It is bind-mounted from the host so both the backend and web containers can
read from the same file.

IP hashing
----------
Client IPs are never stored in plain text. ip_hash() produces a 12-char
HMAC-SHA256 prefix keyed on TPR_IP_SECRET. The truncation to 12 chars is
intentional — it gives enough entropy to distinguish users for analytics
while making reversal impractical even if the hash list is leaked.
"""
import hashlib
import hmac
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import APIRouter, Request
from pydantic import BaseModel

from limiter import limiter

router = APIRouter()

DB_PATH = os.getenv("TPR_DB", "/data/telemetry.db")
# NOTE: the "change-me" default is a placeholder; set TPR_IP_SECRET in any
# real deployment, otherwise the IP hashes are keyed on a publicly known value.
IP_SECRET = os.getenv("TPR_IP_SECRET", "change-me")


def db() -> sqlite3.Connection:
    """
    Open a SQLite connection with WAL journal mode for better concurrency.

    Falls back to DELETE mode on filesystems that don't support WAL (e.g. some
    NFS or tmpfs mounts) — this can happen in certain Docker volume configs.

    Returns a sqlite3.Connection. Used as `with db() as conn`, the connection
    commits on success and rolls back on error, but it is NOT closed when the
    block exits — wrap it in `contextlib.closing()` (or call `.close()`) to
    release the file handle.

    Raises sqlite3.OperationalError if neither journal mode can be set.
    """
    # dirname() is "" for a bare filename or ":memory:", and makedirs("")
    # raises FileNotFoundError, so only create the directory when there is one.
    parent = os.path.dirname(DB_PATH)
    if parent:
        os.makedirs(parent, exist_ok=True)

    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
    except sqlite3.OperationalError:
        # fallback if FS forbids WAL
        try:
            conn.execute("PRAGMA journal_mode=DELETE;")
        except sqlite3.OperationalError:
            # Don't leak the handle when the connection is unusable.
            conn.close()
            raise
    return conn


def ip_hash(ip: str) -> str:
    """
    Return a 12-char HMAC-SHA256 prefix of the IP address.

    Keyed on TPR_IP_SECRET — rotate the secret to invalidate all stored hashes.
    The result is the first 12 hex characters (48 bits) of the digest.
    """
    dig = hmac.new(IP_SECRET.encode(), ip.encode(), hashlib.sha256).hexdigest()
    return dig[:12]


def _utc_now_iso() -> str:
    """
    Return the current UTC time as a naive ISO-8601 string (no offset suffix).

    Produces the same format as the deprecated `datetime.utcnow().isoformat()`
    so new rows sort consistently with timestamps already stored in the DB.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class TelemetryEvent(BaseModel):
    """Request body accepted by POST /telemetry."""

    type: str                   # e.g. "page_view", "byok_call", "error"
    ts: str                     # ISO timestamp from the client (informational only)
    sid: str                    # browser session ID
    ua: Optional[str] = None    # user-agent string as reported by the client
    data: Dict[str, Any] = {}   # free-form event payload, stored as JSON


@router.on_event("startup")
def init():
    """Create all tables if they don't exist. Safe to run on every startup."""
    # closing() releases the connection; the inner `with conn` keeps the
    # commit-on-success / rollback-on-error behavior.
    with closing(db()) as conn:
        with conn:
            conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
              id INTEGER PRIMARY KEY AUTOINCREMENT,
              ts TEXT NOT NULL,
              type TEXT NOT NULL,
              sid TEXT,
              ua TEXT,
              ip_hash TEXT,
              data_json TEXT
            );
            CREATE TABLE IF NOT EXISTS ask_logs (
              id INTEGER PRIMARY KEY AUTOINCREMENT,
              ts TEXT NOT NULL,
              sid TEXT,
              ip_hash TEXT,
              query TEXT,
              normalized TEXT,
              scope TEXT,
              allow_tps INTEGER,
              latency_ms INTEGER,
              model TEXT,
              ok INTEGER,
              topk_json TEXT,
              tokens_in INTEGER,
              tokens_out INTEGER,
              answer TEXT
            );
            CREATE TABLE IF NOT EXISTS feedback (
              id INTEGER PRIMARY KEY AUTOINCREMENT,
              ts TEXT NOT NULL,
              sid TEXT,
              ip_hash TEXT,
              verdict TEXT NOT NULL,   -- 'up' or 'down'
              query TEXT,              -- the question asked
              answer TEXT,             -- the answer rated
              note TEXT,               -- optional free-text note (thumbs down)
              model TEXT,              -- which LLM answered
              scope TEXT,              -- retrieval scope used
              sources_json TEXT        -- JSON array of cited sources
            );
            CREATE INDEX IF NOT EXISTS idx_ask_logs_ts ON ask_logs(ts);
            CREATE INDEX IF NOT EXISTS idx_ask_logs_normalized ON ask_logs(normalized);
            CREATE INDEX IF NOT EXISTS idx_feedback_ts ON feedback(ts);
            """)
            conn.commit()


@router.post("/telemetry")
@limiter.limit("60/minute")
async def telemetry(ev: TelemetryEvent, request: Request):
    """
    Record a generic browser-side event.

    The server-side UTC timestamp is used for the DB row. The client-reported
    ev.ts is required in the payload but is not written by this handler; only
    ev.data is serialized into data_json.

    Returns {"ok": True} once the row has been committed.
    """
    # Fall back to a placeholder address when the client address is unavailable.
    ip = request.client.host if request.client else "0.0.0.0"
    row = (
        _utc_now_iso(),
        ev.type,
        ev.sid,
        ev.ua,
        ip_hash(ip),
        json_dumps(ev.data),
    )
    # closing() releases the connection after each request; the inner
    # `with conn` commits on success and rolls back on error.
    with closing(db()) as conn:
        with conn:
            conn.execute(
                "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
                row,
            )
            conn.commit()
    return {"ok": True}


# ---------------------------------------------------------------------------
# Utilities (used by app.py via `from telemetry import …`)
# ---------------------------------------------------------------------------

def json_dumps(o: Any) -> str:
    """Serialize *o* to compact JSON (no spaces after separators, non-ASCII kept as-is)."""
    return json.dumps(o, ensure_ascii=False, separators=(",", ":"))