Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ asyncio.run(main())
- 🤖 **Audio-passthrough** - Send TTS-generated audio input and receive a rendered, synchronized audio/video avatar
- 🗣️ **Direct text-to-speech** - Send text directly to TTS for immediate speech output (bypasses LLM processing)
- 🎤 **Real-time user audio input** - Send raw audio samples (e.g. from microphone) to Anam for processing (turnkey solution: STT → LLM → TTS → Avatar)
- 🔧 **Client tool events** - Receive function/tool invocations from the LLM for custom client-side logic (function calling)
- 🧠 **Reasoning stream events** - Receive LLM chain-of-thought/reasoning as it streams (`REASONING_STREAM_EVENT_RECEIVED`, `REASONING_HISTORY_UPDATED`)
- 📡 **Async iterator API** - Clean, Pythonic async/await patterns for continuous stream of audio/video frames
- 🎯 **Event-driven API** - Simple decorator-based event handlers for discrete events
- 📝 **Fully typed** - Complete type hints for IDE support
Expand Down Expand Up @@ -144,7 +146,7 @@ For best performance, we suggest using 24kHz mono audio. The provided audio is r
Register callbacks for connection and message events using the `@client.on()` decorator:

```python
from anam import AnamEvent, Message, MessageRole, MessageStreamEvent
from anam import AnamEvent, ClientToolEvent, Message, MessageRole, MessageStreamEvent, ReasoningMessage, ReasoningStreamEvent

@client.on(AnamEvent.CONNECTION_ESTABLISHED)
async def on_connected():
Expand Down Expand Up @@ -197,6 +199,29 @@ async def on_message_history_updated(messages: list[Message]):
print(f"📝 Conversation history: {len(messages)} messages")
for msg in messages:
print(f" {msg.role}: {msg.content[:50]}...")

@client.on(AnamEvent.CLIENT_TOOL_EVENT_RECEIVED)
async def on_client_tool_event(event: ClientToolEvent):
"""Called when the LLM invokes a client-side tool (function calling).

Use this to implement custom tools: handle the event, execute your logic,
and optionally send a response back via the talk stream.
"""
if event.event_name == "redirect":
url = event.event_data.get("url", "")
print(f"🔧 Tool 'redirect' called with url={url}")

@client.on(AnamEvent.REASONING_STREAM_EVENT_RECEIVED)
async def on_reasoning_stream_event(event: ReasoningStreamEvent):
"""Called for each chunk of the LLM's reasoning/chain-of-thought as it streams."""
print(event.content, end="", flush=True)
if event.end_of_thought:
print() # New line when thought completes

@client.on(AnamEvent.REASONING_HISTORY_UPDATED)
async def on_reasoning_history_updated(messages: list[ReasoningMessage]):
"""Called when the reasoning history is updated (after a thought completes)."""
print(f"🧠 Reasoning history: {len(messages)} thought(s)")
```

### Session
Expand All @@ -219,10 +244,12 @@ async with client.connect() as session:
# Interrupt the avatar if speaking
await session.interrupt()

# Get message history
# Get message history and reasoning history
history = client.get_message_history()
for msg in history:
print(f"{msg.role}: {msg.content}")
for thought in client.get_reasoning_history():
print(f" [reasoning] {thought.content[:50]}...")

