bridge.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. #!/usr/bin/env python3
  2. """
  3. bridge.py — Church Live Transcription Bridge
  4. Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
  5. receives transcription + speaker diarization, buffers sentences, and
  6. publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
  7. Start WhisperLiveKit with:
  8. wlk --model_size large-v3 --language en --diarization
  9. Run this script:
  10. python bridge.py
  11. """
  12. import asyncio
  13. import json
  14. import re
  15. import textwrap
  16. import threading
  17. import time
  18. from collections import Counter
  19. from pathlib import Path
  20. import numpy as np
  21. import paho.mqtt.client as mqtt
  22. import sounddevice as sd
  23. import websockets
  24. from fastapi import FastAPI, Request
  25. import uvicorn
# ── Configuration ─────────────────────────────────────────────────────────────
# MQTT broker and the topics the e-ink display listens on.
MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"  # receives {"lines": [...]} JSON payloads
MQTT_TOPIC_CLEAR = "display/clear"  # receives an empty payload to blank the panel
# WhisperLiveKit WebSocket endpoint.
# NOTE(review): not referenced by the in-process AudioProcessor pipeline that
# main() runs; only relevant to the legacy WebSocket helpers.
WS_URL = "ws://localhost:8000/asr"
# Microphone capture format: mono, int16 samples at 16 kHz.
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCKSIZE = 4096 # ~256 ms per chunk at 16 kHz
SENTENCE_TIMEOUT = 4.0 # seconds with no new final segment before forcing a flush
MAX_LINE_CHARS = 38 # characters per line (~24pt font at 800 px wide)
DISPLAY_LINES = 3  # rows kept in the rolling display window
# Set to a device index (integer) to force a specific microphone.
# Set to None to use the system default input device (or, if none is set,
# the first device that has input channels).
# Run bridge.py once to see available device indices printed at startup.
AUDIO_DEVICE: int | None = 12
# Speaker-id -> display-name map, stored next to this script and reloaded
# periodically so admin UI edits take effect live.
SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
# Names seeded into SPEAKERS_FILE by _load_speakers on first run.
DEFAULT_SPEAKERS: dict[str, str] = {
    "SPEAKER_00": "Pastor",
    "SPEAKER_01": "Reader",
    "SPEAKER_02": "Guest",
    "SPEAKER_03": "Choir",
}
# Shared queue for test audio injection from admin.py: the /inject endpoint
# puts raw request bodies here. It stays None until audio_processor_loop
# creates it.
# NOTE(review): an earlier note said admin.py sends decoded PCM float32
# chunks, but the microphone stream is int16 (s16le) and the engine runs with
# pcm_input=True — confirm admin.py sends s16le at SAMPLE_RATE.
_inject_queue: asyncio.Queue[bytes] | None = None
  52. # ── Audio injection API (receives chunks from admin.py) ───────────────────────
  53. _bridge_app = FastAPI()
  54. @_bridge_app.post("/inject")
  55. async def inject_audio(request: Request):
  56. chunk = await request.body()
  57. if _inject_queue is not None and chunk:
  58. try:
  59. _inject_queue.put_nowait(chunk)
  60. except asyncio.QueueFull:
  61. pass
  62. return {"ok": True}
  63. @_bridge_app.post("/inject/clear")
  64. async def inject_clear():
  65. global _inject_queue
  66. if _inject_queue:
  67. while not _inject_queue.empty():
  68. try:
  69. _inject_queue.get_nowait()
  70. except asyncio.QueueEmpty:
  71. break
  72. return {"ok": True}
  73. # ── Speaker persistence ───────────────────────────────────────────────────────
  74. def _load_speakers() -> dict[str, str]:
  75. if SPEAKERS_FILE.exists():
  76. try:
  77. data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
  78. if isinstance(data, dict):
  79. return data
  80. except (json.JSONDecodeError, OSError):
  81. pass
  82. # First run — seed with defaults and save
  83. _write_speakers(DEFAULT_SPEAKERS)
  84. return dict(DEFAULT_SPEAKERS)
  85. def _write_speakers(names: dict[str, str]) -> None:
  86. try:
  87. SPEAKERS_FILE.write_text(
  88. json.dumps(names, indent=2, ensure_ascii=False),
  89. encoding="utf-8",
  90. )
  91. except OSError as exc:
  92. print(f"[Speakers] Save failed: {exc}")
  93. # ── State ─────────────────────────────────────────────────────────────────────
  94. class BridgeState:
  95. """All mutable state, protected by a single lock."""
  96. def __init__(self):
  97. self._lock = threading.Lock()
  98. self.speaker_names: dict[str, str] = _load_speakers()
  99. self._seen: set[str] = set(self.speaker_names)
  100. self._current_speaker: str | None = None
  101. self._speaker_changed = False
  102. self._text_buffer = ""
  103. self._display: list[str] = [""] * DISPLAY_LINES
  104. self._last_final_time = time.monotonic()
  105. # ── Speaker name management ───────────────────────────────────────────────
  106. def set_speaker_name(self, speaker_id: str, name: str) -> None:
  107. with self._lock:
  108. self.speaker_names[speaker_id] = name.strip()
  109. self._seen.add(speaker_id)
  110. _write_speakers(self.speaker_names)
  111. def delete_speaker(self, speaker_id: str) -> None:
  112. with self._lock:
  113. self.speaker_names.pop(speaker_id, None)
  114. self._seen.discard(speaker_id)
  115. _write_speakers(self.speaker_names)
  116. def seen_speakers_snapshot(self) -> set[str]:
  117. with self._lock:
  118. return set(self._seen)
  119. def _resolve(self, speaker_id: str | None) -> str | None:
  120. if not speaker_id:
  121. return None
  122. return self.speaker_names.get(speaker_id, speaker_id)
  123. # ── Text ingestion ────────────────────────────────────────────────────────
  124. def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
  125. """Accept a finalised segment; flush on sentence boundary or speaker change."""
  126. with self._lock:
  127. if speaker_id:
  128. self._seen.add(speaker_id)
  129. resolved = self._resolve(speaker_id)
  130. if resolved != self._current_speaker:
  131. if self._text_buffer:
  132. self._flush(mqtt_client)
  133. self._current_speaker = resolved
  134. self._speaker_changed = True
  135. sep = " " if self._text_buffer else ""
  136. self._text_buffer += sep + text.strip()
  137. self._last_final_time = time.monotonic()
  138. if _is_sentence_end(text):
  139. self._flush(mqtt_client)
  140. def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
  141. with self._lock:
  142. if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
  143. self._flush(mqtt_client)
  144. def _flush(self, mqtt_client: mqtt.Client) -> None:
  145. """Word-wrap buffer → rolling display → publish. Must hold lock."""
  146. text = self._text_buffer.strip()
  147. self._text_buffer = ""
  148. if not text:
  149. return
  150. new_lines: list[str] = []
  151. if self._speaker_changed and self._current_speaker:
  152. new_lines.append(f"[{self._current_speaker.upper()}]")
  153. self._speaker_changed = False
  154. new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
  155. self._display.extend(new_lines)
  156. self._display = self._display[-DISPLAY_LINES:]
  157. while len(self._display) < DISPLAY_LINES:
  158. self._display.insert(0, "")
  159. payload = json.dumps({"lines": list(self._display)})
  160. mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
  161. print(f"[Display] {self._display}")
  162. def clear(self, mqtt_client: mqtt.Client) -> None:
  163. with self._lock:
  164. self._display = [""] * DISPLAY_LINES
  165. self._text_buffer = ""
  166. self._current_speaker = None
  167. self._speaker_changed = False
  168. mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
  169. print("[Display] Cleared")
  170. # ── Helpers ───────────────────────────────────────────────────────────────────
  171. def _is_sentence_end(text: str) -> bool:
  172. return bool(re.search(r'[.!?…]\s*$', text.strip()))
  173. def _extract_speaker(data: dict) -> str | None:
  174. if "speaker" in data:
  175. return data["speaker"] or None
  176. words = data.get("words", [])
  177. if words:
  178. ids = [w.get("speaker") for w in words if w.get("speaker")]
  179. if ids:
  180. return Counter(ids).most_common(1)[0][0]
  181. return None
  182. # ── MQTT ──────────────────────────────────────────────────────────────────────
  183. def build_mqtt_client() -> mqtt.Client:
  184. client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  185. def on_connect(client, userdata, flags, rc, props):
  186. print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")
  187. def on_disconnect(client, userdata, flags, rc, props):
  188. print(f"[MQTT] Disconnected ({rc}), will reconnect...")
  189. client.on_connect = on_connect
  190. client.on_disconnect = on_disconnect
  191. client.reconnect_delay_set(min_delay=1, max_delay=30)
  192. client.connect_async(MQTT_HOST, MQTT_PORT)
  193. client.loop_start()
  194. return client
  195. # ── WebSocket + audio pipeline ────────────────────────────────────────────────
  196. async def _sender(ws, queue: asyncio.Queue) -> None:
  197. # Send config handshake first — WhisperLiveKit needs this before audio
  198. config = json.dumps({
  199. "uid": "bridge-client",
  200. "language": "en",
  201. "task": "transcribe",
  202. "model_size": "large-v3",
  203. "use_vad": True,
  204. })
  205. await ws.send(config)
  206. # Drain any stale chunks
  207. while not queue.empty():
  208. queue.get_nowait()
  209. while True:
  210. chunk = await queue.get()
  211. await ws.send(chunk)
  212. async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
  213. async for message in ws:
  214. try:
  215. data = json.loads(message)
  216. except (json.JSONDecodeError, TypeError):
  217. continue
  218. text = (data.get("text") or data.get("buffer_transcription") or "").strip()
  219. is_final = data.get("is_final", False) or data.get("end_of_segment", False)
  220. speaker = _extract_speaker(data)
  221. if is_final and text:
  222. print(f"[Whisper] ({speaker or '?'}) {text}")
  223. state.push_final(text, speaker, mqtt_client)
  224. async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  225. while True:
  226. await asyncio.sleep(1.0)
  227. state.maybe_timeout_flush(mqtt_client)
  228. async def _speaker_reloader(state: BridgeState) -> None:
  229. """Reload speakers.json every 5 s so admin UI changes take effect live."""
  230. last_mtime = 0.0
  231. while True:
  232. await asyncio.sleep(5.0)
  233. try:
  234. mtime = SPEAKERS_FILE.stat().st_mtime
  235. if mtime != last_mtime:
  236. fresh = _load_speakers()
  237. with state._lock:
  238. state.speaker_names = fresh
  239. last_mtime = mtime
  240. print("[Bridge] Speaker names reloaded from disk")
  241. except OSError:
  242. pass
  243. def _choose_audio_device() -> int | None:
  244. """
  245. List all input devices and return the index to use.
  246. Prefers AUDIO_DEVICE if set, otherwise the system default,
  247. otherwise the first device with input channels.
  248. """
  249. try:
  250. devices = sd.query_devices()
  251. default_in = sd.default.device[0] # may be -1 if unset
  252. except Exception as exc:
  253. print(f"[Audio] Cannot query devices: {exc}")
  254. return None
  255. print("[Audio] Available input devices:")
  256. input_devices: list[tuple[int, str]] = []
  257. for i, dev in enumerate(devices):
  258. if dev["max_input_channels"] > 0:
  259. marker = " ← default" if i == default_in else ""
  260. print(f" [{i}] {dev['name']}{marker}")
  261. input_devices.append((i, dev["name"]))
  262. if not input_devices:
  263. print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
  264. return None
  265. # Explicit override from config
  266. if AUDIO_DEVICE is not None:
  267. print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
  268. return AUDIO_DEVICE
  269. # System default (if valid)
  270. if default_in >= 0:
  271. print(f"[Audio] Using default input device [{default_in}]")
  272. return default_in
  273. # Fall back to first available input
  274. idx, name = input_devices[0]
  275. print(f"[Audio] No system default set — using [{idx}] {name}")
  276. print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
  277. return idx
