Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions src/pipecat/transports/websocket/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,13 @@ async def cleanup(self):

async def _receive_messages(self):
"""Main message receiving loop for WebSocket messages."""

async def trigger_disconnect_if_needed():
# Trigger `on_client_disconnected` if the client actually disconnects,
# that is, we are not the ones disconnecting.
if not self._client.is_closing:
await self._client.trigger_client_disconnected()

try:
async for message in self._client.receive():
if not self._params.serializer:
Expand All @@ -294,11 +301,14 @@ async def _receive_messages(self):
await self.push_frame(frame)
except Exception as e:
logger.error(f"{self} exception receiving data: {e.__class__.__name__} ({e})")

# Trigger `on_client_disconnected` if the client actually disconnects,
# that is, we are not the ones disconnecting.
if not self._client.is_closing:
await self._client.trigger_client_disconnected()
Comment on lines -300 to -301
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that reaching this point means the client has closed the socket, which will trigger on_client_disconnected.

Usually, the task is cancelled(stop) inside the user’s code at that point. Like, inside the on_client_disconnected callback.

I’m not sure how the race condition you mentioned would specifically impact this case. Like who is invoking stop at the same time ?

Have you been able to reproduce the issue locally ? Do you have the steps we should follow ?

Also tagging @aconchillo here for thoughts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the same. Also. we only want to trigger on_client_disconnected when the client really disconnects.

finally:
# Use shield to prevent cancellation from stopping the disconnect callback
try:
await asyncio.shield(trigger_disconnect_if_needed())
except asyncio.CancelledError:
# Even if we're cancelled, try to trigger the disconnect
await trigger_disconnect_if_needed()
raise

async def _monitor_websocket(self):
"""Wait for self._params.session_timeout seconds, if the websocket is still open, trigger timeout event."""
Expand Down