feat: enhance WecomAIBotAdapter and WecomAIBotMessageEvent for improved streaming message handling #5000
Conversation
Hey - I've found 1 issue
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py:130` </location>
<code_context>
self.queue_mgr,
self._handle_queued_message,
)
+ self._stream_plain_cache: dict[str, str] = {}
self.webhook_client: WecomAIBotWebhookClient | None = None
</code_context>
<issue_to_address>
**suggestion (performance):** Consider lifecycle and potential unbounded growth of `_stream_plain_cache`.
Right now entries are only removed when a stream finishes and `has_back_queue(stream_id)` is false. If a stream never reaches completion and its back queue is never removed (e.g., producer crash or future logic changes), this dict can grow without bound in long‑running processes. Consider tying cache cleanup directly to queue lifecycle (e.g., in `remove_queues` or a shared teardown path), and/or adding a safeguard such as TTL, max size, or debug assertions to detect leaks.
Suggested implementation:
```python
        )
        # Cache plain text for streaming responses; guarded by a max size and lifecycle-based cleanup
        self._stream_plain_cache: dict[str, str] = {}
        # Hard cap to avoid unbounded growth in long-running processes
        self._stream_plain_cache_max_size: int = 10_000
        self.webhook_client: WecomAIBotWebhookClient | None = None
```
```python
        if self.msg_push_webhook_url:
            # Before handling the request, proactively clean up cache entries whose queues no longer exist.
            # This ties cache lifecycle to queue lifecycle and prevents silent leaks.
            if self._stream_plain_cache:
                stale_stream_ids: list[str] = []
                for cached_stream_id in list(self._stream_plain_cache.keys()):
                    if not self.queue_mgr.has_back_queue(cached_stream_id):
                        stale_stream_ids.append(cached_stream_id)
                for stale_stream_id in stale_stream_ids:
                    self._stream_plain_cache.pop(stale_stream_id, None)
                # Safeguard: enforce a hard cap on cache size.
                if len(self._stream_plain_cache) > self._stream_plain_cache_max_size:
                    logger.warning(
                        "wecom_ai_bot _stream_plain_cache exceeded max size (%d > %d); "
                        "clearing cached entries to stay within bounds",
                        len(self._stream_plain_cache),
                        self._stream_plain_cache_max_size,
                    )
                    # Drop arbitrary entries until under the limit.
                    # We use list(...) to avoid RuntimeError from dict size change during iteration.
                    for cached_stream_id in list(self._stream_plain_cache.keys()):
                        self._stream_plain_cache.pop(cached_stream_id, None)
                        if len(self._stream_plain_cache) <= self._stream_plain_cache_max_size:
                            break
            # wechat server is requesting for updates of a stream
            stream_id = message_data["stream"]["id"]
            if not self.queue_mgr.has_back_queue(stream_id):
                # If the queue for this stream no longer exists, clear any residual cache for it.
                self._stream_plain_cache.pop(stream_id, None)
                if self.queue_mgr.is_stream_finished(stream_id):
                    logger.debug(
                        "Stream already finished, returning end message: %s",
                        stream_id,
                    )
                    return None
```
To fully align cache lifecycle with queue lifecycle across the codebase, consider:
1. In any method (possibly outside this file) that permanently tears down queues for a given `stream_id` (e.g., `queue_mgr.remove_queues(stream_id)` or similar), also removing the corresponding cache entry:
```python
self._stream_plain_cache.pop(stream_id, None)
```
2. Ensuring all write sites for `_stream_plain_cache` reside in this class and use a consistent pattern so that future lifecycle changes (like a more sophisticated TTL or LRU strategy) can be centralized. If writes are scattered across the codebase, refactor them into a helper like `_set_stream_plain_cache(stream_id, value)` that can enforce limits and logging in one place.
</issue_to_address>
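To make point 2 above concrete, here is a minimal, hypothetical sketch of what a centralized cache helper could look like. The `StreamPlainCache` class, its `10_000` default cap, and the oldest-first eviction policy are illustrative assumptions only; they are not part of the existing adapter or the `queue_mgr` API.

```python
# Hypothetical sketch: StreamPlainCache and its default cap are illustrative
# assumptions, not the adapter's actual implementation.
import logging
from collections import OrderedDict

logger = logging.getLogger("wecom_ai_bot")


class StreamPlainCache:
    """Bounded per-stream plain-text cache with oldest-first eviction."""

    def __init__(self, max_size: int = 10_000) -> None:
        self._entries: OrderedDict[str, str] = OrderedDict()
        self._max_size = max_size

    def set(self, stream_id: str, value: str) -> None:
        # Single write site: refresh recency, then enforce the hard cap.
        self._entries[stream_id] = value
        self._entries.move_to_end(stream_id)
        while len(self._entries) > self._max_size:
            evicted_id, _ = self._entries.popitem(last=False)
            logger.warning("stream plain cache full; evicted entry %s", evicted_id)

    def get(self, stream_id: str, default: str = "") -> str:
        return self._entries.get(stream_id, default)

    def pop(self, stream_id: str) -> None:
        # Call this from the same teardown path that removes the stream's
        # queues so a cache entry can never outlive its queue.
        self._entries.pop(stream_id, None)
```

If whichever method actually tears down a stream's queues (for example something like `queue_mgr.remove_queues(stream_id)`, as suggested in point 1) also calls `pop(stream_id)` on this helper, the cache lifecycle stays coupled to the queue lifecycle regardless of how the stream ends.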
fixes: #3965
Modifications / 改动点
Screenshots or Test Results / 运行截图或测试结果
Checklist / 检查清单
I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
Summary by Sourcery
Improve Wecom AI Bot streaming message handling and state management for more accurate, consistent responses.
Bug Fixes:
Enhancements: