Official Python SDK for Anam AI - Real-time AI avatar streaming.
# Using uv (recommended)
uv add anam
# With optional display utilities (for testing)
uv add anam --extra display
# Using pip
pip install anam
# With optional display utilities (for testing)
pip install anam[display]

import asyncio
from anam import AnamClient
async def main():
# Create client with your API key and persona_id (for pre-defined personas)
client = AnamClient(
api_key="your-api-key",
persona_id="your-persona-id",
)
# Connect and stream
async with client.connect() as session:
print(f"Connected! Session: {session.session_id}")
# Consume video and audio frames concurrently
async def consume_video():
async for frame in session.video_frames():
img = frame.to_ndarray(format="rgb24") # numpy array (H, W, 3) in RGB format - use "bgr24" for OpenCV
print(f"Video: {frame.width}x{frame.height}")
async def consume_audio():
async for frame in session.audio_frames():
samples = frame.to_ndarray() # int16 samples (1D array, interleaved for stereo)
# Determine mono/stereo from frame layout
channel_type = "mono" if frame.layout.nb_channels == 1 else "stereo"
print(f"Audio: {samples.size} samples ({channel_type}) @ {frame.sample_rate}Hz")
# Run both streams concurrently until session closes
await asyncio.gather(
consume_video(),
consume_audio(),
)
asyncio.run(main())

- 🎥 Real-time Audio/Video streaming - Receive synchronized audio/video frames from the avatar (as PyAV AudioFrame/VideoFrame objects)
- 💬 Two-way communication - Send text messages (like transcribed user speech) and receive generated responses
- 📝 Real-time transcriptions - Receive incremental message stream events for user and persona text as it's generated
- 📚 Message history tracking - Automatic conversation history with incremental updates
- 🤖 Audio passthrough - Send TTS-generated audio and receive it back as synchronized, rendered avatar audio/video
- 🗣️ Direct text-to-speech - Send text directly to TTS for immediate speech output (bypasses LLM processing)
- 🎤 Real-time user audio input - Send raw audio samples (e.g. from microphone) to Anam for processing (turnkey solution: STT → LLM → TTS → Avatar)
- 📡 Async iterator API - Clean, Pythonic async/await patterns for consuming continuous streams of audio/video frames
- 🎯 Event-driven API - Simple decorator-based event handlers for discrete events
- 📝 Fully typed - Complete type hints for IDE support
- 🔒 Server-side ready - Designed for server-side Python applications (e.g. for backend pipelines)
The main client class for connecting to Anam AI.
from anam import AnamClient, PersonaConfig, ClientOptions
# Simple initialization for pre-defined personas - all other parameters are ignored except enable_audio_passthrough
client = AnamClient(
api_key="your-api-key",
persona_id="your-persona-id",
)
# Advanced initialization with full (ephemeral) persona config - ideal for programmatic configuration.
# Use avatar_id instead of persona_id.
client = AnamClient(
api_key="your-api-key",
persona_config=PersonaConfig(
avatar_id="your-avatar-id",
voice_id="your-voice-id",
llm_id="your-llm-id",
name="My Assistant",
system_prompt="You are a helpful assistant...",
avatar_model="cara-3",
language_code="en",
enable_audio_passthrough=False,
),
)

Frames are PyAV objects (VideoFrame/AudioFrame) containing synchronized decoded audio (PCM) and video (RGB) samples from the avatar, delivered over WebRTC and extracted by aiortc. All PyAV frame attributes are accessible (samples, format, layout, etc.). Access the frames via async iterators and run both iterators concurrently, e.g. using asyncio.gather():
async with client.connect() as session:
async def process_video():
async for frame in session.video_frames():
img = frame.to_ndarray(format="rgb24") # RGB numpy array
# Process frame...
async def process_audio():
async for frame in session.audio_frames():
samples = frame.to_ndarray() # int16 samples
# Process frame...
# Both streams run concurrently
    await asyncio.gather(process_video(), process_audio())

User audio input is real-time audio such as microphone audio. It must be 16-bit PCM samples, mono or stereo, at any sample rate; the sample rate has to be provided so the audio can be processed correctly. The audio is forwarded in real time as a WebRTC audio track. To keep latency low, any audio provided before the WebRTC audio track is created is dropped.
TTS audio is generated by a TTS engine and should be provided in chunks through the send_audio_chunk method. Chunks can be byte arrays or base64-encoded strings (the SDK converts raw bytes to base64). The audio is sent to the backend at maximum upload speed. The sample rate and channel count must be provided through the AgentAudioInputConfig object. When the TTS audio for a turn finishes, call end_sequence() to signal completion; without it, the backend keeps waiting for more chunks and the avatar will freeze.
For best performance, we suggest 24kHz mono audio. The provided audio is returned in sync with the avatar without any resampling. Sample rates lower than 24kHz will result in poor avatar performance, and sample rates higher than 24kHz may add latency without any noticeable improvement in audio quality.
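As a rough sketch of this passthrough flow - assuming send_audio_chunk and end_sequence are session methods, that AgentAudioInputConfig is importable from anam, and that its fields are named sample_rate and channels (check the API reference for the exact signatures):

from anam import AnamClient, PersonaConfig, AgentAudioInputConfig  # import path assumed

client = AnamClient(
    api_key="your-api-key",
    persona_config=PersonaConfig(
        avatar_id="your-avatar-id",
        voice_id="your-voice-id",
        enable_audio_passthrough=True,  # route your own TTS audio to the avatar
    ),
)

async def speak(session, pcm_chunks):
    """Send one TTS utterance (16-bit PCM chunks) to the avatar."""
    config = AgentAudioInputConfig(sample_rate=24000, channels=1)  # 24kHz mono recommended; field names assumed
    for chunk in pcm_chunks:
        await session.send_audio_chunk(chunk, config)  # exact signature assumed
    # Signal the end of the turn so the backend does not keep waiting for chunks
    await session.end_sequence()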
Register callbacks for connection and message events using the @client.on() decorator:
from anam import AnamEvent, Message, MessageRole, MessageStreamEvent
@client.on(AnamEvent.CONNECTION_ESTABLISHED)
async def on_connected():
"""Called when the connection is established."""
    print("✅ Connected!")
@client.on(AnamEvent.CONNECTION_CLOSED)
async def on_closed(code: str, reason: str | None):
"""Called when the connection is closed."""
print(f"Connection closed: {code} - {reason or 'No reason'}")
@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def on_message_stream_event(event: MessageStreamEvent):
"""Called for each incremental chunk of transcribed text or persona response.
This event fires for both user transcriptions and persona responses as they stream in.
This can be used for real-time captions or transcriptions.
"""
if event.role == MessageRole.USER:
# User transcription (from their speech)
if event.content_index == 0:
print(f"👤 User: ", end="", flush=True)
print(event.content, end="", flush=True)
if event.end_of_speech:
print() # New line when transcription completes
else:
# Persona response
if event.content_index == 0:
print(f"🤖 Persona: ", end="", flush=True)
print(event.content, end="", flush=True)
if event.end_of_speech:
            status = "✓" if not event.interrupted else "✗ INTERRUPTED"
print(f" {status}")
@client.on(AnamEvent.MESSAGE_RECEIVED)
async def on_message(message: Message):
"""Called when a complete message is received (after end_of_speech).
This is fired after MESSAGE_STREAM_EVENT_RECEIVED with end_of_speech=True.
Useful for backward compatibility or when you only need complete messages.
"""
print(f"{message.role}: {message.content}")
@client.on(AnamEvent.MESSAGE_HISTORY_UPDATED)
async def on_message_history_updated(messages: list[Message]):
"""Called when the message history is updated (after a message completes).
The messages list contains the complete conversation history.
"""
print(f"📝 Conversation history: {len(messages)} messages")
for msg in messages:
print(f" {msg.role}: {msg.content[:50]}...")The Session object is returned by client.connect() and provides methods for interacting with the avatar:
async with client.connect() as session:
# Send a text message (simulates user speech)
await session.send_message("Hello, how are you?")
# Send text directly to TTS (bypasses LLM)
await session.talk("This will be spoken immediately")
# Stream text to TTS incrementally (for streaming scenarios)
await session.send_talk_stream(
content="Hello",
start_of_speech=True,
end_of_speech=False,
)
await session.send_talk_stream(
content=" world!",
start_of_speech=False,
end_of_speech=True,
)
# Interrupt the avatar if speaking
await session.interrupt()
# Get message history
history = client.get_message_history()
for msg in history:
print(f"{msg.role}: {msg.content}")
# Wait until the session ends
    await session.wait_until_closed()

import cv2
import wave
import asyncio
from anam import AnamClient
client = AnamClient(api_key="...", persona_id="...")
video_writer = cv2.VideoWriter("output.mp4", ...)
audio_writer = wave.open("output.wav", "wb")
async def save_video(session):
async for frame in session.video_frames():
# Read frame as BGR for OpenCV VideoWriter
bgr_frame = frame.to_ndarray(format="bgr24")
video_writer.write(bgr_frame)
async def save_audio(session):
async for frame in session.audio_frames():
# Initialize writer on first frame
if audio_writer.getnframes() == 0:
audio_writer.setnchannels(frame.layout.nb_channels)
audio_writer.setsampwidth(2) # 16-bit
audio_writer.setframerate(frame.sample_rate)
# Write audio data (convert to int16 and get bytes)
audio_writer.writeframes(frame.to_ndarray().tobytes())
async def main():
    async with client.connect() as session:
        # Record for 30 seconds, then stop
        try:
            await asyncio.wait_for(
                asyncio.gather(save_video(session), save_audio(session)),
                timeout=30.0,
            )
        except asyncio.TimeoutError:
            pass
    # Flush and close the output files
    video_writer.release()
    audio_writer.close()

asyncio.run(main())

import cv2
import asyncio
from anam import AnamClient
client = AnamClient(api_key="...", persona_id="...")
latest_frame = None
async def update_frame(session):
global latest_frame
async for frame in session.video_frames():
# Read frame as BGR for OpenCV display
latest_frame = frame.to_ndarray(format="bgr24")
async def main():
async with client.connect() as session:
# Start frame consumer
frame_task = asyncio.create_task(update_frame(session))
        # Display loop
        while True:
            if latest_frame is not None:
                cv2.imshow("Avatar", latest_frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            # Yield to the event loop so the frame consumer task can run
            await asyncio.sleep(0.01)
        frame_task.cancel()
        cv2.destroyAllWindows()
asyncio.run(main())

export ANAM_API_KEY="your-api-key"
export ANAM_AVATAR_ID="your-avatar-id"
export ANAM_VOICE_ID="your-voice-id"
export ANAM_LLM_ID="your-llm-id"
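The variable names above are the conventions used in this README; a minimal sketch of reading them with os.environ and passing them to the documented constructor parameters:

import os

from anam import AnamClient, PersonaConfig

# Build an ephemeral persona config from the exported environment variables
client = AnamClient(
    api_key=os.environ["ANAM_API_KEY"],
    persona_config=PersonaConfig(
        avatar_id=os.environ["ANAM_AVATAR_ID"],
        voice_id=os.environ["ANAM_VOICE_ID"],
        llm_id=os.environ["ANAM_LLM_ID"],
    ),
)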
from anam import ClientOptions

options = ClientOptions(
api_base_url="https://api.anam.ai", # API base URL
api_version="v1", # API version
ice_servers=None, # Custom ICE servers for WebRTC delivery
)

There are two types of personas:
- Pre-defined personas: use persona_id only. Other parameters are ignored except enable_audio_passthrough.
- Ephemeral personas: use avatar_id, voice_id, llm_id, avatar_model, system_prompt, language_code and enable_audio_passthrough.
Pre-defined personas are built in lab.anam.ai and combine avatar, voice and LLM. They cannot be changed after creation. They are quick to set up for demos but offer less flexibility for production use.
client = AnamClient(
api_key="your-api-key",
persona_id="your-persona-id",
)

Ephemeral personas give you full control over the components at session startup. Pick the avatar, voice, and LLM IDs from lab.anam.ai (avatars, voices, LLMs). They are ideal for production environments where you configure the components programmatically.
from anam import PersonaConfig
# Ephemeral: specify avatar_id, voice_id, and optionally llm_id, avatar_model
persona = PersonaConfig(
avatar_id="your-avatar-id", # From https://lab.anam.ai/avatars (do not use persona_id)
voice_id="your-voice-id", # From https://lab.anam.ai/voices
llm_id="your-llm-id", # From https://lab.anam.ai/llms (optional)
avatar_model="cara-3", # Video frame model (optional)
system_prompt="You are...", # See https://docs.anam.ai/concepts/prompting-guide
enable_audio_passthrough=False,
)

Orchestration is the process of running a pipeline with different components to transform user audio into a response (STT -> LLM -> TTS -> Avatar). Anam allows two types of orchestration:
- Anam's orchestration: Anam receives user audio (or text messages) and runs the pipeline, with a default or custom LLM.
- Custom orchestration: Anam's orchestration is bypassed by directly providing TTS audio. The TTS audio is passed through directly to the avatar, without being added to the context or message history. This can be achieved by setting enable_audio_passthrough=True. See TTS audio (Audio Passthrough) for more details.
Anam's orchestration layer lets you choose between Anam's default LLMs and running your own custom LLM:
- Default LLMs: Use Anam-provided models when you do not run your own.
- Custom LLMs: Anam connects to your LLM server-to-server. Add and test the connection at lab.anam.ai/llms.
- CUSTOMER_CLIENT_V1: Your LLM is not directly connected to Anam. Use MESSAGE_STREAM_EVENT_RECEIVED to forward messages and send responses via the talk stream (or enable_audio_passthrough=True for TTS audio). Higher latency; useful for niche use cases, but not recommended for general applications.
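A rough sketch of the CUSTOMER_CLIENT_V1 pattern using the documented stream-event and talk-stream APIs; my_llm_stream below is a hypothetical placeholder for your own LLM call, and the persona is assumed to be set up for CUSTOMER_CLIENT_V1 at lab.anam.ai/llms:

import asyncio
from anam import AnamClient, AnamEvent, MessageRole, MessageStreamEvent

client = AnamClient(api_key="your-api-key", persona_id="your-persona-id")  # persona configured for CUSTOMER_CLIENT_V1
active_session = None        # set once connected; used by the event handler below
user_chunks: list[str] = []  # accumulates the user's transcription chunks

def my_llm_stream(user_text: str):
    # Hypothetical placeholder: replace with a call to your own LLM that yields the reply in chunks
    yield f"You said: {user_text}"

@client.on(AnamEvent.MESSAGE_STREAM_EVENT_RECEIVED)
async def on_stream_event(event: MessageStreamEvent):
    if event.role != MessageRole.USER:
        return
    user_chunks.append(event.content)
    if not event.end_of_speech:
        return  # wait for the full user utterance
    user_text = "".join(user_chunks)
    user_chunks.clear()
    reply = list(my_llm_stream(user_text))
    # Stream the reply back through TTS via the talk stream
    for i, chunk in enumerate(reply):
        await active_session.send_talk_stream(
            content=chunk,
            start_of_speech=(i == 0),
            end_of_speech=(i == len(reply) - 1),
        )

async def main():
    global active_session
    async with client.connect() as session:
        active_session = session
        await session.wait_until_closed()

asyncio.run(main())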
from anam import AnamError, AuthenticationError, SessionError
try:
async with client.connect() as session:
await session.wait_until_closed()
except AuthenticationError as e:
print(f"Invalid API key: {e}")
except SessionError as e:
print(f"Session error: {e}")
except AnamError as e:
print(f"Anam error [{e.code}]: {e.message}")- Python 3.10+
- Dependencies are installed automatically:
  - aiortc - WebRTC implementation
  - aiohttp - HTTP client
  - websockets - WebSocket client
  - numpy - Array handling
  - pyav - Video and audio handling
Optional for display utilities:
- opencv-python - Video display
- sounddevice - Audio playback
MIT License - see LICENSE for details.