#!/usr/bin/env python3
"""
bridge.py — Church Live Transcription Bridge

Captures microphone audio with sounddevice, feeds it to an in-process
WhisperLiveKit AudioProcessor / TranscriptionEngine, buffers finalised
sentences, and publishes a rolling 3-line JSON payload to Mosquitto MQTT
for the e-ink display.

The module also defines a small local HTTP API (POST /inject and
POST /inject/clear, run by main() on 127.0.0.1:8002) so admin.py can feed
test audio into the pipeline. Speaker display names are persisted in
speakers.json next to this file.

Speaker labels appear only when the transcription results carry speaker
ids. main() currently builds the engine with diarization=False.

Run this script:
    python bridge.py
"""

import asyncio
import json
import re
import textwrap
import threading
import time
from collections import Counter
from pathlib import Path

import numpy as np
import paho.mqtt.client as mqtt
import sounddevice as sd
import uvicorn
import websockets
from fastapi import FastAPI, Request

# ── Configuration ─────────────────────────────────────────────────────────────

MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"
MQTT_TOPIC_CLEAR = "display/clear"

# NOTE(review): not referenced by the in-process pipeline (audio_processor_loop).
# It is only meaningful for a websocket client of a standalone WhisperLiveKit server.
WS_URL = "ws://localhost:8000/asr"

SAMPLE_RATE = 16000
CHANNELS = 1
BLOCKSIZE = 4096          # ~256 ms per chunk at 16 kHz
SENTENCE_TIMEOUT = 4.0    # seconds since the last finalised segment before forcing a flush
MAX_LINE_CHARS = 38       # characters per line (~24pt font at 800 px wide)
DISPLAY_LINES = 3         # number of rolling lines shown on the display

# Set to a device index (integer) to force a specific microphone.
# Leave as None to use the system default input device.
# Run bridge.py once to see available device indices printed at startup.
AUDIO_DEVICE: int | None = 12

SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
DEFAULT_SPEAKERS: dict[str, str] = {
    "SPEAKER_00": "Pastor",
    "SPEAKER_01": "Reader",
    "SPEAKER_02": "Guest",
    "SPEAKER_03": "Choir",
}

# Queue for test audio posted by admin.py to /inject.
# It stays None until the audio pipeline creates it on its own event loop.
# Chunks are forwarded verbatim to AudioProcessor.process_audio.
# NOTE(review): an earlier comment said admin.py sends decoded float32 PCM, while
# the microphone path sends int16 (s16le). Confirm both formats match.
_inject_queue: asyncio.Queue[bytes] | None = None


# ── Audio injection API (receives chunks from admin.py) ───────────────────────

_bridge_app = FastAPI()


@_bridge_app.post("/inject")
async def inject_audio(request: Request):
    """Queue the raw request body as one audio chunk for the pipeline.

    The chunk is dropped when the pipeline has not started yet, the body is
    empty, or the queue is full. Always returns ``{"ok": True}``.
    """
    chunk = await request.body()
    q = _inject_queue
    if q is not None and chunk:
        # The queue belongs to the pipeline's event loop (another thread), so only
        # the non-awaiting put_nowait() is used here.
        try:
            q.put_nowait(chunk)
        except asyncio.QueueFull:
            pass  # best effort: drop rather than block the HTTP handler
    return {"ok": True}


@_bridge_app.post("/inject/clear")
async def inject_clear():
    """Discard any injected audio that has not been forwarded yet."""
    q = _inject_queue
    if q is not None:
        while not q.empty():
            try:
                q.get_nowait()
            except asyncio.QueueEmpty:
                break
    return {"ok": True}


# ── Speaker persistence ───────────────────────────────────────────────────────

def _read_speakers_file() -> dict[str, str] | None:
    """Parse SPEAKERS_FILE into an id -> display-name map.

    Returns None when the file is missing, unreadable, not valid UTF-8/JSON,
    or not a JSON object. Entries whose value is not a string are skipped.
    """
    try:
        data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
    except (OSError, ValueError):  # ValueError covers JSONDecodeError and UnicodeDecodeError
        return None
    if not isinstance(data, dict):
        return None
    return {str(key): value for key, value in data.items() if isinstance(value, str)}


