Skip to content

Conversation

@dangusev
Copy link
Contributor

@dangusev dangusev commented Dec 9, 2025

This PR removes the code that subscribes to WS and Coordinator events directly from the Agent class.

Instead, only certain required events, such as track_published, are re-emitted by StreamEdge.
It's a cleaner approach and reduces the dependency between Agent and Stream's WebSocket machinery.

⚠️ It's waiting for GetStream/stream-py#203 to be merged and released.

@dangusev dangusev marked this pull request as draft December 9, 2025 14:08
@coderabbitai
Copy link

coderabbitai bot commented Dec 9, 2025

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes introduce type-checking scaffolding to prevent circular imports across the agents module, reorganize import statements, and refactor GetStream's StreamEdge connection handling to support lazy binding and event redirection from the RTC stack to the agent's event bus.

Changes

Cohort / File(s) Summary
Type-checking and circular import prevention
agents-core/vision_agents/core/agents/agent_session.py, agents-core/vision_agents/core/agents/agents.py
Added TYPE_CHECKING guards and forward-referenced type annotations to break circular import dependency. Reorganized imports of AgentSessionContextManager.
Type annotation improvements
plugins/getstream/tests/test_stream_conversation.py
Updated imports and annotated local messages variable with explicit list[Message] type.
StreamEdge connection lifecycle refactoring
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
Made StreamConnection.close timeout configurable, introduced lazy-binding pattern via _real_connection and _connection property, added event redirection from RTC stack (participant_joined, participant_left, track_published, track_unpublished) to agent events, and removed _subscribe_to_existing_tracks pre-subscription logic.

Sequence Diagram(s)

sequenceDiagram
    participant Agent
    participant StreamEdge
    participant ConnectionMgr as ConnectionManager
    participant RTCStack as RTC Stack
    
    Agent->>StreamEdge: join()
    StreamEdge->>StreamEdge: register on-track lifecycle handlers
    StreamEdge->>ConnectionMgr: __aenter__()
    ConnectionMgr->>RTCStack: initialize connection
    RTCStack-->>ConnectionMgr: ready
    ConnectionMgr-->>StreamEdge: connection active
    StreamEdge->>StreamEdge: set _real_connection
    
    note over StreamEdge: Lazy binding complete
    
    RTCStack->>ConnectionMgr: participant_joined event
    ConnectionMgr->>StreamEdge: forward event
    StreamEdge->>Agent: emit to event bus
    
    RTCStack->>ConnectionMgr: track_published event
    ConnectionMgr->>StreamEdge: forward event
    StreamEdge->>Agent: emit to event bus
    
    note over Agent,StreamEdge: Real-time event flow<br/>(replaces pre-subscription)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • StreamEdge connection refactoring: Verify the lazy-binding pattern with _real_connection and _connection property correctly enforces initialization order and handles edge cases where connection is accessed before join() completes.
  • Event redirection logic: Confirm that event forwarding from RTC stack (participant_joined, participant_left, track_published, track_unpublished) to agent event bus is complete and no event types are missed.
  • _subscribe_to_existing_tracks removal: Validate that removal of pre-subscription logic does not introduce race conditions or missed track events during the connection establishment window.
  • Timeout parameter in StreamConnection.close: Ensure TimeoutError handling doesn't mask underlying issues and that the default timeout value is appropriate.

Possibly related PRs

Poem

Bell jar sealed, connection waits in shadow—
lazy binding whispers through the wire,
events bloom where subscriptions burned away.
The RTC stack breathes its truth at last,
no ghosts of tracks before the awakening.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 36.36% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Fix SFU events handling inside Agent' directly addresses the main change: refactoring event handling in the Agent class to fix SFU (Selective Forwarding Unit) event handling, as evidenced by removal of coordinator/ws client event forwarding and internal wiring changes.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (1)

46-58: Remove the broad exception handler.

Lines 56-57 use except Exception as e:, which explicitly violates the coding guidelines: "Never write except Exception as e - use specific exception handling." This broad catch can mask unexpected errors and make debugging difficult.

As per coding guidelines, remove or replace with specific exception types:

         except RuntimeError as e:
             if "asynchronous generator" in str(e):
                 logger.debug(f"Ignoring async generator error during shutdown: {e}")
             else:
                 raise
-        except Exception as e:
-            logger.error(f"Error during connection close: {e}")

If you expect other specific exceptions during connection cleanup, catch them explicitly by type.

🧹 Nitpick comments (3)
plugins/getstream/tests/test_stream_conversation.py (1)

10-15: Clarify which Message type is intended for messages to avoid ambiguity

You now import Message from getstream.models and annotate messages: list[Message] = [] in the stream_conversation fixture, but there is also a Message class in vision_agents.core.agents.conversation that represents the in-memory conversation message model. Given that conversation.messages[0] is used via .content/.role/.user_id while Stream API responses are inspected via response.data.messages[0].text, it’s easy for readers and type-checkers to confuse the core conversation model with the GetStream SDK model.

