-
Notifications
You must be signed in to change notification settings - Fork 243
Fix SFU events handling inside Agent
#245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe changes introduce type-checking scaffolding to prevent circular imports across the agents module, reorganize import statements, and refactor GetStream's StreamEdge connection handling to support lazy binding and event redirection from the RTC stack to the agent's event bus. Changes
Sequence Diagram(s)sequenceDiagram
participant Agent
participant StreamEdge
participant ConnectionMgr as ConnectionManager
participant RTCStack as RTC Stack
Agent->>StreamEdge: join()
StreamEdge->>StreamEdge: register on-track lifecycle handlers
StreamEdge->>ConnectionMgr: __aenter__()
ConnectionMgr->>RTCStack: initialize connection
RTCStack-->>ConnectionMgr: ready
ConnectionMgr-->>StreamEdge: connection active
StreamEdge->>StreamEdge: set _real_connection
note over StreamEdge: Lazy binding complete
RTCStack->>ConnectionMgr: participant_joined event
ConnectionMgr->>StreamEdge: forward event
StreamEdge->>Agent: emit to event bus
RTCStack->>ConnectionMgr: track_published event
ConnectionMgr->>StreamEdge: forward event
StreamEdge->>Agent: emit to event bus
note over Agent,StreamEdge: Real-time event flow<br/>(replaces pre-subscription)
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)
46-58: Remove the broad exception handler.Lines 56-57 use
except Exception as e:, which explicitly violates the coding guidelines: "Never writeexcept Exception as e- use specific exception handling." This broad catch can mask unexpected errors and make debugging difficult.As per coding guidelines, remove or replace with specific exception types:
except RuntimeError as e: if "asynchronous generator" in str(e): logger.debug(f"Ignoring async generator error during shutdown: {e}") else: raise - except Exception as e: - logger.error(f"Error during connection close: {e}")If you expect other specific exceptions during connection cleanup, catch them explicitly by type.
🧹 Nitpick comments (3)
plugins/getstream/tests/test_stream_conversation.py (1)
10-15: Clarify whichMessagetype is intended formessagesto avoid ambiguityYou now import
Messagefromgetstream.modelsand annotatemessages: list[Message] = []in thestream_conversationfixture, but there is also aMessageclass invision_agents.core.agents.conversationthat represents the in-memory conversation message model. Given thatconversation.messages[0]is used via.content/.role/.user_idwhile Stream API responses are inspected viaresponse.data.messages[0].text, it’s easy for readers and type-checkers to confuse the core conversation model with the GetStream SDK model.Please double‑check which
MessagetypeStreamConversationexpects for itsmessagesargument:
- If it expects the core conversation message (
vision_agents.core.agents.conversation.Message), switch the import to that type (optionally aliasing it, e.g.ConversationMessage) and update the annotation accordingly.- If it truly expects
getstream.models.Message, consider aliasing the import (e.g.from getstream.models import Message as StreamMessage) and annotatingmessages: list[StreamMessage]to make the distinction from the coreMessageexplicit.Based on relevant_code_snippets, there is another
Messagetype invision_agents.core.agents.conversation, hence the concern about ambiguity.Also applies to: 56-56
agents-core/vision_agents/core/agents/agents.py (1)
63-63: Top-level import ofAgentSessionContextManagerlooks good; consider using it directly in the return type.The runtime import cleanly pairs with the TYPE_CHECKING-only import of
Agentinagent_session.py, so there’s no circular import at runtime. Now that the class is available at module import time, you can optionally drop the string return annotation onjoinfor better type clarity:- async def join( - self, call: Call, wait_for_participant=True - ) -> "AgentSessionContextManager": + async def join( + self, call: Call, wait_for_participant=True + ) -> AgentSessionContextManager:This is non-blocking and mostly a readability/typing win.
agents-core/vision_agents/core/agents/agent_session.py (1)
3-6: TYPE_CHECKING import plus forward reference is a solid way to break the runtime cycle.The shift to:
import typing+if typing.TYPE_CHECKING: from .agents import Agent, andagent: "Agent"in__init__cleanly removes the runtime dependency on
Agentwhile keeping type checkers happy. This should resolve circular-import issues betweenagents.pyandagent_session.pywithout changing behavior.Two minor, optional polish ideas:
- For consistency with the rest of the codebase, you could use
from typing import TYPE_CHECKINGandif TYPE_CHECKING:instead of importing the wholetypingmodule.- In
__exit__, consider routing the exception message through your logging instead ofprint(...)when you next touch this code, to keep error reporting consistent with the rest of the project.Both are non-blocking; the current changes are fine as-is.
Also applies to: 32-32
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
agents-core/vision_agents/core/agents/agent_session.py(2 hunks)agents-core/vision_agents/core/agents/agents.py(1 hunks)plugins/getstream/tests/test_stream_conversation.py(2 hunks)plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py(4 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*.py: Never adjust sys.path in Python code
Never writeexcept Exception as e- use specific exception handling
Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python
Docstrings should follow the Google style guide for docstrings
Files:
agents-core/vision_agents/core/agents/agents.pyplugins/getstream/tests/test_stream_conversation.pyagents-core/vision_agents/core/agents/agent_session.pyplugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
**/*test*.py
📄 CodeRabbit inference engine (.cursor/rules/python.mdc)
**/*test*.py: Never mock in tests; use pytest for testing
Mark integration tests with @pytest.mark.integration decorator
@pytest.mark.asyncio is not needed - it is automatic
Files:
plugins/getstream/tests/test_stream_conversation.py
🧬 Code graph analysis (3)
agents-core/vision_agents/core/agents/agents.py (1)
agents-core/vision_agents/core/agents/agent_session.py (1)
AgentSessionContextManager(9-91)
plugins/getstream/tests/test_stream_conversation.py (1)
agents-core/vision_agents/core/agents/conversation.py (1)
Message(14-24)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (3)
agents-core/vision_agents/core/processors/base_processor.py (1)
close(45-46)agents-core/vision_agents/core/edge/types.py (1)
close(34-35)agents-core/vision_agents/core/edge/edge_transport.py (1)
close(38-39)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: unit / Mypy
- GitHub Check: unit / Test "not integration"
- GitHub Check: unit / Test "not integration"
- GitHub Check: unit / Mypy
🔇 Additional comments (3)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (3)
86-87: LGTM!The
_real_connectionattribute provides clean lazy initialization for the underlying ConnectionManager, supporting the deferred binding pattern described in the PR objectives.
93-97: LGTM!The
_connectionproperty enforces that the connection is properly initialized before use, raising a clear ValueError if accessed prematurely. This defensive pattern prevents subtle bugs from uninitialized connection usage.
319-324: LGTM! Core SFU event handling fix implemented correctly.The event redirection from the RTC stack to the agent's event bus enables proper external subscriptions. The forwarding is registered before the connection starts, avoiding potential race conditions.
This PR removes the code that subscribes to WS and Coordinator events directly from the
Agentclass.Instead, only certain required events, such as
track_published, are re-emitted byStreamEdge.It's a cleaner approach and reduces the dependency between
Agentand Stream's WebSocket machinery.