|
@@ -0,0 +1,361 @@
|
|
|
|
|
+#!/usr/bin/env python3
|
|
|
|
|
+"""
|
|
|
|
|
+bridge.py — Church Live Transcription Bridge
|
|
|
|
|
+
|
|
|
|
|
+Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
|
|
|
|
|
+receives transcription + speaker diarization, buffers sentences, and
|
|
|
|
|
+publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
|
|
|
|
|
+
|
|
|
|
|
+Start WhisperLiveKit with:
|
|
|
|
|
+ wlk --model large-v3 --language en --diarization
|
|
|
|
|
+
|
|
|
|
|
+Run this script:
|
|
|
|
|
+ python bridge.py
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import asyncio
|
|
|
|
|
+import json
|
|
|
|
|
+import re
|
|
|
|
|
+import textwrap
|
|
|
|
|
+import threading
|
|
|
|
|
+import time
|
|
|
|
|
+from collections import Counter
|
|
|
|
|
+
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+import paho.mqtt.client as mqtt
|
|
|
|
|
+import sounddevice as sd
|
|
|
|
|
+import websockets
|
|
|
|
|
+import tkinter as tk
|
|
|
|
|
+from tkinter import ttk
|
|
|
|
|
+
|
|
|
|
|
+# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+MQTT_HOST = "localhost"
|
|
|
|
|
+MQTT_PORT = 1883
|
|
|
|
|
+MQTT_TOPIC_TEXT = "display/text"
|
|
|
|
|
+MQTT_TOPIC_CLEAR = "display/clear"
|
|
|
|
|
+
|
|
|
|
|
+WS_URL = "ws://localhost:8000/asr"
|
|
|
|
|
+SAMPLE_RATE = 16000
|
|
|
|
|
+CHANNELS = 1
|
|
|
|
|
+BLOCKSIZE = 4096 # ~256 ms per chunk at 16 kHz
|
|
|
|
|
+
|
|
|
|
|
+SENTENCE_TIMEOUT = 4.0 # seconds of silence before forcing a flush
|
|
|
|
|
+MAX_LINE_CHARS = 38 # characters per line (~24pt font at 800 px wide)
|
|
|
|
|
+DISPLAY_LINES = 3
|
|
|
|
|
+
|
|
|
|
|
+# ── State ─────────────────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+class BridgeState:
|
|
|
|
|
+ """All mutable state, protected by a single lock."""
|
|
|
|
|
+
|
|
|
|
|
+ def __init__(self):
|
|
|
|
|
+ self._lock = threading.Lock()
|
|
|
|
|
+ self.speaker_names: dict[str, str] = {} # "SPEAKER_00" → "Pastor"
|
|
|
|
|
+ self._current_speaker: str | None = None
|
|
|
|
|
+ self._speaker_changed = False
|
|
|
|
|
+ self._text_buffer = ""
|
|
|
|
|
+ self._display: list[str] = [""] * DISPLAY_LINES
|
|
|
|
|
+ self._last_final_time = time.monotonic()
|
|
|
|
|
+
|
|
|
|
|
+ # ── Speaker name mapping ──────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def set_speaker_name(self, speaker_id: str, name: str) -> None:
|
|
|
|
|
+ with self._lock:
|
|
|
|
|
+ self.speaker_names[speaker_id] = name.strip()
|
|
|
|
|
+
|
|
|
|
|
+ def _resolve(self, speaker_id: str | None) -> str | None:
|
|
|
|
|
+ if not speaker_id:
|
|
|
|
|
+ return None
|
|
|
|
|
+ return self.speaker_names.get(speaker_id, speaker_id)
|
|
|
|
|
+
|
|
|
|
|
+ # ── Text ingestion ────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ """Accept a finalised segment; flush on sentence boundary or speaker change."""
|
|
|
|
|
+ with self._lock:
|
|
|
|
|
+ resolved = self._resolve(speaker_id)
|
|
|
|
|
+
|
|
|
|
|
+ if resolved != self._current_speaker:
|
|
|
|
|
+ if self._text_buffer:
|
|
|
|
|
+ self._flush(mqtt_client) # push previous speaker's words first
|
|
|
|
|
+ self._current_speaker = resolved
|
|
|
|
|
+ self._speaker_changed = True
|
|
|
|
|
+
|
|
|
|
|
+ sep = " " if self._text_buffer else ""
|
|
|
|
|
+ self._text_buffer += sep + text.strip()
|
|
|
|
|
+ self._last_final_time = time.monotonic()
|
|
|
|
|
+
|
|
|
|
|
+ if _is_sentence_end(text):
|
|
|
|
|
+ self._flush(mqtt_client)
|
|
|
|
|
+
|
|
|
|
|
+ def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ with self._lock:
|
|
|
|
|
+ if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
|
|
|
|
|
+ self._flush(mqtt_client)
|
|
|
|
|
+
|
|
|
|
|
+ def _flush(self, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ """Word-wrap buffer → rolling display → publish. Must hold lock."""
|
|
|
|
|
+ text = self._text_buffer.strip()
|
|
|
|
|
+ self._text_buffer = ""
|
|
|
|
|
+ if not text:
|
|
|
|
|
+ return
|
|
|
|
|
+
|
|
|
|
|
+ new_lines: list[str] = []
|
|
|
|
|
+ if self._speaker_changed and self._current_speaker:
|
|
|
|
|
+ new_lines.append(f"[{self._current_speaker.upper()}]")
|
|
|
|
|
+ self._speaker_changed = False
|
|
|
|
|
+
|
|
|
|
|
+ new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
|
|
|
|
|
+
|
|
|
|
|
+ self._display.extend(new_lines)
|
|
|
|
|
+ self._display = self._display[-DISPLAY_LINES:]
|
|
|
|
|
+ while len(self._display) < DISPLAY_LINES:
|
|
|
|
|
+ self._display.insert(0, "")
|
|
|
|
|
+
|
|
|
|
|
+ payload = json.dumps({"lines": list(self._display)})
|
|
|
|
|
+ mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
|
|
|
|
|
+ print(f"[Display] {self._display}")
|
|
|
|
|
+
|
|
|
|
|
+ def clear(self, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ with self._lock:
|
|
|
|
|
+ self._display = [""] * DISPLAY_LINES
|
|
|
|
|
+ self._text_buffer = ""
|
|
|
|
|
+ self._current_speaker = None
|
|
|
|
|
+ self._speaker_changed = False
|
|
|
|
|
+ mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
|
|
|
|
|
+ print("[Display] Cleared")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+def _is_sentence_end(text: str) -> bool:
|
|
|
|
|
+ return bool(re.search(r'[.!?…]\s*$', text.strip()))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def _extract_speaker(data: dict) -> str | None:
|
|
|
|
|
+ """
|
|
|
|
|
+ Extract speaker ID from a WhisperLiveKit response dict.
|
|
|
|
|
+ Handles segment-level {"speaker": "SPEAKER_00"} and word-level
|
|
|
|
|
+ {"words": [{"speaker": "SPEAKER_00", ...}, ...]} formats.
|
|
|
|
|
+ """
|
|
|
|
|
+ if "speaker" in data:
|
|
|
|
|
+ return data["speaker"] or None
|
|
|
|
|
+
|
|
|
|
|
+ words = data.get("words", [])
|
|
|
|
|
+ if words:
|
|
|
|
|
+ ids = [w.get("speaker") for w in words if w.get("speaker")]
|
|
|
|
|
+ if ids:
|
|
|
|
|
+ return Counter(ids).most_common(1)[0][0]
|
|
|
|
|
+
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ── MQTT ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+def build_mqtt_client() -> mqtt.Client:
|
|
|
|
|
+ client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
|
|
|
|
|
+
|
|
|
|
|
+ def on_connect(client, userdata, flags, rc, props):
|
|
|
|
|
+ print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")
|
|
|
|
|
+
|
|
|
|
|
+ def on_disconnect(client, userdata, flags, rc, props):
|
|
|
|
|
+ print(f"[MQTT] Disconnected ({rc}), will reconnect...")
|
|
|
|
|
+
|
|
|
|
|
+ client.on_connect = on_connect
|
|
|
|
|
+ client.on_disconnect = on_disconnect
|
|
|
|
|
+ client.reconnect_delay_set(min_delay=1, max_delay=30)
|
|
|
|
|
+ client.connect_async(MQTT_HOST, MQTT_PORT)
|
|
|
|
|
+ client.loop_start()
|
|
|
|
|
+ return client
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ── WebSocket + audio pipeline ────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+async def _sender(ws, queue: asyncio.Queue) -> None:
|
|
|
|
|
+ while not queue.empty(): # drain stale audio before streaming
|
|
|
|
|
+ queue.get_nowait()
|
|
|
|
|
+ while True:
|
|
|
|
|
+ chunk = await queue.get()
|
|
|
|
|
+ await ws.send(chunk)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ async for message in ws:
|
|
|
|
|
+ try:
|
|
|
|
|
+ data = json.loads(message)
|
|
|
|
|
+ except (json.JSONDecodeError, TypeError):
|
|
|
|
|
+ continue
|
|
|
|
|
+
|
|
|
|
|
+ text = (data.get("text") or data.get("buffer_transcription") or "").strip()
|
|
|
|
|
+ is_final = data.get("is_final", False) or data.get("end_of_segment", False)
|
|
|
|
|
+ speaker = _extract_speaker(data)
|
|
|
|
|
+
|
|
|
|
|
+ if is_final and text:
|
|
|
|
|
+ print(f"[Whisper] ({speaker or '?'}) {text}")
|
|
|
|
|
+ state.push_final(text, speaker, mqtt_client)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ while True:
|
|
|
|
|
+ await asyncio.sleep(1.0)
|
|
|
|
|
+ state.maybe_timeout_flush(mqtt_client)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
|
|
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
|
|
+
|
|
|
|
|
+ def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
|
|
|
|
|
+ if status:
|
|
|
|
|
+ print(f"[Audio] {status}")
|
|
|
|
|
+ chunk = indata.tobytes()
|
|
|
|
|
+ def _put():
|
|
|
|
|
+ try:
|
|
|
|
|
+ audio_queue.put_nowait(chunk)
|
|
|
|
|
+ except asyncio.QueueFull:
|
|
|
|
|
+ pass
|
|
|
|
|
+ loop.call_soon_threadsafe(_put)
|
|
|
|
|
+
|
|
|
|
|
+ with sd.InputStream(
|
|
|
|
|
+ samplerate=SAMPLE_RATE,
|
|
|
|
|
+ channels=CHANNELS,
|
|
|
|
|
+ dtype="int16",
|
|
|
|
|
+ blocksize=BLOCKSIZE,
|
|
|
|
|
+ callback=audio_callback,
|
|
|
|
|
+ ):
|
|
|
|
|
+ flusher = asyncio.create_task(_flusher(state, mqtt_client))
|
|
|
|
|
+ try:
|
|
|
|
|
+ while True:
|
|
|
|
|
+ try:
|
|
|
|
|
+ print(f"[WS] Connecting to {WS_URL} ...")
|
|
|
|
|
+ async with websockets.connect(WS_URL, max_size=2**23) as ws:
|
|
|
|
|
+ print("[WS] Connected")
|
|
|
|
|
+ send_t = asyncio.create_task(_sender(ws, audio_queue))
|
|
|
|
|
+ recv_t = asyncio.create_task(_receiver(ws, state, mqtt_client))
|
|
|
|
|
+ done, pending = await asyncio.wait(
|
|
|
|
|
+ [send_t, recv_t], return_when=asyncio.FIRST_COMPLETED
|
|
|
|
|
+ )
|
|
|
|
|
+ for t in pending:
|
|
|
|
|
+ t.cancel()
|
|
|
|
|
+ for t in done:
|
|
|
|
|
+ if not t.cancelled() and (exc := t.exception()):
|
|
|
|
|
+ print(f"[WS] Task error: {exc}")
|
|
|
|
|
+ except (websockets.ConnectionClosed, OSError, ConnectionRefusedError) as exc:
|
|
|
|
|
+ print(f"[WS] {exc} — retrying in 3 s...")
|
|
|
|
|
+ await asyncio.sleep(3)
|
|
|
|
|
+ finally:
|
|
|
|
|
+ flusher.cancel()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ asyncio.run(audio_ws_loop(state, mqtt_client))
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ── Speaker name-mapping UI ───────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+PRESET_SPEAKERS = [
|
|
|
|
|
+ ("SPEAKER_00", "Pastor"),
|
|
|
|
|
+ ("SPEAKER_01", "Reader"),
|
|
|
|
|
+ ("SPEAKER_02", "Guest"),
|
|
|
|
|
+ ("SPEAKER_03", "Choir"),
|
|
|
|
|
+]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
|
|
+ root = tk.Tk()
|
|
|
|
|
+ root.title("Transcription Bridge — Speaker Names")
|
|
|
|
|
+ root.attributes("-topmost", True)
|
|
|
|
|
+ root.resizable(False, False)
|
|
|
|
|
+
|
|
|
|
|
+ tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).grid(
|
|
|
|
|
+ row=0, column=0, columnspan=3, pady=(12, 2), padx=12
|
|
|
|
|
+ )
|
|
|
|
|
+ tk.Label(
|
|
|
|
|
+ root,
|
|
|
|
|
+ text="Diarization is automatic. Assign readable names to each speaker ID.",
|
|
|
|
|
+ font=("Helvetica", 9), fg="gray", justify="center",
|
|
|
|
|
+ ).grid(row=1, column=0, columnspan=3, pady=(0, 8))
|
|
|
|
|
+
|
|
|
|
|
+ tk.Label(root, text="Speaker ID", font=("Helvetica", 10, "bold")).grid(row=2, column=0, padx=8)
|
|
|
|
|
+ tk.Label(root, text="Friendly Name", font=("Helvetica", 10, "bold")).grid(row=2, column=1, padx=8)
|
|
|
|
|
+
|
|
|
|
|
+ entries: list[tuple[str, tk.Entry]] = []
|
|
|
|
|
+ for i, (sid, default) in enumerate(PRESET_SPEAKERS):
|
|
|
|
|
+ tk.Label(root, text=sid, font=("Courier", 10)).grid(row=3+i, column=0, sticky="e", padx=8, pady=3)
|
|
|
|
|
+ e = tk.Entry(root, width=16, font=("Helvetica", 10))
|
|
|
|
|
+ e.insert(0, default)
|
|
|
|
|
+ e.grid(row=3+i, column=1, padx=8, pady=3)
|
|
|
|
|
+ entries.append((sid, e))
|
|
|
|
|
+
|
|
|
|
|
+ def _apply(s=sid, entry=e):
|
|
|
|
|
+ state.set_speaker_name(s, entry.get())
|
|
|
|
|
+ print(f"[UI] {s} → {entry.get()!r}")
|
|
|
|
|
+
|
|
|
|
|
+ tk.Button(root, text="Apply", command=_apply, width=6).grid(row=3+i, column=2, padx=6)
|
|
|
|
|
+
|
|
|
|
|
+ ttk.Separator(root, orient="horizontal").grid(
|
|
|
|
|
+ row=7, column=0, columnspan=3, sticky="ew", padx=8, pady=8
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ # Custom ID row
|
|
|
|
|
+ tk.Label(root, text="Custom ID:").grid(row=8, column=0, sticky="e", padx=8)
|
|
|
|
|
+ cid = tk.Entry(root, width=14, font=("Courier", 10))
|
|
|
|
|
+ cid.insert(0, "SPEAKER_04")
|
|
|
|
|
+ cid.grid(row=8, column=1, sticky="w", padx=8, pady=2)
|
|
|
|
|
+
|
|
|
|
|
+ tk.Label(root, text="Name:").grid(row=9, column=0, sticky="e", padx=8)
|
|
|
|
|
+ cname = tk.Entry(root, width=14, font=("Helvetica", 10))
|
|
|
|
|
+ cname.grid(row=9, column=1, sticky="w", padx=8, pady=2)
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_custom():
|
|
|
|
|
+ s, n = cid.get().strip(), cname.get().strip()
|
|
|
|
|
+ if s and n:
|
|
|
|
|
+ state.set_speaker_name(s, n)
|
|
|
|
|
+ print(f"[UI] Custom: {s} → {n!r}")
|
|
|
|
|
+
|
|
|
|
|
+ tk.Button(root, text="Apply", command=_apply_custom, width=6).grid(row=9, column=2, padx=6)
|
|
|
|
|
+
|
|
|
|
|
+ ttk.Separator(root, orient="horizontal").grid(
|
|
|
|
|
+ row=10, column=0, columnspan=3, sticky="ew", padx=8, pady=8
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def _apply_all():
|
|
|
|
|
+ for sid, entry in entries:
|
|
|
|
|
+ state.set_speaker_name(sid, entry.get())
|
|
|
|
|
+ print("[UI] All names applied")
|
|
|
|
|
+
|
|
|
|
|
+ tk.Button(root, text="Apply All Names", width=18, command=_apply_all).grid(
|
|
|
|
|
+ row=11, column=0, columnspan=2, padx=8, pady=4, sticky="w"
|
|
|
|
|
+ )
|
|
|
|
|
+ tk.Button(root, text="Clear Display", width=14, fg="red",
|
|
|
|
|
+ command=lambda: state.clear(mqtt_client)).grid(
|
|
|
|
|
+ row=11, column=2, padx=8, pady=4
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ tk.Label(root, text="Speaker labels appear on the display when the speaker changes.",
|
|
|
|
|
+ font=("Helvetica", 8), fg="gray").grid(
|
|
|
|
|
+ row=12, column=0, columnspan=3, pady=(0, 10)
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ _apply_all() # activate defaults immediately
|
|
|
|
|
+ root.mainloop()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+def main() -> None:
|
|
|
|
|
+ state = BridgeState()
|
|
|
|
|
+ mqtt_client = build_mqtt_client()
|
|
|
|
|
+
|
|
|
|
|
+ ws_thread = threading.Thread(
|
|
|
|
|
+ target=run_async_loop, args=(state, mqtt_client), daemon=True
|
|
|
|
|
+ )
|
|
|
|
|
+ ws_thread.start()
|
|
|
|
|
+ print("[Bridge] Audio pipeline running — close this window to quit")
|
|
|
|
|
+
|
|
|
|
|
+ run_speaker_ui(state, mqtt_client)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
|
|
+ main()
|