Add Anthropic Messages API (/v1/messages) endpoint #1369
ashutoshuiuc wants to merge 2 commits into NovaSky-AI:main from
Conversation
…ngine stack

Adds a `/v1/messages` endpoint that accepts the Anthropic Messages API format and converts to/from OpenAI chat completions internally. This enables using SkyRL's inference engines with Anthropic-compatible clients.

Changes across the full inference engine stack:
- `base.py`: Add `anthropic_messages` abstract method
- `inference_engine_client.py`: Route `/v1/messages` with `session_id`-based sticky routing
- `inference_engine_client_http_endpoint.py`: FastAPI `/v1/messages` endpoint with validation
- `ray_wrapped_inference_engine.py`: Forward `anthropic_messages` via Ray remote
- `remote_inference_engine.py`: HTTP client for `/v1/messages`
- `vllm_engine.py`: Full Anthropic-to-OpenAI format conversion in `AsyncVLLMInferenceEngine`

Closes NovaSky-AI#1222
Pull request overview
Adds Anthropic-compatible Messages API support (POST /v1/messages) across the SkyRL inference engine stack, enabling Anthropic-style clients to talk to the existing vLLM/OpenAI-compatible serving path.
Changes:
- Introduces `anthropic_messages()` into the `InferenceEngineInterface` and wires it through `InferenceEngineClient`, the Ray wrapper, and the remote HTTP engine.
- Adds a FastAPI `POST /v1/messages` endpoint that forwards requests to the inference engine client.
- Implements Anthropic↔OpenAI format conversion in `AsyncVLLMInferenceEngine.anthropic_messages()`.
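To make the conversion concrete, here is a minimal sketch of how top-level Anthropic Messages fields map onto an OpenAI chat-completions request. This is illustrative only, not the PR's actual code; the real conversion in `vllm_engine.py` also covers content blocks, tools, and response formatting.

```python
def anthropic_to_openai_request(req: dict) -> dict:
    """Sketch: map top-level Anthropic Messages fields to an OpenAI chat request."""
    messages = list(req["messages"])
    if "system" in req:
        # Anthropic carries the system prompt outside `messages`;
        # OpenAI expects it as the first chat message.
        messages.insert(0, {"role": "system", "content": req["system"]})
    openai_req = {
        "model": req["model"],
        "messages": messages,
        # max_tokens is required by the Anthropic Messages API.
        "max_tokens": req.get("max_tokens"),
    }
    if "temperature" in req:
        openai_req["temperature"] = req["temperature"]
    if "stop_sequences" in req:
        # Anthropic's stop_sequences maps to OpenAI's stop parameter.
        openai_req["stop"] = req["stop_sequences"]
    return openai_req
```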
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| skyrl/backends/skyrl_train/inference_engines/base.py | Adds `anthropic_messages()` to the inference engine interface. |
| skyrl/backends/skyrl_train/inference_engines/inference_engine_client.py | Adds sticky-routing support for `/v1/messages` via `session_id`. |
| skyrl/backends/skyrl_train/inference_engines/inference_engine_client_http_endpoint.py | Adds FastAPI route `POST /v1/messages` and maps errors to HTTP statuses. |
| skyrl/backends/skyrl_train/inference_engines/remote_inference_engine.py | Adds remote forwarding implementation to `/v1/messages`. |
| skyrl/backends/skyrl_train/inference_engines/ray_wrapped_inference_engine.py | Adds Ray actor forwarding for `anthropic_messages()`. |
| skyrl/backends/skyrl_train/inference_engines/vllm/vllm_engine.py | Implements Anthropic message conversion to/from OpenAI chat completions in the async vLLM engine. |
```python
message_content = ""

anthropic_response = {
    "id": openai_response.get("id", "msg-" + str(int(time.time()))),
```
Fallback message id generation uses `int(time.time())`, which can collide under concurrency (multiple requests within the same second). Since `uuid4` is already imported in this file, prefer a UUID-based id (e.g. `msg-<uuid>`), or reuse the OpenAI response id when available instead of generating a potentially colliding surrogate.
| "id": openai_response.get("id", "msg-" + str(int(time.time()))), | |
| "id": openai_response.get("id") or f"msg-{uuid4()}", |
```python
text_parts = []
for block in content:
    if block.get("type") == "text":
        text_parts.append(block["text"])
content = "\n".join(text_parts)
```
This Anthropic→OpenAI conversion only keeps `type == "text"` blocks and silently discards other content block types (e.g. `tool_use`, `tool_result`, images). That breaks tool-calling flows and conflicts with the PR description of "full … content blocks … mapping". Consider translating non-text blocks (especially tool use/result) or explicitly rejecting unsupported block types with a clear `invalid_request_error` rather than dropping them.
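One possible shape for such a translation is sketched below. This assumes OpenAI-style function tool calls; the helper name and structure are illustrative, not the PR's code.

```python
import json

def anthropic_blocks_to_openai(content):
    """Sketch: convert Anthropic content blocks, keeping tool_use instead of dropping it.

    Unknown block types are rejected so the caller can surface an
    invalid_request_error rather than silently losing data.
    """
    if isinstance(content, str):
        return {"content": content, "tool_calls": None}
    text_parts, tool_calls = [], []
    for block in content:
        btype = block.get("type")
        if btype == "text":
            text_parts.append(block["text"])
        elif btype == "tool_use":
            # Anthropic tool_use maps to an OpenAI function tool call.
            tool_calls.append({
                "id": block["id"],
                "type": "function",
                "function": {
                    "name": block["name"],
                    "arguments": json.dumps(block.get("input", {})),
                },
            })
        else:
            raise ValueError(f"Unsupported content block type: {btype!r}")
    return {
        "content": "\n".join(text_parts) or None,
        "tool_calls": tool_calls or None,
    }
```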
| if "model" not in request_json: | ||
| return {"error": {"message": "The field `model` is required", "type": "invalid_request_error"}} | ||
| if "messages" not in request_json or not request_json["messages"]: | ||
| return {"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}} |
`messages` is only checked for presence/non-emptiness, but its type/shape isn't validated. If a client sends a non-list (e.g. a string or dict) or message items missing `role`/`content`, the code below will raise and return a 500 `internal_error` instead of a 400 `invalid_request_error`. Consider validating that `messages` is a non-empty list of objects with the required keys before conversion.
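A sketch of such a shape check at the engine boundary (illustrative helper; the engine layer returns plain error dicts rather than `JSONResponse` objects):

```python
def validate_messages(request_json: dict):
    """Sketch: validate the shape of `messages` before conversion.

    Returns None when valid, else an Anthropic-style error dict that the
    HTTP layer can map to a 400 invalid_request_error.
    """
    messages = request_json.get("messages")
    if not isinstance(messages, list) or not messages:
        return {"error": {"message": "The field `messages` must be a non-empty list",
                          "type": "invalid_request_error"}}
    for i, msg in enumerate(messages):
        if not isinstance(msg, dict) or "role" not in msg or "content" not in msg:
            return {"error": {"message": f"messages[{i}] must be an object with `role` and `content`",
                              "type": "invalid_request_error"}}
    return None
```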
| if "messages" not in request_json or not request_json["messages"]: | ||
| return JSONResponse( | ||
| content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}}, | ||
| status_code=HTTPStatus.BAD_REQUEST.value, | ||
| ) |
The endpoint only checks that `messages` is present and non-empty, but doesn't validate that it's a list of message objects. Invalid shapes will currently pass this guard and then fail deeper in the stack, likely returning a 500. Consider returning a 400 `invalid_request_error` if `messages` isn't a list (and optionally validate each item has `role`/`content`).
| if "messages" not in request_json or not request_json["messages"]: | |
| return JSONResponse( | |
| content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}}, | |
| status_code=HTTPStatus.BAD_REQUEST.value, | |
| ) | |
| messages = request_json.get("messages") | |
| if not isinstance(messages, list) or not messages: | |
| return JSONResponse( | |
| content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}}, | |
| status_code=HTTPStatus.BAD_REQUEST.value, | |
| ) | |
| for idx, message in enumerate(messages): | |
| if not isinstance(message, dict) or "role" not in message or "content" not in message: | |
| return JSONResponse( | |
| content={ | |
| "error": { | |
| "message": "Each item in `messages` must be an object with `role` and `content` fields", | |
| "type": "invalid_request_error", | |
| } | |
| }, | |
| status_code=HTTPStatus.BAD_REQUEST.value, | |
| ) |
```python
error_type = anthropic_response["error"].get("type", "internal_error")
status_code = HTTPStatus.BAD_REQUEST.value if error_type == "invalid_request_error" else HTTPStatus.INTERNAL_SERVER_ERROR.value
```
`/v1/messages` error handling keys off `error.type == "invalid_request_error"`, but the underlying engines (e.g. vLLM `ErrorResponse`) typically set `error.type` to an HTTP phrase like `"Bad Request"` and provide the actual status in `error.code`. As written, many 400s will be incorrectly returned as 500s. Prefer deriving the response status from `error.code` when present (and fall back to type mapping only if `code` is missing).
```diff
-error_type = anthropic_response["error"].get("type", "internal_error")
-status_code = HTTPStatus.BAD_REQUEST.value if error_type == "invalid_request_error" else HTTPStatus.INTERNAL_SERVER_ERROR.value
+error = anthropic_response["error"]
+error_code = error.get("code")
+status_code: int
+if isinstance(error_code, int):
+    status_code = error_code
+elif isinstance(error_code, str) and error_code.isdigit():
+    status_code = int(error_code)
+else:
+    error_type = error.get("type", "internal_error")
+    status_code = (
+        HTTPStatus.BAD_REQUEST.value
+        if error_type == "invalid_request_error"
+        else HTTPStatus.INTERNAL_SERVER_ERROR.value
+    )
```
```python
@app.post("/v1/messages")
async def anthropic_messages(raw_request: Request):
    """Anthropic-compatible Messages API endpoint."""
```
New `/v1/messages` behavior isn't covered by tests. There are already tests for the HTTP endpoint and request routing; please add at least a basic `/v1/messages` request/response test (including error status mapping) to prevent regressions.
```python
return openai_response
```
On OpenAI error responses, this returns `openai_response` as-is. That payload's shape and types don't match the Anthropic Messages API error schema the HTTP layer expects, which can also lead to mis-mapped HTTP statuses. It would be safer to normalize OpenAI/vLLM `ErrorResponse` into a consistent `{"error": {"message": ..., "type": ...}}` (including a status/code field) before returning from `anthropic_messages`.
```diff
-return openai_response
+# Normalize OpenAI/vLLM-style error into Anthropic Messages-style schema
+error_obj = openai_response.get("error")
+status: Optional[int] = None
+code: Optional[Any] = None
+err_type: Optional[str] = None
+message: str = "Unknown error from upstream OpenAI/vLLM backend."
+# Extract fields from nested "error" object if present
+if isinstance(error_obj, dict):
+    message = error_obj.get("message", message)
+    err_type = error_obj.get("type") or error_obj.get("error_type") or err_type
+    code = error_obj.get("code") or error_obj.get("error_code")
+    status = (
+        error_obj.get("status")
+        or error_obj.get("status_code")
+    )
+# Fallback: look for error fields at the top level
+if isinstance(openai_response, dict):
+    if message == "Unknown error from upstream OpenAI/vLLM backend.":
+        message = openai_response.get("message", message)
+    err_type = (
+        err_type
+        or openai_response.get("type")
+        or openai_response.get("error_type")
+    )
+    code = code or openai_response.get("code") or openai_response.get("error_code")
+    status = (
+        status
+        or openai_response.get("status")
+        or openai_response.get("status_code")
+    )
+if status is None:
+    status = int(HTTPStatus.INTERNAL_SERVER_ERROR)
+if err_type is None:
+    err_type = "internal_error"
+normalized_error: Dict[str, Any] = {
+    "error": {
+        "message": message,
+        "type": err_type,
+    },
+    "status": status,
+}
+if code is not None:
+    normalized_error["error"]["code"] = code
+return normalized_error
```
Code Review
This pull request adds support for the Anthropic Messages API across the inference stack. The changes are well-structured, introducing a new endpoint and propagating the new method through the different engine layers. I've identified a few areas for improvement related to robustness and maintainability. Specifically, I've pointed out issues with input validation using assert, a potentially risky disabled timeout in an HTTP client, redundant validation logic, and a non-robust method for generating unique IDs. Addressing these points will make the implementation more resilient and easier to maintain.
| """ | ||
| session_id = request_payload["json"].pop("session_id", None) | ||
| if session_id is not None: | ||
| assert isinstance(session_id, (str, int)), "Session ID must be an integer or string for `/v1/messages`" |
Using `assert` for validating input data is risky because assertions can be disabled in production (with Python's `-O` flag), which would silently bypass this check. This could lead to unexpected behavior. For validating user-provided data like `session_id`, it's better to perform an explicit check and return an error that results in a 400 Bad Request. Following the error-handling pattern seen elsewhere in this PR, you could return an error dictionary.
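For example, a sketch of an explicit check (hypothetical helper; the error shape follows the PR's `invalid_request_error` pattern):

```python
def pop_session_id(request_payload: dict):
    """Sketch: pop and validate session_id without relying on assert.

    Returns (session_id, error); error is an Anthropic-style dict when the
    value has the wrong type, so the caller can return a 400.
    """
    session_id = request_payload["json"].pop("session_id", None)
    if session_id is not None and not isinstance(session_id, (str, int)):
        return None, {"error": {
            "message": "`session_id` must be a string or integer for `/v1/messages`",
            "type": "invalid_request_error",
        }}
    return session_id, None
```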
| """Call Anthropic Messages API endpoint (/v1/messages).""" | ||
| body = request_payload.get("json", {}) | ||
| headers = {"Content-Type": "application/json"} | ||
| async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session: |
Disabling the timeout by setting `total=None` can be risky, as it might cause requests to hang indefinitely if the remote server is unresponsive, leading to resource exhaustion on the client. It's generally safer to set a long but finite timeout (e.g. 300 seconds) so connections are eventually terminated.
```diff
-async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
+async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=300)) as session:
```
```python
message_content = ""

anthropic_response = {
    "id": openai_response.get("id", "msg-" + str(int(time.time()))),
```
Using `time.time()` to generate a fallback message ID is not robust: it's not guaranteed to be unique, especially if multiple requests are processed within the same second, which could lead to ID collisions. A better approach is a universally unique identifier such as `uuid.uuid4()`. You'll need to add `import uuid` at the top of the file.
| "id": openai_response.get("id", "msg-" + str(int(time.time()))), | |
| "id": openai_response.get("id", f"msg_{uuid.uuid4().hex}"), |
| if "model" not in request_json: | ||
| return {"error": {"message": "The field `model` is required", "type": "invalid_request_error"}} | ||
| if "messages" not in request_json or not request_json["messages"]: | ||
| return {"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}} |
These validation checks for `model` and `messages` are also present in the `anthropic_messages` HTTP endpoint. To avoid redundancy and adhere to the single-responsibility principle, it would be better to perform validation only at the API boundary (the endpoint). This simplifies the engine's logic, as it can assume it receives validated requests. Consider removing these checks from this method.
…ments

- Fix HTTP error detection for old-format vLLM errors (`object=error`)
- Use `error.code` for HTTP status when available, fall back to type mapping
- Normalize OpenAI/vLLM errors to Anthropic error schema before returning
- Use `uuid4` for fallback message IDs instead of `time.time()`
- Validate `messages` field is a list, not just present/non-empty
Summary
- `/v1/messages` endpoint across the full inference engine stack (base, client, HTTP endpoint, ray wrapped, remote, vLLM)
- `AsyncVLLMInferenceEngine`: system blocks, content blocks, sampling params, and response format mapping

Split from #1298 per maintainer feedback.
Closes #1222