def _load_speakers() -> dict[str, str]:
    """Return the persisted speaker names, seeding the file with defaults on first run.

    If the file exists but cannot be parsed, the defaults are returned and the
    file is left untouched, so a half-written or hand-edited file is never
    overwritten.
    """
    data = _read_speakers_file()
    if data is not None:
        return data
    if SPEAKERS_FILE.exists():
        print(f"[Speakers] Could not parse {SPEAKERS_FILE.name}; using defaults without overwriting it")
        return dict(DEFAULT_SPEAKERS)
    # First run — seed with defaults and save
    _write_speakers(DEFAULT_SPEAKERS)
    return dict(DEFAULT_SPEAKERS)


def _write_speakers(names: dict[str, str]) -> None:
    """Persist *names* to SPEAKERS_FILE as pretty-printed UTF-8 JSON.

    The data goes to a sibling temp file first and is then swapped into place
    with Path.replace (os.replace). Readers such as the periodic reloader
    therefore never observe a partially written file. Failures are logged,
    not raised.
    """
    tmp = SPEAKERS_FILE.with_name(SPEAKERS_FILE.name + ".tmp")
    try:
        tmp.write_text(
            json.dumps(names, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        tmp.replace(SPEAKERS_FILE)
    except OSError as exc:
        print(f"[Speakers] Save failed: {exc}")


# ── State ─────────────────────────────────────────────────────────────────────

class BridgeState:
    """All mutable state, protected by a single lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self.speaker_names: dict[str, str] = _load_speakers()
        self._seen: set[str] = set(self.speaker_names)          # speaker ids known so far
        self._current_speaker: str | None = None                 # resolved display name
        self._speaker_changed = False                            # emit a [NAME] header on next flush
        self._text_buffer = ""                                   # finalised text not yet displayed
        self._display: list[str] = [""] * DISPLAY_LINES          # lines currently on the display
        self._last_final_time = time.monotonic()

    # ── Speaker name management ───────────────────────────────────────────────

    def set_speaker_name(self, speaker_id: str, name: str) -> None:
        """Assign a display name to *speaker_id* and persist the map."""
        with self._lock:
            self.speaker_names[speaker_id] = name.strip()
            self._seen.add(speaker_id)
            _write_speakers(self.speaker_names)

    def delete_speaker(self, speaker_id: str) -> None:
        """Forget *speaker_id* entirely and persist the map."""
        with self._lock:
            self.speaker_names.pop(speaker_id, None)
            self._seen.discard(speaker_id)
            _write_speakers(self.speaker_names)

    def replace_speaker_names(self, names: dict[str, str]) -> None:
        """Swap in a freshly loaded name map without writing it back to disk."""
        with self._lock:
            self.speaker_names = dict(names)

    def seen_speakers_snapshot(self) -> set[str]:
        """Return a copy of every speaker id seen or configured so far."""
        with self._lock:
            return set(self._seen)

    def _resolve(self, speaker_id: str | None) -> str | None:
        """Map an id to its display name (the id itself if unnamed). Caller holds the lock."""
        if not speaker_id:
            return None
        return self.speaker_names.get(speaker_id, speaker_id)

    # ── Text ingestion ────────────────────────────────────────────────────────

    def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
        """Accept a finalised segment; flush on sentence boundary or speaker change."""
        with self._lock:
            # Diarization back ends may report numeric ids. Normalise to str so the
            # name lookup and the .upper() in _flush work.
            speaker_id = str(speaker_id) if speaker_id else None
            if speaker_id:
                self._seen.add(speaker_id)

            resolved = self._resolve(speaker_id)
            if resolved != self._current_speaker:
                # Show the previous speaker's text before switching labels.
                if self._text_buffer:
                    self._flush(mqtt_client)
                self._current_speaker = resolved
                self._speaker_changed = True

            sep = " " if self._text_buffer else ""
            self._text_buffer += sep + text.strip()
            self._last_final_time = time.monotonic()

            if _is_sentence_end(text):
                self._flush(mqtt_client)

    def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
        """Flush buffered text if no segment has arrived for SENTENCE_TIMEOUT seconds."""
        with self._lock:
            if self._text_buffer and (time.monotonic() - self._last_final_time) > SENTENCE_TIMEOUT:
                self._flush(mqtt_client)

    def _flush(self, mqtt_client: mqtt.Client) -> None:
        """Word-wrap buffer → rolling display → publish. Must hold lock."""
        text = self._text_buffer.strip()
        self._text_buffer = ""
        if not text:
            return

        new_lines: list[str] = []
        if self._speaker_changed and self._current_speaker:
            new_lines.append(f"[{self._current_speaker.upper()}]")
        self._speaker_changed = False
        new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])

        # Keep only the newest DISPLAY_LINES lines, top-padding with blanks.
        self._display.extend(new_lines)
        self._display = self._display[-DISPLAY_LINES:]
        while len(self._display) < DISPLAY_LINES:
            self._display.insert(0, "")

        payload = json.dumps({"lines": list(self._display)})
        mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
        print(f"[Display] {self._display}")

    def clear(self, mqtt_client: mqtt.Client) -> None:
        """Blank the display, drop buffered text, and publish a clear message."""
        with self._lock:
            self._display = [""] * DISPLAY_LINES
            self._text_buffer = ""
            self._current_speaker = None
            self._speaker_changed = False
            mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
            print("[Display] Cleared")


# ── Helpers ───────────────────────────────────────────────────────────────────

_SENTENCE_END_RE = re.compile(r'[.!?…]\s*$')


def _is_sentence_end(text: str) -> bool:
    """True if *text* ends (ignoring trailing whitespace) with . ! ? or …"""
    return bool(_SENTENCE_END_RE.search(text.strip()))


def _extract_speaker(data: dict) -> str | None:
    """Pick a speaker id from a decoded websocket message.

    Uses a top-level "speaker" key when present. Otherwise it takes the most
    common non-empty "speaker" among data["words"]. Returns None when neither
    is available.
    """
    if "speaker" in data:
        return data["speaker"] or None
    words = data.get("words", [])
    if words:
        ids = [w.get("speaker") for w in words if w.get("speaker")]
        if ids:
            return Counter(ids).most_common(1)[0][0]
    return None


# ── MQTT ──────────────────────────────────────────────────────────────────────

def build_mqtt_client() -> mqtt.Client:
    """Create a paho client that connects in the background and auto-reconnects."""
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)

    def on_connect(client, userdata, flags, rc, props):
        print("[MQTT] Connected" if rc == 0 else f"[MQTT] Failed: {rc}")

    def on_disconnect(client, userdata, flags, rc, props):
        print(f"[MQTT] Disconnected ({rc}), will reconnect...")

    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.reconnect_delay_set(min_delay=1, max_delay=30)
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()  # network loop runs in paho's own background thread
    return client


# ── WebSocket + audio pipeline ────────────────────────────────────────────────

async def _sender(ws, queue: asyncio.Queue) -> None:
    """Send the config handshake, then forward audio chunks from *queue* to *ws* forever.

    Websocket-client helper; the in-process pipeline (audio_processor_loop)
    does not call it.
    """
    # Send config handshake first — WhisperLiveKit needs this before audio
    config = json.dumps({
        "uid": "bridge-client",
        "language": "en",
        "task": "transcribe",
        "model_size": "large-v3",
        "use_vad": True,
    })
    await ws.send(config)
    # Drain any stale chunks
    while not queue.empty():
        queue.get_nowait()
    while True:
        chunk = await queue.get()
        await ws.send(chunk)


async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Parse JSON messages from *ws* and push finalised text into *state*.

    Websocket-client helper; the in-process pipeline (audio_processor_loop)
    does not call it. Non-JSON messages are ignored.
    """
    async for message in ws:
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, TypeError):
            continue

        text = (data.get("text") or data.get("buffer_transcription") or "").strip()
        is_final = data.get("is_final", False) or data.get("end_of_segment", False)
        speaker = _extract_speaker(data)

        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)