# The WebSocket helpers above (_sender/_receiver) are not used by main(): audio
# goes from sounddevice straight into an in-process AudioProcessor via the
# WhisperLiveKit Python API.
  280. from whisperlivekit import AudioProcessor, TranscriptionEngine
  281. async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, engine: TranscriptionEngine) -> None:
  282. audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
  283. loop = asyncio.get_running_loop()
  284. def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
  285. if status:
  286. print(f"[Audio] {status}")
  287. chunk = indata.tobytes() # raw s16le
  288. loop.call_soon_threadsafe(
  289. lambda: audio_queue.put_nowait(chunk) if not audio_queue.full() else None
  290. )
  291. device = _choose_audio_device()
  292. if device is None:
  293. print("[Audio] No input device — cannot start.")
  294. return
  295. audio_processor = AudioProcessor(transcription_engine=engine)
  296. results_generator = await audio_processor.create_tasks()
  297. async def _receive_results():
  298. async for response in results_generator:
  299. # response is a FrontData dataclass, not a dict
  300. text = (getattr(response, "text", None) or getattr(response, "buffer_transcription", None) or "").strip()
  301. is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
  302. speaker = getattr(response, "speaker", None)
  303. if is_final and text:
  304. print(f"[Whisper] ({speaker or '?'}) {text}")
  305. state.push_final(text, speaker, mqtt_client)
  306. async def _send_audio():
  307. global _inject_queue
  308. _inject_queue = asyncio.Queue(maxsize=240)
  309. global test_audio_queue
  310. test_audio_queue = asyncio.Queue(maxsize=240)
  311. with sd.InputStream(
  312. device=device, samplerate=SAMPLE_RATE, channels=CHANNELS,
  313. dtype="int16", # s16le — matches pcm_input mode
  314. blocksize=BLOCKSIZE, callback=audio_callback,
  315. ):
  316. while True:
  317. # Drain test audio injection first if available
  318. try:
  319. chunk = test_audio_queue.get_nowait()
  320. except asyncio.QueueEmpty:
  321. chunk = await audio_queue.get()
  322. await audio_processor.process_audio(chunk)
  323. flusher = asyncio.create_task(_flusher(state, mqtt_client))
  324. reloader = asyncio.create_task(_speaker_reloader(state))
  325. try:
  326. await asyncio.gather(_send_audio(), _receive_results())
  327. finally:
  328. flusher.cancel()
  329. reloader.cancel()
  330. def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  331. asyncio.run(audio_ws_loop(state, mqtt_client))
  332. # ── Entry point ───────────────────────────────────────────────────────────────
  333. def main() -> None:
  334. from whisperlivekit import TranscriptionEngine
  335. state = BridgeState()
  336. mqtt_client = build_mqtt_client()
  337. engine = TranscriptionEngine(model_size="large-v3", lan="en", diarization=False, pcm_input=True)
  338. def _run():
  339. asyncio.run(audio_processor_loop(state, mqtt_client, engine))
  340. ws_thread = threading.Thread(target=_run, daemon=True)
  341. ws_thread.start()
  342. print("[Bridge] Audio pipeline running")
  343. try:
  344. ws_thread.join()
  345. except KeyboardInterrupt:
  346. pass
  347. def _run_inject_api():
  348. uvicorn.run(_bridge_app, host="127.0.0.1", port=8002, log_level="warning")
  349. inject_thread = threading.Thread(target=_run_inject_api, daemon=True)
  350. inject_thread.start()
  351. if __name__ == "__main__":
  352. main()