| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- """
- telemetry.py — SQLite telemetry store + /telemetry event endpoint
- Shared between app.py (ask_logs, feedback inserts) and the /telemetry
- route (browser-side events: page_view, byok_call, etc.).
- Database layout
- ---------------
- ask_logs — one row per /ask call (query, latency, model, answer, …)
- feedback — thumbs up/down ratings from users
- events — generic browser-side events (page views, BYOK calls, errors)
- The DB file path comes from the TPR_DB env var (default /data/telemetry.db).
- It is bind-mounted from the host so both the backend and web containers can
- read from the same file.
- IP hashing
- ----------
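Client IPs are never stored in plain text. ip_hash() stores only the first
12 hex characters (48 bits) of an HMAC-SHA256 keyed on TPR_IP_SECRET. The
truncation keeps the column short while still distinguishing users for
analytics. Protection against reversal comes from the secret key, not from
the truncation. The IPv4 space is small enough to brute-force if the key is
known, so TPR_IP_SECRET must be set to a real secret. The "change-me" default
offers no protection.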
- """
import hashlib
import hmac
import json
import os
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import APIRouter, Request
from pydantic import BaseModel

from limiter import limiter
# APIRouter carrying the startup hook and the /telemetry route defined below.
router = APIRouter()

# SQLite file location; override with the TPR_DB environment variable.
DB_PATH = os.environ.get("TPR_DB", "/data/telemetry.db")

# HMAC key used by ip_hash(); override with TPR_IP_SECRET.
# NOTE(review): "change-me" is a public placeholder. Hashes made with it can be
# reversed by brute force, so a real secret must be configured in production.
IP_SECRET = os.environ.get("TPR_IP_SECRET", "change-me")
- def db():
- """
- Open a SQLite connection with WAL journal mode for better concurrency.
- Falls back to DELETE mode on filesystems that don't support WAL (e.g. some
- NFS or tmpfs mounts) — this can happen in certain Docker volume configs.
- Returns a context-manager-compatible connection (use `with db() as conn`).
- """
- os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
- conn = sqlite3.connect(DB_PATH)
- try:
- conn.execute("PRAGMA journal_mode=WAL;")
- except sqlite3.OperationalError:
- # fallback if FS forbids WAL
- conn.execute("PRAGMA journal_mode=DELETE;")
- return conn
def ip_hash(ip: str) -> str:
    """
    Pseudonymize a client IP address for storage.

    Computes HMAC-SHA256 over the IP using IP_SECRET (from TPR_IP_SECRET) as
    the key and returns the first 12 hex characters of the digest. Rotating
    the secret changes every hash, so previously stored values no longer match.
    """
    secret_key = IP_SECRET.encode()
    mac = hmac.new(secret_key, msg=ip.encode(), digestmod=hashlib.sha256)
    full_hex = mac.hexdigest()
    return full_hex[:12]
class TelemetryEvent(BaseModel):
    """
    Request body for POST /telemetry: one browser-side event.

    Every field is supplied by the client and should be treated as untrusted.
    """
    type: str # e.g. "page_view", "byok_call", "error"
    ts: str # ISO timestamp from the client (informational only; the events.ts column uses server time)
    sid: str # browser session ID
    ua: Optional[str] = None  # presumably the browser user-agent string; stored in events.ua
    data: Dict[str, Any] = {}  # free-form event payload, serialized into events.data_json
@router.on_event("startup")
def init():
    """
    Create the events, ask_logs and feedback tables and their indexes.

    Every statement uses IF NOT EXISTS, so this is safe to run on every
    startup. The connection is closed explicitly afterwards, because the
    sqlite3 context manager only commits and never closes.
    """
    conn = db()
    try:
        with conn:  # commit on success, roll back on error
            conn.executescript("""
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    type TEXT NOT NULL,
                    sid TEXT,
                    ua TEXT,
                    ip_hash TEXT,
                    data_json TEXT
                );
                CREATE TABLE IF NOT EXISTS ask_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    sid TEXT,
                    ip_hash TEXT,
                    query TEXT,
                    normalized TEXT,
                    scope TEXT,
                    allow_tps INTEGER,
                    latency_ms INTEGER,
                    model TEXT,
                    ok INTEGER,
                    topk_json TEXT,
                    tokens_in INTEGER,
                    tokens_out INTEGER,
                    answer TEXT
                );
                CREATE TABLE IF NOT EXISTS feedback (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    ts TEXT NOT NULL,
                    sid TEXT,
                    ip_hash TEXT,
                    verdict TEXT NOT NULL,  -- 'up' or 'down'
                    query TEXT,             -- the question asked
                    answer TEXT,            -- the answer rated
                    note TEXT,              -- optional free-text note (thumbs down)
                    model TEXT,             -- which LLM answered
                    scope TEXT,             -- retrieval scope used
                    sources_json TEXT       -- JSON array of cited sources
                );
                CREATE INDEX IF NOT EXISTS idx_ask_logs_ts ON ask_logs(ts);
                CREATE INDEX IF NOT EXISTS idx_ask_logs_normalized ON ask_logs(normalized);
                CREATE INDEX IF NOT EXISTS idx_feedback_ts ON feedback(ts);
            """)
    finally:
        conn.close()
@router.post("/telemetry")
@limiter.limit("60/minute")
async def telemetry(ev: TelemetryEvent, request: Request):
    """
    Record a generic browser-side event in the events table.

    Timestamps:
        - The row's ``ts`` column is server-side UTC time, in the same naive
          ISO-8601 format ``datetime.utcnow().isoformat()`` produced.
        - The client-reported ``ev.ts`` goes into data_json under
          ``"client_ts"``, unless the payload already has that key, so
          dashboards can show client-reported timing if needed.

    The client IP is stored only as ip_hash(ip), never in plain text.

    Returns:
        {"ok": True}
    """
    ip = request.client.host if request.client else "0.0.0.0"

    # Copy so the validated model is not mutated; never clobber a client key.
    payload = dict(ev.data)
    payload.setdefault("client_ts", ev.ts)

    # Same naive-UTC format as the deprecated datetime.utcnow().isoformat().
    now = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    # NOTE(review): sqlite3 calls block the event loop inside this async
    # handler. Consider a sync `def` route or a threadpool if lock waits on the
    # shared DB file become noticeable.
    conn = db()
    try:
        with conn:  # commit on success, roll back on error
            conn.execute(
                "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
                (now, ev.type, ev.sid, ev.ua, ip_hash(ip), json_dumps(payload)),
            )
    finally:
        conn.close()  # the sqlite3 context manager does not close
    return {"ok": True}
- # ---------------------------------------------------------------------------
# Utilities (json_dumps is used by telemetry() above and by app.py via `from telemetry import …`)
- # ---------------------------------------------------------------------------
def json_dumps(o):
    """Serialize *o* to compact JSON, keeping non-ASCII characters unescaped."""
    compact_separators = (",", ":")
    return json.dumps(o, separators=compact_separators, ensure_ascii=False)
|