Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/llm/src/protocols/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::TokenIdType;
pub mod llm_backend;
pub mod postprocessor;
pub mod preprocessor;
pub mod timing;

/// SamplingOptionsProvider is a trait that allows the caller to extract the sampling options from
/// the object that implements it. This will mutate the object.
Expand Down
109 changes: 109 additions & 0 deletions lib/llm/src/protocols/common/timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Per-request timing tracker for capturing request lifecycle metrics.
//!
//! This module provides [`RequestTimingTracker`] for tracking timing information
//! that can be returned to clients via the `nvext` response field.

use serde::{Deserialize, Serialize};
use std::sync::OnceLock;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

/// Per-request timing tracker.
///
/// Captures timing information throughout the request lifecycle:
/// - `request_received`: When the request was received
/// - `first_token_time`: When the first token was generated (set once via OnceLock)
/// - `request_finish_time`: When the request finished (set once via OnceLock)
///
/// The `OnceLock` fields ensure that timing values are set exactly once,
/// which is important for disaggregated serving where the "first token"
/// might appear multiple times.
// Debug is derivable (Instant, u64 and OnceLock<Instant> are all Debug) and lets
// containing types such as DeltaGenerator keep their own #[derive(Debug)].
#[derive(Debug)]
pub struct RequestTimingTracker {
    /// When the request was received (monotonic clock for duration calculations)
    request_received: Instant,

    /// When the request was received (wall clock time as epoch milliseconds)
    request_received_epoch_ms: u64,

    /// When the first token was generated - set once via OnceLock
    first_token_time: OnceLock<Instant>,

    /// When the request finished - set once via OnceLock
    request_finish_time: OnceLock<Instant>,
}

impl RequestTimingTracker {
    /// Create a new timing tracker, capturing the current time as request received.
    ///
    /// Records both a monotonic [`Instant`] (used for all duration math, immune to
    /// wall-clock adjustments) and a wall-clock epoch timestamp (for reporting to
    /// clients). If the system clock reads before the Unix epoch, the epoch
    /// timestamp falls back to 0 rather than panicking.
    pub fn new() -> Self {
        let now = Instant::now();
        let epoch_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        RequestTimingTracker {
            request_received: now,
            request_received_epoch_ms: epoch_ms,
            first_token_time: OnceLock::new(),
            request_finish_time: OnceLock::new(),
        }
    }

    /// Record the first-token timestamp.
    ///
    /// Returns `true` only on the first successful call; later calls are no-ops
    /// and return `false` — the `OnceLock` keeps the earliest recorded value,
    /// which matters in disaggregated serving where a "first token" may be
    /// observed more than once.
    pub fn record_first_token(&self) -> bool {
        self.first_token_time.set(Instant::now()).is_ok()
    }

    /// Record the request-finish timestamp.
    ///
    /// Returns `true` only on the first successful call; later calls are no-ops
    /// and return `false`.
    pub fn record_finish(&self) -> bool {
        self.request_finish_time.set(Instant::now()).is_ok()
    }

    /// Time to first token in milliseconds, or `None` if no first token has
    /// been recorded yet.
    pub fn ttft_ms(&self) -> Option<f64> {
        self.first_token_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    /// Total request time in milliseconds, or `None` if the request has not
    /// finished yet.
    pub fn total_time_ms(&self) -> Option<f64> {
        self.request_finish_time
            .get()
            .map(|t| t.duration_since(self.request_received).as_secs_f64() * 1000.0)
    }

    /// Wall-clock time the request was received, as milliseconds since the Unix epoch.
    pub fn request_received_epoch_ms(&self) -> u64 {
        self.request_received_epoch_ms
    }

    /// Snapshot the current timing values into a serializable [`TimingInfo`].
    ///
    /// `ttft_ms` / `total_time_ms` are `None` for any milestone not yet recorded.
    pub fn get_timing_info(&self) -> TimingInfo {
        TimingInfo {
            request_received_ms: self.request_received_epoch_ms,
            ttft_ms: self.ttft_ms(),
            total_time_ms: self.total_time_ms(),
        }
    }
}

impl Default for RequestTimingTracker {
fn default() -> Self {
Self::new()
}
}

/// Timing information for response injection.
///
/// This struct is serialized and included in the response's `nvext` field
/// when the client requests timing information via `extra_fields: ["timing"]`.
///
/// Produced from a `RequestTimingTracker` snapshot; optional fields are
/// omitted from the serialized JSON entirely (via `skip_serializing_if`)
/// rather than emitted as `null`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct TimingInfo {
    /// When the request was received (epoch milliseconds)
    pub request_received_ms: u64,

