Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions examples/01_standalone_sdk/41_acp_agent_remote_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Example: ACPAgent with TCP transport.

This example shows how to connect ACPAgent to an already-running ACP server
over TCP, instead of spawning one as a subprocess.

Note: TCP is a *custom transport* — the ACP spec currently defines stdio
(and draft Streamable HTTP). The remote server must speak newline-delimited
JSON-RPC over a TCP socket (stdio-style framing).

Prerequisites:
- An ACP-compatible server listening on TCP with newline-delimited
JSON-RPC (e.g. a wrapper around claude-code-acp)
- ACP_HOST env var (hostname, e.g. "acp-server.internal")
- ACP_PORT env var (port, default 4001)

Usage:
ACP_HOST=localhost ACP_PORT=4001 \
uv run python examples/01_standalone_sdk/41_acp_agent_remote_example.py
"""

import os

from openhands.sdk.agent import ACPAgent
from openhands.sdk.conversation import Conversation


acp_host = os.environ["ACP_HOST"]
acp_port = int(os.getenv("ACP_PORT", "4001"))

agent = ACPAgent(acp_host=acp_host, acp_port=acp_port)

try:
cwd = os.getcwd()
conversation = Conversation(agent=agent, workspace=cwd)

conversation.send_message(
"List the files in the current directory and write a short "
"summary of what you see into SUMMARY.md."
)
conversation.run()

# --- ask_agent: stateless side-question via fork_session ---
print("\n--- ask_agent ---")
response = conversation.ask_agent(
"Based on what you just saw, what is the most interesting file?"
)
print(f"ask_agent response: {response}")
finally:
agent.close()

print("Done!")
202 changes: 152 additions & 50 deletions openhands-sdk/openhands/sdk/agent/acp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
UsageUpdate,
)
from acp.transports import default_environment
from pydantic import Field, PrivateAttr
from pydantic import Field, PrivateAttr, model_validator

from openhands.sdk.agent.base import AgentBase
from openhands.sdk.conversation.state import ConversationExecutionStatus
Expand Down Expand Up @@ -274,20 +274,79 @@ def on_connect(self, conn: Any) -> None: # noqa: ARG002


class ACPAgent(AgentBase):
"""Agent that delegates to an ACP-compatible subprocess server."""
"""Agent that delegates to an ACP (Agent Client Protocol) server.

Instead of calling an LLM directly, this agent communicates with an
ACP-compatible server (e.g. ``claude-code-acp``) via the ACP protocol.
The server manages its own LLM, tools, and execution lifecycle.

Two transport modes are supported (mutually exclusive):

**Subprocess mode** (standard ACP stdio transport) — the agent spawns
the ACP server as a child process and communicates via stdin/stdout
JSON-RPC::

agent = ACPAgent(acp_command=["npx", "-y", "claude-code-acp"])

**TCP mode** (custom transport) — the agent connects to an already-running
server over the network via a raw TCP socket. This is **not** a
transport defined by the ACP specification (which currently standardises
stdio and draft Streamable HTTP); the remote server must speak
newline-delimited JSON-RPC over TCP (stdio-style framing)::

agent = ACPAgent(acp_host="acp-server.internal", acp_port=4001)
"""

# Override required fields with ACP-appropriate defaults
llm: LLM = Field(default_factory=_make_dummy_llm)
tools: list[Tool] = Field(default_factory=list)
include_default_tools: list[str] = Field(default_factory=list)

# ACP-specific configuration
acp_command: list[str] = Field(
...,
acp_command: list[str] | None = Field(
default=None,
description=(
"Command to start the ACP server, e.g. ['npx', '-y', 'claude-code-acp']. "
"Mutually exclusive with acp_host."
),
)
acp_host: str | None = Field(
default=None,
description=(
"Command to start the ACP server, e.g. ['npx', '-y', 'claude-code-acp']"
"Hostname of a remote ACP server (custom TCP transport — "
"requires newline-delimited JSON-RPC). Mutually exclusive with acp_command."
),
)
acp_port: int | None = Field(
default=None,
description="Port of the remote ACP server. Required when acp_host is set.",
)
acp_tcp_connect_timeout: float = Field(
default=30.0,
description="Timeout in seconds for the TCP connection attempt.",
)

@model_validator(mode="before")
@classmethod
def _validate_transport(cls, data):
if not isinstance(data, dict):
return data
has_command = data.get("acp_command") is not None
has_host = data.get("acp_host") is not None
if has_command and has_host:
raise ValueError("acp_command and acp_host are mutually exclusive")
if not has_command and not has_host:
raise ValueError("Either acp_command or acp_host must be provided")
if has_host and data.get("acp_port") is None:
raise ValueError("acp_port is required when acp_host is set")
if not has_host and data.get("acp_port") is not None:
raise ValueError("acp_port requires acp_host to be set")
# Guard against empty command list which would blow up later
cmd = data.get("acp_command")
if cmd is not None and (not isinstance(cmd, list) or len(cmd) == 0):
raise ValueError("acp_command must be a non-empty list")
return data

acp_args: list[str] = Field(
default_factory=list,
description="Additional arguments for the ACP server command",
Expand All @@ -304,6 +363,7 @@ class ACPAgent(AgentBase):
_process: Any = PrivateAttr(default=None) # asyncio subprocess
_client: Any = PrivateAttr(default=None) # _OpenHandsACPBridge
_filtered_reader: Any = PrivateAttr(default=None) # StreamReader
_tcp_writer: Any = PrivateAttr(default=None) # asyncio.StreamWriter (TCP mode)
_closed: bool = PrivateAttr(default=False)
_working_dir: str = PrivateAttr(default="")

Expand Down Expand Up @@ -389,63 +449,95 @@ def init_state(
self._initialized = True

def _start_acp_server(self, state: ConversationState) -> None:
"""Start the ACP subprocess and initialize the session."""
"""Start the ACP connection and initialize the session.

