#!/usr/bin/env python3
"""
admin.py — Speaker Admin Web Server

Local web interface for managing speaker names, voice recordings,
and test recording playback. Also serves the fullscreen display page
for tablets / TVs and the SSE stream that feeds it.

Runs on port 8001 alongside bridge.py.

Access at:
  http://localhost:8001            ← speaker admin
  http://[PC-IP]:8001/display      ← fullscreen display for tablets
"""

import asyncio
import json
import shutil
import threading
from contextlib import asynccontextmanager
from pathlib import Path

import paho.mqtt.client as mqtt
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse
from pydantic import BaseModel
import uvicorn

SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
RECORDINGS_DIR = Path(__file__).parent / "recordings"
TEST_RECORDINGS_DIR = Path(__file__).parent / "test_recordings"
RECORDINGS_DIR.mkdir(exist_ok=True)
TEST_RECORDINGS_DIR.mkdir(exist_ok=True)

ALLOWED_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".webm", ".aiff"}

# Lookup order used when several recordings exist for one speaker; covers
# every extension in ALLOWED_AUDIO_EXTS so an accepted upload is always found.
_RECORDING_EXTS = (".wav", ".mp3", ".m4a", ".ogg", ".webm", ".flac", ".aiff")

MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"
MQTT_TOPIC_CLEAR = "display/clear"

# ── SSE broadcast state ───────────────────────────────────────────────────────

_sse_clients: set[asyncio.Queue] = set()
_sse_lock = threading.Lock()
_event_loop: asyncio.AbstractEventLoop | None = None


def _enqueue(q: asyncio.Queue, data: str) -> None:
    """Put *data* on one client queue; runs on the event-loop thread.

    A full queue means the client is not draining its stream, so the event
    is dropped for that client instead of raising inside a loop callback.
    """
    try:
        q.put_nowait(data)
    except asyncio.QueueFull:
        pass


def _sse_event(event: str, data: str) -> str:
    """Format one Server-Sent Event.

    Every line of *data* gets its own ``data:`` field, so a payload that
    contains line breaks cannot terminate the event early or inject fields.
    """
    lines = data.replace("\r\n", "\n").replace("\r", "\n").split("\n")
    body = "".join(f"data: {line}\n" for line in lines)
    return f"event: {event}\n{body}\n"


def _broadcast(data: str) -> None:
    """Queue an already-formatted SSE message for every connected client.

    Called from the MQTT network thread, so the hand-off to the asyncio
    queues goes through ``call_soon_threadsafe``. Does nothing until the
    application has recorded its event loop.
    """
    loop = _event_loop
    if loop is None:
        return
    with _sse_lock:
        clients = list(_sse_clients)
    for q in clients:
        try:
            loop.call_soon_threadsafe(_enqueue, q, data)
        except RuntimeError:
            # The loop has been closed (shutdown); nothing left to notify.
            return


# ── MQTT subscriber ───────────────────────────────────────────────────────────

def _build_mqtt_subscriber() -> mqtt.Client:
    """Create and start the MQTT client that feeds the SSE stream.

    Subscribes to the text and clear topics on connect and forwards each
    message to ``_broadcast``. The connection is made asynchronously on the
    client's own network thread, with reconnect back-off between 1 and 30 s.
    """
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    def on_connect(client, userdata, flags, rc, props):
        if rc == 0:
            client.subscribe([(MQTT_TOPIC_TEXT, 0), (MQTT_TOPIC_CLEAR, 0)])
            print("[Admin] MQTT subscribed to display/text and display/clear")
        else:
            print(f"[Admin] MQTT connect failed: {rc}")

    def on_message(client, userdata, msg):
        if msg.topic == MQTT_TOPIC_CLEAR:
            _broadcast(_sse_event("clear", "{}"))
        else:
            payload = msg.payload.decode("utf-8", errors="replace")
            _broadcast(_sse_event("text", payload))

    client.on_connect = on_connect
    client.on_message = on_message
    client.reconnect_delay_set(min_delay=1, max_delay=30)
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()
    return client


# ── App lifecycle ─────────────────────────────────────────────────────────────

_mqtt_sub: mqtt.Client | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Record the running event loop and run the MQTT subscriber for the app's lifetime."""
    global _mqtt_sub, _event_loop
    _event_loop = asyncio.get_running_loop()
    _mqtt_sub = _build_mqtt_subscriber()
    try:
        yield
    finally:
        # Stop the subscriber even when the application exits with an error.
        if _mqtt_sub:
            _mqtt_sub.loop_stop()
            _mqtt_sub.disconnect()


app = FastAPI(title="Speaker Admin", lifespan=lifespan)


# ── Speaker data helpers ──────────────────────────────────────────────────────

