Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ impl PythonServerStreamingEngine {

#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
#[error("python value error exception: {0}")]
ValueError(String),

#[error("python exception: {0}")]
PythonException(String),

Expand Down Expand Up @@ -237,7 +240,8 @@ where
Err(e) => {
done = true;

let msg = match &e {
// Returns (optional_http_code, error_message)
let (code, msg) = match &e {
ResponseProcessingError::DeserializeError(e) => {
// tell the python async generator to stop generating
// right now, this is impossible as we are not passing the context to the python async generator
Expand All @@ -247,28 +251,39 @@ where
"critical error: invalid response object from python async generator; application-logic-mismatch: {}",
e
);
msg
(None, msg)
}
ResponseProcessingError::PyGeneratorExit(_) => {
"Stream ended before generation completed".to_string()
(None, "Stream ended before generation completed".to_string())
}
ResponseProcessingError::ValueError(e) => {
let msg = format!(
"a python value error exception was caught while processing the async generator: {}",
e
);
// ValueError should return 400 Bad Request
(Some(400u16), msg)
}
ResponseProcessingError::PythonException(e) => {
let msg = format!(
"a python exception was caught while processing the async generator: {}",
e
);
msg
(None, msg)
}
ResponseProcessingError::OffloadError(e) => {
let msg = format!(
"critical error: failed to offload the python async generator to a new thread: {}",
e
);
msg
(None, msg)
}
};

Annotated::from_error(msg)
match code {
Some(code) => Annotated::from_error_with_code(code, msg),
None => Annotated::from_error(msg),
}
}
};

Expand Down Expand Up @@ -310,12 +325,16 @@ where
let item = item.map_err(|e| {
println!();
let mut is_py_generator_exit = false;
let mut is_py_value_error = false;
Python::with_gil(|py| {
e.display(py);
is_py_generator_exit = e.is_instance_of::<pyo3::exceptions::PyGeneratorExit>(py);
is_py_value_error = e.is_instance_of::<pyo3::exceptions::PyValueError>(py);
});
if is_py_generator_exit {
ResponseProcessingError::PyGeneratorExit(e.to_string())
} else if is_py_value_error {
ResponseProcessingError::ValueError(e.to_string())
} else {
ResponseProcessingError::PythonException(e.to_string())
}
Expand Down
5 changes: 3 additions & 2 deletions lib/llm/src/audit/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ where
let _ = tx.send(final_resp);
}
Err(e) => {
tracing::warn!("audit: aggregation failed: {e}");
tracing::warn!("audit: aggregation failed: {:?}", e);
}
}
});
Expand Down Expand Up @@ -123,7 +123,7 @@ where
final_response_to_one_chunk_stream(final_resp)
}
Err(e) => {
tracing::warn!("fold aggregation failed: {e}");
tracing::warn!("fold aggregation failed: {:?}", e);
let fallback = NvCreateChatCompletionResponse {
id: String::new(),
created: 0,
Expand Down Expand Up @@ -237,6 +237,7 @@ pub fn final_response_to_one_chunk_stream(
id: None,
event: None,
comment: None,
error_code: None,
};
Box::pin(futures::stream::once(async move { annotated }))
}
Expand Down
8 changes: 4 additions & 4 deletions lib/llm/src/engines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,12 @@ impl
// we are returning characters not tokens, so there will be some postprocessing overhead
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None, None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None };
id += 1;
}

let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None);
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None };
};

Ok(ResponseStream::new(Box::pin(output), ctx))
Expand Down Expand Up @@ -193,11 +193,11 @@ impl
for c in chars_string.chars() {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None, None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None };
id += 1;
}
let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::CompletionFinishReason::Stop), None);
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None };
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error_code: None };

};

Expand Down
56 changes: 41 additions & 15 deletions lib/llm/src/http/service/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,12 @@ fn extract_backend_error_if_present<T: serde::Serialize>(
return Some((message, code));
}

return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
// Fallback: use error_code from Annotated if present, otherwise 500
let status_code = event
.error_code
.and_then(|c| StatusCode::from_u16(c).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
return Some((comment_str, status_code));
}

