From ca3da4a1c354ad78dfb3d3805572d89f91bf4023 Mon Sep 17 00:00:00 2001 From: Eurekaxun Date: Fri, 8 May 2026 17:53:21 +0800 Subject: [PATCH] fix(openclaw-plugin): dedupe afterTurn replay with raw tail --- examples/openclaw-plugin/client.ts | 19 + examples/openclaw-plugin/context-engine.ts | 427 +++++++++++++++++- .../openclaw-plugin/tests/ut/client.test.ts | 24 + .../tests/ut/context-engine-afterTurn.test.ts | 387 +++++++++++++++- .../tests/ut/dedup-after-turn-batch.test.ts | 253 +++++++++++ openviking/server/routers/sessions.py | 25 + openviking/session/session.py | 36 ++ tests/server/test_api_sessions.py | 38 ++ tests/session/test_session_context.py | 31 ++ 9 files changed, 1222 insertions(+), 18 deletions(-) create mode 100644 examples/openclaw-plugin/tests/ut/dedup-after-turn-batch.test.ts diff --git a/examples/openclaw-plugin/client.ts b/examples/openclaw-plugin/client.ts index 308c8aed7..9716b046e 100644 --- a/examples/openclaw-plugin/client.ts +++ b/examples/openclaw-plugin/client.ts @@ -96,6 +96,12 @@ export type SessionContextResult = { }; }; +export type SessionMessagesResult = { + messages: OVMessage[]; + count?: number; + tail?: number; +}; + export type SessionArchiveResult = { archive_id: string; abstract: string; @@ -771,6 +777,19 @@ export class OpenVikingClient { ); } + async getSessionMessagesTail( + sessionId: string, + tail: number = 64, + agentId?: string, + ): Promise { + const safeTail = Math.max(0, Math.min(10_000, Math.floor(Number.isFinite(tail) ? 
tail : 64))); + return this.request( + `/api/v1/sessions/${encodeURIComponent(sessionId)}/messages?tail=${safeTail}`, + { method: "GET" }, + agentId, + ); + } + async getSessionArchive( sessionId: string, archiveId: string, diff --git a/examples/openclaw-plugin/context-engine.ts b/examples/openclaw-plugin/context-engine.ts index d0bcc62c9..fe46206c6 100644 --- a/examples/openclaw-plugin/context-engine.ts +++ b/examples/openclaw-plugin/context-engine.ts @@ -10,6 +10,7 @@ import { import { compileSessionPatterns, getCaptureDecision, + type ExtractedMessage, extractNewTurnMessages, shouldBypassSession, } from "./text-utils.js"; @@ -58,6 +59,22 @@ type IngestBatchResult = { ingestedCount: number; }; +type CapturedMessagePart = { + type: "text" | "tool"; + text?: string; + tool_id?: string; + tool_name?: string; + tool_input?: unknown; + tool_output?: string; + tool_status?: string; +}; + +export type CapturedOpenVikingMessage = { + msg: ExtractedMessage; + ovParts: CapturedMessagePart[]; + signature: string; +}; + type CompactResult = { ok: boolean; compacted: boolean; @@ -221,6 +238,302 @@ function messageDigest(messages: AgentMessage[], maxCharsPerMsg = 2000): Array<{ }); } +function isEmptyPlainObject(value: unknown): boolean { + return Object.prototype.toString.call(value) === "[object Object]" && + Object.keys(value as Record).length === 0; +} + +function normalizeMessageParts(parts: Array): CapturedMessagePart[] { + return parts.map((part) => { + if (part.type === "text") { + return { + type: "text", + text: typeof part.text === "string" ? part.text : "", + }; + } + return { + type: "tool", + tool_id: typeof part.tool_id === "string" ? part.tool_id : "", + tool_name: typeof part.tool_name === "string" ? part.tool_name : "", + tool_input: isEmptyPlainObject(part.tool_input) ? undefined : part.tool_input, + tool_output: typeof part.tool_output === "string" ? part.tool_output : "", + tool_status: typeof part.tool_status === "string" ? 
part.tool_status : "", + }; + }); +} + +export function capturedMessageSignature(params: { + role: ExtractedMessage["role"] | string; + parts: Array; +}): string { + return createHash("sha256") + .update(JSON.stringify({ + role: params.role, + parts: normalizeMessageParts(params.parts), + })) + .digest("hex"); +} + +export type AfterTurnDedupMatchKind = + | "none" + | "no-op" + | "tail-match" + | "full-prefix" + | "suffix-fallback"; + +export type AfterTurnDedupResult = { + /** Messages that should be appended to OV (tail of incoming with replay prefix removed). */ + toAppend: CapturedOpenVikingMessage[]; + /** Number of incoming messages classified as already-stored replay and skipped. */ + skipped: number; + /** Which dedup branch produced the result; useful for diagnostics. */ + matchKind: AfterTurnDedupMatchKind; + /** Optional human-readable rationale (especially for fallback branches). */ + reason?: string; +}; + +/** + * AfterTurn replay-aware deduplication for OpenViking. + * + * The OpenClaw 2026.4.x ``installContextEngineLoopHook`` advances a + * private ``lastSeenLength`` per loop iteration, but the attempt + * finalizer (``finalizeHarnessContextEngineTurn``) reuses the outer + * ``prePromptMessageCount`` and re-emits the entire turn delta. Without + * a plugin-side guard, every finalizer replays the whole turn into OV + * once more, producing duplicates. + * + * The algorithm splits across three helpers — + * ``deduplicateAfterTurnBatch`` (entry point dispatching by length), + * ``deduplicateOversizedBatch`` (stored history longer than the batch) + * and ``deduplicateSuffixFallback`` (tail-walk). The matching is + * ordering-aware: + * - It never deduplicates an isolated message based on content alone; + * any positive match requires the incoming batch to align with a + * contiguous segment of stored messages. + * - It also refuses to use a one-message stored prefix as positive + * proof. 
That case is ambiguous: it may be a finalizer replay of a + * just-captured hook message, or it may be a real next turn whose + * first user message has identical content. We prefer a possible + * duplicate over dropping a real user message. + * - It never lets content-only matching consume the entire incoming + * batch. A full-batch match may be a true replay, but it may also be + * a legitimate next turn with identical user/assistant text. Without + * a runtime message id or turn kind, preserving input is safer. + */ +export function deduplicateAfterTurnBatch( + storedTail: OVMessage[], + incoming: CapturedOpenVikingMessage[], + options?: { oversizedNoOverlap?: "ingest" | "skip" }, +): AfterTurnDedupResult { + if (incoming.length === 0) { + return { toAppend: incoming, skipped: 0, matchKind: "no-op" }; + } + if (incoming.length === 1) { + return { + toAppend: incoming, + skipped: 0, + matchKind: "none", + reason: "single-message-no-dedup", + }; + } + if (storedTail.length === 0) { + return { toAppend: incoming, skipped: 0, matchKind: "none" }; + } + + const storedSignatures = storedTail.map((message) => + capturedMessageSignature({ role: message.role, parts: message.parts }), + ); + const incomingSignatures = incoming.map((message) => message.signature); + + if (storedTail.length > incoming.length) { + return deduplicateOversizedBatch( + storedSignatures, + incoming, + incomingSignatures, + options, + ); + } + + // storedTail.length <= incoming.length: batch may be a finalizer-style + // replay where the entire stored history is the prefix of the incoming + // batch. Verify the boundary identity first (cheap reject) before doing + // the full prefix scan. 
+ const boundaryStored = storedSignatures[storedSignatures.length - 1]; + const boundaryIncoming = incomingSignatures[storedSignatures.length - 1]; + if (boundaryStored !== boundaryIncoming) { + return deduplicateSuffixFallback( + storedSignatures, + incoming, + incomingSignatures, + "prefix-mismatch", + ); + } + + if (storedSignatures.length === 1) { + return { + toAppend: incoming, + skipped: 0, + matchKind: "none", + reason: "single-stored-prefix-no-dedup", + }; + } + + for (let i = 0; i < storedSignatures.length; i += 1) { + if (storedSignatures[i] !== incomingSignatures[i]) { + return deduplicateSuffixFallback( + storedSignatures, + incoming, + incomingSignatures, + "full-prefix-mismatch", + ); + } + } + + const toAppend = incoming.slice(storedSignatures.length); + if (toAppend.length === 0) { + return { + toAppend: incoming, + skipped: 0, + matchKind: "none", + reason: "full-prefix-empty-no-dedup", + }; + } + + return { + toAppend, + skipped: storedSignatures.length, + matchKind: "full-prefix", + }; +} + +/** + * Stored history is longer than the incoming batch — likely a tail-only + * replay observed after the runtime has already committed to a deeper + * transcript than the plugin sees in this afterTurn call. + * + * 1. If stored.last == incoming.last, verify whether the **entire** + * incoming batch matches the stored tail of the same length. That is + * not enough proof to skip the batch, because a legitimate next turn + * can contain identical content; keep the incoming batch instead. + * 2. Otherwise fall through to suffix matching, optionally fail-closed + * when no overlap exists at all. We default to ``ingest`` (see the + * no-overlap branch below): OpenViking has no transcript reconcile + * path, and silently dropping legitimately new messages is strictly + * worse than tolerating an occasional duplicate on a true tail-only + * replay. 
+ */ +function deduplicateOversizedBatch( + storedSignatures: string[], + incoming: CapturedOpenVikingMessage[], + incomingSignatures: string[], + options?: { oversizedNoOverlap?: "ingest" | "skip" }, +): AfterTurnDedupResult { + const lastStored = storedSignatures[storedSignatures.length - 1]; + const lastIncoming = incomingSignatures[incomingSignatures.length - 1]; + + if (lastStored === lastIncoming) { + const tailStart = storedSignatures.length - incoming.length; + let tailMatches = true; + for (let i = 0; i < incoming.length; i += 1) { + if (storedSignatures[tailStart + i] !== incomingSignatures[i]) { + tailMatches = false; + break; + } + } + if (tailMatches) { + return { + toAppend: incoming, + skipped: 0, + matchKind: "none", + reason: "tail-match-empty-no-dedup", + }; + } + } + + // Default to "ingest" on no-overlap, never "skip". Failing closed + // would require an external transcript reconcile path that imports + // genuine missing JSONL tail turns before afterTurn dedup runs; + // OpenViking has no equivalent reconcile mechanism — the plugin's + // afterTurn is the only ingest path — so fail-closed would permanently + // drop legitimately new messages whose content happens to match + // nothing in the existing stored tail (for example, a single new user + // message after the previous turn already grew the stored tail beyond + // the incoming batch length). Tolerating an occasional duplicated + // entry on a true tail-only replay is strictly safer than silently + // losing a real user turn. + return deduplicateSuffixFallback( + storedSignatures, + incoming, + incomingSignatures, + "oversized", + { onNoOverlap: options?.oversizedNoOverlap ?? "ingest" }, + ); +} + +/** + * Suffix-matching fallback: scan the incoming batch from the end looking + * for an index ``k`` where ``incoming[..k]`` aligns with the trailing + * end of stored history. Return only the messages strictly after ``k``. 
+ * + * The matching constraint is order-preserving and only accepts matches + * that leave a non-empty new tail. If the overlap would consume the + * entire incoming batch, content alone cannot prove this is a replay + * rather than a real repeated turn. + */ +function deduplicateSuffixFallback( + storedSignatures: string[], + incoming: CapturedOpenVikingMessage[], + incomingSignatures: string[], + context: string, + options?: { onNoOverlap?: "ingest" | "skip" }, +): AfterTurnDedupResult { + if (storedSignatures.length === 0) { + return { toAppend: incoming, skipped: 0, matchKind: "none", reason: context }; + } + const lastStored = storedSignatures[storedSignatures.length - 1]; + + for (let k = incomingSignatures.length - 1; k >= 0; k -= 1) { + if (incomingSignatures[k] !== lastStored) { + continue; + } + const matchLen = Math.min(k + 1, storedSignatures.length); + const startStored = storedSignatures.length - matchLen; + let suffixMatch = true; + for (let j = 0; j < matchLen; j += 1) { + if ( + storedSignatures[startStored + j] !== incomingSignatures[k - matchLen + 1 + j] + ) { + suffixMatch = false; + break; + } + } + const newSlice = incoming.slice(k + 1); + if (suffixMatch && newSlice.length > 0) { + return { + toAppend: newSlice, + skipped: incoming.length - newSlice.length, + matchKind: "suffix-fallback", + reason: context, + }; + } + } + + if (options?.onNoOverlap === "skip") { + return { + toAppend: [], + skipped: incoming.length, + matchKind: "suffix-fallback", + reason: `${context}-no-overlap-fail-closed`, + }; + } + return { + toAppend: incoming, + skipped: 0, + matchKind: "none", + reason: `${context}-no-overlap-ingest`, + }; +} + function extractAgentMessageText(message: AgentMessage | undefined): string { if (!message) { return ""; @@ -1300,21 +1613,11 @@ export function createMemoryOpenVikingContextEngine(params: { const newMsgFull = messageDigest(newMessages); const newTurnTokens = newMsgFull.reduce((s, d) => s + d.tokens, 0); - 
diag("afterTurn_entry", OVSessionId, { - totalMessages: messages.length, - newMessageCount: newCount, - prePromptMessageCount: start, - newTurnTokens, - senderIdFound: sender.found, - senderId: sender.senderId ?? null, - messages: newMsgFull, - }); - const client = await getClient(); const createdAt = pickLatestCreatedAt(turnMessages); const senderRoleId = toRoleId(sender.senderId); // 发送结构化消息:统一 role 为 user,通过 parts 区分类型 - for (const msg of extractedMessages) { + const capturedMessages: CapturedOpenVikingMessage[] = extractedMessages.map((msg) => { const ovParts = msg.parts.map((part) => { if (part.type === "text") { // 清理 relevant-memories 块 @@ -1334,7 +1637,100 @@ export function createMemoryOpenVikingContextEngine(params: { }; } }); + return { + msg, + ovParts, + signature: capturedMessageSignature({ role: msg.role, parts: ovParts }), + }; + }); + + // Pre-fetch session meta so we can pull the live-only stored tail + // (capped at ``message_count`` — never crossing into archives, which + // would compare against summarized history that the runtime no + // longer puts into incoming) and get pending_tokens in one round + // trip. The returned shape is reused for the post-write commit + // decision below. + let preSessionInfo: Awaited> | undefined; + try { + preSessionInfo = await client.getSession(OVSessionId, agentId); + } catch (sessErr) { + logger.warn?.( + `openviking: afterTurn pre-fetch session meta failed for session=${OVSessionId}: ${String(sessErr)}`, + ); + diag("afterTurn_pre_session_fetch_error", OVSessionId, { + error: String(sessErr), + senderIdFound: sender.found, + senderId: sender.senderId ?? null, + }); + } + const liveMessageCount = Math.max(0, Math.floor(preSessionInfo?.message_count ?? 
0)); + + let messagesToAppend = capturedMessages; + let dedupMatchKind: AfterTurnDedupMatchKind = "no-op"; + let dedupSkipped = 0; + let dedupReason: string | undefined; + let persistedTailLength = 0; + let dedupTailLimit = 0; + if (capturedMessages.length > 0 && liveMessageCount > 0) { + dedupTailLimit = Math.min(10_000, liveMessageCount); + try { + const persistedTail = await client.getSessionMessagesTail( + OVSessionId, + dedupTailLimit, + agentId, + ); + const persistedMessages = Array.isArray(persistedTail.messages) + ? persistedTail.messages + : []; + persistedTailLength = persistedMessages.length; + const result = deduplicateAfterTurnBatch(persistedMessages, capturedMessages); + messagesToAppend = result.toAppend; + dedupMatchKind = result.matchKind; + dedupSkipped = result.skipped; + dedupReason = result.reason; + if (dedupSkipped > 0) { + diag("afterTurn_tail_dedup", OVSessionId, { + skippedMessages: dedupSkipped, + capturedMessages: capturedMessages.length, + persistedTailMessages: persistedMessages.length, + tailLimit: dedupTailLimit, + liveMessageCount, + matchKind: dedupMatchKind, + reason: dedupReason ?? null, + senderIdFound: sender.found, + senderId: sender.senderId ?? null, + }); + } + } catch (tailErr) { + logger.warn?.( + `openviking: afterTurn raw tail fetch failed for session=${OVSessionId}: ${String(tailErr)}`, + ); + diag("afterTurn_tail_dedup_error", OVSessionId, { + error: String(tailErr), + capturedMessages: capturedMessages.length, + senderIdFound: sender.found, + senderId: sender.senderId ?? null, + }); + } + } + + diag("afterTurn_entry", OVSessionId, { + totalMessages: messages.length, + newMessageCount: newCount, + prePromptMessageCount: start, + liveMessageCount, + persistedTailLength, + dedupTailLimit, + dedupMatchKind, + dedupSkipped, + dedupReason: dedupReason ?? null, + newTurnTokens, + senderIdFound: sender.found, + senderId: sender.senderId ?? 
null, + messages: newMsgFull, + }); + for (const { msg, ovParts } of messagesToAppend) { if (ovParts.length > 0) { await client.addSessionMessage( OVSessionId, @@ -1347,8 +1743,13 @@ export function createMemoryOpenVikingContextEngine(params: { } } - const session = await client.getSession(OVSessionId, agentId); - const pendingTokens = session.pending_tokens ?? 0; + // Re-fetch pending_tokens now that this batch's writes are + // persisted; the pre-fetch above only captured the state before + // this turn's appends. + const session = messagesToAppend.length > 0 + ? await client.getSession(OVSessionId, agentId) + : preSessionInfo; + const pendingTokens = session?.pending_tokens ?? 0; if (pendingTokens < cfg.commitTokenThreshold) { diag("afterTurn_skip", OVSessionId, { diff --git a/examples/openclaw-plugin/tests/ut/client.test.ts b/examples/openclaw-plugin/tests/ut/client.test.ts index b4ab0acb5..b5515b1cf 100644 --- a/examples/openclaw-plugin/tests/ut/client.test.ts +++ b/examples/openclaw-plugin/tests/ut/client.test.ts @@ -559,4 +559,28 @@ describe("OpenVikingClient canonical namespace policy", () => { const body = JSON.parse(String(init.body)); expect(body.role_id).toBe("telegram_12345"); }); + + it("fetches raw session message tail with a bounded tail query", async () => { + const fetchMock = vi.fn().mockResolvedValue(okResponse({ + messages: [ + { + id: "msg_1", + role: "user", + parts: [{ type: "text", text: "stored tail" }], + created_at: "2026-05-07T00:00:00.000Z", + }, + ], + count: 1, + tail: 64, + })); + vi.stubGlobal("fetch", fetchMock); + + const client = new OpenVikingClient("http://127.0.0.1:1933", "", "agent", 5000); + const result = await client.getSessionMessagesTail("s1", 64, "agent"); + + expect(result.messages[0].parts[0].text).toBe("stored tail"); + expect(String(fetchMock.mock.calls[0][0])).toBe( + "http://127.0.0.1:1933/api/v1/sessions/s1/messages?tail=64", + ); + }); }); diff --git 
a/examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts b/examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts index 7515dac1b..8f75c5051 100644 --- a/examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts +++ b/examples/openclaw-plugin/tests/ut/context-engine-afterTurn.test.ts @@ -2,7 +2,10 @@ import { describe, expect, it, vi } from "vitest"; import type { OpenVikingClient } from "../../client.js"; import { memoryOpenVikingConfigSchema } from "../../config.js"; -import { createMemoryOpenVikingContextEngine } from "../../context-engine.js"; +import { + capturedMessageSignature, + createMemoryOpenVikingContextEngine, +} from "../../context-engine.js"; function makeLogger() { return { @@ -18,6 +21,12 @@ function makeEngine(opts?: { getSession?: Record; addSessionMessageError?: Error; cfgOverrides?: Record; + messageTail?: Array<{ + id: string; + role: string; + parts: Array>; + created_at: string; + }>; }) { const cfg = memoryOpenVikingConfigSchema.parse({ mode: "remote", @@ -30,9 +39,39 @@ function makeEngine(opts?: { }); const logger = makeLogger(); + const storedTail = [...(opts?.messageTail ?? [])]; const addSessionMessage = opts?.addSessionMessageError ? vi.fn().mockRejectedValue(opts.addSessionMessageError) - : vi.fn().mockResolvedValue(undefined); + : vi.fn().mockImplementation( + async ( + _sessionId: string, + role: string, + parts: Array>, + _agentId?: string, + createdAt?: string, + ) => { + storedTail.push({ + id: `msg_${storedTail.length + 1}`, + role, + parts: JSON.parse(JSON.stringify(parts)), + created_at: createdAt ?? "2026-05-07T00:00:00.000Z", + }); + }, + ); + + const baseSessionMeta = opts?.getSession ?? { pending_tokens: 100 }; + // The plugin pre-fetches getSession to size the dedup tail (live count + // only) and re-fetches after writes for the pending-tokens decision. + // Match that by making message_count track storedTail length unless an + // explicit override was supplied. 
+ const overrideMessageCount = + typeof (baseSessionMeta as { message_count?: number }).message_count === "number"; + const getSession = vi.fn().mockImplementation(async () => ({ + ...baseSessionMeta, + message_count: overrideMessageCount + ? (baseSessionMeta as { message_count: number }).message_count + : storedTail.length, + })); const client = { addSessionMessage, @@ -41,9 +80,7 @@ function makeEngine(opts?: { task_id: "task-1", archived: false, }), - getSession: vi.fn().mockResolvedValue( - opts?.getSession ?? { pending_tokens: 100 }, - ), + getSession, getSessionContext: vi.fn().mockResolvedValue({ latest_archive_overview: "", latest_archive_id: "", @@ -52,6 +89,9 @@ function makeEngine(opts?: { estimatedTokens: 0, stats: { totalArchives: 0, includedArchives: 0, droppedArchives: 0, failedArchives: 0, activeTokens: 0, archiveTokens: 0 }, }), + getSessionMessagesTail: vi.fn().mockImplementation(async (_sid: string, tail: number) => ({ + messages: storedTail.slice(Math.max(0, storedTail.length - tail)), + })), } as unknown as OpenVikingClient; const getClient = vi.fn().mockResolvedValue(client); @@ -73,6 +113,7 @@ function makeEngine(opts?: { addSessionMessage: ReturnType; commitSession: ReturnType; getSession: ReturnType; + getSessionMessagesTail: ReturnType; }, logger, getClient, @@ -176,6 +217,342 @@ describe("context-engine afterTurn()", () => { expect(client.addSessionMessage.mock.calls[1][2][0].text).toContain("hi there"); }); + it("stores a real new single-message turn even when it repeats persisted user content (review regression)", async () => { + // The published reviewer-reproduced regression for the previous + // raw-tail dedup attempt: user types the same content across two + // turns. Stored tail at the start of turn 2 is [user, assistant]; the + // incoming batch carries only the new user turn (single message). 
+ // The persisted tail is still fetched, but deduplicateAfterTurnBatch + // returns early for a single-message batch (reason + // "single-message-no-dedup") before any stored-tail comparison: one + // message matched on content alone is never treated as a replay, + // so the second user message is preserved. + const { engine, client } = makeEngine({ + messageTail: [ + { + id: "msg_existing_user", + role: "user", + parts: [{ type: "text", text: "same answer" }], + created_at: "2026-05-07T00:00:00.000Z", + }, + { + id: "msg_existing_assistant", + role: "assistant", + parts: [{ type: "text", text: "first reply" }], + created_at: "2026-05-07T00:00:01.000Z", + }, + ], + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [{ role: "user", content: "same answer" }], + prePromptMessageCount: 0, + }); + + expect(client.getSessionMessagesTail).toHaveBeenCalled(); + expect(client.addSessionMessage).toHaveBeenCalledTimes(1); + expect(client.addSessionMessage.mock.calls[0][1]).toBe("user"); + expect(client.addSessionMessage.mock.calls[0][2][0].text).toBe("same answer"); + }); + + it("stores a user-only repeated message when persisted tail contains only the same user", async () => { + const { engine, client } = makeEngine({ + messageTail: [ + { + id: "msg_existing_user", + role: "user", + parts: [{ type: "text", text: "same answer" }], + created_at: "2026-05-07T00:00:00.000Z", + }, + ], + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [{ role: "user", content: "same answer" }], + prePromptMessageCount: 0, + }); + + expect(client.getSessionMessagesTail).toHaveBeenCalled(); + expect(client.addSessionMessage).toHaveBeenCalledTimes(1); + expect(client.addSessionMessage.mock.calls[0][1]).toBe("user"); + expect(client.addSessionMessage.mock.calls[0][2][0].text).toBe("same answer"); + }); + + it("treats empty tool_input and missing tool_input as the same signature", () => { + const withEmptyInput = 
capturedMessageSignature({ + role: "user", + parts: [{ + type: "tool", + tool_id: "call_1", + tool_name: "diagnostic_tool", + tool_input: {}, + tool_output: "ok", + tool_status: "success", + }], + }); + const withMissingInput = capturedMessageSignature({ + role: "user", + parts: [{ + type: "tool", + tool_id: "call_1", + tool_name: "diagnostic_tool", + tool_output: "ok", + tool_status: "success", + }], + }); + + expect(withEmptyInput).toBe(withMissingInput); + }); + + it("keeps a repeated leading user when only one stored message exists", async () => { + const { engine, client } = makeEngine(); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [{ role: "user", content: "hello from loop hook" }], + prePromptMessageCount: 0, + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + { role: "user", content: "hello from loop hook" }, + { role: "assistant", content: "final answer" }, + ], + prePromptMessageCount: 0, + }); + + expect(client.addSessionMessage).toHaveBeenCalledTimes(3); + expect(client.addSessionMessage.mock.calls[0][1]).toBe("user"); + expect(client.addSessionMessage.mock.calls[0][2][0].text).toContain("hello from loop hook"); + expect(client.addSessionMessage.mock.calls[1][1]).toBe("user"); + expect(client.addSessionMessage.mock.calls[1][2][0].text).toContain("hello from loop hook"); + expect(client.addSessionMessage.mock.calls[2][1]).toBe("assistant"); + expect(client.addSessionMessage.mock.calls[2][2][0].text).toContain("final answer"); + }); + + it("keeps a fully repeated user and assistant turn instead of treating content equality as replay", async () => { + const { engine, client } = makeEngine(); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + { role: "user", content: "ping" }, + { role: "assistant", content: "pong" }, + ], + prePromptMessageCount: 0, + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + { role: 
"user", content: "ping" }, + { role: "assistant", content: "pong" }, + ], + prePromptMessageCount: 0, + }); + + expect(client.addSessionMessage).toHaveBeenCalledTimes(4); + expect(client.addSessionMessage.mock.calls.map((call) => call[1])).toEqual([ + "user", + "assistant", + "user", + "assistant", + ]); + }); + + it("does not use a single persisted raw transcript tail message as replay proof after a plugin restart", async () => { + const { engine, client, logger } = makeEngine({ + messageTail: [ + { + id: "msg_existing_user", + role: "user", + parts: [{ type: "text", text: "hello from stored tail" }], + created_at: "2026-05-07T00:00:00.000Z", + }, + ], + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + { role: "user", content: "hello from stored tail" }, + { role: "assistant", content: "final answer after restart" }, + ], + prePromptMessageCount: 0, + }); + + // tailLimit equals the live message_count returned by getSession, + // never a commitKeepRecentCount-derived value: the dedup compares + // against the OV live tail only and must avoid crossing into + // archives, where the runtime's incoming batch (built from a + // post-archive coordinate space) cannot align. 
+ expect(client.getSessionMessagesTail).toHaveBeenCalledWith("s1", 1, "test-agent"); + expect(client.addSessionMessage).toHaveBeenCalledTimes(2); + expect(client.addSessionMessage.mock.calls[0][1]).toBe("user"); + expect(client.addSessionMessage.mock.calls[0][2][0].text).toContain("hello from stored tail"); + expect(client.addSessionMessage.mock.calls[1][1]).toBe("assistant"); + expect(client.addSessionMessage.mock.calls[1][2][0].text).toContain("final answer after restart"); + expect(logger.info).not.toHaveBeenCalledWith( + expect.stringContaining('"stage":"afterTurn_tail_dedup"'), + ); + }); + + it("sizes the persisted-tail fetch by live message_count, not by config", async () => { + const { engine, client } = makeEngine({ + cfgOverrides: { commitKeepRecentCount: 24 }, + messageTail: Array.from({ length: 5 }, (_, idx) => ({ + id: `msg_${idx + 1}`, + role: idx % 2 === 0 ? "user" : "assistant", + parts: [{ type: "text", text: `pre-existing ${idx + 1}` }], + created_at: "2026-05-07T00:00:00.000Z", + })), + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + { role: "user", content: "live-count drives tailLimit" }, + { role: "assistant", content: "regardless of commitKeepRecentCount" }, + ], + prePromptMessageCount: 0, + }); + + // 5 stored messages = live message_count = exact tailLimit + expect(client.getSessionMessagesTail).toHaveBeenCalledWith("s1", 5, "test-agent"); + }); + + it("skips the persisted-tail fetch entirely when the live session is empty", async () => { + const { engine, client } = makeEngine(); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: Array.from({ length: 8 }, (_, index) => ({ + role: index % 2 === 0 ? "user" : "assistant", + content: `first turn message ${index + 1}`, + })), + prePromptMessageCount: 0, + }); + + // Empty live session => no remote tail to compare against; we + // shouldn't waste a round trip on dedup. 
+ expect(client.getSessionMessagesTail).not.toHaveBeenCalled(); + expect(client.addSessionMessage).toHaveBeenCalledTimes(8); + }); + + it("skips replayed tool-loop transcript messages already captured for the session", async () => { + const { engine, client, logger } = makeEngine(); + const userMessage = { role: "user", content: "store this locomo conversation" }; + const toolCall = { + role: "assistant", + content: [ + { type: "text", text: "I will store it first." }, + { type: "toolUse", id: "call_1", name: "memory_store", input: { text: "locomo facts" } }, + ], + }; + const toolResult = { + role: "toolResult", + toolCallId: "call_1", + toolName: "memory_store", + content: "Stored in OpenViking and committed 6 memories.", + }; + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [userMessage, toolCall, toolResult], + prePromptMessageCount: 0, + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages: [ + userMessage, + toolCall, + toolResult, + { role: "assistant", content: "Stored. Here is the recap." }, + ], + prePromptMessageCount: 0, + }); + + expect(client.addSessionMessage).toHaveBeenCalledTimes(4); + expect(client.addSessionMessage.mock.calls.map((call) => call[1])).toEqual([ + "user", + "assistant", + "user", + "assistant", + ]); + expect(client.addSessionMessage.mock.calls[3][2][0].text).toContain("Here is the recap"); + expect(logger.info).toHaveBeenCalledWith( + expect.stringContaining('"stage":"afterTurn_tail_dedup"'), + ); + expect(logger.info).toHaveBeenCalledWith( + expect.stringContaining('"skippedMessages":3'), + ); + }); + + it("keeps an entire identical finalizer transcript because content-only proof is ambiguous", async () => { + const { engine, client, logger } = makeEngine(); + const messages = [ + { role: "user", content: "please run the diagnostic tool once" }, + { + role: "assistant", + content: [ + { type: "text", text: "I will run it now." 
}, + { type: "toolCall", id: "call_1", name: "diagnostic_tool", arguments: { scope: "afterTurn" } }, + ], + }, + { + role: "toolResult", + toolCallId: "call_1", + toolName: "diagnostic_tool", + content: [{ type: "text", text: "diagnostic result: ok" }], + }, + { role: "assistant", content: "The diagnostic finished cleanly." }, + ]; + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages, + prePromptMessageCount: 0, + }); + + await engine.afterTurn!({ + sessionId: "s1", + sessionFile: "", + messages, + prePromptMessageCount: 0, + }); + + expect(client.addSessionMessage).toHaveBeenCalledTimes(8); + expect(client.addSessionMessage.mock.calls.map((call) => call[1])).toEqual([ + "user", + "assistant", + "user", + "assistant", + "user", + "assistant", + "user", + "assistant", + ]); + expect(logger.info).not.toHaveBeenCalledWith( + expect.stringContaining('"stage":"afterTurn_tail_dedup"'), + ); + }); + it("passes the latest non-system message timestamp to addSessionMessage as ISO string", async () => { const { engine, client } = makeEngine(); diff --git a/examples/openclaw-plugin/tests/ut/dedup-after-turn-batch.test.ts b/examples/openclaw-plugin/tests/ut/dedup-after-turn-batch.test.ts new file mode 100644 index 000000000..1128bc752 --- /dev/null +++ b/examples/openclaw-plugin/tests/ut/dedup-after-turn-batch.test.ts @@ -0,0 +1,253 @@ +import { describe, expect, it } from "vitest"; + +import type { OVMessage } from "../../client.js"; +import { + type CapturedOpenVikingMessage, + capturedMessageSignature, + deduplicateAfterTurnBatch, +} from "../../context-engine.js"; + +// Build a CapturedOpenVikingMessage triplet (msg + ovParts + signature) +// directly, so the algorithm-level tests don't have to drag the whole +// engine into setup. The signature here is the same hash the engine +// uses, derived from role + normalized parts, so it cleanly aligns with +// how OVMessage tail entries are signed inside deduplicateAfterTurnBatch. 
+function makeUserText(text: string): CapturedOpenVikingMessage { + const ovParts = [{ type: "text" as const, text }]; + return { + msg: { role: "user", parts: ovParts }, + ovParts, + signature: capturedMessageSignature({ role: "user", parts: ovParts }), + }; +} + +function makeAssistantText(text: string): CapturedOpenVikingMessage { + const ovParts = [{ type: "text" as const, text }]; + return { + msg: { role: "assistant", parts: ovParts }, + ovParts, + signature: capturedMessageSignature({ role: "assistant", parts: ovParts }), + }; +} + +function makeStoredFromCaptured(captured: CapturedOpenVikingMessage[]): OVMessage[] { + return captured.map((c, idx) => ({ + id: `stored_${idx + 1}`, + role: c.msg.role, + parts: c.ovParts, + created_at: "2026-05-07T00:00:00.000Z", + })); +} + +describe("deduplicateAfterTurnBatch — algorithm-level coverage", () => { + it("no-op when incoming is empty", () => { + const stored = makeStoredFromCaptured([makeUserText("hi"), makeAssistantText("hello")]); + const result = deduplicateAfterTurnBatch(stored, []); + expect(result.matchKind).toBe("no-op"); + expect(result.toAppend).toEqual([]); + expect(result.skipped).toBe(0); + }); + + it("ingests entire batch when stored tail is empty", () => { + const incoming = [makeUserText("first"), makeAssistantText("answer")]; + const result = deduplicateAfterTurnBatch([], incoming); + expect(result.matchKind).toBe("none"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toBe(incoming); + }); + + it("slices full-prefix replay when stored is exactly the leading prefix of incoming", () => { + const sharedUser = makeUserText("first user prompt"); + const sharedAssistant = makeAssistantText("first assistant reply"); + const newAssistant = makeAssistantText("final answer this turn"); + + const stored = makeStoredFromCaptured([sharedUser, sharedAssistant]); + const incoming = [sharedUser, sharedAssistant, newAssistant]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + 
expect(result.matchKind).toBe("full-prefix"); + expect(result.skipped).toBe(2); + expect(result.toAppend).toEqual([newAssistant]); + }); + + it("falls back to suffix matching when boundary signature does not align", () => { + // Stored history ends with assistant; incoming leads with a brand + // new follow-up user and carries that same assistant as its suffix. + // The boundary at incoming[storedLength-1] does not equal stored.last, + // so the routine must take the suffix-fallback branch. + const storedAssistant = makeAssistantText("response after restart"); + const newUser = makeUserText("follow up question"); + const stored = makeStoredFromCaptured([storedAssistant]); + const incoming = [newUser, storedAssistant]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + // Suffix fallback ran (context="prefix-mismatch") but found no + // qualifying overlap — single-stored-message means matchLen=1 and + // newSlice would be empty, which the fallback rejects to avoid a + // coincidental single-message match. We then fall through to the + // ingest policy, surfacing the "-no-overlap-ingest" tag. + expect(result.reason).toBe("prefix-mismatch-no-overlap-ingest"); + expect(result.toAppend).toEqual(incoming); + }); + + it("oversized + tail-match: keeps incoming when it exactly equals the stored tail", () => { + const u = makeUserText("turn 1 user"); + const a = makeAssistantText("turn 1 assistant"); + const u2 = makeUserText("turn 2 user"); + const a2 = makeAssistantText("turn 2 assistant"); + const stored = makeStoredFromCaptured([u, a, u2, a2]); + // Content alone cannot distinguish a tail-only replay from a real + // next turn that happens to have the same user/assistant text.
+ const incoming = [u2, a2]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("tail-match-empty-no-dedup"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual(incoming); + }); + + it("single-message incoming is preserved instead of content-deduping against older tail", () => { + // Reviewer-reported regression: stored=[user='same answer', assistant='first reply'] + // and incoming=[user='same answer'] — the second turn carries a + // legitimately new user message whose content happens to match an + // older stored user. A one-message batch must be appended as-is + // (reason "single-message-no-dedup") rather than content-matched + // against older stored messages and dropped. + const turn1User = makeUserText("same answer"); + const turn1Assistant = makeAssistantText("first reply"); + const turn2User = makeUserText("same answer"); + const stored = makeStoredFromCaptured([turn1User, turn1Assistant]); + const incoming = [turn2User]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("single-message-no-dedup"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual([turn2User]); + }); + + it("single-message incoming ignores explicit skip override to avoid dropping real user input", () => { + const turn1User = makeUserText("same answer"); + const turn1Assistant = makeAssistantText("first reply"); + const turn2User = makeUserText("same answer"); + const stored = makeStoredFromCaptured([turn1User, turn1Assistant]); + const incoming = [turn2User]; + + const result = deduplicateAfterTurnBatch(stored, incoming, { + oversizedNoOverlap: "skip", + }); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("single-message-no-dedup"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual([turn2User]); + }); + + it("does not use a one-message stored prefix as proof
for a repeated user plus assistant turn", () => { + const turn1User = makeUserText("same answer"); + const turn2User = makeUserText("same answer"); + const turn2Assistant = makeAssistantText("second reply"); + const stored = makeStoredFromCaptured([turn1User]); + const incoming = [turn2User, turn2Assistant]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("single-stored-prefix-no-dedup"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual(incoming); + }); + + it("oversized with partial suffix overlap: returns the genuinely new tail", () => { + // Stored ends with [oldUser, oldAssistant]. Incoming reuses + // oldAssistant as its leading message and carries a new user/ + // assistant pair after. Suffix scan should align on oldAssistant and + // return only the new tail. + const oldUser = makeUserText("old user"); + const oldAssistant = makeAssistantText("old assistant"); + const newUser = makeUserText("new user"); + const newAssistant = makeAssistantText("new assistant"); + + const stored = makeStoredFromCaptured([oldUser, oldAssistant]); + const incoming = [oldAssistant, newUser, newAssistant]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("suffix-fallback"); + expect(result.toAppend).toEqual([newUser, newAssistant]); + expect(result.skipped).toBe(1); + }); + + it("does not falsely match a single-message incoming against an unrelated stored last", () => { + const stored = makeStoredFromCaptured([ + makeUserText("totally unrelated stored content"), + makeAssistantText("equally unrelated reply"), + ]); + const incoming = [makeUserText("brand new user input")]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("single-message-no-dedup"); + expect(result.toAppend).toEqual(incoming); + }); + + it("falls back to suffix matching when 
boundary matches but the full prefix does not", () => { + const storedUser = makeUserText("stored user"); + const storedAssistant = makeAssistantText("stored assistant"); + const unrelatedUser = makeUserText("unrelated leading user"); + const finalAssistant = makeAssistantText("final assistant"); + const stored = makeStoredFromCaptured([storedUser, storedAssistant]); + const incoming = [unrelatedUser, storedAssistant, storedUser, storedAssistant, finalAssistant]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("suffix-fallback"); + expect(result.reason).toBe("full-prefix-mismatch"); + expect(result.skipped).toBe(4); + expect(result.toAppend).toEqual([finalAssistant]); + }); + + it("does not let suffix fallback consume the entire incoming batch when no new tail remains", () => { + const storedUser = makeUserText("stored user"); + const storedAssistant = makeAssistantText("stored assistant"); + const storedToolResult = makeUserText("stored tool result"); + const unrelatedUser = makeUserText("unrelated leading user"); + const stored = makeStoredFromCaptured([storedUser, storedAssistant, storedToolResult]); + const incoming = [unrelatedUser, storedUser, storedAssistant, storedToolResult]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("prefix-mismatch-no-overlap-ingest"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual(incoming); + }); + + it("multi-tool finalizer replay: prefix proof skips the entire stored prefix", () => { + const user = makeUserText("please run the diagnostic tool once"); + const assCall = makeAssistantText("I will run it now."); + const toolResult = makeUserText("diagnostic result: ok"); + const finalAss = makeAssistantText("The diagnostic finished cleanly."); + + // After the loop hook captured user / assistant_call / toolResult, + // stored == [user, assCall, toolResult]. 
The finalizer replays the + // entire turn including the final assistant. + const stored = makeStoredFromCaptured([user, assCall, toolResult]); + const incoming = [user, assCall, toolResult, finalAss]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("full-prefix"); + expect(result.skipped).toBe(3); + expect(result.toAppend).toEqual([finalAss]); + }); + + it("full identical replay (no new tail): keeps incoming because content-only proof is ambiguous", () => { + const u = makeUserText("u"); + const a = makeAssistantText("a"); + const stored = makeStoredFromCaptured([u, a]); + const incoming = [u, a]; + + const result = deduplicateAfterTurnBatch(stored, incoming); + expect(result.matchKind).toBe("none"); + expect(result.reason).toBe("full-prefix-empty-no-dedup"); + expect(result.skipped).toBe(0); + expect(result.toAppend).toEqual(incoming); + }); +}); diff --git a/openviking/server/routers/sessions.py b/openviking/server/routers/sessions.py index 08e662d4b..859edf03f 100644 --- a/openviking/server/routers/sessions.py +++ b/openviking/server/routers/sessions.py @@ -269,6 +269,31 @@ async def extract_session( return Response(status="ok", result=_to_jsonable(result)) +@router.get("/{session_id}/messages") +async def get_session_messages( + session_id: str = Path(..., description="Session ID"), + tail: int = Query(64, ge=0, le=10_000, description="Number of latest raw messages to return"), + _ctx: RequestContext = Depends(get_request_context), +): + """Get raw session transcript messages. + + With ``tail`` this returns the latest raw messages across completed archives, + incomplete archives, and the current live session. 
+ """ + service = get_service() + session = service.sessions.session(_ctx, session_id) + await session.load() + messages = await session.get_raw_messages_tail(tail=tail) + return Response( + status="ok", + result={ + "messages": [m.to_dict() for m in messages], + "count": len(messages), + "tail": tail, + }, + ) + + @router.post("/{session_id}/messages") async def add_message( request: AddMessageRequest, diff --git a/openviking/session/session.py b/openviking/session/session.py index 6b70cb5fc..de25066af 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -1166,6 +1166,42 @@ async def get_session_archive(self, archive_id: str) -> Dict[str, Any]: raise NotFoundError(archive_id, "session archive") + async def get_raw_messages_tail(self, tail: int = 64) -> List[Message]: + """Return the latest raw transcript messages across archives and live state. + + Unlike get_session_context(), this does not summarize completed archives. + It is intended for transcript-boundary checks such as append de-duplication. 
+ """ + tail = max(0, int(tail or 0)) + if tail <= 0: + return [] + + result: List[Message] = list(self._messages[-tail:]) + needed = tail - len(result) + if needed <= 0: + return result[-tail:] + + pending_messages = await self._get_pending_archive_messages() + if pending_messages: + pending_tail = pending_messages[-needed:] + result = pending_tail + result + needed = tail - len(result) + if needed <= 0: + return result[-tail:] + + completed_prefix: List[Message] = [] + for archive in await self._get_completed_archive_refs(): + if needed <= 0: + break + archive_messages = await self._read_archive_messages(archive["archive_uri"]) + if not archive_messages: + continue + take = archive_messages[-needed:] + completed_prefix = take + completed_prefix + needed = tail - len(completed_prefix) - len(result) + + return (completed_prefix + result)[-tail:] + # ============= Internal methods ============= async def _collect_session_context_components(self) -> Dict[str, Any]: diff --git a/tests/server/test_api_sessions.py b/tests/server/test_api_sessions.py index 8b842118b..7f4fe3270 100644 --- a/tests/server/test_api_sessions.py +++ b/tests/server/test_api_sessions.py @@ -637,6 +637,44 @@ async def test_get_session_context_endpoint_returns_trimmed_latest_archive_and_m assert result["stats"]["failedArchives"] == 0 +async def test_get_session_messages_tail_includes_completed_archives_and_live_messages( + client: httpx.AsyncClient, +): + create_resp = await client.post("/api/v1/sessions", json={}) + session_id = create_resp.json()["result"]["session_id"] + + for content in ["archive one user", "archive one assistant"]: + await client.post( + f"/api/v1/sessions/{session_id}/messages", + json=_message_request("user", content=content), + ) + commit_one = await client.post(f"/api/v1/sessions/{session_id}/commit") + await _wait_for_task(client, commit_one.json()["result"]["task_id"]) + + for content in ["archive two user", "archive two assistant"]: + await client.post( + 
f"/api/v1/sessions/{session_id}/messages", + json=_message_request("assistant", content=content), + ) + commit_two = await client.post(f"/api/v1/sessions/{session_id}/commit") + await _wait_for_task(client, commit_two.json()["result"]["task_id"]) + + await client.post( + f"/api/v1/sessions/{session_id}/messages", + json=_message_request("user", content="live tail"), + ) + + resp = await client.get(f"/api/v1/sessions/{session_id}/messages?tail=3") + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "ok" + assert [m["parts"][0]["text"] for m in body["result"]["messages"]] == [ + "archive two user", + "archive two assistant", + "live tail", + ] + + async def test_get_session_archive_endpoint_returns_archive_details(client: httpx.AsyncClient): create_resp = await client.post("/api/v1/sessions", json={}) session_id = create_resp.json()["result"]["session_id"] diff --git a/tests/session/test_session_context.py b/tests/session/test_session_context.py index 2dbb7f71d..4c8e56831 100644 --- a/tests/session/test_session_context.py +++ b/tests/session/test_session_context.py @@ -331,6 +331,37 @@ async def gated_extract(messages, **kwargs): class TestGetSessionContext: """Test get_session_context""" + async def test_get_raw_messages_tail_reads_completed_archives_and_live_messages( + self, client: AsyncOpenViking + ): + session = client.session(session_id="raw_tail_completed_archives_test") + + for role, content in [ + ("user", "archive one user"), + ("assistant", "archive one assistant"), + ]: + session.add_message(role, [TextPart(content)]) + result_one = await session.commit_async() + await _wait_for_task(result_one["task_id"]) + + for role, content in [ + ("user", "archive two user"), + ("assistant", "archive two assistant"), + ]: + session.add_message(role, [TextPart(content)]) + result_two = await session.commit_async() + await _wait_for_task(result_two["task_id"]) + + session.add_message("user", [TextPart("live tail")]) + + tail = await 
session.get_raw_messages_tail(tail=3) + + assert [m.content for m in tail] == [ + "archive two user", + "archive two assistant", + "live tail", + ] + async def test_get_session_context_returns_latest_archive_overview_and_history( self, client: AsyncOpenViking, monkeypatch ):