|
|
@@ -1,13 +1,9 @@
|
|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
-bridge.py — Church Live Transcription Bridge
|
|
|
+bridge.py — Live Transcription Bridge
|
|
|
|
|
|
-Streams microphone audio to WhisperLiveKit (ws://localhost:8000/asr),
|
|
|
-receives transcription + speaker diarization, buffers sentences, and
|
|
|
-publishes rolling 3-line JSON to Mosquitto MQTT for the e-ink display.
|
|
|
-
|
|
|
-Start WhisperLiveKit with:
|
|
|
- wlk --model_size large-v3 --language en --diarization
|
|
|
+Uses WhisperLiveKit's AudioProcessor API directly (no WebSocket).
|
|
|
+Publishes rolling 3-line JSON to Mosquitto MQTT.
|
|
|
|
|
|
Run this script:
|
|
|
python bridge.py
|
|
|
@@ -15,6 +11,7 @@ Run this script:
|
|
|
|
|
|
import asyncio
|
|
|
import json
|
|
|
+import queue as _stdlib_queue
|
|
|
import re
|
|
|
import textwrap
|
|
|
import threading
|
|
|
@@ -25,9 +22,9 @@ from pathlib import Path
|
|
|
import numpy as np
|
|
|
import paho.mqtt.client as mqtt
|
|
|
import sounddevice as sd
|
|
|
-import websockets
|
|
|
|
|
|
from fastapi import FastAPI, Request
|
|
|
+from whisperlivekit import AudioProcessor, TranscriptionEngine
|
|
|
import uvicorn
|
|
|
|
|
|
# ── Configuration ─────────────────────────────────────────────────────────────
|
|
|
@@ -37,18 +34,16 @@ MQTT_PORT = 1883
|
|
|
MQTT_TOPIC_TEXT = "display/text"
|
|
|
MQTT_TOPIC_CLEAR = "display/clear"
|
|
|
|
|
|
-WS_URL = "ws://localhost:8000/asr"
|
|
|
SAMPLE_RATE = 16000
|
|
|
CHANNELS = 1
|
|
|
BLOCKSIZE = 4096 # ~256 ms per chunk at 16 kHz
|
|
|
|
|
|
SENTENCE_TIMEOUT = 4.0 # seconds of silence before forcing a flush
|
|
|
-MAX_LINE_CHARS = 38 # characters per line (~24pt font at 800 px wide)
|
|
|
+MAX_LINE_CHARS = 38 # characters per line
|
|
|
DISPLAY_LINES = 3
|
|
|
|
|
|
# Set to a device index (integer) to force a specific microphone.
|
|
|
# Leave as None to use the Windows default input device.
|
|
|
-# Run bridge.py once to see available device indices printed at startup.
|
|
|
AUDIO_DEVICE: int | None = 12
|
|
|
|
|
|
SPEAKERS_FILE = Path(__file__).parent / "speakers.json"
|
|
|
@@ -60,32 +55,33 @@ DEFAULT_SPEAKERS: dict[str, str] = {
|
|
|
"SPEAKER_03": "Choir",
|
|
|
}
|
|
|
|
|
|
-# Shared queue for test audio injection from admin.py
|
|
|
-# Admin feeds decoded PCM float32 chunks here; bridge forwards to AudioProcessor
|
|
|
-_inject_queue: asyncio.Queue[bytes] | None = None
|
|
|
+# ── Audio injection queue ─────────────────────────────────────────────────────
|
|
|
+# stdlib queue.Queue is thread-safe across event loops; asyncio.Queue is not.
|
|
|
+# admin.py POSTs test audio chunks to /inject (port 8002) which puts them here.
|
|
|
+# _send_audio() drains this queue in preference to the live microphone.
|
|
|
+_inject_queue: _stdlib_queue.Queue = _stdlib_queue.Queue(maxsize=240)
|
|
|
+
|
|
|
+# ── Audio injection API ───────────────────────────────────────────────────────
|
|
|
|
|
|
-# ── Audio injection API (receives chunks from admin.py) ───────────────────────
|
|
|
_bridge_app = FastAPI()
|
|
|
|
|
|
@_bridge_app.post("/inject")
|
|
|
async def inject_audio(request: Request):
|
|
|
chunk = await request.body()
|
|
|
- if _inject_queue is not None and chunk:
|
|
|
+ if chunk:
|
|
|
try:
|
|
|
_inject_queue.put_nowait(chunk)
|
|
|
- except asyncio.QueueFull:
|
|
|
+ except _stdlib_queue.Full:
|
|
|
pass
|
|
|
return {"ok": True}
|
|
|
|
|
|
@_bridge_app.post("/inject/clear")
|
|
|
async def inject_clear():
|
|
|
- global _inject_queue
|
|
|
- if _inject_queue:
|
|
|
- while not _inject_queue.empty():
|
|
|
- try:
|
|
|
- _inject_queue.get_nowait()
|
|
|
- except asyncio.QueueEmpty:
|
|
|
- break
|
|
|
+ while True:
|
|
|
+ try:
|
|
|
+ _inject_queue.get_nowait()
|
|
|
+ except _stdlib_queue.Empty:
|
|
|
+ break
|
|
|
return {"ok": True}
|
|
|
|
|
|
|
|
|
@@ -99,7 +95,6 @@ def _load_speakers() -> dict[str, str]:
|
|
|
return data
|
|
|
except (json.JSONDecodeError, OSError):
|
|
|
pass
|
|
|
- # First run — seed with defaults and save
|
|
|
_write_speakers(DEFAULT_SPEAKERS)
|
|
|
return dict(DEFAULT_SPEAKERS)
|
|
|
|
|
|
@@ -129,8 +124,6 @@ class BridgeState:
|
|
|
self._display: list[str] = [""] * DISPLAY_LINES
|
|
|
self._last_final_time = time.monotonic()
|
|
|
|
|
|
- # ── Speaker name management ───────────────────────────────────────────────
|
|
|
-
|
|
|
def set_speaker_name(self, speaker_id: str, name: str) -> None:
|
|
|
with self._lock:
|
|
|
self.speaker_names[speaker_id] = name.strip()
|
|
|
@@ -152,10 +145,7 @@ class BridgeState:
|
|
|
return None
|
|
|
return self.speaker_names.get(speaker_id, speaker_id)
|
|
|
|
|
|
- # ── Text ingestion ────────────────────────────────────────────────────────
|
|
|
-
|
|
|
def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
|
|
|
- """Accept a finalised segment; flush on sentence boundary or speaker change."""
|
|
|
with self._lock:
|
|
|
if speaker_id:
|
|
|
self._seen.add(speaker_id)
|
|
|
@@ -180,7 +170,6 @@ class BridgeState:
|
|
|
self._flush(mqtt_client)
|
|
|
|
|
|
def _flush(self, mqtt_client: mqtt.Client) -> None:
|
|
|
- """Word-wrap buffer → rolling display → publish. Must hold lock."""
|
|
|
text = self._text_buffer.strip()
|
|
|
self._text_buffer = ""
|
|
|
if not text:
|
|
|
@@ -217,17 +206,6 @@ def _is_sentence_end(text: str) -> bool:
|
|
|
return bool(re.search(r'[.!?…]\s*$', text.strip()))
|
|
|
|
|
|
|
|
|
-def _extract_speaker(data: dict) -> str | None:
|
|
|
- if "speaker" in data:
|
|
|
- return data["speaker"] or None
|
|
|
- words = data.get("words", [])
|
|
|
- if words:
|
|
|
- ids = [w.get("speaker") for w in words if w.get("speaker")]
|
|
|
- if ids:
|
|
|
- return Counter(ids).most_common(1)[0][0]
|
|
|
- return None
|
|
|
-
|
|
|
-
|
|
|
# ── MQTT ──────────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def build_mqtt_client() -> mqtt.Client:
|
|
|
@@ -247,74 +225,12 @@ def build_mqtt_client() -> mqtt.Client:
|
|
|
return client
|
|
|
|
|
|
|
|
|
-# ── WebSocket + audio pipeline ────────────────────────────────────────────────
|
|
|
-
|
|
|
-async def _sender(ws, queue: asyncio.Queue) -> None:
|
|
|
- # Send config handshake first — WhisperLiveKit needs this before audio
|
|
|
- config = json.dumps({
|
|
|
- "uid": "bridge-client",
|
|
|
- "language": "en",
|
|
|
- "task": "transcribe",
|
|
|
- "model_size": "large-v3",
|
|
|
- "use_vad": True,
|
|
|
- })
|
|
|
- await ws.send(config)
|
|
|
- # Drain any stale chunks
|
|
|
- while not queue.empty():
|
|
|
- queue.get_nowait()
|
|
|
- while True:
|
|
|
- chunk = await queue.get()
|
|
|
- await ws.send(chunk)
|
|
|
-
|
|
|
-
|
|
|
-async def _receiver(ws, state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
- async for message in ws:
|
|
|
- try:
|
|
|
- data = json.loads(message)
|
|
|
- except (json.JSONDecodeError, TypeError):
|
|
|
- continue
|
|
|
-
|
|
|
- text = (data.get("text") or data.get("buffer_transcription") or "").strip()
|
|
|
- is_final = data.get("is_final", False) or data.get("end_of_segment", False)
|
|
|
- speaker = _extract_speaker(data)
|
|
|
-
|
|
|
- if is_final and text:
|
|
|
- print(f"[Whisper] ({speaker or '?'}) {text}")
|
|
|
- state.push_final(text, speaker, mqtt_client)
|
|
|
-
|
|
|
-
|
|
|
-async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
- while True:
|
|
|
- await asyncio.sleep(1.0)
|
|
|
- state.maybe_timeout_flush(mqtt_client)
|
|
|
-
|
|
|
-
|
|
|
-async def _speaker_reloader(state: BridgeState) -> None:
|
|
|
- """Reload speakers.json every 5 s so admin UI changes take effect live."""
|
|
|
- last_mtime = 0.0
|
|
|
- while True:
|
|
|
- await asyncio.sleep(5.0)
|
|
|
- try:
|
|
|
- mtime = SPEAKERS_FILE.stat().st_mtime
|
|
|
- if mtime != last_mtime:
|
|
|
- fresh = _load_speakers()
|
|
|
- with state._lock:
|
|
|
- state.speaker_names = fresh
|
|
|
- last_mtime = mtime
|
|
|
- print("[Bridge] Speaker names reloaded from disk")
|
|
|
- except OSError:
|
|
|
- pass
|
|
|
-
|
|
|
+# ── Audio pipeline ────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _choose_audio_device() -> int | None:
|
|
|
- """
|
|
|
- List all input devices and return the index to use.
|
|
|
- Prefers AUDIO_DEVICE if set, otherwise the system default,
|
|
|
- otherwise the first device with input channels.
|
|
|
- """
|
|
|
try:
|
|
|
- devices = sd.query_devices()
|
|
|
- default_in = sd.default.device[0] # may be -1 if unset
|
|
|
+ devices = sd.query_devices()
|
|
|
+ default_in = sd.default.device[0]
|
|
|
except Exception as exc:
|
|
|
print(f"[Audio] Cannot query devices: {exc}")
|
|
|
return None
|
|
|
@@ -328,31 +244,22 @@ def _choose_audio_device() -> int | None:
|
|
|
input_devices.append((i, dev["name"]))
|
|
|
|
|
|
if not input_devices:
|
|
|
- print("[Audio] ERROR: No input devices found. Connect a microphone and restart.")
|
|
|
+ print("[Audio] ERROR: No input devices found.")
|
|
|
return None
|
|
|
|
|
|
- # Explicit override from config
|
|
|
if AUDIO_DEVICE is not None:
|
|
|
print(f"[Audio] Using configured device [{AUDIO_DEVICE}]")
|
|
|
return AUDIO_DEVICE
|
|
|
|
|
|
- # System default (if valid)
|
|
|
if default_in >= 0:
|
|
|
print(f"[Audio] Using default input device [{default_in}]")
|
|
|
return default_in
|
|
|
|
|
|
- # Fall back to first available input
|
|
|
idx, name = input_devices[0]
|
|
|
- print(f"[Audio] No system default set — using [{idx}] {name}")
|
|
|
- print("[Audio] To choose a different device, set AUDIO_DEVICE in bridge.py")
|
|
|
+ print(f"[Audio] No system default — using [{idx}] {name}")
|
|
|
return idx
|
|
|
|
|
|
|
|
|
-# Remove the WebSocket audio sender entirely.
|
|
|
-# Use sounddevice → AudioProcessor directly via the Python API.
|
|
|
-
|
|
|
-from whisperlivekit import AudioProcessor, TranscriptionEngine
|
|
|
-
|
|
|
async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, engine: TranscriptionEngine) -> None:
|
|
|
audio_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=120)
|
|
|
loop = asyncio.get_running_loop()
|
|
|
@@ -360,7 +267,7 @@ async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, eng
|
|
|
def audio_callback(indata: np.ndarray, frames: int, time_info, status) -> None:
|
|
|
if status:
|
|
|
print(f"[Audio] {status}")
|
|
|
- chunk = indata.tobytes() # raw s16le
|
|
|
+ chunk = indata.tobytes()
|
|
|
loop.call_soon_threadsafe(
|
|
|
lambda: audio_queue.put_nowait(chunk) if not audio_queue.full() else None
|
|
|
)
|
|
|
@@ -370,12 +277,11 @@ async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, eng
|
|
|
print("[Audio] No input device — cannot start.")
|
|
|
return
|
|
|
|
|
|
- audio_processor = AudioProcessor(transcription_engine=engine)
|
|
|
- results_generator = await audio_processor.create_tasks()
|
|
|
+ audio_processor = AudioProcessor(transcription_engine=engine)
|
|
|
+ results_generator = await audio_processor.create_tasks()
|
|
|
|
|
|
async def _receive_results():
|
|
|
async for response in results_generator:
|
|
|
- # response is a FrontData dataclass, not a dict
|
|
|
text = (getattr(response, "text", None) or getattr(response, "buffer_transcription", None) or "").strip()
|
|
|
is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
|
|
|
speaker = getattr(response, "speaker", None)
|
|
|
@@ -384,20 +290,15 @@ async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, eng
|
|
|
state.push_final(text, speaker, mqtt_client)
|
|
|
|
|
|
async def _send_audio():
|
|
|
- global _inject_queue
|
|
|
- _inject_queue = asyncio.Queue(maxsize=240)
|
|
|
- global test_audio_queue
|
|
|
- test_audio_queue = asyncio.Queue(maxsize=240)
|
|
|
with sd.InputStream(
|
|
|
device=device, samplerate=SAMPLE_RATE, channels=CHANNELS,
|
|
|
- dtype="int16", # s16le — matches pcm_input mode
|
|
|
- blocksize=BLOCKSIZE, callback=audio_callback,
|
|
|
+ dtype="int16", blocksize=BLOCKSIZE, callback=audio_callback,
|
|
|
):
|
|
|
while True:
|
|
|
- # Drain test audio injection first if available
|
|
|
+ # Injected test audio takes priority over live microphone
|
|
|
try:
|
|
|
- chunk = test_audio_queue.get_nowait()
|
|
|
- except asyncio.QueueEmpty:
|
|
|
+ chunk = _inject_queue.get_nowait()
|
|
|
+ except _stdlib_queue.Empty:
|
|
|
chunk = await audio_queue.get()
|
|
|
await audio_processor.process_audio(chunk)
|
|
|
|
|
|
@@ -410,35 +311,57 @@ async def audio_processor_loop(state: BridgeState, mqtt_client: mqtt.Client, eng
|
|
|
reloader.cancel()
|
|
|
|
|
|
|
|
|
-def run_async_loop(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
- asyncio.run(audio_ws_loop(state, mqtt_client))
|
|
|
+async def _flusher(state: BridgeState, mqtt_client: mqtt.Client) -> None:
|
|
|
+ while True:
|
|
|
+ await asyncio.sleep(1.0)
|
|
|
+ state.maybe_timeout_flush(mqtt_client)
|
|
|
+
|
|
|
+
|
|
|
+async def _speaker_reloader(state: BridgeState) -> None:
|
|
|
+ last_mtime = 0.0
|
|
|
+ while True:
|
|
|
+ await asyncio.sleep(5.0)
|
|
|
+ try:
|
|
|
+ mtime = SPEAKERS_FILE.stat().st_mtime
|
|
|
+ if mtime != last_mtime:
|
|
|
+ fresh = _load_speakers()
|
|
|
+ with state._lock:
|
|
|
+ state.speaker_names = fresh
|
|
|
+ last_mtime = mtime
|
|
|
+ print("[Bridge] Speaker names reloaded from disk")
|
|
|
+ except OSError:
|
|
|
+ pass
|
|
|
|
|
|
|
|
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
|
|
|
|
def main() -> None:
|
|
|
- from whisperlivekit import TranscriptionEngine
|
|
|
state = BridgeState()
|
|
|
mqtt_client = build_mqtt_client()
|
|
|
engine = TranscriptionEngine(model_size="large-v3", lan="en", diarization=False, pcm_input=True)
|
|
|
|
|
|
+ # Inject API must start before the audio loop so test playback works immediately
|
|
|
+ def _run_inject_api():
|
|
|
+ uvicorn.run(_bridge_app, host="127.0.0.1", port=8002, log_level="warning")
|
|
|
+
|
|
|
+ inject_thread = threading.Thread(target=_run_inject_api, daemon=True)
|
|
|
+ inject_thread.start()
|
|
|
+ print("[Bridge] Test audio inject API at http://127.0.0.1:8002")
|
|
|
+
|
|
|
def _run():
|
|
|
asyncio.run(audio_processor_loop(state, mqtt_client, engine))
|
|
|
|
|
|
ws_thread = threading.Thread(target=_run, daemon=True)
|
|
|
ws_thread.start()
|
|
|
- print("[Bridge] Audio pipeline running")
|
|
|
+ print(f"[Bridge] Speaker names loaded from {SPEAKERS_FILE}")
|
|
|
+ print("[Bridge] Audio pipeline running — speaker admin at http://localhost:8001")
|
|
|
+ print("[Bridge] Close this window to quit")
|
|
|
+
|
|
|
try:
|
|
|
ws_thread.join()
|
|
|
except KeyboardInterrupt:
|
|
|
pass
|
|
|
|
|
|
- def _run_inject_api():
|
|
|
- uvicorn.run(_bridge_app, host="127.0.0.1", port=8002, log_level="warning")
|
|
|
-
|
|
|
- inject_thread = threading.Thread(target=_run_inject_api, daemon=True)
|
|
|
- inject_thread.start()
|
|
|
-
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|