#!/usr/bin/env python3 """ admin.py — Speaker Admin Web Server Local web interface for managing speaker names, voice recordings, and test recording playback. Runs on port 8001 alongside bridge.py. Access at: http://localhost:8001 """ import asyncio import json import shutil from pathlib import Path from fastapi import FastAPI, HTTPException, UploadFile, File from fastapi.responses import HTMLResponse, FileResponse from pydantic import BaseModel import uvicorn import websockets SPEAKERS_FILE = Path(__file__).parent / "speakers.json" RECORDINGS_DIR = Path(__file__).parent / "recordings" TEST_RECORDINGS_DIR = Path(__file__).parent / "test_recordings" RECORDINGS_DIR.mkdir(exist_ok=True) TEST_RECORDINGS_DIR.mkdir(exist_ok=True) ALLOWED_AUDIO_EXTS = {".wav", ".mp3", ".m4a", ".ogg", ".flac", ".webm", ".aiff"} WS_URL = "ws://localhost:8000/asr" app = FastAPI(title="Speaker Admin") # ── Speaker data helpers ────────────────────────────────────────────────────── def _load() -> dict[str, str]: if SPEAKERS_FILE.exists(): try: return json.loads(SPEAKERS_FILE.read_text(encoding="utf-8")) except Exception: pass return {} def _save(data: dict[str, str]) -> None: SPEAKERS_FILE.write_text( json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8" ) def _recording_path(sid: str) -> Path | None: for ext in (".wav", ".mp3", ".m4a", ".ogg", ".webm", ".flac"): p = RECORDINGS_DIR / f"{sid}{ext}" if p.exists(): return p return None # ── Speaker API ─────────────────────────────────────────────────────────────── class NameBody(BaseModel): name: str class AddBody(BaseModel): id: str name: str @app.get("/api/speakers") def api_list(): speakers = _load() return {"speakers": [ {"id": k, "name": v, "has_recording": _recording_path(k) is not None} for k, v in sorted(speakers.items()) ]} @app.post("/api/speakers") def api_add(body: AddBody): speakers = _load() sid = body.id.strip() if not sid: raise HTTPException(400, "Speaker ID cannot be empty") if sid in speakers: raise HTTPException(400, f"'{sid}' already exists") speakers[sid] = body.name.strip() _save(speakers) return {"ok": True, "id": sid, "name": speakers[sid]} @app.put("/api/speakers/{sid}") def api_update(sid: str, body: NameBody): name = body.name.strip() if not name: raise HTTPException(400, "Name cannot be empty") speakers = _load() speakers[sid] = name _save(speakers) return {"ok": True} @app.delete("/api/speakers/{sid}") def api_delete(sid: str): speakers = _load() speakers.pop(sid, None) _save(speakers) rec = _recording_path(sid) if rec: rec.unlink() return {"ok": True} @app.post("/api/speakers/{sid}/recording") async def api_upload(sid: str, file: UploadFile = File(...)): suffix = Path(file.filename or "audio.wav").suffix.lower() or ".wav" rec = _recording_path(sid) if rec: rec.unlink() out = RECORDINGS_DIR / f"{sid}{suffix}" with out.open("wb") as f: shutil.copyfileobj(file.file, f) speakers = _load() if sid not in speakers: speakers[sid] = sid _save(speakers) size_kb = round(out.stat().st_size / 1024) return {"ok": True, "file": out.name, "kb": size_kb} @app.get("/api/speakers/{sid}/recording") def api_playback(sid: str): rec = _recording_path(sid) if not rec: raise HTTPException(404, "No recording found") return FileResponse(rec) # ── Test playback state ─────────────────────────────────────────────────────── _playback_task: asyncio.Task | None = None _playback_status: dict = { "state": "idle", # idle | loading | playing | done | error "file": None, "progress": 0, # 0–100 "elapsed": 0.0, # seconds streamed so far "duration": 0.0, # total file duration in seconds "error": None, } async def _stream_file(filepath: Path, speed: float) -> None: global _playback_status try: import miniaudio # type: ignore except ImportError: _playback_status.update({ "state": "error", "error": "miniaudio not installed — run: pip install miniaudio", }) return try: _playback_status["state"] = "loading" duration = 0.0 try: info = miniaudio.get_file_info(str(filepath)) duration = info.duration except Exception: pass _playback_status.update({ "state": "playing", "duration": round(duration, 1), "elapsed": 0.0, "progress": 0, }) chunk_frames = 4096 chunk_secs = chunk_frames / 16000 elapsed = 0.0 async with websockets.connect(WS_URL, max_size=2**23) as ws: stream = miniaudio.stream_file( str(filepath), output_format=miniaudio.SampleFormat.SIGNED16, nchannels=1, sample_rate=16000, frames_to_read=chunk_frames, ) for chunk in stream: await ws.send(bytes(chunk)) elapsed += chunk_secs _playback_status["elapsed"] = round(elapsed, 1) _playback_status["progress"] = ( min(99, round(elapsed / duration * 100)) if duration else 0 ) await asyncio.sleep(chunk_secs / speed) _playback_status.update({ "state": "done", "progress": 100, "elapsed": round(duration, 1), }) except asyncio.CancelledError: _playback_status.update({ "state": "idle", "file": None, "progress": 0, "elapsed": 0.0, }) except Exception as exc: _playback_status.update({"state": "error", "error": str(exc), "progress": 0}) print(f"[Playback] {exc}") # ── Test recording API ──────────────────────────────────────────────────────── @app.post("/api/test/upload") async def api_test_upload(file: UploadFile = File(...)): suffix = Path(file.filename or "recording.wav").suffix.lower() if suffix not in ALLOWED_AUDIO_EXTS: raise HTTPException(400, f"Unsupported format '{suffix}'. Use WAV, MP3, FLAC, OGG, or M4A.") stem = Path(file.filename).stem[:80] # limit filename length out = TEST_RECORDINGS_DIR / f"{stem}{suffix}" with out.open("wb") as f: shutil.copyfileobj(file.file, f) size_mb = round(out.stat().st_size / 1024 / 1024, 1) return {"ok": True, "filename": out.name, "mb": size_mb} @app.get("/api/test/files") def api_test_list(): files = [] for p in sorted(TEST_RECORDINGS_DIR.iterdir()): if p.suffix.lower() in ALLOWED_AUDIO_EXTS: files.append({ "filename": p.name, "mb": round(p.stat().st_size / 1024 / 1024, 1), }) return {"files": files} @app.delete("/api/test/files/{filename}") def api_test_delete(filename: str): p = TEST_RECORDINGS_DIR / Path(filename).name # sanitise — no path traversal if p.exists(): p.unlink() return {"ok": True} class PlaybackBody(BaseModel): filename: str speed: float = 1.0 @app.post("/api/test/start") async def api_test_start(body: PlaybackBody): global _playback_task if _playback_task and not _playback_task.done(): raise HTTPException(409, "Playback already running — stop it first") p = TEST_RECORDINGS_DIR / Path(body.filename).name if not p.exists(): raise HTTPException(404, "File not found") speed = max(0.25, min(8.0, body.speed)) _playback_status.update({"state": "starting", "file": p.name, "progress": 0, "error": None}) _playback_task = asyncio.create_task(_stream_file(p, speed)) return {"ok": True} @app.post("/api/test/stop") async def api_test_stop(): global _playback_task if _playback_task and not _playback_task.done(): _playback_task.cancel() try: await _playback_task except asyncio.CancelledError: pass _playback_status.update({"state": "idle", "file": None, "progress": 0, "elapsed": 0.0}) return {"ok": True} @app.get("/api/test/status") def api_test_status(): return _playback_status # ── Web UI ──────────────────────────────────────────────────────────────────── HTML = """
| Speaker ID | Friendly Name | Voice Sample | Actions |
|---|
Upload a full church service recording (WAV, MP3, FLAC, OGG, M4A) to test the transcription pipeline offline. The file streams to WhisperLiveKit exactly as a live microphone would — results appear on the e-ink display in real time.
| Recording File | Size | Actions |
|---|---|---|
| No recordings uploaded yet | ||
While a test recording plays, the live microphone in bridge.py continues to run. Keep the room quiet during testing to avoid mixing live audio with the recording.
Upload a 10–60 second clear speech recording.
Supported: WAV, MP3, M4A, OGG, FLAC, WebM