
The README shows voice enrolment v2 is the next planned feature, and per-speaker voice sample upload already exists in admin.py. That's the right foundation to build on.


Voiceprint / Speaker Embedding Design

To answer your core question first: No, don't store the embedding in speakers.json. A speaker embedding is a float32 numpy array of 512 dimensions — about 2KB of binary data. JSON can technically hold it as a list of floats but it becomes unwieldy and slow to load. The clean approach is:

bridge/
├── speakers.json          ← names, metadata (text only, fast to load)
└── embeddings/
    ├── SPEAKER_00.npy     ← numpy binary, one file per speaker
    ├── SPEAKER_01.npy
    └── SPEAKER_02.npy

When you later move to a remote DB, the .npy files become BLOB columns or vector columns in something like pgvector. The speakers.json structure stays identical; you just add a "has_embedding": true flag.
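As a rough check of the size argument above, here is a minimal sketch that builds a stand-in 512-value float32 vector with the standard library (random values, not a real voiceprint) and compares its raw binary size with the same values serialised as a JSON list. The variable names are illustrative only.

```python
import json
import random
from array import array

random.seed(0)  # deterministic stand-in data, not a real embedding

# 512 single-precision floats, the same shape as the embedding described above
vector = array("f", (random.uniform(-1.0, 1.0) for _ in range(512)))

binary_size = len(vector.tobytes())        # raw float32 payload in bytes
json_size = len(json.dumps(list(vector)))  # the same values as JSON text

print(f"binary: {binary_size} bytes, JSON: {json_size} bytes")
```

A .npy file adds a small header on top of the raw payload, so the file on disk is slightly larger than the payload alone.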


Extracting a Voiceprint From a 10-Second Snip

This is the right approach — far better than asking someone to "say a sentence". Here's the full picture:

The tool: pyannote.audio's SpeakerEmbedding pipeline — already in your venv as a dependency of diart. It takes an audio segment and returns a 512-dimensional embedding vector that acts as a voice fingerprint.

The process for extracting from an existing recording:

  1. Operator opens admin.py, navigates to a speaker
  2. Selects a timestamp range from a test recording (e.g. "00:45 → 00:55")
  3. Admin server extracts that 10s segment using miniaudio or ffmpeg
  4. Passes it to SpeakerEmbedding pipeline
  5. Saves the resulting .npy file to embeddings/SPEAKER_XX.npy
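Step 3 above can be sketched as follows. This is not the miniaudio or ffmpeg path the plan refers to; it swaps in the standard-library wave module so the example runs without extra dependencies, and it only handles uncompressed WAV input. The function name extract_segment and the synthetic one-minute recording are assumptions made for the illustration.

```python
import tempfile
import wave
from pathlib import Path


def extract_segment(src_path: str, dst_path: str, start_sec: float, end_sec: float) -> int:
    """Copy the frames between start_sec and end_sec into a new WAV file.

    Returns the number of frames written.
    """
    if start_sec < 0 or end_sec <= start_sec:
        raise ValueError("need 0 <= start_sec < end_sec")
    with wave.open(src_path, "rb") as src:
        rate = src.getframerate()
        channels = src.getnchannels()
        width = src.getsampwidth()
        # Clamp the start so a range past the end yields an empty clip
        first = min(int(start_sec * rate), src.getnframes())
        src.setpos(first)
        frames = src.readframes(int((end_sec - start_sec) * rate))
    with wave.open(dst_path, "wb") as dst:
        dst.setnchannels(channels)
        dst.setsampwidth(width)
        dst.setframerate(rate)
        dst.writeframes(frames)
    return len(frames) // (channels * width)


with tempfile.TemporaryDirectory() as tmp:
    source = str(Path(tmp) / "service.wav")
    clip = str(Path(tmp) / "clip.wav")
    # Synthetic stand-in recording: 60 s of silence, 16 kHz, mono, 16-bit
    with wave.open(source, "wb") as wav:
        wav.setnchannels(1)
        wav.setsampwidth(2)
        wav.setframerate(16000)
        wav.writeframes(b"\x00\x00" * 16000 * 60)
    frames_written = extract_segment(source, clip, 45.0, 55.0)
    with wave.open(clip, "rb") as wav:
        clip_seconds = wav.getnframes() / wav.getframerate()
```

The resulting clip is what would then be handed to the embedding pipeline in step 4.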

At runtime during a service:

When diart assigns a SPEAKER_XX label that has no name yet, bridge.py checks whether any enrolled embeddings exist. It builds an embedding for the current speaker from roughly 5 seconds of live speech, and if its cosine similarity to a stored embedding exceeds the threshold (about 0.82), the bridge assigns that name automatically, without operator input.
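The matching rule described above can be sketched in plain Python. This is only an illustration of cosine-similarity thresholding: the function names, the dictionary of enrolled vectors, and the tiny 3-dimensional vectors are assumptions, and the real module would work on 512-dimensional arrays.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # treat an all-zero vector as matching nothing
    return dot / (norm_a * norm_b)


def best_match(live, enrolled, threshold=0.82):
    """Return (speaker_id, score) for the closest enrolled vector, or None."""
    best_id, best_score = None, -1.0
    for speaker_id, stored in enrolled.items():
        score = cosine_similarity(live, stored)
        if score > best_score:
            best_id, best_score = speaker_id, score
    if best_id is not None and best_score >= threshold:
        return best_id, best_score
    return None


# Toy 3-dimensional "embeddings" purely for demonstration
enrolled = {
    "SPEAKER_00": [1.0, 0.0, 0.0],
    "SPEAKER_01": [0.0, 1.0, 0.0],
}
close = best_match([0.9, 0.1, 0.0], enrolled)  # points almost along SPEAKER_00
far = best_match([0.5, 0.5, 0.7], enrolled)    # not close to either profile
```

Returning None below the threshold is what lets the bridge fall back to prompting the operator instead of guessing.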


The speakers.json Schema Change

Extend it from the current simple {id: name} to:

json

{
  "SPEAKER_00": {
    "name": "Pastor John",
    "has_embedding": true,
    "embedding_updated": "2026-05-04T09:32:00",
    "colour": "#2563eb",
    "notes": "Senior pastor"
  },
  "SPEAKER_01": {
    "name": "Mary (Reader)",
    "has_embedding": false,
    "embedding_updated": null,
    "colour": "#16a34a",
    "notes": ""
  }
}

The bridge reads has_embedding to decide whether to attempt auto-matching. The actual vector lives in embeddings/SPEAKER_XX.npy.
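One way to move from the simple {id: name} form to the extended schema without breaking existing files is to upgrade entries as they are read. The sketch below assumes default values for the new fields (empty colour and notes) and uses hypothetical helper names; it is not taken from the project's code.

```python
# Assumed defaults for fields a legacy entry does not have
DEFAULT_ENTRY = {
    "name": "",
    "has_embedding": False,
    "embedding_updated": None,
    "colour": "",
    "notes": "",
}


def normalise_entry(value):
    """Upgrade a legacy string entry to the extended dict form."""
    if isinstance(value, str):
        return {**DEFAULT_ENTRY, "name": value}
    return {**DEFAULT_ENTRY, **value}


def enrolled_ids(speakers):
    """IDs the bridge should attempt auto-matching against."""
    return [
        speaker_id
        for speaker_id, entry in speakers.items()
        if normalise_entry(entry)["has_embedding"]
    ]


speakers = {
    "SPEAKER_00": {"name": "Pastor John", "has_embedding": True},
    "SPEAKER_01": "Mary (Reader)",  # legacy flat entry
}
upgraded = {sid: normalise_entry(entry) for sid, entry in speakers.items()}
matchable = enrolled_ids(speakers)
```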


Implementation Plan

This is a 3-part addition:

Part 1: embeddings.py (new file, ~80 lines). A small module that handles extraction, storage and matching:

python

# Core interface
extract_embedding(audio_path, start_sec, end_sec) -> np.ndarray
save_embedding(speaker_id, embedding)
load_embedding(speaker_id) -> np.ndarray | None
find_best_match(live_embedding, threshold=0.82) -> tuple[str, float] | None

Part 2: Admin UI additions. On each speaker row, add an "Extract from recording" button that lets the operator:

  • Pick an uploaded test recording from a dropdown
  • Set a start/end time (or use a simple "pick a 10s window" slider)
  • Click "Extract" — server processes it and saves the .npy
  • Row updates to show "✓ Voiceprint saved"

Part 3: Bridge auto-matching. In push_final(), when a new unknown speaker appears, compare their accumulated audio embedding against all stored profiles before prompting the operator. If a match is found above threshold, auto-assign the name silently.


Want me to write embeddings.py first, then the admin UI changes, then the bridge matching logic? The embeddings.py module is the cleanest starting point — it's self-contained and we can test it standalone before wiring it into the UI.

You said: OK, let's get embeddings.py under way.

3:30 PM

Claude responded: Let me check the current bridge structure first before writing it.

Searched the web


The repo is requiring login for raw file access now. No matter — I have enough context from the session and the README. Let me write the module.

Viewed a file, created a file, read a file


Here's what's in it and why each piece is designed the way it is.

Storage: .npy files in bridge/embeddings/, one per speaker. speakers.json gains has_embedding, embedding_updated, colour, and notes fields, but the schema is backward compatible: existing flat {"SPEAKER_00": "Pastor John"} entries are migrated transparently via _normalise_speaker_entry().

Extraction: extract() uses miniaudio (already in your venv) to pull a segment by start/end seconds, writes a temp WAV, feeds it to pyannote's SpeakerEmbedding pipeline, then deletes the temp file. The pipeline is lazy-loaded on first call so import is fast.

LiveEmbeddingAccumulator: this is the piece that plugs into bridge.py. When diart announces a new SPEAKER_XX, bridge creates an accumulator, feeds it PCM chunks via push(), and once ready() returns True (5 seconds by default) calls extract_embedding() followed by registry.find_match(). If it matches an enrolled speaker, the name is auto-assigned without an operator prompt.

CLI: the file is executable standalone for testing without starting the full stack:

bash

# Enrol from seconds 45–55 of last Sunday's recording
python bridge\embeddings.py enrol SPEAKER_00 test_recordings\service.mp3 --start 45 --end 55

# Check what it matched against all enrolled speakers
python bridge\embeddings.py scores test_recordings\service.mp3 --start 45 --end 55

# List who's enrolled
python bridge\embeddings.py list

Next steps are the admin UI additions (a segment picker per speaker row) and wiring LiveEmbeddingAccumulator into bridge.py's push_final() path.
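To make the push() / ready() / reset() flow described above concrete, here is a minimal stand-alone sketch of a PCM buffer that becomes ready once enough audio has arrived. It is not the LiveEmbeddingAccumulator from embeddings.py: the class name, the audio() accessor, and the assumed format (16 kHz, mono, 16-bit samples) are all choices made for this illustration, and the embedding step itself is left out.

```python
class AccumulatorSketch:
    """Buffers raw PCM until at least min_seconds of audio has been pushed."""

    def __init__(self, min_seconds=5.0, sample_rate=16000, bytes_per_sample=2):
        # Number of bytes that corresponds to min_seconds of mono audio
        self._needed = int(min_seconds * sample_rate * bytes_per_sample)
        self._chunks = []
        self._size = 0

    def push(self, pcm: bytes) -> None:
        self._chunks.append(pcm)
        self._size += len(pcm)

    def ready(self) -> bool:
        return self._size >= self._needed

    def audio(self) -> bytes:
        """Everything buffered so far, as one contiguous byte string."""
        return b"".join(self._chunks)

    def reset(self) -> None:
        self._chunks.clear()
        self._size = 0


acc = AccumulatorSketch(min_seconds=5.0)
half_second = b"\x00\x00" * 8000  # 0.5 s of silence at 16 kHz, 16-bit mono

for _ in range(9):
    acc.push(half_second)
ready_after_4_5s = acc.ready()

acc.push(half_second)
ready_after_5s = acc.ready()
buffered_bytes = len(acc.audio())

acc.reset()
ready_after_reset = acc.ready()
```

Counting bytes rather than wall-clock time keeps the readiness check independent of how quickly chunks arrive.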

What a Voiceprint Actually Is

The embedding captures acoustic characteristics of a voice: things like fundamental frequency, formant patterns, speaking rhythm and vocal tract shape. It is largely independent of the words spoken. So a person saying "the quick brown fox" and saying "let us pray" would produce very similar embeddings, because it's the voice, not the words, being fingerprinted.

So transcription accuracy doesn't directly improve the embedding quality.


What Does Improve Embedding Quality

The factors that matter are:

Factor                    | Impact
--------------------------|---------------------------------------------------------------
Audio cleanliness         | Highest: noise, reverb and music bleed all degrade it
Segment length            | 10–20s is the sweet spot; diminishing returns after 30s
Single speaker            | Must be isolated, with no overlapping voices in the segment
Consistent mic distance   | Close-mic (pulpit mic) is better than room mic
Multiple samples averaged | Enrolling 2–3 separate clips and averaging the embeddings is more robust than one clip
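The last row can be sketched as follows: normalise each clip's embedding to unit length, average them component by component, then normalise the result again so it can be compared by cosine similarity. This is one common way to combine samples, shown here with plain Python lists and toy 2-dimensional vectors; the function names are hypothetical.

```python
import math


def l2_normalise(vector):
    """Scale a vector to unit length."""
    norm = math.sqrt(sum(x * x for x in vector))
    if norm == 0.0:
        raise ValueError("cannot normalise an all-zero vector")
    return [x / norm for x in vector]


def average_embeddings(embeddings):
    """Combine several clips' embeddings into one unit-length profile."""
    if not embeddings:
        raise ValueError("need at least one embedding")
    dim = len(embeddings[0])
    if any(len(e) != dim for e in embeddings):
        raise ValueError("all embeddings must have the same length")
    # Normalise first so a loud or long clip does not dominate the mean
    unit = [l2_normalise(e) for e in embeddings]
    mean = [sum(column) / len(unit) for column in zip(*unit)]
    return l2_normalise(mean)


# Toy 2-dimensional vectors standing in for two enrolment clips
clips = [[3.0, 4.0], [4.0, 3.0]]
profile = average_embeddings(clips)
profile_norm = math.sqrt(sum(x * x for x in profile))
```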

Where Transcription Does Help — Indirectly

Here's the indirect benefit you're probably intuiting:

If you know what was said, you can verify the segment is clean. For example:

  • If the transcript shows [SPEAKER_00] "Let us open in prayer..." — you know that's isolated speech, no congregation response, no overlap. That's a good enrolment segment.
  • If the transcript shows [SPEAKER_00] "[inaudible] ...mmm... [SPEAKER_01]" — overlapping speakers, bad segment, skip it.

So the workflow becomes:

1. Run a test recording through the pipeline
2. Read the transcript output to find clean, isolated segments per speaker
3. Note the timestamps of those clean segments
4. Use embeddings.py enrol with those specific timestamps

This is actually a much better enrolment strategy than asking someone to record a sample — because the transcript lets you cherry-pick the cleanest real-world speech from an actual service.


Practical Addition to embeddings.py

We could add an enrol_from_transcript() helper that takes the transcript log (which bridge.py already produces) and automatically picks the best segment per speaker: the longest clean run with no speaker change within it.

python

from pathlib import Path

import numpy as np


# Intended as a method of the registry class (it relies on self.extract_and_save)
def enrol_from_transcript(
    self,
    speaker_id: str,
    audio_path: Path,
    transcript_segments: list[dict],  # [{speaker, start, end, text}, ...]
    min_duration: float = 8.0,
) -> np.ndarray:
    """
    Find the longest single-speaker segment in a transcript and use it
    for enrolment. Each logged segment carries exactly one speaker label,
    so a candidate never spans a speaker change.
    """
    # Keep only this speaker's segments that are long enough to enrol from
    candidates = [
        s for s in transcript_segments
        if s["speaker"] == speaker_id
        and (s["end"] - s["start"]) >= min_duration
    ]
    if not candidates:
        raise ValueError(f"No clean segment >= {min_duration}s found for {speaker_id}")

    # Pick the longest segment
    best = max(candidates, key=lambda s: s["end"] - s["start"])
    return self.extract_and_save(speaker_id, audio_path, best["start"], best["end"])

This would mean after every test recording run, bridge.py logs timestamped segments, and the admin UI could show a "Use best segment for enrolment" button per speaker — fully automatic, no timestamp hunting needed.
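The segment log this relies on could be as simple as one JSON object per line. The sketch below is an assumption about what such logging might look like, not the log_transcript_segment() function from embeddings.py: the function names, the explicit log_path argument, and the record keys are hypothetical, with the keys chosen to match the {speaker, start, end, text} shape used above.

```python
import json
import tempfile
from pathlib import Path


def log_segment(log_path, speaker_id, text, start_sec, end_sec, session_id):
    """Append one finalised segment to a JSON Lines file."""
    record = {
        "session_id": session_id,
        "speaker": speaker_id,
        "start": start_sec,
        "end": end_sec,
        "text": text,
    }
    with open(log_path, "a", encoding="utf-8") as handle:
        handle.write(json.dumps(record, ensure_ascii=False) + "\n")


def load_segments(log_path, session_id):
    """Read back every segment logged for one session."""
    segments = []
    with open(log_path, encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue  # tolerate blank lines
            record = json.loads(line)
            if record["session_id"] == session_id:
                segments.append(record)
    return segments


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "transcript_segments.jsonl"
    log_segment(path, "SPEAKER_00", "Let us open in prayer", 45.0, 55.0, "2026-05-04")
    log_segment(path, "SPEAKER_01", "Amen", 55.5, 56.0, "2026-05-04")
    log_segment(path, "SPEAKER_00", "Welcome", 2.0, 4.0, "2026-05-11")
    session = load_segments(path, "2026-05-04")
```

Appending line by line means a crash mid-service loses at most the segment being written, and the list returned by load_segments() can be passed straight to a helper like enrol_from_transcript().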

Want me to add that to embeddings.py and also add the transcript segment logging to bridge.py so the timestamps are captured?

You said: OK, let's update them both. Let me know where I need to put the blocks in bridge.py and embeddings.py.

3:47 PM

Claude responded: Let me fetch the current bridge.py first so I place the changes exactly right.

Failed to fetch: https://git.tazz.com.au/benjamin.harris/Deaf_Transcription_Service/src/main/bridge/bridge.py


The repo is still requiring login for file access. Can you paste the current bridge.py content here? Specifically I need to see:

  1. The push_final() method in BridgeState
  2. The _receive_results() async function
  3. The main() function

Those are the three places I need to insert code. While you grab those, here's the complete updated embeddings.py with the transcript-based enrolment added:

Ran a command, created a file, read a file


Good. Now for the bridge.py changes. Here are the four insertions you need to make; since I can't see the current file, I'll give you find/replace targets that should be unambiguous:


bridge.py — Change 1: Add import at the top

Find the block of imports at the top (near import json, import re etc). Add after the last import line:

python

from embeddings import (
    EmbeddingRegistry,
    LiveEmbeddingAccumulator,
    log_transcript_segment,
)

bridge.py — Change 2: Add registry + accumulator tracking to BridgeState.__init__

Find:

python

self._last_final_time = time.monotonic()

Add immediately after it:

python

# Voiceprint matching
self._embedding_registry = EmbeddingRegistry()
# Per-speaker accumulators: speaker_id -> LiveEmbeddingAccumulator
self._accumulators: dict[str, LiveEmbeddingAccumulator] = {}
# Speaker IDs confirmed this session (skip re-matching)
self._confirmed_ids: set[str] = set()
# Session ID for transcript logging (date string)
self._session_id: str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

You'll also need to add from datetime import datetime, timezone to the imports if it's not already there.


bridge.py — Change 3: Add segment logging + embedding accumulation to push_final()

Find the line inside push_final() that reads something like:

python

self._last_final_time = time.monotonic()

Just after that line, add:

python

# ── Transcript segment logging ──────────────────────────────────────────
# Log every finalised segment for later voiceprint enrolment
# start_sec/end_sec from FrontData if available, else estimate from time
seg_start = getattr(response, "start", None)
seg_end   = getattr(response, "end",   None)
if seg_start is None:
    seg_end   = time.monotonic() - self._last_final_time
    seg_start = max(0.0, seg_end - len(text.split()) * 0.4)
log_transcript_segment(
    speaker_id = speaker_id or "UNKNOWN",
    text       = text,
    start_sec  = seg_start or 0.0,
    end_sec    = seg_end   or 0.0,
    session_id = self._session_id,
)

# ── Live embedding accumulation for auto speaker matching ───────────────
if speaker_id and speaker_id not in self._confirmed_ids:
    if speaker_id not in self._accumulators:
        self._accumulators[speaker_id] = \
            self._embedding_registry.make_accumulator(min_seconds=5.0)
    # We don't have raw PCM here — accumulation happens in _send_audio
    # This is a placeholder; see Change 4 for the PCM-level accumulation

bridge.py — Change 4: PCM-level accumulation in _send_audio

This is the most important change — accumulating actual audio per speaker. Find inside audio_processor_loop the _send_audio coroutine. Find the line that calls:

python

await _audio_processor.process_audio(chunk)

Replace it with:

python

await _audio_processor.process_audio(chunk)

# Feed chunk to the accumulator for the current speaker
# _current_speaker_id is set by _receive_results as segments arrive
current_spk = state.get_current_speaker_id()
if current_spk and current_spk not in state._confirmed_ids:
    if current_spk not in state._accumulators:
        state._accumulators[current_spk] = \
            state._embedding_registry.make_accumulator(min_seconds=5.0)
    state._accumulators[current_spk].push(chunk)
    if state._accumulators[current_spk].ready():
        try:
            live_emb = state._accumulators[current_spk].extract_embedding()
            match    = state._embedding_registry.find_match(live_emb)
            if match:
                matched_id, score = match
                resolved_name = state.speaker_names.get(matched_id, matched_id)
                print(
                    f"[Embeddings] Auto-matched {current_spk} → "
                    f"{resolved_name} (score={score:.3f})"
                )
                state.set_speaker_name(current_spk, resolved_name)
                state._confirmed_ids.add(current_spk)
            else:
                # No match — reset accumulator to try again with more audio
                state._accumulators[current_spk].reset()
        except Exception as exc:
            logger.warning(f"[Embeddings] Accumulator error: {exc}")

And add this method to BridgeState:

python

def get_current_speaker_id(self) -> str | None:
    """Return the raw diarization ID (SPEAKER_XX) of the current speaker."""
    with self._lock:
        return self._raw_speaker_id  # see below

And add self._raw_speaker_id: str | None = None to __init__, then in push_final() add:

python

self._raw_speaker_id = speaker_id  # track raw ID for accumulator

at the top of the with self._lock: block.


That's all four changes. The key file to copy into bridge/ is embeddings.py.

Here are the five precise changes:

Change 1 — Imports (top of file)

Find:

python

from collections import Counter
from pathlib import Path

Replace with:

python

from collections import Counter
from datetime import datetime, timezone
from pathlib import Path

Then find:

python

from whisperlivekit import AudioProcessor, TranscriptionEngine

Replace with:

python

from whisperlivekit import AudioProcessor, TranscriptionEngine
from embeddings import (
    EmbeddingRegistry,
    log_transcript_segment,
)

Change 2 — BridgeState.__init__

Find:

python

    self._last_final_time              = time.monotonic()

Replace with:

python

    self._last_final_time              = time.monotonic()
    # Raw diarization ID of current speaker (SPEAKER_XX)
    self._raw_speaker_id: str | None   = None
    # Voiceprint matching
    self._embedding_registry           = EmbeddingRegistry()
    self._accumulators: dict[str, object] = {}
    self._confirmed_ids: set[str]      = set()
    self._session_id: str              = datetime.now(timezone.utc).strftime("%Y-%m-%d")

Change 3 — push_final() method

Find:

python

def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client) -> None:
    """Accept a finalised segment; flush on sentence boundary or speaker change."""
    with self._lock:
        if speaker_id:
            self._seen.add(speaker_id)

        resolved = self._resolve(speaker_id)

Replace with:

python

def push_final(self, text: str, speaker_id: str | None, mqtt_client: mqtt.Client,
               seg_start: float = 0.0, seg_end: float = 0.0) -> None:
    """Accept a finalised segment; flush on sentence boundary or speaker change."""
    with self._lock:
        # Track raw diarization ID for PCM accumulator
        self._raw_speaker_id = speaker_id

        if speaker_id:
            self._seen.add(speaker_id)

        # Log segment to transcript_segments.jsonl for later enrolment
        log_transcript_segment(
            speaker_id = speaker_id or "UNKNOWN",
            text       = text,
            start_sec  = seg_start,
            end_sec    = seg_end,
            session_id = self._session_id,
        )

        resolved = self._resolve(speaker_id)

Change 4 — _receive_results() — pass timestamps to push_final

Find:

python

async def _receive_results():
    async for response in results_generator:
        # response is a FrontData dataclass, not a dict
        text     = (getattr(response, "text", None) or getattr(response, "buffer_transcription", None) or "").strip()
        is_final = getattr(response, "is_final", False) or getattr(response, "end_of_segment", False)
        speaker  = getattr(response, "speaker", None)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client)

Replace with:

python

async def _receive_results():
    async for response in results_generator:
        text     = (getattr(response, "text", None) or
                    getattr(response, "buffer_transcription", None) or "").strip()
        is_final = getattr(response, "is_final", False) or \
                   getattr(response, "end_of_segment", False)
        speaker  = getattr(response, "speaker", None)
        # Grab timestamps if FrontData provides them
        seg_start = float(getattr(response, "start", 0.0) or 0.0)
        seg_end   = float(getattr(response, "end",   0.0) or 0.0)
        if is_final and text:
            print(f"[Whisper] ({speaker or '?'}) {text}")
            state.push_final(text, speaker, mqtt_client, seg_start, seg_end)

Change 5 — _send_audio() — PCM accumulation for live matching

Find:

python

        while True:
            # Drain test audio injection first if available
            try:
                chunk = test_audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                chunk = await audio_queue.get()
            await audio_processor.process_audio(chunk)

Replace with:

python

        while True:
            # Drain test audio injection first if available
            try:
                chunk = test_audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                chunk = await audio_queue.get()
            await audio_processor.process_audio(chunk)

            # Accumulate PCM for live speaker matching
            current_spk = state._raw_speaker_id
            if current_spk and current_spk not in state._confirmed_ids:
                if current_spk not in state._accumulators:
                    state._accumulators[current_spk] = \
                        state._embedding_registry.make_accumulator(min_seconds=5.0)
                state._accumulators[current_spk].push(chunk)
                if state._accumulators[current_spk].ready():
                    try:
                        live_emb = state._accumulators[current_spk].extract_embedding()
                        match    = state._embedding_registry.find_match(live_emb)
                        if match:
                            matched_id, score = match
                            resolved = state.speaker_names.get(matched_id, matched_id)
                            print(
                                f"[Embeddings] Auto-matched {current_spk} → "
                                f"{resolved} (score={score:.3f})"
                            )
                            with state._lock:
                                state.speaker_names[current_spk] = resolved
                                state._confirmed_ids.add(current_spk)
                                _write_speakers(state.speaker_names)
                        else:
                            # No match yet — reset and try again with more audio
                            state._accumulators[current_spk].reset()
                    except Exception as exc:
                        print(f"[Embeddings] Accumulator error: {exc}")
                        state._accumulators[current_spk].reset()

That's all five changes. One caveat: audio_callback currently produces float32 samples, but LiveEmbeddingAccumulator.push() and extract_embedding() write a WAV file assuming int16, and Change 5 as written pushes chunk into the accumulator unchanged. Convert the chunk to int16 first by replacing the state._accumulators[current_spk].push(chunk) line with:

python

                # Convert float32 → int16 for the embedding accumulator.
                # Clip to [-1, 1] first so out-of-range samples cannot wrap around.
                samples   = np.clip(np.frombuffer(chunk, dtype=np.float32), -1.0, 1.0)
                chunk_i16 = (samples * 32767).astype(np.int16).tobytes()
                state._accumulators[current_spk].push(chunk_i16)

This assumes numpy is already imported as np in bridge.py; add the import if it is not.

Once those are in, drop embeddings.py into the bridge/ folder alongside bridge.py and it will be importable. The transcript log will start filling up from the first run, ready for auto-enrol once you have a recording to pull segments from.

What needs to be added:

  • A Voiceprint column to the speaker table with status badge
  • A voiceprint modal per speaker with three tabs: auto-enrol from transcript, manual segment picker, and similarity test
  • New API endpoints for all embedding operations
  • A Voiceprints summary card showing all enrolled speakers