10 changes: 7 additions & 3 deletions components/src/dynamo/trtllm/request_handlers/handler_base.py
@@ -369,8 +369,12 @@ async def generate_locally(

# 2. Per-request errors - send to client, don't shutdown
except RequestError as e:
logging.warning(f"Request {request_id} error: {e}")
yield {"finish_reason": "error", "token_ids": []}
error_msg = str(e)
logging.warning(f"Request {request_id} error: {error_msg}")
yield {
"finish_reason": {"error": error_msg},
"token_ids": [],
}

# 3. ALL OTHER ERRORS - graceful shutdown
except Exception as e:
@@ -384,7 +388,7 @@ async def generate_locally(
# Try to send error to client before shutdown
try:
yield {
"finish_reason": "error",
"finish_reason": {"error": error_msg},
"token_ids": [],
}
except Exception:
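
The Python change above replaces the bare "error" finish reason with a structured object, {"error": "<message>"}, so the failure message reaches the consumer instead of being dropped. As a minimal, hypothetical sketch of how such a chunk could be deserialized on the consuming side (the FinishReason and Chunk types below are illustrative only, not types from this repository):

use serde::Deserialize;

// Illustrative types only; the real stream types live elsewhere in the codebase.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum FinishReason {
    // Plain string form, e.g. "stop" or "length".
    Simple(String),
    // Structured error form yielded by the handler above: {"error": "<message>"}.
    Error { error: String },
}

#[derive(Debug, Deserialize)]
struct Chunk {
    finish_reason: FinishReason,
    token_ids: Vec<u32>,
}

fn main() {
    let raw = r#"{"finish_reason": {"error": "prompt > max_seq_len"}, "token_ids": []}"#;
    let chunk: Chunk = serde_json::from_str(raw).expect("valid chunk");
    if let FinishReason::Error { error } = &chunk.finish_reason {
        // The message is now recoverable, rather than a bare "error" marker.
        eprintln!("backend error: {error}");
    }
}
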
289 changes: 272 additions & 17 deletions lib/llm/src/http/service/openai.rs
@@ -41,7 +41,10 @@ use super::{
use crate::engines::ValidateRequest;
use crate::protocols::openai::chat_completions::aggregator::ChatCompletionAggregator;
use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionResponse},
chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
NvCreateChatCompletionStreamResponse,
},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
responses::{NvCreateResponse, NvResponse},
@@ -717,6 +720,119 @@ async fn handler_chat_completions(
response
}

/// Checks if an Annotated event represents a backend error and extracts error information.
/// Returns Some((message, status_code)) if it's an error, None otherwise.
fn extract_backend_error_if_present<T: serde::Serialize>(
event: &Annotated<T>,
) -> Option<(String, StatusCode)> {
#[derive(serde::Deserialize)]
struct ErrorPayload {
message: Option<String>,
code: Option<u16>,
}

// Check if event type is "error" (from postprocessor when FinishReason::Error is encountered)
if let Some(event_type) = &event.event
&& event_type == "error"
{
let comment_str = event.comment
.as_ref()
.map(|c| c.join(", "))
.unwrap_or_else(|| "Unknown error".to_string());

// Try to parse comment as error JSON to extract status code
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str) {
let code = error_payload.code
.and_then(|c| StatusCode::from_u16(c).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or(comment_str);
return Some((message, code));
}

return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
}

// Check if the data payload itself contains an error structure with code >= 400
if let Some(data) = &event.data {
if let Ok(json_value) = serde_json::to_value(data) {
if let Ok(error_payload) = serde_json::from_value::<ErrorPayload>(json_value.clone()) {
if let Some(code_num) = error_payload.code {
if code_num >= 400 {
let code = StatusCode::from_u16(code_num)
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or_else(|| {
json_value.to_string()
});
return Some((message, code));
}
}
}
}
}

// Check if comment contains error information (without event: error)
if let Some(comments) = &event.comment
&& !comments.is_empty()
{
let comment_str = comments.join(", ");

// Try to parse comment as error JSON with code >= 400
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str) {
if let Some(code_num) = error_payload.code {
if code_num >= 400 {
let code = StatusCode::from_u16(code_num)
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or(comment_str);
return Some((message, code));
}
}
}

// Comments present with no data often indicate an error
if event.data.is_none() {
return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
}
}

None
}

/// Checks if the first event in the stream is a backend error.
/// Returns Err(ErrorResponse) if error detected, Ok(stream) otherwise.
async fn check_for_backend_error(
mut stream: impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>
+ Send
+ Unpin
+ 'static,
) -> Result<
impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send,
ErrorResponse,
> {
use futures::stream::StreamExt;

// Peek at the first event
if let Some(first_event) = stream.next().await {
// Check if it's an error event
if let Some((error_msg, status_code)) = extract_backend_error_if_present(&first_event) {
return Err((
status_code,
Json(ErrorMessage {
message: error_msg,
error_type: map_error_code_to_error_type(status_code),
code: status_code.as_u16(),
}),
));
}

// Not an error - reconstruct stream with first event
let reconstructed_stream = futures::stream::iter(vec![first_event]).chain(stream);
Ok(reconstructed_stream)
} else {
// Empty stream - this shouldn't happen but handle gracefully
Ok(futures::stream::iter(vec![]).chain(stream))
}
}
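
check_for_backend_error has to consume the first event to classify it, so when that event is not an error it is stitched back onto the front of the stream with stream::iter(...).chain(...); downstream consumers still see every event in order. A generic sketch of that peek-and-reattach pattern, assuming only the futures crate (the function name and the example predicate are illustrative, not part of this PR):

use futures::stream::{self, Stream, StreamExt};

// Generic form of the peek-and-reattach pattern used above: inspect the first item;
// if the predicate flags it as an error, fail fast, otherwise return a stream that
// still yields every item, including the one consumed for the check.
async fn fail_fast_on_first<T: Send + 'static, E>(
    mut s: impl Stream<Item = T> + Send + Unpin + 'static,
    is_error: impl Fn(&T) -> Option<E>,
) -> Result<impl Stream<Item = T> + Send, E> {
    if let Some(first) = s.next().await {
        if let Some(err) = is_error(&first) {
            return Err(err);
        }
        // Reattach the consumed item so downstream sees the full stream.
        return Ok(stream::iter(vec![first]).chain(s));
    }
    // Empty stream: nothing to check, nothing to reattach.
    Ok(stream::iter(vec![]).chain(s))
}

fn main() {
    futures::executor::block_on(async {
        let checked = fail_fast_on_first(stream::iter(vec![1, 2, 3]), |n| {
            (*n < 0).then_some("negative item")
        })
        .await
        .expect("no error in this stream");
        assert_eq!(checked.collect::<Vec<_>>().await, vec![1, 2, 3]);
    });
}
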

/// OpenAI Chat Completions Request Handler
///
/// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source"
@@ -821,11 +937,17 @@ async fn chat_completions(
// todo - tap the stream and propagate request level metrics
// note - we might do this as part of the post processing set to make it more generic

// Check first event for backend errors before streaming
let stream_with_check = check_for_backend_error(stream).await.map_err(|error_response| {
tracing::error!(request_id, "Backend error detected: {:?}", error_response);
error_response
})?;

if streaming {
stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation

let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.map(move |response| {
let stream = stream_with_check.map(move |response| {
// Calls observe_response() on each token
process_response_using_event_converter_and_observe_metrics(
EventConverter::from(response),
@@ -844,7 +966,7 @@
Ok(sse_stream.into_response())
} else {
let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| {
let stream = stream_with_check.inspect(move |response| {
// Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics(
response,
@@ -853,20 +975,22 @@
);
});

let response =
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone())
.await
.map_err(|e| {
tracing::error!(
request_id,
"Failed to fold chat completions stream for: {:?}",
e
);
ErrorMessage::internal_server_error(&format!(
"Failed to fold chat completions stream: {}",
e
))
})?;
let response = NvCreateChatCompletionResponse::from_annotated_stream(
stream,
parsing_options.clone(),
)
.await
.map_err(|e| {
tracing::error!(
request_id,
"Failed to parse chat completion response: {:?}",
e
);
ErrorMessage::internal_server_error(&format!(
"Failed to parse chat completion response: {}",
e
))
})?;

inflight_guard.mark_ok();
Ok(Json(response).into_response())
@@ -2055,4 +2179,135 @@ mod tests {
assert!(msg.contains("response_format"));
}
}

#[tokio::test]
async fn test_check_for_backend_error_with_error_event() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;

// Create an error event
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: Some("error".to_string()),
comment: Some(vec!["Backend service unavailable".to_string()]),
};

let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;

// Should return an error
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "Backend service unavailable");
}
}

#[tokio::test]
async fn test_check_for_backend_error_with_json_error_and_code() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;

// Create an error event with JSON payload containing error code in comment
let error_json = r#"{"message":"prompt > max_seq_len","type":"Internal Server Error","code":500}"#;
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: Some("error".to_string()),
comment: Some(vec![error_json.to_string()]),
};

let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;

// Should return an error with correct status code extracted from JSON
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "prompt > max_seq_len");
assert_eq!(error_response.1.code, 500);
}
}

#[tokio::test]
async fn test_check_for_backend_error_with_normal_event() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use dynamo_async_openai::types::CreateChatCompletionStreamResponse;
use futures::stream::{self, StreamExt};

// Create a normal data event
let normal_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: Some(CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![],
created: 0,
model: "test-model".to_string(),
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
service_tier: None,
usage: None,
nvext: None,
}),
id: Some("msg-1".to_string()),
event: None,
comment: None,
};

let test_stream = stream::iter(vec![normal_event.clone()]);
let result = check_for_backend_error(test_stream).await;

// Should return Ok with the stream
assert!(result.is_ok());
let mut returned_stream = result.unwrap();

// Verify we can read the event back from the stream
let first = returned_stream.next().await;
assert!(first.is_some());
let first_event = first.unwrap();
assert_eq!(first_event.id, Some("msg-1".to_string()));
}

#[tokio::test]
async fn test_check_for_backend_error_with_empty_stream() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream::{self, StreamExt};

// Create an empty stream
let test_stream =
stream::iter::<Vec<Annotated<NvCreateChatCompletionStreamResponse>>>(vec![]);
let result = check_for_backend_error(test_stream).await;

// Should return Ok with an empty stream
assert!(result.is_ok());
let mut returned_stream = result.unwrap();

// Verify stream is empty
let first = returned_stream.next().await;
assert!(first.is_none());
}

#[tokio::test]
async fn test_check_for_backend_error_with_comment_but_no_event_type() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;

// Create an event with comment but no event type and no data (error indicator)
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: None,
comment: Some(vec!["Connection timeout".to_string()]),
};

let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;

// Should return an error based on the extract_backend_error_if_present logic
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "Connection timeout");
}
}
}
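
The JSON-error test above exercises the comment payload {"message":"prompt > max_seq_len","type":"Internal Server Error","code":500}. As a standalone sketch of how the ErrorPayload parse inside extract_backend_error_if_present behaves on such input (serde ignores the extra "type" field by default, and a payload with no code still parses, which the helper then maps to 500 Internal Server Error):

use serde::Deserialize;

// Copy of the helper's internal payload shape, pulled out for a standalone demo.
#[derive(Debug, Deserialize)]
struct ErrorPayload {
    message: Option<String>,
    code: Option<u16>,
}

fn main() {
    // Comment string taken from the JSON-error test above.
    let comment = r#"{"message":"prompt > max_seq_len","type":"Internal Server Error","code":500}"#;
    let payload: ErrorPayload = serde_json::from_str(comment).expect("parsable error payload");
    // Unknown fields such as "type" are ignored, so the parse succeeds.
    assert_eq!(payload.code, Some(500));
    assert_eq!(payload.message.as_deref(), Some("prompt > max_seq_len"));

    // A payload without a code still parses; the caller falls back to 500 in that case.
    let bare: ErrorPayload = serde_json::from_str(r#"{"message":"Backend service unavailable"}"#)
        .expect("parsable error payload");
    assert_eq!(bare.code, None);
}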