// Check if the data payload itself contains an error structure with code >= 400
Expand Down Expand Up @@ -799,7 +804,12 @@ fn extract_backend_error_if_present<T: serde::Serialize>(
// Comments present with no data AND no event type indicates error
// (events with event types like "request_id" or "event.dynamo.test.sentinel" are annotations)
if event.data.is_none() && event.event.is_none() {
return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
// Use error_code from Annotated if present, otherwise 500
let status_code = event
.error_code
.and_then(|c| StatusCode::from_u16(c).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
return Some((comment_str, status_code));
}
}

Expand Down Expand Up @@ -1001,16 +1011,24 @@ async fn chat_completions(
let response =
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone())
.await
.map_err(|e| {
.map_err(|(code, msg)| {
tracing::error!(
request_id,
"Failed to parse chat completion response: {:?}",
e
"Failed to fold chat completions stream for: {:?}",
msg
);
ErrorMessage::internal_server_error(&format!(
"Failed to parse chat completion response: {}",
e
))
// Use the error code if provided, otherwise default to 500
if let Some(code) = code {
ErrorMessage::from_http_error(HttpError {
code,
message: format!("Failed to fold chat completions stream: {}", msg),
})
} else {
ErrorMessage::internal_server_error(&format!(
"Failed to fold chat completions stream: {}",
msg
))
}
})?;

inflight_guard.mark_ok();
Expand Down Expand Up @@ -1226,16 +1244,24 @@ async fn responses(
let response =
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options.clone())
.await
.map_err(|e| {
.map_err(|(code, msg)| {
tracing::error!(
request_id,
"Failed to fold chat completions stream for: {:?}",
e
msg
);
ErrorMessage::internal_server_error(&format!(
"Failed to fold chat completions stream: {}",
e
))
// Use the error code if provided, otherwise default to 500
if let Some(code) = code {
ErrorMessage::from_http_error(HttpError {
code,
message: format!("Failed to fold chat completions stream: {}", msg),
})
} else {
ErrorMessage::internal_server_error(&format!(
"Failed to fold chat completions stream: {}",
msg
))
}
})?;

// Convert NvCreateChatCompletionResponse --> NvResponse
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/preprocessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ impl OpenAIPreprocessor {
data,
event: Some(ANNOTATION_LLM_METRICS.to_string()),
comment: annotation.comment,
error_code: None,
};

tracing::trace!(
Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/protocols/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ where
id: value.id,
event: value.event,
comment: value.comments,
error_code: None,
})
}
}
Expand Down
32 changes: 20 additions & 12 deletions lib/llm/src/protocols/openai/chat_completions/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub struct DeltaAggregator {
system_fingerprint: Option<String>,
/// Map of incremental response choices, keyed by index.
choices: HashMap<u32, DeltaChoice>,
/// Optional error message if an error occurs during aggregation.
error: Option<String>,
/// Optional error (code, message) if an error occurs during aggregation.
error: Option<(Option<u16>, String)>,
/// Optional service tier information for the response.
service_tier: Option<dynamo_async_openai::types::ServiceTierResponse>,
/// Aggregated nvext field from stream responses
Expand Down Expand Up @@ -111,18 +111,18 @@ impl DeltaAggregator {
///
/// # Returns
/// * `Ok(NvCreateChatCompletionResponse)` if aggregation is successful.
/// * `Err(String)` if an error occurs during processing.
/// * `Err((Option<u16>, String))` if an error occurs during processing, with optional HTTP status code.
pub async fn apply(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
_parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String> {
) -> Result<NvCreateChatCompletionResponse, (Option<u16>, String)> {
let aggregator = stream
.fold(DeltaAggregator::new(), |mut aggregator, delta| async move {
// Attempt to unwrap the delta, capturing any errors.
let delta = match delta.ok() {
Ok(delta) => delta,
Err(error) => {
aggregator.error = Some(error);
Err((code, error)) => {
aggregator.error = Some((code, error));
return aggregator;
}
};
Expand Down Expand Up @@ -312,11 +312,11 @@ pub trait ChatCompletionAggregator {
///
/// # Returns
/// * `Ok(NvCreateChatCompletionResponse)` if aggregation succeeds.
/// * `Err(String)` if an error occurs.
/// * `Err((Option<u16>, String))` if an error occurs, with optional HTTP status code.
async fn from_annotated_stream(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String>;
) -> Result<NvCreateChatCompletionResponse, (Option<u16>, String)>;

/// Converts an SSE stream into a [`NvCreateChatCompletionResponse`].
///
Expand All @@ -325,25 +325,25 @@ pub trait ChatCompletionAggregator {
///
/// # Returns
/// * `Ok(NvCreateChatCompletionResponse)` if aggregation succeeds.
/// * `Err(String)` if an error occurs.
/// * `Err((Option<u16>, String))` if an error occurs, with optional HTTP status code.
async fn from_sse_stream(
stream: DataStream<Result<Message, SseCodecError>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String>;
) -> Result<NvCreateChatCompletionResponse, (Option<u16>, String)>;
}

impl ChatCompletionAggregator for dynamo_async_openai::types::CreateChatCompletionResponse {
async fn from_annotated_stream(
stream: impl Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String> {
) -> Result<NvCreateChatCompletionResponse, (Option<u16>, String)> {
DeltaAggregator::apply(stream, parsing_options).await
}

async fn from_sse_stream(
stream: DataStream<Result<Message, SseCodecError>>,
parsing_options: ParsingOptions,
) -> Result<NvCreateChatCompletionResponse, String> {
) -> Result<NvCreateChatCompletionResponse, (Option<u16>, String)> {
let stream = convert_sse_stream::<NvCreateChatCompletionStreamResponse>(stream);
NvCreateChatCompletionResponse::from_annotated_stream(stream, parsing_options).await
}
Expand Down Expand Up @@ -428,6 +428,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
}
}

Expand Down Expand Up @@ -652,6 +653,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -711,6 +713,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -753,6 +756,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -795,6 +799,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -835,6 +840,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -879,6 +885,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down Expand Up @@ -921,6 +928,7 @@ mod tests {
id: Some("test_id".to_string()),
event: None,
comment: None,
error_code: None,
};
let stream = Box::pin(stream::iter(vec![annotated_delta]));

Expand Down
Loading
Loading