
Add Anthropic Messages API (/v1/messages) endpoint#1369

Open
ashutoshuiuc wants to merge 2 commits into NovaSky-AI:main from ashutoshuiuc:pr/anthropic-messages-api

Conversation


@ashutoshuiuc ashutoshuiuc commented Mar 23, 2026

Summary

  • Adds /v1/messages endpoint across the full inference engine stack (base, client, HTTP endpoint, ray wrapped, remote, vLLM)
  • Full Anthropic-to-OpenAI format conversion in AsyncVLLMInferenceEngine: system blocks, content blocks, sampling params, and response format mapping
  • Session-based sticky routing for conversation continuity

Split from #1298 per maintainer feedback.

Closes #1222
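The session-based sticky routing mentioned above can be sketched as a deterministic hash of the session id onto an engine index, so every request in a conversation lands on the same replica. This is an illustrative sketch only; the function name and signature are assumptions, not the PR's actual implementation.

```python
import hashlib

def pick_engine_index(session_id, num_engines: int) -> int:
    """Deterministically map a session id onto an engine index (hypothetical
    helper), so repeated requests from one conversation hit the same replica."""
    digest = hashlib.sha256(str(session_id).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_engines
```

A hash-based mapping needs no shared routing table, at the cost of reshuffling sessions when the engine count changes.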



…ngine stack

Adds /v1/messages endpoint that accepts Anthropic Messages API format and
converts to/from OpenAI chat completions internally. This enables using
SkyRL's inference engines with Anthropic-compatible clients.

Changes across the full inference engine stack:
- base.py: Add anthropic_messages abstract method
- inference_engine_client.py: Route /v1/messages with session_id-based sticky routing
- inference_engine_client_http_endpoint.py: FastAPI /v1/messages endpoint with validation
- ray_wrapped_inference_engine.py: Forward anthropic_messages via Ray remote
- remote_inference_engine.py: HTTP client for /v1/messages
- vllm_engine.py: Full Anthropic-to-OpenAI format conversion in AsyncVLLMInferenceEngine

Closes NovaSky-AI#1222
Copilot AI review requested due to automatic review settings March 23, 2026 13:38
devin-ai-integration[bot]

This comment was marked as resolved.


Copilot AI left a comment


Pull request overview

Adds Anthropic-compatible Messages API support (POST /v1/messages) across the SkyRL inference engine stack, enabling Anthropic-style clients to talk to the existing vLLM/OpenAI-compatible serving path.

Changes:

  • Introduces anthropic_messages() into the InferenceEngineInterface and wires it through InferenceEngineClient, Ray wrapper, and remote HTTP engine.
  • Adds a FastAPI POST /v1/messages endpoint that forwards requests to the inference engine client.
  • Implements Anthropic↔OpenAI format conversion in AsyncVLLMInferenceEngine.anthropic_messages().

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

Summary per file:

  • skyrl/backends/skyrl_train/inference_engines/base.py: Adds anthropic_messages() to the inference engine interface.
  • skyrl/backends/skyrl_train/inference_engines/inference_engine_client.py: Adds sticky-routing support for /v1/messages via session_id.
  • skyrl/backends/skyrl_train/inference_engines/inference_engine_client_http_endpoint.py: Adds FastAPI route POST /v1/messages and maps errors to HTTP statuses.
  • skyrl/backends/skyrl_train/inference_engines/remote_inference_engine.py: Adds remote forwarding implementation to /v1/messages.
  • skyrl/backends/skyrl_train/inference_engines/ray_wrapped_inference_engine.py: Adds Ray actor forwarding for anthropic_messages().
  • skyrl/backends/skyrl_train/inference_engines/vllm/vllm_engine.py: Implements Anthropic message conversion to/from OpenAI chat completions in the async vLLM engine.


message_content = ""

anthropic_response = {
    "id": openai_response.get("id", "msg-" + str(int(time.time()))),

Copilot AI Mar 23, 2026


Fallback message id generation uses int(time.time()), which can collide under concurrency (multiple requests within the same second). Since uuid4 is already imported in this file, prefer a UUID-based id (e.g. msg-<uuid>), or reuse the OpenAI response id when available without generating a potentially-colliding surrogate.

Suggested change
"id": openai_response.get("id", "msg-" + str(int(time.time()))),
"id": openai_response.get("id") or f"msg-{uuid4()}",

Comment on lines +679 to +683
text_parts = []
for block in content:
    if block.get("type") == "text":
        text_parts.append(block["text"])
content = "\n".join(text_parts)

Copilot AI Mar 23, 2026


This Anthropic→OpenAI conversion only keeps type == "text" blocks and silently discards other content block types (e.g. tool_use, tool_result, images). That breaks tool-calling flows and conflicts with the PR description of “full … content blocks … mapping”. Consider translating non-text blocks (especially tool use/result) or explicitly rejecting unsupported block types with a clear invalid_request_error rather than dropping them.
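One way to act on this suggestion is to translate tool_use blocks into OpenAI-style tool_calls and reject anything else explicitly. This is a minimal sketch under the reviewer's suggestion, not the PR's code; the function name is hypothetical.

```python
import json

def convert_content_blocks(content):
    """Translate Anthropic content blocks into an OpenAI-style assistant
    message, mapping tool_use blocks to tool_calls and rejecting block
    types we cannot represent (hypothetical helper)."""
    text_parts, tool_calls = [], []
    for block in content:
        btype = block.get("type")
        if btype == "text":
            text_parts.append(block["text"])
        elif btype == "tool_use":
            tool_calls.append({
                "id": block["id"],
                "type": "function",
                "function": {
                    "name": block["name"],
                    # OpenAI expects arguments as a JSON-encoded string
                    "arguments": json.dumps(block.get("input", {})),
                },
            })
        else:
            # Fail loudly instead of silently dropping content
            raise ValueError(f"Unsupported content block type: {btype}")
    message = {"role": "assistant", "content": "\n".join(text_parts) or None}
    if tool_calls:
        message["tool_calls"] = tool_calls
    return message
```

Raising on unknown block types lets the HTTP layer surface a 400 invalid_request_error rather than returning a silently truncated conversation.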

Comment on lines +654 to +657
if "model" not in request_json:
    return {"error": {"message": "The field `model` is required", "type": "invalid_request_error"}}
if "messages" not in request_json or not request_json["messages"]:
    return {"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}}

Copilot AI Mar 23, 2026


messages is only checked for presence/non-emptiness, but its type/shape isn’t validated. If a client sends a non-list (e.g. a string/dict) or message items missing role/content, the code below will raise and return a 500 internal_error instead of a 400 invalid_request_error. Consider validating messages is a non-empty list of objects with the required keys before conversion.
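The shape check the reviewer asks for can be factored into one small validator that returns an Anthropic-style error dict, matching the error format already used in this method. A sketch under that assumption (the helper name is illustrative):

```python
def validate_messages(request_json):
    """Return an Anthropic-style invalid_request_error dict if `messages`
    is malformed, or None when it is a non-empty list of role/content
    objects (hypothetical helper)."""
    messages = request_json.get("messages")
    if not isinstance(messages, list) or not messages:
        return {"error": {"message": "The field `messages` must be a non-empty list",
                          "type": "invalid_request_error"}}
    for message in messages:
        if not isinstance(message, dict) or "role" not in message or "content" not in message:
            return {"error": {"message": "Each item in `messages` must be an object with `role` and `content`",
                              "type": "invalid_request_error"}}
    return None
```

Running this before conversion turns would-be 500s from malformed payloads into clean 400 responses.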

Comment on lines +319 to +323
if "messages" not in request_json or not request_json["messages"]:
    return JSONResponse(
        content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}},
        status_code=HTTPStatus.BAD_REQUEST.value,
    )

Copilot AI Mar 23, 2026


The endpoint only checks that messages is present/non-empty, but doesn’t validate it’s a list of message objects. Invalid shapes will currently pass this guard and then fail deeper in the stack, likely returning a 500. Consider returning a 400 invalid_request_error if messages isn’t a list (and optionally validate each item has role/content).

Suggested change
if "messages" not in request_json or not request_json["messages"]:
    return JSONResponse(
        content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}},
        status_code=HTTPStatus.BAD_REQUEST.value,
    )
messages = request_json.get("messages")
if not isinstance(messages, list) or not messages:
    return JSONResponse(
        content={"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}},
        status_code=HTTPStatus.BAD_REQUEST.value,
    )
for idx, message in enumerate(messages):
    if not isinstance(message, dict) or "role" not in message or "content" not in message:
        return JSONResponse(
            content={
                "error": {
                    "message": "Each item in `messages` must be an object with `role` and `content` fields",
                    "type": "invalid_request_error",
                }
            },
            status_code=HTTPStatus.BAD_REQUEST.value,
        )

Comment on lines +332 to +333
error_type = anthropic_response["error"].get("type", "internal_error")
status_code = HTTPStatus.BAD_REQUEST.value if error_type == "invalid_request_error" else HTTPStatus.INTERNAL_SERVER_ERROR.value

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/v1/messages error handling keys off error.type == "invalid_request_error", but the underlying engines (e.g. vLLM ErrorResponse) typically set error.type to an HTTP phrase like "Bad Request" and provide the actual status in error.code. As written, many 400s will be incorrectly returned as 500. Prefer deriving the response status from error.code when present (and fall back to type mapping only if code is missing).

Suggested change
error_type = anthropic_response["error"].get("type", "internal_error")
status_code = HTTPStatus.BAD_REQUEST.value if error_type == "invalid_request_error" else HTTPStatus.INTERNAL_SERVER_ERROR.value
error = anthropic_response["error"]
error_code = error.get("code")
status_code: int
if isinstance(error_code, int):
    status_code = error_code
elif isinstance(error_code, str) and error_code.isdigit():
    status_code = int(error_code)
else:
    error_type = error.get("type", "internal_error")
    status_code = (
        HTTPStatus.BAD_REQUEST.value
        if error_type == "invalid_request_error"
        else HTTPStatus.INTERNAL_SERVER_ERROR.value
    )

Comment on lines +303 to +305
@app.post("/v1/messages")
async def anthropic_messages(raw_request: Request):
    """Anthropic-compatible Messages API endpoint."""

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New /v1/messages behavior isn’t covered by tests. There are already tests for the HTTP endpoint and request routing; please add at least a basic /v1/messages request/response test (including error status mapping) to prevent regressions.

Comment on lines +703 to +704
return openai_response


Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On OpenAI error responses, this returns openai_response as-is. That payload shape/types don’t match the Anthropic Messages API error schema the HTTP layer expects, and can also lead to mis-mapped HTTP statuses. It would be safer to normalize OpenAI/vLLM ErrorResponse into a consistent { "error": { "message": ..., "type": ... } } (and include a status/code field) before returning from anthropic_messages.

Suggested change
return openai_response
# Normalize OpenAI/vLLM-style error into Anthropic Messages-style schema
error_obj = openai_response.get("error")
status: Optional[int] = None
code: Optional[Any] = None
err_type: Optional[str] = None
message: str = "Unknown error from upstream OpenAI/vLLM backend."
# Extract fields from nested "error" object if present
if isinstance(error_obj, dict):
    message = error_obj.get("message", message)
    err_type = error_obj.get("type") or error_obj.get("error_type") or err_type
    code = error_obj.get("code") or error_obj.get("error_code")
    status = (
        error_obj.get("status")
        or error_obj.get("status_code")
    )
# Fallback: look for error fields at the top level
if isinstance(openai_response, dict):
    if message == "Unknown error from upstream OpenAI/vLLM backend.":
        message = openai_response.get("message", message)
    err_type = (
        err_type
        or openai_response.get("type")
        or openai_response.get("error_type")
    )
    code = code or openai_response.get("code") or openai_response.get("error_code")
    status = (
        status
        or openai_response.get("status")
        or openai_response.get("status_code")
    )
if status is None:
    status = int(HTTPStatus.INTERNAL_SERVER_ERROR)
if err_type is None:
    err_type = "internal_error"
normalized_error: Dict[str, Any] = {
    "error": {
        "message": message,
        "type": err_type,
    },
    "status": status,
}
if code is not None:
    normalized_error["error"]["code"] = code
return normalized_error


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request adds support for the Anthropic Messages API across the inference stack. The changes are well-structured, introducing a new endpoint and propagating the new method through the different engine layers. I've identified a few areas for improvement related to robustness and maintainability. Specifically, I've pointed out issues with input validation using assert, a potentially risky disabled timeout in an HTTP client, redundant validation logic, and a non-robust method for generating unique IDs. Addressing these points will make the implementation more resilient and easier to maintain.

"""
session_id = request_payload["json"].pop("session_id", None)
if session_id is not None:
    assert isinstance(session_id, (str, int)), "Session ID must be an integer or string for `/v1/messages`"


high

Using assert for validating input data is risky because assertions can be disabled in production (with Python's -O flag), which would silently bypass this check. This could lead to unexpected behavior. For validating user-provided data like session_id, it's better to perform an explicit check and return an error that results in a 400 Bad Request. Following the error handling pattern seen elsewhere in this PR, you could return an error dictionary.
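The explicit check the reviewer suggests could look like the sketch below, which returns an error dict in the same shape used elsewhere in this PR instead of asserting. The helper name and tuple-return convention are assumptions for illustration.

```python
def pop_session_id(request_payload):
    """Extract session_id from the request payload (hypothetical helper).
    Returns (session_id, None) on success, or (None, error_dict) when the
    value has an unsupported type, so the caller can return a 400."""
    session_id = request_payload["json"].pop("session_id", None)
    if session_id is not None and not isinstance(session_id, (str, int)):
        return None, {"error": {
            "message": "`session_id` must be a string or integer for `/v1/messages`",
            "type": "invalid_request_error",
        }}
    return session_id, None
```

Unlike assert, this check survives python -O and produces a client-visible invalid_request_error rather than a crash.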

"""Call Anthropic Messages API endpoint (/v1/messages)."""
body = request_payload.get("json", {})
headers = {"Content-Type": "application/json"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:


high

Disabling the timeout by setting total=None can be risky, as it might cause requests to hang indefinitely if the remote server is unresponsive. This could lead to resource exhaustion on the client. It's generally safer to set a long but finite timeout value (e.g., 300 seconds) to ensure that connections are eventually terminated.

Suggested change
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=300)) as session:

message_content = ""

anthropic_response = {
    "id": openai_response.get("id", "msg-" + str(int(time.time()))),


high

Using time.time() to generate a fallback message ID is not robust, as it's not guaranteed to be unique, especially if multiple requests are processed in the same second. This could lead to ID collisions. A better approach is to use a universally unique identifier. Consider using uuid.uuid4() to generate a unique ID. You'll need to add import uuid at the top of the file.

Suggested change
"id": openai_response.get("id", "msg-" + str(int(time.time()))),
"id": openai_response.get("id", f"msg_{uuid.uuid4().hex}"),

Comment on lines +654 to +657
if "model" not in request_json:
    return {"error": {"message": "The field `model` is required", "type": "invalid_request_error"}}
if "messages" not in request_json or not request_json["messages"]:
    return {"error": {"message": "The field `messages` is required and cannot be empty", "type": "invalid_request_error"}}


medium

These validation checks for model and messages are also present in the anthropic_messages HTTP endpoint. To avoid redundancy and adhere to the Single Responsibility Principle, it would be better to perform validation only at the API boundary (the endpoint). This simplifies the engine's logic, as it can assume it receives validated requests. Consider removing these checks from this method.

…ments

- Fix HTTP error detection for old-format vLLM errors (object=error)
- Use error.code for HTTP status when available, fall back to type mapping
- Normalize OpenAI/vLLM errors to Anthropic error schema before returning
- Use uuid4 for fallback message IDs instead of time.time()
- Validate messages field is a list, not just present/non-empty


Development

Successfully merging this pull request may close these issues.

[train][agent] Add Anthropic Messages API endpoint, fix LoRA weight swap, and improve transition-to-training-data logic

2 participants