diff --git a/lib/bindings/python/rust/engine.rs b/lib/bindings/python/rust/engine.rs index 6105ef687d..c32e338fb3 100644 --- a/lib/bindings/python/rust/engine.rs +++ b/lib/bindings/python/rust/engine.rs @@ -128,6 +128,9 @@ impl PythonServerStreamingEngine { #[derive(Debug, thiserror::Error)] enum ResponseProcessingError { + #[error("python value error exception: {0}")] + ValueError(String), + #[error("python exception: {0}")] PythonException(String), @@ -237,7 +240,8 @@ where Err(e) => { done = true; - let msg = match &e { + // Returns (optional_http_code, error_message) + let (code, msg) = match &e { ResponseProcessingError::DeserializeError(e) => { // tell the python async generator to stop generating // right now, this is impossible as we are not passing the context to the python async generator @@ -247,28 +251,39 @@ where "critical error: invalid response object from python async generator; application-logic-mismatch: {}", e ); - msg + (None, msg) } ResponseProcessingError::PyGeneratorExit(_) => { - "Stream ended before generation completed".to_string() + (None, "Stream ended before generation completed".to_string()) + } + ResponseProcessingError::ValueError(e) => { + let msg = format!( + "a python value error exception was caught while processing the async generator: {}", + e + ); + // ValueError should return 400 Bad Request + (Some(400u16), msg) } ResponseProcessingError::PythonException(e) => { let msg = format!( "a python exception was caught while processing the async generator: {}", e ); - msg + (None, msg) } ResponseProcessingError::OffloadError(e) => { let msg = format!( "critical error: failed to offload the python async generator to a new thread: {}", e ); - msg + (None, msg) } }; - Annotated::from_error(msg) + match code { + Some(code) => Annotated::from_error_with_code(code, msg), + None => Annotated::from_error(msg), + } } }; @@ -310,12 +325,16 @@ where let item = item.map_err(|e| { println!(); let mut is_py_generator_exit = false; + let mut 
is_py_value_error = false; Python::with_gil(|py| { e.display(py); is_py_generator_exit = e.is_instance_of::<pyo3::exceptions::PyGeneratorExit>(py); + is_py_value_error = e.is_instance_of::<pyo3::exceptions::PyValueError>(py); }); if is_py_generator_exit { ResponseProcessingError::PyGeneratorExit(e.to_string()) + } else if is_py_value_error { + ResponseProcessingError::ValueError(e.to_string()) } else { ResponseProcessingError::PythonException(e.to_string()) } diff --git a/lib/llm/src/audit/stream.rs b/lib/llm/src/audit/stream.rs index 46e2bb318a..1413e72be2 100644 --- a/lib/llm/src/audit/stream.rs +++ b/lib/llm/src/audit/stream.rs @@ -64,7 +64,7 @@ where let _ = tx.send(final_resp); } Err(e) => { - tracing::warn!("audit: aggregation failed: {e}"); + tracing::warn!("audit: aggregation failed: {:?}", e); } } }); @@ -123,7 +123,7 @@ where final_response_to_one_chunk_stream(final_resp) } Err(e) => { - tracing::warn!("fold aggregation failed: {e}"); + tracing::warn!("fold aggregation failed: {:?}", e); let fallback = NvCreateChatCompletionResponse { id: String::new(), created: 0, @@ -237,6 +237,7 @@ pub fn final_response_to_one_chunk_stream( id: None, event: None, comment: None, + error_code: None, }; Box::pin(futures::stream::once(async move { annotated })) } diff --git a/lib/llm/src/engines.rs b/lib/llm/src/engines.rs index 49c800fd7e..0e482b3ca1 100644 --- a/lib/llm/src/engines.rs +++ b/lib/llm/src/engines.rs @@ -160,12 +160,12 @@ impl // we are returning characters not tokens, so there will be some postprocessing overhead tokio::time::sleep(*TOKEN_ECHO_DELAY).await; let response = deltas.create_choice(0, Some(c.to_string()), None, None); - yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None }; + yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None }; id += 1; } let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None); - yield Annotated { id: Some(id.to_string()), data: Some(response), 
event: None, comment: None }; + yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None }; }; Ok(ResponseStream::new(Box::pin(output), ctx)) @@ -193,11 +193,11 @@ impl for c in chars_string.chars() { tokio::time::sleep(*TOKEN_ECHO_DELAY).await; let response = deltas.create_choice(0, Some(c.to_string()), None, None); - yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None }; + yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None }; id += 1; } let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::CompletionFinishReason::Stop), None); - yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None }; + yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None }; }; diff --git a/lib/llm/src/http/service/openai.rs b/lib/llm/src/http/service/openai.rs index 145c253d2f..3bb1fe4d29 100644 --- a/lib/llm/src/http/service/openai.rs +++ b/lib/llm/src/http/service/openai.rs @@ -763,7 +763,12 @@ fn extract_backend_error_if_present( return Some((message, code)); } - return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR)); + // Fallback: use error_code from Annotated if present, otherwise 500 + let status_code = event + .error_code + .and_then(|c| StatusCode::from_u16(c).ok()) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + return Some((comment_str, status_code)); } // Check if the data payload itself contains an error structure with code >= 400 @@ -799,7 +804,12 @@ fn extract_backend_error_if_present( // Comments present with no data AND no event type indicates error // (events with event types like "request_id" or "event.dynamo.test.sentinel" are annotations) if event.data.is_none() && event.event.is_none() { - return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR)); + // Use error_code from Annotated if 
present, otherwise 500 + let status_code = event + .error_code + .and_then(|c| StatusCode::from_u16(c).ok()) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + return Some((comment_str, status_code)); } } @@ -1001,16 +1011,24 @@ async fn chat_completions( let response = NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone()) .await - .map_err(|e| { + .map_err(|(code, msg)| { tracing::error!( request_id, - "Failed to parse chat completion response: {:?}", - e + "Failed to fold chat completions stream for: {:?}", + msg ); - ErrorMessage::internal_server_error(&format!( - "Failed to parse chat completion response: {}", - e - )) + // Use the error code if provided, otherwise default to 500 + if let Some(code) = code { + ErrorMessage::from_http_error(HttpError { + code, + message: format!("Failed to fold chat completions stream: {}", msg), + }) + } else { + ErrorMessage::internal_server_error(&format!( + "Failed to fold chat completions stream: {}", + msg + )) + } })?; inflight_guard.mark_ok(); @@ -1226,16 +1244,24 @@ async fn responses( let response = NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone()) .await - .map_err(|e| { + .map_err(|(code, msg)| { tracing::error!( request_id, "Failed to fold chat completions stream for: {:?}", - e + msg ); - ErrorMessage::internal_server_error(&format!( - "Failed to fold chat completions stream: {}", - e - )) + // Use the error code if provided, otherwise default to 500 + if let Some(code) = code { + ErrorMessage::from_http_error(HttpError { + code, + message: format!("Failed to fold chat completions stream: {}", msg), + }) + } else { + ErrorMessage::internal_server_error(&format!( + "Failed to fold chat completions stream: {}", + msg + )) + } })?; // Convert NvCreateChatCompletionResponse --> NvResponse diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs index b7bfd8ac4c..bfa056e8fe 100644 --- a/lib/llm/src/preprocessor.rs +++ 
b/lib/llm/src/preprocessor.rs @@ -708,6 +708,7 @@ impl OpenAIPreprocessor { data, event: Some(ANNOTATION_LLM_METRICS.to_string()), comment: annotation.comment, + error_code: None, }; tracing::trace!( diff --git a/lib/llm/src/protocols/codec.rs b/lib/llm/src/protocols/codec.rs index 6544e2a501..5e5de468af 100644 --- a/lib/llm/src/protocols/codec.rs +++ b/lib/llm/src/protocols/codec.rs @@ -108,6 +108,7 @@ where id: value.id, event: value.event, comment: value.comments, + error_code: None, }) } } diff --git a/lib/llm/src/protocols/openai/chat_completions/aggregator.rs b/lib/llm/src/protocols/openai/chat_completions/aggregator.rs index 6e178bc25a..f1589122c7 100644 --- a/lib/llm/src/protocols/openai/chat_completions/aggregator.rs +++ b/lib/llm/src/protocols/openai/chat_completions/aggregator.rs @@ -30,8 +30,8 @@ pub struct DeltaAggregator { system_fingerprint: Option, /// Map of incremental response choices, keyed by index. choices: HashMap, - /// Optional error message if an error occurs during aggregation. - error: Option, + /// Optional error (code, message) if an error occurs during aggregation. + error: Option<(Option, String)>, /// Optional service tier information for the response. service_tier: Option, /// Aggregated nvext field from stream responses @@ -111,18 +111,18 @@ impl DeltaAggregator { /// /// # Returns /// * `Ok(NvCreateChatCompletionResponse)` if aggregation is successful. - /// * `Err(String)` if an error occurs during processing. + /// * `Err((Option, String))` if an error occurs during processing, with optional HTTP status code. pub async fn apply( stream: impl Stream>, _parsing_options: ParsingOptions, - ) -> Result { + ) -> Result, String)> { let aggregator = stream .fold(DeltaAggregator::new(), |mut aggregator, delta| async move { // Attempt to unwrap the delta, capturing any errors. 
let delta = match delta.ok() { Ok(delta) => delta, - Err(error) => { - aggregator.error = Some(error); + Err((code, error)) => { + aggregator.error = Some((code, error)); return aggregator; } }; @@ -312,11 +312,11 @@ pub trait ChatCompletionAggregator { /// /// # Returns /// * `Ok(NvCreateChatCompletionResponse)` if aggregation succeeds. - /// * `Err(String)` if an error occurs. + /// * `Err((Option, String))` if an error occurs, with optional HTTP status code. async fn from_annotated_stream( stream: impl Stream>, parsing_options: ParsingOptions, - ) -> Result; + ) -> Result, String)>; /// Converts an SSE stream into a [`NvCreateChatCompletionResponse`]. /// @@ -325,25 +325,25 @@ pub trait ChatCompletionAggregator { /// /// # Returns /// * `Ok(NvCreateChatCompletionResponse)` if aggregation succeeds. - /// * `Err(String)` if an error occurs. + /// * `Err((Option, String))` if an error occurs, with optional HTTP status code. async fn from_sse_stream( stream: DataStream>, parsing_options: ParsingOptions, - ) -> Result; + ) -> Result, String)>; } impl ChatCompletionAggregator for dynamo_async_openai::types::CreateChatCompletionResponse { async fn from_annotated_stream( stream: impl Stream>, parsing_options: ParsingOptions, - ) -> Result { + ) -> Result, String)> { DeltaAggregator::apply(stream, parsing_options).await } async fn from_sse_stream( stream: DataStream>, parsing_options: ParsingOptions, - ) -> Result { + ) -> Result, String)> { let stream = convert_sse_stream::(stream); NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options).await } @@ -428,6 +428,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, } } @@ -652,6 +653,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -711,6 +713,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; 
let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -753,6 +756,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -795,6 +799,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -835,6 +840,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -879,6 +885,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); @@ -921,6 +928,7 @@ mod tests { id: Some("test_id".to_string()), event: None, comment: None, + error_code: None, }; let stream = Box::pin(stream::iter(vec![annotated_delta])); diff --git a/lib/llm/src/protocols/openai/chat_completions/jail.rs b/lib/llm/src/protocols/openai/chat_completions/jail.rs index 2a716cc743..ec23b0dd4e 100644 --- a/lib/llm/src/protocols/openai/chat_completions/jail.rs +++ b/lib/llm/src/protocols/openai/chat_completions/jail.rs @@ -680,6 +680,7 @@ impl JailedStream { id, event, comment, + error_code: None, }] } EmissionMode::SingleChoicePerChunk => { @@ -695,6 +696,7 @@ impl JailedStream { id: id.clone(), event: event.clone(), comment: comment.clone(), + error_code: None, } }) .collect() diff --git a/lib/llm/src/protocols/openai/completions/aggregator.rs b/lib/llm/src/protocols/openai/completions/aggregator.rs index 51b9aae1db..c042b0473f 100644 --- a/lib/llm/src/protocols/openai/completions/aggregator.rs +++ b/lib/llm/src/protocols/openai/completions/aggregator.rs @@ -23,7 +23,7 @@ pub struct DeltaAggregator { usage: Option, system_fingerprint: Option, choices: HashMap, - error: Option, + error: Option<(Option, String)>, nvext: Option, } @@ -64,8 +64,8 @@ impl DeltaAggregator { 
.fold(DeltaAggregator::new(), |mut aggregator, delta| async move { let delta = match delta.ok() { Ok(delta) => delta, - Err(error) => { - aggregator.error = Some(error); + Err((code, error)) => { + aggregator.error = Some((code, error)); return aggregator; } }; @@ -146,7 +146,7 @@ impl DeltaAggregator { .await; // If we have an error, return it - let aggregator = if let Some(error) = aggregator.error { + let aggregator = if let Some((_code, error)) = aggregator.error { return Err(anyhow::anyhow!(error)); } else { aggregator diff --git a/lib/llm/src/protocols/openai/embeddings/aggregator.rs b/lib/llm/src/protocols/openai/embeddings/aggregator.rs index 22bc7739e2..297405b183 100644 --- a/lib/llm/src/protocols/openai/embeddings/aggregator.rs +++ b/lib/llm/src/protocols/openai/embeddings/aggregator.rs @@ -18,7 +18,7 @@ pub struct DeltaAggregator { /// The accumulated embeddings response. response: Option, /// Optional error message if an error occurs during aggregation. - error: Option, + error: Option<(Option, String)>, } impl Default for DeltaAggregator { @@ -54,8 +54,8 @@ impl DeltaAggregator { // Attempt to unwrap the delta, capturing any errors. let delta = match delta.ok() { Ok(delta) => delta, - Err(error) => { - aggregator.error = Some(error); + Err((code, error)) => { + aggregator.error = Some((code, error)); return aggregator; } }; @@ -85,7 +85,7 @@ impl DeltaAggregator { .await; // Return early if an error was encountered. 
- if let Some(error) = aggregator.error { + if let Some((_code, error)) = aggregator.error { return Err(error); } diff --git a/lib/llm/src/protocols/tensor.rs b/lib/llm/src/protocols/tensor.rs index 9bc0668895..be55d4b91f 100644 --- a/lib/llm/src/protocols/tensor.rs +++ b/lib/llm/src/protocols/tensor.rs @@ -259,7 +259,7 @@ impl AnnotationsProvider for NvCreateTensorRequest { pub struct DeltaAggregator { response: Option<NvCreateTensorResponse>, - error: Option<String>, + error: Option<(Option<u16>, String)>, } impl NvCreateTensorResponse { @@ -275,9 +275,9 @@ impl NvCreateTensorResponse { |mut aggregator, delta| async move { let delta = match delta.ok() { Ok(delta) => delta, - Err(error) => { + Err((code, error)) => { if aggregator.error.is_none() { - aggregator.error = Some(error); + aggregator.error = Some((code, error)); } return aggregator; } @@ -288,7 +288,7 @@ impl NvCreateTensorResponse { aggregator.response = Some(resp); } else if aggregator.error.is_none() { aggregator.error = - Some("Multiple responses in non-streaming mode".to_string()); + Some((None, "Multiple responses in non-streaming mode".to_string())); } } None => { @@ -299,7 +299,7 @@ }, ) .await; - if let Some(error) = aggregator.error { + if let Some((_code, error)) = aggregator.error { Err(anyhow::anyhow!(error)) } else if let Some(response) = aggregator.response { Ok(response) diff --git a/lib/runtime/src/protocols/annotated.rs b/lib/runtime/src/protocols/annotated.rs index 9cc4de21e6..b57f289774 100644 --- a/lib/runtime/src/protocols/annotated.rs +++ b/lib/runtime/src/protocols/annotated.rs @@ -27,6 +27,8 @@ pub struct Annotated { pub event: Option<String>, #[serde(skip_serializing_if = "Option::is_none")] pub comment: Option<Vec<String>>, + #[serde(skip_serializing_if = "Option::is_none")] + pub error_code: Option<u16>, } impl Annotated { @@ -37,6 +39,18 @@ id: None, event: Some("error".to_string()), comment: Some(vec![error]), + error_code: None, } } + + /// Create a new annotated stream from the 
given error with an HTTP status code + pub fn from_error_with_code(code: u16, error: String) -> Self { + Self { + data: None, + id: None, + event: Some("error".to_string()), + comment: Some(vec![error]), + error_code: Some(code), } } @@ -47,6 +61,7 @@ impl Annotated { id: None, event: None, comment: None, + error_code: None, } } @@ -62,19 +77,22 @@ impl Annotated { id: None, event: Some(name.into()), comment: Some(vec![serde_json::to_string(value)?]), + error_code: None, }) } - /// Convert to a [`Result<Self, String>`] - /// If [`Self::event`] is "error", return an error message(s) held by [`Self::comment`] - pub fn ok(self) -> Result<Self, String> { + /// Convert to a [`Result<Self, (Option<u16>, String)>`] + /// If [`Self::event`] is "error", return the error code and message(s) held by [`Self::comment`] + pub fn ok(self) -> Result<Self, (Option<u16>, String)> { if let Some(event) = &self.event && event == "error" { - return Err(self - .comment - .unwrap_or(vec!["unknown error".to_string()]) - .join(", ")); + return Err(( + self.error_code, + self.comment + .unwrap_or(vec!["unknown error".to_string()]) + .join(", "), )); } Ok(self) } @@ -97,6 +115,7 @@ impl Annotated { id: self.id, event: self.event, comment: self.comment, + error_code: self.error_code, } } @@ -112,6 +131,7 @@ impl Annotated { id: self.id, event: self.event, comment: self.comment, + error_code: self.error_code, }, Err(e) => Annotated::from_error(e), }