#!/usr/bin/env python3
"""
bridge.py — Church Live Transcription Bridge

Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
receives transcription + speaker diarization, buffers sentences,
and publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.

Start WhisperLiveKit with:
    wlk --model large-v3 --language en --diarization

Run this script:
    python bridge.py
"""

import asyncio
import json
import re
import textwrap
import threading
import time
import tkinter as tk
from collections import Counter
from tkinter import ttk

import numpy as np
import paho.mqtt.client as mqtt
import sounddevice as sd
import websockets

# ── Configuration ─────────────────────────────────────────────────────────────

MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"
MQTT_TOPIC_CLEAR = "display/clear"

WS_URL = "ws://localhost:8000/asr"
WS_RETRY_DELAY = 3          # seconds to wait before reconnecting the WebSocket

SAMPLE_RATE = 16000
CHANNELS = 1
BLOCKSIZE = 4096            # ~256 ms per chunk at 16 kHz

SENTENCE_TIMEOUT = 4.0      # seconds of silence before forcing a flush
MAX_LINE_CHARS = 38         # characters per line (~24pt font at 800 px wide)
DISPLAY_LINES = 3

# Sentence terminator, optionally followed by closing quotes/brackets.
_SENTENCE_END_RE = re.compile(r'[.!?…]["\'”’)\]]*\s*$')


# ── State ─────────────────────────────────────────────────────────────────────

class BridgeState:
    """All mutable state, protected by a single lock.

    The lock is a plain (non-reentrant) ``threading.Lock``: public methods
    acquire it, underscore-prefixed helpers expect it to be held already.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.speaker_names: dict[str, str] = {}     # "SPEAKER_00" → "Pastor"
        self._current_speaker: str | None = None    # resolved (friendly) name
        self._speaker_changed = False               # label pending for next flush
        self._text_buffer = ""
        self._display: list[str] = [""] * DISPLAY_LINES
        self._last_final_time = time.monotonic()

    # ── Speaker name mapping ──────────────────────────────────────────────────

    def set_speaker_name(self, speaker_id: str, name: str) -> None:
        """Map a diarization speaker ID to a friendly name (whitespace-stripped)."""
        with self._lock:
            self.speaker_names[speaker_id] = name.strip()

    def _resolve(self, speaker_id: str | None) -> str | None:
        """Return the friendly name for *speaker_id*, or the ID itself if unmapped.

        Returns None for a missing/empty ID. Must hold lock.
        """
        if not speaker_id:
            return None
        return self.speaker_names.get(speaker_id, speaker_id)

    # ── Text ingestion ────────────────────────────────────────────────────────

    def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
        """Accept a finalised segment; flush on sentence boundary or speaker change.

        Whitespace-only segments are ignored so they can neither add stray
        separators to the buffer nor trigger a spurious speaker change.
        """
        text = text.strip()
        if not text:
            return

        with self._lock:
            resolved = self._resolve(speaker_id)

            if resolved != self._current_speaker:
                if self._text_buffer:
                    self._flush(mqtt_client)    # push previous speaker's words first
                self._current_speaker = resolved
                self._speaker_changed = True

            sep = " " if self._text_buffer else ""
            self._text_buffer += sep + text
            self._last_final_time = time.monotonic()

            if _is_sentence_end(text):
                self._flush(mqtt_client)

    def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
        """Flush buffered text if no final segment arrived for SENTENCE_TIMEOUT seconds."""
        with self._lock:
            if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
                self._flush(mqtt_client)

    def _flush(self, mqtt_client: mqtt.Client) -> None:
        """Word-wrap buffer → rolling display → publish. Must hold lock."""
        text = self._text_buffer.strip()
        self._text_buffer = ""
        if not text:
            return

        new_lines: list[str] = []
        if self._speaker_changed and self._current_speaker:
            new_lines.append(f"[{self._current_speaker.upper()}]")
        self._speaker_changed = False

        new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])

        # Keep only the newest DISPLAY_LINES lines; pad at the top if short.
        self._display.extend(new_lines)
        self._display = self._display[-DISPLAY_LINES:]
        while len(self._display) < DISPLAY_LINES:
            self._display.insert(0, "")

        payload = json.dumps({"lines": list(self._display)})
        mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
        print(f"[Display] {self._display}")

    def clear(self, mqtt_client: mqtt.Client) -> None:
        """Reset display, buffer and speaker tracking, then publish a clear message."""
        with self._lock:
            self._display = [""] * DISPLAY_LINES
            self._text_buffer = ""
            self._current_speaker = None
            self._speaker_changed = False
            mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
            print("[Display] Cleared")


# ── Helpers ───────────────────────────────────────────────────────────────────

def _is_sentence_end(text: str) -> bool:
    """Return True if *text* ends with . ! ? or …, allowing trailing quotes/brackets."""
    return bool(_SENTENCE_END_RE.search(text.strip()))


def _extract_speaker(data: dict) -> str | None:
    """
    Extract speaker ID from a WhisperLiveKit response dict.

    Handles segment-level {"speaker": "SPEAKER_00"} and
    word-level {"words": [{"speaker": "SPEAKER_00", ...}, ...]} formats.
    A missing or empty segment-level value falls back to the most common
    word-level speaker. Returns None when no speaker can be determined.
    """
    speaker = data.get("speaker")
    if speaker:
        return speaker

    words = data.get("words") or []
    ids = [w["speaker"] for w in words if isinstance(w, dict) and w.get("speaker")]
    if ids:
        return Counter(ids).most_common(1)[0][0]
    return None


# ── MQTT ──────────────────────────────────────────────────────────────────────

def build_mqtt_client() -> mqtt.Client:
    """Create the MQTT client, start its background loop and connect asynchronously."""
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    def on_connect(client, userdata, flags, rc, props):
        print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")

    def on_disconnect(client, userdata, flags, rc, props):
        print(f"[MQTT] Disconnected ({rc}), will reconnect...")

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.reconnect_delay_set(min_delay=1, max_delay=30)
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()
    return client


# ── WebSocket + audio pipeline ────────────────────────────────────────────────

async def _sender(ws, queue: asyncio.Queue) -> None:
    """Forward queued audio chunks to the WebSocket until cancelled or send fails."""
    while not queue.empty():        # drain stale audio before streaming
        queue.get_nowait()
    while True:
        chunk = await queue.get()
        await ws.send(chunk)


async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Read server messages and push finalised text segments into *state*.

    Messages that are not JSON objects, or whose text field is not a string,
    are skipped rather than allowed to raise and tear down the session.
    """
    async for message in ws:
        try:
            data = json.loads(message)
        except (ValueError, TypeError):     # bad JSON / undecodable bytes
            continue
        if not isinstance(data, dict):
            continue

        raw_text = data.get("text") or data.get("buffer_transcription") or ""
        if not isinstance(raw_text, str):
            continue
        text = raw_text.strip()
        is_final = data.get("is_final", False) or data.get("end_of_segment", False)
        speaker = _extract_speaker(data)

        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)


async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Once a second, flush buffered text that has been idle past the timeout."""
    while True:
        await asyncio.sleep(1.0)
        state.maybe_timeout_flush(mqtt_client)


async def _run_session(ws, state: BridgeState, mqtt_client: mqtt.Client,
                       audio_queue: asyncio.Queue) -> None:
    """Run sender + receiver on one open connection until either one finishes."""
    send_t = asyncio.create_task(_sender(ws, audio_queue))
    recv_t = asyncio.create_task(_receiver(ws, state, mqtt_client))
    tasks = (send_t, recv_t)
    done: set = set()
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # Cancel whatever is still running (no-op for finished tasks) and wait
        # for it to unwind so no task outlives the connection.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)

    for t in done:
        if not t.cancelled() and (exc := t.exception()):
            print(f"[WS] Task error: {exc}")


async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Capture microphone audio and stream it to WS_URL, reconnecting forever.

    Every connection attempt that ends — by error or by a clean close — is
    followed by a WS_RETRY_DELAY pause before the next attempt.
    """
    # 120 chunks of BLOCKSIZE frames ≈ 30 s of audio; newer chunks are dropped when full.
    audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
    loop = asyncio.get_running_loop()

    def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
        if status:
            print(f"[Audio] {status}")
        chunk = indata.tobytes()

        def _put():
            try:
                audio_queue.put_nowait(chunk)
            except asyncio.QueueFull:
                pass    # deliberate best-effort: drop audio rather than block

        # The callback runs outside the event-loop thread; hand off safely.
        loop.call_soon_threadsafe(_put)

    with sd.InputStream(
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
        blocksize=BLOCKSIZE,
        callback=audio_callback,
    ):
        flusher = asyncio.create_task(_flusher(state, mqtt_client))
        try:
            while True:
                try:
                    print(f"[WS] Connecting to {WS_URL} ...")
                    async with websockets.connect(WS_URL, max_size=2**23) as ws:
                        print("[WS] Connected")
                        await _run_session(ws, state, mqtt_client, audio_queue)
                except (websockets.WebSocketException, OSError, asyncio.TimeoutError) as exc:
                    # WebSocketException covers ConnectionClosed and handshake
                    # failures; OSError covers ConnectionRefusedError.
                    print(f"[WS] {exc} — retrying in {WS_RETRY_DELAY} s...")
                else:
                    print(f"[WS] Connection ended — retrying in {WS_RETRY_DELAY} s...")
                await asyncio.sleep(WS_RETRY_DELAY)
        finally:
            flusher.cancel()


