#!/usr/bin/env python3 """ admin.py — Speaker Admin Web Server Local web interface for managing speaker names, voice recordings, and test recording playback. Also serves the fullscreen display page for tablets / TVs and the SSE stream that feeds it. Runs on port 8001 alongside bridge.py. Access at: http://localhost:8001 ← speaker admin http://[PC-IP]:8001/display ← fullscreen display for tablets """ import asyncio import json import shutil import threading from contextlib import asynccontextmanager from pathlib import Path import paho.mqtt.client as mqtt from fastapi import FastAPI, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse, FileResponse, StreamingResponse from pydantic import BaseModel import uvicorn SPEAKERS_FILE = Path(__file__).parent / "speakers.json" RECORDINGS_DIR = Path(__file__).parent / "recordings" TEST_RECORDINGS_DIR = Path(__file__).parent / "test_recordings" RECORDINGS_DIR.mkdir(exist_ok=True) TEST_RECORDINGS_DIR.mkdir(exist_ok=True) ALLOWED_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".webm", ".aiff"} MQTT_HOST = "localhost" MQTT_PORT = 1883 MQTT_TOPIC_TEXT = "display/text" MQTT_TOPIC_CLEAR = "display/clear" # ── SSE broadcast state ─────────────────────────────────────────────────────── _sse_clients: set[asyncio.Queue] = set() _sse_lock = threading.Lock() _event_loop: asyncio.AbstractEventLoop | None = None def _broadcast(data: str) -> None: if _event_loop is None: return with _sse_lock: for q in list(_sse_clients): try: _event_loop.call_soon_threadsafe(q.put_nowait, data) except Exception: pass # ── MQTT subscriber ─────────────────────────────────────────────────────────── def _build_mqtt_subscriber() -> mqtt.Client: client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2) def on_connect(client, userdata, flags, rc, props): if rc == 0: client.subscribe([(MQTT_TOPIC_TEXT, 0), (MQTT_TOPIC_CLEAR, 0)]) print("[Admin] MQTT subscribed to display/text and display/clear") else: print(f"[Admin] MQTT connect failed: {rc}") def on_message(client, userdata, msg): payload = msg.payload.decode("utf-8", errors="replace") if msg.topic == MQTT_TOPIC_CLEAR: _broadcast("event: clear\ndata: {}\n\n") else: _broadcast(f"event: text\ndata: {payload}\n\n") client.on_connect = on_connect client.on_message = on_message client.reconnect_delay_set(min_delay=1, max_delay=30) client.connect_async(MQTT_HOST, MQTT_PORT) client.loop_start() return client # ── App lifecycle ───────────────────────────────────────────────────────────── _mqtt_sub: mqtt.Client | None = None @asynccontextmanager async def lifespan(app: FastAPI): global _mqtt_sub, _event_loop _event_loop = asyncio.get_running_loop() _mqtt_sub = _build_mqtt_subscriber() yield if _mqtt_sub: _mqtt_sub.loop_stop() _mqtt_sub.disconnect() app = FastAPI(title="Speaker Admin", lifespan=lifespan) # ── Speaker data helpers ────────────────────────────────────────────────────── def _load() -> dict[str, str]: if SPEAKERS_FILE.exists(): try: return json.loads(SPEAKERS_FILE.read_text(encoding="utf-8")) except Exception: pass return {} def _save(data: dict[str, str]) -> None: SPEAKERS_FILE.write_text( json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8" ) def _recording_path(sid: str) -> Path | None: for ext in (".wav", ".mp3", ".m4a", ".ogg", ".webm", ".flac"): p = RECORDINGS_DIR / f"{sid}{ext}" if p.exists(): return p return None # ── Speaker API ─────────────────────────────────────────────────────────────── class NameBody(BaseModel): name: str class AddBody(BaseModel): id: str name: str @app.get("/api/speakers") def api_list(): speakers = _load() return {"speakers": [ {"id": k, "name": v, "has_recording": _recording_path(k) is not None} for k, v in sorted(speakers.items()) ]} @app.post("/api/speakers") def api_add(body: AddBody): speakers = _load() sid = body.id.strip() if not sid: raise HTTPException(400, "Speaker ID cannot be empty") if sid in speakers: raise HTTPException(400, f"'{sid}' already exists") speakers[sid] = body.name.strip() _save(speakers) return {"ok": True, "id": sid, "name": speakers[sid]} @app.put("/api/speakers/{sid}") def api_update(sid: str, body: NameBody): name = body.name.strip() if not name: raise HTTPException(400, "Name cannot be empty") speakers = _load() speakers[sid] = name _save(speakers) return {"ok": True} @app.delete("/api/speakers/{sid}") def api_delete(sid: str): speakers = _load() speakers.pop(sid, None) _save(speakers) rec = _recording_path(sid) if rec: rec.unlink() return {"ok": True} @app.post("/api/speakers/{sid}/recording") async def api_upload(sid: str, file: UploadFile = File(...)): suffix = Path(file.filename or "audio.wav").suffix.lower() or ".wav" rec = _recording_path(sid) if rec: rec.unlink() out = RECORDINGS_DIR / f"{sid}{suffix}" with out.open("wb") as f: shutil.copyfileobj(file.file, f) speakers = _load() if sid not in speakers: speakers[sid] = sid _save(speakers) size_kb = round(out.stat().st_size / 1024) return {"ok": True, "file": out.name, "kb": size_kb} @app.get("/api/speakers/{sid}/recording") def api_playback(sid: str): rec = _recording_path(sid) if not rec: raise HTTPException(404, "No recording found") return FileResponse(rec) # ── Test playback state ─────────────────────────────────────────────────────── _playback_task: asyncio.Task | None = None _playback_status: dict = { "state": "idle", # idle | loading | playing | done | error "file": None, "progress": 0, # 0–100 "elapsed": 0.0, # seconds streamed so far "duration": 0.0, # total file duration in seconds "error": None, } BRIDGE_INJECT_URL = "http://127.0.0.1:8002/inject" async def _stream_file(filepath: Path, speed: float) -> None: global _playback_status try: import miniaudio import httpx except ImportError as e: _playback_status.update({"state": "error", "error": f"Missing package: {e}"}) return try: _playback_status["state"] = "loading" info = miniaudio.get_file_info(str(filepath)) duration = info.duration _playback_status.update({ "state": "playing", "duration": round(duration, 1), "elapsed": 0.0, "progress": 0, }) chunk_frames = 4096 chunk_secs = chunk_frames / 16000 elapsed = 0.0 stream = miniaudio.stream_file( str(filepath), output_format=miniaudio.SampleFormat.SIGNED16, nchannels=1, sample_rate=16000, frames_to_read=chunk_frames, ) async with httpx.AsyncClient() as client: for chunk in stream: await client.post(BRIDGE_INJECT_URL, content=bytes(chunk)) elapsed += chunk_secs _playback_status["elapsed"] = round(elapsed, 1) _playback_status["progress"] = ( min(99, round(elapsed / duration * 100)) if duration else 0 ) await asyncio.sleep(chunk_secs / speed) _playback_status.update({ "state": "done", "progress": 100, "elapsed": round(duration, 1), }) except asyncio.CancelledError: _playback_status.update({ "state": "idle", "file": None, "progress": 0, "elapsed": 0.0, }) except Exception as exc: _playback_status.update({"state": "error", "error": str(exc), "progress": 0}) print(f"[Playback] {exc}") # ── Test recording API ──────────────────────────────────────────────────────── @app.post("/api/test/upload") async def api_test_upload(file: UploadFile = File(...)): suffix = Path(file.filename or "recording.wav").suffix.lower() if suffix not in ALLOWED_AUDIO_EXTS: raise HTTPException(400, f"Unsupported format '{suffix}'") stem = Path(file.filename).stem[:80].replace(" ", "_") out = TEST_RECORDINGS_DIR / f"{stem}{suffix}" try: with out.open("wb") as f: shutil.copyfileobj(file.file, f) except OSError as e: raise HTTPException(500, f"Could not save file: {e}") return {"ok": True, "filename": out.name, "mb": round(out.stat().st_size / 1024 / 1024, 1)} @app.get("/api/test/files") def api_test_list(): files = [] for p in sorted(TEST_RECORDINGS_DIR.iterdir()): if p.suffix.lower() in ALLOWED_AUDIO_EXTS: files.append({ "filename": p.name, "mb": round(p.stat().st_size / 1024 / 1024, 1), }) return {"files": files} @app.delete("/api/test/files/{filename:path}") def api_test_delete(filename: str): p = TEST_RECORDINGS_DIR / Path(filename).name try: if p.exists(): p.unlink() except OSError as e: raise HTTPException(500, f"Could not delete: {e}") return {"ok": True} class PlaybackBody(BaseModel): filename: str speed: float = 1.0 @app.post("/api/test/start") async def api_test_start(body: PlaybackBody): global _playback_task if _playback_task and not _playback_task.done(): raise HTTPException(409, "Playback already running — stop it first") p = TEST_RECORDINGS_DIR / Path(body.filename).name if not p.exists(): raise HTTPException(404, "File not found") speed = max(0.25, min(8.0, body.speed)) _playback_status.update({"state": "starting", "file": p.name, "progress": 0, "error": None}) _playback_task = asyncio.create_task(_stream_file(p, speed)) return {"ok": True} @app.post("/api/test/stop") async def api_test_stop(): global _playback_task if _playback_task and not _playback_task.done(): _playback_task.cancel() try: await _playback_task except asyncio.CancelledError: pass _playback_status.update({"state": "idle", "file": None, "progress": 0, "elapsed": 0.0}) return {"ok": True} @app.get("/api/test/status") def api_test_status(): return _playback_status # ── SSE display stream ──────────────────────────────────────────────────────── @app.get("/api/display/stream") async def display_stream(): q: asyncio.Queue[str] = asyncio.Queue(maxsize=50) with _sse_lock: _sse_clients.add(q) async def generator(): try: yield ": heartbeat\n\n" while True: try: data = await asyncio.wait_for(q.get(), timeout=25) yield data except asyncio.TimeoutError: yield ": heartbeat\n\n" except (asyncio.CancelledError, GeneratorExit): pass finally: with _sse_lock: _sse_clients.discard(q) return StreamingResponse( generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) # ── Web UI (admin) ──────────────────────────────────────────────────────────── HTML = """ Meeting Transcription for the Deaf

🎤 Speaker Admin

Meeting Transcription for the Deaf — Speaker Name & Voice Library
👁 View Display
Speaker ID Initials Name Locality Voice Sample Actions
🎧 Test Recording Playback

Upload a full church service recording (WAV, MP3, FLAC, OGG, M4A) to test the transcription pipeline offline. The file streams to WhisperLiveKit exactly as a live microphone would — results appear on the display page in real time.

⇧ Drop a recording here, or click to browse
WAV · MP3 · FLAC · OGG · M4A
Playback Speed
Recording File Size Actions
No recordings uploaded yet
Idle

While a test recording plays, the live microphone in bridge.py continues to run. Keep the room quiet during testing to avoid mixing live audio with the recording.

""" # ── Display page (fullscreen tablet / TV) ───────────────────────────────────── DISPLAY_HTML = """ Live Transcription
connecting
""" @app.get("/", response_class=HTMLResponse) def index(): return HTML @app.get("/display", response_class=HTMLResponse) def display(): return DISPLAY_HTML # ── Entry point ─────────────────────────────────────────────────────────────── if __name__ == "__main__": print("[Admin] Speaker admin at http://localhost:8001") print("[Admin] Display page at http://localhost:8001/display") uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning")