telemetry.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. """
  2. telemetry.py — SQLite telemetry store + /telemetry event endpoint
  3. Shared between app.py (ask_logs, feedback inserts) and the /telemetry
  4. route (browser-side events: page_view, byok_call, etc.).
  5. Database layout
  6. ---------------
  7. ask_logs — one row per /ask call (query, latency, model, answer, …)
  8. feedback — thumbs up/down ratings from users
  9. events — generic browser-side events (page views, BYOK calls, errors)
  10. The DB file path comes from the TPR_DB env var (default /data/telemetry.db).
  11. It is bind-mounted from the host so both the backend and web containers can
  12. read from the same file.
  13. IP hashing
  14. ----------
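Client IPs are never stored in plain text. ip_hash() keeps the first 12 hex
characters (48 bits) of an HMAC-SHA256 keyed on TPR_IP_SECRET, which is ample
to tell users apart for analytics. Resistance to reversal comes from the secret
key, not from the truncation: the IPv4 space has only 2^32 addresses, so anyone
who knows TPR_IP_SECRET (including the "change-me" default) can recover IPs by
brute force. Keep the secret private and always set it explicitly in production.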
  19. """
import hashlib
import hmac
import json
import os
import sqlite3
from contextlib import closing
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from fastapi import APIRouter, Request
from fastapi.concurrency import run_in_threadpool
from pydantic import BaseModel, Field

from limiter import limiter
  26. router = APIRouter()
  27. DB_PATH = os.getenv("TPR_DB", "/data/telemetry.db")
  28. IP_SECRET = os.getenv("TPR_IP_SECRET", "change-me")
  29. def db():
  30. """
  31. Open a SQLite connection with WAL journal mode for better concurrency.
  32. Falls back to DELETE mode on filesystems that don't support WAL (e.g. some
  33. NFS or tmpfs mounts) — this can happen in certain Docker volume configs.
  34. Returns a context-manager-compatible connection (use `with db() as conn`).
  35. """
  36. os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
  37. conn = sqlite3.connect(DB_PATH)
  38. try:
  39. conn.execute("PRAGMA journal_mode=WAL;")
  40. except sqlite3.OperationalError:
  41. # fallback if FS forbids WAL
  42. conn.execute("PRAGMA journal_mode=DELETE;")
  43. return conn
  44. def ip_hash(ip: str) -> str:
  45. """
  46. Return a 12-char HMAC-SHA256 prefix of the IP address.
  47. Keyed on TPR_IP_SECRET — rotate the secret to invalidate all stored hashes.
  48. """
  49. dig = hmac.new(IP_SECRET.encode(), ip.encode(), 'sha256').hexdigest()
  50. return dig[:12]
  51. class TelemetryEvent(BaseModel):
  52. type: str # e.g. "page_view", "byok_call", "error"
  53. ts: str # ISO timestamp from the client (informational only)
  54. sid: str # browser session ID
  55. ua: Optional[str] = None
  56. data: Dict[str, Any] = {}
  57. @router.on_event("startup")
  58. def init():
  59. """Create all tables if they don't exist. Safe to run on every startup."""
  60. with db() as conn:
  61. conn.executescript("""
  62. CREATE TABLE IF NOT EXISTS events (
  63. id INTEGER PRIMARY KEY AUTOINCREMENT,
  64. ts TEXT NOT NULL,
  65. type TEXT NOT NULL,
  66. sid TEXT,
  67. ua TEXT,
  68. ip_hash TEXT,
  69. data_json TEXT
  70. );
  71. CREATE TABLE IF NOT EXISTS ask_logs (
  72. id INTEGER PRIMARY KEY AUTOINCREMENT,
  73. ts TEXT NOT NULL,
  74. sid TEXT,
  75. ip_hash TEXT,
  76. query TEXT,
  77. normalized TEXT,
  78. scope TEXT,
  79. allow_tps INTEGER,
  80. latency_ms INTEGER,
  81. model TEXT,
  82. ok INTEGER,
  83. topk_json TEXT,
  84. tokens_in INTEGER,
  85. tokens_out INTEGER,
  86. answer TEXT
  87. );
  88. CREATE TABLE IF NOT EXISTS feedback (
  89. id INTEGER PRIMARY KEY AUTOINCREMENT,
  90. ts TEXT NOT NULL,
  91. sid TEXT,
  92. ip_hash TEXT,
  93. verdict TEXT NOT NULL, -- 'up' or 'down'
  94. query TEXT, -- the question asked
  95. answer TEXT, -- the answer rated
  96. note TEXT, -- optional free-text note (thumbs down)
  97. model TEXT, -- which LLM answered
  98. scope TEXT, -- retrieval scope used
  99. sources_json TEXT -- JSON array of cited sources
  100. );
  101. CREATE INDEX IF NOT EXISTS idx_ask_logs_ts ON ask_logs(ts);
  102. CREATE INDEX IF NOT EXISTS idx_ask_logs_normalized ON ask_logs(normalized);
  103. CREATE INDEX IF NOT EXISTS idx_feedback_ts ON feedback(ts);
  104. """)
  105. conn.commit()
  106. @router.post("/telemetry")
  107. @limiter.limit("60/minute")
  108. async def telemetry(ev: TelemetryEvent, request: Request):
  109. """
  110. Record a generic browser-side event.
  111. The server-side timestamp is used for the DB row; ev.ts is stored in
  112. data_json only so dashboards can show client-reported timing if needed.
  113. """
  114. ip = request.client.host if request.client else "0.0.0.0"
  115. with db() as conn:
  116. conn.execute(
  117. "INSERT INTO events (ts,type,sid,ua,ip_hash,data_json) VALUES (?,?,?,?,?,json(?))",
  118. (datetime.utcnow().isoformat(), ev.type, ev.sid, ev.ua, ip_hash(ip), json_dumps(ev.data))
  119. )
  120. conn.commit()
  121. return {"ok": True}
  122. # ---------------------------------------------------------------------------
  123. # Utilities (used by app.py via `from telemetry import …`)
  124. # ---------------------------------------------------------------------------
  125. def json_dumps(o): return json.dumps(o, ensure_ascii=False, separators=(",",":"))