diff --git a/stt_stream_file/README.md b/stt_stream_file/README.md
index 5dec803..8236ddd 100644
--- a/stt_stream_file/README.md
+++ b/stt_stream_file/README.md
@@ -38,12 +38,27 @@ uv run stream_audio_file.py --ui --live \
 
 ### Save & Print Mode
 
-**Stream to file (realtime mode - natural pace):**
+**Stream and save JSON output:**
 ```bash
-uv run stream_audio_file.py -o output.json -f audio.wav --realtime \
+uv run stream_audio_file.py -f audio.wav \
     --url "wss://api.deepgram.com/v1/listen?model=nova-3&interim_results=true"
 ```
+Output is automatically saved to `audio.json` (derived from input filename).
+
+**Specify a custom output file:**
+```bash
+uv run stream_audio_file.py -o output.json -f audio.wav \
+    --url "wss://api.deepgram.com/v1/listen?model=nova-3&interim_results=true"
+```
+
+**Live recording saves with timestamp:**
+```bash
+uv run stream_audio_file.py --live \
+    --url "wss://api.deepgram.com/v1/listen?model=nova-3&interim_results=true"
+# Saves to recording_20250114_153022.json (or similar)
+```
+
 **Print basic transcript:**
 ```bash
 uv run print_transcript.py -f output.json
 ```
@@ -56,7 +71,7 @@ uv run print_transcript.py -f output.json
 **Print with all the details:**
 ```bash
 uv run print_transcript.py -f output.json \
-    --print-speakers --print-channels --print-interim --print-delay --colorize
+    --print-speakers --print-channels --print-interim --print-latency --colorize
 ```
 ```
-[18:30:24.066 (0.665s since EOS)] [00:00:00.00 - 00:00:03.48] [Speaker 0] [Channel 0] [IsFinal]: The missile knows where it is at all times.
+[eot_latency=0.665s] [00:00:00.00 - 00:00:03.48] [Speaker 0] [Channel 0] [IsFinal]: The missile knows where it is at all times.
@@ -74,24 +89,61 @@ It knows this because it knows where it isn't.
 
 ## Key Options
 
 ### stream_audio_file.py
-- `--ui` - Interactive terminal UI with live updates
-- `-f, --audio` - Audio file to stream
-- `-l, --live` - Stream from microphone
-- `-o, --output` - Save JSON messages to file
-- `-v, -vv, -vvv` - Increase verbosity
+
+| Option | Description |
+|--------|-------------|
+| `--url, -u` | Deepgram WebSocket URL (required) |
+| `--ui` | Interactive terminal UI with live updates |
+| `-f, --audio` | Audio file to stream |
+| `-l, --live` | Stream from microphone |
+| `-o, --output` | Save JSON messages to file (defaults to input filename or timestamped name) |
+| `-v, -vv, -vvv` | Increase verbosity |
 
 ### print_transcript.py
-- `--print-speakers` - Show speaker labels
-- `--print-channels` - Show audio channels
-- `--print-interim` - Include interim results
-- `--print-delay` - Show latency (time since end of speech)
-- `--colorize` - Color words by confidence
-- `--only-transcript` - Just the text, no metadata
+
+| Option | Description |
+|--------|-------------|
+| `--print-speakers` | Show speaker labels |
+| `--print-channels` | Show audio channels |
+| `--print-interim` | Include interim results |
+| `--print-received` | Show received timestamp for streamed messages |
+| `--print-latency` | Show latency metrics (TTFT, update frequency, message latency, EOT latency) |
+| `--print-entities` | Show detected entities |
+| `--colorize` | Color words by confidence |
+| `--only-transcript` | Just the text, no metadata |
 
 Run either script with `--help` for full options.
 
-## What's Happening?
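+For example, stream a file and then inspect its latency (filenames are illustrative):
+
+```bash
+uv run stream_audio_file.py -f audio.wav \
+    --url "wss://api.deepgram.com/v1/listen?model=nova-3&interim_results=true"
+uv run print_transcript.py -f audio.json --print-interim --print-latency
+```
+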
+### Shell Completion
+
+Generate shell completions for your preferred shell:
+
+```bash
+uv run stream_audio_file.py completion bash # or zsh, fish
+```
+
+## Metrics Calculated
+
+When using `--print-latency`, the following metrics are computed:
 
-The UI mode shows transcription speed in real-time - watch words appear as you speak and see exactly how fast Deepgram processes your audio. The `--print-delay` option reveals latency metrics, perfect for testing different models and configurations.
+**Session-level:**
+- **TTFT (Time To First Transcript)**: Wall-clock time from when audio streaming begins to when the first transcript message is received. Measures initial responsiveness.
+- **Update Frequency**: Number of interim transcript updates per second of audio. Higher values mean a more fluid, responsive transcription experience.
+
+**Per-message:**
+- **Message Latency**: How far behind the transcription is from the audio being sent, calculated as `audio_cursor - transcript_cursor`. Measured on interim results only, per Deepgram's methodology.
+- **EOT Latency (End-of-Turn Latency)**: Time between the last interim result and the finalizing event (e.g., `speech_final`, `UtteranceEnd`, `EndOfTurn`). Critical for voice agents—they can't respond until they know the user stopped speaking.
+
+## What's Happening?
+
+The UI mode shows transcription speed in real-time—watch words appear as you speak and see exactly how fast Deepgram processes your audio. The `--print-latency` option reveals latency metrics, perfect for testing different models and configurations.
\ No newline at end of file
diff --git a/stt_stream_file/print_transcript.py b/stt_stream_file/print_transcript.py
index 14121d7..72d8696 100644
--- a/stt_stream_file/print_transcript.py
+++ b/stt_stream_file/print_transcript.py
@@ -23,8 +23,8 @@ class DisplayConfig(BaseModel):
     print_speakers: bool = False
     print_interim: bool = False
     print_received: bool = False
-    print_delay: bool = False
     print_entities: bool = False
+    print_latency: bool = False
     colorize: bool = False
     only_transcript: bool = False
 
@@ -63,6 +63,51 @@ def confidence_color(confidence):
     return red, green, blue
 
 
+def colorize_latency(
+    latency: float,
+    min_latency: float,
+    max_latency: float,
+    use_rich_markup: bool = False,
+) -> str:
+    """
+    Colorize a latency value string based on min/max range.
+
+    Args:
+        latency: The latency value in seconds
+        min_latency: Minimum latency (maps to white/good)
+        max_latency: Maximum latency (maps to red/bad)
+        use_rich_markup: If True, use Rich markup format. If False, use ANSI codes.
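+
+    Example (illustrative values):
+        colorize_latency(0.25, 0.1, 0.5)        # ANSI-wrapped "0.250s"
+        colorize_latency(0.25, 0.1, 0.5, True)  # -> "[rgb(255,99,99)]0.250s[/]"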
+ """ + text = f"{latency:.3f}s" + + # Convert a latency value to an RGB color where: + # min_latency maps to white (255, 255, 255) - good + # max_latency maps to red (255, 0, 0) - bad + if max_latency <= min_latency: + # Avoid division by zero; treat as best case + normalized = 1.0 + else: + # Normalize: 0 = max (bad), 1 = min (good) + normalized = 1.0 - (latency - min_latency) / (max_latency - min_latency) + normalized = max(0.0, min(1.0, normalized)) # Clamp to [0, 1] + + # Use same color mapping as confidence + normalized = normalized**2 + red = 255 + green = int(255 * normalized) + blue = int(255 * normalized) + + if use_rich_markup: + return f"[rgb({red},{green},{blue})]{text}[/]" + else: + ansi_color = rgb_to_ansi(red, green, blue) + return f"{ansi_color}{text}{Style.RESET_ALL}" + + def rgb_to_ansi(r, g, b): return f"\033[38;2;{r};{g};{b}m" @@ -329,7 +370,8 @@ class TranscriptLine: speaker_label: str = "" channel: Optional[int | tuple[int, ...]] = None received: Optional[datetime.datetime] = None - delay: Optional[float] = None + latency: Optional[float] = None + eot_latency: Optional[float] = None def format(self) -> str: """Build the final formatted line""" @@ -337,12 +379,17 @@ def format(self) -> str: # Add received timestamp if configured if self.config.print_received and self.received is not None: - if self.config.print_delay: - received_str = f"[{self.received.strftime('%H:%M:%S.%f')} ({self.delay:.4f}s since EOS)]" - else: - received_str = f"[{self.received.strftime('%H:%M:%S.%f')}]" + received_str = f"[{self.received.strftime('%H:%M:%S.%f')}]" parts.append(received_str) + # Add interim latency if configured and available + if self.config.print_latency and self.latency is not None: + parts.append(f"[latency={self.latency:.3f}s]") + + # Add EOT latency if configured and available + if self.config.print_latency and self.eot_latency is not None: + parts.append(f"[eot_latency={self.eot_latency:.3f}s]") + # Add duration duration = f"[{format_time(self.start)} - {format_time(self.end)}]" parts.append(duration) @@ -374,7 +421,8 @@ def format(self) -> str: def str_from_word_array( streaming: bool, received: datetime.datetime | None, - delay: float | None, + latency: float | None, + eot_latency: float | None, word_array: list[WordType], channel: int | tuple[int, ...], start: float, @@ -457,7 +505,8 @@ def str_from_word_array( channel=channel, events=events, received=received, - delay=delay, + latency=latency, + eot_latency=eot_latency, config=config, ) @@ -465,92 +514,504 @@ def str_from_word_array( # ============================================================================ -# Delay Tracking +# EOT Latency Tracking (End-of-Turn Latency) # ============================================================================ -class DelayAggregator: - def __init__(self, start: datetime.datetime) -> None: - self.start = start - self.last_message = start - self.last_is_final = start - self.last_speech_final = start - self.last_utterance_end = start - self.last_word_end = datetime.timedelta(seconds=0) +class EOTLatencyAggregator: + """ + Calculates End-of-Turn (EOT) latency for streaming transcription. - @staticmethod - def calculate_start_from_messages(messages: list[dict]) -> datetime.datetime | None: - # Look for an OpenStream message, if it exists. - for message in messages: - if message.get("type") == "OpenStream": - return datetime.datetime.fromisoformat(message["received"]) + EOT latency measures the time between receiving the last interim result + and receiving a finalizing event. 
This represents the additional delay, + on top of interim result latency, required to identify that the user has + finished speaking. This is critical for voice agents because they cannot + respond until they know the user is done. - # If there is no OpenStream message, we can try to estimate it. - approx_stream_open_time = None - for message in messages: - if "received" not in message: - continue - received = datetime.datetime.fromisoformat(message["received"]) - # Since this is a streaming request, we should assume the stream was opened when the audio started. - # This may not be accurate because the user can send the audio faster or slower than real-time, - # but it should work for our needs. - if message.get("type") == "Results": - approx_stream_open_time = received - ( - datetime.timedelta(seconds=message["start"]) - + datetime.timedelta(seconds=message["duration"]) - ) - elif message.get("type") == "TurnInfo": - # For Flux, use audio_window_end - approx_stream_open_time = received - datetime.timedelta( - seconds=message.get("audio_window_end", 0) - ) - # Break after the first message with a received time - if approx_stream_open_time: - break - return approx_stream_open_time - - def add_message(self, message: dict, received: datetime.datetime): - self.last_message = received - if message.get("type") == "UtteranceEnd": - self.last_utterance_end = received - elif message.get("type") == "Results": + Supported EOT events: + - Nova: is_final=true, speech_final=true, UtteranceEnd + - Flux: EndOfTurn, EagerEndOfTurn + """ + + def __init__(self) -> None: + # Track the last interim result time per channel + self.last_interim_time: dict[int, datetime.datetime] = {} + # Store measurements as (event_type, latency_seconds) + self.measurements: list[tuple[str, float]] = [] + + def record_interim(self, message: dict, received: datetime.datetime) -> None: + """Record the time of an interim result (Nova or Flux).""" + # Nova: Results with is_final=false + if message.get("type") == "Results": if message.get("is_final", False): - self.last_is_final = received + return # Not an interim result + channel = message.get("channel_index", [0])[0] + self.last_interim_time[channel] = received + + # Flux: TurnInfo with Update or StartOfTurn event + elif message.get("type") == "TurnInfo": + event = message.get("event") + if event in ("Update", "StartOfTurn"): + # Flux doesn't have channels, use 0 + self.last_interim_time[0] = received + + def calculate_eot_latency( + self, message: dict, received: datetime.datetime + ) -> tuple[str, float] | None: + """ + Calculate EOT latency for end-of-turn events. + + Returns (event_type, latency_seconds) or None if not an EOT event + or if there was no preceding interim result. 
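+
+        Example (illustrative): a speech_final Results message received 150 ms
+        after the last interim on that channel yields ("speech_final", 0.15).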
+ """ + event_type: str | None = None + channel: int = 0 + + # Nova: Check for EOT events in Results messages + if message.get("type") == "Results": + channel = message.get("channel_index", [0])[0] if message.get("speech_final", False): - self.last_speech_final = received - for word in message["channel"]["alternatives"][0]["words"]: - if "end" in word and word["end"] > 0: - self.last_word_end = datetime.timedelta(seconds=word["end"]) + event_type = "speech_final" + elif message.get("is_final", False): + event_type = "is_final" + + # Nova: UtteranceEnd message + elif message.get("type") == "UtteranceEnd": + channel = message.get("channel", [0])[0] + event_type = "UtteranceEnd" + + # Flux: TurnInfo with EndOfTurn or EagerEndOfTurn event + elif message.get("type") == "TurnInfo": + event = message.get("event") + if event == "EndOfTurn": + event_type = "EndOfTurn" + channel = 0 # Flux doesn't have channels + elif event == "EagerEndOfTurn": + event_type = "EagerEndOfTurn" + channel = 0 + + if event_type is None: + return None + + # Get the last interim time for this channel + last_interim = self.last_interim_time.get(channel) + if last_interim is None: + # No preceding interim result recorded + return None + + # Calculate latency + eot_latency = (received - last_interim).total_seconds() + + # Only record positive latencies (negative would indicate timing issues) + if eot_latency >= 0: + self.measurements.append((event_type, eot_latency)) + + # Clear the last interim time for this channel after a definitive EOT event + # - Nova: speech_final and UtteranceEnd are definitive; is_final may have more + # - Flux: EndOfTurn is definitive; EagerEndOfTurn may be followed by TurnResumed + if event_type in ("speech_final", "UtteranceEnd", "EndOfTurn"): + self.last_interim_time.pop(channel, None) + + return (event_type, eot_latency) + + def get_stats(self) -> dict: + """Return EOT latency statistics.""" + if not self.measurements: + return {"count": 0} + + latencies = [m[1] for m in self.measurements] + sorted_latencies = sorted(latencies) + + def percentile(p: float) -> float: + idx = int(len(sorted_latencies) * p) + return sorted_latencies[min(idx, len(sorted_latencies) - 1)] + + return { + "min": min(latencies), + "p50": percentile(0.50), + "p95": percentile(0.95), + "p99": percentile(0.99), + "max": max(latencies), + "count": len(self.measurements), + "by_event": self._stats_by_event(), + } + + def _stats_by_event(self) -> dict[str, dict]: + """Get stats broken down by event type.""" + from collections import defaultdict + + by_event: dict[str, list[float]] = defaultdict(list) + for event_type, latency in self.measurements: + by_event[event_type].append(latency) + + def percentile(sorted_vals: list[float], p: float) -> float: + idx = int(len(sorted_vals) * p) + return sorted_vals[min(idx, len(sorted_vals) - 1)] + + result = {} + for event_type, latencies in by_event.items(): + sorted_latencies = sorted(latencies) + result[event_type] = { + "min": min(latencies), + "p50": percentile(sorted_latencies, 0.50), + "p95": percentile(sorted_latencies, 0.95), + "p99": percentile(sorted_latencies, 0.99), + "max": max(latencies), + "count": len(latencies), + } + return result + + def format_summary(self) -> str: + """Return formatted summary string.""" + stats = self.get_stats() + if stats["count"] == 0: + return "No EOT latency measurements (requires interim_results=true and EOT events)" + + lines = [ + f"EOT Latency: min={stats['min']:.3f}s, p50={stats['p50']:.3f}s, " + f"p95={stats['p95']:.3f}s, 
p99={stats['p99']:.3f}s, " + f"max={stats['max']:.3f}s ({stats['count']} events)" + ] + + for event_type, event_stats in stats["by_event"].items(): + lines.append( + f" {event_type}: min={event_stats['min']:.3f}s, p50={event_stats['p50']:.3f}s, " + f"p95={event_stats['p95']:.3f}s, p99={event_stats['p99']:.3f}s, " + f"max={event_stats['max']:.3f}s ({event_stats['count']} events)" + ) + + return "\n".join(lines) + + def colorize_lines( + self, lines: list[str], use_rich_markup: bool = False + ) -> list[str]: + """ + Post-process transcript lines to colorize EOT latency values. + + Replaces [eot_latency=X.XXXs] with colorized version based on min/max. + """ + import re + + stats = self.get_stats() + if stats["count"] == 0: + return lines + + min_lat = stats["min"] + max_lat = stats["max"] + + # If the worst latency is less than 400ms, that's still really good. + # We will manually increase the worst case latency to 400ms in these cases. + if max_lat <= 0.4: + max_lat = 0.4 + + # Pattern to match [eot_latency=0.123s] + pattern = re.compile(r"\[eot_latency=(\d+\.\d+)s\]") + + colorized_lines = [] + for line in lines: + + def replace_latency(match): + latency_val = float(match.group(1)) + colored = colorize_latency( + latency_val, min_lat, max_lat, use_rich_markup + ) + return f"[eot_latency={colored}]" + + colorized_lines.append(pattern.sub(replace_latency, line)) + + return colorized_lines + + +# ============================================================================ +# Interim Latency Tracking (Message Latency per Deepgram methodology) +# ============================================================================ + + +class LatencyAggregator: + """ + Calculates message latency following Deepgram's methodology. + + Latency = audio_cursor - transcript_cursor + + Where: + audio_cursor = seconds of audio sent to Deepgram (from message) + transcript_cursor = start + duration from the response + + Per Deepgram docs, latency is measured using INTERIM results because + they arrive faster and reflect actual real-time latency. + + Supported interim message types: + - Nova: Results with is_final=false + - Flux: TurnInfo with event=Update or event=TurnResumed + + Reference: https://developers.deepgram.com/docs/measuring-streaming-latency + """ + + def __init__(self) -> None: + self.measurements: list[float] = [] + + def calculate_latency(self, message: dict) -> float | None: + """ + Calculate latency for an interim result message (Nova or Flux). + + Returns latency in seconds, or None if not applicable. 
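+
+        Example (illustrative): audio_cursor=5.2 with an interim covering
+        start=4.0, duration=1.0 gives a latency of 5.2 - (4.0 + 1.0) = 0.2s.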
+ """ + audio_cursor: float | None = None + transcript_cursor: float | None = None + + # Nova: Results messages + if message.get("type") == "Results": + # Only measure interim results + if message.get("is_final", True): + return None + + audio_cursor = message.get("audio_cursor") + start = message.get("start", 0) + duration = message.get("duration", 0) + + if duration <= 0: + return None + + transcript_cursor = start + duration + + # Flux: TurnInfo messages with Update or TurnResumed events elif message.get("type") == "TurnInfo": - # For Flux, handle EndOfTurn events - if message.get("event") == "EndOfTurn": - self.last_is_final = received - self.last_speech_final = received - # Update last word end time from audio window - if message.get("audio_window_end", 0) > 0: - self.last_word_end = datetime.timedelta( - seconds=message["audio_window_end"] + event = message.get("event") + if event not in ("Update", "TurnResumed"): + return None + + audio_cursor = message.get("audio_cursor") + audio_window_end = message.get("audio_window_end", 0) + + if audio_window_end <= 0: + return None + + transcript_cursor = audio_window_end + + else: + return None + + # Need audio_cursor to calculate latency + if audio_cursor is None: + return None + + if transcript_cursor is None: + return None + + cur_latency = audio_cursor - transcript_cursor + + # Record measurement + self.measurements.append(cur_latency) + + return cur_latency + + def get_p50(self) -> float | None: + """Calculate p50 latency.""" + if not self.measurements: + return None + sorted_measurements = sorted(self.measurements) + idx = int(len(sorted_measurements) * 0.50) + idx = min(idx, len(sorted_measurements) - 1) + return sorted_measurements[idx] + + def get_p95(self) -> float | None: + """Calculate p95 latency.""" + if not self.measurements: + return None + sorted_measurements = sorted(self.measurements) + idx = int(len(sorted_measurements) * 0.95) + idx = min(idx, len(sorted_measurements) - 1) + return sorted_measurements[idx] + + def get_p99(self) -> float | None: + """Calculate p99 latency.""" + if not self.measurements: + return None + sorted_measurements = sorted(self.measurements) + idx = int(len(sorted_measurements) * 0.99) + idx = min(idx, len(sorted_measurements) - 1) + return sorted_measurements[idx] + + def get_stats(self) -> dict: + """Return latency statistics.""" + if not self.measurements: + return { + "min": None, + "p50": None, + "p95": None, + "p99": None, + "max": None, + "count": 0, + } + + p50 = self.get_p50() + p95 = self.get_p95() + p99 = self.get_p99() + + return { + "min": min(self.measurements) if self.measurements else None, + "p50": p50, + "p95": p95, + "p99": p99, + "max": max(self.measurements) if self.measurements else None, + "count": len(self.measurements), + } + + def format_summary(self) -> str: + """Return formatted summary string.""" + stats = self.get_stats() + if stats["count"] == 0: + return "No latency measurements (requires interim_results=true and audio_cursor in messages)" + return ( + f"Message Latency: min={round(stats['min'], 3):.3f}s, p50={round(stats['p50'], 3):.3f}s, p95={round(stats['p95'], 3):.3f}s, p99={round(stats['p99'], 3):.3f}s, " + f"max={round(stats['max'], 3):.3f}s ({stats['count']} measurements)" + ) + + def colorize_lines( + self, lines: list[str], use_rich_markup: bool = False + ) -> list[str]: + """ + Post-process transcript lines to colorize latency values. + + Replaces [latency=X.XXXs] with colorized version based on min/max. 
+ """ + import re + + stats = self.get_stats() + min_latency = stats["min"] + max_latency = stats["max"] + + if not self.measurements or min_latency is None or max_latency is None: + return lines + + min_lat = min_latency + max_lat = max_latency + + # If the worst latency is less than 400ms, that's still really good. + # We will manually increase the worst case latency to 400ms in these cases. + if max_lat <= 0.4: + max_lat = 0.4 + + # Pattern to match [latency=0.123s] + pattern = re.compile(r"\[latency=(\d+\.\d+)s\]") + + colorized_lines = [] + for line in lines: + + def replace_latency(match): + latency_val = float(match.group(1)) + colored = colorize_latency( + latency_val, min_lat, max_lat, use_rich_markup ) + return f"[latency={colored}]" + + colorized_lines.append(pattern.sub(replace_latency, line)) + + return colorized_lines + + +# ============================================================================ +# Response Metrics (TTFT and Update Frequency) +# ============================================================================ + + +class ResponseMetricsAggregator: + """ + Tracks Time-to-First-Transcript (TTFT) and Update Frequency metrics. + + TTFT: Wall-clock time from first audio sent (OpenStream) to first + transcript message received (Results or TurnInfo). This includes empty + transcripts, as they still indicate Deepgram has processed audio. + + Update Frequency: Number of interim/update messages per second of audio. + This captures how "responsive" the transcription feels to users, as more + frequent updates create a more fluid real-time experience. + """ + + def __init__(self) -> None: + # TTFT tracking + self.first_audio_sent_time: datetime.datetime | None = None + self.first_transcript_received_time: datetime.datetime | None = None + + # Update frequency tracking + self.interim_message_count: int = 0 + self.total_audio_duration: float = 0.0 + + def record_stream_start(self, received: datetime.datetime) -> None: + """Record when audio streaming begins (OpenStream message).""" + if self.first_audio_sent_time is None: + self.first_audio_sent_time = received + + def record_transcript_message( + self, message: dict, received: datetime.datetime + ) -> None: + """ + Record a transcript message for TTFT and update frequency tracking. 
- def get_message_delay(self, received: datetime.datetime) -> datetime.timedelta: - return received - self.last_message + Args: + message: The Results or TurnInfo message + received: When the message was received + """ + msg_type = message.get("type") - def get_is_final_delay(self, received: datetime.datetime) -> datetime.timedelta: - return received - self.last_is_final + # Track first transcript for TTFT (including empty transcripts) + if self.first_transcript_received_time is None: + if msg_type in ("Results", "TurnInfo"): + self.first_transcript_received_time = received + + # Track interim messages for update frequency + if msg_type == "Results": + # Nova: Count interim results (is_final=false) + if not message.get("is_final", True): + self.interim_message_count += 1 + # Track audio duration from the last message + audio_cursor = message.get("audio_cursor") + if audio_cursor is not None and audio_cursor > self.total_audio_duration: + self.total_audio_duration = audio_cursor - def get_speech_final_delay(self, received: datetime.datetime) -> datetime.timedelta: - return received - self.last_speech_final + elif msg_type == "TurnInfo": + # Flux: Count Update and TurnResumed events as interim messages + event = message.get("event") + if event in ("Update", "TurnResumed"): + self.interim_message_count += 1 + # Track audio duration + audio_cursor = message.get("audio_cursor") + if audio_cursor is not None and audio_cursor > self.total_audio_duration: + self.total_audio_duration = audio_cursor + + def get_ttft(self) -> float | None: + """ + Calculate Time-to-First-Transcript in seconds. - def get_finalized_delay(self, received: datetime.datetime) -> datetime.timedelta: - last = max(self.last_speech_final, self.last_utterance_end) - return (received - last) - self.last_word_end + Returns None if either timestamp is missing. + """ + if ( + self.first_audio_sent_time is None + or self.first_transcript_received_time is None + ): + return None + return ( + self.first_transcript_received_time - self.first_audio_sent_time + ).total_seconds() - def get_eos_delay(self, received: datetime.datetime) -> datetime.timedelta: + def get_update_frequency(self) -> float | None: """ - Returns the delay between the received time and when the last word was spoken. - This is a time-since-end-of-speech calculation. + Calculate update frequency (interim messages per second of audio). + + Returns None if no audio duration is recorded. 
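+
+        Example (illustrative): 24 interim updates over 8.0s of audio
+        gives 3.0 updates/sec.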
""" - return (received - self.start) - self.last_word_end + if self.total_audio_duration <= 0: + return None + return self.interim_message_count / self.total_audio_duration + + def get_stats(self) -> dict: + """Return all response metrics.""" + return { + "ttft": self.get_ttft(), + "update_frequency": self.get_update_frequency(), + "interim_count": self.interim_message_count, + "audio_duration": self.total_audio_duration, + } # ============================================================================ @@ -602,7 +1063,8 @@ def formatted_batch_transcript( single_line_transcript = str_from_word_array( streaming=False, received=None, - delay=None, + latency=None, + eot_latency=None, word_array=word_array, channel=channel, start=start, @@ -624,7 +1086,6 @@ def render_open_stream( cls, message: dict, received: datetime.datetime | None, - delay: float | None, config: DisplayConfig, ) -> str: if received is not None: @@ -641,7 +1102,6 @@ def render_connected( cls, message: dict, received: datetime.datetime | None, - delay: float | None, config: DisplayConfig, ) -> str: if received is not None: @@ -656,7 +1116,6 @@ def render_metadata( cls, message: dict, received: datetime.datetime | None, - delay: float | None, config: DisplayConfig, ) -> str: if received is not None: @@ -670,7 +1129,8 @@ def render_results( cls, message: dict, received: datetime.datetime | None, - delay: float | None, + latency: float | None, + eot_latency: float | None, config: DisplayConfig, speaker_aggregator: SpeakerAggregator, use_rich_markup: bool = False, @@ -695,7 +1155,8 @@ def render_results( line = str_from_word_array( streaming=True, received=received, - delay=delay, + latency=latency, + eot_latency=eot_latency, word_array=word_array, channel=channel, start=start, @@ -713,7 +1174,7 @@ def render_utterance_end( cls, message: dict, received: datetime.datetime | None, - delay: float | None, + eot_latency: float | None, config: DisplayConfig, ) -> str: channel = message["channel"][0] @@ -727,15 +1188,14 @@ def render_utterance_end( else: channel_str = "" if config.print_received and received is not None: - if config.print_delay: - received_str = ( - f" [{received.strftime('%H:%M:%S.%f')} ({delay:.4f}s since EOS)]" - ) - else: - received_str = f"[{received.strftime('%H:%M:%S.%f')}]" + received_str = f"[{received.strftime('%H:%M:%S.%f')}]" else: received_str = "" - line = f"{received_str} {channel_str} [UtteranceEnd]: last_word_end={last_word_end}" + if config.print_latency and eot_latency is not None: + eot_str = f"[eot_latency={eot_latency:.3f}s]" + else: + eot_str = "" + line = f"{received_str} {eot_str} {channel_str} [UtteranceEnd]: last_word_end={last_word_end}" while " " in line: line = line.replace(" ", " ") while " :" in line: @@ -748,7 +1208,6 @@ def render_start_of_turn( cls, message: dict, received: datetime.datetime | None, - delay: float | None, config: DisplayConfig, ) -> str: channel = message["channel"][0] @@ -757,12 +1216,7 @@ def render_start_of_turn( else: channel_str = "" if config.print_received and received is not None: - if config.print_delay: - received_str = ( - f" [{received.strftime('%H:%M:%S.%f')} ({delay:.4f}s since EOS)]" - ) - else: - received_str = f"[{received.strftime('%H:%M:%S.%f')}]" + received_str = f"[{received.strftime('%H:%M:%S.%f')}]" else: received_str = "" timestamp = format_time(message["timestamp"]) @@ -779,7 +1233,8 @@ def render_turn_info( cls, message: dict, received: datetime.datetime | None, - delay: float | None, + latency: float | None, + eot_latency: float | None, 
config: DisplayConfig, speaker_aggregator: SpeakerAggregator, use_rich_markup: bool = False, @@ -796,7 +1251,8 @@ def render_turn_info( events = (event,) if event else () line = str_from_word_array( received=received, - delay=delay, + latency=latency, + eot_latency=eot_latency, streaming=True, word_array=word_array, channel=channel, @@ -817,42 +1273,62 @@ def get_transcript( config: DisplayConfig, ) -> str: speaker_aggregator = SpeakerAggregator() - stream_open_time = DelayAggregator.calculate_start_from_messages(messages) - if stream_open_time is not None: - delay_aggregator = DelayAggregator(start=stream_open_time) - else: - delay_aggregator = None - config.print_delay = False + + # Initialize latency aggregators if latency printing is enabled + latency_aggregator = LatencyAggregator() if config.print_latency else None + eot_latency_aggregator = ( + EOTLatencyAggregator() if config.print_latency else None + ) + response_metrics_aggregator = ( + ResponseMetricsAggregator() if config.print_latency else None + ) single_line_transcripts: list[str] = [] for message in messages: - # Update the DelayAggregator and `delay` - delay = None + # Parse received time received_iso = message.get("received", None) if received_iso is None: received = None else: received = datetime.datetime.fromisoformat(received_iso) - if delay_aggregator is not None: - delay = delay_aggregator.get_eos_delay(received).total_seconds() - delay_aggregator.add_message(message, received) + + # Calculate interim latency for Results and TurnInfo messages + latency = None + if latency_aggregator is not None: + if message.get("type") in ("Results", "TurnInfo"): + latency = latency_aggregator.calculate_latency(message) + + # Track interim results and calculate EOT latency + eot_latency = None + if eot_latency_aggregator is not None and received is not None: + # Record interim results (Nova and Flux) + eot_latency_aggregator.record_interim(message, received) + # Calculate EOT latency for finalizing events + eot_result = eot_latency_aggregator.calculate_eot_latency( + message, received + ) + if eot_result is not None: + eot_latency = eot_result[1] # Handle UtteranceEnd messages separately because they have a different type. + line: str | None if message.get("type") == "UtteranceEnd": line = cls.render_utterance_end( message=message, received=received, - delay=delay, + eot_latency=eot_latency, config=config, ) single_line_transcripts.append(line) # Print the OpenStream message if it exists. 
elif message.get("type") == "OpenStream": + # Record stream start time for TTFT calculation + if response_metrics_aggregator is not None and received is not None: + response_metrics_aggregator.record_stream_start(received) line = cls.render_open_stream( message=message, received=received, - delay=delay, config=config, ) single_line_transcripts.append(line) @@ -861,7 +1337,6 @@ def get_transcript( line = cls.render_connected( message=message, received=received, - delay=delay, config=config, ) single_line_transcripts.append(line) @@ -871,16 +1346,21 @@ def get_transcript( line = cls.render_metadata( message=message, received=received, - delay=delay, config=config, ) single_line_transcripts.append(line) elif message.get("type") == "Results": + # Record for TTFT and update frequency tracking + if response_metrics_aggregator is not None and received is not None: + response_metrics_aggregator.record_transcript_message( + message, received + ) line = cls.render_results( message=message, received=received, - delay=delay, + latency=latency, + eot_latency=eot_latency, config=config, speaker_aggregator=speaker_aggregator, ) @@ -891,16 +1371,21 @@ def get_transcript( line = cls.render_start_of_turn( message=message, received=received, - delay=delay, config=config, ) single_line_transcripts.append(line) elif message.get("type") == "TurnInfo": + # Record for TTFT and update frequency tracking + if response_metrics_aggregator is not None and received is not None: + response_metrics_aggregator.record_transcript_message( + message, received + ) line = cls.render_turn_info( message=message, received=received, - delay=delay, + latency=latency, + eot_latency=eot_latency, config=config, speaker_aggregator=speaker_aggregator, ) @@ -909,6 +1394,41 @@ def get_transcript( else: print("Skipping message of type:", message.get("type")) + # Colorize latency values if enabled + if latency_aggregator is not None and config.colorize: + single_line_transcripts = latency_aggregator.colorize_lines( + single_line_transcripts, use_rich_markup=False + ) + + # Colorize EOT latency values if enabled + if eot_latency_aggregator is not None and config.colorize: + single_line_transcripts = eot_latency_aggregator.colorize_lines( + single_line_transcripts, use_rich_markup=False + ) + + # Append response metrics summary (TTFT and Update Frequency) if enabled + if response_metrics_aggregator is not None: + stats = response_metrics_aggregator.get_stats() + single_line_transcripts.append("") + if stats["ttft"] is not None: + single_line_transcripts.append( + f"Time-to-First-Transcript: {stats['ttft']:.3f}s" + ) + if stats["update_frequency"] is not None: + single_line_transcripts.append( + f"Update Frequency: {stats['update_frequency']:.2f} updates/sec " + f"({stats['interim_count']} updates over {stats['audio_duration']:.1f}s of audio)" + ) + + # Append latency summaries if enabled + if latency_aggregator is not None: + single_line_transcripts.append("") + single_line_transcripts.append(latency_aggregator.format_summary()) + + if eot_latency_aggregator is not None: + single_line_transcripts.append("") + single_line_transcripts.append(eot_latency_aggregator.format_summary()) + transcript = "\n".join(single_line_transcripts) return transcript @@ -919,76 +1439,101 @@ class StreamingFormatter: def __init__( self, config: DisplayConfig, - stream_open_time: datetime.datetime | None = None, use_rich_markup: bool = False, ): self.config = config self.speaker_aggregator = SpeakerAggregator() self.use_rich_markup = use_rich_markup - if 
stream_open_time: - self.delay_aggregator = DelayAggregator(start=stream_open_time) - self.config.print_delay = True - else: - self.delay_aggregator = None - self.config.print_delay = False + # Initialize latency aggregators if latency printing is enabled + self.latency_aggregator = LatencyAggregator() if config.print_latency else None + self.eot_latency_aggregator = ( + EOTLatencyAggregator() if config.print_latency else None + ) + self.response_metrics_aggregator = ( + ResponseMetricsAggregator() if config.print_latency else None + ) def format_message(self, message: dict) -> str | None: """Format a single message and return the formatted string""" - # Update delay tracking + # Parse received time received = None - delay = None received_iso = message.get("received", None) if received_iso is not None: received = datetime.datetime.fromisoformat(received_iso) - if self.delay_aggregator is not None: - delay = self.delay_aggregator.get_eos_delay(received).total_seconds() - self.delay_aggregator.add_message(message, received) + + # Calculate interim latency for Results and TurnInfo messages + latency = None + if self.latency_aggregator is not None: + if message.get("type") in ("Results", "TurnInfo"): + latency = self.latency_aggregator.calculate_latency(message) + + # Track interim results and calculate EOT latency + eot_latency = None + if self.eot_latency_aggregator is not None and received is not None: + # Record interim results (Nova and Flux) + self.eot_latency_aggregator.record_interim(message, received) + # Calculate EOT latency for finalizing events + eot_result = self.eot_latency_aggregator.calculate_eot_latency( + message, received + ) + if eot_result is not None: + eot_latency = eot_result[1] # Handle different message types msg_type = message.get("type") if msg_type == "OpenStream": - # Update delay aggregator start time if not set - if self.delay_aggregator is None and received: - self.delay_aggregator = DelayAggregator(start=received) + # Record stream start time for TTFT calculation + if self.response_metrics_aggregator is not None and received is not None: + self.response_metrics_aggregator.record_stream_start(received) return StreamingTranscriptPrinter.render_open_stream( - message, received, delay, self.config + message, received, self.config ) elif msg_type == "Connected": - if self.delay_aggregator is None and received: - self.delay_aggregator = DelayAggregator(start=received) return StreamingTranscriptPrinter.render_connected( - message, received, delay, self.config + message, received, self.config ) elif msg_type == "UtteranceEnd": return StreamingTranscriptPrinter.render_utterance_end( - message, received, delay, self.config + message, received, eot_latency, self.config ) elif msg_type == "StartOfTurn": return StreamingTranscriptPrinter.render_start_of_turn( - message, received, delay, self.config + message, received, self.config ) elif msg_type == "Results": + # Record for TTFT and update frequency tracking + if self.response_metrics_aggregator is not None and received is not None: + self.response_metrics_aggregator.record_transcript_message( + message, received + ) return StreamingTranscriptPrinter.render_results( message, received, - delay, + latency, + eot_latency, self.config, self.speaker_aggregator, self.use_rich_markup, ) elif msg_type == "TurnInfo": + # Record for TTFT and update frequency tracking + if self.response_metrics_aggregator is not None and received is not None: + self.response_metrics_aggregator.record_transcript_message( + message, received + ) 
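+            # A TurnInfo line can carry both an interim latency and, for
+            # EndOfTurn/EagerEndOfTurn events, an EOT latency.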
return StreamingTranscriptPrinter.render_turn_info( message, received, - delay, + latency, + eot_latency, self.config, self.speaker_aggregator, self.use_rich_markup, @@ -996,7 +1541,7 @@ def format_message(self, message: dict) -> str | None: elif msg_type == "Metadata": return StreamingTranscriptPrinter.render_metadata( - message, received, delay, self.config + message, received, self.config ) return None @@ -1081,7 +1626,9 @@ def remove_text_in_brackets(text: str, remove_newlines: bool = False) -> str: help="display the received time of a streamed message, if available", ) @click.option( - "--print-delay/--skip-delay", default=False, help="display the EOS latency" + "--print-latency/--skip-latency", + default=False, + help="display latency metrics: TTFT, update frequency, message latency, and EOT latency", ) @click.option( "--print-entities/--skip-entities", default=False, help="display entities" @@ -1098,12 +1645,13 @@ def main( print_speakers: bool, print_interim: bool, print_received: bool, - print_delay: bool, + print_latency: bool, print_entities: bool, colorize: bool, ): - if print_delay: - print_received = True + # Latency requires interim results to be printed + if print_latency: + print_interim = True # Create config object config = DisplayConfig( @@ -1111,7 +1659,7 @@ def main( print_speakers=print_speakers, print_interim=print_interim, print_received=print_received, - print_delay=print_delay, + print_latency=print_latency, print_entities=print_entities, colorize=colorize, only_transcript=only_transcript, diff --git a/stt_stream_file/stream_audio_file.py b/stt_stream_file/stream_audio_file.py index e5ec72b..aee37fd 100644 --- a/stt_stream_file/stream_audio_file.py +++ b/stt_stream_file/stream_audio_file.py @@ -1,4 +1,3 @@ -#!/home/jjmaldonis/miniconda3/envs/deepgram/bin/python from typing import AsyncGenerator, BinaryIO, Callable import os import io @@ -653,6 +652,7 @@ async def data_stream() -> AsyncGenerator[bytes, None]: yield data[i : i + chunk_size] all_messages = [] + amount_of_audio_sent = 0.0 try: async with websockets.connect( url, @@ -682,8 +682,8 @@ async def sender(ws: websockets.WebSocketClientProtocol): nonlocal all_messages nonlocal ws_open_time nonlocal nmessages_to_send + nonlocal amount_of_audio_sent - amount_of_audio_sent = 0.0 if verbose: # We'll use much more verbose logging than tqdm tqdm_output = open(os.devnull, "w") @@ -733,6 +733,7 @@ async def sender(ws: websockets.WebSocketClientProtocol): async def receiver(ws): nonlocal all_messages nonlocal aiworks + nonlocal amount_of_audio_sent async for msg in ws: res = json.loads(msg) @@ -746,6 +747,7 @@ async def receiver(ws): res["received"] = datetime.datetime.now( tz=datetime.timezone.utc ).isoformat() + res["audio_cursor"] = amount_of_audio_sent all_messages.append(res) if message_callback: diff --git a/stt_stream_file/textual_ui.py b/stt_stream_file/textual_ui.py index 053bad4..57d30d4 100644 --- a/stt_stream_file/textual_ui.py +++ b/stt_stream_file/textual_ui.py @@ -180,7 +180,7 @@ async def launch_ui( print_speakers: bool = True, print_interim: bool = True, print_received: bool = True, - print_delay: bool = True, + print_latency: bool = True, print_entities: bool = False, colorize: bool = True, ): @@ -191,7 +191,7 @@ async def launch_ui( print_speakers=print_speakers, print_interim=print_interim, print_received=print_received, - print_delay=print_delay, + print_latency=print_latency, print_entities=print_entities, colorize=colorize, only_transcript=False,