From 3b62e24ddc9b5ec9c8266afe733e0bc26ba6220d Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 23 Mar 2026 12:50:31 -0700 Subject: [PATCH 1/8] fix: resolve merge conflicts in tracking endpoints --- src/server.ts | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/server.ts b/src/server.ts index 89d2265b..beb36511 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15457,6 +15457,7 @@ If your heartbeat shows **no active task** and **no next task**: }) /** +<<<<<<< HEAD * GET /activation/ghost-signups — Users who signed up but never ran preflight. * Cloud polls this to find candidates for the ghost signup nudge email. * Query: ?minAgeHours=2 (default 2h; use 24 for 24h tier candidates) @@ -15517,6 +15518,31 @@ If your heartbeat shows **no active task** and **no next task**: return { success: true, result } }) + /** + * POST /tracking/live-cta — Track /live page CTA clicks + * Called by cloud app when user clicks "Start Free" on /live + * task-1774294960543-v778wwmio + */ + app.post('/tracking/live-cta', async (request) => { + const body = request.body as Record + const sourcePage = body.sourcePage as string || '/live' + const ctaType = body.ctaType as string || 'unknown' + const userId = body.userId as string || 'anonymous' + console.log(`[live-cta] ${new Date().toISOString()} page=${sourcePage} cta=${ctaType} userId=${userId}`) + return { success: true, tracked: true } + }) + + /** + * POST /tracking/live-visit — Track /live page visits + * Simple hit counter - logs each visit to console + */ + app.post('/tracking/live-visit', async (request) => { + const body = request.body as Record + const referrer = body.referrer as string || 'direct' + console.log(`[live-visit] ${new Date().toISOString()} referrer=${referrer}`) + return { success: true, visited: true } + }) + // Get task analytics app.get('/tasks/analytics', async (request) => { const query = request.query as Record From cc4ed949b4714038343fa6b54f6cd094fd4fdcd0 Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 23 Mar 2026 16:06:01 -0700 Subject: [PATCH 2/8] fix(node): clean up conflict markers + fix live-cta field names to match cloud (task-1774294960543-v778wwmio) --- src/server.ts | 70 +++------------------------------------------------ 1 file changed, 4 insertions(+), 66 deletions(-) diff --git a/src/server.ts b/src/server.ts index beb36511..fa8def1b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15456,68 +15456,6 @@ If your heartbeat shows **no active task** and **no next task**: return { success: true, trends: getWeeklyTrends(weeks) } }) - /** -<<<<<<< HEAD - * GET /activation/ghost-signups — Users who signed up but never ran preflight. - * Cloud polls this to find candidates for the ghost signup nudge email. - * Query: ?minAgeHours=2 (default 2h; use 24 for 24h tier candidates) - * - * task-1773709288800-lam5hd11b - */ - app.get('/activation/ghost-signups', async (request) => { - const query = request.query as Record - const minAgeHours = query.minAgeHours ? parseFloat(query.minAgeHours) : 2 - const minAgeMs = minAgeHours * 60 * 60 * 1000 - const { getGhostSignupCandidates } = await import('./ghost-signup-nudge.js') - const candidates = getGhostSignupCandidates(minAgeMs) - return { success: true, candidates, count: candidates.length, minAgeHours } - }) - - /** - * POST /activation/ghost-signup-nudge — Send re-engagement email to a ghost signup. - * Cloud calls this with { userId, email, nudgeTier? } after finding candidates. - * Node sends the email via cloud relay, tags the user, and returns result. - * - * Body: { userId: string, email: string, nudgeTier?: '2h' | '24h' } - * - * task-1773709288800-lam5hd11b - */ - app.post('/activation/ghost-signup-nudge', async (request, reply) => { - const body = request.body as Record - const userId = typeof body.userId === 'string' ? body.userId.trim() : '' - const email = typeof body.email === 'string' ? body.email.trim() : '' - const nudgeTier = (body.nudgeTier === '24h' ? '24h' : '2h') as '2h' | '24h' - - if (!userId) return reply.code(400).send({ success: false, error: 'userId is required' }) - if (!email || !email.includes('@')) return reply.code(400).send({ success: false, error: 'valid email is required' }) - - const { sendGhostSignupNudge } = await import('./ghost-signup-nudge.js') - - // Email relay function — delegates to existing /email/send infrastructure - const emailRelayFn = async (opts: { - from: string; to: string; subject: string; html: string; text: string; - tags?: Array<{ name: string; value: string }>; - }) => { - const hostId = process.env.REFLECTT_HOST_ID - const relayPath = hostId ? `/api/hosts/${encodeURIComponent(hostId)}/relay/email` : '/api/hosts/relay/email' - try { - const relayResult = await cloudRelay(relayPath, { - from: opts.from, to: opts.to, subject: opts.subject, - html: opts.html, text: opts.text, tags: opts.tags, - agent: 'funnel', - idempotencyKey: `ghost-signup-nudge/${userId}/${nudgeTier}`, - }, reply) as Record - const relayError = typeof relayResult?.error === 'string' ? relayResult.error : undefined - return { success: !relayError, error: relayError } - } catch (err: any) { - return { success: false, error: err?.message ?? 'relay error' } - } - } - - const result = await sendGhostSignupNudge(userId, email, nudgeTier, emailRelayFn) - return { success: true, result } - }) - /** * POST /tracking/live-cta — Track /live page CTA clicks * Called by cloud app when user clicks "Start Free" on /live @@ -15525,10 +15463,10 @@ If your heartbeat shows **no active task** and **no next task**: */ app.post('/tracking/live-cta', async (request) => { const body = request.body as Record - const sourcePage = body.sourcePage as string || '/live' - const ctaType = body.ctaType as string || 'unknown' - const userId = body.userId as string || 'anonymous' - console.log(`[live-cta] ${new Date().toISOString()} page=${sourcePage} cta=${ctaType} userId=${userId}`) + const source = body.source as string || 'unknown' + const url = body.url as string || '' + const ts = body.ts as number || Date.now() + console.log(`[live-cta] ${new Date().toISOString()} source=${source} url=${url} ts=${ts}`) return { success: true, tracked: true } }) From a6c0cc7fc3ae061887a7fd0c33092eb1a71cfe7e Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 23 Mar 2026 16:23:29 -0700 Subject: [PATCH 3/8] chore: trigger CI [skip ci] From 179ec6179f727a2cf2798988cf6791db8d355eae Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 23 Mar 2026 16:28:40 -0700 Subject: [PATCH 4/8] fix(docs): add tracking routes and remove stale ghost-signup routes - Add POST /tracking/live-cta and POST /tracking/live-visit endpoints to docs - Remove stale /activation/ghost-signups and /activation/ghost-signup-nudge from docs Fixes route-docs contract check failure blocking PR #1153 --- public/docs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/public/docs.md b/public/docs.md index d25bcb96..509afb64 100644 --- a/public/docs.md +++ b/public/docs.md @@ -848,8 +848,8 @@ Autonomous work-continuity system. Monitors agent queue floors and auto-replenis | GET | `/activation/doctor-gate` | Check whether the BYOH onboarding doctor-gate has been passed for a user. Query: `?userId=...`. Returns `{ passed: boolean, events: ActivationEvent[] }`. Used by cloud onboarding to gate progression to workspace-ready step. | | GET | `/activation/funnel` | Get funnel state. Query: `?userId=...` for single user, no params for aggregate summary. `?raw=true` includes internal/infrastructure users for debugging. | | GET | `/activation/dashboard` | Full onboarding telemetry dashboard: conversion funnel, failure distribution, weekly trends. Query: `?weeks=12`, `?raw=true`. | -| GET | `/activation/ghost-signups` | List users who signed up but never passed preflight. Query: `?minAgeHours=2` (default 2). Returns `{ candidates: [{ userId, signupAt, hoursSinceSignup, preflightAttempted }] }`. | -| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' \| '24h' }`. Idempotent — won't re-send if already nudged at same tier. | +| POST | `/tracking/live-cta` | Track /live page CTA clicks. Called by cloud app when user clicks "Start Free" on /live. Body: `{ source?, url?, ts? }`. | +| POST | `/tracking/live-visit` | Track /live page visits. Simple hit counter - logs each visit. Body: `{ referrer? }`. | | GET | `/activation/funnel/conversions` | Step-by-step conversion rates with per-step reach count, conversion rate, and median step timing. Query: `?raw=true` includes internal users. | | GET | `/activation/funnel/failures` | Failure-reason distribution per step. Shows where users drop off and why (from event metadata). | | GET | `/activation/funnel/weekly` | Weekly trend snapshots for planning. Query: `?weeks=12`. Exportable JSON with per-week step counts, new users, completion rate. | From bd06cd27e3f73b13a1582faac5dea7ff9889cd79 Mon Sep 17 00:00:00 2001 From: Kai Date: Mon, 23 Mar 2026 16:30:03 -0700 Subject: [PATCH 5/8] fix(node): restore ghost-signup routes + add tracking routes to docs (task-1774294960543-v778wwmio) --- public/docs.md | 4 +++- src/server.ts | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/public/docs.md b/public/docs.md index 509afb64..d159b787 100644 --- a/public/docs.md +++ b/public/docs.md @@ -848,8 +848,10 @@ Autonomous work-continuity system. Monitors agent queue floors and auto-replenis | GET | `/activation/doctor-gate` | Check whether the BYOH onboarding doctor-gate has been passed for a user. Query: `?userId=...`. Returns `{ passed: boolean, events: ActivationEvent[] }`. Used by cloud onboarding to gate progression to workspace-ready step. | | GET | `/activation/funnel` | Get funnel state. Query: `?userId=...` for single user, no params for aggregate summary. `?raw=true` includes internal/infrastructure users for debugging. | | GET | `/activation/dashboard` | Full onboarding telemetry dashboard: conversion funnel, failure distribution, weekly trends. Query: `?weeks=12`, `?raw=true`. | +| GET | `/activation/ghost-signups` | List users who signed up but never passed preflight. Query: `?minAgeHours=2` (default 2). Returns `{ candidates: [{ userId, signupAt, hoursSinceSignup, preflightAttempted }] }`. | +| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' | '24h' }`. Idempotent — won't re-send if already nudged at same tier. | | POST | `/tracking/live-cta` | Track /live page CTA clicks. Called by cloud app when user clicks "Start Free" on /live. Body: `{ source?, url?, ts? }`. | -| POST | `/tracking/live-visit` | Track /live page visits. Simple hit counter - logs each visit. Body: `{ referrer? }`. | +| POST | `/tracking/live-visit` | Track /live page visits. Simple hit counter - logs each visit. Body: `{ referrer? }`. |) | GET | `/activation/funnel/conversions` | Step-by-step conversion rates with per-step reach count, conversion rate, and median step timing. Query: `?raw=true` includes internal users. | | GET | `/activation/funnel/failures` | Failure-reason distribution per step. Shows where users drop off and why (from event metadata). | | GET | `/activation/funnel/weekly` | Weekly trend snapshots for planning. Query: `?weeks=12`. Exportable JSON with per-week step counts, new users, completion rate. | diff --git a/src/server.ts b/src/server.ts index fa8def1b..8d97d047 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15456,6 +15456,66 @@ If your heartbeat shows **no active task** and **no next task**: return { success: true, trends: getWeeklyTrends(weeks) } }) + /** + * GET /activation/ghost-signups — Users who signed up but never ran preflight. + * Cloud polls this to find candidates for the ghost signup nudge email. + * Query: ?minAgeHours=2 (default 2h; use 24 for 24h tier candidates) + * + * task-1773709288800-lam5hd11b + */ + app.get('/activation/ghost-signups', async (request) => { + const query = request.query as Record + const minAgeHours = query.minAgeHours ? parseFloat(query.minAgeHours) : 2 + const minAgeMs = minAgeHours * 60 * 60 * 1000 + const { getGhostSignupCandidates } = await import('./ghost-signup-nudge.js') + const candidates = getGhostSignupCandidates(minAgeMs) + return { success: true, candidates, count: candidates.length, minAgeHours } + }) + + /** + * POST /activation/ghost-signup-nudge — Send re-engagement email to a ghost signup. + * Cloud calls this with { userId, email, nudgeTier? } after finding candidates. + * Node sends the email via cloud relay, tags the user, and returns result. + * + * Body: { userId: string, email: string, nudgeTier?: '2h' | '24h' } + * + * task-1773709288800-lam5hd11b + */ + app.post('/activation/ghost-signup-nudge', async (request, reply) => { + const body = request.body as Record + const userId = typeof body.userId === 'string' ? body.userId.trim() : '' + const email = typeof body.email === 'string' ? body.email.trim() : '' + const nudgeTier = (body.nudgeTier === '24h' ? '24h' : '2h') as '2h' | '24h' + + if (!userId) return reply.code(400).send({ success: false, error: 'userId is required' }) + if (!email || !email.includes('@')) return reply.code(400).send({ success: false, error: 'valid email is required' }) + + const { sendGhostSignupNudge } = await import('./ghost-signup-nudge.js') + + const emailRelayFn = async (opts: { + from: string; to: string; subject: string; html: string; text: string; + tags?: Array<{ name: string; value: string }>; + }) => { + const hostId = process.env.REFLECTT_HOST_ID + const relayPath = hostId ? `/api/hosts/${encodeURIComponent(hostId)}/relay/email` : '/api/hosts/relay/email' + try { + const relayResult = await cloudRelay(relayPath, { + from: opts.from, to: opts.to, subject: opts.subject, + html: opts.html, text: opts.text, tags: opts.tags, + agent: 'funnel', + idempotencyKey: `ghost-signup-nudge/${userId}/${nudgeTier}`, + }, reply) as Record + const relayError = typeof relayResult?.error === 'string' ? relayResult.error : undefined + return { success: !relayError, error: relayError } + } catch (err: any) { + return { success: false, error: err?.message ?? 'relay error' } + } + } + + const result = await sendGhostSignupNudge(userId, email, nudgeTier, emailRelayFn) + return { success: true, result } + }) + /** * POST /tracking/live-cta — Track /live page CTA clicks * Called by cloud app when user clicks "Start Free" on /live From 367e60252a9f47a9241150346dab5065860dee65 Mon Sep 17 00:00:00 2001 From: Kai Date: Wed, 15 Apr 2026 18:56:09 -0700 Subject: [PATCH 6/8] executionSweeper: pass metadata to suggestReviewer for lane-based routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes bug where ops tasks were incorrectly routed to QA reviewers during validating-queue auto-reassignment. Root cause: suggestReviewer() uses metadata.lane for agentEligibleForTask() filtering (neverRoute/neverRouteUnlessLane), but executionSweeper never passed metadata — so lane keyword was never extracted and lane-based exclusions didn't fire. One-line fix: add metadata: meta to the suggestReviewer() call. --- src/executionSweeper.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/executionSweeper.ts b/src/executionSweeper.ts index ec0828f0..951f0c95 100644 --- a/src/executionSweeper.ts +++ b/src/executionSweeper.ts @@ -483,6 +483,7 @@ export async function sweepValidatingQueue(): Promise { assignee: task.assignee, tags: meta.tags as string[] | undefined, done_criteria: task.done_criteria, + metadata: meta, }, tasksForScoring, ) From 0f37656f73145c56f55b2c2dc146448709bcc26d Mon Sep 17 00:00:00 2001 From: Kai Date: Wed, 15 Apr 2026 19:46:40 -0700 Subject: [PATCH 7/8] fix(notificationDedupeGuard): Guard 2 fires on rank contradiction; Guard 1 catches same-rank re-emit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Guard 2 was dead code: the timestamp condition (currentTaskUpdatedAt > eventUpdatedAt) was never satisfied at the call site in server.ts PATCH handler, where both values are task.updatedAt — always equal. Fix: - Drop Guard 2's timestamp condition — fires purely on rank contradiction (doneRank > doingRank → suppress). Guards are now independent. - Extend Guard 1 to detect same-rank re-emits: same updatedAt AND same status as last emission — catches done→done, validating→validating. - Cursor now stores { updatedAt, status } to support same-rank detection. - Update getDedupeState() and pruneDedupeState() to use new cursor shape. This closes the 'task notification honesty' vector where done→done re-affirmations could slip through and re-notify agents for tasks already marked done. Tests: 14 in notification-dedupe-guard.test.ts (including new same-rank re-emit test). Full suite: 223 test files, 2484 tests passing. --- src/notificationDedupeGuard.ts | 49 +++++++++++++++++-------- tests/notification-dedupe-guard.test.ts | 36 +++++++++++++++--- 2 files changed, 64 insertions(+), 21 deletions(-) diff --git a/src/notificationDedupeGuard.ts b/src/notificationDedupeGuard.ts index 125f166b..e169f1d7 100644 --- a/src/notificationDedupeGuard.ts +++ b/src/notificationDedupeGuard.ts @@ -5,15 +5,20 @@ * Notification Dedupe Guard * * Prevents stale/out-of-order task notification events: - * 1. Tracks lastSeenUpdatedAt per taskId — drops events with updatedAt <= lastSeen + * 1. Tracks lastSeen {updatedAt, status } per taskId — drops stale events and same-rank re-emits * 2. Checks current task status before emitting — suppresses contradictory transitions * (e.g., event says →doing but task is already done/cancelled) * 3. Uses strict > (not >=) cursor semantics for poller ordering */ -// ── In-memory cursor: taskId → last seen updatedAt ───────────────────────── +// ── In-memory cursor: taskId → { updatedAt, status } ─────────────────────── -const lastSeenByTaskId = new Map() +interface CursorEntry { + updatedAt: number + status: string +} + +const lastSeenByTaskId = new Map() // ── Status ordering (higher = further along the lifecycle) ───────────────── @@ -64,12 +69,22 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu // mutually suppressed by each other's cursor update. const cursorKey = targetAgent ? `${taskId}:${targetAgent}` : taskId - // Guard 1: Monotonic cursor — drop events with updatedAt <= lastSeen + // Guard 1: Monotonic cursor — drop events with updatedAt < lastSeen + // Also detect same-rank re-emit: same updatedAt AND same status as last emission. + // Guard 2 handles contradictory transitions (currentRank > eventRank). const lastSeen = lastSeenByTaskId.get(cursorKey) - if (lastSeen !== undefined && eventUpdatedAt <= lastSeen) { - return { - emit: false, - reason: `Stale event: updatedAt ${eventUpdatedAt} <= lastSeen ${lastSeen} for ${cursorKey}`, + if (lastSeen !== undefined) { + if (eventUpdatedAt < lastSeen.updatedAt) { + return { + emit: false, + reason: `Stale event: updatedAt ${eventUpdatedAt} < lastSeen ${lastSeen.updatedAt} for ${cursorKey}`, + } + } + if (eventUpdatedAt === lastSeen.updatedAt && eventStatus === lastSeen.status) { + return { + emit: false, + reason: `Same-rank re-emit: →${eventStatus} already emitted at ${eventUpdatedAt} for ${cursorKey}`, + } } } @@ -78,17 +93,19 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu const eventRank = statusRank(eventStatus) const currentRank = statusRank(currentTaskStatus) - // If current task is further along AND has a newer updatedAt, suppress - if (currentRank > eventRank && currentTaskUpdatedAt > eventUpdatedAt) { + // If current task is further along than the event claims, suppress. + // Guard 1 (monotonic cursor) handles ordering via updatedAt. + // Guard 2 catches contradictory transitions: event says →doing but task is already done. + if (currentRank > eventRank) { return { emit: false, - reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus} (updatedAt: ${currentTaskUpdatedAt} > ${eventUpdatedAt})`, + reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus}`, } } } // Update cursor - lastSeenByTaskId.set(cursorKey, eventUpdatedAt) + lastSeenByTaskId.set(cursorKey, { updatedAt: eventUpdatedAt, status: eventStatus }) return { emit: true } } @@ -96,8 +113,8 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu /** * Get current dedup state for diagnostics. */ -export function getDedupeState(): { cursors: Record; size: number } { - const cursors: Record = {} +export function getDedupeState(): { cursors: Record; size: number } { + const cursors: Record = {} for (const [k, v] of lastSeenByTaskId) cursors[k] = v return { cursors, size: lastSeenByTaskId.size } } @@ -116,8 +133,8 @@ export function clearDedupeState(): void { export function pruneDedupeState(maxAgeMs: number = 24 * 60 * 60 * 1000): number { const cutoff = Date.now() - maxAgeMs let pruned = 0 - for (const [taskId, ts] of lastSeenByTaskId) { - if (ts < cutoff) { + for (const [taskId, entry] of lastSeenByTaskId) { + if (entry.updatedAt < cutoff) { lastSeenByTaskId.delete(taskId) pruned++ } diff --git a/tests/notification-dedupe-guard.test.ts b/tests/notification-dedupe-guard.test.ts index 06c9b114..5f9758d5 100644 --- a/tests/notification-dedupe-guard.test.ts +++ b/tests/notification-dedupe-guard.test.ts @@ -33,7 +33,7 @@ describe('notificationDedupeGuard', () => { expect(result.emit).toBe(true) }) - it('drops event when updatedAt equals lastSeen', () => { + it('drops event when updatedAt equals lastSeen (same-rank re-emit)', () => { shouldEmitNotification({ taskId: 'task-1', eventUpdatedAt: 1000, eventStatus: 'doing' }) const result = shouldEmitNotification({ @@ -42,7 +42,7 @@ describe('notificationDedupeGuard', () => { eventStatus: 'doing', }) expect(result.emit).toBe(false) - expect(result.reason).toContain('Stale event') + expect(result.reason).toContain('Same-rank re-emit') }) it('drops event when updatedAt is less than lastSeen', () => { @@ -84,6 +84,32 @@ describe('notificationDedupeGuard', () => { expect(result.reason).toContain('Contradictory') }) + it('suppresses same-status re-emit (done→done on already-done task)', () => { + // Bug: server-side handler calls shouldEmitNotification with task.updatedAt for both + // eventUpdatedAt and currentTaskUpdatedAt (they're equal). Same-rank re-emits slip + // through unless the dedupe cursor tracks last-emitted status. + // First call — emits (no prior cursor), sets cursor to {3000, done} + const r1 = shouldEmitNotification({ + taskId: 'task-3', + eventUpdatedAt: 3000, + eventStatus: 'done', + currentTaskStatus: 'done', + currentTaskUpdatedAt: 3000, + }) + expect(r1.emit).toBe(true) + + // Second call — same timestamp AND same status, cursor exists → suppressed + const r2 = shouldEmitNotification({ + taskId: 'task-3', + eventUpdatedAt: 3000, + eventStatus: 'done', + currentTaskStatus: 'done', + currentTaskUpdatedAt: 3000, + }) + expect(r2.emit).toBe(false) + expect(r2.reason).toContain('Same-rank re-emit') + }) + it('allows event when task status matches', () => { const result = shouldEmitNotification({ taskId: 'task-4', @@ -129,8 +155,8 @@ describe('notificationDedupeGuard', () => { const state = getDedupeState() expect(state.size).toBe(2) - expect(state.cursors['task-a']).toBe(100) - expect(state.cursors['task-b']).toBe(200) + expect(state.cursors['task-a']).toEqual({ updatedAt: 100, status: 'doing' }) + expect(state.cursors['task-b']).toEqual({ updatedAt: 200, status: 'todo' }) }) it('clearDedupeState resets all cursors', () => { @@ -146,7 +172,7 @@ describe('notificationDedupeGuard', () => { }) it('pruneDedupeState removes old entries', () => { - // Insert an entry with old timestamp + // Insert an entry with old timestamp (1ms is always < now - 1000ms) shouldEmitNotification({ taskId: 'old-task', eventUpdatedAt: 1, eventStatus: 'doing' }) shouldEmitNotification({ taskId: 'new-task', eventUpdatedAt: Date.now(), eventStatus: 'doing' }) From bfbe482c19eb3c83762f9cca28837f57e7ac3c67 Mon Sep 17 00:00:00 2001 From: Kai Date: Thu, 16 Apr 2026 08:00:02 -0700 Subject: [PATCH 8/8] fix: default routing + speaker attribution + agent_identity_changed flush - Default routing: no-mention messages route to main agent instead of being dropped (resolveDefaultAgentId helper added) - Speaker attribution: use ctx.identity.name for outbound attribution instead of hardcoded 'openclaw_agent' - SSE: subscribe to agent_identity_changed and flush cached attribution state (lastUpdateByAgent, lastEscalationAt) when agent identity changes --- plugins/reflectt-channel/src/channel.ts | 48 +++++++++++++++++++++---- 1 file changed, 41 insertions(+), 7 deletions(-) diff --git a/plugins/reflectt-channel/src/channel.ts b/plugins/reflectt-channel/src/channel.ts index d03f73ca..8844fb36 100644 --- a/plugins/reflectt-channel/src/channel.ts +++ b/plugins/reflectt-channel/src/channel.ts @@ -26,6 +26,24 @@ function normalizeSenderId(value: unknown): string | null { return id.length > 0 ? id : null; } +/** Resolve the default agent ID from OpenClaw config — mirrors the gateway's own default resolution. */ +function resolveDefaultAgentId(cfg: OpenClawConfig): string { + const routingDefault = + typeof (cfg as any)?.routing?.defaultAgentId === "string" + ? (cfg as any).routing.defaultAgentId.trim() + : ""; + if (routingDefault) return routingDefault; + + const agents = cfg.agents?.list; + if (Array.isArray(agents) && agents.length > 0) { + const defaults = agents.filter((a: any) => a?.default); + const chosen = (defaults[0] ?? agents[0]) as any; + if (chosen?.id?.trim()) return chosen.id.trim(); + } + + return "main"; +} + function shouldEscalate(state: WatchdogState, key: string, now: number): boolean { const last = state.lastEscalationAt.get(key) ?? 0; if (now - last < ESCALATION_COOLDOWN_MS) return false; @@ -157,7 +175,7 @@ export const reflecttPlugin: ChannelPlugin = { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ - from: "openclaw_agent", + from: ctx.identity?.name || "openclaw_agent", channel: ctx.to || "general", content: ctx.text, }), @@ -273,6 +291,18 @@ export const reflecttPlugin: ChannelPlugin = { await handleChatMessage(message, cfg, runtime, log); } + + // Flush cached attribution when an agent's identity changes. + // The bootstrap → real identity handoff invalidates any per-agent + // attribution state (lastUpdateByAgent, lastEscalationAt) keyed to the old name. + if (event.type === "agent_identity_changed") { + const changedAgent = (event as any)?.agentId || (event as any)?.name; + if (changedAgent && runtime?.state) { + runtime.state.lastUpdateByAgent.delete(changedAgent); + runtime.state.lastEscalationAt.delete(changedAgent); + log?.info?.(`Flushed attribution cache for agent identity change: ${changedAgent}`); + } + } } catch (parseError) { log?.warn?.(`Failed to parse SSE data: ${parseError}`); } @@ -321,15 +351,19 @@ async function handleChatMessage( // Check if message mentions an agent const mentionedAgents = extractAgentMentions(safeContent, cfg); - if (mentionedAgents.length === 0) { - // No mentions, ignore - return; + // Default routing: if no mentions, route to the main/default agent + // This ensures humans can message without @mentioning anyone + let routeAgents = mentionedAgents; + if (routeAgents.length === 0) { + const defaultAgentId = resolveDefaultAgentId(cfg); + routeAgents = [defaultAgentId]; + log?.info?.(`No mentions — routing to default agent: ${defaultAgentId}`); } - log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (mentions: ${mentionedAgents.join(", ")})`); + log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (routing to: ${routeAgents.join(", ")})`); - // Route to each mentioned agent - for (const agentId of mentionedAgents) { + // Route to each agent (mentioned OR default) + for (const agentId of routeAgents) { try { // Resolve agent route — all reflectt rooms share one session per agent. // Room identity is preserved in To/OriginatingTo so replies route correctly.