diff --git a/plugins/reflectt-channel/src/channel.ts b/plugins/reflectt-channel/src/channel.ts index d03f73ca..8844fb36 100644 --- a/plugins/reflectt-channel/src/channel.ts +++ b/plugins/reflectt-channel/src/channel.ts @@ -26,6 +26,24 @@ function normalizeSenderId(value: unknown): string | null { return id.length > 0 ? id : null; } +/** Resolve the default agent ID from OpenClaw config — mirrors the gateway's own default resolution. */ +function resolveDefaultAgentId(cfg: OpenClawConfig): string { + const routingDefault = + typeof (cfg as any)?.routing?.defaultAgentId === "string" + ? (cfg as any).routing.defaultAgentId.trim() + : ""; + if (routingDefault) return routingDefault; + + const agents = cfg.agents?.list; + if (Array.isArray(agents) && agents.length > 0) { + const defaults = agents.filter((a: any) => a?.default); + const chosen = (defaults[0] ?? agents[0]) as any; + if (chosen?.id?.trim()) return chosen.id.trim(); + } + + return "main"; +} + function shouldEscalate(state: WatchdogState, key: string, now: number): boolean { const last = state.lastEscalationAt.get(key) ?? 0; if (now - last < ESCALATION_COOLDOWN_MS) return false; @@ -157,7 +175,7 @@ export const reflecttPlugin: ChannelPlugin = { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ - from: "openclaw_agent", + from: ctx.identity?.name || "openclaw_agent", channel: ctx.to || "general", content: ctx.text, }), @@ -273,6 +291,18 @@ export const reflecttPlugin: ChannelPlugin = { await handleChatMessage(message, cfg, runtime, log); } + + // Flush cached attribution when an agent's identity changes. + // The bootstrap → real identity handoff invalidates any per-agent + // attribution state (lastUpdateByAgent, lastEscalationAt) keyed to the old name. + if (event.type === "agent_identity_changed") { + const changedAgent = (event as any)?.agentId || (event as any)?.name; + if (changedAgent && runtime?.state) { + runtime.state.lastUpdateByAgent.delete(changedAgent); + runtime.state.lastEscalationAt.delete(changedAgent); + log?.info?.(`Flushed attribution cache for agent identity change: ${changedAgent}`); + } + } } catch (parseError) { log?.warn?.(`Failed to parse SSE data: ${parseError}`); } @@ -321,15 +351,19 @@ async function handleChatMessage( // Check if message mentions an agent const mentionedAgents = extractAgentMentions(safeContent, cfg); - if (mentionedAgents.length === 0) { - // No mentions, ignore - return; + // Default routing: if no mentions, route to the main/default agent + // This ensures humans can message without @mentioning anyone + let routeAgents = mentionedAgents; + if (routeAgents.length === 0) { + const defaultAgentId = resolveDefaultAgentId(cfg); + routeAgents = [defaultAgentId]; + log?.info?.(`No mentions — routing to default agent: ${defaultAgentId}`); } - log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (mentions: ${mentionedAgents.join(", ")})`); + log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (routing to: ${routeAgents.join(", ")})`); - // Route to each mentioned agent - for (const agentId of mentionedAgents) { + // Route to each agent (mentioned OR default) + for (const agentId of routeAgents) { try { // Resolve agent route — all reflectt rooms share one session per agent. // Room identity is preserved in To/OriginatingTo so replies route correctly. diff --git a/public/docs.md b/public/docs.md index d25bcb96..d159b787 100644 --- a/public/docs.md +++ b/public/docs.md @@ -849,7 +849,9 @@ Autonomous work-continuity system. Monitors agent queue floors and auto-replenis | GET | `/activation/funnel` | Get funnel state. Query: `?userId=...` for single user, no params for aggregate summary. `?raw=true` includes internal/infrastructure users for debugging. | | GET | `/activation/dashboard` | Full onboarding telemetry dashboard: conversion funnel, failure distribution, weekly trends. Query: `?weeks=12`, `?raw=true`. | | GET | `/activation/ghost-signups` | List users who signed up but never passed preflight. Query: `?minAgeHours=2` (default 2). Returns `{ candidates: [{ userId, signupAt, hoursSinceSignup, preflightAttempted }] }`. | -| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' \| '24h' }`. Idempotent — won't re-send if already nudged at same tier. | +| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' | '24h' }`. Idempotent — won't re-send if already nudged at same tier. | +| POST | `/tracking/live-cta` | Track /live page CTA clicks. Called by cloud app when user clicks "Start Free" on /live. Body: `{ source?, url?, ts? }`. | +| POST | `/tracking/live-visit` | Track /live page visits. Simple hit counter - logs each visit. Body: `{ referrer? }`. |) | GET | `/activation/funnel/conversions` | Step-by-step conversion rates with per-step reach count, conversion rate, and median step timing. Query: `?raw=true` includes internal users. | | GET | `/activation/funnel/failures` | Failure-reason distribution per step. Shows where users drop off and why (from event metadata). | | GET | `/activation/funnel/weekly` | Weekly trend snapshots for planning. Query: `?weeks=12`. Exportable JSON with per-week step counts, new users, completion rate. | diff --git a/src/executionSweeper.ts b/src/executionSweeper.ts index ec0828f0..951f0c95 100644 --- a/src/executionSweeper.ts +++ b/src/executionSweeper.ts @@ -483,6 +483,7 @@ export async function sweepValidatingQueue(): Promise { assignee: task.assignee, tags: meta.tags as string[] | undefined, done_criteria: task.done_criteria, + metadata: meta, }, tasksForScoring, ) diff --git a/src/notificationDedupeGuard.ts b/src/notificationDedupeGuard.ts index 125f166b..e169f1d7 100644 --- a/src/notificationDedupeGuard.ts +++ b/src/notificationDedupeGuard.ts @@ -5,15 +5,20 @@ * Notification Dedupe Guard * * Prevents stale/out-of-order task notification events: - * 1. Tracks lastSeenUpdatedAt per taskId — drops events with updatedAt <= lastSeen + * 1. Tracks lastSeen {updatedAt, status } per taskId — drops stale events and same-rank re-emits * 2. Checks current task status before emitting — suppresses contradictory transitions * (e.g., event says →doing but task is already done/cancelled) * 3. Uses strict > (not >=) cursor semantics for poller ordering */ -// ── In-memory cursor: taskId → last seen updatedAt ───────────────────────── +// ── In-memory cursor: taskId → { updatedAt, status } ─────────────────────── -const lastSeenByTaskId = new Map() +interface CursorEntry { + updatedAt: number + status: string +} + +const lastSeenByTaskId = new Map() // ── Status ordering (higher = further along the lifecycle) ───────────────── @@ -64,12 +69,22 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu // mutually suppressed by each other's cursor update. const cursorKey = targetAgent ? `${taskId}:${targetAgent}` : taskId - // Guard 1: Monotonic cursor — drop events with updatedAt <= lastSeen + // Guard 1: Monotonic cursor — drop events with updatedAt < lastSeen + // Also detect same-rank re-emit: same updatedAt AND same status as last emission. + // Guard 2 handles contradictory transitions (currentRank > eventRank). const lastSeen = lastSeenByTaskId.get(cursorKey) - if (lastSeen !== undefined && eventUpdatedAt <= lastSeen) { - return { - emit: false, - reason: `Stale event: updatedAt ${eventUpdatedAt} <= lastSeen ${lastSeen} for ${cursorKey}`, + if (lastSeen !== undefined) { + if (eventUpdatedAt < lastSeen.updatedAt) { + return { + emit: false, + reason: `Stale event: updatedAt ${eventUpdatedAt} < lastSeen ${lastSeen.updatedAt} for ${cursorKey}`, + } + } + if (eventUpdatedAt === lastSeen.updatedAt && eventStatus === lastSeen.status) { + return { + emit: false, + reason: `Same-rank re-emit: →${eventStatus} already emitted at ${eventUpdatedAt} for ${cursorKey}`, + } } } @@ -78,17 +93,19 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu const eventRank = statusRank(eventStatus) const currentRank = statusRank(currentTaskStatus) - // If current task is further along AND has a newer updatedAt, suppress - if (currentRank > eventRank && currentTaskUpdatedAt > eventUpdatedAt) { + // If current task is further along than the event claims, suppress. + // Guard 1 (monotonic cursor) handles ordering via updatedAt. + // Guard 2 catches contradictory transitions: event says →doing but task is already done. + if (currentRank > eventRank) { return { emit: false, - reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus} (updatedAt: ${currentTaskUpdatedAt} > ${eventUpdatedAt})`, + reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus}`, } } } // Update cursor - lastSeenByTaskId.set(cursorKey, eventUpdatedAt) + lastSeenByTaskId.set(cursorKey, { updatedAt: eventUpdatedAt, status: eventStatus }) return { emit: true } } @@ -96,8 +113,8 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu /** * Get current dedup state for diagnostics. */ -export function getDedupeState(): { cursors: Record; size: number } { - const cursors: Record = {} +export function getDedupeState(): { cursors: Record; size: number } { + const cursors: Record = {} for (const [k, v] of lastSeenByTaskId) cursors[k] = v return { cursors, size: lastSeenByTaskId.size } } @@ -116,8 +133,8 @@ export function clearDedupeState(): void { export function pruneDedupeState(maxAgeMs: number = 24 * 60 * 60 * 1000): number { const cutoff = Date.now() - maxAgeMs let pruned = 0 - for (const [taskId, ts] of lastSeenByTaskId) { - if (ts < cutoff) { + for (const [taskId, entry] of lastSeenByTaskId) { + if (entry.updatedAt < cutoff) { lastSeenByTaskId.delete(taskId) pruned++ } diff --git a/src/server.ts b/src/server.ts index 89d2265b..8d97d047 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15492,7 +15492,6 @@ If your heartbeat shows **no active task** and **no next task**: const { sendGhostSignupNudge } = await import('./ghost-signup-nudge.js') - // Email relay function — delegates to existing /email/send infrastructure const emailRelayFn = async (opts: { from: string; to: string; subject: string; html: string; text: string; tags?: Array<{ name: string; value: string }>; @@ -15517,6 +15516,31 @@ If your heartbeat shows **no active task** and **no next task**: return { success: true, result } }) + /** + * POST /tracking/live-cta — Track /live page CTA clicks + * Called by cloud app when user clicks "Start Free" on /live + * task-1774294960543-v778wwmio + */ + app.post('/tracking/live-cta', async (request) => { + const body = request.body as Record + const source = body.source as string || 'unknown' + const url = body.url as string || '' + const ts = body.ts as number || Date.now() + console.log(`[live-cta] ${new Date().toISOString()} source=${source} url=${url} ts=${ts}`) + return { success: true, tracked: true } + }) + + /** + * POST /tracking/live-visit — Track /live page visits + * Simple hit counter - logs each visit to console + */ + app.post('/tracking/live-visit', async (request) => { + const body = request.body as Record + const referrer = body.referrer as string || 'direct' + console.log(`[live-visit] ${new Date().toISOString()} referrer=${referrer}`) + return { success: true, visited: true } + }) + // Get task analytics app.get('/tasks/analytics', async (request) => { const query = request.query as Record diff --git a/tests/notification-dedupe-guard.test.ts b/tests/notification-dedupe-guard.test.ts index 06c9b114..5f9758d5 100644 --- a/tests/notification-dedupe-guard.test.ts +++ b/tests/notification-dedupe-guard.test.ts @@ -33,7 +33,7 @@ describe('notificationDedupeGuard', () => { expect(result.emit).toBe(true) }) - it('drops event when updatedAt equals lastSeen', () => { + it('drops event when updatedAt equals lastSeen (same-rank re-emit)', () => { shouldEmitNotification({ taskId: 'task-1', eventUpdatedAt: 1000, eventStatus: 'doing' }) const result = shouldEmitNotification({ @@ -42,7 +42,7 @@ describe('notificationDedupeGuard', () => { eventStatus: 'doing', }) expect(result.emit).toBe(false) - expect(result.reason).toContain('Stale event') + expect(result.reason).toContain('Same-rank re-emit') }) it('drops event when updatedAt is less than lastSeen', () => { @@ -84,6 +84,32 @@ describe('notificationDedupeGuard', () => { expect(result.reason).toContain('Contradictory') }) + it('suppresses same-status re-emit (done→done on already-done task)', () => { + // Bug: server-side handler calls shouldEmitNotification with task.updatedAt for both + // eventUpdatedAt and currentTaskUpdatedAt (they're equal). Same-rank re-emits slip + // through unless the dedupe cursor tracks last-emitted status. + // First call — emits (no prior cursor), sets cursor to {3000, done} + const r1 = shouldEmitNotification({ + taskId: 'task-3', + eventUpdatedAt: 3000, + eventStatus: 'done', + currentTaskStatus: 'done', + currentTaskUpdatedAt: 3000, + }) + expect(r1.emit).toBe(true) + + // Second call — same timestamp AND same status, cursor exists → suppressed + const r2 = shouldEmitNotification({ + taskId: 'task-3', + eventUpdatedAt: 3000, + eventStatus: 'done', + currentTaskStatus: 'done', + currentTaskUpdatedAt: 3000, + }) + expect(r2.emit).toBe(false) + expect(r2.reason).toContain('Same-rank re-emit') + }) + it('allows event when task status matches', () => { const result = shouldEmitNotification({ taskId: 'task-4', @@ -129,8 +155,8 @@ describe('notificationDedupeGuard', () => { const state = getDedupeState() expect(state.size).toBe(2) - expect(state.cursors['task-a']).toBe(100) - expect(state.cursors['task-b']).toBe(200) + expect(state.cursors['task-a']).toEqual({ updatedAt: 100, status: 'doing' }) + expect(state.cursors['task-b']).toEqual({ updatedAt: 200, status: 'todo' }) }) it('clearDedupeState resets all cursors', () => { @@ -146,7 +172,7 @@ describe('notificationDedupeGuard', () => { }) it('pruneDedupeState removes old entries', () => { - // Insert an entry with old timestamp + // Insert an entry with old timestamp (1ms is always < now - 1000ms) shouldEmitNotification({ taskId: 'old-task', eventUpdatedAt: 1, eventStatus: 'doing' }) shouldEmitNotification({ taskId: 'new-task', eventUpdatedAt: Date.now(), eventStatus: 'doing' })