diff --git a/frontend/src/renderer/src/components/ai-assistant/chat-history-list-item.tsx b/frontend/src/renderer/src/components/ai-assistant/chat-history-list-item.tsx index 582e9298..e1b2cfd6 100644 --- a/frontend/src/renderer/src/components/ai-assistant/chat-history-list-item.tsx +++ b/frontend/src/renderer/src/components/ai-assistant/chat-history-list-item.tsx @@ -1,12 +1,14 @@ import { FC, useState, MouseEvent } from 'react' -import { Typography, Trigger, Modal, Form, Input } from '@arco-design/web-react' +import { Typography, Trigger, Modal, Form, Input, Spin } from '@arco-design/web-react' import { IconDelete, IconMore } from '@arco-design/web-react/icon' import { useMemoizedFn, useRequest } from 'ahooks' import { conversationService } from '@renderer/services/conversation-service' import chatHistoryIcon from '@renderer/assets/icons/ai-assistant/chat-history.svg' import renameIcon from '@renderer/assets/icons/rename.svg' import clsx from 'clsx' +import { useAppDispatch, useAppSelector } from '@renderer/store' +import { deleteConversation as deleteConversationAction } from '@renderer/store/chat-history' export interface ChatHistoryListItemProps { conversation: any refreshConversationList?: () => void @@ -17,6 +19,12 @@ const ChatHistoryListItem: FC = (props) => { const { conversation, refreshConversationList, handleGetMessages } = props const [renameVisible, setRenameVisible] = useState(false) const [form] = Form.useForm() + const dispatch = useAppDispatch() + const backgroundGeneratingConversations = useAppSelector( + (state) => state.chatHistory.backgroundGeneratingConversations + ) + const isGenerating = backgroundGeneratingConversations.includes(conversation.id) + const { run: updateConversationTitle } = useRequest(conversationService.updateConversationTitle, { manual: true }) const { run: deleteConversation } = useRequest(conversationService.deleteConversation, { manual: true }) const handleDelete = useMemoizedFn(async () => { @@ -27,6 +35,8 @@ const 
ChatHistoryListItem: FC = (props) => { okText: 'Delete', onOk: () => { deleteConversation(conversation.id) + // Sync Redux state so the sidebar removes this item immediately + dispatch(deleteConversationAction(conversation.id)) refreshConversationList?.() } }) @@ -79,11 +89,14 @@ const ChatHistoryListItem: FC = (props) => { { 'bg-[#F6F7FA]': editMenuVisible } )} onClick={(e) => handleGetMessages?.(e, conversation.id)}> -
- +
+ {conversation.title || 'Untitled Conversation'} + {isGenerating && ( + + )}
{ + const dispatch = useAppDispatch() + const [chatState, setChatState] = useState({ messages: [], isLoading: false, @@ -42,11 +46,15 @@ export const useChatStream = () => { const [streamingMessage, setStreamingMessage] = useState(null) const currentStreamingId = useRef(null) + // Tracks which conversation is currently being streamed in this hook instance + const currentConversationId = useRef(0) - // Cleanup function + // Cleanup: do NOT abort the stream on unmount so that background generation + // in other conversations continues uninterrupted. The user can always stop + // via the explicit Stop button which calls stopStreaming(). useEffect(() => { return () => { - chatStreamService.abortStream() + // intentionally empty — background generation should survive navigation } }, []) @@ -55,6 +63,9 @@ export const useChatStream = () => { async (query: string, conversation_id: number, context?: ChatStreamRequest['context']) => { if (!query.trim() || chatState.isLoading) return + // Remember which conversation we're streaming into + currentConversationId.current = conversation_id + // Add user message const userMessage: ChatMessage = { role: 'user', @@ -104,6 +115,10 @@ export const useChatStream = () => { messageId: get(event, 'assistant_message_id', prev.messageId) })) } + // Mark this conversation as actively generating in global Redux state + if (currentConversationId.current) { + dispatch(addGeneratingConversation(currentConversationId.current)) + } break case 'thinking': @@ -196,6 +211,9 @@ export const useChatStream = () => { })) return null }) + if (currentConversationId.current) { + dispatch(removeGeneratingConversation(currentConversationId.current)) + } break case 'completed': @@ -224,6 +242,9 @@ export const useChatStream = () => { } setStreamingMessage(null) currentStreamingId.current = null + if (currentConversationId.current) { + dispatch(removeGeneratingConversation(currentConversationId.current)) + } break case 'fail': @@ -234,6 +255,9 @@ export const 
useChatStream = () => { currentStage: 'failed' })) setStreamingMessage(null) + if (currentConversationId.current) { + dispatch(removeGeneratingConversation(currentConversationId.current)) + } break case 'done': @@ -244,7 +268,7 @@ export const useChatStream = () => { })) break } - }, []) + }, [dispatch]) // Handle stream error const handleStreamError = useCallback((error: Error) => { @@ -277,7 +301,7 @@ export const useChatStream = () => { // Clear chat history const clearChat = useCallback(() => { - chatStreamService.abortStream() + chatStreamService.abortStreamForConversation(currentConversationId.current) setChatState({ messages: [], isLoading: false, @@ -291,13 +315,16 @@ export const useChatStream = () => { // Stop the current streaming request const stopStreaming = useCallback(() => { - chatStreamService.abortStream() + chatStreamService.abortStreamForConversation(currentConversationId.current) + if (currentConversationId.current) { + dispatch(removeGeneratingConversation(currentConversationId.current)) + } setChatState((prev) => ({ ...prev, isLoading: false })) setStreamingMessage(null) - }, []) + }, [dispatch]) return { ...chatState, diff --git a/frontend/src/renderer/src/services/ChatStreamService.ts b/frontend/src/renderer/src/services/ChatStreamService.ts index 08bdd36b..f9bc37f9 100644 --- a/frontend/src/renderer/src/services/ChatStreamService.ts +++ b/frontend/src/renderer/src/services/ChatStreamService.ts @@ -89,19 +89,26 @@ export interface StreamEvent { // Streaming chat service class export class ChatStreamService { - private abortController?: AbortController + // One AbortController per active conversation (keyed by conversation_id). + // Using conversation_id = 0 as a fallback when no id is available. + private controllers: Map = new Map() - // Send a streaming chat request + // Send a streaming chat request. + // Only aborts a pre-existing stream for the SAME conversation so that + // background generations in other conversations are unaffected. 
async sendStreamMessage( request: ChatStreamRequest, onEvent: (event: StreamEvent) => void, onError?: (error: Error) => void, onComplete?: () => void ): Promise { - // Cancel the previous request - this.abortStream() + const convId = request.conversation_id ?? 0 - this.abortController = new AbortController() + // Cancel any previous stream for THIS conversation only + this.abortStreamForConversation(convId) + + const controller = new AbortController() + this.controllers.set(convId, controller) try { // Use the baseURL of axiosInstance to ensure consistent ports @@ -112,7 +119,7 @@ export class ChatStreamService { 'Content-Type': 'application/json' }, body: JSON.stringify(request), - signal: this.abortController.signal + signal: controller.signal }) if (!response.ok) { @@ -170,17 +177,32 @@ export class ChatStreamService { console.error('Stream request failed:', error) onError?.(error as Error) + } finally { + // Clean up the controller entry when this conversation's stream ends + this.controllers.delete(convId) } } - // Cancel the current streaming request - abortStream(): void { - if (this.abortController) { - this.abortController.abort() - this.abortController = undefined + // Cancel the streaming request for a specific conversation + abortStreamForConversation(conversationId: number): void { + const controller = this.controllers.get(conversationId) + if (controller) { + controller.abort() + this.controllers.delete(conversationId) } } + // Cancel ALL active streaming requests (e.g., on app shutdown) + abortStream(): void { + this.controllers.forEach((ctrl) => ctrl.abort()) + this.controllers.clear() + } + + // Return whether a particular conversation currently has an active stream + isStreaming(conversationId: number): boolean { + return this.controllers.has(conversationId) + } + // Generate a session ID generateSessionId(): string { return 'session_' + Date.now() + '_' + Math.random().toString(36).substr(2, 9) diff --git 
a/frontend/src/renderer/src/store/chat-history.ts b/frontend/src/renderer/src/store/chat-history.ts index c17b4e6b..bdeac30a 100644 --- a/frontend/src/renderer/src/store/chat-history.ts +++ b/frontend/src/renderer/src/store/chat-history.ts @@ -29,6 +29,8 @@ export interface ChatHistoryState { loading: boolean // Error message error: string | null + // Conversations that currently have an active background generation stream + backgroundGeneratingConversations: number[] // AI assistant visibility for each page home: { aiAssistantVisible: boolean @@ -44,6 +46,7 @@ const initialState: ChatHistoryState = { chatHistoryMessages: [], loading: false, error: null, + backgroundGeneratingConversations: [], home: { aiAssistantVisible: false }, @@ -226,6 +229,26 @@ const chatHistorySlice = createSlice({ */ resetChatHistory(state) { Object.assign(state, initialState) + }, + + // ========== Background Generation Tracking ========== + + /** + * Mark a conversation as having an active background generation stream. + */ + addGeneratingConversation(state, action: PayloadAction) { + if (!state.backgroundGeneratingConversations.includes(action.payload)) { + state.backgroundGeneratingConversations.push(action.payload) + } + }, + + /** + * Remove a conversation from the active-generation set (stream finished/cancelled). 
+ */ + removeGeneratingConversation(state, action: PayloadAction) { + state.backgroundGeneratingConversations = state.backgroundGeneratingConversations.filter( + (id) => id !== action.payload + ) } } }) @@ -254,7 +277,10 @@ export const { setLoading, setError, clearError, - resetChatHistory + resetChatHistory, + // Background generation tracking + addGeneratingConversation, + removeGeneratingConversation } = chatHistorySlice.actions export default chatHistorySlice.reducer diff --git a/opencontext/server/background_generation.py b/opencontext/server/background_generation.py new file mode 100644 index 00000000..e42306e7 --- /dev/null +++ b/opencontext/server/background_generation.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +""" +Background Generation Manager +Manages background AI generation tasks so they continue even if the SSE +HTTP connection is dropped by the client (e.g., user switches conversations). + +Design: + - Each generation is an asyncio.Task running independently of the SSE connection. + - The task writes chunks to the DB AND to an asyncio.Queue consumed by the SSE generator. + - If the SSE connection drops, the Queue simply accumulates (unlimited) and is garbage- + collected when the task finishes. The task itself keeps running. + - A global Semaphore(3) limits concurrent background generations. +""" + +import asyncio +from typing import Any, Callable, Coroutine, Dict, List, Optional + +from opencontext.utils.logging_utils import get_logger + +logger = get_logger(__name__) + +# Sentinel object — placed in queue to signal that generation has finished +_DONE = object() + +# Maximum number of concurrent background generation tasks +MAX_CONCURRENT = 3 + + +class BackgroundGenerationManager: + """ + Singleton manager for background AI generation tasks. 
+ + Usage:: + + mgr = BackgroundGenerationManager.get_instance() + + # Start a task; returns an asyncio.Queue the SSE generator can read from. + queue = await mgr.submit_task( + session_id="...", + conversation_id=42, + coro_factory=lambda q: my_generation_coro(q, ...), + ) + + # SSE generator reads from the queue until it receives None (done sentinel). + while True: + item = await queue.get() + if item is _DONE: + break + yield ... + + # Hard-cancel a running task (e.g., on delete). + await mgr.cancel_task(session_id="...") + """ + + _instance: Optional["BackgroundGenerationManager"] = None + + def __new__(cls) -> "BackgroundGenerationManager": + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._ready = False + return cls._instance + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + + def _ensure_ready(self) -> None: + """Lazy-initialize asyncio primitives (must run inside an event loop).""" + if self._ready: + return + self._semaphore: asyncio.Semaphore = asyncio.Semaphore(MAX_CONCURRENT) + self._tasks: Dict[str, asyncio.Task] = {} # session_id → Task + self._queues: Dict[str, asyncio.Queue] = {} # session_id → Queue + self._conv_map: Dict[str, int] = {} # session_id → conversation_id + self._ready = True + + @classmethod + def get_instance(cls) -> "BackgroundGenerationManager": + inst = cls() + inst._ensure_ready() + return inst + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def submit_task( + self, + session_id: str, + conversation_id: int, + coro_factory: Callable[["asyncio.Queue[Any]"], Coroutine], + ) -> "asyncio.Queue[Any]": + """ + Submit a generation coroutine as a background task. + + ``coro_factory`` is called with the per-task Queue as its sole argument and + must return a coroutine. 
The coroutine should: + - Put event dicts into the queue while generating. + - Optionally put ``DONE_SENTINEL`` when finished — the manager also + appends it automatically once the coroutine returns. + - Handle ``asyncio.CancelledError`` gracefully (mark message cancelled in DB). + + Returns the Queue so the caller (SSE generator) can read events from it. + Raises ``RuntimeError`` with status 429 if the semaphore limit is reached. + """ + self._ensure_ready() + + # Cancel any existing task for the same session FIRST, so resubmitting the + # session that holds the last slot cannot spuriously hit the limit below + await self._cancel_session(session_id) + + # Check semaphore without blocking — return 429 if all slots are taken + if not self._semaphore._value: # type: ignore[attr-defined] + raise RuntimeError( + f"Maximum concurrent generation limit ({MAX_CONCURRENT}) reached. " + "Please wait for an existing generation to complete." + ) + + queue: asyncio.Queue = asyncio.Queue() + self._queues[session_id] = queue + self._conv_map[session_id] = conversation_id + + async def _run() -> None: + async with self._semaphore: + try: + logger.info( + f"Background generation started: session={session_id}, conv={conversation_id}" + ) + await coro_factory(queue) + # Always release the SSE consumer on normal completion, even if + # the coroutine did not put the sentinel itself — otherwise the + # SSE relay loop would heartbeat forever. + await _safe_put(queue, _DONE) + except asyncio.CancelledError: + logger.info(f"Background generation cancelled: session={session_id}") + # Signal SSE consumer to stop + await _safe_put(queue, _DONE) + # Re-raise so asyncio knows the task was cancelled + raise + except Exception as exc: + logger.exception(f"Background generation error: session={session_id}: {exc}") + await _safe_put(queue, {"type": "error", "content": str(exc)}) + await _safe_put(queue, _DONE) + finally: + self._cleanup(session_id) + logger.info(f"Background generation finished: session={session_id}") + + task = asyncio.create_task(_run(), name=f"gen-{session_id}") + self._tasks[session_id] = task + return queue + + async def cancel_task(self, session_id: str) -> None: + """Cancel a running background generation task and wait for it to stop.""" + await self._cancel_session(session_id) + + def is_running(self, session_id: str) -> bool: +
"""Return True if the given session has an active (not-done) task.""" + self._ensure_ready() + task = self._tasks.get(session_id) + return task is not None and not task.done() + + def get_active_sessions(self) -> List[Dict[str, Any]]: + """Return a list of {session_id, conversation_id} for all running tasks.""" + self._ensure_ready() + return [ + {"session_id": sid, "conversation_id": self._conv_map.get(sid)} + for sid, task in list(self._tasks.items()) + if not task.done() + ] + + def find_session_by_conversation(self, conversation_id: int) -> Optional[str]: + """Return the session_id for a running task associated with *conversation_id*, or None.""" + self._ensure_ready() + for sid, cid in list(self._conv_map.items()): + if cid == conversation_id and self.is_running(sid): + return sid + return None + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + async def _cancel_session(self, session_id: str) -> None: + self._ensure_ready() + task = self._tasks.get(session_id) + if task and not task.done(): + task.cancel() + try: + await asyncio.wait_for(asyncio.shield(task), timeout=5.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + pass + self._cleanup(session_id) + + def _cleanup(self, session_id: str) -> None: + self._tasks.pop(session_id, None) + self._queues.pop(session_id, None) + self._conv_map.pop(session_id, None) + + +async def _safe_put(queue: asyncio.Queue, item: Any) -> None: + """Put an item in the queue without blocking (best-effort).""" + try: + queue.put_nowait(item) + except Exception: + pass + + +# Module-level sentinel export so the SSE generator can import it +DONE_SENTINEL = _DONE + + +# Convenience accessor +def get_background_manager() -> BackgroundGenerationManager: + """Return the ready singleton instance.""" + return BackgroundGenerationManager.get_instance() diff --git a/opencontext/server/routes/agent_chat.py 
b/opencontext/server/routes/agent_chat.py index f4e6c30d..d70adc6a 100644 --- a/opencontext/server/routes/agent_chat.py +++ b/opencontext/server/routes/agent_chat.py @@ -9,6 +9,7 @@ Intelligent conversation routing based on Context Agent """ +import asyncio import json import uuid from typing import Any, Dict, Optional @@ -20,6 +21,11 @@ from opencontext.context_consumption.context_agent import ContextAgent from opencontext.context_consumption.context_agent.models import WorkflowStage from opencontext.context_consumption.context_agent.models.enums import EventType +from opencontext.server.background_generation import ( + DONE_SENTINEL, + _safe_put, + get_background_manager, +) from opencontext.server.middleware.auth import auth_dependency from opencontext.storage.global_storage import get_storage from opencontext.utils.logging_utils import get_logger @@ -117,167 +123,194 @@ async def chat(request: ChatRequest, _auth: str = auth_dependency) -> ChatRespon @router.post("/chat/stream") async def chat_stream(request: ChatRequest, _auth: str = auth_dependency): - """Intelligent chat interface (streaming)""" + """Intelligent chat interface (streaming, background generation)""" + + manager = get_background_manager() + storage = get_storage() + + if not request.session_id: + request.session_id = str(uuid.uuid4()) + + user_message_id: Optional[int] = None + assistant_message_id: Optional[int] = None + + # ------------------------------------------------------------------ # + # Create DB rows before starting the background task so that the # + # SSE session_start event can carry the real assistant_message_id. 
# + # ------------------------------------------------------------------ # + if request.conversation_id: + user_message_id = storage.create_message( + conversation_id=request.conversation_id, + role="user", + content=request.query, + is_complete=True, + ) + logger.info( + f"Created user message {user_message_id} in conversation {request.conversation_id}" + ) - async def generate(): - user_message_id = None - assistant_message_id = None - storage = None + if request.query and request.query.strip(): + conversation = storage.get_conversation(request.conversation_id) + if conversation and not conversation.get("title"): + title = request.query[:50].strip() + storage.update_conversation(conversation_id=request.conversation_id, title=title) + logger.info(f"Set conversation {request.conversation_id} title: {title}") + assistant_message_id = storage.create_streaming_message( + conversation_id=request.conversation_id, + role="assistant", + ) + logger.info(f"Created assistant streaming message {assistant_message_id}") + active_streams[assistant_message_id] = False # interrupt flag + + # ------------------------------------------------------------------ # + # Background coroutine: runs the agent and writes all output to DB. # + # Events are also forwarded to the per-task asyncio.Queue so that # + # the SSE generator (below) can relay them to the HTTP client. # + # If the HTTP client disconnects, the queue accumulates events but # + # the coroutine keeps running until generation finishes. 
# + # ------------------------------------------------------------------ # + # Capture local references for the closure + _session_id = request.session_id + _query = request.query + _user_id = request.user_id + _context = request.context + _conv_id = request.conversation_id + _asst_mid = assistant_message_id + + async def generation_coro(queue: asyncio.Queue) -> None: try: agent = get_agent() - storage = get_storage() - - if not request.session_id: - request.session_id = str(uuid.uuid4()) - - # Save user message if conversation_id is provided - if request.conversation_id: - user_message_id = storage.create_message( - conversation_id=request.conversation_id, - role="user", - content=request.query, - is_complete=True - ) - logger.info(f"Created user message {user_message_id} in conversation {request.conversation_id}") - - # Update conversation title with user's question only if not already set - if request.query and request.query.strip(): - conversation = storage.get_conversation(request.conversation_id) - if conversation and not conversation.get("title"): - title = request.query[:50].strip() - storage.update_conversation( - conversation_id=request.conversation_id, - title=title - ) - logger.info(f"Set conversation {request.conversation_id} title from user query: {title}") - - # Create streaming assistant message if conversation_id is provided - if request.conversation_id: - assistant_message_id = storage.create_streaming_message( - conversation_id=request.conversation_id, - role="assistant" - ) - logger.info(f"Created assistant streaming message {assistant_message_id}") - # Register this message as an active stream - active_streams[assistant_message_id] = False - - # Send session start event with assistant_message_id - yield f"data: {json.dumps({'type': 'session_start', 'session_id': request.session_id, 'assistant_message_id': assistant_message_id}, ensure_ascii=False)}\n\n" - - args = { - "query": request.query, - "session_id": request.session_id, - "user_id": 
request.user_id, - } - if request.context: - args.update(request.context) - - accumulated_content = "" - event_metadata = {} # Store events by type - interrupted = False # Track if stream was interrupted + args: Dict[str, Any] = {"query": _query, "session_id": _session_id, "user_id": _user_id} + if _context: + args.update(_context) + + event_metadata: Dict[str, Any] = {} + interrupted = False async for event in agent.process_stream(**args): - # Check interrupt flag (in-memory, no database query) - if assistant_message_id and active_streams.get(assistant_message_id): - logger.info(f"Message {assistant_message_id} was interrupted, stopping stream") + # Check in-memory interrupt flag + if _asst_mid and active_streams.get(_asst_mid): + logger.info(f"Message {_asst_mid} was interrupted, stopping stream") interrupted = True - yield f"data: {json.dumps({'type': 'interrupted', 'content': 'Message generation was interrupted'}, ensure_ascii=False)}\n\n" + await _safe_put( + queue, + {"type": "interrupted", "content": "Message generation was interrupted"}, + ) break converted_event = event.to_dict() - # Save event content based on type - if assistant_message_id and event.content: - # Check if this is a thinking event + # Persist event to DB + if _asst_mid and event.content: if event.type == EventType.THINKING: - # Save thinking messages separately to message_thinking table storage.add_message_thinking( - message_id=assistant_message_id, + message_id=_asst_mid, content=event.content, stage=event.stage.value if event.stage else None, - progress=event.progress if hasattr(event, 'progress') else 0.0, - metadata=event.metadata if hasattr(event, 'metadata') else None + progress=getattr(event, "progress", 0.0), + metadata=getattr(event, "metadata", None), ) - logger.debug(f"Saved thinking to message {assistant_message_id}: stage={event.stage.value if event.stage else 'unknown'}, content_len={len(event.content)}") elif event.type == EventType.STREAM_CHUNK: - # Only stream_chunk 
content goes to message.content - accumulated_content += event.content storage.append_message_content( - message_id=assistant_message_id, + message_id=_asst_mid, content_chunk=event.content, - token_count=1 # Approximate token count + token_count=1, ) - logger.debug(f"Appended stream_chunk to message {assistant_message_id}: content_len={len(event.content)}") else: - # Other event types (running, done, etc.) go to metadata as lists - event_type_key = event.type.value - if event_type_key not in event_metadata: - event_metadata[event_type_key] = [] - event_metadata[event_type_key].append({ - "content": event.content, - "timestamp": event.timestamp.isoformat() if hasattr(event, 'timestamp') else None, - "stage": event.stage.value if event.stage else None, - "progress": event.progress if hasattr(event, 'progress') else None, - }) - logger.debug(f"Added {event_type_key} event to metadata for message {assistant_message_id}") - - yield f"data: {json.dumps(converted_event, ensure_ascii=False)}\n\n" + key = event.type.value + event_metadata.setdefault(key, []).append( + { + "content": event.content, + "timestamp": ( + event.timestamp.isoformat() + if hasattr(event, "timestamp") + else None + ), + "stage": event.stage.value if event.stage else None, + "progress": getattr(event, "progress", None), + } + ) + + await _safe_put(queue, converted_event) if event.stage in [WorkflowStage.COMPLETED, WorkflowStage.FAILED]: - # Update metadata with collected events before finishing - if assistant_message_id and event_metadata: + if _asst_mid and event_metadata: storage.update_message_metadata( - message_id=assistant_message_id, - metadata=event_metadata + message_id=_asst_mid, metadata=event_metadata ) - logger.info(f"Updated message {assistant_message_id} metadata with {len(event_metadata)} event types") - - # Mark assistant message as finished - if assistant_message_id: + if _asst_mid: status = "completed" if event.stage == WorkflowStage.COMPLETED else "failed" 
storage.mark_message_finished( - message_id=assistant_message_id, + message_id=_asst_mid, status=status, - error_message=event.metadata.get("error") if status == "failed" else None + error_message=( + event.metadata.get("error") if status == "failed" else None + ), ) - logger.info(f"Marked assistant message {assistant_message_id} as {status}") + logger.info(f"Marked assistant message {_asst_mid} as {status}") break - # Handle interrupted stream - save accumulated data and mark as cancelled - if interrupted and assistant_message_id: - # Update metadata with collected events + if interrupted and _asst_mid: if event_metadata: - storage.update_message_metadata( - message_id=assistant_message_id, - metadata=event_metadata - ) - logger.info(f"Updated interrupted message {assistant_message_id} metadata with {len(event_metadata)} event types") - - # Mark message as cancelled (status already set by interrupt endpoint) - logger.info(f"Message {assistant_message_id} interrupted with {len(accumulated_content)} characters saved") - - except Exception as e: - logger.exception(f"Stream chat failed: {e}") + storage.update_message_metadata(message_id=_asst_mid, metadata=event_metadata) + logger.info(f"Message {_asst_mid} interrupted (partial content saved)") - # Mark assistant message as failed if it exists - if assistant_message_id and storage: + except asyncio.CancelledError: + logger.info(f"Generation task cancelled for session {_session_id}") + if _asst_mid: + try: + storage.mark_message_finished(message_id=_asst_mid, status="cancelled") + except Exception: + pass + raise + + except Exception as exc: + logger.exception(f"Background generation failed: {exc}") + if _asst_mid: try: storage.mark_message_finished( - message_id=assistant_message_id, - status="failed", - error_message=str(e) + message_id=_asst_mid, status="failed", error_message=str(exc) ) - except Exception as mark_error: - logger.exception(f"Failed to mark message as failed: {mark_error}") - - yield f"data: 
{json.dumps({'type': 'error', 'content': str(e)}, ensure_ascii=False)}\n\n" + except Exception: + pass + await _safe_put(queue, {"type": "error", "content": str(exc)}) finally: - # Clean up the interrupt flag when stream ends - if assistant_message_id and assistant_message_id in active_streams: - del active_streams[assistant_message_id] - logger.debug(f"Cleaned up interrupt flag for message {assistant_message_id}") + if _asst_mid and _asst_mid in active_streams: + del active_streams[_asst_mid] + + # Submit the background task; raises HTTPException 429 if limit reached + try: + queue = await manager.submit_task( + session_id=_session_id, + conversation_id=_conv_id or 0, + coro_factory=generation_coro, + ) + except RuntimeError as exc: + # Limit reached: generation_coro never runs, so the pre-created + # assistant message would stay stuck in 'streaming' state and its + # interrupt flag would leak — clean both up before rejecting. + if _asst_mid is not None: + try: + storage.mark_message_finished( + message_id=_asst_mid, status="failed", error_message=str(exc) + ) + except Exception: + pass + active_streams.pop(_asst_mid, None) + raise HTTPException(status_code=429, detail=str(exc)) from exc + + # ------------------------------------------------------------------ # + # SSE generator: relays events from the queue to the HTTP client. # + # If the client disconnects, this generator exits but the background # + # task continues running.
# + # ------------------------------------------------------------------ # + async def generate(): + # Send session_start immediately (carries assistant_message_id) + yield f"data: {json.dumps({'type': 'session_start', 'session_id': _session_id, 'assistant_message_id': _asst_mid}, ensure_ascii=False)}\n\n" + + while True: + try: + item = await asyncio.wait_for(queue.get(), timeout=30.0) + except asyncio.TimeoutError: + # Keep-alive heartbeat + yield f"data: {json.dumps({'type': 'heartbeat'}, ensure_ascii=False)}\n\n" + continue + + if item is DONE_SENTINEL: + break + + yield f"data: {json.dumps(item, ensure_ascii=False)}\n\n" return StreamingResponse( generate(), @@ -290,6 +323,23 @@ async def generate(): ) +@router.get("/generation/status") +async def get_generation_status(_auth: str = auth_dependency): + """Return all active background generation sessions.""" + manager = get_background_manager() + return {"success": True, "active_sessions": manager.get_active_sessions()} + + +@router.delete("/generation/{session_id}") +async def cancel_generation(session_id: str, _auth: str = auth_dependency): + """Cancel an active background generation task by session_id.""" + manager = get_background_manager() + if not manager.is_running(session_id): + return {"success": False, "message": f"No active generation for session {session_id}"} + await manager.cancel_task(session_id) + return {"success": True, "message": f"Generation {session_id} cancelled"} + + @router.post("/resume/{workflow_id}") async def resume_workflow(workflow_id: str, request: ResumeRequest, _auth: str = auth_dependency): """Resume workflow execution""" diff --git a/opencontext/server/routes/conversation.py b/opencontext/server/routes/conversation.py index 3abda160..b90d6b6c 100644 --- a/opencontext/server/routes/conversation.py +++ b/opencontext/server/routes/conversation.py @@ -14,6 +14,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field +from 
opencontext.server.background_generation import get_background_manager from opencontext.server.middleware.auth import auth_dependency from opencontext.storage.global_storage import get_storage from opencontext.utils.logging_utils import get_logger @@ -27,19 +28,22 @@ # --- Pydantic Models --- # Based on API spec 4.1.1 - 4.1.5 + class CreateConversationRequest(BaseModel): """Request model for 4.1.1 Create Conversation""" - page_name: str = Field(..., - description="Page name, e.g., 'home' or 'creation'") - document_id: Optional[str] = Field(None, - description="Optional document ID to store in metadata") + + page_name: str = Field(..., description="Page name, e.g., 'home' or 'creation'") + document_id: Optional[str] = Field( + None, description="Optional document ID to store in metadata" + ) class ConversationResponse(BaseModel): """ - Response model for 4.1.1 (Create), 4.1.2 (Get Detail), + Response model for 4.1.1 (Create), 4.1.2 (Get Detail), and 4.1.4 (Update Title). """ + id: int title: Optional[str] = None user_id: Optional[str] = None @@ -52,28 +56,33 @@ class ConversationResponse(BaseModel): class ConversationSummary(ConversationResponse): """Individual conversation item for list response (matches detail)""" + pass class GetConversationListResponse(BaseModel): """Response model for 4.1.3 Get Conversation List""" + items: List[ConversationSummary] total: int class UpdateConversationRequest(BaseModel): """Request model for 4.1.4 Update Title""" + title: str class DeleteConversationResponse(BaseModel): """Response model for 4.1.5 Delete Conversation""" + success: bool id: int # --- API Endpoints --- + @router.post("/conversations", response_model=ConversationResponse) async def create_conversation( request: CreateConversationRequest, @@ -91,15 +100,10 @@ async def create_conversation( metadata = {"document_id": request.document_id} # user_id is optional in the backend and can be added later - conversation = storage.create_conversation( - 
page_name=request.page_name, - metadata=metadata - ) + conversation = storage.create_conversation(page_name=request.page_name, metadata=metadata) if not conversation: - raise HTTPException( - status_code=500, detail="Failed to create conversation in database" - ) + raise HTTPException(status_code=500, detail="Failed to create conversation in database") # The backend's `create_conversation` calls `get_conversation`, # which returns a dict that matches the ConversationResponse model. @@ -114,12 +118,9 @@ async def create_conversation( async def get_conversation_list( limit: int = Query(default=20, description="Return limit"), offset: int = Query(default=0, description="Offset"), - page_name: Optional[str] = Query( - default=None, description="Filter by page_name"), - user_id: Optional[str] = Query( - default=None, description="Filter by user_id"), - status: str = Query( - default="active", description="Filter by status ('active', 'deleted')"), + page_name: Optional[str] = Query(default=None, description="Filter by page_name"), + user_id: Optional[str] = Query(default=None, description="Filter by user_id"), + status: str = Query(default="active", description="Filter by status ('active', 'deleted')"), _auth: str = auth_dependency, ): """ @@ -131,11 +132,7 @@ async def get_conversation_list( # The backend method returns a dict: {"items": [], "total": 0} # which directly matches the GetConversationListResponse model. 
result = storage.get_conversation_list( - limit=limit, - offset=offset, - page_name=page_name, - user_id=user_id, - status=status + limit=limit, offset=offset, page_name=page_name, user_id=user_id, status=status ) return result @@ -158,8 +155,7 @@ async def get_conversation_detail( conversation = storage.get_conversation(conversation_id=cid) if not conversation: - raise HTTPException( - status_code=404, detail="Conversation not found") + raise HTTPException(status_code=404, detail="Conversation not found") return conversation @@ -184,15 +180,10 @@ async def update_conversation_title( # The backend's `update_conversation` calls `get_conversation` # on success, returning the updated object. - updated_convo = storage.update_conversation( - conversation_id=cid, - title=request.title - ) + updated_convo = storage.update_conversation(conversation_id=cid, title=request.title) if not updated_convo: - raise HTTPException( - status_code=404, detail="Conversation not found or update failed" - ) + raise HTTPException(status_code=404, detail="Conversation not found or update failed") return updated_convo @@ -209,20 +200,25 @@ async def delete_conversation( _auth: str = auth_dependency, ): """ - 4.1.5 Mark a conversation as deleted (soft delete) + 4.1.5 Hard-delete a conversation (cascade: messages + thinking records). + Any active background generation for this conversation is cancelled first. """ try: storage = get_storage() + manager = get_background_manager() + + # Cancel any in-progress generation for this conversation + session_id = manager.find_session_by_conversation(cid) + if session_id: + await manager.cancel_task(session_id) + logger.info( + f"Cancelled background generation {session_id} before deleting conversation {cid}" + ) - # The backend's `delete_conversation` method handles setting - # the status to 'deleted' and returns the exact format - # required by `DeleteConversationResponse`. 
- result = storage.delete_conversation(conversation_id=cid) + result = storage.hard_delete_conversation(conversation_id=cid) if not result.get("success"): - raise HTTPException( - status_code=404, detail="Conversation not found or delete failed" - ) + raise HTTPException(status_code=404, detail="Conversation not found or delete failed") return result diff --git a/opencontext/storage/backends/sqlite_backend.py b/opencontext/storage/backends/sqlite_backend.py index efc5af7a..63b11ce9 100644 --- a/opencontext/storage/backends/sqlite_backend.py +++ b/opencontext/storage/backends/sqlite_backend.py @@ -41,22 +41,19 @@ def initialize(self, config: Dict[str, Any]) -> bool: """Initialize SQLite database""" try: # Use path from configuration, default to ./persist/sqlite/app.db - self.db_path = config.get("config", {}).get( - "path", "./persist/sqlite/app.db") + self.db_path = config.get("config", {}).get("path", "./persist/sqlite/app.db") # Ensure directory exists os.makedirs(os.path.dirname(self.db_path), exist_ok=True) - self.connection = sqlite3.connect( - self.db_path, check_same_thread=False) + self.connection = sqlite3.connect(self.db_path, check_same_thread=False) self.connection.row_factory = sqlite3.Row # Allow column name access # Create table structure self._create_tables() self._initialized = True - logger.info( - f"SQLite backend initialized successfully, database path: {self.db_path}") + logger.info(f"SQLite backend initialized successfully, database path: {self.db_path}") return True except Exception as e: @@ -254,25 +251,17 @@ def _create_tables(self): ) # New table indexes - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_vaults_created ON vaults (created_at)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_vaults_type ON vaults (document_type)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_vaults_folder ON vaults (is_folder)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_vaults_deleted ON vaults (is_deleted)") - cursor.execute( - 
"CREATE INDEX IF NOT EXISTS idx_todo_status ON todo (status)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_todo_urgency ON todo (urgency)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_todo_created ON todo (created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_vaults_created ON vaults (created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_vaults_type ON vaults (document_type)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_vaults_folder ON vaults (is_folder)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_vaults_deleted ON vaults (is_deleted)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_status ON todo (status)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_urgency ON todo (urgency)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_created ON todo (created_at)") cursor.execute( "CREATE INDEX IF NOT EXISTS idx_activity_time ON activity (start_time, end_time)" ) - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_tips_time ON tips (created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_tips_time ON tips (created_at)") # Monitoring table indexes cursor.execute( @@ -295,10 +284,8 @@ def _create_tables(self): ) # Conversation/Message indexes - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at)") - cursor.execute( - "CREATE INDEX IF NOT EXISTS idx_messages_status ON messages(status)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_created_at ON messages(created_at)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_messages_status ON messages(status)") cursor.execute( "CREATE INDEX IF NOT EXISTS idx_messages_conversation_id ON messages(conversation_id)" ) @@ -347,23 +334,20 @@ def _insert_default_vault_document(self): cursor = self.connection.cursor() # Check if Quick Start document already exists - cursor.execute( - "SELECT COUNT(*) FROM vaults WHERE title = 'Start With Tutorial'") + cursor.execute("SELECT COUNT(*) FROM vaults WHERE title = 
'Start With Tutorial'") if cursor.fetchone()[0] > 0: return try: config_dir = "./config" - quick_start_file = os.path.join( - config_dir, "quick_start_default.md") + quick_start_file = os.path.join(config_dir, "quick_start_default.md") if os.path.exists(quick_start_file): with open(quick_start_file, "r", encoding="utf-8") as f: default_content = f.read() else: # If file doesn't exist, use fallback content - logger.error( - f"Quick Start document {quick_start_file} does not exist") + logger.error(f"Quick Start document {quick_start_file} does not exist") default_content = "Welcome to MineContext!\n\nYour Context-Aware AI Partner is ready to help you work, study, and create better." except Exception as e: @@ -402,8 +386,7 @@ def _insert_default_vault_document(self): event_manager.publish_event(event_type=event_type, data=data) except Exception as e: - logger.exception( - f"Failed to insert default Quick Start document: {e}") + logger.exception(f"Failed to insert default Quick Start document: {e}") self.connection.rollback() # Report table operations @@ -690,8 +673,7 @@ def get_todos( if status is not None: where_conditions.append("status = ?") params.append(status) - where_clause = " AND ".join( - where_conditions) if where_conditions else "1=1" + where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" params.extend([limit, offset]) cursor.execute( f""" @@ -810,8 +792,7 @@ def get_activities( where_conditions.append("end_time <= ?") params.append(end_time) - where_clause = " AND ".join( - where_conditions) if where_conditions else "1=1" + where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" params.extend([limit, offset]) cursor.execute( @@ -879,8 +860,7 @@ def get_tips( where_conditions.append("created_at <= ?") params.append(end_time.isoformat()) - where_clause = " AND ".join( - where_conditions) if where_conditions else "1=1" + where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" 
params.extend([limit, offset]) cursor.execute( @@ -1408,8 +1388,7 @@ def get_conversation_list( where_clauses.append("user_id = ?") params.append(user_id) - where_sql = " AND ".join( - where_clauses) if where_clauses else "1=1" + where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" # Get total count count_params = params[:] @@ -1487,7 +1466,8 @@ def update_conversation( return self.get_conversation(conversation_id) else: logger.warning( - f"Failed to update conversation {conversation_id}, row not found or no change.") + f"Failed to update conversation {conversation_id}, row not found or no change." + ) return None except Exception as e: self.connection.rollback() @@ -1499,17 +1479,61 @@ def delete_conversation(self, conversation_id: int) -> Dict[str, Any]: Mark a conversation as deleted (4.1.5) """ # Note: The spec uses 'delected', we'll update status to 'deleted' - updated_convo = self.update_conversation( - conversation_id=conversation_id, status="deleted" - ) + updated_convo = self.update_conversation(conversation_id=conversation_id, status="deleted") success = updated_convo is not None return {"success": success, "id": conversation_id} + def hard_delete_conversation(self, conversation_id: int) -> Dict[str, Any]: + """ + Permanently delete a conversation and all its associated data + (messages, thinking records). This is irreversible. + + Delete order (FK-safe): + 1. message_thinking (references messages.id) + 2. messages (references conversations.id) + 3. conversations (root) + """ + if not self._initialized: + logger.error("Unified storage system not initialized") + return {"success": False, "id": conversation_id} + + cursor = self.connection.cursor() + try: + # 1. Delete thinking records for every message in this conversation + cursor.execute( + """ + DELETE FROM message_thinking + WHERE message_id IN ( + SELECT id FROM messages WHERE conversation_id = ? + ) + """, + (conversation_id,), + ) + # 2. 
Delete all messages + cursor.execute( + "DELETE FROM messages WHERE conversation_id = ?", + (conversation_id,), + ) + # 3. Delete the conversation row itself + cursor.execute( + "DELETE FROM conversations WHERE id = ?", + (conversation_id,), + ) + self.connection.commit() + logger.info(f"Hard-deleted conversation {conversation_id} and all associated data") + return {"success": True, "id": conversation_id} + except Exception as e: + self.connection.rollback() + logger.exception(f"Failed to hard-delete conversation {conversation_id}: {e}") + return {"success": False, "id": conversation_id, "error": str(e)} + # ----------------------------------------------------------------- # Conversation/Message operations (Continued) # ----------------------------------------------------------------- - def get_message(self, message_id: int, include_thinking: bool = True) -> Optional[Dict[str, Any]]: + def get_message( + self, message_id: int, include_thinking: bool = True + ) -> Optional[Dict[str, Any]]: """ Get a single message by its ID, optionally including thinking records. 
@@ -1537,7 +1561,7 @@ def get_message(self, message_id: int, include_thinking: bool = True) -> Optiona # Include thinking records if requested if include_thinking: - message['thinking'] = self.get_message_thinking(message_id) + message["thinking"] = self.get_message_thinking(message_id) return message return None @@ -1677,8 +1701,7 @@ def update_message( if cursor.rowcount > 0: return self.get_message(message_id) else: - logger.warning( - f"Failed to update message {message_id}, not found.") + logger.warning(f"Failed to update message {message_id}, not found.") return None except Exception as e: self.connection.rollback() @@ -1716,8 +1739,7 @@ def append_message_content( ) if cursor.rowcount == 0: - logger.warning( - f"Failed to append message {message_id}, not found.") + logger.warning(f"Failed to append message {message_id}, not found.") return False # Update conversation's updated_at @@ -1736,11 +1758,7 @@ def append_message_content( logger.exception(f"Failed to append message content: {e}") return False - def update_message_metadata( - self, - message_id: int, - metadata: Dict[str, Any] - ) -> bool: + def update_message_metadata(self, message_id: int, metadata: Dict[str, Any]) -> bool: """ Update message metadata """ @@ -1770,10 +1788,7 @@ def update_message_metadata( return False def mark_message_finished( - self, - message_id: int, - status: str = "completed", - error_message: Optional[str] = None + self, message_id: int, status: str = "completed", error_message: Optional[str] = None ) -> bool: """ Mark a message as finished (completed, failed, or cancelled) (4.2.6 & Interrupt) @@ -1809,14 +1824,14 @@ def mark_message_finished( success = cursor.rowcount > 0 if not success: # Check if it failed because it was already in the desired state - cursor.execute( - "SELECT status FROM messages WHERE id = ?", (message_id,)) + cursor.execute("SELECT status FROM messages WHERE id = ?", (message_id,)) row = cursor.fetchone() if row and row[0] == status: success = True # 
Already done, count as success else: logger.warning( - f"Failed to mark message {message_id} as {status}, not found or no change.") + f"Failed to mark message {message_id} as {status}, not found or no change." + ) # Update conversation's updated_at cursor.execute( @@ -1839,9 +1854,7 @@ def interrupt_message(self, message_id: int) -> bool: Interrupt a streaming message (marks as 'cancelled') """ return self.mark_message_finished( - message_id=message_id, - status="cancelled", - error_message="Message interrupted by user." + message_id=message_id, status="cancelled", error_message="Message interrupted by user." ) def get_conversation_messages(self, conversation_id: int) -> List[Dict[str, Any]]: @@ -1868,7 +1881,7 @@ def get_conversation_messages(self, conversation_id: int) -> List[Dict[str, Any] for row in rows: message = dict(row) # Add thinking records for this message - message['thinking'] = self.get_message_thinking(message['id']) + message["thinking"] = self.get_message_thinking(message["id"]) messages.append(message) return messages except Exception as e: @@ -1891,10 +1904,7 @@ def delete_message(self, message_id: int) -> bool: cursor = self.connection.cursor() try: - cursor.execute( - "DELETE FROM messages WHERE id = ?", - (message_id,) - ) + cursor.execute("DELETE FROM messages WHERE id = ?", (message_id,)) self.connection.commit() return cursor.rowcount > 0 except Exception as e: @@ -1937,7 +1947,7 @@ def add_message_thinking( if sequence is None: cursor.execute( "SELECT COALESCE(MAX(sequence), -1) + 1 FROM message_thinking WHERE message_id = ?", - (message_id,) + (message_id,), ) sequence = cursor.fetchone()[0] @@ -1982,7 +1992,7 @@ def get_message_thinking(self, message_id: int) -> List[Dict[str, Any]]: WHERE message_id = ? 
ORDER BY sequence ASC, created_at ASC """, - (message_id,) + (message_id,), ) rows = cursor.fetchall() return [dict(row) for row in rows] @@ -2005,10 +2015,7 @@ def clear_message_thinking(self, message_id: int) -> bool: cursor = self.connection.cursor() try: - cursor.execute( - "DELETE FROM message_thinking WHERE message_id = ?", - (message_id,) - ) + cursor.execute("DELETE FROM message_thinking WHERE message_id = ?", (message_id,)) self.connection.commit() return True except Exception as e: @@ -2041,8 +2048,7 @@ def query( # Filter conditions if filters: if "content_type" in filters: - where_conditions.append( - 'JSON_EXTRACT(metadata, "$.content_type") = ?') + where_conditions.append('JSON_EXTRACT(metadata, "$.content_type") = ?') params.append(filters["content_type"]) if "data_type" in filters: @@ -2051,21 +2057,19 @@ def query( if "tags" in filters: tags = ( - filters["tags"] if isinstance(filters["tags"], list) else [ - filters["tags"]] + filters["tags"] if isinstance(filters["tags"], list) else [filters["tags"]] ) if tags: # Use proper parameterized query for tags tag_placeholders = ",".join(["?"] * len(tags)) where_conditions.append( - f'id IN (SELECT document_id FROM document_tags WHERE tag IN ({tag_placeholders}))' + f"id IN (SELECT document_id FROM document_tags WHERE tag IN ({tag_placeholders}))" ) for tag in tags: params.append(tag.lower()) # Build SQL query - where_clause = " AND ".join( - where_conditions) if where_conditions else "1=1" + where_clause = " AND ".join(where_conditions) if where_conditions else "1=1" # Get documents # Use text() for safe SQL composition with parameters @@ -2074,10 +2078,14 @@ def query( FROM documents d LEFT JOIN document_tags dt ON d.id = dt.document_id WHERE """ - sql = base_sql + where_clause + """ + sql = ( + base_sql + + where_clause + + """ ORDER BY d.updated_at DESC LIMIT ? 
""" + ) params.append(limit) cursor.execute(sql, params) @@ -2087,8 +2095,7 @@ def query( for row in rows: # Get images for each document cursor.execute( - "SELECT image_path FROM images WHERE document_id = ? ORDER BY id", ( - row["id"],) + "SELECT image_path FROM images WHERE document_id = ? ORDER BY id", (row["id"],) ) images = [img_row[0] for img_row in cursor.fetchall()] diff --git a/opencontext/storage/unified_storage.py b/opencontext/storage/unified_storage.py index 51bc8e91..1ece93fa 100644 --- a/opencontext/storage/unified_storage.py +++ b/opencontext/storage/unified_storage.py @@ -472,6 +472,18 @@ def delete_conversation(self, conversation_id: int) -> Dict[str, Any]: return self._document_backend.delete_conversation(conversation_id) + def hard_delete_conversation(self, conversation_id: int) -> Dict[str, Any]: + """Permanently delete conversation and all associated messages / thinking records.""" + if not self._initialized: + logger.error("Unified storage system not initialized") + return {"success": False, "id": conversation_id} + + if not self._document_backend: + logger.error("Document database backend not initialized") + return {"success": False, "id": conversation_id} + + return self._document_backend.hard_delete_conversation(conversation_id) + def insert_vaults( self, title: str, @@ -836,9 +848,7 @@ def append_message_content( message_id=message_id, content_chunk=content_chunk, token_count=token_count ) - def update_message_metadata( - self, message_id: int, metadata: Dict[str, Any] - ) -> bool: + def update_message_metadata(self, message_id: int, metadata: Dict[str, Any]) -> bool: """Update message metadata""" if not self._initialized or not self._document_backend: logger.error("Storage not initialized")