| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516 |
- #!/usr/bin/env python3
- """
- bridge.py — Church Live Transcription Bridge
- Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
- receives transcription + speaker diarization, buffers sentences, and
- publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
- Start WhisperLiveKit with:
- wlk --model large-v3 --language en --diarization
- Run this script:
- python bridge.py
- """
- import asyncio
- import json
- import re
- import textwrap
- import threading
- import time
- from collections import Counter
- from pathlib import Path
- import numpy as np
- import paho.mqtt.client as mqtt
- import sounddevice as sd
- import websockets
- import tkinter as tk
- from tkinter import ttk
- # ── Configuration ─────────────────────────────────────────────────────────────
- MQTT_HOST = "localhost"
- MQTT_PORT = 1883
- MQTT_TOPIC_TEXT = "display/text"
- MQTT_TOPIC_CLEAR = "display/clear"
- WS_URL = "ws://localhost:8000/asr"
- SAMPLE_RATE = 16000
- CHANNELS = 1
- BLOCKSIZE = 4096 # ~256 ms per chunk at 16 kHz
- SENTENCE_TIMEOUT = 4.0 # seconds of silence before forcing a flush
- MAX_LINE_CHARS = 38 # characters per line (~24pt font at 800 px wide)
- DISPLAY_LINES = 3
- # Set to a device index (integer) to force a specific microphone.
- # Leave as None to use the Windows default input device.
- # Run bridge.py once to see available device indices printed at startup.
- AUDIO_DEVICE: int | None = None
- SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
- DEFAULT_SPEAKERS: dict[str, str] = {
- "SPEAKER_00": "Pastor",
- "SPEAKER_01": "Reader",
- "SPEAKER_02": "Guest",
- "SPEAKER_03": "Choir",
- }
- # ── Speaker persistence ───────────────────────────────────────────────────────
def _load_speakers() -> dict[str, str]:
    """Return the speaker-ID -> friendly-name mapping stored in SPEAKERS_FILE.

    If the file is missing, unreadable, not valid UTF-8 JSON, or does not
    contain a JSON object, the file is (re)written with DEFAULT_SPEAKERS and
    a copy of those defaults is returned.

    Returns:
        A new dict whose keys and values are always strings.
    """
    if SPEAKERS_FILE.exists():
        try:
            data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
        except (ValueError, OSError) as exc:
            # ValueError is the common base of json.JSONDecodeError and
            # UnicodeDecodeError, so a file with invalid bytes no longer
            # aborts start-up.
            print(f"[Speakers] Could not read {SPEAKERS_FILE}: {exc}")
        else:
            if isinstance(data, dict):
                # A hand-edited file may hold numbers or other JSON values;
                # coerce them so later string operations on names are safe.
                return {str(sid): str(name) for sid, name in data.items()}
    # First run (or unusable file) — seed with defaults and save
    _write_speakers(DEFAULT_SPEAKERS)
    return dict(DEFAULT_SPEAKERS)