    /// Time to first token in milliseconds; `None` (and omitted from output)
    /// if no first token was recorded
    #[serde(skip_serializing_if = "Option::is_none")]
    pub ttft_ms: Option<f64>,

    /// Total request time in milliseconds; `None` (and omitted from output)
    /// if the request had not finished when the snapshot was taken
    #[serde(skip_serializing_if = "Option::is_none")]
    pub total_time_ms: Option<f64>,
}
64 changes: 51 additions & 13 deletions lib/llm/src/protocols/openai/chat_completions/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use super::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse}
use crate::{
local_model::runtime_config::ModelRuntimeConfig,
protocols::{
common,
openai::nvext::{NvExtResponse, WorkerIdInfo},
common::{self, timing::RequestTimingTracker},
openai::nvext::{NvExtProvider, NvExtResponse, TimingInfo, WorkerIdInfo},
},
types::TokenIdType,
};
Expand Down Expand Up @@ -44,6 +44,12 @@ impl NvCreateChatCompletionRequest {
/// # Returns
/// * [`DeltaGenerator`] configured with model name and response options.
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
// Check if client requested timing in extra_fields
let enable_timing = self
.nvext()
.and_then(|nv| nv.extra_fields.as_ref())
.is_some_and(|fields| fields.iter().any(|f| f == "timing"));

let options = DeltaGeneratorOptions {
enable_usage: self
.inner
Expand All @@ -53,6 +59,7 @@ impl NvCreateChatCompletionRequest {
.unwrap_or(false),
enable_logprobs: self.inner.logprobs.unwrap_or(false)
|| self.inner.top_logprobs.unwrap_or(0) > 0,
enable_timing,
runtime_config: ModelRuntimeConfig::default(),
};

Expand All @@ -67,12 +74,13 @@ pub struct DeltaGeneratorOptions {
pub enable_usage: bool,
/// Determines whether log probabilities should be included in the response.
pub enable_logprobs: bool,
/// Determines whether timing information should be included in the response's nvext.
pub enable_timing: bool,

pub runtime_config: ModelRuntimeConfig,
}

/// Generates incremental chat completion responses in a streaming fashion.
#[derive(Debug)]
pub struct DeltaGenerator {
/// Unique identifier for the chat completion session.
id: String,
Expand All @@ -91,6 +99,8 @@ pub struct DeltaGenerator {
msg_counter: u64,
/// Configuration options for response generation.
options: DeltaGeneratorOptions,
/// Optional timing tracker for per-request timing metrics.
timing_tracker: Option<RequestTimingTracker>,
}

impl DeltaGenerator {
Expand Down Expand Up @@ -123,6 +133,13 @@ impl DeltaGenerator {

let chatcmpl_id = format!("chatcmpl-{request_id}");

// Create timing tracker if timing is enabled
let timing_tracker = if options.enable_timing {
Some(RequestTimingTracker::new())
} else {
None
};

Self {
id: chatcmpl_id,
object: "chat.completion.chunk".to_string(),
Expand All @@ -133,6 +150,7 @@ impl DeltaGenerator {
usage,
msg_counter: 0,
options,
timing_tracker,
}
}

Expand Down Expand Up @@ -365,24 +383,44 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
let index = 0;
let mut stream_response = self.create_choice(index, delta.text, finish_reason, logprobs);

// Extract worker_id from disaggregated_params and inject into nvext if present
if let Some(worker_id_info) = delta
// Record first token time (only succeeds on first call due to OnceLock)
if let Some(ref tracker) = self.timing_tracker {
tracker.record_first_token();
}

// Extract worker_id from disaggregated_params
let worker_id_info = delta
.disaggregated_params
.as_ref()
.and_then(|params| params.get("worker_id"))
.and_then(|v| serde_json::from_value::<WorkerIdInfo>(v.clone()).ok())
{
.and_then(|v| serde_json::from_value::<WorkerIdInfo>(v.clone()).ok());

// Get timing info if this is the final response (has finish_reason)
let timing_info: Option<TimingInfo> = if finish_reason.is_some() {
self.timing_tracker.as_ref().map(|tracker| {
tracker.record_finish();
tracker.get_timing_info()
})
} else {
None
};

// Inject nvext if we have worker_id or timing
if worker_id_info.is_some() || timing_info.is_some() {
let nvext_response = NvExtResponse {
worker_id: Some(worker_id_info.clone()),
worker_id: worker_id_info.clone(),
timing: timing_info,
};

if let Ok(nvext_json) = serde_json::to_value(&nvext_response) {
stream_response.nvext = Some(nvext_json);
tracing::debug!(
"Injected worker_id into chat completion nvext: prefill={:?}, decode={:?}",
worker_id_info.prefill_worker_id,
worker_id_info.decode_worker_id
);
if let Some(ref info) = worker_id_info {
tracing::debug!(
"Injected worker_id into chat completion nvext: prefill={:?}, decode={:?}",
info.prefill_worker_id,
info.decode_worker_id
);
}
}
}

Expand Down
62 changes: 49 additions & 13 deletions lib/llm/src/protocols/openai/completions/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
use super::{NvCreateCompletionRequest, NvCreateCompletionResponse};
use crate::{
protocols::{
common,
openai::nvext::{NvExtResponse, WorkerIdInfo},
common::{self, timing::RequestTimingTracker},
openai::nvext::{NvExtProvider, NvExtResponse, TimingInfo, WorkerIdInfo},
},
types::TokenIdType,
};
Expand Down Expand Up @@ -39,6 +39,12 @@ impl NvCreateCompletionRequest {
// put this method on the request
// inspect the request to extract options
pub fn response_generator(&self, request_id: String) -> DeltaGenerator {
// Check if client requested timing in extra_fields
let enable_timing = self
.nvext()
.and_then(|nv| nv.extra_fields.as_ref())
.is_some_and(|fields| fields.iter().any(|f| f == "timing"));

let options = DeltaGeneratorOptions {
enable_usage: self
.inner
Expand All @@ -47,6 +53,7 @@ impl NvCreateCompletionRequest {
.map(|opts| opts.include_usage)
.unwrap_or(false),
enable_logprobs: self.inner.logprobs.unwrap_or(0) > 0,
enable_timing,
};

DeltaGenerator::new(self.inner.model.clone(), options, request_id)
Expand All @@ -57,9 +64,9 @@ impl NvCreateCompletionRequest {
pub struct DeltaGeneratorOptions {
pub enable_usage: bool,
pub enable_logprobs: bool,
pub enable_timing: bool,
}

#[derive(Debug, Clone)]
pub struct DeltaGenerator {
id: String,
object: String,
Expand All @@ -68,6 +75,7 @@ pub struct DeltaGenerator {
system_fingerprint: Option<String>,
usage: dynamo_async_openai::types::CompletionUsage,
options: DeltaGeneratorOptions,
timing_tracker: Option<RequestTimingTracker>,
}

impl DeltaGenerator {
Expand All @@ -93,6 +101,13 @@ impl DeltaGenerator {

let completion_id = format!("cmpl-{request_id}");

// Create timing tracker if timing is enabled
let timing_tracker = if options.enable_timing {
Some(RequestTimingTracker::new())
} else {
None
};

Self {
id: completion_id,
object: "text_completion".to_string(),
Expand All @@ -101,6 +116,7 @@ impl DeltaGenerator {
system_fingerprint: None,
usage,
options,
timing_tracker,
}
}

Expand Down Expand Up @@ -271,24 +287,44 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
let index = delta.index.unwrap_or(0);
let mut response = self.create_choice(index, delta.text.clone(), finish_reason, logprobs);

// Extract worker_id from disaggregated_params and inject into nvext if present
if let Some(worker_id_info) = delta
// Record first token time (only succeeds on first call due to OnceLock)
if let Some(ref tracker) = self.timing_tracker {
tracker.record_first_token();
}

// Extract worker_id from disaggregated_params
let worker_id_info = delta
.disaggregated_params
.as_ref()
.and_then(|params| params.get("worker_id"))
.and_then(|v| serde_json::from_value::<WorkerIdInfo>(v.clone()).ok())
{
.and_then(|v| serde_json::from_value::<WorkerIdInfo>(v.clone()).ok());

// Get timing info if this is the final response (has finish_reason)
let timing_info: Option<TimingInfo> = if finish_reason.is_some() {
self.timing_tracker.as_ref().map(|tracker| {
tracker.record_finish();
tracker.get_timing_info()
})
} else {
None
};

// Inject nvext if we have worker_id or timing
if worker_id_info.is_some() || timing_info.is_some() {
let nvext_response = NvExtResponse {
worker_id: Some(worker_id_info.clone()),
worker_id: worker_id_info.clone(),
timing: timing_info,
};

if let Ok(nvext_json) = serde_json::to_value(&nvext_response) {
response.inner.nvext = Some(nvext_json);
tracing::debug!(
"Injected worker_id into completions nvext: prefill={:?}, decode={:?}",
worker_id_info.prefill_worker_id,
worker_id_info.decode_worker_id
);
if let Some(ref info) = worker_id_info {
tracing::debug!(
"Injected worker_id into completions nvext: prefill={:?}, decode={:?}",
info.prefill_worker_id,
info.decode_worker_id
);
}
}
}

Expand Down
Loading
Loading