#!/usr/bin/env python3
"""
admin.py — Speaker Admin Web Server

Local web interface for managing speaker names, voice recordings,
and test recording playback.
Runs on port 8001 alongside bridge.py.

Access at: http://localhost:8001
"""

import asyncio
import json
import shutil
from pathlib import Path

from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
import uvicorn
import websockets

SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
RECORDINGS_DIR = Path(__file__).parent / "recordings"
TEST_RECORDINGS_DIR = Path(__file__).parent / "test_recordings"
RECORDINGS_DIR.mkdir(exist_ok=True)
TEST_RECORDINGS_DIR.mkdir(exist_ok=True)

ALLOWED_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".webm", ".aiff"}

# Order in which _recording_path() probes for a speaker's voice sample.
# Covers every member of ALLOWED_AUDIO_EXTS so that any accepted upload can
# later be found, played back and deleted.
_RECORDING_EXT_PRIORITY = (".wav", ".mp3", ".m4a", ".ogg", ".webm", ".flac", ".aiff")

WS_URL = "ws://localhost:8000/asr"

app = FastAPI(title="Speaker Admin")


# ── Speaker data helpers ──────────────────────────────────────────────────────

def _load() -> dict[str, str]:
    """Return the speaker-id → name mapping stored in SPEAKERS_FILE.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object. Read problems are reported on
    stdout instead of being silently discarded.
    """
    if not SPEAKERS_FILE.exists():
        return {}
    try:
        data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"[Admin] Could not read {SPEAKERS_FILE.name}: {exc}")
        return {}
    if not isinstance(data, dict):
        # Callers iterate .items() / index by id, so anything else is unusable.
        print(f"[Admin] {SPEAKERS_FILE.name} does not contain a JSON object")
        return {}
    return data


def _save(data: dict[str, str]) -> None:
    """Write the speaker mapping to SPEAKERS_FILE as indented UTF-8 JSON."""
    SPEAKERS_FILE.write_text(
        json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8"
    )


def _recording_path(sid: str) -> Path | None:
    """Return the existing recording file for speaker *sid*, or None.

    Extensions are probed in _RECORDING_EXT_PRIORITY order; the first file
    that exists in RECORDINGS_DIR wins.
    """
    for ext in _RECORDING_EXT_PRIORITY:
        p = RECORDINGS_DIR / f"{sid}{ext}"
        if p.exists():
            return p
    return None


# ── Speaker API ───────────────────────────────────────────────────────────────

class NameBody(BaseModel):
    """Request body carrying a speaker's friendly name."""

    name: str


class AddBody(BaseModel):
    """Request body for creating a speaker: its id and friendly name."""

    id: str
    name: str


@app.get("/api/speakers")
def api_list():
    """List all speakers sorted by id, flagging which ones have a recording."""
    speakers = _load()
    return {"speakers": [
        {"id": k, "name": v, "has_recording": _recording_path(k) is not None}
        for k, v in sorted(speakers.items())
    ]}


@app.post("/api/speakers")
def api_add(body: AddBody):
    """Create a new speaker.

    Raises:
        HTTPException(400): the id is blank or already present.
    """
    speakers = _load()
    sid = body.id.strip()
    if not sid:
        raise HTTPException(400, "Speaker ID cannot be empty")
    if sid in speakers:
        raise HTTPException(400, f"'{sid}' already exists")
    speakers[sid] = body.name.strip()
    _save(speakers)
    return {"ok": True, "id": sid, "name": speakers[sid]}


@app.put("/api/speakers/{sid}")
def api_update(sid: str, body: NameBody):
    """Set the friendly name for *sid* (the entry is created if absent).

    Raises:
        HTTPException(400): the name is blank after stripping.
    """
    name = body.name.strip()
    if not name:
        raise HTTPException(400, "Name cannot be empty")
    speakers = _load()
    speakers[sid] = name
    _save(speakers)
    return {"ok": True}


@app.delete("/api/speakers/{sid}")
def api_delete(sid: str):
    """Remove speaker *sid* and its recording file, if either exists."""
    speakers = _load()
    speakers.pop(sid, None)
    _save(speakers)
    rec = _recording_path(sid)
    if rec:
        rec.unlink()
    return {"ok": True}


@app.post("/api/speakers/{sid}/recording")
async def api_upload(sid: str, file: UploadFile = File(...)):
    """Store an uploaded voice sample for *sid*, replacing any previous one.

    A filename without an extension is treated as ``.wav``. The speaker is
    registered (named after its id) if it does not exist yet.

    Raises:
        HTTPException(400): the file extension is not in ALLOWED_AUDIO_EXTS.
        HTTPException(500): the file could not be written.
    """
    suffix = Path(file.filename or "audio.wav").suffix.lower() or ".wav"
    # Reject unknown extensions up front: _recording_path() would never find
    # such a file again, leaving an orphan that cannot be played or deleted.
    if suffix not in ALLOWED_AUDIO_EXTS:
        raise HTTPException(400, f"Unsupported format '{suffix}'")

    rec = _recording_path(sid)
    if rec:
        rec.unlink()

    out = RECORDINGS_DIR / f"{sid}{suffix}"
    try:
        with out.open("wb") as f:
            shutil.copyfileobj(file.file, f)
    except OSError as e:
        raise HTTPException(500, f"Could not save file: {e}") from e

    speakers = _load()
    if sid not in speakers:
        speakers[sid] = sid
        _save(speakers)

    size_kb = round(out.stat().st_size / 1024)
    return {"ok": True, "file": out.name, "kb": size_kb}


@app.get("/api/speakers/{sid}/recording")
def api_playback(sid: str):
    """Serve the stored recording for *sid*.

    Raises:
        HTTPException(404): no recording exists for this speaker.
    """
    rec = _recording_path(sid)
    if not rec:
        raise HTTPException(404, "No recording found")
    return FileResponse(rec)


# ── Test playback state ───────────────────────────────────────────────────────

_playback_task: asyncio.Task | None = None
_playback_status: dict = {
    "state": "idle",      # idle | starting | loading | playing | done | error
    "file": None,
    "progress": 0,        # 0–100
    "elapsed": 0.0,       # seconds streamed so far
    "duration": 0.0,      # total file duration in seconds
    "error": None,
}

BRIDGE_INJECT_URL = "http://127.0.0.1:8002/inject"


async def _stream_file(filepath: Path, speed: float) -> None:
    """Decode *filepath* and POST it chunk by chunk to BRIDGE_INJECT_URL.

    Audio is decoded to signed 16-bit mono at 16 kHz in 4096-frame chunks.
    After each chunk the coroutine sleeps for the chunk's duration divided by
    *speed*, and updates the module-level ``_playback_status`` dict.

    Outcomes are reported through ``_playback_status`` rather than raised:
    "done" on completion, "idle" when the task is cancelled, "error" (with a
    message) on a missing package or any other failure.

    Args:
        filepath: audio file to stream.
        speed: playback-rate multiplier; must be non-zero.
    """
    # Imported lazily so the server still starts when these optional
    # packages are absent; the failure is surfaced via the status dict.
    try:
        import miniaudio
        import httpx
    except ImportError as e:
        _playback_status.update({"state": "error", "error": f"Missing package: {e}"})
        return

    try:
        _playback_status["state"] = "loading"
        info = miniaudio.get_file_info(str(filepath))
        duration = info.duration

        _playback_status.update({
            "state": "playing",
            "duration": round(duration, 1),
            "elapsed": 0.0,
            "progress": 0,
        })

        chunk_frames = 4096
        chunk_secs = chunk_frames / 16000
        elapsed = 0.0

        stream = miniaudio.stream_file(
            str(filepath),
            output_format=miniaudio.SampleFormat.SIGNED16,
            nchannels=1,
            sample_rate=16000,
            frames_to_read=chunk_frames,
        )

        async with httpx.AsyncClient() as client:
            for chunk in stream:
                response = await client.post(BRIDGE_INJECT_URL, content=bytes(chunk))
                # A rejected chunk means nothing reaches the pipeline; fail
                # loudly instead of reporting a playback that did not happen.
                response.raise_for_status()
                elapsed += chunk_secs
                _playback_status["elapsed"] = round(elapsed, 1)
                # Capped at 99 so 100 is only ever reported with state "done".
                _playback_status["progress"] = (
                    min(99, round(elapsed / duration * 100)) if duration else 0
                )
                await asyncio.sleep(chunk_secs / speed)

        _playback_status.update({
            "state": "done",
            "progress": 100,
            "elapsed": round(duration, 1),
        })

    except asyncio.CancelledError:
        # Cancellation is the normal "stop" path: reset and finish quietly.
        _playback_status.update({
            "state": "idle", "file": None, "progress": 0, "elapsed": 0.0,
        })
    except Exception as exc:
        # Top-level boundary of the background task: report, don't propagate.
        _playback_status.update({"state": "error", "error": str(exc), "progress": 0})
        print(f"[Playback] {exc}")


# ── Test recording API ────────────────────────────────────────────────────────

@app.post("/api/test/upload")
async def api_test_upload(file: UploadFile = File(...)):
    """Save an uploaded test recording into TEST_RECORDINGS_DIR.

    The stored name is the upload's stem (truncated to 80 characters, spaces
    replaced by underscores) plus its lower-cased extension.

    Raises:
        HTTPException(400): the extension is not in ALLOWED_AUDIO_EXTS.
        HTTPException(500): the file could not be written.
    """
    # Resolve the fallback once so both suffix and stem tolerate a missing
    # filename (Path(None) would raise TypeError).
    upload_name = Path(file.filename or "recording.wav")
    suffix = upload_name.suffix.lower()
    if suffix not in ALLOWED_AUDIO_EXTS:
        raise HTTPException(400, f"Unsupported format '{suffix}'")

    # Sanitise filename — replace spaces with underscores
    stem = upload_name.stem[:80].replace(" ", "_")
    out = TEST_RECORDINGS_DIR / f"{stem}{suffix}"
    try:
        with out.open("wb") as f:
            shutil.copyfileobj(file.file, f)
    except OSError as e:
        raise HTTPException(500, f"Could not save file: {e}") from e

    return {"ok": True, "filename": out.name, "mb": round(out.stat().st_size / 1024 / 1024, 1)}