def _write_speakers(names: dict[str, str]) -> None:
    """Persist *names* to SPEAKERS_FILE as indented UTF-8 JSON.

    The JSON is written to a sibling ``.tmp`` file first and then renamed
    over SPEAKERS_FILE, so an interrupted save cannot leave a half-written
    mapping behind. Failures are reported on stdout and otherwise ignored
    (saving is best-effort).
    """
    tmp_path = SPEAKERS_FILE.with_name(SPEAKERS_FILE.name + ".tmp")
    try:
        tmp_path.write_text(
            json.dumps(names, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        # Path.replace overwrites an existing destination.
        tmp_path.replace(SPEAKERS_FILE)
    except OSError as exc:
        print(f"[Speakers] Save failed: {exc}")
- # ── State ─────────────────────────────────────────────────────────────────────
- class BridgeState:
- """All mutable state, protected by a single lock."""
- def __init__(self):
- self._lock = threading.Lock()
- self.speaker_names: dict[str, str] = _load_speakers()
- self._seen: set[str] = set(self.speaker_names)
- self._current_speaker: str | None = None
- self._speaker_changed = False
- self._text_buffer = ""
- self._display: list[str] = [""] * DISPLAY_LINES
- self._last_final_time = time.monotonic()
- # ── Speaker name management ───────────────────────────────────────────────
- def set_speaker_name(self, speaker_id: str, name: str) -> None:
- with self._lock:
- self.speaker_names[speaker_id] = name.strip()
- self._seen.add(speaker_id)
- _write_speakers(self.speaker_names)
- def delete_speaker(self, speaker_id: str) -> None:
- with self._lock:
- self.speaker_names.pop(speaker_id, None)
- self._seen.discard(speaker_id)
- _write_speakers(self.speaker_names)
- def seen_speakers_snapshot(self) -> set[str]:
- with self._lock:
- return set(self._seen)
- def _resolve(self, speaker_id: str | None) -> str | None:
- if not speaker_id:
- return None
- return self.speaker_names.get(speaker_id, speaker_id)
- # ── Text ingestion ────────────────────────────────────────────────────────
- def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
- """Accept a finalised segment; flush on sentence boundary or speaker change."""
- with self._lock:
- if speaker_id:
- self._seen.add(speaker_id)
- resolved = self._resolve(speaker_id)
- if resolved != self._current_speaker:
- if self._text_buffer:
- self._flush(mqtt_client)
- self._current_speaker = resolved
- self._speaker_changed = True
- sep = " " if self._text_buffer else ""
- self._text_buffer += sep + text.strip()
- self._last_final_time = time.monotonic()
- if _is_sentence_end(text):
- self._flush(mqtt_client)
- def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
- with self._lock:
- if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
- self._flush(mqtt_client)
- def _flush(self, mqtt_client: mqtt.Client) -> None:
- """Word-wrap buffer → rolling display → publish. Must hold lock."""
- text = self._text_buffer.strip()
- self._text_buffer = ""
- if not text:
- return
- new_lines: list[str] = []
- if self._speaker_changed and self._current_speaker:
- new_lines.append(f"[{self._current_speaker.upper()}]")
- self._speaker_changed = False
- new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
- self._display.extend(new_lines)
- self._display = self._display[-DISPLAY_LINES:]
- while len(self._display) < DISPLAY_LINES:
- self._display.insert(0, "")
- payload = json.dumps({"lines": list(self._display)})
- mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
- print(f"[Display] {self._display}")
- def clear(self, mqtt_client: mqtt.Client) -> None:
- with self._lock:
- self._display = [""] * DISPLAY_LINES
- self._text_buffer = ""
- self._current_speaker = None
- self._speaker_changed = False
- mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
- print("[Display] Cleared")
- # ── Helpers ───────────────────────────────────────────────────────────────────
- def _is_sentence_end(text: str) -> bool:
- return bool(re.search(r'[.!?…]\s*$', text.strip()))
- def _extract_speaker(data: dict) -> str | None:
- if "speaker" in data:
- return data["speaker"] or None
- words = data.get("words", [])
- if words:
- ids = [w.get("speaker") for w in words if w.get("speaker")]
- if ids:
- return Counter(ids).most_common(1)[0][0]
- return None
- # ── MQTT ──────────────────────────────────────────────────────────────────────
def build_mqtt_client() -> mqtt.Client:
    """Create the MQTT client, begin connecting, and start its network loop.

    The client uses the paho callback API version 2, logs connect and
    disconnect events to stdout, and is configured with a reconnect delay
    between 1 and 30 seconds. The connection to MQTT_HOST:MQTT_PORT is
    started asynchronously, so this function does not wait for the broker.
    """

    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code == 0:
            print("[MQTT] Connected")
        else:
            print(f"[MQTT] Failed: {reason_code}")

    def on_disconnect(client, userdata, flags, reason_code, properties):
        print(f"[MQTT] Disconnected ({reason_code}), will reconnect...")

    broker = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    broker.on_connect = on_connect
    broker.on_disconnect = on_disconnect
    broker.reconnect_delay_set(min_delay=1, max_delay=30)
    broker.connect_async(MQTT_HOST, MQTT_PORT)
    broker.loop_start()
    return broker
- # ── WebSocket + audio pipeline ────────────────────────────────────────────────
async def _sender(ws, queue: asyncio.Queue) -> None:
    """Forward audio chunks from *queue* to the websocket until cancelled."""
    # Throw away whatever was queued before this connection was ready.
    try:
        while True:
            queue.get_nowait()
    except asyncio.QueueEmpty:
        pass

    while True:
        await ws.send(await queue.get())
async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Read server messages from *ws* and feed finalised text into *state*.

    Each message is decoded as JSON. Messages that cannot be decoded, are
    not JSON objects, or carry non-string text are skipped rather than
    being allowed to end the connection. A message is forwarded to
    ``state.push_final`` only when it is flagged ``is_final`` or
    ``end_of_segment`` and has non-empty text (``text``, falling back to
    ``buffer_transcription``).
    """
    async for message in ws:
        try:
            data = json.loads(message)
        except (ValueError, TypeError):
            # ValueError covers json.JSONDecodeError and undecodable bytes.
            continue
        if not isinstance(data, dict):
            continue

        raw_text = data.get("text") or data.get("buffer_transcription") or ""
        if not isinstance(raw_text, str):
            continue
        text = raw_text.strip()
        is_final = bool(data.get("is_final") or data.get("end_of_segment"))
        if not (is_final and text):
            continue

        speaker = _extract_speaker(data)
        print(f"[Whisper] ({speaker or '?'}) {text}")
        state.push_final(text, speaker, mqtt_client)
async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Once per second, ask *state* to flush text that has timed out."""
    poll_interval = 1.0
    while True:
        await asyncio.sleep(poll_interval)
        state.maybe_timeout_flush(mqtt_client)
def _choose_audio_device() -> int | None:
    """
    List all input devices and return the index to use.

    Prefers AUDIO_DEVICE if set, otherwise the system default,
    otherwise the first device with input channels. A configured or
    default index is only used when it refers to one of the listed input
    devices; otherwise a warning is printed and the next choice is tried.

    Returns:
        The chosen device index, or None if devices cannot be queried or
        no input device exists.
    """
    try:
        devices = sd.query_devices()
        default_in = sd.default.device[0]  # may be -1 if unset
    except Exception as exc:
        print(f"[Audio] Cannot query devices: {exc}")
        return None

    input_devices: list[tuple[int, str]] = [
        (i, dev["name"])
        for i, dev in enumerate(devices)
        if dev["max_input_channels"] > 0
    ]

    print("[Audio] Available input devices:")
    for idx, name in input_devices:
        marker = " ← default" if idx == default_in else ""
        print(f" [{idx}] {name}{marker}")

    if not input_devices:
        print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
        return None

    valid_indices = {idx for idx, _ in input_devices}

    # Explicit override from config
    if AUDIO_DEVICE is not None:
        if AUDIO_DEVICE in valid_indices:
            print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
            return AUDIO_DEVICE
        print(
            f"[Audio] WARNING: configured device [{AUDIO_DEVICE}] is not an "
            "available input device — falling back"
        )

    # System default (if it is one of the listed inputs; -1 never is)
    if default_in in valid_indices:
        print(f"[Audio] Using default input device [{default_in}]")
        return default_in

    # Fall back to first available input
    idx, name = input_devices[0]
    print(f"[Audio] No system default set — using [{idx}] {name}")
    print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
    return idx
async def audio_ws_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Capture microphone audio and stream it to the transcription server.

    Opens the input stream on the device picked by ``_choose_audio_device``
    and returns immediately if there is none. While the stream is open, a
    background task flushes timed-out text, and the websocket to WS_URL is
    (re)connected in an endless loop: each session runs a sender and a
    receiver task until either one finishes, then waits 3 seconds before
    reconnecting. Connection and handshake failures are logged and retried
    rather than ending the pipeline.
    """
    audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
    loop = asyncio.get_running_loop()

    def _enqueue(chunk: bytes) -> None:
        # Runs on the event loop; drop the chunk when the queue is full.
        try:
            audio_queue.put_nowait(chunk)
        except asyncio.QueueFull:
            pass

    def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
        # Called by sounddevice outside the event loop, so the chunk is
        # handed over with call_soon_threadsafe.
        if status:
            print(f"[Audio] {status}")
        loop.call_soon_threadsafe(_enqueue, indata.tobytes())

    device = _choose_audio_device()
    if device is None:
        print("[Audio] No input device available — audio pipeline cannot start.")
        return

    with sd.InputStream(
        device=device,
        samplerate=SAMPLE_RATE,
        channels=CHANNELS,
        dtype="int16",
        blocksize=BLOCKSIZE,
        callback=audio_callback,
    ):
        flusher = asyncio.create_task(_flusher(state, mqtt_client))
        try:
            while True:
                try:
                    print(f"[WS] Connecting to {WS_URL} ...")
                    async with websockets.connect(WS_URL, max_size=2**23) as ws:
                        print("[WS] Connected")
                        tasks = [
                            asyncio.create_task(_sender(ws, audio_queue)),
                            asyncio.create_task(_receiver(ws, state, mqtt_client)),
                        ]
                        try:
                            done, _pending = await asyncio.wait(
                                tasks, return_when=asyncio.FIRST_COMPLETED
                            )
                        finally:
                            # Stop whichever task is still running and wait
                            # for it to finish before the socket is closed.
                            for t in tasks:
                                t.cancel()
                            await asyncio.gather(*tasks, return_exceptions=True)
                        for t in done:
                            if not t.cancelled() and (exc := t.exception()):
                                print(f"[WS] Task error: {exc}")
                    print("[WS] Connection closed — reconnecting in 3 s...")
                except (
                    websockets.ConnectionClosed,
                    websockets.InvalidHandshake,
                    OSError,
                    asyncio.TimeoutError,
                ) as exc:
                    print(f"[WS] {exc} — retrying in 3 s...")
                # Pause after every session, not only after a failed
                # connect, so a server that keeps closing the socket does
                # not cause a tight reconnect loop.
                await asyncio.sleep(3)
        finally:
            flusher.cancel()
def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Run ``audio_ws_loop`` to completion on a fresh asyncio event loop."""
    pipeline = audio_ws_loop(state, mqtt_client)
    asyncio.run(pipeline)
- # ── Speaker UI ────────────────────────────────────────────────────────────────
def run_speaker_ui(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Show the speaker-name editor window and block until it is closed.

    The window lists one editable row per speaker ID (those in
    ``state.speaker_names`` plus any ID newly reported by ``state``, polled
    every 2 seconds), a line for adding a mapping by hand, and a button
    that clears the display through ``state.clear``.
    """
    root = tk.Tk()
    root.title("Transcription Bridge — Speaker Names")
    root.attributes("-topmost", True)
    root.minsize(440, 360)
    root.geometry("460x480")

    # ── Header ────────────────────────────────────────────────────────────────
    tk.Label(root, text="Speaker Name Mapping", font=("Helvetica", 12, "bold")).pack(
        pady=(12, 2)
    )
    tk.Label(
        root,
        text="Names are saved to speakers.json and restored each session.\n"
        "New speakers detected by diarization appear here automatically.",
        font=("Helvetica", 9), fg="gray", justify="center",
    ).pack(pady=(0, 6))

    # ── Scrollable list ───────────────────────────────────────────────────────
    list_outer = tk.Frame(root, relief="sunken", bd=1)
    list_outer.pack(fill="both", expand=True, padx=12, pady=(0, 4))
    canvas = tk.Canvas(list_outer, highlightthickness=0)
    scrollbar = ttk.Scrollbar(list_outer, orient="vertical", command=canvas.yview)
    rows_frame = tk.Frame(canvas)
    # Keep the scrollable area in sync with the rows it contains.
    rows_frame.bind(
        "<Configure>",
        lambda e: canvas.configure(scrollregion=canvas.bbox("all")),
    )
    canvas.create_window((0, 0), window=rows_frame, anchor="nw")
    canvas.configure(yscrollcommand=scrollbar.set)
    canvas.pack(side="left", fill="both", expand=True)
    scrollbar.pack(side="right", fill="y")
    canvas.bind_all(
        "<MouseWheel>",
        lambda e: canvas.yview_scroll(int(-1 * e.delta / 120), "units"),
    )

    # Column headers
    hdr = tk.Frame(rows_frame, bg="#e8e8e8")
    hdr.pack(fill="x", pady=(2, 0))
    tk.Label(
        hdr, text=" Speaker ID", bg="#e8e8e8",
        font=("Helvetica", 9, "bold"), width=14, anchor="w",
    ).pack(side="left")
    tk.Label(
        hdr, text="Friendly Name", bg="#e8e8e8",
        font=("Helvetica", 9, "bold"), width=18, anchor="w",
    ).pack(side="left")

    # ── Row management ────────────────────────────────────────────────────────
    # Speaker ID -> the Entry widget of its row, for every row on screen.
    entries: dict[str, tk.Entry] = {}

    def _add_row(sid: str, name: str) -> None:
        """Append an editable row for *sid* unless one is already shown."""
        if sid in entries:
            return
        row = tk.Frame(rows_frame)
        row.pack(fill="x", padx=4, pady=2)
        tk.Label(row, text=sid, font=("Courier", 9), width=14, anchor="w").pack(side="left")
        entry = tk.Entry(row, font=("Helvetica", 10), width=18)
        entry.insert(0, name)
        entry.pack(side="left", padx=4)
        saved_lbl = tk.Label(row, text="", font=("Helvetica", 8), fg="#2a7a2a", width=5)
        saved_lbl.pack(side="left")
        entries[sid] = entry

        def _clear_saved() -> None:
            # The row may have been deleted during the 2 s delay.
            if saved_lbl.winfo_exists():
                saved_lbl.config(text="")

        def _save(_event=None) -> None:
            n = entry.get().strip()
            if not n:
                return
            state.set_speaker_name(sid, n)
            saved_lbl.config(text="Saved")
            root.after(2000, _clear_saved)
            print(f"[UI] {sid} → {n!r}")

        def _delete() -> None:
            state.delete_speaker(sid)
            entries.pop(sid, None)
            row.destroy()
            print(f"[UI] Removed {sid}")

        entry.bind("<Return>", _save)
        tk.Button(row, text="Save", command=_save, width=5).pack(side="left", padx=2)
        tk.Button(row, text="✕", command=_delete, fg="red", width=3).pack(side="left")

    # Populate from persisted state (sorted so order is stable)
    for sid, name in sorted(state.speaker_names.items()):
        _add_row(sid, name)

    # Poll every 2 s for speaker IDs newly seen from Whisper this session
    def _poll() -> None:
        for sid in sorted(state.seen_speakers_snapshot() - set(entries)):
            _add_row(sid, state.speaker_names.get(sid, sid))
        root.after(2000, _poll)

    _poll()

    # ── Add row manually ──────────────────────────────────────────────────────
    ttk.Separator(root, orient="horizontal").pack(fill="x", padx=12, pady=4)
    add_row = tk.Frame(root)
    add_row.pack(fill="x", padx=12)
    tk.Label(add_row, text="Add:", font=("Helvetica", 9)).pack(side="left")
    add_id = tk.Entry(add_row, font=("Courier", 9), width=13)
    add_id.insert(0, "SPEAKER_04")
    add_id.pack(side="left", padx=4)
    tk.Label(add_row, text="→").pack(side="left")
    add_name = tk.Entry(add_row, font=("Helvetica", 10), width=16)
    add_name.pack(side="left", padx=4)

    def _add_manual() -> None:
        sid = add_id.get().strip()
        name = add_name.get().strip()
        if not (sid and name):
            return
        state.set_speaker_name(sid, name)
        existing = entries.get(sid)
        if existing is None:
            _add_row(sid, name)
        else:
            # The ID already has a row: show the new name there instead of
            # leaving the old one on screen.
            existing.delete(0, tk.END)
            existing.insert(0, name)
        add_name.delete(0, tk.END)
        print(f"[UI] Added {sid} → {name!r}")

    add_name.bind("<Return>", lambda _e: _add_manual())
    tk.Button(add_row, text="Add", command=_add_manual, width=5).pack(side="left", padx=2)

    # ── Footer buttons ────────────────────────────────────────────────────────
    ttk.Separator(root, orient="horizontal").pack(fill="x", padx=12, pady=6)
    footer = tk.Frame(root)
    footer.pack(fill="x", padx=12, pady=(0, 12))
    tk.Label(
        footer, text="Changes save instantly to speakers.json",
        font=("Helvetica", 8), fg="gray",
    ).pack(side="left")
    tk.Button(
        footer, text="Clear Display", fg="red", width=14,
        command=lambda: state.clear(mqtt_client),
    ).pack(side="right")

    root.mainloop()
- # ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Start the MQTT client and audio pipeline, then run the UI until closed.

    The audio/websocket pipeline runs in a daemon thread, so it ends with
    the process once the UI window is closed. The MQTT client is
    disconnected and its network loop stopped on the way out, whether the
    UI returned normally or raised.
    """
    state = BridgeState()
    mqtt_client = build_mqtt_client()
    ws_thread = threading.Thread(
        target=run_async_loop, args=(state, mqtt_client), daemon=True
    )
    ws_thread.start()
    print(f"[Bridge] Speaker names loaded from {SPEAKERS_FILE}")
    print("[Bridge] Audio pipeline running — close this window to quit")
    try:
        run_speaker_ui(state, mqtt_client)
    finally:
        # Say goodbye to the broker, then stop the thread started by
        # loop_start() in build_mqtt_client.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()
|