| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395 |
- #!/usr/bin/env python3
- """
- bridge.py — Church Live Transcription Bridge
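- Captures microphone audio with sounddevice and feeds it to an in-process
- WhisperLiveKit AudioProcessor, receives transcription (plus speaker labels
- when diarization is enabled), buffers sentences, and publishes rolling
- 3-line JSON to Mosquitto MQTT for the e-ink display.
- The WebSocket helpers (ws://localhost:8000/asr) are not used by main(); they
- only apply when WhisperLiveKit is started as a separate server with:
- wlk --model_size large-v3 --language en --diarization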
- Run this script:
- python bridge.py
- """
- import asyncio
- import json
- import re
- import textwrap
- import threading
- import time
- from collections import Counter
- from pathlib import Path
- import numpy as np
- import paho.mqtt.client as mqtt
- import sounddevice as sd
- import websockets
# ── Configuration ─────────────────────────────────────────────────────────────
# MQTT broker address and the topics this bridge publishes to.
MQTT_HOST = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC_TEXT = "display/text"  # JSON payload {"lines": [...]} on every flush
MQTT_TOPIC_CLEAR = "display/clear"  # empty payload when the display is cleared
# WebSocket endpoint of a standalone WhisperLiveKit server.
WS_URL = "ws://localhost:8000/asr"
# Microphone capture parameters passed to sounddevice.InputStream.
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCKSIZE = 4096 # 4096 frames / 16000 Hz ≈ 256 ms per chunk
SENTENCE_TIMEOUT = 4.0 # seconds without a final segment before the buffer is force-flushed
MAX_LINE_CHARS = 38 # wrap width per display line (~24pt font at 800 px wide)
DISPLAY_LINES = 3  # number of lines kept in the rolling display
# Set to a device index (integer) to force a specific microphone.
# Leave as None to use the Windows default input device.
# Run bridge.py once to see available device indices printed at startup.
# NOTE(review): currently pinned to index 12; indices presumably differ between
# machines — confirm this value on the target host.
AUDIO_DEVICE: int | None = 12
# Speaker-name mapping file, stored next to this script.
SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
# Fallback/seed mapping from diarization speaker IDs to display names,
# used by _load_speakers().
DEFAULT_SPEAKERS: dict[str, str] = {
    "SPEAKER_00": "Pastor",
    "SPEAKER_01": "Reader",
    "SPEAKER_02": "Guest",
    "SPEAKER_03": "Choir",
}
- # ── Speaker persistence ───────────────────────────────────────────────────────
def _load_speakers() -> dict[str, str]:
    """Return the speaker-ID → display-name mapping stored in SPEAKERS_FILE.

    Returns:
        The parsed mapping when the file holds a JSON object; otherwise a
        fresh copy of DEFAULT_SPEAKERS.

    The file is only (re)written with the defaults when it does not exist
    yet. An unreadable, corrupt or wrongly-shaped file is reported and left
    untouched, so a half-written or hand-edited file is never clobbered.
    """
    try:
        data = json.loads(SPEAKERS_FILE.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First run — seed with defaults and save
        _write_speakers(DEFAULT_SPEAKERS)
        return dict(DEFAULT_SPEAKERS)
    except (OSError, ValueError) as exc:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        print(f"[Speakers] Cannot load {SPEAKERS_FILE.name}: {exc} — using defaults")
        return dict(DEFAULT_SPEAKERS)
    if isinstance(data, dict):
        return data
    print(f"[Speakers] {SPEAKERS_FILE.name} is not a JSON object — using defaults")
    return dict(DEFAULT_SPEAKERS)
def _write_speakers(names: dict[str, str]) -> None:
    """Save *names* to SPEAKERS_FILE as indented, non-ASCII-preserving JSON.

    Filesystem errors are printed and otherwise ignored (best effort).
    """
    serialized = json.dumps(names, indent=2, ensure_ascii=False)
    try:
        SPEAKERS_FILE.write_text(serialized, encoding="utf-8")
    except OSError as exc:
        print(f"[Speakers] Save failed: {exc}")
- # ── State ─────────────────────────────────────────────────────────────────────
class BridgeState:
    """All mutable state, protected by a single lock.

    Holds the speaker-name mapping, the set of speaker IDs seen so far, the
    pending sentence buffer and the rolling display lines. Every public
    method takes the lock for its whole body, including the disk write and
    the MQTT publish, so persisted/published data always matches the state
    it was derived from.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self.speaker_names: dict[str, str] = _load_speakers()
        self._seen: set[str] = set(self.speaker_names)
        # Resolved display name of the speaker owning the current buffer.
        self._current_speaker: str | None = None
        # True until the next flush has emitted the "[NAME]" label line.
        self._speaker_changed = False
        self._text_buffer = ""
        self._display: list[str] = [""] * DISPLAY_LINES
        self._last_final_time = time.monotonic()

    # ── Speaker name management ───────────────────────────────────────────────
    def set_speaker_name(self, speaker_id: str, name: str) -> None:
        """Map *speaker_id* to the stripped *name* and persist the mapping."""
        with self._lock:
            self.speaker_names[speaker_id] = name.strip()
            self._seen.add(speaker_id)
            # Written while still holding the lock so the dict cannot change
            # during serialisation and concurrent saves cannot interleave.
            _write_speakers(self.speaker_names)

    def delete_speaker(self, speaker_id: str) -> None:
        """Forget *speaker_id* (no error if unknown) and persist the mapping."""
        with self._lock:
            self.speaker_names.pop(speaker_id, None)
            self._seen.discard(speaker_id)
            _write_speakers(self.speaker_names)

    def seen_speakers_snapshot(self) -> set[str]:
        """Return a copy of the speaker IDs seen so far."""
        with self._lock:
            return set(self._seen)

    def _resolve(self, speaker_id: str | None) -> str | None:
        """Return the display name for *speaker_id*, or the ID itself if unmapped.

        Returns None for a missing/empty ID. Does not take the lock.
        """
        if not speaker_id:
            return None
        return self.speaker_names.get(speaker_id, speaker_id)

    # ── Text ingestion ────────────────────────────────────────────────────────
    def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
        """Accept a finalised segment; flush on sentence boundary or speaker change."""
        with self._lock:
            if speaker_id:
                self._seen.add(speaker_id)
            resolved = self._resolve(speaker_id)
            if resolved != self._current_speaker:
                # Publish what the previous speaker said before switching.
                if self._text_buffer:
                    self._flush(mqtt_client)
                self._current_speaker = resolved
                self._speaker_changed = True
            sep = " " if self._text_buffer else ""
            self._text_buffer += sep + text.strip()
            self._last_final_time = time.monotonic()
            if _is_sentence_end(text):
                self._flush(mqtt_client)

    def maybe_timeout_flush(self, mqtt_client: mqtt.Client) -> None:
        """Flush a non-empty buffer once SENTENCE_TIMEOUT seconds passed since the last segment."""
        with self._lock:
            idle = time.monotonic() - self._last_final_time
            if self._text_buffer and idle > SENTENCE_TIMEOUT:
                self._flush(mqtt_client)

    def _flush(self, mqtt_client: mqtt.Client) -> None:
        """Word-wrap buffer → rolling display → publish. Must hold lock."""
        text = self._text_buffer.strip()
        self._text_buffer = ""
        if not text:
            return
        new_lines: list[str] = []
        if self._speaker_changed and self._current_speaker:
            new_lines.append(f"[{self._current_speaker.upper()}]")
            self._speaker_changed = False
        new_lines.extend(textwrap.wrap(text, MAX_LINE_CHARS) or [""])
        # Keep only the newest DISPLAY_LINES lines; older ones scroll off.
        self._display = (self._display + new_lines)[-DISPLAY_LINES:]
        # Left-pad with blanks if fewer than DISPLAY_LINES lines are available.
        self._display = [""] * (DISPLAY_LINES - len(self._display)) + self._display
        payload = json.dumps({"lines": list(self._display)})
        mqtt_client.publish(MQTT_TOPIC_TEXT, payload)
        print(f"[Display] {self._display}")

    def clear(self, mqtt_client: mqtt.Client) -> None:
        """Reset buffer, speaker and display, then publish on the clear topic."""
        with self._lock:
            self._display = [""] * DISPLAY_LINES
            self._text_buffer = ""
            self._current_speaker = None
            self._speaker_changed = False
            # Published under the lock so a concurrent flush cannot slip in
            # between the reset and the clear message.
            mqtt_client.publish(MQTT_TOPIC_CLEAR, "")
            print("[Display] Cleared")
- # ── Helpers ───────────────────────────────────────────────────────────────────
- def _is_sentence_end(text: str) -> bool:
- return bool(re.search(r'[.!?…]\s*$', text.strip()))
- def _extract_speaker(data: dict) -> str | None:
- if "speaker" in data:
- return data["speaker"] or None
- words = data.get("words", [])
- if words:
- ids = [w.get("speaker") for w in words if w.get("speaker")]
- if ids:
- return Counter(ids).most_common(1)[0][0]
- return None
- # ── MQTT ──────────────────────────────────────────────────────────────────────
def build_mqtt_client() -> mqtt.Client:
    """Create the MQTT client, start its background loop and return it.

    The connection is initiated asynchronously; connect and disconnect
    events are only reported on stdout. The reconnect delay is configured
    to range from 1 to 30 seconds.
    """

    def _report_connect(_client, _userdata, _flags, rc, _props):
        if rc == 0:
            print("[MQTT] Connected")
        else:
            print(f"[MQTT] Failed: {rc}")

    def _report_disconnect(_client, _userdata, _flags, rc, _props):
        print(f"[MQTT] Disconnected ({rc}), will reconnect...")

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = _report_connect
    client.on_disconnect = _report_disconnect
    client.reconnect_delay_set(min_delay=1, max_delay=30)
    client.connect_async(MQTT_HOST, MQTT_PORT)
    client.loop_start()
    return client
- # ── WebSocket + audio pipeline ────────────────────────────────────────────────
- async def _sender(ws, queue: asyncio.Queue) -> None:
- # Send config handshake first — WhisperLiveKit needs this before audio
- config = json.dumps({
- "uid": "bridge-client",
- "language": "en",
- "task": "transcribe",
- "model_size": "large-v3",
- "use_vad": True,
- })
- await ws.send(config)
- # Drain any stale chunks
- while not queue.empty():
- queue.get_nowait()
- while True:
- chunk = await queue.get()
- await ws.send(chunk)
async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Consume messages from *ws* and push finalised text into *state*.

    Each message is parsed as JSON. Messages that cannot be parsed, or
    whose top-level value is not an object, are skipped instead of
    terminating the loop. A message counts as final when ``is_final`` or
    ``end_of_segment`` is truthy; only final messages with non-empty text
    (``text``, falling back to ``buffer_transcription``) are forwarded.
    """
    async for message in ws:
        try:
            data = json.loads(message)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable binary frames.
            continue
        if not isinstance(data, dict):
            # Valid JSON but not an object — nothing to read fields from.
            continue
        text = (data.get("text") or data.get("buffer_transcription") or "").strip()
        is_final = data.get("is_final", False) or data.get("end_of_segment", False)
        speaker = _extract_speaker(data)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)
async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
    """Once per second, let *state* flush its buffer if it has timed out."""
    poll_seconds = 1.0
    while True:
        await asyncio.sleep(poll_seconds)
        state.maybe_timeout_flush(mqtt_client)
async def _speaker_reloader(state: BridgeState) -> None:
    """Reload speakers.json every 5 s so admin UI changes take effect live.

    The file's modification time is compared with the one recorded at the
    previous reload; on a difference the mapping is re-read and swapped
    into *state* under its lock. OSError (e.g. file missing) is ignored.
    """
    previous_mtime = 0.0
    while True:
        await asyncio.sleep(5.0)
        try:
            current_mtime = SPEAKERS_FILE.stat().st_mtime
            if current_mtime == previous_mtime:
                continue
            reloaded = _load_speakers()
            with state._lock:
                state.speaker_names = reloaded
            previous_mtime = current_mtime
            print("[Bridge] Speaker names reloaded from disk")
        except OSError:
            pass
def _choose_audio_device() -> int | None:
    """
    List all input devices and return the index to use.

    Prefers AUDIO_DEVICE if set, otherwise the system default,
    otherwise the first device with input channels. A configured or
    default index is only accepted when it is one of the enumerated input
    devices; otherwise a warning is printed and the next choice is tried.

    Returns:
        The chosen device index, or None when devices cannot be queried or
        no input device exists.
    """
    try:
        devices = sd.query_devices()
        default_in = sd.default.device[0]  # may be -1 if unset
    except Exception as exc:  # deliberately broad: startup probe must not crash
        print(f"[Audio] Cannot query devices: {exc}")
        return None

    print("[Audio] Available input devices:")
    input_devices: list[tuple[int, str]] = []
    for i, dev in enumerate(devices):
        if dev["max_input_channels"] > 0:
            marker = " ← default" if i == default_in else ""
            print(f"  [{i}] {dev['name']}{marker}")
            input_devices.append((i, dev["name"]))

    if not input_devices:
        print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
        return None

    valid_indices = {idx for idx, _ in input_devices}

    # Explicit override from config — only if it really is an input device.
    if AUDIO_DEVICE is not None:
        if AUDIO_DEVICE in valid_indices:
            print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
            return AUDIO_DEVICE
        print(
            f"[Audio] WARNING: configured device [{AUDIO_DEVICE}] is not an "
            "available input device — falling back"
        )

    # System default (if it is a valid input device)
    if default_in in valid_indices:
        print(f"[Audio] Using default input device [{default_in}]")
        return default_in

    # Fall back to first available input
    idx, name = input_devices[0]
    print(f"[Audio] No system default set — using [{idx}] {name}")
    print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
    return idx
- # Active pipeline: sounddevice → WhisperLiveKit AudioProcessor, driven directly
- # through the Python API. The WebSocket helpers above (_sender / _receiver) are not used by main().
- from whisperlivekit import AudioProcessor, TranscriptionEngine
async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, engine: TranscriptionEngine) -> None:
    """Capture microphone audio, feed it to an AudioProcessor and push results.

    Picks an input device (returning early if there is none), creates an
    AudioProcessor bound to *engine*, then runs two coroutines side by
    side: one forwards queued audio chunks to the processor, the other
    turns finalised results into ``state.push_final`` calls. The timeout
    flusher and the speaker reloader run as background tasks and are
    cancelled when the main pair stops.
    """
    pending_audio: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
    event_loop = asyncio.get_running_loop()

    def _enqueue(chunk: bytes) -> None:
        # Runs on the event loop; a chunk is dropped when the queue is full.
        if not pending_audio.full():
            pending_audio.put_nowait(chunk)

    def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
        # Invoked by sounddevice; hands the chunk over via call_soon_threadsafe.
        if status:
            print(f"[Audio] {status}")
        # NOTE(review): chunks are sent as raw float32 bytes — confirm this is
        # the format AudioProcessor.process_audio expects.
        event_loop.call_soon_threadsafe(_enqueue, indata.astype(np.float32).tobytes())

    device = _choose_audio_device()
    if device is None:
        print("[Audio] No input device — cannot start.")
        return

    audio_processor = AudioProcessor(transcription_engine=engine)
    results_generator = await audio_processor.create_tasks()

    async def _receive_results():
        async for response in results_generator:
            # response is an object, not a dict — fields are read with getattr.
            # NOTE(review): the attribute names below are assumed; verify them
            # against the WhisperLiveKit result type.
            raw_text = getattr(response, "text", None)
            if not raw_text:
                raw_text = getattr(response, "buffer_transcription", None)
            text = (raw_text or "").strip()
            is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
            speaker = getattr(response, "speaker", None)
            if not (is_final and text):
                continue
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)

    async def _send_audio():
        stream = sd.InputStream(
            device=device,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype="float32",
            blocksize=BLOCKSIZE,
            callback=audio_callback,
        )
        with stream:
            while True:
                await audio_processor.process_audio(await pending_audio.get())

    background = [
        asyncio.create_task(_flusher(state, mqtt_client)),
        asyncio.create_task(_speaker_reloader(state)),
    ]
    try:
        await asyncio.gather(_send_audio(), _receive_results())
    finally:
        for task in background:
            task.cancel()
def run_async_loop(
    state: BridgeState,
    mqtt_client: mqtt.Client,
    engine: "TranscriptionEngine | None" = None,
) -> None:
    """Run the audio pipeline to completion on a fresh event loop.

    Args:
        state: Shared bridge state that receives finalised text.
        mqtt_client: Client used to publish display updates.
        engine: Transcription engine to use. When omitted, one is created
            with the same settings main() uses.

    The previous body called ``audio_ws_loop``, which is not defined in
    this file; the call now targets ``audio_processor_loop``.
    """
    if engine is None:
        engine = TranscriptionEngine(model_size="large-v3", lan="en", diarization=False)
    asyncio.run(audio_processor_loop(state, mqtt_client, engine))
- # ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Start the MQTT client and the audio pipeline, then wait until stopped.

    The pipeline runs on a daemon thread with its own event loop. The main
    thread waits for it and, on exit (normal end, error or Ctrl+C),
    disconnects the MQTT client and stops its network loop.
    """
    from whisperlivekit import TranscriptionEngine

    state = BridgeState()
    mqtt_client = build_mqtt_client()
    try:
        engine = TranscriptionEngine(model_size="large-v3", lan="en", diarization=False)

        def _run():
            asyncio.run(audio_processor_loop(state, mqtt_client, engine))

        ws_thread = threading.Thread(target=_run, daemon=True)
        ws_thread.start()
        print("[Bridge] Audio pipeline running")
        try:
            # Join in short slices: a bare join() can keep Ctrl+C from being
            # delivered on some platforms (notably Windows).
            while ws_thread.is_alive():
                ws_thread.join(timeout=0.5)
        except KeyboardInterrupt:
            print("[Bridge] Interrupted — shutting down")
    finally:
        # Release the broker connection and stop paho's background thread.
        mqtt_client.disconnect()
        mqtt_client.loop_stop()


if __name__ == "__main__":
    main()
|