#!/usr/bin/env python3
"""
admin.py — Speaker Admin Web Server

Local web interface for managing speaker names and voice recordings.
Runs on port 8001 alongside bridge.py.

Access at: http://localhost:8001
"""

import json
import shutil
from pathlib import Path

from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
import uvicorn

SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
RECORDINGS_DIR = Path(__file__).parent / "recordings"
RECORDINGS_DIR.mkdir(exist_ok=True)

# Recording formats the server stores and serves. Order matters: when several
# files exist for one speaker, the first matching extension wins.
AUDIO_EXTENSIONS = (".wav", ".mp3", ".m4a", ".ogg", ".webm", ".flac")

app = FastAPI(title="Speaker Admin")


# ── Data helpers ──────────────────────────────────────────────────────────────

def _load() -> dict[str, str]:
    """Return the speaker-ID → name mapping stored in SPEAKERS_FILE.

    Reading is best-effort: a missing, unreadable, malformed, or non-object
    JSON file yields an empty mapping instead of raising, so the API keeps
    working. Problems are reported on stdout rather than swallowed silently.
    """
    if not SPEAKERS_FILE.exists():
        return {}
    try:
        data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        print(f"[Admin] Could not read {SPEAKERS_FILE.name}: {exc}")
        return {}
    if not isinstance(data, dict):
        # Callers iterate .items() and assign by key; anything else would crash.
        print(f"[Admin] Ignoring {SPEAKERS_FILE.name}: expected a JSON object")
        return {}
    return data


def _save(data: dict[str, str]) -> None:
    """Write the speaker mapping to SPEAKERS_FILE as indented UTF-8 JSON."""
    SPEAKERS_FILE.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def _is_safe_id(sid: str) -> bool:
    """Return True if *sid* can be used as a file stem inside RECORDINGS_DIR.

    Rejects empty IDs, embedded NUL bytes, and anything the current platform
    would interpret as a path component (separators, "..", drive prefixes),
    so a recording path can never resolve outside RECORDINGS_DIR.
    """
    if not sid or "\0" in sid:
        return False
    probe_name = f"{sid}.wav"
    probe = RECORDINGS_DIR / probe_name
    # If joining changed the parent or the final component, the ID contained
    # something path-like for this OS.
    return probe.parent == RECORDINGS_DIR and probe.name == probe_name


def _recording_paths(sid: str) -> list[Path]:
    """Return every existing recording file for *sid*, in AUDIO_EXTENSIONS order."""
    if not _is_safe_id(sid):
        return []
    candidates = (RECORDINGS_DIR / f"{sid}{ext}" for ext in AUDIO_EXTENSIONS)
    return [p for p in candidates if p.exists()]


def _recording_path(sid: str) -> Path | None:
    """Return the first existing recording for *sid*, or None if there is none."""
    found = _recording_paths(sid)
    return found[0] if found else None


# ── API ───────────────────────────────────────────────────────────────────────

class NameBody(BaseModel):
    """Request body for renaming a speaker."""

    name: str


class AddBody(BaseModel):
    """Request body for creating a speaker."""

    id: str
    name: str


@app.get("/api/speakers")
def api_list():
    """List all speakers sorted by ID, flagging which ones have a recording."""
    speakers = _load()
    return {"speakers": [
        {"id": k, "name": v, "has_recording": _recording_path(k) is not None}
        for k, v in sorted(speakers.items())
    ]}


@app.post("/api/speakers")
def api_add(body: AddBody):
    """Create a new speaker.

    Raises:
        HTTPException(400): the ID is empty, is not usable as a file name,
            or already exists.
    """
    speakers = _load()
    sid = body.id.strip()
    if not sid:
        raise HTTPException(400, "Speaker ID cannot be empty")
    if not _is_safe_id(sid):
        # Such an ID could neither be addressed by the /{sid} routes nor
        # safely mapped to a recording file.
        raise HTTPException(400, "Speaker ID must not contain path separators")
    if sid in speakers:
        raise HTTPException(400, f"'{sid}' already exists")
    speakers[sid] = body.name.strip()
    _save(speakers)
    return {"ok": True, "id": sid, "name": speakers[sid]}


@app.put("/api/speakers/{sid}")
def api_update(sid: str, body: NameBody):
    """Set the friendly name for *sid*, creating the entry if it is missing.

    Raises:
        HTTPException(400): the name is empty after stripping whitespace.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(400, "Name cannot be empty")
    speakers = _load()
    speakers[sid] = name
    _save(speakers)
    return {"ok": True}


@app.delete("/api/speakers/{sid}")
def api_delete(sid: str):
    """Remove a speaker and all of its recordings; unknown IDs are a no-op."""
    speakers = _load()
    speakers.pop(sid, None)
    _save(speakers)
    # Remove every format, not just the first match, so no stale file remains.
    for rec in _recording_paths(sid):
        rec.unlink(missing_ok=True)
    return {"ok": True}


@app.post("/api/speakers/{sid}/recording")
async def api_upload(sid: str, file: UploadFile = File(...)):
    """Store an uploaded voice sample for *sid*, replacing any previous one.

    The extension is taken from the uploaded file name (default ".wav") and
    must be one of AUDIO_EXTENSIONS; otherwise the file could never be found
    again by the listing, playback, or delete endpoints. A speaker entry is
    created (named after its ID) if none exists yet.

    Raises:
        HTTPException(400): the ID is not usable as a file name, or the
            file extension is not a supported audio format.
    """
    if not _is_safe_id(sid):
        raise HTTPException(400, "Invalid speaker ID")
    suffix = Path(file.filename or "audio.wav").suffix.lower() or ".wav"
    if suffix not in AUDIO_EXTENSIONS:
        raise HTTPException(400, f"Unsupported audio format '{suffix}'")

    out = RECORDINGS_DIR / f"{sid}{suffix}"
    # Stage the upload next to its destination so a failed transfer does not
    # destroy the recording that is already there.
    staged = out.with_name(out.name + ".part")
    try:
        with staged.open("wb") as f:
            shutil.copyfileobj(file.file, f)
        staged.replace(out)
    finally:
        staged.unlink(missing_ok=True)

    # Drop recordings in other formats so exactly one file represents the speaker.
    for stale in _recording_paths(sid):
        if stale != out:
            stale.unlink(missing_ok=True)

    speakers = _load()
    if sid not in speakers:
        speakers[sid] = sid
        _save(speakers)
    size_kb = round(out.stat().st_size / 1024)
    return {"ok": True, "file": out.name, "kb": size_kb}


@app.get("/api/speakers/{sid}/recording")
def api_playback(sid: str):
    """Serve the stored recording for *sid*.

    Raises:
        HTTPException(404): no recording exists for this speaker.
    """
    rec = _recording_path(sid)
    if not rec:
        raise HTTPException(404, "No recording found")
    return FileResponse(rec)


# ── Web UI ────────────────────────────────────────────────────────────────────

# NOTE(review): this literal contains only text, no tags — the page markup
# looks like it was stripped somewhere; confirm against the original template.
HTML = """ Speaker Admin

🎤 Speaker Admin

Church Live Transcription — Speaker Name & Voice Library
Speaker ID Friendly Name Voice Sample Actions
"""


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the single-page admin UI."""
    return HTML


# ── Entry point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("[Admin] Speaker admin running at http://localhost:8001")
    # NOTE(review): 0.0.0.0 exposes this unauthenticated API on every network
    # interface although the banner advertises localhost — confirm that LAN
    # access is intended, otherwise bind to 127.0.0.1.
    uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning")