| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- #!/usr/bin/env python3
- """
- bridge.py — Church Live Transcription Bridge
- Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
- receives transcription + speaker diarization, buffers sentences, and
- publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
- Start WhisperLiveKit with:
- wlk --model large-v3 --language en --diarization
- Run this script:
- python bridge.py
- """
- import asyncio
- import json
- import re
- import textwrap
- import threading
- import time
- from collections import Counter
- import numpy as np
- import paho.mqtt.client as mqtt
- import sounddevice as sd
- import websockets
- import tkinter as tk
- from tkinter import ttk
# ── Configuration ─────────────────────────────────────────────────────────────
# MQTT broker and the topics the e-ink display listens on.
MQTT_HOST: str = "localhost"
MQTT_PORT: int = 1883
MQTT_TOPIC_TEXT: str = "display/text"    # receives JSON {"lines": [...]} payloads
MQTT_TOPIC_CLEAR: str = "display/clear"  # receives an empty payload on "clear"

# WhisperLiveKit streaming ASR endpoint.
WS_URL: str = "ws://localhost:8000/asr"

# Microphone capture settings.
SAMPLE_RATE: int = 16_000
CHANNELS: int = 1
BLOCKSIZE: int = 4_096  # frames per audio callback, ~256 ms at 16 kHz

# Display / buffering behaviour.
SENTENCE_TIMEOUT: float = 4.0  # seconds of silence before forcing a flush
MAX_LINE_CHARS: int = 38       # characters per line (~24pt font at 800 px wide)
DISPLAY_LINES: int = 3         # number of rows in the rolling display window
# ── State ─────────────────────────────────────────────────────────────────────
class BridgeState:
    """All mutable bridge state, protected by a single lock.

    Shared between the asyncio/WebSocket thread (``push_final``,
    ``maybe_timeout_flush``) and the Tk UI thread (``set_speaker_name``,
    ``clear``). Public methods take ``self._lock``; the underscore helpers
    ``_resolve`` and ``_flush`` expect the caller to already hold it.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.speaker_names: dict[str, str] = {}  # "SPEAKER_00" → "Pastor"
        # Display name of the speaker whose words are buffered (None = unknown).
        self._current_speaker: str | None = None
        # Set on a speaker change; consumed by the next flush that emits a label.
        self._speaker_changed = False
        # Finalised text not yet pushed to the display.
        self._text_buffer = ""
        # Rolling window of lines currently shown, oldest first.
        self._display: list[str] = [""] * DISPLAY_LINES
        # Monotonic time of the last finalised segment (drives timeout flushes).
        self._last_final_time = time.monotonic()

    # ── Speaker name mapping ──────────────────────────────────────────────────
    def set_speaker_name(self, speaker_id: str, name: str) -> None:
        """Map *speaker_id* to the friendly *name* (surrounding whitespace stripped).

        A blank name removes the mapping so the raw ID is displayed again;
        storing "" would instead suppress the speaker label entirely.
        """
        cleaned = name.strip()
        with self._lock:
            if cleaned:
                self.speaker_names[speaker_id] = cleaned
            else:
                self.speaker_names.pop(speaker_id, None)

    def _resolve(self, speaker_id: str | None) -> str | None:
        """Return the friendly name for *speaker_id*, falling back to the ID itself.

        Returns None for a missing/empty ID. Caller must hold the lock.
        """
        if not speaker_id:
            return None
        return self.speaker_names.get(speaker_id, speaker_id)

    # ── Text ingestion ────────────────────────────────────────────────────────
    def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
        """Accept a finalised segment; flush on sentence boundary or speaker change."""
        with self._lock:
            resolved = self._resolve(speaker_id)
            if resolved != self._current_speaker:
                if self._text_buffer:
                    self._flush(mqtt_client)  # push the previous speaker's words first
                self._current_speaker = resolved
                self._speaker_changed = True
            sep = " " if self._text_buffer else ""
            self._text_buffer += sep + text.strip()
            self._last_final_time = time.monotonic()
            if _is_sentence_end(text):
                self._flush(mqtt_client)

    def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
        """Flush buffered text if no final segment arrived for SENTENCE_TIMEOUT seconds."""
        with self._lock:
            if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
                self._flush(mqtt_client)

    @staticmethod
    def _format_label(speaker: str) -> str:
        """Return the ``[NAME]`` label line, truncated to fit MAX_LINE_CHARS."""
        label = f"[{speaker.upper()}]"
        if len(label) <= MAX_LINE_CHARS:
            return label
        keep = max(MAX_LINE_CHARS - 2, 1)  # leave room for the two brackets
        return f"[{speaker.upper()[:keep]}]"

    def _flush(self, mqtt_client: mqtt.Client) -> None:
        """Word-wrap the buffer onto the rolling display and publish it. Must hold lock."""
        text = self._text_buffer.strip()
        self._text_buffer = ""
        if not text:
            return
        new_lines = textwrap.wrap(text, MAX_LINE_CHARS) or [""]
        if self._speaker_changed and self._current_speaker:
            label = self._format_label(self._current_speaker)
            room = DISPLAY_LINES - 1
            if room > 0:
                # Pin the label: if this flush alone would overflow the display,
                # drop the oldest text lines rather than the speaker label
                # (otherwise the speaker change would never be shown).
                new_lines = [label, *new_lines[-room:]]
            else:
                new_lines = [label, *new_lines]
            self._speaker_changed = False
        # _display always holds DISPLAY_LINES entries, so the slice is exactly full.
        self._display = (self._display + new_lines)[-DISPLAY_LINES:]
        payload = json.dumps({"lines": list(self._display)})
        mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
        print(f"[Display] {self._display}")

    def clear(self, mqtt_client: mqtt.Client) -> None:
        """Blank the display and drop any buffered text and speaker context."""
        with self._lock:
            self._display = [""] * DISPLAY_LINES
            self._text_buffer = ""
            self._current_speaker = None
            self._speaker_changed = False
            # Publish while holding the lock so a concurrent flush cannot
            # interleave between the state reset and the clear message.
            mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
        print("[Display] Cleared")
# ── Helpers ───────────────────────────────────────────────────────────────────
# Terminal punctuation, optionally followed by closing quotes/brackets,
# so 'He said, "Amen."' and '(Psalm 23.)' also count as sentence ends.
_SENTENCE_END_RE = re.compile(r'[.!?…]["\'”’»)\]]*$')


def _is_sentence_end(text: str) -> bool:
    """Return True if *text* (ignoring surrounding whitespace) ends a sentence."""
    return _SENTENCE_END_RE.search(text.strip()) is not None
- def _extract_speaker(data: dict) -> str | None:
- """
- Extract speaker ID from a WhisperLiveKit response dict.
- Handles segment-level {"speaker": "SPEAKER_00"} and word-level
- {"words": [{"speaker": "SPEAKER_00", ...}, ...]} formats.
- """
- if "speaker" in data:
- return data["speaker"] or None
- words = data.get("words", [])
- if words:
- ids = [w.get("speaker") for w in words if w.get("speaker")]
- if ids:
- return Counter(ids).most_common(1)[0][0]
- return None
# ── MQTT ──────────────────────────────────────────────────────────────────────
def build_mqtt_client() -> mqtt.Client:
    """Create a paho-mqtt client that connects in the background.

    ``connect_async`` plus ``loop_start`` means this returns immediately even
    if the broker is not up yet; paho's network thread connects and retries,
    with the back-off between attempts bounded to 1–30 seconds.
    """
    def _log_connect(_client, _userdata, _flags, reason, _props):
        if reason == 0:
            print("[MQTT] Connected")
        else:
            print(f"[MQTT] Failed: {reason}")

    def _log_disconnect(_client, _userdata, _flags, reason, _props):
        print(f"[MQTT] Disconnected ({reason}), will reconnect...")

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = _log_connect
    client.on_disconnect = _log_disconnect
    client.reconnect_delay_set(min_delay=1, max_delay=30)
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()
    return client
# ── WebSocket + audio pipeline ────────────────────────────────────────────────
async def _sender(ws, queue: asyncio.Queue) -> None:
    """Forward queued audio chunks to the WebSocket until cancelled or send fails.

    Anything already queued when the session starts is discarded first, so the
    server receives live audio instead of a burst of stale backlog. Chunks are
    sent unchanged as the raw bytes produced by the audio callback.
    NOTE(review): confirm the server is configured to accept raw PCM frames.
    """
    # Throw away backlog captured before this connection existed.
    while queue.qsize():
        queue.get_nowait()
    while True:
        await ws.send(await queue.get())
async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Consume server messages and forward finalised text to *state*.

    A message counts as final when its ``is_final`` or ``end_of_segment``
    field is truthy. The text comes from ``text``, falling back to
    ``buffer_transcription``. Frames that are not decodable JSON objects, or
    whose text is not a string, are skipped. Letting them raise would end this
    task and tear down the WebSocket session.
    """
    async for message in ws:
        try:
            data = json.loads(message)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError (bytes frames).
            continue
        if not isinstance(data, dict):
            continue  # valid JSON but not an object (list, number, ...)
        raw_text = data.get("text") or data.get("buffer_transcription") or ""
        if not isinstance(raw_text, str):
            continue
        text = raw_text.strip()
        is_final = data.get("is_final", False) or data.get("end_of_segment", False)
        speaker = _extract_speaker(data)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)
async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Tick once per second so text left hanging by silence still gets flushed."""
    tick_seconds = 1.0
    while True:
        await asyncio.sleep(tick_seconds)
        state.maybe_timeout_flush(mqtt_client)
async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Capture microphone audio and stream it to WhisperLiveKit, reconnecting forever.

    The sounddevice callback hands int16 audio chunks to the event loop through
    a bounded queue. Each WebSocket session runs a sender task and a receiver
    task until either finishes. A background flusher forces timed-out text onto
    the display. Any session end or connection failure is logged and retried
    after a fixed delay.
    """
    audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
    loop = asyncio.get_running_loop()
    retry_delay = 3  # seconds to wait before every reconnect attempt

    def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
        if status:
            print(f"[Audio] {status}")
        chunk = indata.tobytes()

        def _put():
            try:
                audio_queue.put_nowait(chunk)
            except asyncio.QueueFull:
                pass  # queue full (server lagging or disconnected): drop this chunk

        # The callback runs outside the event loop and asyncio.Queue is not
        # thread-safe, so hand the enqueue over to the loop's thread.
        loop.call_soon_threadsafe(_put)

    with sd.InputStream(
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
        blocksize=BLOCKSIZE,
        callback=audio_callback,
    ):
        flusher = asyncio.create_task(_flusher(state, mqtt_client))
        try:
            while True:
                try:
                    print(f"[WS] Connecting to {WS_URL} ...")
                    async with websockets.connect(WS_URL, max_size=2**23) as ws:
                        print("[WS] Connected")
                        send_t = asyncio.create_task(_sender(ws, audio_queue))
                        recv_t = asyncio.create_task(_receiver(ws, state, mqtt_client))
                        done, pending = await asyncio.wait(
                            [send_t, recv_t], return_when=asyncio.FIRST_COMPLETED
                        )
                        for t in pending:
                            t.cancel()
                        # Let the cancelled task unwind before the socket is closed.
                        await asyncio.gather(*pending, return_exceptions=True)
                        for t in done:
                            if not t.cancelled() and (exc := t.exception()):
                                print(f"[WS] Task error: {exc}")
                    print(f"[WS] Session ended — reconnecting in {retry_delay} s...")
                except (websockets.WebSocketException, OSError, asyncio.TimeoutError) as exc:
                    # WebSocketException covers ConnectionClosed and handshake
                    # failures (e.g. an HTTP error while the server starts up);
                    # OSError covers refused/unreachable connections; TimeoutError
                    # covers a stalled opening handshake (not an OSError before
                    # Python 3.11). Escaping here would kill the pipeline thread.
                    print(f"[WS] {exc} — retrying in {retry_delay} s...")
                # Always pause, so a server that accepts and immediately closes
                # cannot drive a tight reconnect loop.
                await asyncio.sleep(retry_delay)
        finally:
            flusher.cancel()
def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Thread target: run the audio/WebSocket pipeline on a fresh event loop.

    Blocks until ``audio_ws_loop`` returns or raises.
    """
    pipeline = audio_ws_loop(state, mqtt_client)
    asyncio.run(pipeline)
# ── Speaker name-mapping UI ───────────────────────────────────────────────────
# Default (diarization ID, friendly name) pairs pre-filled in the UI.
PRESET_SPEAKERS: list[tuple[str, str]] = [
    (f"SPEAKER_{index:02d}", role)
    for index, role in enumerate(("Pastor", "Reader", "Guest", "Choir"))
]
def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Show the always-on-top speaker-name window and block in Tk's main loop.

    Layout:
    - Each PRESET_SPEAKERS entry gets a row with an entry field and its own
      Apply button.
    - A custom row maps any other ID.
    - "Apply All Names" pushes every preset entry at once.
    - "Clear Display" calls ``state.clear``.

    The preset defaults are applied immediately. Returns when the window is
    closed.
    """
    # Grid rows are derived from the preset count, so adding or removing
    # PRESET_SPEAKERS entries cannot make widgets overlap.
    preset_row = 3
    separator_row = preset_row + len(PRESET_SPEAKERS)
    custom_id_row = separator_row + 1
    custom_name_row = custom_id_row + 1
    actions_separator_row = custom_name_row + 1
    actions_row = actions_separator_row + 1
    hint_row = actions_row + 1

    root = tk.Tk()
    root.title("Transcription Bridge — Speaker Names")
    root.attributes("-topmost", True)
    root.resizable(False, False)

    tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).grid(
        row=0, column=0, columnspan=3, pady=(12, 2), padx=12
    )
    tk.Label(
        root,
        text="Diarization is automatic. Assign readable names to each speaker ID.",
        font=("Helvetica", 9), fg="gray", justify="center",
    ).grid(row=1, column=0, columnspan=3, pady=(0, 8))
    tk.Label(root, text="Speaker ID", font=("Helvetica", 10, "bold")).grid(row=2, column=0, padx=8)
    tk.Label(root, text="Friendly Name", font=("Helvetica", 10, "bold")).grid(row=2, column=1, padx=8)

    entries: list[tuple[str, tk.Entry]] = []
    for i, (sid, default) in enumerate(PRESET_SPEAKERS):
        row = preset_row + i
        tk.Label(root, text=sid, font=("Courier", 10)).grid(row=row, column=0, sticky="e", padx=8, pady=3)
        e = tk.Entry(root, width=16, font=("Helvetica", 10))
        e.insert(0, default)
        e.grid(row=row, column=1, padx=8, pady=3)
        entries.append((sid, e))

        # Default arguments capture this iteration's values (avoids late binding).
        def _apply(s=sid, entry=e):
            state.set_speaker_name(s, entry.get())
            print(f"[UI] {s} → {entry.get()!r}")

        tk.Button(root, text="Apply", command=_apply, width=6).grid(row=row, column=2, padx=6)

    ttk.Separator(root, orient="horizontal").grid(
        row=separator_row, column=0, columnspan=3, sticky="ew", padx=8, pady=8
    )

    # Custom ID row: map a diarization ID that is not among the presets.
    tk.Label(root, text="Custom ID:").grid(row=custom_id_row, column=0, sticky="e", padx=8)
    cid = tk.Entry(root, width=14, font=("Courier", 10))
    cid.insert(0, "SPEAKER_04")
    cid.grid(row=custom_id_row, column=1, sticky="w", padx=8, pady=2)
    tk.Label(root, text="Name:").grid(row=custom_name_row, column=0, sticky="e", padx=8)
    cname = tk.Entry(root, width=14, font=("Helvetica", 10))
    cname.grid(row=custom_name_row, column=1, sticky="w", padx=8, pady=2)

    def _apply_custom():
        s, n = cid.get().strip(), cname.get().strip()
        if s and n:
            state.set_speaker_name(s, n)
            print(f"[UI] Custom: {s} → {n!r}")

    tk.Button(root, text="Apply", command=_apply_custom, width=6).grid(
        row=custom_name_row, column=2, padx=6
    )

    ttk.Separator(root, orient="horizontal").grid(
        row=actions_separator_row, column=0, columnspan=3, sticky="ew", padx=8, pady=8
    )

    def _apply_all():
        for sid, entry in entries:
            state.set_speaker_name(sid, entry.get())
        print("[UI] All names applied")

    tk.Button(root, text="Apply All Names", width=18, command=_apply_all).grid(
        row=actions_row, column=0, columnspan=2, padx=8, pady=4, sticky="w"
    )
    tk.Button(root, text="Clear Display", width=14, fg="red",
              command=lambda: state.clear(mqtt_client)).grid(
        row=actions_row, column=2, padx=8, pady=4
    )
    tk.Label(root, text="Speaker labels appear on the display when the speaker changes.",
             font=("Helvetica", 8), fg="gray").grid(
        row=hint_row, column=0, columnspan=3, pady=(0, 10)
    )

    _apply_all()  # activate defaults immediately
    root.mainloop()
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Start MQTT and the audio/WebSocket thread, then run the UI until it closes.

    The pipeline thread is a daemon, so closing the window ends the process.
    On the way out the MQTT client is disconnected and its network loop is
    stopped, instead of leaving the broker connection to be dropped at exit.
    """
    state = BridgeState()
    mqtt_client = build_mqtt_client()
    ws_thread = threading.Thread(
        target=run_async_loop, args=(state, mqtt_client), daemon=True
    )
    ws_thread.start()
    print("[Bridge] Audio pipeline running — close this window to quit")
    try:
        run_speaker_ui(state, mqtt_client)
    finally:
        # Disconnect first so the network loop can still send the DISCONNECT.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()
|