# Wait until the session ends
await session.wait_until_closed()
Expand Down
6 changes: 6 additions & 0 deletions src/anam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ async def consume_audio():
AgentAudioInputConfig,
AnamEvent,
ClientOptions,
ClientToolEvent,
ConnectionClosedCode,
Message,
MessageRole,
MessageStreamEvent,
PersonaConfig,
ReasoningMessage,
ReasoningStreamEvent,
SessionOptions,
SessionReplayOptions,
)
Expand All @@ -79,11 +82,14 @@ async def consume_audio():
"AnamEvent",
"AudioFrame",
"ClientOptions",
"ClientToolEvent",
"ConnectionClosedCode",
"Message",
"MessageRole",
"MessageStreamEvent",
"PersonaConfig",
"ReasoningMessage",
"ReasoningStreamEvent",
"SessionOptions",
"SessionReplayOptions",
"VideoFrame",
Expand Down
75 changes: 75 additions & 0 deletions src/anam/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
AgentAudioInputConfig,
AnamEvent,
ClientOptions,
ClientToolEvent,
ConnectionClosedCode,
Message,
MessageRole,
MessageStreamEvent,
PersonaConfig,
ReasoningMessage,
ReasoningStreamEvent,
SessionInfo,
SessionOptions,
)
Expand Down Expand Up @@ -137,6 +140,7 @@ def __init__(
self._streaming_client: StreamingClient | None = None
self._is_streaming = False
self._message_history: list[Message] = []
self._reasoning_history: list[ReasoningMessage] = []

def on(self, event: AnamEvent) -> Callable[[T], T]:
"""Decorator to register an event handler.
Expand Down Expand Up @@ -316,6 +320,68 @@ async def _handle_data_message(self, data: dict[str, Any]) -> None:
AnamEvent.MESSAGE_HISTORY_UPDATED, self._message_history.copy()
)

elif message_type == "clientToolEvent":
# Convert WebRTC format (snake_case) to ClientToolEvent
tool_data = data.get("data", {})
client_tool_event = ClientToolEvent(
event_uid=tool_data.get("event_uid", ""),
session_id=tool_data.get("session_id", ""),
event_name=tool_data.get("event_name", ""),
event_data=tool_data.get("event_data", {}),
timestamp=tool_data.get("timestamp", ""),
timestamp_user_action=tool_data.get("timestamp_user_action", ""),
user_action_correlation_id=tool_data.get("user_action_correlation_id", ""),
)
await self._emit(AnamEvent.CLIENT_TOOL_EVENT_RECEIVED, client_tool_event)

elif message_type == "reasoningText":
# Convert WebRTC format to ReasoningStreamEvent
reason_data = data.get("data", {})
message_id = reason_data.get("message_id", "")
role = reason_data.get("role", "persona")
content = reason_data.get("content", "")
end_of_thought = reason_data.get("end_of_thought", False)

stream_event_id = f"{role}::{message_id}"
stream_event = ReasoningStreamEvent(
id=stream_event_id,
content=content,
role=role,
end_of_thought=end_of_thought,
)
await self._emit(AnamEvent.REASONING_STREAM_EVENT_RECEIVED, stream_event)

self._process_reasoning_stream_event(stream_event)

if end_of_thought:
await self._emit(
AnamEvent.REASONING_HISTORY_UPDATED,
self._reasoning_history.copy(),
)

def _process_reasoning_stream_event(self, event: ReasoningStreamEvent) -> None:
    """Fold one streamed reasoning chunk into the reasoning history.

    The history is searched for the first entry whose ``id`` equals the
    event's ``id``. If one is found, it is replaced by a new
    ``ReasoningMessage`` carrying the previous content with this chunk's
    content appended. Otherwise a new entry is appended to the history.

    Args:
        event: The reasoning chunk to merge into the history.
    """
    history = self._reasoning_history

    for position, thought in enumerate(history):
        if thought.id == event.id:
            # The slot is rebound to a new object instead of mutating the
            # stored one, so shallow copies of the history taken earlier
            # still reference the previous, shorter message.
            history[position] = ReasoningMessage(
                id=thought.id,
                content=thought.content + event.content,
                role=thought.role,
            )
            return

    # No entry with this id yet: this chunk starts a new thought.
    history.append(
        ReasoningMessage(
            id=event.id,
            content=event.content,
            role=event.role,
        )
    )

def _process_message_stream_event(self, event: MessageStreamEvent, timestamp: str) -> None:
"""Process a message stream event and update message history."""
# Find existing message with same ID (for both user and persona messages)
Expand Down Expand Up @@ -391,6 +457,7 @@ async def close(self) -> None:
self._streaming_client = None
self._session_info = None
self._message_history.clear()
self._reasoning_history.clear()
logger.info("Client closed")

@property
Expand All @@ -411,6 +478,14 @@ def get_message_history(self) -> list[Message]:
"""
return self._message_history.copy()

def get_reasoning_history(self) -> list[ReasoningMessage]:
    """Return a snapshot of the reasoning/chain-of-thought history.

    Returns:
        A new list holding the reasoning messages currently stored on the
        client. Adding to or removing from the returned list does not
        change the client's own list.
    """
    snapshot = list(self._reasoning_history)
    return snapshot

def set_persona_config(self, persona_config: PersonaConfig) -> None:
"""Set the persona configuration.

Expand Down
72 changes: 72 additions & 0 deletions src/anam/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class AnamEvent(str, Enum):
# Persona events
TALK_STREAM_INTERRUPTED = "talk_stream_interrupted"

# Tool events (function calling)
CLIENT_TOOL_EVENT_RECEIVED = "client_tool_event_received"

# Reasoning events (LLM chain-of-thought)
REASONING_STREAM_EVENT_RECEIVED = "reasoning_stream_event_received"
REASONING_HISTORY_UPDATED = "reasoning_history_updated"

# Error events
ERROR = "error"
SERVER_WARNING = "server_warning"
Expand Down Expand Up @@ -197,6 +204,71 @@ class MessageStreamEvent:
interrupted: bool = False


@dataclass
class ReasoningStreamEvent:
    """One streamed chunk of the LLM's reasoning (chain-of-thought).

    An instance is delivered for every chunk while a thought is streaming,
    which makes it suitable for live display or logging of the model's
    reasoning.

    Attributes:
        id: Identifier of the thought; every chunk of one thought shares it.
        content: Text carried by this chunk only.
        role: Role the reasoning is attributed to (e.g., "persona").
        end_of_thought: True when this chunk is the last one of the thought.
    """

    # Field order is part of the positional constructor signature.
    id: str
    content: str
    role: str
    end_of_thought: bool


@dataclass
class ReasoningMessage:
    """A reasoning (chain-of-thought) entry assembled from streamed chunks.

    The content is built up from ``ReasoningStreamEvent`` chunks that share
    the same id. The list of these messages is what handlers of
    REASONING_HISTORY_UPDATED receive once a chunk arrives with
    end_of_thought set to True.

    Attributes:
        id: Identifier of the thought.
        content: Accumulated text of the thought.
        role: Role the reasoning is attributed to (e.g., "persona").
    """

    # Field order is part of the positional constructor signature.
    id: str
    content: str
    role: str


@dataclass
class ClientToolEvent:
    """A client-side tool invocation requested by the LLM (function calling).

    Delivered when the LLM calls a tool that is meant to run on the client.
    A handler typically inspects ``event_name``, runs the matching logic
    with ``event_data``, and may send a response back via the talk stream.

    Attributes:
        event_uid: Unique ID of this event.
        session_id: ID of the session the event belongs to.
        event_name: Name of the tool (e.g., "redirect", "get_weather").
        event_data: Parameters the LLM generated for the tool.
        timestamp: ISO timestamp of when the event was created.
        timestamp_user_action: ISO timestamp of the user action that
            triggered the event.
        user_action_correlation_id: Correlation ID used for tracking.
    """

    # Field order is part of the positional constructor signature.
    event_uid: str
    session_id: str
    event_name: str
    event_data: dict[str, Any]
    timestamp: str
    timestamp_user_action: str
    user_action_correlation_id: str


@dataclass
class AgentAudioInputConfig:
"""Configuration for agent audio input stream.
Expand Down
Loading
Loading