telemetry.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # backend/telemetry.py
import asyncio
import hashlib
import hmac
import json
import os
import sqlite3
import time
import warnings
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel
  8. router = APIRouter()
  9. DB_PATH = os.getenv("TPR_DB", "/data/telemetry.db")
  10. IP_SECRET = os.getenv("TPR_IP_SECRET", "change-me")
  11. def db():
  12. os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
  13. conn = sqlite3.connect(DB_PATH)
  14. try:
  15. conn.execute("PRAGMA journal_mode=WAL;")
  16. except sqlite3.OperationalError:
  17. # fallback if FS forbids WAL
  18. conn.execute("PRAGMA journal_mode=DELETE;")
  19. return conn
  20. def ip_hash(ip: str) -> str:
  21. dig = hmac.new(IP_SECRET.encode(), ip.encode(), 'sha256').hexdigest()
  22. return dig[:12]
  23. class TelemetryEvent(BaseModel):
  24. type: str
  25. ts: str
  26. sid: str
  27. ua: Optional[str] = None
  28. data: Dict[str, Any] = {}
  29. @router.on_event("startup")
  30. def init():
  31. with db() as conn:
  32. conn.executescript("""
  33. CREATE TABLE IF NOT EXISTS events (
  34. id INTEGER PRIMARY KEY AUTOINCREMENT,
  35. ts TEXT NOT NULL,
  36. type TEXT NOT NULL,
  37. sid TEXT,
  38. ua TEXT,
  39. ip_hash TEXT,
  40. data_json TEXT
  41. );
  42. CREATE TABLE IF NOT EXISTS ask_logs (
  43. id INTEGER PRIMARY KEY AUTOINCREMENT,
  44. ts TEXT NOT NULL,
  45. sid TEXT,
  46. ip_hash TEXT,
  47. query TEXT,
  48. normalized TEXT,
  49. scope TEXT,
  50. allow_tps INTEGER,
  51. latency_ms INTEGER,
  52. model TEXT,
  53. ok INTEGER,
  54. topk_json TEXT,
  55. tokens_in INTEGER,
  56. tokens_out INTEGER,
  57. answer TEXT
  58. );
  59. CREATE TABLE IF NOT EXISTS feedback (
  60. id INTEGER PRIMARY KEY AUTOINCREMENT,
  61. ts TEXT NOT NULL,
  62. sid TEXT,
  63. ip_hash TEXT,
  64. verdict TEXT NOT NULL, -- 'up' or 'down'
  65. query TEXT, -- the question asked
  66. answer TEXT, -- the answer rated
  67. note TEXT, -- optional free-text note (thumbs down)
  68. model TEXT, -- which LLM answered
  69. scope TEXT, -- retrieval scope used
  70. sources_json TEXT -- JSON array of cited sources
  71. );
  72. """)
  73. conn.commit()
  74. @router.post("/telemetry")
  75. async def telemetry(ev: TelemetryEvent, request: Request):
  76. ip = request.client.host if request.client else "0.0.0.0"
  77. with db() as conn:
  78. conn.execute(
  79. "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
  80. (ev.ts, ev.type, ev.sid, ev.ua, ip_hash(ip), json_dumps(ev.data))
  81. )
  82. conn.commit()
  83. return {"ok": True}
  84. # Wrap your /ask handler to also log authoritative facts:
  85. from fastapi import APIRouter
  86. import json, re
  87. ask_router = APIRouter()
  88. def normalize(q: str) -> str:
  89. return re.sub(r"\s+", " ", q.strip().lower())
  90. def json_dumps(o): return json.dumps(o, ensure_ascii=False, separators=(",",":"))
  91. @ask_router.post("/ask")
  92. async def ask(req: Dict[str, Any], request: Request):
  93. t0 = time.perf_counter()
  94. ip = request.client.host if request.client else "0.0.0.0"
  95. sid = request.headers.get("x-tpr-sid") or request.cookies.get("sid") or ""
  96. query = (req.get("query") or "").strip()
  97. allow_tps = bool(req.get("allow_tps"))
  98. # ... run retrieval/LLM as you already do ...
  99. # mock result placeholders:
  100. model = "localai/llama-3.1"
  101. topk = [{"id": "doc123", "score": 0.83}]
  102. tokens_in, tokens_out = 250, 380
  103. ok = True
  104. answer = {"text": "…", "citations": topk, "model": model, "usage": {"input_tokens": tokens_in, "output_tokens": tokens_out}}
  105. latency = int((time.perf_counter() - t0) * 1000)
  106. with db() as conn:
  107. conn.execute("""
  108. INSERT INTO ask_logs (ts,sid,ip_hash,query,normalized,allow_tps,latency_ms,model,ok,topk_json,tokens_in,tokens_out)
  109. VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
  110. """, (
  111. datetime.utcnow().isoformat(),
  112. sid, ip_hash(ip), query, normalize(query), int(allow_tps),
  113. latency, model, int(ok), json_dumps(topk), tokens_in, tokens_out
  114. ))
  115. conn.commit()
  116. return answer