diff --git a/components/src/dynamo/trtllm/request_handlers/handler_base.py b/components/src/dynamo/trtllm/request_handlers/handler_base.py
index 58390bcedc..ccde8acb90 100644
--- a/components/src/dynamo/trtllm/request_handlers/handler_base.py
+++ b/components/src/dynamo/trtllm/request_handlers/handler_base.py
@@ -369,8 +369,12 @@ async def generate_locally(
 
             # 2. Per-request errors - send to client, don't shutdown
             except RequestError as e:
-                logging.warning(f"Request {request_id} error: {e}")
-                yield {"finish_reason": "error", "token_ids": []}
+                error_msg = str(e)
+                logging.warning(f"Request {request_id} error: {error_msg}")
+                yield {
+                    "finish_reason": {"error": error_msg},
+                    "token_ids": [],
+                }
 
             # 3. ALL OTHER ERRORS - graceful shutdown
             except Exception as e:
@@ -384,7 +388,7 @@ async def generate_locally(
                 # Try to send error to client before shutdown
                 try:
                     yield {
-                        "finish_reason": "error",
+                        "finish_reason": {"error": error_msg},
                         "token_ids": [],
                     }
                 except Exception:
diff --git a/lib/llm/src/http/service/openai.rs b/lib/llm/src/http/service/openai.rs
index 4f65f16c10..295c452d1c 100644
--- a/lib/llm/src/http/service/openai.rs
+++ b/lib/llm/src/http/service/openai.rs
@@ -41,7 +41,10 @@ use super::{
 use crate::engines::ValidateRequest;
 use crate::protocols::openai::chat_completions::aggregator::ChatCompletionAggregator;
 use crate::protocols::openai::{
-    chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionResponse},
+    chat_completions::{
+        NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
+        NvCreateChatCompletionStreamResponse,
+    },
     completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
     embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
     responses::{NvCreateResponse, NvResponse},
@@ -717,6 +720,116 @@ async fn handler_chat_completions(
     response
 }
 
+/// Checks if an Annotated event represents a backend error and extracts error information.
+/// Returns Some((message, status_code)) if it's an error, None otherwise.
+fn extract_backend_error_if_present(
+    event: &Annotated<NvCreateChatCompletionStreamResponse>,
+) -> Option<(String, StatusCode)> {
+    #[derive(serde::Deserialize)]
+    struct ErrorPayload {
+        message: Option<String>,
+        code: Option<u16>,
+    }
+
+    // Check if event type is "error" (from postprocessor when FinishReason::Error is encountered)
+    if let Some(event_type) = &event.event
+        && event_type == "error"
+    {
+        let comment_str = event
+            .comment
+            .as_ref()
+            .map(|c| c.join(", "))
+            .unwrap_or_else(|| "Unknown error".to_string());
+
+        // Try to parse comment as error JSON to extract status code
+        if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str) {
+            let code = error_payload
+                .code
+                .and_then(|c| StatusCode::from_u16(c).ok())
+                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+            let message = error_payload.message.unwrap_or(comment_str);
+            return Some((message, code));
+        }
+
+        return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
+    }
+
+    // Check if the data payload itself contains an error structure with code >= 400
+    if let Some(data) = &event.data
+        && let Ok(json_value) = serde_json::to_value(data)
+        && let Ok(error_payload) = serde_json::from_value::<ErrorPayload>(json_value.clone())
+        && let Some(code_num) = error_payload.code
+        && code_num >= 400
+    {
+        let code = StatusCode::from_u16(code_num).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+        let message = error_payload
+            .message
+            .unwrap_or_else(|| json_value.to_string());
+        return Some((message, code));
+    }
+
+    // Check if comment contains error information (without event: error)
+    if let Some(comments) = &event.comment
+        && !comments.is_empty()
+    {
+        let comment_str = comments.join(", ");
+
+        // Try to parse comment as error JSON with code >= 400
+        if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str)
+            && let Some(code_num) = error_payload.code
+            && code_num >= 400
+        {
+            let code = StatusCode::from_u16(code_num).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+            let message = error_payload.message.unwrap_or(comment_str);
+            return Some((message, code));
+        }
+
+        // Comments present with no data AND no event type indicate an error
+        // (events with event types like "request_id" or "event.dynamo.test.sentinel" are annotations)
+        if event.data.is_none() && event.event.is_none() {
+            return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
+        }
+    }
+
+    None
+}
+
+/// Checks if the first event in the stream is a backend error.
+/// Returns Err(ErrorResponse) if error detected, Ok(stream) otherwise.
+async fn check_for_backend_error(
+    mut stream: impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>
+        + Send
+        + Unpin
+        + 'static,
+) -> Result<
+    impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send,
+    ErrorResponse,
+> {
+    use futures::stream::StreamExt;
+
+    // Peek at the first event
+    if let Some(first_event) = stream.next().await {
+        // Check if it's an error event
+        if let Some((error_msg, status_code)) = extract_backend_error_if_present(&first_event) {
+            return Err((
+                status_code,
+                Json(ErrorMessage {
+                    message: error_msg,
+                    error_type: map_error_code_to_error_type(status_code),
+                    code: status_code.as_u16(),
+                }),
+            ));
+        }
+
+        // Not an error - reconstruct stream with first event
+        let reconstructed_stream = futures::stream::iter(vec![first_event]).chain(stream);
+        Ok(reconstructed_stream)
+    } else {
+        // Empty stream - this shouldn't happen but handle gracefully
+        Ok(futures::stream::iter(vec![]).chain(stream))
+    }
+}
+
 /// OpenAI Chat Completions Request Handler
 ///
 /// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source"
@@ -822,11 +935,16 @@
 
     // note - we might do this as part of the post processing set to make it more generic
     if streaming {
+        // For streaming responses, we return HTTP 200 immediately without checking for errors.
+        // Once HTTP 200 OK is sent, we cannot change the status code, so any backend errors
+        // must be delivered as SSE events with `event: error` in the stream (handled by
+        // EventConverter and monitor_for_disconnects). This is standard SSE behavior.
        stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
 
        let mut http_queue_guard = Some(http_queue_guard);
        let stream = stream.map(move |response| {
            // Calls observe_response() on each token
+            // EventConverter will detect `event: "error"` and convert to SSE error events
            process_response_using_event_converter_and_observe_metrics(
                EventConverter::from(response),
                &mut response_collector,
@@ -843,8 +961,17 @@
 
        Ok(sse_stream.into_response())
    } else {
+        // Check first event for backend errors before aggregating (non-streaming only)
+        let stream_with_check =
+            check_for_backend_error(stream)
+                .await
+                .map_err(|error_response| {
+                    tracing::error!(request_id, "Backend error detected: {:?}", error_response);
+                    error_response
+                })?;
+
        let mut http_queue_guard = Some(http_queue_guard);
-        let stream = stream.inspect(move |response| {
+        let stream = stream_with_check.inspect(move |response| {
            // Calls observe_response() on each token - drops http_queue_guard on first token
            process_response_and_observe_metrics(
                response,
@@ -859,11 +986,11 @@
            .map_err(|e| {
                tracing::error!(
                    request_id,
-                    "Failed to fold chat completions stream for: {:?}",
+                    "Failed to parse chat completion response: {:?}",
                    e
                );
                ErrorMessage::internal_server_error(&format!(
-                    "Failed to fold chat completions stream: {}",
+                    "Failed to parse chat completion response: {}",
                    e
                ))
            })?;
@@ -2055,4 +2182,136 @@ mod tests {
             assert!(msg.contains("response_format"));
         }
     }
+
+    #[tokio::test]
+    async fn test_check_for_backend_error_with_error_event() {
+        use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
+        use futures::stream;
+
+        // Create an error event
+        let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
+            data: None,
+            id: None,
+            event: Some("error".to_string()),
+            comment: Some(vec!["Backend service unavailable".to_string()]),
+        };
+
+        let test_stream = stream::iter(vec![error_event]);
+        let result = check_for_backend_error(test_stream).await;
+
+        // Should return an error
+        assert!(result.is_err());
+        if let Err(error_response) = result {
+            assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
+            assert_eq!(error_response.1.message, "Backend service unavailable");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_check_for_backend_error_with_json_error_and_code() {
+        use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
+        use futures::stream;
+
+        // Create an error event with JSON payload containing error code in comment
+        let error_json =
+            r#"{"message":"prompt > max_seq_len","type":"Internal Server Error","code":500}"#;
+        let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
+            data: None,
+            id: None,
+            event: Some("error".to_string()),
+            comment: Some(vec![error_json.to_string()]),
+        };
+
+        let test_stream = stream::iter(vec![error_event]);
+        let result = check_for_backend_error(test_stream).await;
+
+        // Should return an error with correct status code extracted from JSON
+        assert!(result.is_err());
+        if let Err(error_response) = result {
+            assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
+            assert_eq!(error_response.1.message, "prompt > max_seq_len");
+            assert_eq!(error_response.1.code, 500);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_check_for_backend_error_with_normal_event() {
+        use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
+        use dynamo_async_openai::types::CreateChatCompletionStreamResponse;
+        use futures::stream::{self, StreamExt};
+
+        // Create a normal data event
+        let normal_event = Annotated::<NvCreateChatCompletionStreamResponse> {
+            data: Some(CreateChatCompletionStreamResponse {
+                id: "test-id".to_string(),
+                choices: vec![],
+                created: 0,
+                model: "test-model".to_string(),
+                system_fingerprint: None,
+                object: "chat.completion.chunk".to_string(),
+                service_tier: None,
+                usage: None,
+                nvext: None,
+            }),
+            id: Some("msg-1".to_string()),
+            event: None,
+            comment: None,
+        };
+
+        let test_stream = stream::iter(vec![normal_event.clone()]);
+        let result = check_for_backend_error(test_stream).await;
+
+        // Should return Ok with the stream
+        assert!(result.is_ok());
+        let mut returned_stream = result.unwrap();
+
+        // Verify we can read the event back from the stream
+        let first = returned_stream.next().await;
+        assert!(first.is_some());
+        let first_event = first.unwrap();
+        assert_eq!(first_event.id, Some("msg-1".to_string()));
+    }
+
+    #[tokio::test]
+    async fn test_check_for_backend_error_with_empty_stream() {
+        use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
+        use futures::stream::{self, StreamExt};
+
+        // Create an empty stream
+        let test_stream =
+            stream::iter::<Vec<Annotated<NvCreateChatCompletionStreamResponse>>>(vec![]);
+        let result = check_for_backend_error(test_stream).await;
+
+        // Should return Ok with an empty stream
+        assert!(result.is_ok());
+        let mut returned_stream = result.unwrap();
+
+        // Verify stream is empty
+        let first = returned_stream.next().await;
+        assert!(first.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_check_for_backend_error_with_comment_but_no_event_type() {
+        use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
+        use futures::stream;
+
+        // Create an event with comment but no event type and no data (error indicator)
+        let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
+            data: None,
+            id: None,
+            event: None,
+            comment: Some(vec!["Connection timeout".to_string()]),
+        };
+
+        let test_stream = stream::iter(vec![error_event]);
+        let result = check_for_backend_error(test_stream).await;
+
+        // Should return an error based on extract_backend_error_if_present logic
+        assert!(result.is_err());
+        if let Err(error_response) = result {
+            assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
+            assert_eq!(error_response.1.message, "Connection timeout");
+        }
+    }
 }
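
For illustration, a minimal client-side sketch of the two error paths this patch produces. This is not part of the diff: the endpoint URL, port, and model name are assumptions, and the `requests` library stands in for any HTTP client. Non-streaming requests now surface a backend failure as a real HTTP status code with a JSON error body (thanks to the first-event peek in check_for_backend_error), while streaming requests, where HTTP 200 is already committed, receive the failure in-band as an SSE `event: error` frame.

# Hypothetical client sketch; URL, port, and model name are assumptions.
import requests

URL = "http://localhost:8000/v1/chat/completions"  # assumed local deployment
body = {"model": "test-model", "messages": [{"role": "user", "content": "hi"}]}

# Non-streaming: a backend failure in the first event becomes a real HTTP
# status (e.g. 500) with a JSON body, instead of HTTP 200 plus a folded
# error stream.
resp = requests.post(URL, json={**body, "stream": False})
if resp.status_code >= 400:
    print("HTTP error:", resp.status_code, resp.json().get("message"))

# Streaming: HTTP 200 is sent before generation starts, so failures arrive
# as SSE frames tagged `event: error` rather than as a status code.
with requests.post(URL, json={**body, "stream": True}, stream=True) as resp:
    for line in resp.iter_lines(decode_unicode=True):
        if line == "event: error":
            print("backend error delivered as SSE error event")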