diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py index ac1797036..aba60e06c 100644 --- a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_adapter.py @@ -127,6 +127,7 @@ def __init__( self.queue_mgr, self._handle_queued_message, ) + self._stream_plain_cache: dict[str, str] = {} self.webhook_client: WecomAIBotWebhookClient | None = None if self.msg_push_webhook_url: @@ -198,6 +199,7 @@ async def _process_message( # wechat server is requesting for updates of a stream stream_id = message_data["stream"]["id"] if not self.queue_mgr.has_back_queue(stream_id): + self._stream_plain_cache.pop(stream_id, None) if self.queue_mgr.is_stream_finished(stream_id): logger.debug( f"Stream already finished, returning end message: {stream_id}" @@ -225,24 +227,48 @@ async def _process_message( return None # aggregate all delta chains in the back queue - latest_plain_content = "" + cached_plain_content = self._stream_plain_cache.get(stream_id, "") + latest_plain_content = cached_plain_content image_base64 = [] finish = False while not queue.empty(): msg = await queue.get() if msg["type"] == "plain": - latest_plain_content = msg["data"] or "" + plain_data = msg.get("data") or "" + if msg.get("streaming", False): + # streaming plain payload is already cumulative + cached_plain_content = plain_data + else: + # segmented non-stream send() pushes plain chunks, needs append + cached_plain_content += plain_data + latest_plain_content = cached_plain_content elif msg["type"] == "image": image_base64.append(msg["image_data"]) + elif msg["type"] == "break": + continue elif msg["type"] in {"end", "complete"}: # stream end finish = True self.queue_mgr.remove_queues(stream_id, mark_finished=True) + self._stream_plain_cache.pop(stream_id, None) break logger.debug( f"Aggregated content: {latest_plain_content}, image: {len(image_base64)}, finish: {finish}", ) + if not finish: + self._stream_plain_cache[stream_id] = cached_plain_content + if finish and not latest_plain_content and not image_base64: + end_message = WecomAIBotStreamMessageBuilder.make_text_stream( + stream_id, + "", + True, + ) + return await self.api_client.encrypt_message( + end_message, + callback_params["nonce"], + callback_params["timestamp"], + ) if latest_plain_content or image_base64: msg_items = [] if finish and image_base64: diff --git a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py index b9899f25e..0369a82af 100644 --- a/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py +++ b/astrbot/core/platform/sources/wecom_ai_bot/wecomai_event.py @@ -186,7 +186,7 @@ async def send_streaming(self, generator, use_fallback=False) -> None: "type": "break", # break means a segment end "data": final_data, "streaming": True, - "session_id": self.session_id, + "session_id": stream_id, }, ) final_data = "" @@ -205,7 +205,7 @@ async def send_streaming(self, generator, use_fallback=False) -> None: "type": "complete", # complete means we return the final result "data": final_data, "streaming": True, - "session_id": self.session_id, + "session_id": stream_id, }, ) await super().send_streaming(generator, use_fallback)