Skip to content
Open
48 changes: 41 additions & 7 deletions plugins/reflectt-channel/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ function normalizeSenderId(value: unknown): string | null {
return id.length > 0 ? id : null;
}

/** Resolve the default agent ID from OpenClaw config — mirrors the gateway's own default resolution. */
function resolveDefaultAgentId(cfg: OpenClawConfig): string {
const routingDefault =
typeof (cfg as any)?.routing?.defaultAgentId === "string"
? (cfg as any).routing.defaultAgentId.trim()
: "";
if (routingDefault) return routingDefault;

const agents = cfg.agents?.list;
if (Array.isArray(agents) && agents.length > 0) {
const defaults = agents.filter((a: any) => a?.default);
const chosen = (defaults[0] ?? agents[0]) as any;
if (chosen?.id?.trim()) return chosen.id.trim();
}

return "main";
}

function shouldEscalate(state: WatchdogState, key: string, now: number): boolean {
const last = state.lastEscalationAt.get(key) ?? 0;
if (now - last < ESCALATION_COOLDOWN_MS) return false;
Expand Down Expand Up @@ -157,7 +175,7 @@ export const reflecttPlugin: ChannelPlugin<ResolvedReflecttAccount> = {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
from: "openclaw_agent",
from: ctx.identity?.name || "openclaw_agent",
channel: ctx.to || "general",
content: ctx.text,
}),
Expand Down Expand Up @@ -273,6 +291,18 @@ export const reflecttPlugin: ChannelPlugin<ResolvedReflecttAccount> = {

await handleChatMessage(message, cfg, runtime, log);
}

// Flush cached attribution when an agent's identity changes.
// The bootstrap → real identity handoff invalidates any per-agent
// attribution state (lastUpdateByAgent, lastEscalationAt) keyed to the old name.
if (event.type === "agent_identity_changed") {
const changedAgent = (event as any)?.agentId || (event as any)?.name;
if (changedAgent && runtime?.state) {
runtime.state.lastUpdateByAgent.delete(changedAgent);
runtime.state.lastEscalationAt.delete(changedAgent);
log?.info?.(`Flushed attribution cache for agent identity change: ${changedAgent}`);
}
}
} catch (parseError) {
log?.warn?.(`Failed to parse SSE data: ${parseError}`);
}
Expand Down Expand Up @@ -321,15 +351,19 @@ async function handleChatMessage(
// Check if message mentions an agent
const mentionedAgents = extractAgentMentions(safeContent, cfg);

if (mentionedAgents.length === 0) {
// No mentions, ignore
return;
// Default routing: if no mentions, route to the main/default agent
// This ensures humans can message without @mentioning anyone
let routeAgents = mentionedAgents;
if (routeAgents.length === 0) {
const defaultAgentId = resolveDefaultAgentId(cfg);
routeAgents = [defaultAgentId];
log?.info?.(`No mentions — routing to default agent: ${defaultAgentId}`);
}

log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (mentions: ${mentionedAgents.join(", ")})`);
log?.info?.(`Processing reflectt message from ${from} in ${safeChannel} (routing to: ${routeAgents.join(", ")})`);

// Route to each mentioned agent
for (const agentId of mentionedAgents) {
// Route to each agent (mentioned OR default)
for (const agentId of routeAgents) {
try {
// Resolve agent route — all reflectt rooms share one session per agent.
// Room identity is preserved in To/OriginatingTo so replies route correctly.
Expand Down
4 changes: 3 additions & 1 deletion public/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,9 @@ Autonomous work-continuity system. Monitors agent queue floors and auto-replenis
| GET | `/activation/funnel` | Get funnel state. Query: `?userId=...` for single user, no params for aggregate summary. `?raw=true` includes internal/infrastructure users for debugging. |
| GET | `/activation/dashboard` | Full onboarding telemetry dashboard: conversion funnel, failure distribution, weekly trends. Query: `?weeks=12`, `?raw=true`. |
| GET | `/activation/ghost-signups` | List users who signed up but never passed preflight. Query: `?minAgeHours=2` (default 2). Returns `{ candidates: [{ userId, signupAt, hoursSinceSignup, preflightAttempted }] }`. |
| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' \| '24h' }`. Idempotent — won't re-send if already nudged at same tier. |
| POST | `/activation/ghost-signup-nudge` | Send re-engagement email to a ghost signup user. Body: `{ userId, email, nudgeTier?: '2h' | '24h' }`. Idempotent — won't re-send if already nudged at same tier. |
| POST | `/tracking/live-cta` | Track /live page CTA clicks. Called by cloud app when user clicks "Start Free" on /live. Body: `{ source?, url?, ts? }`. |
| POST | `/tracking/live-visit` | Track /live page visits. Simple hit counter - logs each visit. Body: `{ referrer? }`. |)
| GET | `/activation/funnel/conversions` | Step-by-step conversion rates with per-step reach count, conversion rate, and median step timing. Query: `?raw=true` includes internal users. |
| GET | `/activation/funnel/failures` | Failure-reason distribution per step. Shows where users drop off and why (from event metadata). |
| GET | `/activation/funnel/weekly` | Weekly trend snapshots for planning. Query: `?weeks=12`. Exportable JSON with per-week step counts, new users, completion rate. |
Expand Down
1 change: 1 addition & 0 deletions src/executionSweeper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ export async function sweepValidatingQueue(): Promise<SweepResult> {
assignee: task.assignee,
tags: meta.tags as string[] | undefined,
done_criteria: task.done_criteria,
metadata: meta,
},
tasksForScoring,
)
Expand Down
49 changes: 33 additions & 16 deletions src/notificationDedupeGuard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@
* Notification Dedupe Guard
*
* Prevents stale/out-of-order task notification events:
* 1. Tracks lastSeenUpdatedAt per taskId — drops events with updatedAt <= lastSeen
* 1. Tracks lastSeen {updatedAt, status } per taskId — drops stale events and same-rank re-emits
* 2. Checks current task status before emitting — suppresses contradictory transitions
* (e.g., event says →doing but task is already done/cancelled)
* 3. Uses strict > (not >=) cursor semantics for poller ordering
*/