async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Once per second, force out text that has been waiting past SENTENCE_TIMEOUT."""
    while True:
        await asyncio.sleep(1.0)
        state.maybe_timeout_flush(mqtt_client)


async def _speaker_reloader(state: BridgeState) -> None:
    """Reload speakers.json every 5 s so admin UI changes take effect live.

    A reload happens only when the file's mtime changes. If the new contents
    cannot be parsed (e.g. a writer is mid-save), the current names are kept
    and the read is retried on the next tick.
    """
    last_mtime = 0.0
    while True:
        await asyncio.sleep(5.0)
        try:
            mtime = SPEAKERS_FILE.stat().st_mtime
        except OSError:
            continue
        if mtime == last_mtime:
            continue
        fresh = _read_speakers_file()
        if fresh is None:
            continue  # do not advance last_mtime: retry once the file is readable
        state.replace_speaker_names(fresh)
        last_mtime = mtime
        print("[Bridge] Speaker names reloaded from disk")


def _choose_audio_device() -> int | None:
    """
    List all input devices and return the index to use.

    Preference order:
    1. AUDIO_DEVICE, if it is an input-capable device.
    2. The system default input.
    3. The first device with input channels.

    Returns None when devices cannot be queried or none has input channels.
    """
    try:
        devices = sd.query_devices()
        default_in = sd.default.device[0]  # may be -1 if unset
    except Exception as exc:
        print(f"[Audio] Cannot query devices: {exc}")
        return None

    print("[Audio] Available input devices:")
    input_devices: list[tuple[int, str]] = []
    for i, dev in enumerate(devices):
        if dev["max_input_channels"] > 0:
            marker = " ← default" if i == default_in else ""
            print(f" [{i}] {dev['name']}{marker}")
            input_devices.append((i, dev["name"]))

    if not input_devices:
        print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
        return None

    valid_indices = {idx for idx, _ in input_devices}

    # Explicit override from config — only if it actually exists as an input.
    if AUDIO_DEVICE is not None:
        if AUDIO_DEVICE in valid_indices:
            print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
            return AUDIO_DEVICE
        print(f"[Audio] WARNING: configured device [{AUDIO_DEVICE}] is not an available input device — falling back")

    # System default (if valid)
    if default_in in valid_indices:
        print(f"[Audio] Using default input device [{default_in}]")
        return default_in

    # Fall back to first available input
    idx, name = input_devices[0]
    print(f"[Audio] No system default set — using [{idx}] {name}")
    print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
    return idx


# ── In-process transcription pipeline ─────────────────────────────────────────
# sounddevice audio goes straight to a WhisperLiveKit AudioProcessor via its
# Python API, rather than through the websocket helpers above.
# Audio goes from sounddevice straight into an in-process WhisperLiveKit
# AudioProcessor via its Python API (no websocket round-trip).
from whisperlivekit import AudioProcessor, TranscriptionEngine


async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, engine: TranscriptionEngine) -> None:
    """Capture microphone audio, transcribe it in-process, and feed results to *state*.

    Microphone chunks arrive from the sounddevice callback thread via
    ``loop.call_soon_threadsafe``. Test audio posted to ``/inject`` is taken
    in preference whenever any is queued. The periodic timeout flusher and
    the speakers.json reloader also run for the lifetime of this coroutine.

    Args:
        state: Shared display/speaker state that receives finalised text.
        mqtt_client: Client used by *state* to publish display updates.
        engine: Pre-built WhisperLiveKit engine handed to the AudioProcessor.
    """
    global _inject_queue

    audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
    loop = asyncio.get_running_loop()

    def _enqueue_mic_chunk(chunk: bytes) -> None:
        # Runs on the event-loop thread; drop audio rather than block when behind.
        if not audio_queue.full():
            audio_queue.put_nowait(chunk)

    def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
        # Invoked on sounddevice's audio thread, so hand off thread-safely.
        if status:
            print(f"[Audio] {status}")
        loop.call_soon_threadsafe(_enqueue_mic_chunk, indata.tobytes())  # raw s16le

    device = _choose_audio_device()
    if device is None:
        print("[Audio] No input device — cannot start.")
        return

    audio_processor = AudioProcessor(transcription_engine=engine)
    results_generator = await audio_processor.create_tasks()

    async def _receive_results():
        async for response in results_generator:
            # NOTE(review): assumes each response exposes text / buffer_transcription /
            # is_final / end_of_segment / speaker attributes. Confirm this against the
            # installed WhisperLiveKit version; if they are absent, nothing is ever
            # pushed to the display.
            text = (getattr(response, "text", None)
                    or getattr(response, "buffer_transcription", None)
                    or "").strip()
            is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
            speaker = getattr(response, "speaker", None)

            if is_final and text:
                print(f"[Whisper] ({speaker or '?'}) {text}")
                state.push_final(text, speaker, mqtt_client)

    async def _send_audio():
        global _inject_queue, test_audio_queue
        inject_q: asyncio.Queue[bytes] = asyncio.Queue(maxsize=240)
        _inject_queue = inject_q
        # Alias kept for any code that reads `test_audio_queue`. It must be the same
        # queue /inject fills, otherwise injected audio is never forwarded.
        test_audio_queue = inject_q

        with sd.InputStream(
            device=device,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="int16",  # s16le — matches pcm_input mode
            blocksize=BLOCKSIZE,
            callback=audio_callback,
        ):
            while True:
                # Injected test audio takes priority. It is put from the API server's
                # thread, so it is polled with get_nowait() instead of awaited on.
                try:
                    chunk = inject_q.get_nowait()
                except asyncio.QueueEmpty:
                    chunk = await audio_queue.get()
                await audio_processor.process_audio(chunk)

    flusher = asyncio.create_task(_flusher(state, mqtt_client))
    reloader = asyncio.create_task(_speaker_reloader(state))
    try:
        await asyncio.gather(_send_audio(), _receive_results())
    finally:
        flusher.cancel()
        reloader.cancel()
        # Stop /inject from filling a queue that nothing will drain any more.
        _inject_queue = None
def _build_engine():
    """Construct the in-process WhisperLiveKit engine used by the audio pipeline."""
    from whisperlivekit import TranscriptionEngine

    return TranscriptionEngine(model_size="large-v3", lan="en", diarization=False, pcm_input=True)


def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client, engine=None) -> None:
    """Run the audio pipeline on a fresh event loop in the calling thread (blocks).

    Args:
        state: Shared display/speaker state.
        mqtt_client: Client used to publish display updates.
        engine: Optional pre-built TranscriptionEngine; one is built when omitted.
    """
    if engine is None:
        engine = _build_engine()
    asyncio.run(audio_processor_loop(state, mqtt_client, engine))


def _run_inject_api() -> None:
    """Serve the test-audio injection API on localhost (blocks)."""
    uvicorn.run(_bridge_app, host="127.0.0.1", port=8002, log_level="warning")


# ── Entry point ───────────────────────────────────────────────────────────────

def main() -> None:
    """Start MQTT, the audio pipeline, and the injection API; run until interrupted."""
    state = BridgeState()
    mqtt_client = build_mqtt_client()
    engine = _build_engine()

    ws_thread = threading.Thread(target=run_async_loop, args=(state, mqtt_client, engine), daemon=True)
    ws_thread.start()
    print("[Bridge] Audio pipeline running")

    # Must start before waiting on the pipeline thread, which never finishes in
    # normal operation; anything placed after that wait would never run.
    inject_thread = threading.Thread(target=_run_inject_api, daemon=True)
    inject_thread.start()

    try:
        # A timeout-less join() cannot be interrupted by Ctrl+C on Windows.
        while ws_thread.is_alive():
            ws_thread.join(timeout=0.5)
    except KeyboardInterrupt:
        pass
    finally:
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()