def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Thread entry point: run the audio/WebSocket pipeline in its own event loop."""
    asyncio.run(audio_ws_loop(state, mqtt_client))


# ── Speaker name-mapping UI ───────────────────────────────────────────────────

PRESET_SPEAKERS = [
    ("SPEAKER_00", "Pastor"),
    ("SPEAKER_01", "Reader"),
    ("SPEAKER_02", "Guest"),
    ("SPEAKER_03", "Choir"),
]


def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Show the speaker-name mapping window; blocks until the window is closed."""
    root = tk.Tk()
    root.title("Transcription Bridge — Speaker Names")
    root.attributes("-topmost", True)
    root.resizable(False, False)

    # Grid rows below the presets are derived from the preset count so the
    # layout stays correct if PRESET_SPEAKERS grows or shrinks.
    first_preset_row = 3
    sep1_row = first_preset_row + len(PRESET_SPEAKERS)
    custom_id_row = sep1_row + 1
    custom_name_row = sep1_row + 2
    sep2_row = sep1_row + 3
    buttons_row = sep1_row + 4
    footer_row = sep1_row + 5

    tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).grid(
        row=0, column=0, columnspan=3, pady=(12, 2), padx=12
    )
    tk.Label(
        root,
        text="Diarization is automatic. Assign readable names to each speaker ID.",
        font=("Helvetica", 9), fg="gray", justify="center",
    ).grid(row=1, column=0, columnspan=3, pady=(0, 8))

    tk.Label(root, text="Speaker ID", font=("Helvetica", 10, "bold")).grid(row=2, column=0, padx=8)
    tk.Label(root, text="Friendly Name", font=("Helvetica", 10, "bold")).grid(row=2, column=1, padx=8)

    entries: list[tuple[str, tk.Entry]] = []

    for row, (sid, default) in enumerate(PRESET_SPEAKERS, start=first_preset_row):
        tk.Label(root, text=sid, font=("Courier", 10)).grid(row=row, column=0, sticky="e", padx=8, pady=3)
        e = tk.Entry(root, width=16, font=("Helvetica", 10))
        e.insert(0, default)
        e.grid(row=row, column=1, padx=8, pady=3)
        entries.append((sid, e))

        # Bind the current ID/entry as defaults to avoid late-binding closures.
        def _apply(s=sid, entry=e):
            state.set_speaker_name(s, entry.get())
            print(f"[UI] {s} → {entry.get()!r}")

        tk.Button(root, text="Apply", command=_apply, width=6).grid(row=row, column=2, padx=6)

    ttk.Separator(root, orient="horizontal").grid(
        row=sep1_row, column=0, columnspan=3, sticky="ew", padx=8, pady=8
    )

    # Custom ID row
    tk.Label(root, text="Custom ID:").grid(row=custom_id_row, column=0, sticky="e", padx=8)
    cid = tk.Entry(root, width=14, font=("Courier", 10))
    cid.insert(0, "SPEAKER_04")
    cid.grid(row=custom_id_row, column=1, sticky="w", padx=8, pady=2)

    tk.Label(root, text="Name:").grid(row=custom_name_row, column=0, sticky="e", padx=8)
    cname = tk.Entry(root, width=14, font=("Helvetica", 10))
    cname.grid(row=custom_name_row, column=1, sticky="w", padx=8, pady=2)

    def _apply_custom():
        s, n = cid.get().strip(), cname.get().strip()
        if s and n:
            state.set_speaker_name(s, n)
            print(f"[UI] Custom: {s} → {n!r}")

    tk.Button(root, text="Apply", command=_apply_custom, width=6).grid(
        row=custom_name_row, column=2, padx=6
    )

    ttk.Separator(root, orient="horizontal").grid(
        row=sep2_row, column=0, columnspan=3, sticky="ew", padx=8, pady=8
    )

    def _apply_all():
        for speaker_id, entry in entries:
            state.set_speaker_name(speaker_id, entry.get())
        print("[UI] All names applied")

    tk.Button(root, text="Apply All Names", width=18, command=_apply_all).grid(
        row=buttons_row, column=0, columnspan=2, padx=8, pady=4, sticky="w"
    )
    tk.Button(root, text="Clear Display", width=14, fg="red",
              command=lambda: state.clear(mqtt_client)).grid(
        row=buttons_row, column=2, padx=8, pady=4
    )

    tk.Label(root, text="Speaker labels appear on the display when the speaker changes.",
             font=("Helvetica", 8), fg="gray").grid(
        row=footer_row, column=0, columnspan=3, pady=(0, 10)
    )

    _apply_all()    # activate defaults immediately
    root.mainloop()


# ── Entry point ───────────────────────────────────────────────────────────────

def main() -> None:
    """Start MQTT and the audio pipeline thread, then run the UI on the main thread."""
    state = BridgeState()
    mqtt_client = build_mqtt_client()

    # Daemon thread: it ends with the process once the UI window is closed.
    ws_thread = threading.Thread(
        target=run_async_loop, args=(state, mqtt_client), daemon=True
    )
    ws_thread.start()

    print("[Bridge] Audio pipeline running — close this window to quit")
    try:
        run_speaker_ui(state, mqtt_client)
    finally:
        # Shut the MQTT network loop down cleanly instead of abandoning it.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()