| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- # backend/telemetry.py
- from fastapi import APIRouter, Request, Depends
- from pydantic import BaseModel
- import time, hashlib, hmac, os
- from datetime import datetime
- from typing import Any, Dict, List, Optional
- import os, sqlite3, json, hmac, hashlib
# Router that carries the telemetry endpoint and the startup hook below.
router = APIRouter()

# Path of the SQLite database file; override with the TPR_DB environment variable.
DB_PATH = os.environ.get("TPR_DB", "/data/telemetry.db")

# Key used to HMAC client IP addresses before they are stored; override with
# TPR_IP_SECRET.
# NOTE(review): "change-me" looks like a placeholder default — confirm that
# deployments always set TPR_IP_SECRET.
IP_SECRET = os.environ.get("TPR_IP_SECRET", "change-me")
def db() -> sqlite3.Connection:
    """Open a new connection to the telemetry SQLite database.

    Creates the parent directory of ``DB_PATH`` when there is one, connects,
    and then tries to switch the journal to WAL mode. If SQLite raises
    ``OperationalError`` for that pragma, DELETE journaling is used instead.

    Returns:
        A fresh ``sqlite3.Connection``. The caller owns it and must close it.
    """
    parent = os.path.dirname(DB_PATH)
    # A bare filename (or ":memory:") has an empty dirname, and
    # os.makedirs("") raises FileNotFoundError, so only create real parents.
    if parent:
        os.makedirs(parent, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute("PRAGMA journal_mode=WAL;")
    except sqlite3.OperationalError:
        # fallback if FS forbids WAL
        conn.execute("PRAGMA journal_mode=DELETE;")
    return conn
def ip_hash(ip: str) -> str:
    """Return a short keyed fingerprint of *ip*.

    The address is HMAC-SHA256'd with ``IP_SECRET`` and only the first
    12 hexadecimal characters of the digest are kept.
    """
    mac = hmac.new(IP_SECRET.encode(), msg=ip.encode(), digestmod='sha256')
    return mac.hexdigest()[:12]
class TelemetryEvent(BaseModel):
    """Request body accepted by the POST /telemetry endpoint.

    ``type``, ``ts`` and ``sid`` are required; ``ua`` and ``data`` are optional.
    """

    # Event type; written to the events.type column.
    type: str
    # Timestamp string supplied by the client; stored as given, not parsed here.
    ts: str
    # Presumably a client session identifier — confirm against the sender.
    sid: str
    # Presumably the client's user-agent string — confirm against the sender.
    ua: Optional[str] = None
    # Free-form payload; serialized to JSON text before being stored.
    data: Dict[str, Any] = {}
@router.on_event("startup")
def init():
    """Create the telemetry tables if they do not exist yet.

    Registered as a startup handler on ``router``. Creates three tables with
    ``CREATE TABLE IF NOT EXISTS``: ``events``, ``ask_logs`` and ``feedback``.
    The connection is always closed, even when the script fails.
    """
    conn = db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                type TEXT NOT NULL,
                sid TEXT,
                ua TEXT,
                ip_hash TEXT,
                data_json TEXT
            );
            CREATE TABLE IF NOT EXISTS ask_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                sid TEXT,
                ip_hash TEXT,
                query TEXT,
                normalized TEXT,
                scope TEXT,
                allow_tps INTEGER,
                latency_ms INTEGER,
                model TEXT,
                ok INTEGER,
                topk_json TEXT,
                tokens_in INTEGER,
                tokens_out INTEGER,
                answer TEXT
            );
            CREATE TABLE IF NOT EXISTS feedback (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                ts TEXT NOT NULL,
                sid TEXT,
                ip_hash TEXT,
                verdict TEXT NOT NULL, -- 'up' or 'down'
                query TEXT, -- the question asked
                answer TEXT, -- the answer rated
                note TEXT, -- optional free-text note (thumbs down)
                model TEXT, -- which LLM answered
                scope TEXT, -- retrieval scope used
                sources_json TEXT -- JSON array of cited sources
            );
        """)
        conn.commit()
    finally:
        # sqlite3's connection context manager only commits or rolls back;
        # it never closes the connection, so close it explicitly here.
        conn.close()
@router.post("/telemetry")
async def telemetry(ev: TelemetryEvent, request: Request):
    """Store one client telemetry event in the ``events`` table.

    Args:
        ev: The validated event payload.
        request: The incoming request; only the client address is read.

    Returns:
        ``{"ok": True}`` once the row has been committed.
    """
    # request.client can be None; fall back to a fixed placeholder address.
    ip = request.client.host if request.client else "0.0.0.0"
    conn = db()
    try:
        conn.execute(
            "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
            (ev.ts, ev.type, ev.sid, ev.ua, ip_hash(ip), json_dumps(ev.data)),
        )
        conn.commit()
    finally:
        # `with conn:` would not close the connection, leaking one handle per
        # request, so close it explicitly whether or not the insert succeeded.
        conn.close()
    return {"ok": True}
- # Wrap your /ask handler to also log authoritative facts:
- from fastapi import APIRouter
- import json, re
# Separate router that carries the POST /ask endpoint.
ask_router = APIRouter()
def normalize(q: str) -> str:
    """Return *q* trimmed, lower-cased, and with each whitespace run collapsed to one space."""
    trimmed_lower = q.strip().lower()
    return re.sub(r"\s+", " ", trimmed_lower)
def json_dumps(o):
    """Serialize *o* to compact JSON text, leaving non-ASCII characters unescaped."""
    # No spaces after "," or ":" keeps the stored text as small as possible.
    compact = (",", ":")
    return json.dumps(o, ensure_ascii=False, separators=compact)
@ask_router.post("/ask")
async def ask(req: Dict[str, Any], request: Request):
    """Answer a query and record the call in the ``ask_logs`` table.

    Args:
        req: JSON body; the ``query`` and ``allow_tps`` keys are read.
        request: The incoming request, used for the client address and for
            the session id (``x-tpr-sid`` header, else the ``sid`` cookie).

    Returns:
        The answer dict with ``text``, ``citations``, ``model`` and ``usage``.
        The values are currently mock placeholders.
    """
    t0 = time.perf_counter()
    # request.client can be None; fall back to a fixed placeholder address.
    ip = request.client.host if request.client else "0.0.0.0"
    sid = request.headers.get("x-tpr-sid") or request.cookies.get("sid") or ""
    query = (req.get("query") or "").strip()
    allow_tps = bool(req.get("allow_tps"))
    # ... run retrieval/LLM as you already do ...
    # mock result placeholders:
    model = "localai/llama-3.1"
    topk = [{"id": "doc123", "score": 0.83}]
    tokens_in, tokens_out = 250, 380
    ok = True
    answer = {"text": "…", "citations": topk, "model": model, "usage": {"input_tokens": tokens_in, "output_tokens": tokens_out}}
    latency = int((time.perf_counter() - t0) * 1000)
    conn = db()
    try:
        # The scope and answer columns of ask_logs are not written by this insert.
        conn.execute("""
            INSERT INTO ask_logs (ts,sid,ip_hash,query,normalized,allow_tps,latency_ms,model,ok,topk_json,tokens_in,tokens_out)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
        """, (
            datetime.utcnow().isoformat(),  # naive UTC timestamp, ISO-8601 text
            sid, ip_hash(ip), query, normalize(query), int(allow_tps),
            latency, model, int(ok), json_dumps(topk), tokens_in, tokens_out,
        ))
        conn.commit()
    finally:
        # `with conn:` would not close the connection, leaking one handle per
        # request, so close it explicitly whether or not the insert succeeded.
        conn.close()
    return answer
|