# telemetry.py
"""
telemetry.py — SQLite telemetry store + /telemetry event endpoint

Shared between app.py (ask_logs, feedback inserts) and the /telemetry
route (browser-side events: page_view, byok_call, etc.).

Database layout
---------------
ask_logs — one row per /ask call (query, latency, model, answer, …)
feedback — thumbs up/down ratings from users
events   — generic browser-side events (page views, BYOK calls, errors)

The DB file path comes from the TPR_DB env var (default /data/telemetry.db).
It is bind-mounted from the host so both the backend and web containers can
read from the same file.

IP hashing
----------
Client IPs are never stored in plain text. ip_hash() produces a 12-char hex
prefix (48 bits) of an HMAC-SHA256 keyed on TPR_IP_SECRET. The truncation to
12 chars is intentional — it keeps enough entropy to distinguish users for
analytics. Reversal stays impractical even if the hash list is leaked, but
only as long as the key is private: always set TPR_IP_SECRET, because the
built-in default ("change-me") is a public placeholder.
"""
import hashlib
import hmac
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime
from typing import Any, Dict, Optional

from fastapi import APIRouter, Request
from pydantic import BaseModel
  25. router = APIRouter()
  26. DB_PATH = os.getenv("TPR_DB", "/data/telemetry.db")
  27. IP_SECRET = os.getenv("TPR_IP_SECRET", "change-me")
  28. def db():
  29. """
  30. Open a SQLite connection with WAL journal mode for better concurrency.
  31. Falls back to DELETE mode on filesystems that don't support WAL (e.g. some
  32. NFS or tmpfs mounts) — this can happen in certain Docker volume configs.
  33. Returns a context-manager-compatible connection (use `with db() as conn`).
  34. """
  35. os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
  36. conn = sqlite3.connect(DB_PATH)
  37. try:
  38. conn.execute("PRAGMA journal_mode=WAL;")
  39. except sqlite3.OperationalError:
  40. # fallback if FS forbids WAL
  41. conn.execute("PRAGMA journal_mode=DELETE;")
  42. return conn
  43. def ip_hash(ip: str) -> str:
  44. """
  45. Return a 12-char HMAC-SHA256 prefix of the IP address.
  46. Keyed on TPR_IP_SECRET — rotate the secret to invalidate all stored hashes.
  47. """
  48. dig = hmac.new(IP_SECRET.encode(), ip.encode(), 'sha256').hexdigest()
  49. return dig[:12]
class TelemetryEvent(BaseModel):
    # Request body of POST /telemetry: one browser-side event.
    # Documented with comments rather than a class docstring so the generated
    # schema description stays exactly as it was.
    type: str  # e.g. "page_view", "byok_call", "error"
    ts: str  # ISO timestamp from the client (informational only)
    sid: str  # browser session ID
    ua: Optional[str] = None  # presumably the browser User-Agent — confirm against the client code
    data: Dict[str, Any] = {}  # free-form event payload; serialized into events.data_json by the route
  56. @router.on_event("startup")
  57. def init():
  58. """Create all tables if they don't exist. Safe to run on every startup."""
  59. with db() as conn:
  60. conn.executescript("""
  61. CREATE TABLE IF NOT EXISTS events (
  62. id INTEGER PRIMARY KEY AUTOINCREMENT,
  63. ts TEXT NOT NULL,
  64. type TEXT NOT NULL,
  65. sid TEXT,
  66. ua TEXT,
  67. ip_hash TEXT,
  68. data_json TEXT
  69. );
  70. CREATE TABLE IF NOT EXISTS ask_logs (
  71. id INTEGER PRIMARY KEY AUTOINCREMENT,
  72. ts TEXT NOT NULL,
  73. sid TEXT,
  74. ip_hash TEXT,
  75. query TEXT,
  76. normalized TEXT,
  77. scope TEXT,
  78. allow_tps INTEGER,
  79. latency_ms INTEGER,
  80. model TEXT,
  81. ok INTEGER,
  82. topk_json TEXT,
  83. tokens_in INTEGER,
  84. tokens_out INTEGER,
  85. answer TEXT
  86. );
  87. CREATE TABLE IF NOT EXISTS feedback (
  88. id INTEGER PRIMARY KEY AUTOINCREMENT,
  89. ts TEXT NOT NULL,
  90. sid TEXT,
  91. ip_hash TEXT,
  92. verdict TEXT NOT NULL, -- 'up' or 'down'
  93. query TEXT, -- the question asked
  94. answer TEXT, -- the answer rated
  95. note TEXT, -- optional free-text note (thumbs down)
  96. model TEXT, -- which LLM answered
  97. scope TEXT, -- retrieval scope used
  98. sources_json TEXT -- JSON array of cited sources
  99. );
  100. """)
  101. conn.commit()
  102. @router.post("/telemetry")
  103. @limiter.limit("60/minute")
  104. async def telemetry(ev: TelemetryEvent, request: Request):
  105. """
  106. Record a generic browser-side event.
  107. The server-side timestamp is used for the DB row; ev.ts is stored in
  108. data_json only so dashboards can show client-reported timing if needed.
  109. """
  110. ip = request.client.host if request.client else "0.0.0.0"
  111. with db() as conn:
  112. conn.execute(
  113. "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
  114. (datetime.utcnow().isoformat(), ev.type, ev.sid, ev.ua, ip_hash(ip), json_dumps(ev.data))
  115. )
  116. conn.commit()
  117. return {"ok": True}
  118. # ---------------------------------------------------------------------------
  119. # Utilities (used by app.py via `from telemetry import …`)
  120. # ---------------------------------------------------------------------------
  121. def json_dumps(o): return json.dumps(o, ensure_ascii=False, separators=(",",":"))