def _load() -> dict[str, str | dict]:
    """Read the speakers file.

    Returns a mapping of speaker id to either a plain name string or a dict
    of speaker fields. A missing, unreadable, malformed, or non-object file
    yields an empty mapping; read/parse failures are reported on stdout.
    """
    if not SPEAKERS_FILE.exists():
        return {}
    try:
        data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both JSON syntax errors and invalid UTF-8.
        print(f"[Admin] Could not read {SPEAKERS_FILE.name}: {exc}")
        return {}
    return data if isinstance(data, dict) else {}


def _save(data: dict[str, str | dict]) -> None:
    """Write the speakers mapping as indented UTF-8 JSON."""
    SPEAKERS_FILE.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def _recording_path(sid: str) -> Path | None:
    """Return the stored voice recording for *sid*, or None if there is none."""
    for ext in _RECORDING_EXTS:
        p = RECORDINGS_DIR / f"{sid}{ext}"
        if p.exists():
            return p
    return None


# ── Speaker API ───────────────────────────────────────────────────────────────

class NameBody(BaseModel):
    """Partial update for a speaker; fields left as None are not changed."""

    name: str | None = None
    role: str | None = None
    location: str | None = None


class AddBody(BaseModel):
    """Payload for creating a speaker."""

    id: str
    name: str = ""
    role: str = ""
    location: str = ""


@app.get("/api/speakers")
def api_list():
    """List all speakers, sorted by id, with recording and embedding flags."""
    # Imported on demand so the module loads without the embeddings package.
    from embeddings import EmbeddingRegistry

    registry = EmbeddingRegistry(
        embeddings_dir=Path(__file__).parent / "embeddings",
        speakers_file=SPEAKERS_FILE,
    )
    result = []
    for sid, value in sorted(_load().items()):
        # Entries stored as a bare string carry only the speaker's name.
        fields = value if isinstance(value, dict) else {"name": str(value)}
        result.append({
            "id": sid,
            "name": fields.get("name", ""),
            "role": fields.get("role", ""),
            "location": fields.get("location", ""),
            "has_recording": _recording_path(sid) is not None,
            "has_embedding": registry.has(sid),
            "embedding_updated": fields.get("embedding_updated"),
        })
    return {"speakers": result}


@app.post("/api/speakers")
def api_add(body: AddBody):
    """Create a speaker; responds 400 for an empty or already-used id."""
    speakers = _load()
    sid = body.id.strip()
    if not sid:
        raise HTTPException(400, "Speaker ID cannot be empty")
    if sid in speakers:
        raise HTTPException(400, f"'{sid}' already exists")
    name = body.name.strip()
    speakers[sid] = {
        "name": name,
        "role": body.role.strip(),
        "location": body.location.strip(),
    }
    _save(speakers)
    # Echo the name exactly as it was stored.
    return {"ok": True, "id": sid, "name": name}


@app.put("/api/speakers/{sid}")
def api_update(sid: str, body: NameBody):
    """Update the supplied fields of a speaker.

    An unknown id is created, and a bare-string entry is converted to the
    dict form before the update is applied.
    """
    speakers = _load()
    entry = speakers.get(sid, {})
    if not isinstance(entry, dict):
        entry = {"name": str(entry), "role": "", "location": ""}
    if body.name is not None:
        entry["name"] = body.name.strip()
    if body.role is not None:
        entry["role"] = body.role.strip()
    if body.location is not None:
        entry["location"] = body.location.strip()
    speakers[sid] = entry
    _save(speakers)
    return {"ok": True}


@app.delete("/api/speakers/{sid}")
def api_delete(sid: str):
    """Remove a speaker and its voice recording, if any."""
    speakers = _load()
    speakers.pop(sid, None)
    _save(speakers)
    rec = _recording_path(sid)
    if rec:
        rec.unlink(missing_ok=True)
    return {"ok": True}


@app.post("/api/speakers/{sid}/recording")
async def api_upload(sid: str, file: UploadFile = File(...)):
    """Store an uploaded voice recording for *sid*, replacing any earlier one.

    Responds 400 for an id that is not a plain file-name component or for an
    extension outside ALLOWED_AUDIO_EXTS, and 500 if the file cannot be
    written. A speaker entry is created when the id is not yet known.
    """
    # The id becomes part of a file name, so it must not carry path parts.
    if not sid or Path(sid).name != sid:
        raise HTTPException(400, "Invalid speaker ID")
    suffix = Path(file.filename or "audio.wav").suffix.lower() or ".wav"
    if suffix not in ALLOWED_AUDIO_EXTS:
        raise HTTPException(400, f"Unsupported format '{suffix}'")

    out = RECORDINGS_DIR / f"{sid}{suffix}"
    try:
        with out.open("wb") as f:
            shutil.copyfileobj(file.file, f)
        # Remove recordings in other formats only after the new one is on
        # disk, so a failed upload never leaves the speaker without a sample
        # and an older file cannot shadow the new one in _recording_path.
        for ext in _RECORDING_EXTS:
            stale = RECORDINGS_DIR / f"{sid}{ext}"
            if stale != out:
                stale.unlink(missing_ok=True)
    except OSError as e:
        raise HTTPException(500, f"Could not save file: {e}") from e

    speakers = _load()
    if sid not in speakers:
        # Same shape as entries created through POST /api/speakers.
        speakers[sid] = {"name": sid, "role": "", "location": ""}
        _save(speakers)
    size_kb = round(out.stat().st_size / 1024)
    return {"ok": True, "file": out.name, "kb": size_kb}