In subprocess mode (``acp_command``), spawns the server as a child
process and communicates via stdin/stdout (standard ACP stdio
transport).

In TCP mode (``acp_host``/``acp_port``), connects to an
already-running server over a raw TCP socket. The remote server
must use the same newline-delimited JSON-RPC framing as the stdio
transport (one JSON object per line, no embedded newlines).
Non-JSON-RPC lines are filtered in both modes.
"""
client = _OpenHandsACPBridge()
client._llm_ref = self.llm
self._client = client

# Build environment: inherit current env + ACP extras
env = default_environment()
env.update(os.environ)
env.update(self.acp_env)
# Strip CLAUDECODE so nested Claude Code instances don't refuse to start
env.pop("CLAUDECODE", None)
working_dir = str(state.workspace.working_dir)

command = self.acp_command[0]
args = list(self.acp_command[1:]) + list(self.acp_args)
if self.acp_host is not None:
# --- TCP mode (custom transport) ---
timeout = self.acp_tcp_connect_timeout

working_dir = str(state.workspace.working_dir)
async def _init_tcp() -> tuple[Any, str, Any]:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(self.acp_host, self.acp_port),
timeout=timeout,
)

async def _init() -> tuple[Any, Any, Any, str]:
# Spawn the subprocess directly so we can install a
# filtering reader that skips non-JSON-RPC lines some
# ACP servers (e.g. claude-code-acp v0.1.x) write to
# stdout.
process = await asyncio.create_subprocess_exec(
command,
*args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
assert process.stdin is not None
assert process.stdout is not None

# Wrap the subprocess stdout in a filtering reader that
# only passes lines starting with '{' (JSON-RPC messages).
filtered_reader = asyncio.StreamReader()
asyncio.get_event_loop().create_task(
_filter_jsonrpc_lines(process.stdout, filtered_reader)
)
# Apply the same JSON-RPC line filter used in subprocess
# mode — TCP peers may also emit non-protocol output.
filtered_reader = asyncio.StreamReader()
asyncio.get_event_loop().create_task(
_filter_jsonrpc_lines(reader, filtered_reader)
)

conn = ClientSideConnection(
client,
process.stdin, # write to subprocess
filtered_reader, # read filtered output
)
conn = ClientSideConnection(
client,
writer, # input_stream (write to server)
filtered_reader, # output_stream (read from server)
)
await conn.initialize(protocol_version=1)
response = await conn.new_session(cwd=working_dir)
return conn, response.session_id, writer

# Initialize the protocol
await conn.initialize(protocol_version=1)
result = self._executor.run_async(_init_tcp)
self._conn, self._session_id, self._tcp_writer = result
else:
# --- Subprocess mode ---
# Build environment: inherit current env + ACP extras
env = default_environment()
env.update(os.environ)
env.update(self.acp_env)
# Strip CLAUDECODE so nested Claude Code instances don't refuse to start
env.pop("CLAUDECODE", None)

assert self.acp_command is not None
command = self.acp_command[0]
args = list(self.acp_command[1:]) + list(self.acp_args)

async def _init_subprocess() -> tuple[Any, Any, Any, str]:
process = await asyncio.create_subprocess_exec(
command,
*args,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
assert process.stdin is not None
assert process.stdout is not None

# Create a new session
response = await conn.new_session(cwd=working_dir)
session_id = response.session_id
filtered_reader = asyncio.StreamReader()
asyncio.get_event_loop().create_task(
_filter_jsonrpc_lines(process.stdout, filtered_reader)
)

conn = ClientSideConnection(
client,
process.stdin,
filtered_reader,
)

await conn.initialize(protocol_version=1)
response = await conn.new_session(cwd=working_dir)
return conn, process, filtered_reader, response.session_id

return conn, process, filtered_reader, session_id
result = self._executor.run_async(_init_subprocess)
self._conn, self._process, self._filtered_reader, self._session_id = result

result = self._executor.run_async(_init)
self._conn, self._process, self._filtered_reader, self._session_id = result
self._working_dir = working_dir

def step(
Expand Down Expand Up @@ -582,6 +674,16 @@ def _cleanup(self) -> None:
logger.debug("Error closing ACP connection: %s", e)
self._conn = None

# Close TCP writer if in network mode
if self._tcp_writer is not None:
try:
self._tcp_writer.close()
if self._executor is not None:
self._executor.run_async(self._tcp_writer.wait_closed())
except Exception as e:
logger.debug("Error closing TCP writer: %s", e)
self._tcp_writer = None

# Terminate the subprocess
if self._process is not None:
try:
Expand Down
Loading
Loading