@app.get("/api/test/files")
def api_test_list():
    """List audio files in TEST_RECORDINGS_DIR with their size in MB."""
    files = [
        {
            "filename": p.name,
            "mb": round(p.stat().st_size / 1024 / 1024, 1),
        }
        for p in sorted(TEST_RECORDINGS_DIR.iterdir())
        # is_file() keeps sub-directories with an audio-like name out of the list.
        if p.suffix.lower() in ALLOWED_AUDIO_EXTS and p.is_file()
    ]
    return {"files": files}
@app.delete("/api/test/files/{filename:path}")
def api_test_delete(filename: str):
    """Delete a test recording by name; a missing file is not an error.

    Only the final path component of *filename* is used, so the target is
    always looked up directly inside TEST_RECORDINGS_DIR.

    Raises:
        HTTPException(500): the file exists but could not be removed.
    """
    p = TEST_RECORDINGS_DIR / Path(filename).name
    try:
        # is_file() rather than exists(): an empty or ".." name resolves to a
        # directory, which must be ignored instead of failing in unlink().
        if p.is_file():
            p.unlink()
    except OSError as e:
        raise HTTPException(500, f"Could not delete: {e}") from e
    return {"ok": True}


class PlaybackBody(BaseModel):
    """Request body for starting playback: file name and speed multiplier."""

    filename: str
    speed: float = 1.0


@app.post("/api/test/start")
async def api_test_start(body: PlaybackBody):
    """Start streaming a test recording as a background task.

    The requested speed is clamped to the range 0.25–8.0.

    Raises:
        HTTPException(409): a playback task is still running.
        HTTPException(404): the named file is not in TEST_RECORDINGS_DIR.
    """
    global _playback_task
    if _playback_task and not _playback_task.done():
        raise HTTPException(409, "Playback already running — stop it first")

    p = TEST_RECORDINGS_DIR / Path(body.filename).name
    # is_file() rather than exists(): an empty or ".." name resolves to a
    # directory, which cannot be streamed.
    if not p.is_file():
        raise HTTPException(404, "File not found")

    speed = max(0.25, min(8.0, body.speed))
    # Reset elapsed/duration as well so the status does not show values left
    # over from a previous run while the new file is still loading.
    _playback_status.update({
        "state": "starting",
        "file": p.name,
        "progress": 0,
        "elapsed": 0.0,
        "duration": 0.0,
        "error": None,
    })
    _playback_task = asyncio.create_task(_stream_file(p, speed))
    return {"ok": True}


@app.post("/api/test/stop")
async def api_test_stop():
    """Cancel any running playback task and reset the status to idle."""
    global _playback_task
    if _playback_task and not _playback_task.done():
        _playback_task.cancel()
        try:
            # Wait for the task to unwind so the status reset below is final.
            await _playback_task
        except asyncio.CancelledError:
            pass
    _playback_status.update({"state": "idle", "file": None, "progress": 0, "elapsed": 0.0})
    return {"ok": True}


@app.get("/api/test/status")
def api_test_status():
    """Return the current playback status dict."""
    return _playback_status


# ── Web UI ────────────────────────────────────────────────────────────────────

# NOTE(review): in this copy of the file the literal below contains only plain
# text (no HTML tags, styles or scripts) — confirm against the deployed page.
HTML = """ Meeting Transcription for the Deaf

🎤 Speaker Admin

Meeting Transcription for the Deaf — Speaker Name & Voice Library
Speaker ID Friendly Name Voice Sample Actions
🎧 Test Recording Playback

Upload a full church service recording (WAV, MP3, FLAC, OGG, M4A) to test the transcription pipeline offline. The file streams to WhisperLiveKit exactly as a live microphone would — results appear on the e-ink display in real time.

⇧ Drop a recording here, or click to browse
WAV · MP3 · FLAC · OGG · M4A
Playback Speed
Recording File Size Actions
No recordings uploaded yet
Idle

While a test recording plays, the live microphone in bridge.py continues to run. Keep the room quiet during testing to avoid mixing live audio with the recording.

"""


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the admin page."""
    return HTML


# ── Entry point ───────────────────────────────────────────────────────────────

if __name__ == "__main__":
    print("[Admin] Speaker admin running at http://localhost:8001")
    # NOTE(review): host 0.0.0.0 listens on every network interface, not just
    # localhost — confirm that exposure is intended.
    uvicorn.run(app, host="0.0.0.0", port=8001, log_level="warning")