@app.get("/api/speakers/{sid}/recording") def api_playback(sid: str): rec = _recording_path(sid) if not rec: raise HTTPException(404, "No recording found") return FileResponse(rec) # ── Voiceprint / embedding API ──────────────────────────────────────────────── class EnrolBody(BaseModel): audio_file: str # filename in test_recordings/ start: float = 0.0 end: float | None = None class AutoEnrolBody(BaseModel): audio_file: str session: str | None = None min_duration: float = 8.0 def _get_registry(): from embeddings import EmbeddingRegistry return EmbeddingRegistry( embeddings_dir=Path(__file__).parent / "embeddings", speakers_file=SPEAKERS_FILE, ) @app.get("/api/speakers/{sid}/voiceprint") def api_voiceprint_status(sid: str): registry = _get_registry() speakers = _load() entry = speakers.get(sid, {}) if not isinstance(entry, dict): entry = {} return { "has_embedding": registry.has(sid), "embedding_updated": entry.get("embedding_updated"), } @app.post("/api/speakers/{sid}/voiceprint/enrol") async def api_voiceprint_enrol(sid: str, body: EnrolBody): p = TEST_RECORDINGS_DIR / Path(body.audio_file).name if not p.exists(): raise HTTPException(404, f"Recording not found: {body.audio_file}") try: registry = _get_registry() emb = await asyncio.get_running_loop().run_in_executor( None, lambda: registry.extract_and_save(sid, p, body.start, body.end), ) return {"ok": True, "dim": int(emb.shape[0])} except Exception as exc: raise HTTPException(500, str(exc)) @app.post("/api/speakers/{sid}/voiceprint/auto-enrol") async def api_voiceprint_auto_enrol(sid: str, body: AutoEnrolBody): p = TEST_RECORDINGS_DIR / Path(body.audio_file).name if not p.exists(): raise HTTPException(404, f"Recording not found: {body.audio_file}") try: registry = _get_registry() emb = await asyncio.get_running_loop().run_in_executor( None, lambda: registry.enrol_from_transcript( sid, p, session_id=body.session, min_duration=body.min_duration, ), ) return {"ok": True, "dim": int(emb.shape[0])} except ValueError 
as exc: raise HTTPException(422, str(exc)) except Exception as exc: raise HTTPException(500, str(exc)) @app.delete("/api/speakers/{sid}/voiceprint") def api_voiceprint_delete(sid: str): registry = _get_registry() deleted = registry.delete(sid) return {"ok": True, "deleted": deleted} @app.get("/api/speakers/{sid}/voiceprint/candidates") def api_voiceprint_candidates(sid: str, session: str | None = None, min_dur: float = 8.0): from embeddings import get_best_enrolment_segments candidates = get_best_enrolment_segments(sid, session_id=session, min_duration=min_dur) return {"candidates": candidates} @app.get("/api/voiceprints") def api_voiceprints_list(): registry = _get_registry() return {"enrolled": registry.list_enrolled()} # ── Test playback state ─────────────────────────────────────────────────────── _playback_task: asyncio.Task | None = None _playback_status: dict = { "state": "idle", # idle | loading | playing | done | error "file": None, "progress": 0, # 0–100 "elapsed": 0.0, # seconds streamed so far "duration": 0.0, # total file duration in seconds "error": None, } BRIDGE_INJECT_URL = "http://127.0.0.1:8002/inject" async def _stream_file(filepath: Path, speed: float) -> None: global _playback_status try: import miniaudio import httpx except ImportError as e: _playback_status.update({"state": "error", "error": f"Missing package: {e}"}) return try: _playback_status["state"] = "loading" info = miniaudio.get_file_info(str(filepath)) duration = info.duration _playback_status.update({ "state": "playing", "duration": round(duration, 1), "elapsed": 0.0, "progress": 0, }) chunk_frames = 4096 chunk_secs = chunk_frames / 16000 elapsed = 0.0 stream = miniaudio.stream_file( str(filepath), output_format=miniaudio.SampleFormat.SIGNED16, nchannels=1, sample_rate=16000, frames_to_read=chunk_frames, ) async with httpx.AsyncClient() as client: for chunk in stream: await client.post(BRIDGE_INJECT_URL, content=bytes(chunk)) elapsed += chunk_secs _playback_status["elapsed"] = 
round(elapsed, 1) _playback_status["progress"] = ( min(99, round(elapsed / duration * 100)) if duration else 0 ) await asyncio.sleep(chunk_secs / speed) _playback_status.update({ "state": "done", "progress": 100, "elapsed": round(duration, 1), }) except asyncio.CancelledError: _playback_status.update({ "state": "idle", "file": None, "progress": 0, "elapsed": 0.0, }) except Exception as exc: _playback_status.update({"state": "error", "error": str(exc), "progress": 0}) print(f"[Playback] {exc}") # ── Test recording API ──────────────────────────────────────────────────────── @app.post("/api/test/upload") async def api_test_upload(file: UploadFile = File(...)): suffix = Path(file.filename or "recording.wav").suffix.lower() if suffix not in ALLOWED_AUDIO_EXTS: raise HTTPException(400, f"Unsupported format '{suffix}'") stem = Path(file.filename).stem[:80].replace(" ", "_") out = TEST_RECORDINGS_DIR / f"{stem}{suffix}" try: with out.open("wb") as f: shutil.copyfileobj(file.file, f) except OSError as e: raise HTTPException(500, f"Could not save file: {e}") return {"ok": True, "filename": out.name, "mb": round(out.stat().st_size / 1024 / 1024, 1)} @app.get("/api/test/files") def api_test_list(): files = [] for p in sorted(TEST_RECORDINGS_DIR.iterdir()): if p.suffix.lower() in ALLOWED_AUDIO_EXTS: files.append({ "filename": p.name, "mb": round(p.stat().st_size / 1024 / 1024, 1), }) return {"files": files} @app.delete("/api/test/files/{filename:path}") def api_test_delete(filename: str): p = TEST_RECORDINGS_DIR / Path(filename).name try: if p.exists(): p.unlink() except OSError as e: raise HTTPException(500, f"Could not delete: {e}") return {"ok": True} class PlaybackBody(BaseModel): filename: str speed: float = 1.0 @app.post("/api/test/start") async def api_test_start(body: PlaybackBody): global _playback_task if _playback_task and not _playback_task.done(): raise HTTPException(409, "Playback already running — stop it first") p = TEST_RECORDINGS_DIR / 
Path(body.filename).name if not p.exists(): raise HTTPException(404, "File not found") speed = max(0.25, min(8.0, body.speed)) _playback_status.update({"state": "starting", "file": p.name, "progress": 0, "error": None}) _playback_task = asyncio.create_task(_stream_file(p, speed)) return {"ok": True} @app.post("/api/test/stop") async def api_test_stop(): global _playback_task if _playback_task and not _playback_task.done(): _playback_task.cancel() try: await _playback_task except asyncio.CancelledError: pass _playback_status.update({"state": "idle", "file": None, "progress": 0, "elapsed": 0.0}) return {"ok": True} @app.get("/api/test/status") def api_test_status(): return _playback_status # ── SSE display stream ──────────────────────────────────────────────────────── @app.get("/api/display/stream") async def display_stream(): q: asyncio.Queue[str] = asyncio.Queue(maxsize=50) with _sse_lock: _sse_clients.add(q) async def generator(): try: yield ": heartbeat\n\n" while True: try: data = await asyncio.wait_for(q.get(), timeout=25) yield data except asyncio.TimeoutError: yield ": heartbeat\n\n" except (asyncio.CancelledError, GeneratorExit): pass finally: with _sse_lock: _sse_clients.discard(q) return StreamingResponse( generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) # ── Web UI (admin) ──────────────────────────────────────────────────────────── HTML = """
| Speaker ID | Initials | Name | Locality | Voiceprint | Voice Sample | Actions |
|---|
Upload a full church service recording (WAV, MP3, FLAC, OGG, M4A) to test the transcription pipeline offline. The file streams to WhisperLiveKit exactly as a live microphone would — results appear on the display page in real time.
| Recording File | Size | Actions |
|---|---|---|
| No recordings uploaded yet | ||
While a test recording plays, the live microphone in bridge.py continues to run. Keep the room quiet during testing to avoid mixing live audio with the recording.
Picks the cleanest isolated segment from the transcript log automatically. Run a test recording first to populate the log.
Extract a voiceprint from a specific time range in a recording. Choose a section with clear isolated speech (10–30 seconds).
Upload a 10–60 second clear speech recording.
Supported: WAV, MP3, M4A, OGG, FLAC, WebM