Please double‑check which Message type StreamConversation expects for its messages argument:

  • If it expects the core conversation message (vision_agents.core.agents.conversation.Message), switch the import to that type (optionally aliasing it, e.g. ConversationMessage) and update the annotation accordingly.
  • If it truly expects getstream.models.Message, consider aliasing the import (e.g. from getstream.models import Message as StreamMessage) and annotating messages: list[StreamMessage] to make the distinction from the core Message explicit.

Based on relevant_code_snippets, there is another Message type in vision_agents.core.agents.conversation, hence the concern about ambiguity.

Also applies to: 56-56

agents-core/vision_agents/core/agents/agents.py (1)

63-63: Top-level import of AgentSessionContextManager looks good; consider using it directly in the return type.

The runtime import cleanly pairs with the TYPE_CHECKING-only import of Agent in agent_session.py, so there’s no circular import at runtime. Now that the class is available at module import time, you can optionally drop the string return annotation on join for better type clarity:

-    async def join(
-        self, call: Call, wait_for_participant=True
-    ) -> "AgentSessionContextManager":
+    async def join(
+        self, call: Call, wait_for_participant=True
+    ) -> AgentSessionContextManager:

This is non-blocking and mostly a readability/typing win.

agents-core/vision_agents/core/agents/agent_session.py (1)

3-6: TYPE_CHECKING import plus forward reference is a solid way to break the runtime cycle.

The shift to:

  • import typing + if typing.TYPE_CHECKING: from .agents import Agent, and
  • agent: "Agent" in __init__

cleanly removes the runtime dependency on Agent while keeping type checkers happy. This should resolve circular-import issues between agents.py and agent_session.py without changing behavior.

Two minor, optional polish ideas:

  • For consistency with the rest of the codebase, you could use from typing import TYPE_CHECKING and if TYPE_CHECKING: instead of importing the whole typing module.
  • In __exit__, consider routing the exception message through your logging instead of print(...) when you next touch this code, to keep error reporting consistent with the rest of the project.

Both are non-blocking; the current changes are fine as-is.

Also applies to: 32-32

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 04f006f and 6a8c7f6.

📒 Files selected for processing (4)
  • agents-core/vision_agents/core/agents/agent_session.py (2 hunks)
  • agents-core/vision_agents/core/agents/agents.py (1 hunks)
  • plugins/getstream/tests/test_stream_conversation.py (2 hunks)
  • plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (4 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (.cursor/rules/python.mdc)

**/*.py: Never adjust sys.path in Python code
Never write except Exception as e - use specific exception handling
Avoid using getattr, hasattr, delattr and setattr; prefer normal attribute access in Python
Docstrings should follow the Google style guide for docstrings

Files:

  • agents-core/vision_agents/core/agents/agents.py
  • plugins/getstream/tests/test_stream_conversation.py
  • agents-core/vision_agents/core/agents/agent_session.py
  • plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py
**/*test*.py

📄 CodeRabbit inference engine (.cursor/rules/python.mdc)

**/*test*.py: Never mock in tests; use pytest for testing
Mark integration tests with @pytest.mark.integration decorator
@pytest.mark.asyncio is not needed - it is automatic

Files:

  • plugins/getstream/tests/test_stream_conversation.py
🧬 Code graph analysis (3)
agents-core/vision_agents/core/agents/agents.py (1)
agents-core/vision_agents/core/agents/agent_session.py (1)
  • AgentSessionContextManager (9-91)
plugins/getstream/tests/test_stream_conversation.py (1)
agents-core/vision_agents/core/agents/conversation.py (1)
  • Message (14-24)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (3)
agents-core/vision_agents/core/processors/base_processor.py (1)
  • close (45-46)
agents-core/vision_agents/core/edge/types.py (1)
  • close (34-35)
agents-core/vision_agents/core/edge/edge_transport.py (1)
  • close (38-39)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: unit / Mypy
  • GitHub Check: unit / Test "not integration"
  • GitHub Check: unit / Test "not integration"
  • GitHub Check: unit / Mypy
🔇 Additional comments (3)
plugins/getstream/vision_agents/plugins/getstream/stream_edge_transport.py (3)

86-87: LGTM!

The _real_connection attribute provides clean lazy initialization for the underlying ConnectionManager, supporting the deferred binding pattern described in the PR objectives.


93-97: LGTM!

The _connection property enforces that the connection is properly initialized before use, raising a clear ValueError if accessed prematurely. This defensive pattern prevents subtle bugs from uninitialized connection usage.


319-324: LGTM! Core SFU event handling fix implemented correctly.

The event redirection from the RTC stack to the agent's event bus enables proper external subscriptions. The forwarding is registered before the connection starts, avoiding potential race conditions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants