bridge.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #!/usr/bin/env python3
  2. """
  3. bridge.py — Church Live Transcription Bridge
  4. Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
  5. receives transcription + speaker diarization, buffers sentences, and
  6. publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
  7. Start WhisperLiveKit with:
  8. wlk --model large-v3 --language en --diarization
  9. Run this script:
  10. python bridge.py
  11. """
  12. import asyncio
  13. import json
  14. import re
  15. import textwrap
  16. import threading
  17. import time
  18. from collections import Counter
  19. import numpy as np
  20. import paho.mqtt.client as mqtt
  21. import sounddevice as sd
  22. import websockets
  23. import tkinter as tk
  24. from tkinter import ttk
  25. # ── Configuration ─────────────────────────────────────────────────────────────
  26. MQTT_HOST = "localhost"
  27. MQTT_PORT = 1883
  28. MQTT_TOPIC_TEXT = "display/text"
  29. MQTT_TOPIC_CLEAR = "display/clear"
  30. WS_URL = "ws://localhost:8000/asr"
  31. SAMPLE_RATE = 16000
  32. CHANNELS = 1
  33. BLOCKSIZE = 4096 # ~256 ms per chunk at 16 kHz
  34. SENTENCE_TIMEOUT = 4.0 # seconds of silence before forcing a flush
  35. MAX_LINE_CHARS = 38 # characters per line (~24pt font at 800 px wide)
  36. DISPLAY_LINES = 3
  37. # ── State ─────────────────────────────────────────────────────────────────────
  38. class BridgeState:
  39. """All mutable state, protected by a single lock."""
  40. def __init__(self):
  41. self._lock = threading.Lock()
  42. self.speaker_names: dict[str, str] = {} # "SPEAKER_00" → "Pastor"
  43. self._current_speaker: str | None = None
  44. self._speaker_changed = False
  45. self._text_buffer = ""
  46. self._display: list[str] = [""] * DISPLAY_LINES
  47. self._last_final_time = time.monotonic()
  48. # ── Speaker name mapping ──────────────────────────────────────────────────
  49. def set_speaker_name(self, speaker_id: str, name: str) -> None:
  50. with self._lock:
  51. self.speaker_names[speaker_id] = name.strip()
  52. def _resolve(self, speaker_id: str | None) -> str | None:
  53. if not speaker_id:
  54. return None
  55. return self.speaker_names.get(speaker_id, speaker_id)
  56. # ── Text ingestion ────────────────────────────────────────────────────────
  57. def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
  58. """Accept a finalised segment; flush on sentence boundary or speaker change."""
  59. with self._lock:
  60. resolved = self._resolve(speaker_id)
  61. if resolved != self._current_speaker:
  62. if self._text_buffer:
  63. self._flush(mqtt_client) # push previous speaker's words first
  64. self._current_speaker = resolved
  65. self._speaker_changed = True
  66. sep = " " if self._text_buffer else ""
  67. self._text_buffer += sep + text.strip()
  68. self._last_final_time = time.monotonic()
  69. if _is_sentence_end(text):
  70. self._flush(mqtt_client)
  71. def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
  72. with self._lock:
  73. if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
  74. self._flush(mqtt_client)
  75. def _flush(self, mqtt_client: mqtt.Client) -> None:
  76. """Word-wrap buffer → rolling display → publish. Must hold lock."""
  77. text = self._text_buffer.strip()
  78. self._text_buffer = ""
  79. if not text:
  80. return
  81. new_lines: list[str] = []
  82. if self._speaker_changed and self._current_speaker:
  83. new_lines.append(f"[{self._current_speaker.upper()}]")
  84. self._speaker_changed = False
  85. new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
  86. self._display.extend(new_lines)
  87. self._display = self._display[-DISPLAY_LINES:]
  88. while len(self._display) < DISPLAY_LINES:
  89. self._display.insert(0, "")
  90. payload = json.dumps({"lines": list(self._display)})
  91. mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
  92. print(f"[Display] {self._display}")
  93. def clear(self, mqtt_client: mqtt.Client) -> None:
  94. with self._lock:
  95. self._display = [""] * DISPLAY_LINES
  96. self._text_buffer = ""
  97. self._current_speaker = None
  98. self._speaker_changed = False
  99. mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
  100. print("[Display] Cleared")
  101. # ── Helpers ───────────────────────────────────────────────────────────────────
  102. def _is_sentence_end(text: str) -> bool:
  103. return bool(re.search(r'[.!?…]\s*$', text.strip()))
  104. def _extract_speaker(data: dict) -> str | None:
  105. """
  106. Extract speaker ID from a WhisperLiveKit response dict.
  107. Handles segment-level {"speaker": "SPEAKER_00"} and word-level
  108. {"words": [{"speaker": "SPEAKER_00", ...}, ...]} formats.
  109. """
  110. if "speaker" in data:
  111. return data["speaker"] or None
  112. words = data.get("words", [])
  113. if words:
  114. ids = [w.get("speaker") for w in words if w.get("speaker")]
  115. if ids:
  116. return Counter(ids).most_common(1)[0][0]
  117. return None
  118. # ── MQTT ──────────────────────────────────────────────────────────────────────
  119. def build_mqtt_client() -> mqtt.Client:
  120. client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  121. def on_connect(client, userdata, flags, rc, props):
  122. print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")
  123. def on_disconnect(client, userdata, flags, rc, props):
  124. print(f"[MQTT] Disconnected ({rc}), will reconnect...")
  125. client.on_connect = on_connect
  126. client.on_disconnect = on_disconnect
  127. client.reconnect_delay_set(min_delay=1, max_delay=30)
  128. client.connect_async(MQTT_HOST, MQTT_PORT)
  129. client.loop_start()
  130. return client
  131. # ── WebSocket + audio pipeline ────────────────────────────────────────────────
  132. async def _sender(ws, queue: asyncio.Queue) -> None:
  133. while not queue.empty(): # drain stale audio before streaming
  134. queue.get_nowait()
  135. while True:
  136. chunk = await queue.get()
  137. await ws.send(chunk)
  138. async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
  139. async for message in ws:
  140. try:
  141. data = json.loads(message)
  142. except (json.JSONDecodeError, TypeError):
  143. continue
  144. text = (data.get("text") or data.get("buffer_transcription") or "").strip()
  145. is_final = data.get("is_final", False) or data.get("end_of_segment", False)
  146. speaker = _extract_speaker(data)
  147. if is_final and text:
  148. print(f"[Whisper] ({speaker or '?'}) {text}")
  149. state.push_final(text, speaker, mqtt_client)
  150. async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  151. while True:
  152. await asyncio.sleep(1.0)
  153. state.maybe_timeout_flush(mqtt_client)
  154. async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  155. audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
  156. loop = asyncio.get_running_loop()
  157. def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
  158. if status:
  159. print(f"[Audio] {status}")
  160. chunk = indata.tobytes()
  161. def _put():
  162. try:
  163. audio_queue.put_nowait(chunk)
  164. except asyncio.QueueFull:
  165. pass
  166. loop.call_soon_threadsafe(_put)
  167. with sd.InputStream(
  168. samplerate=SAMPLE_RATE,
  169. channels=CHANNELS,
  170. dtype="int16",
  171. blocksize=BLOCKSIZE,
  172. callback=audio_callback,
  173. ):
  174. flusher = asyncio.create_task(_flusher(state, mqtt_client))
  175. try:
  176. while True:
  177. try:
  178. print(f"[WS] Connecting to {WS_URL} ...")
  179. async with websockets.connect(WS_URL, max_size=2**23) as ws:
  180. print("[WS] Connected")
  181. send_t = asyncio.create_task(_sender(ws, audio_queue))
  182. recv_t = asyncio.create_task(_receiver(ws, state, mqtt_client))
  183. done, pending = await asyncio.wait(
  184. [send_t, recv_t], return_when=asyncio.FIRST_COMPLETED
  185. )
  186. for t in pending:
  187. t.cancel()
  188. for t in done:
  189. if not t.cancelled() and (exc := t.exception()):
  190. print(f"[WS] Task error: {exc}")
  191. except (websockets.ConnectionClosed, OSError, ConnectionRefusedError) as exc:
  192. print(f"[WS] {exc} — retrying in 3 s...")
  193. await asyncio.sleep(3)
  194. finally:
  195. flusher.cancel()
  196. def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  197. asyncio.run(audio_ws_loop(state, mqtt_client))
  198. # ── Speaker name-mapping UI ───────────────────────────────────────────────────
  199. PRESET_SPEAKERS = [
  200. ("SPEAKER_00", "Pastor"),
  201. ("SPEAKER_01", "Reader"),
  202. ("SPEAKER_02", "Guest"),
  203. ("SPEAKER_03", "Choir"),
  204. ]
  205. def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
  206. root = tk.Tk()
  207. root.title("Transcription Bridge — Speaker Names")
  208. root.attributes("-topmost", True)
  209. root.resizable(False, False)
  210. tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).grid(
  211. row=0, column=0, columnspan=3, pady=(12, 2), padx=12
  212. )
  213. tk.Label(
  214. root,
  215. text="Diarization is automatic. Assign readable names to each speaker ID.",
  216. font=("Helvetica", 9), fg="gray", justify="center",
  217. ).grid(row=1, column=0, columnspan=3, pady=(0, 8))
  218. tk.Label(root, text="Speaker ID", font=("Helvetica", 10, "bold")).grid(row=2, column=0, padx=8)
  219. tk.Label(root, text="Friendly Name", font=("Helvetica", 10, "bold")).grid(row=2, column=1, padx=8)
  220. entries: list[tuple[str, tk.Entry]] = []
  221. for i, (sid, default) in enumerate(PRESET_SPEAKERS):
  222. tk.Label(root, text=sid, font=("Courier", 10)).grid(row=3+i, column=0, sticky="e", padx=8, pady=3)
  223. e = tk.Entry(root, width=16, font=("Helvetica", 10))
  224. e.insert(0, default)
  225. e.grid(row=3+i, column=1, padx=8, pady=3)
  226. entries.append((sid, e))
  227. def _apply(s=sid, entry=e):
  228. state.set_speaker_name(s, entry.get())
  229. print(f"[UI] {s} → {entry.get()!r}")
  230. tk.Button(root, text="Apply", command=_apply, width=6).grid(row=3+i, column=2, padx=6)
  231. ttk.Separator(root, orient="horizontal").grid(
  232. row=7, column=0, columnspan=3, sticky="ew", padx=8, pady=8
  233. )
  234. # Custom ID row
  235. tk.Label(root, text="Custom ID:").grid(row=8, column=0, sticky="e", padx=8)
  236. cid = tk.Entry(root, width=14, font=("Courier", 10))
  237. cid.insert(0, "SPEAKER_04")
  238. cid.grid(row=8, column=1, sticky="w", padx=8, pady=2)
  239. tk.Label(root, text="Name:").grid(row=9, column=0, sticky="e", padx=8)
  240. cname = tk.Entry(root, width=14, font=("Helvetica", 10))
  241. cname.grid(row=9, column=1, sticky="w", padx=8, pady=2)
  242. def _apply_custom():
  243. s, n = cid.get().strip(), cname.get().strip()
  244. if s and n:
  245. state.set_speaker_name(s, n)
  246. print(f"[UI] Custom: {s} → {n!r}")
  247. tk.Button(root, text="Apply", command=_apply_custom, width=6).grid(row=9, column=2, padx=6)
  248. ttk.Separator(root, orient="horizontal").grid(
  249. row=10, column=0, columnspan=3, sticky="ew", padx=8, pady=8
  250. )
  251. def _apply_all():
  252. for sid, entry in entries:
  253. state.set_speaker_name(sid, entry.get())
  254. print("[UI] All names applied")
  255. tk.Button(root, text="Apply All Names", width=18, command=_apply_all).grid(
  256. row=11, column=0, columnspan=2, padx=8, pady=4, sticky="w"
  257. )
  258. tk.Button(root, text="Clear Display", width=14, fg="red",
  259. command=lambda: state.clear(mqtt_client)).grid(
  260. row=11, column=2, padx=8, pady=4
  261. )
  262. tk.Label(root, text="Speaker labels appear on the display when the speaker changes.",
  263. font=("Helvetica", 8), fg="gray").grid(
  264. row=12, column=0, columnspan=3, pady=(0, 10)
  265. )
  266. _apply_all() # activate defaults immediately
  267. root.mainloop()
  268. # ── Entry point ───────────────────────────────────────────────────────────────
  269. def main() -> None:
  270. state = BridgeState()
  271. mqtt_client = build_mqtt_client()
  272. ws_thread = threading.Thread(
  273. target=run_async_loop, args=(state, mqtt_client), daemon=True
  274. )
  275. ws_thread.start()
  276. print("[Bridge] Audio pipeline running — close this window to quit")
  277. run_speaker_ui(state, mqtt_client)
  278. if __name__ == "__main__":
  279. main()