The README shows voice enrolment v2 is the next planned feature, and per-speaker voice sample upload already exists in admin.py. That's the right foundation to build on.
To answer your core question first: No, don't store the embedding in speakers.json. A speaker embedding is a float32 numpy array of 512 dimensions — about 2KB of binary data. JSON can technically hold it as a list of floats but it becomes unwieldy and slow to load. The clean approach is:
bridge/
├── speakers.json          ← names, metadata (text only, fast to load)
└── embeddings/
    ├── SPEAKER_00.npy     ← numpy binary, one file per speaker
    ├── SPEAKER_01.npy
    └── SPEAKER_02.npy
When you later move to a remote DB, the .npy files become BLOB columns, or vector columns in something like pgvector. The speakers.json structure stays the same; you just add a "has_embedding": true flag.
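A minimal sketch of that layout, assuming NumPy's standard .npy format. The `embeddings_dir` parameter and the exact helper signatures are illustrative, not taken from the repository:

```python
from pathlib import Path
from typing import Optional

import numpy as np


def save_embedding(embeddings_dir: Path, speaker_id: str, embedding: np.ndarray) -> Path:
    """Write one speaker's vector to <embeddings_dir>/<speaker_id>.npy."""
    embeddings_dir.mkdir(parents=True, exist_ok=True)
    path = embeddings_dir / f"{speaker_id}.npy"
    # Store as float32 so a 512-dimensional vector is 2048 bytes plus a small header
    np.save(path, embedding.astype(np.float32))
    return path


def load_embedding(embeddings_dir: Path, speaker_id: str) -> Optional[np.ndarray]:
    """Return the stored vector, or None if the speaker has no .npy file."""
    path = embeddings_dir / f"{speaker_id}.npy"
    if not path.exists():
        return None
    return np.load(path)
```

Keeping the directory as an explicit argument makes the helpers easy to point at a temporary folder in tests.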
This is the right approach — far better than asking someone to "say a sentence". Here's the full picture:
The tool: pyannote.audio's SpeakerEmbedding pipeline — already in your venv as a dependency of diart. It takes an audio segment and returns a 512-dimensional embedding vector that acts as a voice fingerprint.
The process for extracting from an existing recording:
- Feed the chosen segment through the SpeakerEmbedding pipeline
- Save the resulting .npy file to embeddings/SPEAKER_XX.npy
At runtime during a service:
When diart assigns a SPEAKER_XX label, bridge.py checks whether any stored embeddings exist for speakers that have not yet been matched. If the current speaker's live embedding (accumulated over ~5 seconds of speech) has a cosine similarity above the threshold (~0.82) with a stored embedding, the bridge auto-assigns the name without operator input.
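The matching step can be sketched as follows. This is an illustration only: it takes the enrolled vectors as an explicit dict (the real module may load them from disk instead), and the 0.82 default is the approximate threshold mentioned above, which would need tuning on real recordings:

```python
from typing import Dict, Optional, Tuple

import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    denom = float(np.linalg.norm(a) * np.linalg.norm(b))
    if denom == 0.0:
        return 0.0
    return float(np.dot(a, b) / denom)


def find_best_match(
    live_embedding: np.ndarray,
    enrolled: Dict[str, np.ndarray],  # hypothetical: speaker_id -> stored vector
    threshold: float = 0.82,
) -> Optional[Tuple[str, float]]:
    """Return (speaker_id, score) for the closest enrolled voice, or None."""
    best_id, best_score = None, -1.0
    for speaker_id, stored in enrolled.items():
        score = cosine_similarity(live_embedding, stored)
        if score > best_score:
            best_id, best_score = speaker_id, score
    if best_id is None or best_score < threshold:
        return None
    return best_id, best_score
```

Returning the score alongside the ID lets the caller log how close each automatic assignment was.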
speakers.json Schema Change
Extend it from the current simple {id: name} to:
json
{
  "SPEAKER_00": {
    "name": "Pastor John",
    "has_embedding": true,
    "embedding_updated": "2026-05-04T09:32:00",
    "colour": "#2563eb",
    "notes": "Senior pastor"
  },
  "SPEAKER_01": {
    "name": "Mary (Reader)",
    "has_embedding": false,
    "embedding_updated": null,
    "colour": "#16a34a",
    "notes": ""
  }
}
The bridge reads has_embedding to decide whether to attempt auto-matching. The actual vector lives in embeddings/SPEAKER_XX.npy.
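Older files in the flat {id: name} form can be upgraded when they are loaded. The sketch below shows one way to do that. The function names and the empty-string defaults for colour and notes are assumptions for illustration, not code from the repository:

```python
from typing import Any, Dict, Union

# Defaults for fields that a flat or partial entry does not carry (assumed values)
DEFAULT_ENTRY: Dict[str, Any] = {
    "name": "",
    "has_embedding": False,
    "embedding_updated": None,
    "colour": "",
    "notes": "",
}


def normalise_speaker_entry(entry: Union[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Accept either "Pastor John" or a full dict and return the full dict form."""
    if isinstance(entry, str):
        return {**DEFAULT_ENTRY, "name": entry}
    # Existing keys win; missing keys are filled from the defaults
    return {**DEFAULT_ENTRY, **entry}


def normalise_speakers(raw: Dict[str, Any]) -> Dict[str, Dict[str, Any]]:
    """Normalise every entry of a loaded speakers.json mapping."""
    return {sid: normalise_speaker_entry(entry) for sid, entry in raw.items()}
```

Because the defaults are merged rather than required, a file that mixes old and new entries still loads.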
This is a 3-part addition:
Part 1 — embeddings.py (new file, ~80 lines) A small module that handles extraction, storage and matching:
python
import numpy as np

# Core interface (signatures only; bodies omitted)
def extract_embedding(audio_path, start_sec, end_sec) -> np.ndarray: ...
def save_embedding(speaker_id, embedding) -> None: ...
def load_embedding(speaker_id) -> np.ndarray | None: ...
def find_best_match(live_embedding, threshold=0.82) -> tuple[str, float] | None: ...
Part 2 — Admin UI additions. On each speaker row, add an "Extract from recording" button that lets the operator:
… .npy
Part 3 — Bridge auto-matching. In push_final(), when a new unknown speaker appears, compare their accumulated audio embedding against all stored profiles before prompting the operator. If a match is found above the threshold, auto-assign the name silently.
Want me to write embeddings.py first, then the admin UI changes, then the bridge matching logic? The embeddings.py module is the cleanest starting point — it's self-contained and we can test it standalone before wiring it into the UI.
ok, let's get the embeddings.py under way
3:30 PM
Let me check the current bridge structure first before writing it.
The repo is requiring login for raw file access now. No matter — I have enough context from the session and the README. Let me write the module.
Here's what's in it and why each piece is designed the way it is.
Storage — .npy files in bridge/embeddings/, one per speaker. speakers.json gains has_embedding, embedding_updated, colour, and notes fields but the schema is backward compatible — existing flat {"SPEAKER_00": "Pastor John"} entries are migrated transparently via _normalise_speaker_entry().
Extraction — extract() uses miniaudio (already in your venv) to pull a segment by start/end seconds, writes a temp WAV, feeds it to pyannote's SpeakerEmbedding pipeline, then deletes the temp file. The pipeline is lazy-loaded on first call so import is fast.
LiveEmbeddingAccumulator — this is the piece that plugs into bridge.py. When diart announces a new SPEAKER_XX, bridge creates an accumulator, feeds it PCM chunks via push(), and once ready() returns True (5 seconds by default) calls extract_embedding() + registry.find_match(). If it matches an enrolled speaker, the name is auto-assigned without operator prompt.
CLI — the file is executable standalone for testing without starting the full stack:
bash
# Enrol from seconds 45–55 of last Sunday's recording
python bridge\embeddings.py enrol SPEAKER_00 test_recordings\service.mp3 --start 45 --end 55
# Check what it matched against all enrolled speakers
python bridge\embeddings.py scores test_recordings\service.mp3 --start 45 --end 55
# List who's enrolled
python bridge\embeddings.py list
Next steps are the admin UI additions (a segment picker per speaker row) and wiring LiveEmbeddingAccumulator into bridge.py's push_final() path.
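For orientation, the buffering half of such an accumulator could look roughly like this. It is a simplified stand-in, not the LiveEmbeddingAccumulator from embeddings.py: the class name `PcmAccumulator`, the 16 kHz sample rate and the 16-bit mono format are assumptions, and embedding extraction is left out.

```python
class PcmAccumulator:
    """Collects raw 16-bit mono PCM chunks until enough audio is buffered."""

    BYTES_PER_SAMPLE = 2  # int16

    def __init__(self, min_seconds: float = 5.0, sample_rate: int = 16000) -> None:
        self._chunks = []
        self._num_bytes = 0
        self._sample_rate = sample_rate
        self._target_bytes = int(min_seconds * sample_rate * self.BYTES_PER_SAMPLE)

    def push(self, chunk: bytes) -> None:
        """Append one PCM chunk to the buffer."""
        self._chunks.append(chunk)
        self._num_bytes += len(chunk)

    def seconds(self) -> float:
        """Duration of buffered audio in seconds."""
        return self._num_bytes / (self._sample_rate * self.BYTES_PER_SAMPLE)

    def ready(self) -> bool:
        """True once at least min_seconds of audio has been pushed."""
        return self._num_bytes >= self._target_bytes

    def pcm(self) -> bytes:
        """All buffered audio as one bytes object."""
        return b"".join(self._chunks)

    def reset(self) -> None:
        """Drop the buffered audio so accumulation can start again."""
        self._chunks.clear()
        self._num_bytes = 0
```

Counting bytes instead of wall-clock time keeps the readiness check independent of how quickly chunks arrive.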
The embedding captures acoustic characteristics of a voice — things like fundamental frequency, formant patterns, speaking rhythm, vocal tract shape. It has nothing to do with the words spoken. So a person saying "the quick brown fox" and saying "let us pray" would produce very similar embeddings, because it's the voice not the words being fingerprinted.
So transcription accuracy doesn't directly improve the embedding quality.
The factors that matter are:
| Factor | Impact |
|---|---|
| Audio cleanliness | Highest: noise, reverb, and music bleed all degrade it |
| Segment length | 10–20s is the sweet spot; diminishing returns after 30s |
| Single speaker | Must be isolated, with no overlapping voices in the segment |
| Consistent mic distance | Close-mic (pulpit mic) is better than room mic |
| Multiple samples averaged | Enrolling 2–3 separate clips and averaging the embeddings is more robust than one clip |
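Averaging several clips, the last row of the table, can be sketched like this. Normalising each vector to unit length before averaging is one common choice rather than something the source specifies, and `average_embeddings` is a hypothetical helper name:

```python
from typing import Sequence

import numpy as np


def average_embeddings(embeddings: Sequence[np.ndarray]) -> np.ndarray:
    """Combine several enrolment clips into one unit-length voiceprint."""
    if not embeddings:
        raise ValueError("need at least one embedding")
    unit_vectors = []
    for emb in embeddings:
        norm = float(np.linalg.norm(emb))
        if norm == 0.0:
            raise ValueError("cannot normalise a zero-length embedding")
        # Scale each clip to unit length so a louder clip does not dominate
        unit_vectors.append(emb / norm)
    mean = np.mean(unit_vectors, axis=0)
    mean_norm = float(np.linalg.norm(mean))
    if mean_norm == 0.0:
        raise ValueError("embeddings cancel out; cannot average")
    return (mean / mean_norm).astype(np.float32)
```

Since cosine similarity ignores vector length, the final renormalisation is mainly a convenience for storage and inspection.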
Here's the indirect benefit you're probably intuiting:
If you know what was said, you can verify the segment is clean. For example:
- [SPEAKER_00] "Let us open in prayer...": you know that's isolated speech, no congregation response, no overlap. That's a good enrolment segment.
- [SPEAKER_00] "[inaudible] ...mmm... [SPEAKER_01]": overlapping speakers, bad segment, skip it.
So the workflow becomes:
1. Run a test recording through the pipeline
2. Read the transcript output to find clean, isolated segments per speaker
3. Note the timestamps of those clean segments
4. Use embeddings.py enrol with those specific timestamps
This is actually a much better enrolment strategy than asking someone to record a sample — because the transcript lets you cherry-pick the cleanest real-world speech from an actual service.
We could add an enrol_from_transcript() helper that takes the transcript log (which bridge.py already produces) and automatically picks the best segment per speaker, meaning the longest clean run with no speaker change within it:
python
def enrol_from_transcript(
    self,  # intended as a method on the registry class, since it calls self.extract_and_save()
    speaker_id: str,
    audio_path: Path,
    transcript_segments: list[dict],  # [{speaker, start, end, text}, ...]
    min_duration: float = 8.0,
) -> np.ndarray:
    """
    Find the longest single-speaker segment of at least min_duration
    seconds in a transcript and use it for enrolment.
    """
    # Keep only this speaker's segments that are long enough
    candidates = [
        s for s in transcript_segments
        if s["speaker"] == speaker_id
        and (s["end"] - s["start"]) >= min_duration
    ]
    if not candidates:
        raise ValueError(f"No clean segment >= {min_duration}s found for {speaker_id}")
    # Pick longest segment
    best = max(candidates, key=lambda s: s["end"] - s["start"])
    return self.extract_and_save(speaker_id, audio_path, best["start"], best["end"])
This would mean after every test recording run, bridge.py logs timestamped segments, and the admin UI could show a "Use best segment for enrolment" button per speaker — fully automatic, no timestamp hunting needed.
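To stay clear of speaker changes, the selection step could also trim a small margin from each end of a segment before checking its length. This is a sketch of that idea only: `pick_enrolment_window` and the 0.5 s `margin` default are hypothetical, and the segment dicts are assumed to use the speaker/start/end/text keys shown above.

```python
from typing import Dict, List, Optional, Tuple


def pick_enrolment_window(
    segments: List[Dict],
    speaker_id: str,
    min_duration: float = 8.0,
    margin: float = 0.5,  # seconds trimmed from each end (assumed value)
) -> Optional[Tuple[float, float]]:
    """Return (start, end) of the longest usable window for a speaker, or None."""
    best: Optional[Tuple[float, float]] = None
    for seg in segments:
        if seg["speaker"] != speaker_id:
            continue
        # Trim both edges, where diarization boundaries are least reliable
        start = seg["start"] + margin
        end = seg["end"] - margin
        if end - start < min_duration:
            continue
        if best is None or (end - start) > (best[1] - best[0]):
            best = (start, end)
    return best
```

Returning None instead of raising lets the admin UI disable the enrolment button when no suitable segment exists.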
Want me to add that to embeddings.py and also add the transcript segment logging to bridge.py so the timestamps are captured?
ok, let's update them both. Let me know where I need to put the block in bridge.py and embeddings.py
3:47 PM
Let me fetch the current bridge.py first so I place the changes exactly right.
Failed to fetch: https://git.tazz.com.au/benjamin.harris/Deaf_Transcription_Service/src/main/bridge/bridge.py
The repo is still requiring login for file access. Can you paste the current bridge.py content here? Specifically I need to see:
- push_final() method in BridgeState
- _receive_results() async function
- main() function
Those are the three places I need to insert code. While you grab those, here's the complete updated embeddings.py with the transcript-based enrolment added:
Good. Now for the bridge.py changes: here are the four insertions you need to make. Since I can't see the current file, I'll give you find/replace targets that should be unambiguous:
Find the block of imports at the top (near import json, import re etc). Add after the last import line:
python
from embeddings import (
    EmbeddingRegistry,
    LiveEmbeddingAccumulator,
    log_transcript_segment,
)
BridgeState.__init__
Find:
python
self._last_final_time = time.monotonic()
Add immediately after it:
python
# Voiceprint matching
self._embedding_registry = EmbeddingRegistry()
# Per-speaker accumulators: speaker_id -> LiveEmbeddingAccumulator
self._accumulators: dict[str, LiveEmbeddingAccumulator] = {}
# Speaker IDs confirmed this session (skip re-matching)
self._confirmed_ids: set[str] = set()
# Session ID for transcript logging (date string)
self._session_id: str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
You'll also need to add from datetime import datetime, timezone to the imports if it's not already there.
push_final()
Find the line inside push_final() that reads something like:
python
self._last_final_time = time.monotonic()
Just after that line, add:
python
# ── Transcript segment logging ──────────────────────────────────────────
# Log every finalised segment for later voiceprint enrolment.
# NOTE: `response` is not a parameter of push_final(), so the timestamps
# have to be passed in by the caller (the revised Change 3 and Change 4
# later in this conversation do that). The fallback estimate also runs
# right after _last_final_time is reset, so it yields values near 0.0.
seg_start = getattr(response, "start", None)
seg_end = getattr(response, "end", None)
if seg_start is None:
    seg_end = time.monotonic() - self._last_final_time
    seg_start = max(0.0, seg_end - len(text.split()) * 0.4)
log_transcript_segment(
    speaker_id=speaker_id or "UNKNOWN",
    text=text,
    start_sec=seg_start or 0.0,
    end_sec=seg_end or 0.0,
    session_id=self._session_id,
)
# ── Live embedding accumulation for auto speaker matching ───────────────
if speaker_id and speaker_id not in self._confirmed_ids:
    if speaker_id not in self._accumulators:
        self._accumulators[speaker_id] = \
            self._embedding_registry.make_accumulator(min_seconds=5.0)
    # We don't have raw PCM here; accumulation happens in _send_audio.
    # This is a placeholder; see Change 4 for the PCM-level accumulation.
_send_audio
This is the most important change: accumulating actual audio per speaker. Inside audio_processor_loop, find the _send_audio coroutine, then the line that calls:
python
await _audio_processor.process_audio(chunk)
Replace it with:
python
await _audio_processor.process_audio(chunk)
# Feed chunk to the accumulator for the current speaker
# (_raw_speaker_id is set in push_final() as segments arrive)
current_spk = state.get_current_speaker_id()
if current_spk and current_spk not in state._confirmed_ids:
    if current_spk not in state._accumulators:
        state._accumulators[current_spk] = \
            state._embedding_registry.make_accumulator(min_seconds=5.0)
    state._accumulators[current_spk].push(chunk)
    if state._accumulators[current_spk].ready():
        try:
            live_emb = state._accumulators[current_spk].extract_embedding()
            match = state._embedding_registry.find_match(live_emb)
            if match:
                matched_id, score = match
                resolved_name = state.speaker_names.get(matched_id, matched_id)
                print(
                    f"[Embeddings] Auto-matched {current_spk} → "
                    f"{resolved_name} (score={score:.3f})"
                )
                state.set_speaker_name(current_spk, resolved_name)
                state._confirmed_ids.add(current_spk)
            else:
                # No match: reset accumulator to try again with more audio
                state._accumulators[current_spk].reset()
        except Exception as exc:
            logger.warning(f"[Embeddings] Accumulator error: {exc}")
And add this method to BridgeState:
python
def get_current_speaker_id(self) -> str | None:
    """Return the raw diarization ID (SPEAKER_XX) of the current speaker."""
    with self._lock:
        return self._raw_speaker_id  # see below
And add self._raw_speaker_id: str | None = None to __init__, then in push_final() add:
python
self._raw_speaker_id = speaker_id # track raw ID for accumulator
at the top of the with self._lock: block.
That's all four changes. The key file to copy into bridge/ is embeddings.py.
Here are the five precise changes:
Change 1 — Imports (top of file)
Find:
python
from collections import Counter
from pathlib import Path
Replace with:
python
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
Then find:
python
from whisperlivekit import AudioProcessor, TranscriptionEngine
Replace with:
python
from whisperlivekit import AudioProcessor, TranscriptionEngine
from embeddings import (
    EmbeddingRegistry,
    log_transcript_segment,
)
Change 2 — BridgeState.__init__
Find:
python
self._last_final_time = time.monotonic()
Replace with:
python
self._last_final_time = time.monotonic()
# Raw diarization ID of current speaker (SPEAKER_XX)
self._raw_speaker_id: str | None = None
# Voiceprint matching
self._embedding_registry = EmbeddingRegistry()
self._accumulators: dict[str, object] = {}
self._confirmed_ids: set[str] = set()
self._session_id: str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
Change 3 — push_final() method
Find:
python
def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
    """Accept a finalised segment; flush on sentence boundary or speaker change."""
    with self._lock:
        if speaker_id:
            self._seen.add(speaker_id)
        resolved = self._resolve(speaker_id)
Replace with:
python
def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client,
               seg_start: float = 0.0, seg_end: float = 0.0) -> None:
    """Accept a finalised segment; flush on sentence boundary or speaker change."""
    with self._lock:
        # Track raw diarization ID for PCM accumulator
        self._raw_speaker_id = speaker_id
        if speaker_id:
            self._seen.add(speaker_id)
        # Log segment to transcript_segments.jsonl for later enrolment
        log_transcript_segment(
            speaker_id=speaker_id or "UNKNOWN",
            text=text,
            start_sec=seg_start,
            end_sec=seg_end,
            session_id=self._session_id,
        )
        resolved = self._resolve(speaker_id)
Change 4 — _receive_results() — pass timestamps to push_final
Find:
python
async def _receive_results():
    async for response in results_generator:
        # response is a FrontData dataclass, not a dict
        text = (getattr(response, "text", None) or getattr(response, "buffer_transcription", None) or "").strip()
        is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
        speaker = getattr(response, "speaker", None)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)
Replace with:
python
async def _receive_results():
    async for response in results_generator:
        text = (getattr(response, "text", None) or
                getattr(response, "buffer_transcription", None) or "").strip()
        is_final = getattr(response, "is_final", False) or \
                   getattr(response, "end_of_segment", False)
        speaker = getattr(response, "speaker", None)
        # Grab timestamps if FrontData provides them
        seg_start = float(getattr(response, "start", 0.0) or 0.0)
        seg_end = float(getattr(response, "end", 0.0) or 0.0)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client, seg_start, seg_end)
Change 5 — _send_audio() — PCM accumulation for live matching
Find:
python
while True:
    # Drain test audio injection first if available
    try:
        chunk = test_audio_queue.get_nowait()
    except asyncio.QueueEmpty:
        chunk = await audio_queue.get()
    await audio_processor.process_audio(chunk)
Replace with:
python
while True:
    # Drain test audio injection first if available
    try:
        chunk = test_audio_queue.get_nowait()
    except asyncio.QueueEmpty:
        chunk = await audio_queue.get()
    await audio_processor.process_audio(chunk)
    # Accumulate PCM for live speaker matching
    current_spk = state._raw_speaker_id
    if current_spk and current_spk not in state._confirmed_ids:
        if current_spk not in state._accumulators:
            state._accumulators[current_spk] = \
                state._embedding_registry.make_accumulator(min_seconds=5.0)
        state._accumulators[current_spk].push(chunk)
        if state._accumulators[current_spk].ready():
            try:
                live_emb = state._accumulators[current_spk].extract_embedding()
                match = state._embedding_registry.find_match(live_emb)
                if match:
                    matched_id, score = match
                    resolved = state.speaker_names.get(matched_id, matched_id)
                    print(
                        f"[Embeddings] Auto-matched {current_spk} → "
                        f"{resolved} (score={score:.3f})"
                    )
                    with state._lock:
                        state.speaker_names[current_spk] = resolved
                        state._confirmed_ids.add(current_spk)
                        _write_speakers(state.speaker_names)
                else:
                    # No match yet: reset and try again with more audio
                    state._accumulators[current_spk].reset()
            except Exception as exc:
                print(f"[Embeddings] Accumulator error: {exc}")
                state._accumulators[current_spk].reset()
That's all five changes. One caveat: audio_callback currently produces float32 samples, but the accumulator expects int16. Change 5 pushes each chunk as-is, while LiveEmbeddingAccumulator.push() and extract_embedding() write a WAV file assuming int16. Convert the chunk before pushing by replacing the state._accumulators[current_spk].push(chunk) line with:
python
# Convert float32 → int16 for the embedding accumulator
# (clip to [-1, 1] first so out-of-range samples don't wrap around)
samples = np.clip(np.frombuffer(chunk, dtype=np.float32), -1.0, 1.0)
chunk_i16 = (samples * 32767).astype(np.int16).tobytes()
state._accumulators[current_spk].push(chunk_i16)
Once those are in, drop embeddings.py into the bridge/ folder alongside bridge.py and it will be importable. The transcript log will start filling up from the first run, ready for auto-enrol once you have a recording to pull segments from.
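For reference, a transcript log of this kind can be as simple as one JSON object per line. The sketch below is a guess at the shape, not the contents of embeddings.py: the `log_path` parameter, the record keys and the `load_transcript_segments` reader are assumptions, chosen so the records line up with the {speaker, start, end, text} segments used for enrolment.

```python
import json
from pathlib import Path
from typing import Dict, List


def log_transcript_segment(
    speaker_id: str,
    text: str,
    start_sec: float,
    end_sec: float,
    session_id: str,
    log_path: Path = Path("transcript_segments.jsonl"),  # assumed location
) -> None:
    """Append one finalised segment to the log as a single JSON line."""
    record = {
        "session": session_id,
        "speaker": speaker_id,
        "start": round(start_sec, 3),
        "end": round(end_sec, 3),
        "text": text,
    }
    with log_path.open("a", encoding="utf-8") as fh:
        fh.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_transcript_segments(log_path: Path, session_id: str) -> List[Dict]:
    """Read back all segments logged for one session, in file order."""
    segments: List[Dict] = []
    with log_path.open(encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            record = json.loads(line)
            if record["session"] == session_id:
                segments.append(record)
    return segments
```

Appending one line per segment means an interrupted service leaves at most one incomplete trailing line in the log.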
What needs to be added: