Skip to content

Commit b023688

Browse files
Fixes
Signed-off-by: Zhongxuan Wang <[email protected]>
1 parent 4f437c1 commit b023688

File tree

4 files changed

+32
-26
lines changed

4 files changed

+32
-26
lines changed

components/src/dynamo/vllm/handlers.py

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def build_sampling_params(
7474
return sampling_params
7575

7676

77-
def _should_include_timing_metrics(request: Dict[str, Any]) -> bool:
77+
def _request_contains_timing_metrics(request: Dict[str, Any]) -> bool:
7878
"""Check if timing_metrics is requested in extra_fields."""
7979
extra_fields: Optional[List[str]] = request.get("extra_fields")
8080
if extra_fields is None:
@@ -259,10 +259,10 @@ async def generate_tokens(
259259
out = {"token_ids": output.token_ids[num_output_tokens_so_far:]}
260260
if output.finish_reason:
261261
out["finish_reason"] = output.finish_reason
262-
out[
263-
"completion_usage"
264-
] = BaseWorkerHandler._build_completion_usage(
265-
request_output=res,
262+
out["completion_usage"] = (
263+
BaseWorkerHandler._build_completion_usage(
264+
request_output=res,
265+
)
266266
)
267267
if output.stop_reason:
268268
out["stop_reason"] = output.stop_reason
@@ -306,12 +306,14 @@ async def generate(self, request, context):
306306
logger.debug(f"Decode Request ID: {request_id}")
307307

308308
# Check if timing metrics are requested
309-
include_timing = _should_include_timing_metrics(request)
309+
include_timing = _request_contains_timing_metrics(request)
310310

311311
# Initialize timing metrics using request_received_seconds from frontend (passed via PreprocessedRequest)
312-
timing_metrics: Optional[Dict[str, float]] = None
312+
# NOTE: If frontend, prefill workers, and decode workers are running on different machines,
313+
# there may be slight clock drifts between them. As a result, timing values recorded on
314+
# different machines may not be perfectly synchronized and could show minor inconsistencies.
315+
timing_metrics: Dict[str, float] = {}
313316
if include_timing:
314-
timing_metrics = {}
315317
# Use request_received_seconds from the request (set by frontend) if available
316318
frontend_received = request.get("request_received_seconds")
317319
if frontend_received is not None:
@@ -394,9 +396,7 @@ async def generate(self, request, context):
394396
# On finish, record decode_end_seconds and inject timing_metrics
395397
# Note: request_finish_seconds is set in the Rust HTTP layer when the response actually leaves the server
396398
if tok.get("finish_reason") is not None and include_timing:
397-
timing_metrics[
398-
"decode_end_seconds"
399-
] = time.time()
399+
timing_metrics["decode_end_seconds"] = time.time()
400400

401401
# Inject timing_metrics into disaggregated_params
402402
if (
@@ -439,12 +439,14 @@ async def generate(self, request, context):
439439
logger.debug(f"Prefill Request ID: {request_id}")
440440

441441
# Check if timing metrics are requested
442-
include_timing = _should_include_timing_metrics(request)
442+
include_timing = _request_contains_timing_metrics(request)
443443

444444
# Initialize timing metrics using request_received_seconds from frontend (passed via PreprocessedRequest)
445-
timing_metrics: Optional[Dict[str, float]] = None
445+
# NOTE: If frontend, prefill workers, and decode workers are running on different machines,
446+
# there may be slight clock drifts between them. As a result, timing values recorded on
447+
# different machines may not be perfectly synchronized and could show minor inconsistencies.
448+
timing_metrics: Dict[str, float] = {}
446449
if include_timing:
447-
timing_metrics = {}
448450
# Use request_received_seconds from the request (set by frontend) if available
449451
frontend_received = request.get("request_received_seconds")
450452
if frontend_received is not None:
@@ -509,14 +511,12 @@ async def generate(self, request, context):
509511
disaggregated_params: Optional[Dict[str, Any]] = {}
510512

511513
if res.kv_transfer_params:
512-
disaggregated_params[
513-
"kv_transfer_params"
514-
] = res.kv_transfer_params
514+
disaggregated_params["kv_transfer_params"] = (
515+
res.kv_transfer_params
516+
)
515517

516518
if include_timing and timing_metrics:
517-
timing_metrics[
518-
"prefill_end_seconds"
519-
] = time.time()
519+
timing_metrics["prefill_end_seconds"] = time.time()
520520
disaggregated_params["timing_metrics"] = timing_metrics
521521

522522
output: Dict[str, Any] = {

components/src/dynamo/vllm/tests/test_vllm_extra_fields.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from dynamo.vllm.handlers import ( # noqa: E402
2020
DecodeWorkerHandler,
2121
PrefillWorkerHandler,
22-
_should_include_timing_metrics,
22+
_request_contains_timing_metrics,
2323
)
2424

2525

@@ -32,22 +32,22 @@
3232

3333

3434
class TestShouldIncludeTimingMetrics:
35-
"""Tests for _should_include_timing_metrics helper function."""
35+
"""Tests for _request_contains_timing_metrics helper function."""
3636

3737
def test_returns_true_with_multiple_extra_fields(self):
3838
"""Timing metrics should be included when explicitly requested."""
3939
request = {"extra_fields": ["worker_id", "timing_metrics", "other_field"]}
40-
assert _should_include_timing_metrics(request) is True
40+
assert _request_contains_timing_metrics(request) is True
4141

4242
def test_returns_false_when_extra_fields_is_none(self):
4343
"""Timing metrics should not be included when extra_fields is None."""
4444
request = {"extra_fields": None}
45-
assert _should_include_timing_metrics(request) is False
45+
assert _request_contains_timing_metrics(request) is False
4646

4747
def test_returns_false_when_extra_fields_missing(self):
4848
"""Timing metrics should not be included when extra_fields key is absent."""
4949
request: dict[str, list[str]] = {}
50-
assert _should_include_timing_metrics(request) is False
50+
assert _request_contains_timing_metrics(request) is False
5151

5252

5353
def make_mock_request_output(

lib/llm/src/http/service/openai.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@ async fn handler_completions(
305305
Json(mut request): Json<NvCreateCompletionRequest>,
306306
) -> Result<Response, ErrorResponse> {
307307
// Capture received timestamp immediately when request arrives at the frontend
308+
// NOTE: If frontend, prefill workers, and decode workers are running on different machines,
309+
// there may be slight clock drifts between them. As a result, timing values recorded on
310+
// different machines may not be perfectly synchronized and could show minor inconsistencies.
308311
let request_received_seconds = SystemTime::now()
309312
.duration_since(UNIX_EPOCH)
310313
.map(|d| d.as_secs_f32())
@@ -739,6 +742,9 @@ async fn handler_chat_completions(
739742
Json(mut request): Json<NvCreateChatCompletionRequest>,
740743
) -> Result<Response, ErrorResponse> {
741744
// Capture received timestamp immediately when request arrives at the frontend
745+
// NOTE: If frontend, prefill workers, and decode workers are running on different machines,
746+
// there may be slight clock drifts between them. As a result, timing values recorded on
747+
// different machines may not be perfectly synchronized and could show minor inconsistencies.
742748
let request_received_seconds = SystemTime::now()
743749
.duration_since(UNIX_EPOCH)
744750
.map(|d| d.as_secs_f32())

lib/llm/src/protocols/common/preprocessor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ pub struct PreprocessedRequest {
106106
/// Used for timing metrics to track end-to-end latency
107107
#[builder(default)]
108108
#[serde(default, skip_serializing_if = "Option::is_none")]
109-
pub request_received_seconds: Option<f64>,
109+
pub request_received_seconds: Option<f32>,
110110
}
111111

112112
impl PreprocessedRequest {

0 commit comments

Comments
 (0)