bridge.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. #!/usr/bin/env python3
  2. """
  3. bridge.py — Church Live Transcription Bridge
  4. Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
  5. receives transcription + speaker diarization, buffers sentences, and
  6. publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
  7. Start WhisperLiveKit with:
  8. wlk --model large-v3 --language en --diarization
  9. Run this script:
  10. python bridge.py
  11. """
  12. import asyncio
  13. import json
  14. import re
  15. import textwrap
  16. import threading
  17. import time
  18. from collections import Counter
  19. from pathlib import Path
  20. import numpy as np
  21. import paho.mqtt.client as mqtt
  22. import sounddevice as sd
  23. import websockets
  24. import tkinter as tk
  25. from tkinter import ttk
  26. # ── Configuration ─────────────────────────────────────────────────────────────
# MQTT broker the rolling display text is published to.
MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"  # receives the JSON {"lines": [...]} payload
MQTT_TOPIC_CLEAR = "display/clear"  # receives an empty payload when the display is cleared
# WebSocket endpoint of the WhisperLiveKit server.
WS_URL = "ws://localhost:8000/asr"
# Microphone capture format (int16 samples are requested when the stream is opened).
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCKSIZE = 4096  # ~256 ms per chunk at 16 kHz
SENTENCE_TIMEOUT = 4.0  # seconds of silence before forcing a flush
MAX_LINE_CHARS = 38  # characters per line (~24pt font at 800 px wide)
DISPLAY_LINES = 3  # number of lines in the rolling window sent to the display
# Set to a device index (integer) to force a specific microphone.
# Leave as None to use the Windows default input device.
# Run bridge.py once to see available device indices printed at startup.
AUDIO_DEVICE: int | None = None
# Speaker-ID → friendly-name mapping, stored next to this script.
SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
# Mapping written to SPEAKERS_FILE when no usable file exists yet.
DEFAULT_SPEAKERS: dict[str, str] = {
    "SPEAKER_00": "Pastor",
    "SPEAKER_01": "Reader",
    "SPEAKER_02": "Guest",
    "SPEAKER_03": "Choir",
}
  49. # ── Speaker persistence ───────────────────────────────────────────────────────
  50. def _load_speakers() -> dict[str, str]:
  51. if SPEAKERS_FILE.exists():
  52. try:
  53. data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
  54. if isinstance(data, dict):
  55. return data
  56. except (json.JSONDecodeError, OSError):
  57. pass
  58. # First run — seed with defaults and save
  59. _write_speakers(DEFAULT_SPEAKERS)
  60. return dict(DEFAULT_SPEAKERS)
  61. def _write_speakers(names: dict[str, str]) -> None:
  62. try:
  63. SPEAKERS_FILE.write_text(
  64. json.dumps(names, indent=2, ensure_ascii=False),
  65. encoding="utf-8",
  66. )
  67. except OSError as exc:
  68. print(f"[Speakers] Save failed: {exc}")
  69. # ── State ─────────────────────────────────────────────────────────────────────
  70. class BridgeState:
  71. """All mutable state, protected by a single lock."""
  72. def __init__(self):
  73. self._lock = threading.Lock()
  74. self.speaker_names: dict[str, str] = _load_speakers()
  75. self._seen: set[str] = set(self.speaker_names)
  76. self._current_speaker: str | None = None
  77. self._speaker_changed = False
  78. self._text_buffer = ""
  79. self._display: list[str] = [""] * DISPLAY_LINES
  80. self._last_final_time = time.monotonic()
  81. # ── Speaker name management ───────────────────────────────────────────────
  82. def set_speaker_name(self, speaker_id: str, name: str) -> None:
  83. with self._lock:
  84. self.speaker_names[speaker_id] = name.strip()
  85. self._seen.add(speaker_id)
  86. _write_speakers(self.speaker_names)
  87. def delete_speaker(self, speaker_id: str) -> None:
  88. with self._lock:
  89. self.speaker_names.pop(speaker_id, None)
  90. self._seen.discard(speaker_id)
  91. _write_speakers(self.speaker_names)
  92. def seen_speakers_snapshot(self) -> set[str]:
  93. with self._lock:
  94. return set(self._seen)
  95. def _resolve(self, speaker_id: str | None) -> str | None:
  96. if not speaker_id:
  97. return None
  98. return self.speaker_names.get(speaker_id, speaker_id)
  99. # ── Text ingestion ────────────────────────────────────────────────────────
  100. def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
  101. """Accept a finalised segment; flush on sentence boundary or speaker change."""
  102. with self._lock:
  103. if speaker_id:
  104. self._seen.add(speaker_id)
  105. resolved = self._resolve(speaker_id)
  106. if resolved != self._current_speaker:
  107. if self._text_buffer:
  108. self._flush(mqtt_client)
  109. self._current_speaker = resolved
  110. self._speaker_changed = True
  111. sep = " " if self._text_buffer else ""
  112. self._text_buffer += sep + text.strip()
  113. self._last_final_time = time.monotonic()
  114. if _is_sentence_end(text):
  115. self._flush(mqtt_client)
  116. def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
  117. with self._lock:
  118. if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
  119. self._flush(mqtt_client)
  120. def _flush(self, mqtt_client: mqtt.Client) -> None:
  121. """Word-wrap buffer → rolling display → publish. Must hold lock."""
  122. text = self._text_buffer.strip()
  123. self._text_buffer = ""
  124. if not text:
  125. return
  126. new_lines: list[str] = []
  127. if self._speaker_changed and self._current_speaker:
  128. new_lines.append(f"[{self._current_speaker.upper()}]")
  129. self._speaker_changed = False
  130. new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
  131. self._display.extend(new_lines)
  132. self._display = self._display[-DISPLAY_LINES:]
  133. while len(self._display) < DISPLAY_LINES:
  134. self._display.insert(0, "")
  135. payload = json.dumps({"lines": list(self._display)})
  136. mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
  137. print(f"[Display] {self._display}")
  138. def clear(self, mqtt_client: mqtt.Client) -> None:
  139. with self._lock:
  140. self._display = [""] * DISPLAY_LINES
  141. self._text_buffer = ""
  142. self._current_speaker = None
  143. self._speaker_changed = False
  144. mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
  145. print("[Display] Cleared")
  146. # ── Helpers ───────────────────────────────────────────────────────────────────
  147. def _is_sentence_end(text: str) -> bool:
  148. return bool(re.search(r'[.!?…]\s*$', text.strip()))
  149. def _extract_speaker(data: dict) -> str | None:
  150. if "speaker" in data:
  151. return data["speaker"] or None
  152. words = data.get("words", [])
  153. if words:
  154. ids = [w.get("speaker") for w in words if w.get("speaker")]
  155. if ids:
  156. return Counter(ids).most_common(1)[0][0]
  157. return None
  158. # ── MQTT ──────────────────────────────────────────────────────────────────────
  159. def build_mqtt_client() -> mqtt.Client:
  160. client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  161. def on_connect(client, userdata, flags, rc, props):
  162. print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")
  163. def on_disconnect(client, userdata, flags, rc, props):
  164. print(f"[MQTT] Disconnected ({rc}), will reconnect...")
  165. client.on_connect = on_connect
  166. client.on_disconnect = on_disconnect
  167. client.reconnect_delay_set(min_delay=1, max_delay=30)
  168. client.connect_async(MQTT_HOST, MQTT_PORT)
  169. client.loop_start()
  170. return client
  171. # ── WebSocket + audio pipeline ────────────────────────────────────────────────
  172. async def _sender(ws, queue: asyncio.Queue) -> None:
  173. while not queue.empty():
  174. queue.get_nowait()
  175. while True:
  176. chunk = await queue.get()
  177. await ws.send(chunk)
  178. async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
  179. async for message in ws:
  180. try:
  181. data = json.loads(message)
  182. except (json.JSONDecodeError, TypeError):
  183. continue
  184. text = (data.get("text") or data.get("buffer_transcription") or "").strip()
  185. is_final = data.get("is_final", False) or data.get("end_of_segment", False)
  186. speaker = _extract_speaker(data)
  187. if is_final and text:
  188. print(f"[Whisper] ({speaker or '?'}) {text}")
  189. state.push_final(text, speaker, mqtt_client)
  190. async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  191. while True:
  192. await asyncio.sleep(1.0)
  193. state.maybe_timeout_flush(mqtt_client)
  194. def _choose_audio_device() -> int | None:
  195. """
  196. List all input devices and return the index to use.
  197. Prefers AUDIO_DEVICE if set, otherwise the system default,
  198. otherwise the first device with input channels.
  199. """
  200. try:
  201. devices = sd.query_devices()
  202. default_in = sd.default.device[0] # may be -1 if unset
  203. except Exception as exc:
  204. print(f"[Audio] Cannot query devices: {exc}")
  205. return None
  206. print("[Audio] Available input devices:")
  207. input_devices: list[tuple[int, str]] = []
  208. for i, dev in enumerate(devices):
  209. if dev["max_input_channels"] > 0:
  210. marker = " ← default" if i == default_in else ""
  211. print(f" [{i}] {dev['name']}{marker}")
  212. input_devices.append((i, dev["name"]))
  213. if not input_devices:
  214. print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
  215. return None
  216. # Explicit override from config
  217. if AUDIO_DEVICE is not None:
  218. print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
  219. return AUDIO_DEVICE
  220. # System default (if valid)
  221. if default_in >= 0:
  222. print(f"[Audio] Using default input device [{default_in}]")
  223. return default_in
  224. # Fall back to first available input
  225. idx, name = input_devices[0]
  226. print(f"[Audio] No system default set — using [{idx}] {name}")
  227. print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
  228. return idx
  229. async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  230. audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
  231. loop = asyncio.get_running_loop()
  232. def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
  233. if status:
  234. print(f"[Audio] {status}")
  235. chunk = indata.tobytes()
  236. def _put():
  237. try:
  238. audio_queue.put_nowait(chunk)
  239. except asyncio.QueueFull:
  240. pass
  241. loop.call_soon_threadsafe(_put)
  242. device = _choose_audio_device()
  243. if device is None:
  244. print("[Audio] No input device available — audio pipeline cannot start.")
  245. return
  246. with sd.InputStream(
  247. device=device,
  248. samplerate=SAMPLE_RATE,
  249. channels=CHANNELS,
  250. dtype="int16",
  251. blocksize=BLOCKSIZE,
  252. callback=audio_callback,
  253. ):
  254. flusher = asyncio.create_task(_flusher(state, mqtt_client))
  255. try:
  256. while True:
  257. try:
  258. print(f"[WS] Connecting to {WS_URL} ...")
  259. async with websockets.connect(WS_URL, max_size=2**23) as ws:
  260. print("[WS] Connected")
  261. send_t = asyncio.create_task(_sender(ws, audio_queue))
  262. recv_t = asyncio.create_task(_receiver(ws, state, mqtt_client))
  263. done, pending = await asyncio.wait(
  264. [send_t, recv_t], return_when=asyncio.FIRST_COMPLETED
  265. )
  266. for t in pending:
  267. t.cancel()
  268. for t in done:
  269. if not t.cancelled() and (exc := t.exception()):
  270. print(f"[WS] Task error: {exc}")
  271. except (websockets.ConnectionClosed, OSError, ConnectionRefusedError) as exc:
  272. print(f"[WS] {exc} — retrying in 3 s...")
  273. await asyncio.sleep(3)
  274. finally:
  275. flusher.cancel()
  276. def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  277. asyncio.run(audio_ws_loop(state, mqtt_client))
  278. # ── Speaker UI ────────────────────────────────────────────────────────────────
  279. def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  280. root = tk.Tk()
  281. root.title("Transcription Bridge — Speaker Names")
  282. root.attributes("-topmost", True)
  283. root.minsize(440, 360)
  284. root.geometry("460x480")
  285. # ── Header ────────────────────────────────────────────────────────────────
  286. tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).pack(
  287. pady=(12, 2)
  288. )
  289. tk.Label(
  290. root,
  291. text="Names are saved to speakers.json and restored each session.\n"
  292. "New speakers detected by diarization appear here automatically.",
  293. font=("Helvetica", 9), fg="gray", justify="center",
  294. ).pack(pady=(0, 6))
  295. # ── Scrollable list ───────────────────────────────────────────────────────
  296. list_outer = tk.Frame(root, relief="sunken", bd=1)
  297. list_outer.pack(fill="both", expand=True, padx=12, pady=(0, 4))
  298. canvas = tk.Canvas(list_outer, highlightthickness=0)
  299. scrollbar = ttk.Scrollbar(list_outer, orient="vertical", command=canvas.yview)
  300. rows_frame = tk.Frame(canvas)
  301. rows_frame.bind(
  302. "<Configure>",
  303. lambda e: canvas.configure(scrollregion=canvas.bbox("all")),
  304. )
  305. canvas.create_window((0, 0), window=rows_frame, anchor="nw")
  306. canvas.configure(yscrollcommand=scrollbar.set)
  307. canvas.pack(side="left", fill="both", expand=True)
  308. scrollbar.pack(side="right", fill="y")
  309. canvas.bind_all(
  310. "<MouseWheel>",
  311. lambda e: canvas.yview_scroll(int(-1 * e.delta / 120), "units"),
  312. )
  313. # Column headers
  314. hdr = tk.Frame(rows_frame, bg="#e8e8e8")
  315. hdr.pack(fill="x", pady=(2, 0))
  316. tk.Label(hdr, text=" Speaker ID", bg="#e8e8e8", font=("Helvetica", 9, "bold"), width=14, anchor="w").pack(side="left")
  317. tk.Label(hdr, text="Friendly Name", bg="#e8e8e8", font=("Helvetica", 9, "bold"), width=18, anchor="w").pack(side="left")
  318. # ── Row management ────────────────────────────────────────────────────────
  319. rendered_sids: set[str] = set()
  320. def _add_row(sid: str, name: str) -> None:
  321. if sid in rendered_sids:
  322. return
  323. rendered_sids.add(sid)
  324. row = tk.Frame(rows_frame)
  325. row.pack(fill="x", padx=4, pady=2)
  326. tk.Label(row, text=sid, font=("Courier", 9), width=14, anchor="w").pack(side="left")
  327. entry = tk.Entry(row, font=("Helvetica", 10), width=18)
  328. entry.insert(0, name)
  329. entry.pack(side="left", padx=4)
  330. saved_lbl = tk.Label(row, text="", font=("Helvetica", 8), fg="#2a7a2a", width=5)
  331. saved_lbl.pack(side="left")
  332. def _save(s=sid, e=entry, lbl=saved_lbl):
  333. n = e.get().strip()
  334. if not n:
  335. return
  336. state.set_speaker_name(s, n)
  337. lbl.config(text="Saved")
  338. row.after(2000, lambda: lbl.config(text=""))
  339. print(f"[UI] {s} → {n!r}")
  340. def _delete(s=sid, r=row):
  341. state.delete_speaker(s)
  342. rendered_sids.discard(s)
  343. r.destroy()
  344. print(f"[UI] Removed {s}")
  345. entry.bind("<Return>", lambda _e, s=sid, e=entry, lbl=saved_lbl: _save(s, e, lbl))
  346. tk.Button(row, text="Save", command=_save, width=5).pack(side="left", padx=2)
  347. tk.Button(row, text="✕", command=_delete, fg="red", width=3).pack(side="left")
  348. # Populate from persisted state (sorted so order is stable)
  349. for sid, name in sorted(state.speaker_names.items()):
  350. _add_row(sid, name)
  351. # Poll every 2 s for speaker IDs newly seen from Whisper this session
  352. def _poll():
  353. for sid in sorted(state.seen_speakers_snapshot() - rendered_sids):
  354. _add_row(sid, state.speaker_names.get(sid, sid))
  355. root.after(2000, _poll)
  356. _poll()
  357. # ── Add row manually ──────────────────────────────────────────────────────
  358. ttk.Separator(root, orient="horizontal").pack(fill="x", padx=12, pady=4)
  359. add_row = tk.Frame(root)
  360. add_row.pack(fill="x", padx=12)
  361. tk.Label(add_row, text="Add:", font=("Helvetica", 9)).pack(side="left")
  362. add_id = tk.Entry(add_row, font=("Courier", 9), width=13)
  363. add_id.insert(0, "SPEAKER_04")
  364. add_id.pack(side="left", padx=4)
  365. tk.Label(add_row, text="→").pack(side="left")
  366. add_name = tk.Entry(add_row, font=("Helvetica", 10), width=16)
  367. add_name.pack(side="left", padx=4)
  368. def _add_manual():
  369. sid = add_id.get().strip()
  370. name = add_name.get().strip()
  371. if sid and name:
  372. state.set_speaker_name(sid, name)
  373. _add_row(sid, name)
  374. add_name.delete(0, tk.END)
  375. print(f"[UI] Added {sid} → {name!r}")
  376. add_name.bind("<Return>", lambda _e: _add_manual())
  377. tk.Button(add_row, text="Add", command=_add_manual, width=5).pack(side="left", padx=2)
  378. # ── Footer buttons ────────────────────────────────────────────────────────
  379. ttk.Separator(root, orient="horizontal").pack(fill="x", padx=12, pady=6)
  380. footer = tk.Frame(root)
  381. footer.pack(fill="x", padx=12, pady=(0, 12))
  382. tk.Label(
  383. footer, text="Changes save instantly to speakers.json",
  384. font=("Helvetica", 8), fg="gray",
  385. ).pack(side="left")
  386. tk.Button(
  387. footer, text="Clear Display", fg="red", width=14,
  388. command=lambda: state.clear(mqtt_client),
  389. ).pack(side="right")
  390. root.mainloop()
  391. # ── Entry point ───────────────────────────────────────────────────────────────
  392. def main() -> None:
  393. state = BridgeState()
  394. mqtt_client = build_mqtt_client()
  395. ws_thread = threading.Thread(
  396. target=run_async_loop, args=(state, mqtt_client), daemon=True
  397. )
  398. ws_thread.start()
  399. print(f"[Bridge] Speaker names loaded from {SPEAKERS_FILE}")
  400. print("[Bridge] Audio pipeline running — close this window to quit")
  401. run_speaker_ui(state, mqtt_client)
  402. if __name__ == "__main__":
  403. main()