
Conversation

@Soulter Soulter (Member) commented Feb 9, 2026

fixes: #3965

Modifications

  • This is NOT a breaking change.

Screenshots or Test Results


Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Improve Wecom AI Bot streaming message handling and state management for more accurate, consistent responses.

Bug Fixes:

  • Fix incorrect session identifier in streaming payloads by using the proper stream ID.
  • Prevent stale or missing stream queues from causing inconsistent responses by resetting associated cached content.

Enhancements:

  • Add caching of cumulative plain-text content per stream to correctly aggregate mixed streaming and segmented messages (a minimal sketch follows after this list).
  • Handle additional stream control types and send explicit terminal messages when a stream ends without content to signal completion to the client.
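
As a rough, self-contained illustration of the per-stream cumulative caching and the explicit terminal message described above (this is not the adapter's actual code: the class and method names and the `[stream finished]` marker are illustrative assumptions; the real adapter keeps this state in `_stream_plain_cache` and keys payloads by the stream ID):

```python
from dataclasses import dataclass, field


@dataclass
class StreamAggregator:
    """Per-stream cache of the cumulative plain text produced so far."""

    plain_cache: dict[str, str] = field(default_factory=dict)

    def append_chunk(self, stream_id: str, chunk: str) -> str:
        # Accumulate the new chunk and return the full cumulative text, so
        # every streaming payload carries the whole reply, keyed by stream_id.
        total = self.plain_cache.get(stream_id, "") + chunk
        self.plain_cache[stream_id] = total
        return total

    def finish(self, stream_id: str) -> str:
        # On stream end, drop the cache entry; if the stream produced no
        # content, return an explicit terminal marker so the client still
        # receives a completion signal.
        final = self.plain_cache.pop(stream_id, "")
        return final or "[stream finished]"


agg = StreamAggregator()
assert agg.append_chunk("s1", "Hello, ") == "Hello, "
assert agg.append_chunk("s1", "world") == "Hello, world"
assert agg.finish("s1") == "Hello, world"
assert agg.finish("s2") == "[stream finished]"  # ended without any content
```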

@dosubot dosubot bot added the size:M This PR changes 30-99 lines, ignoring generated files. label Feb 9, 2026
@Soulter Soulter merged commit 7193454 into master Feb 9, 2026
5 checks passed

dosubot bot commented Feb 9, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.


@Soulter Soulter deleted the fix/3965 branch February 9, 2026 14:30

@sourcery-ai sourcery-ai bot left a comment

Hey - I've found 1 issue

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py:130` </location>
<code_context>
             self.queue_mgr,
             self._handle_queued_message,
         )
+        self._stream_plain_cache: dict[str, str] = {}

         self.webhook_client: WecomAIBotWebhookClient | None = None
</code_context>

<issue_to_address>
**suggestion (performance):** Consider lifecycle and potential unbounded growth of `_stream_plain_cache`.

Right now entries are only removed when a stream finishes and `has_back_queue(stream_id)` is false. If a stream never reaches completion and its back queue is never removed (e.g., producer crash or future logic changes), this dict can grow without bound in long‑running processes. Consider tying cache cleanup directly to queue lifecycle (e.g., in `remove_queues` or a shared teardown path), and/or adding a safeguard such as TTL, max size, or debug assertions to detect leaks.

Suggested implementation:

```python
        )
        # Cache plain text for streaming responses; guarded by a max size and lifecycle-based cleanup
        self._stream_plain_cache: dict[str, str] = {}
        # Hard cap to avoid unbounded growth in long-running processes
        self._stream_plain_cache_max_size: int = 10_000

        self.webhook_client: WecomAIBotWebhookClient | None = None

```

```python
        if self.msg_push_webhook_url:
            # Before handling the request, proactively clean up cache entries whose queues no longer exist.
            # This ties cache lifecycle to queue lifecycle and prevents silent leaks.
            if self._stream_plain_cache:
                stale_stream_ids: list[str] = []
                for cached_stream_id in list(self._stream_plain_cache.keys()):
                    if not self.queue_mgr.has_back_queue(cached_stream_id):
                        stale_stream_ids.append(cached_stream_id)

                for stale_stream_id in stale_stream_ids:
                    self._stream_plain_cache.pop(stale_stream_id, None)

                # Safeguard: enforce a hard cap on cache size.
                if len(self._stream_plain_cache) > self._stream_plain_cache_max_size:
                    logger.warning(
                        "wecom_ai_bot _stream_plain_cache exceeded max size (%d > %d); "
                        "clearing cached entries to stay within bounds",
                        len(self._stream_plain_cache),
                        self._stream_plain_cache_max_size,
                    )
                    # Drop arbitrary entries until under the limit.
                    # We use list(...) to avoid RuntimeError from dict size change during iteration.
                    for cached_stream_id in list(self._stream_plain_cache.keys()):
                        self._stream_plain_cache.pop(cached_stream_id, None)
                        if len(self._stream_plain_cache) <= self._stream_plain_cache_max_size:
                            break

            # wechat server is requesting for updates of a stream
            stream_id = message_data["stream"]["id"]
            if not self.queue_mgr.has_back_queue(stream_id):
                # If the queue for this stream no longer exists, clear any residual cache for it.
                self._stream_plain_cache.pop(stream_id, None)
                if self.queue_mgr.is_stream_finished(stream_id):
                    logger.debug(
                        "Stream already finished, returning end message: %s",
                        stream_id,
                    )
                    return None

```

To fully align cache lifecycle with queue lifecycle across the codebase, consider:

1. In any method (possibly outside this file) that permanently tears down queues for a given `stream_id` (e.g., `queue_mgr.remove_queues(stream_id)` or similar), also removing the corresponding cache entry:
   ```python
   self._stream_plain_cache.pop(stream_id, None)
   ```
2. Ensuring all write sites for `_stream_plain_cache` reside in this class and use a consistent pattern so that future lifecycle changes (like a more sophisticated TTL or LRU strategy) can be centralized. If writes are scattered across the codebase, refactor them into a helper like `_set_stream_plain_cache(stream_id, value)` that can enforce limits and logging in one place. A sketch of such a helper appears after this comment.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
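
As a reference for the reviewer's second point, here is a hedged sketch of what a centralized `_set_stream_plain_cache` helper could look like. This is not code from this PR: the module-level state, the oldest-first eviction policy via `OrderedDict`, and the companion `_pop_stream_plain_cache` teardown helper are all illustrative assumptions; only the helper name and the 10_000 cap come from the review above.

```python
import logging
from collections import OrderedDict

logger = logging.getLogger(__name__)

# Hypothetical hard cap (the review snippet above uses 10_000).
STREAM_PLAIN_CACHE_MAX_SIZE = 10_000

# OrderedDict so the oldest entries can be evicted first instead of arbitrary ones.
_stream_plain_cache: OrderedDict[str, str] = OrderedDict()


def _set_stream_plain_cache(stream_id: str, value: str) -> None:
    """Single write path for the cache, so size limits and logging live in one place."""
    _stream_plain_cache[stream_id] = value
    _stream_plain_cache.move_to_end(stream_id)  # treat this write as most recent
    while len(_stream_plain_cache) > STREAM_PLAIN_CACHE_MAX_SIZE:
        evicted_id, _ = _stream_plain_cache.popitem(last=False)  # drop the oldest entry
        logger.warning("stream plain cache over limit; evicted stream %s", evicted_id)


def _pop_stream_plain_cache(stream_id: str) -> str:
    """Counterpart for queue-teardown paths (e.g. remove_queues) and stream completion."""
    return _stream_plain_cache.pop(stream_id, "")
```

In the adapter these would presumably live as methods on the class that owns `_stream_plain_cache`, and the pop helper would be called from whatever path permanently removes the queues for a given `stream_id`.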

@dosubot dosubot bot added the area:platform The bug / feature is about IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on. label Feb 9, 2026

Labels

  • area:platform - The bug / feature is about an IM platform adapter, such as QQ, Lark, Telegram, WebChat and so on.
  • size:M - This PR changes 30-99 lines, ignoring generated files.

Development

Successfully merging this pull request may close these issues.

[Bug] Wecom messages share a single message bubble

1 participant