// ── In-memory cursor: taskId → last seen updatedAt ─────────────────────────
// ── In-memory cursor: taskId → { updatedAt, status } ───────────────────────

const lastSeenByTaskId = new Map<string, number>()
interface CursorEntry {
updatedAt: number
status: string
}

const lastSeenByTaskId = new Map<string, CursorEntry>()

// ── Status ordering (higher = further along the lifecycle) ─────────────────

Expand Down Expand Up @@ -64,12 +69,22 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu
// mutually suppressed by each other's cursor update.
const cursorKey = targetAgent ? `${taskId}:${targetAgent}` : taskId

// Guard 1: Monotonic cursor — drop events with updatedAt <= lastSeen
// Guard 1: Monotonic cursor — drop events with updatedAt < lastSeen
// Also detect same-rank re-emit: same updatedAt AND same status as last emission.
// Guard 2 handles contradictory transitions (currentRank > eventRank).
const lastSeen = lastSeenByTaskId.get(cursorKey)
if (lastSeen !== undefined && eventUpdatedAt <= lastSeen) {
return {
emit: false,
reason: `Stale event: updatedAt ${eventUpdatedAt} <= lastSeen ${lastSeen} for ${cursorKey}`,
if (lastSeen !== undefined) {
if (eventUpdatedAt < lastSeen.updatedAt) {
return {
emit: false,
reason: `Stale event: updatedAt ${eventUpdatedAt} < lastSeen ${lastSeen.updatedAt} for ${cursorKey}`,
}
}
if (eventUpdatedAt === lastSeen.updatedAt && eventStatus === lastSeen.status) {
return {
emit: false,
reason: `Same-rank re-emit: →${eventStatus} already emitted at ${eventUpdatedAt} for ${cursorKey}`,
}
}
}

Expand All @@ -78,26 +93,28 @@ export function shouldEmitNotification(input: DedupeCheckInput): DedupeCheckResu
const eventRank = statusRank(eventStatus)
const currentRank = statusRank(currentTaskStatus)

// If current task is further along AND has a newer updatedAt, suppress
if (currentRank > eventRank && currentTaskUpdatedAt > eventUpdatedAt) {
// If current task is further along than the event claims, suppress.
// Guard 1 (monotonic cursor) handles ordering via updatedAt.
// Guard 2 catches contradictory transitions: event says →doing but task is already done.
if (currentRank > eventRank) {
return {
emit: false,
reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus} (updatedAt: ${currentTaskUpdatedAt} > ${eventUpdatedAt})`,
reason: `Contradictory: event says →${eventStatus} but task is already ${currentTaskStatus}`,
}
}
}

// Update cursor
lastSeenByTaskId.set(cursorKey, eventUpdatedAt)
lastSeenByTaskId.set(cursorKey, { updatedAt: eventUpdatedAt, status: eventStatus })

return { emit: true }
}

/**
* Get current dedup state for diagnostics.
*/
export function getDedupeState(): { cursors: Record<string, number>; size: number } {
const cursors: Record<string, number> = {}
export function getDedupeState(): { cursors: Record<string, CursorEntry>; size: number } {
const cursors: Record<string, CursorEntry> = {}
for (const [k, v] of lastSeenByTaskId) cursors[k] = v
return { cursors, size: lastSeenByTaskId.size }
}
Expand All @@ -116,8 +133,8 @@ export function clearDedupeState(): void {
export function pruneDedupeState(maxAgeMs: number = 24 * 60 * 60 * 1000): number {
const cutoff = Date.now() - maxAgeMs
let pruned = 0
for (const [taskId, ts] of lastSeenByTaskId) {
if (ts < cutoff) {
for (const [taskId, entry] of lastSeenByTaskId) {
if (entry.updatedAt < cutoff) {
lastSeenByTaskId.delete(taskId)
pruned++
}
Expand Down
26 changes: 25 additions & 1 deletion src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15492,7 +15492,6 @@ If your heartbeat shows **no active task** and **no next task**:

const { sendGhostSignupNudge } = await import('./ghost-signup-nudge.js')

// Email relay function — delegates to existing /email/send infrastructure
const emailRelayFn = async (opts: {
from: string; to: string; subject: string; html: string; text: string;
tags?: Array<{ name: string; value: string }>;
Expand All @@ -15517,6 +15516,31 @@ If your heartbeat shows **no active task** and **no next task**:
return { success: true, result }
})

/**
* POST /tracking/live-cta — Track /live page CTA clicks
* Called by cloud app when user clicks "Start Free" on /live
* task-1774294960543-v778wwmio
*/
app.post('/tracking/live-cta', async (request) => {
const body = request.body as Record<string, unknown>
const source = body.source as string || 'unknown'
const url = body.url as string || ''
const ts = body.ts as number || Date.now()
console.log(`[live-cta] ${new Date().toISOString()} source=${source} url=${url} ts=${ts}`)
return { success: true, tracked: true }
})

/**
* POST /tracking/live-visit — Track /live page visits
* Simple hit counter - logs each visit to console
*/
app.post('/tracking/live-visit', async (request) => {
const body = request.body as Record<string, unknown>
const referrer = body.referrer as string || 'direct'
console.log(`[live-visit] ${new Date().toISOString()} referrer=${referrer}`)
return { success: true, visited: true }
})

// Get task analytics
app.get('/tasks/analytics', async (request) => {
const query = request.query as Record<string, string>
Expand Down
36 changes: 31 additions & 5 deletions tests/notification-dedupe-guard.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe('notificationDedupeGuard', () => {
expect(result.emit).toBe(true)
})

it('drops event when updatedAt equals lastSeen', () => {
it('drops event when updatedAt equals lastSeen (same-rank re-emit)', () => {
shouldEmitNotification({ taskId: 'task-1', eventUpdatedAt: 1000, eventStatus: 'doing' })

const result = shouldEmitNotification({
Expand All @@ -42,7 +42,7 @@ describe('notificationDedupeGuard', () => {
eventStatus: 'doing',
})
expect(result.emit).toBe(false)
expect(result.reason).toContain('Stale event')
expect(result.reason).toContain('Same-rank re-emit')
})

it('drops event when updatedAt is less than lastSeen', () => {
Expand Down Expand Up @@ -84,6 +84,32 @@ describe('notificationDedupeGuard', () => {
expect(result.reason).toContain('Contradictory')
})

it('suppresses same-status re-emit (done→done on already-done task)', () => {
// Bug: server-side handler calls shouldEmitNotification with task.updatedAt for both
// eventUpdatedAt and currentTaskUpdatedAt (they're equal). Same-rank re-emits slip
// through unless the dedupe cursor tracks last-emitted status.
// First call — emits (no prior cursor), sets cursor to {3000, done}
const r1 = shouldEmitNotification({
taskId: 'task-3',
eventUpdatedAt: 3000,
eventStatus: 'done',
currentTaskStatus: 'done',
currentTaskUpdatedAt: 3000,
})
expect(r1.emit).toBe(true)

// Second call — same timestamp AND same status, cursor exists → suppressed
const r2 = shouldEmitNotification({
taskId: 'task-3',
eventUpdatedAt: 3000,
eventStatus: 'done',
currentTaskStatus: 'done',
currentTaskUpdatedAt: 3000,
})
expect(r2.emit).toBe(false)
expect(r2.reason).toContain('Same-rank re-emit')
})

it('allows event when task status matches', () => {
const result = shouldEmitNotification({
taskId: 'task-4',
Expand Down Expand Up @@ -129,8 +155,8 @@ describe('notificationDedupeGuard', () => {

const state = getDedupeState()
expect(state.size).toBe(2)
expect(state.cursors['task-a']).toBe(100)
expect(state.cursors['task-b']).toBe(200)
expect(state.cursors['task-a']).toEqual({ updatedAt: 100, status: 'doing' })
expect(state.cursors['task-b']).toEqual({ updatedAt: 200, status: 'todo' })
})

it('clearDedupeState resets all cursors', () => {
Expand All @@ -146,7 +172,7 @@ describe('notificationDedupeGuard', () => {
})

it('pruneDedupeState removes old entries', () => {
// Insert an entry with old timestamp
// Insert an entry with old timestamp (1ms is always < now - 1000ms)
shouldEmitNotification({ taskId: 'old-task', eventUpdatedAt: 1, eventStatus: 'doing' })
shouldEmitNotification({ taskId: 'new-task', eventUpdatedAt: Date.now(), eventStatus: 'doing' })

Expand Down
Loading