diff --git a/CLAUDE.md b/CLAUDE.md index d4a55a5..f7a4fca 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -44,8 +44,8 @@ No build step. No linting. Restart to apply changes. | `server/hub.js` | Multi-project hub: lockfile (`~/.ccxray/hub.json`), discovery (with orphan port probe fallback), client registration, idle shutdown (injectable via setOnShutdown), crash auto-recovery | | `server/auth.js` | API key auth middleware (enabled via `AUTH_TOKEN` env) | | `server/openai-session.js` | Shared OpenAI/Codex header + session helpers (session id extraction, agent type, turn-metadata sidecar) | -| `server/ws-proxy.js` | OpenAI WebSocket transport proxy for `/v1/responses` and `/v1/realtime` upgrades. Tunables: `CCXRAY_WS_IDLE_TIMEOUT_MS` (default 60s), `CCXRAY_WS_MAX_QUEUE_BYTES` (default 4 MiB; caps client→upstream buffer while upstream is connecting) | -| `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. `supportsDelta` flag gates delta-write eligibility | +| `server/ws-proxy.js` | OpenAI WebSocket transport proxy for `/v1/responses` and `/v1/realtime` upgrades. Tracks active sessions + pending `recordWebSocketEntry` promises so `drainWebSocketProxy()` can force-finalize stragglers and await writes on shutdown. Tunables: `CCXRAY_WS_IDLE_TIMEOUT_MS` (default 60s), `CCXRAY_WS_MAX_QUEUE_BYTES` (default 4 MiB; caps client→upstream buffer while upstream is connecting) | +| `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. `supportsDelta` flag gates delta-write eligibility. The factory wraps every adapter with a write-tracker that exposes `drain()` for graceful shutdown | ### Client (`public/`) @@ -90,6 +90,8 @@ ccxray claude (2nd) → discover hub via ~/.ccxray/hub.json → connect as clie - Extra user args pass through unchanged after ccxray's injected launcher config. - `--no-browser` only suppresses browser auto-open. The dashboard remains available on the proxy port. 
- Codex's main session traffic upgrades to a WebSocket on `POST /v1/responses` (with `openai-beta: responses_websockets=*`), not `/v1/realtime`. `/v1/realtime` exists for the older Realtime API but is not what current codex uses for normal `/goal` / chat turns. When ChatGPT auth is active, codex also sends `chatgpt-account-id`, which `getUpstreamForRequestAndHeaders` (see `server/config.js`) uses to route to `CHATGPT_BASE_URL` instead of `OPENAI_BASE_URL`. +- Codex 0.133+ pings ~10 platform endpoints on startup (`/v1/plugins/*`, `/v1/ps/plugins/*`, `/v1/connectors/*`, `/v1/api/codex/apps`, `/v1/api/codex/usage`). `isCodexPlatformNoisePath` in `server/config.js` flags them; `server/index.js` forwards them with `skipEntry: true` so they don't pollute the dashboard. `/v1/codex/analytics-events/events` (the telemetry endpoint) is intentionally kept visible — a follow-up will parse turn metadata out of it. +- Graceful shutdown: `spawnStandaloneAgent`, hub idle shutdown, and SIGTERM/SIGINT handlers route through `gracefulExit(code)` in `server/index.js`. It awaits `drainWebSocketProxy()` (force-finalizes any open WS sessions, awaits their `recordWebSocketEntry` promises) then `config.storage.drain()` (awaits pending fs writes) before calling `process.exit`, bounded by a 5s safety timeout. Without this, async storage writes for WS entries lose to `process.exit` and leave 0-byte log files. ### Data Flow diff --git a/README.md b/README.md index 5093482..bdd157c 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,23 @@ Connected clients (2): Use `--port` to opt out and run an independent server instead. +## Codex support (Beta) + +```bash +npx ccxray codex +``` + +Works for both API-key and ChatGPT-auth codex sessions. ChatGPT routing to `chatgpt.com/backend-api/codex` triggers automatically on codex's `chatgpt-account-id` header — no extra config. 
Codex's startup platform polls (plugin lists, connector directory, app metadata) are proxied but hidden from the dashboard so the timeline shows only conversation traffic. + +**Beta caveats:** +- WebSocket transport (`/v1/responses`, `/v1/realtime`) captures connection-level metadata only: frame counts, byte counts, close status. Per-frame content is not decoded — codex turns show less detail in the dashboard than Claude turns do. +- Token counts, model, and duration are not yet extracted from codex's telemetry payload; a follow-up will surface these. +- Limited real-world testing compared to the Claude path. + +Env vars for tuning: `OPENAI_BASE_URL`, `CHATGPT_BASE_URL`, `CCXRAY_WS_IDLE_TIMEOUT_MS`, `CCXRAY_WS_MAX_QUEUE_BYTES`. Details in [CLAUDE.md](CLAUDE.md). + +File issues on [GitHub](https://github.com/lis186/ccxray/issues) — Beta means we want the rough edges reported. + ## Features ### Timeline diff --git a/server/config.js b/server/config.js index 839a05a..5c97cf9 100644 --- a/server/config.js +++ b/server/config.js @@ -154,10 +154,28 @@ function isChatGPTCodexPath(pathname) { || pathname.startsWith('/v1/codex/') || pathname === '/v1/plugins' || pathname.startsWith('/v1/plugins/') + || pathname === '/v1/ps/plugins' + || pathname.startsWith('/v1/ps/plugins/') || pathname === '/v1/connectors' || pathname.startsWith('/v1/connectors/'); } +// Codex 0.133+ hits a flurry of platform endpoints on startup (plugin lists, +// connector directory, app metadata, usage). They're not conversation data — +// just codex's internal RPC — and would otherwise create ~30 dashboard entries +// before the first user turn. We still proxy them; only entry creation is +// skipped. The telemetry endpoint (/v1/codex/analytics-events/events) is kept +// because a follow-up may parse turn metadata out of it. 
+function isCodexPlatformNoisePath(urlPath) { + const pathname = (urlPath || '').split('?')[0]; + if (pathname === '/v1/plugins' || pathname.startsWith('/v1/plugins/')) return true; + if (pathname === '/v1/ps/plugins' || pathname.startsWith('/v1/ps/plugins/')) return true; + if (pathname === '/v1/connectors' || pathname.startsWith('/v1/connectors/')) return true; + if (pathname === '/v1/api/codex/apps' || pathname.startsWith('/v1/api/codex/apps/')) return true; + if (pathname === '/v1/api/codex/usage' || pathname.startsWith('/v1/api/codex/usage/')) return true; + return false; +} + function getUpstreamForRequestAndHeaders(urlPath, headers = {}) { const pathname = (urlPath || '').split('?')[0]; if (isChatGPTCodexPath(pathname)) { @@ -322,5 +340,6 @@ module.exports = { getUpstream, getUpstreamForRequest, getUpstreamForRequestAndHeaders, + isCodexPlatformNoisePath, joinUpstreamPath, }; diff --git a/server/hub.js b/server/hub.js index 6d7bf53..c38c222 100644 --- a/server/hub.js +++ b/server/hub.js @@ -368,19 +368,21 @@ function getHubStatus() { // ── Hub route handler (mounted in server) ─────────────────────────── function handleHubRoutes(clientReq, clientRes) { - if (clientReq.url === '/_api/health' && clientReq.method === 'GET') { + const pathname = clientReq.url.split('?')[0]; + + if (pathname === '/_api/health' && clientReq.method === 'GET') { clientRes.writeHead(200, { 'Content-Type': 'application/json' }); clientRes.end(JSON.stringify({ ok: true })); return true; } - if (clientReq.url === '/_api/hub/status' && clientReq.method === 'GET') { + if (pathname === '/_api/hub/status' && clientReq.method === 'GET') { clientRes.writeHead(200, { 'Content-Type': 'application/json' }); clientRes.end(JSON.stringify(getHubStatus())); return true; } - if (clientReq.url === '/_api/hub/register' && clientReq.method === 'POST') { + if (pathname === '/_api/hub/register' && clientReq.method === 'POST') { let body = ''; clientReq.on('data', c => { body += c; }); clientReq.on('end', 
() => { @@ -398,7 +400,7 @@ function handleHubRoutes(clientReq, clientRes) { return true; } - if (clientReq.url === '/_api/hub/unregister' && clientReq.method === 'POST') { + if (pathname === '/_api/hub/unregister' && clientReq.method === 'POST') { let body = ''; clientReq.on('data', c => { body += c; }); clientReq.on('end', () => { diff --git a/server/index.js b/server/index.js index 3c7b6f6..6313059 100755 --- a/server/index.js +++ b/server/index.js @@ -18,7 +18,7 @@ const { authMiddleware } = require('./auth'); const { extractAgentType, extractPromptAgentType, splitB2IntoBlocks } = require('./system-prompt'); const { findSharedPrefix } = require('./delta-helpers'); const providers = require('./providers'); -const { handleWebSocketUpgrade } = require('./ws-proxy'); +const { handleWebSocketUpgrade, drainWebSocketProxy } = require('./ws-proxy'); const { getCodexRawSessionId, isOpenAISubagent, @@ -227,6 +227,17 @@ const server = http.createServer((clientReq, clientRes) => { return; } + // Codex platform RPC: forward but don't create dashboard entries. These + // are codex's internal startup polls (plugin lists, connectors, apps, + // usage) — not conversation data. Without this guard, codex 0.133+ + // pollutes the timeline with ~30 entries before the first user prompt. 
+ if (config.isCodexPlatformNoisePath(clientReq.url)) { + const upstream = config.getUpstreamForRequestAndHeaders(clientReq.url, clientReq.headers); + const fwdHeaders = buildForwardHeaders(clientReq.headers, upstream); + forwardRequest({ id, ts, startTime, parsedBody, rawBody, clientReq, clientRes, fwdHeaders, reqSessionId: null, reqWritePromise: null, skipEntry: true, upstream }); + return; + } + const upstream = config.getUpstreamForRequestAndHeaders(clientReq.url, clientReq.headers); const provider = upstream.provider || 'anthropic'; if (provider === 'openai' && parsedBody) { @@ -447,10 +458,25 @@ function spawnAgent(command, port, args, onExit) { process.on('SIGTERM', () => child.kill('SIGTERM')); } +// Drain pending WS finalize promises and storage writes before process.exit. +// Without this, codex (or the user) killing the agent races with async +// fs.writeFile calls in ws-proxy/forward, leaving 0-byte _req.json/_res.json +// for the WS upgrade entry. Bounded by a 5s safety timeout so a stuck write +// can never block shutdown. 
+async function gracefulExit(code) { + const deadline = new Promise(resolve => setTimeout(resolve, 5000)); + const drain = (async () => { + try { await drainWebSocketProxy(); } catch (e) { console.error('WS drain failed:', e.message); } + try { await config.storage.drain(); } catch (e) { console.error('Storage drain failed:', e.message); } + })(); + await Promise.race([drain, deadline]); + process.exit(code); +} + function spawnStandaloneAgent(port, command, args) { spawnAgent(command, port, args, (code) => { server.close(); - process.exit(code); + gracefulExit(code); }); } @@ -665,7 +691,14 @@ async function startServer() { hub.setHubPort(actualPort); hub.writeHubLock(actualPort, process.pid); hub.startDeadClientCheck(); - const cleanup = () => { hub.deleteHubLock(); process.exit(0); }; + hub.setOnShutdown(() => gracefulExit(0)); + const cleanup = () => { hub.deleteHubLock(); gracefulExit(0); }; + process.on('SIGTERM', cleanup); + process.on('SIGINT', cleanup); + } else if (!agentMode) { + // Standalone mode (dashboard only, no agent): drain on signal so any WS + // entries flush. Agent mode handles signals via the child exit path. 
+ const cleanup = () => gracefulExit(0); process.on('SIGTERM', cleanup); process.on('SIGINT', cleanup); } diff --git a/server/routes/api.js b/server/routes/api.js index f32f4ed..d985a40 100644 --- a/server/routes/api.js +++ b/server/routes/api.js @@ -34,7 +34,9 @@ function computeSettings() { } function handleApiRoutes(clientReq, clientRes) { - if (clientReq.url === '/_api/entries') { + const pathname = clientReq.url.split('?')[0]; + + if (pathname === '/_api/entries') { const entries = store.entries.map(summarizeEntry); const sessionTitles = Object.fromEntries( Object.entries(store.sessionMeta).filter(([, m]) => m.title).map(([sid, m]) => [sid, m.title]) @@ -45,7 +47,7 @@ function handleApiRoutes(clientReq, clientRes) { return true; } - if (clientReq.method === 'GET' && clientReq.url === '/_api/settings') { + if (clientReq.method === 'GET' && pathname === '/_api/settings') { const s = readSettings(); clientRes.writeHead(200, { 'Content-Type': 'application/json' }); clientRes.end(JSON.stringify({ ...computeSettings(), statusLine: s.statusLine })); @@ -113,7 +115,7 @@ function handleApiRoutes(clientReq, clientRes) { } // Full entry data (req + res) — lazy loaded - const entryMatch = clientReq.url.match(/^\/_api\/entry\/(.+)$/); + const entryMatch = pathname.match(/^\/_api\/entry\/(.+)$/); if (entryMatch) { const id = decodeURIComponent(entryMatch[1]); const entry = store.entries.find(e => e.id === id); @@ -132,7 +134,7 @@ function handleApiRoutes(clientReq, clientRes) { } // Lazy tokenization endpoint - const tokMatch = clientReq.url.match(/^\/_api\/tokens\/(.+)$/); + const tokMatch = pathname.match(/^\/_api\/tokens\/(.+)$/); if (tokMatch) { const id = decodeURIComponent(tokMatch[1]); const entry = store.entries.find(e => e.id === id); @@ -153,14 +155,14 @@ function handleApiRoutes(clientReq, clientRes) { return true; } - if (clientReq.method === 'GET' && clientReq.url === '/_api/stars') { + if (clientReq.method === 'GET' && pathname === '/_api/stars') { const s = 
readSettings(); clientRes.writeHead(200, { 'Content-Type': 'application/json' }); clientRes.end(JSON.stringify(serializeStars(s))); return true; } - if (clientReq.method === 'POST' && clientReq.url === '/_api/stars') { + if (clientReq.method === 'POST' && pathname === '/_api/stars') { let body = ''; clientReq.on('data', c => { body += c; }); clientReq.on('end', () => { @@ -200,7 +202,7 @@ function handleApiRoutes(clientReq, clientRes) { return true; } - if (clientReq.method === 'POST' && clientReq.url === '/_api/settings') + if (clientReq.method === 'POST' && pathname === '/_api/settings') { let body = ''; clientReq.on('data', c => { body += c; }); diff --git a/server/routes/costs.js b/server/routes/costs.js index 7384ae5..966dd12 100644 --- a/server/routes/costs.js +++ b/server/routes/costs.js @@ -16,7 +16,9 @@ function sendLoadingOrData(clientRes, dataFn) { } function handleCostRoutes(clientReq, clientRes) { - if (clientReq.url === '/_api/costs/current-block') { + const pathname = clientReq.url.split('?')[0]; + + if (pathname === '/_api/costs/current-block') { sendLoadingOrData(clientRes, data => { const now = Date.now(); const activeBlock = data.blocks.find(b => b.isActive); @@ -84,7 +86,7 @@ function handleCostRoutes(clientReq, clientRes) { return true; } - if (clientReq.url === '/_api/costs/daily') { + if (pathname === '/_api/costs/daily') { sendLoadingOrData(clientRes, data => { clientRes.writeHead(200, { 'Content-Type': 'application/json' }); clientRes.end(JSON.stringify(data.daily)); @@ -92,7 +94,7 @@ function handleCostRoutes(clientReq, clientRes) { return true; } - if (clientReq.url === '/_api/costs/monthly') { + if (pathname === '/_api/costs/monthly') { sendLoadingOrData(clientRes, data => { const now = new Date(); const currentMonth = `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, '0')}`; @@ -103,7 +105,7 @@ function handleCostRoutes(clientReq, clientRes) { return true; } - if (clientReq.url === '/_api/pricing') { + if (pathname === 
'/_api/pricing') { const result = {}; for (const [model, rates] of Object.entries(pricingTable)) { result[model] = { diff --git a/server/routes/intercept.js b/server/routes/intercept.js index 201ef1e..8582762 100644 --- a/server/routes/intercept.js +++ b/server/routes/intercept.js @@ -6,7 +6,9 @@ const { broadcastInterceptToggle, broadcastInterceptRemoved, broadcastSessionSta const { forwardRequest } = require('../forward'); function handleInterceptRoutes(clientReq, clientRes) { - if (clientReq.url === '/_api/intercept/toggle' && clientReq.method === 'POST') { + const pathname = clientReq.url.split('?')[0]; + + if (pathname === '/_api/intercept/toggle' && clientReq.method === 'POST') { const chunks = []; clientReq.on('data', c => chunks.push(c)); clientReq.on('end', () => { try { @@ -23,7 +25,7 @@ function handleInterceptRoutes(clientReq, clientRes) { return true; } - const approveMatch = clientReq.url.match(/^\/_api\/intercept\/(.+)\/approve$/); + const approveMatch = pathname.match(/^\/_api\/intercept\/(.+)\/approve$/); if (approveMatch && clientReq.method === 'POST') { const reqId = decodeURIComponent(approveMatch[1]); const pending = store.pendingRequests.get(reqId); @@ -48,7 +50,7 @@ function handleInterceptRoutes(clientReq, clientRes) { return true; } - const rejectMatch = clientReq.url.match(/^\/_api\/intercept\/(.+)\/reject$/); + const rejectMatch = pathname.match(/^\/_api\/intercept\/(.+)\/reject$/); if (rejectMatch && clientReq.method === 'POST') { const reqId = decodeURIComponent(rejectMatch[1]); const pending = store.pendingRequests.get(reqId); @@ -70,7 +72,7 @@ function handleInterceptRoutes(clientReq, clientRes) { return true; } - if (clientReq.url === '/_api/intercept/timeout' && clientReq.method === 'POST') { + if (pathname === '/_api/intercept/timeout' && clientReq.method === 'POST') { const chunks = []; clientReq.on('data', c => chunks.push(c)); clientReq.on('end', () => { try { diff --git a/server/routes/sse.js b/server/routes/sse.js index 
2f91f42..a969aa0 100644 --- a/server/routes/sse.js +++ b/server/routes/sse.js @@ -3,7 +3,8 @@ const store = require('../store'); function handleSSERoute(clientReq, clientRes) { - if (clientReq.url !== '/_events') return false; + const pathname = clientReq.url.split('?')[0]; + if (pathname !== '/_events') return false; clientRes.writeHead(200, { 'Content-Type': 'text/event-stream', diff --git a/server/storage/index.js b/server/storage/index.js index dc510e7..c94c31b 100644 --- a/server/storage/index.js +++ b/server/storage/index.js @@ -2,6 +2,32 @@ const { createLocalStorage } = require('./local'); +// Wraps a storage adapter so every async write is tracked in an in-flight Set. +// drain() awaits all pending writes — used on shutdown so process.exit doesn't +// kill the event loop while fs.writeFile is mid-flight, leaving 0-byte files. +// Loops until the set is empty in case a tracked promise spawns another write. +function withWriteTracking(adapter) { + const pending = new Set(); + const track = (promise) => { + pending.add(promise); + const cleanup = () => pending.delete(promise); + promise.then(cleanup, cleanup); + return promise; + }; + return { + ...adapter, + write: (id, suffix, data) => track(adapter.write(id, suffix, data)), + appendIndex: (line) => track(adapter.appendIndex(line)), + writeSharedIfAbsent: (filename, data) => track(adapter.writeSharedIfAbsent(filename, data)), + deleteFile: (filename) => track(adapter.deleteFile(filename)), + drain: async () => { + while (pending.size > 0) { + await Promise.allSettled([...pending]); + } + }, + }; +} + /** * Create the appropriate storage adapter based on STORAGE_BACKEND env var. 
* @@ -13,24 +39,28 @@ const { createLocalStorage } = require('./local'); function createStorage() { const backend = (process.env.STORAGE_BACKEND || 'local').toLowerCase(); + let adapter; switch (backend) { case 's3': { const { createS3Storage } = require('./s3'); - return createS3Storage({ + adapter = createS3Storage({ bucket: process.env.S3_BUCKET, region: process.env.S3_REGION || 'auto', endpoint: process.env.S3_ENDPOINT || undefined, prefix: process.env.S3_PREFIX || 'logs/', }); + break; } case 'local': default: { const path = require('path'); const os = require('os'); const logsDir = process.env.LOGS_DIR || path.join(process.env.CCXRAY_HOME || path.join(os.homedir(), '.ccxray'), 'logs'); - return createLocalStorage(logsDir); + adapter = createLocalStorage(logsDir); + break; } } + return withWriteTracking(adapter); } module.exports = { createStorage }; diff --git a/server/ws-proxy.js b/server/ws-proxy.js index e9b32c5..0bd7a6d 100644 --- a/server/ws-proxy.js +++ b/server/ws-proxy.js @@ -51,6 +51,15 @@ const MAX_QUEUE_BYTES = Number.isFinite(DEFAULT_MAX_QUEUE_BYTES) && DEFAULT_MAX_ const OPENAI_WS_PATHS = new Set(['/v1/responses', '/v1/realtime']); const WS_CLOSE_REASON_MAX_BYTES = 120; // WS spec caps reason at 123 bytes; leave headroom. +// Module-level state for graceful shutdown. `activeSessions` holds a +// forceFinalize fn per upgrade that hasn't been finalized yet — the agent +// dying can race the OS-driven WS close event, so on shutdown we force-finalize +// any straggler. `pendingEntries` tracks each finalize's recordWebSocketEntry +// promise so drainWebSocketProxy() can wait for the writes to be queued in +// storage before storage.drain() runs. 
+const activeSessions = new Set(); +const pendingEntries = new Set(); + function isUpgradeRequest(req) { return String(req.headers.upgrade || '').toLowerCase() === 'websocket'; } @@ -360,6 +369,8 @@ function handleWebSocketUpgrade(req, socket, head) { const upstreamBuffer = { queue: [], bufferedBytes: 0, maxBytes: MAX_QUEUE_BYTES }; let finalized = false; let idleTimer = null; + const session = { forceFinalize: null }; + activeSessions.add(session); function refreshIdleTimer() { clearTimeout(idleTimer); @@ -374,12 +385,22 @@ function handleWebSocketUpgrade(req, socket, head) { if (finalized) return; finalized = true; clearTimeout(idleTimer); + activeSessions.delete(session); store.activeRequests[sessionId] = Math.max(0, (store.activeRequests[sessionId] || 1) - 1); if (store.sessionMeta[sessionId]) store.sessionMeta[sessionId].lastStopReason = null; broadcastSessionStatus(sessionId); - recordWebSocketEntry(ctx, result).catch(e => console.error('Record ws entry failed:', e.message)); + const entryPromise = recordWebSocketEntry(ctx, result) + .catch(e => console.error('Record ws entry failed:', e.message)); + pendingEntries.add(entryPromise); + entryPromise.finally(() => pendingEntries.delete(entryPromise)); } + session.forceFinalize = () => { + if (finalized) return; + closeBoth(1001, 'proxy shutting down'); + finalize({ status: 499, error: { message: 'ccxray shutdown before WebSocket closed' } }); + }; + function closeBoth(code, reason) { const closeCode = normalizeCloseCode(code); const closeReason = clampWsReason(reason); @@ -466,9 +487,21 @@ function handleWebSocketUpgrade(req, socket, head) { return true; } +// Force-close any active WS sessions (so their finalize runs and +// recordWebSocketEntry kicks off writes), then await all in-flight +// recordWebSocketEntry promises. Storage.drain() should be called AFTER this +// so the writes recordWebSocketEntry queued are also flushed. 
+async function drainWebSocketProxy() { + for (const session of [...activeSessions]) { + try { session.forceFinalize(); } catch {} + } + await Promise.allSettled([...pendingEntries]); +} + module.exports = { handleWebSocketUpgrade, buildWebSocketHeaders, isOpenAIWebSocket, normalizeCloseCode, + drainWebSocketProxy, }; diff --git a/test/codex-noise-suppression.e2e.test.js b/test/codex-noise-suppression.e2e.test.js new file mode 100644 index 0000000..86b0f41 --- /dev/null +++ b/test/codex-noise-suppression.e2e.test.js @@ -0,0 +1,176 @@ +'use strict'; + +// Codex 0.133+ pings ~10 distinct platform endpoints on startup (plugin lists, +// connector directory, app metadata, usage). Without filtering, ccxray records +// each one as a dashboard entry — drowning the actual conversation in noise. +// This test fires those paths at a real proxy with a mock upstream and asserts +// that /_api/entries contains no entry for any of them, while the telemetry +// endpoint (kept visible for a future Bonus PR) still records an entry. 
+ +const { describe, it, after } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { spawn } = require('child_process'); + +const SERVER_SCRIPT = path.join(__dirname, '..', 'server', 'index.js'); +const tmpDirs = []; + +function findFreePort() { + return new Promise(resolve => { + const s = http.createServer(); + s.listen(0, () => { + const port = s.address().port; + s.close(() => resolve(port)); + }); + }); +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function killAndWait(child) { + return new Promise(resolve => { + if (!child || child.exitCode !== null) return resolve(); + child.on('exit', resolve); + child.kill('SIGTERM'); + setTimeout(() => { try { child.kill('SIGKILL'); } catch {} resolve(); }, 3000); + }); +} + +// Mock upstream that 404s everything (matches what chatgpt.com does for +// these endpoints in the real environment). 
+function makeMock404Upstream() { + return http.createServer((req, res) => { + res.writeHead(404, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ error: { type: 'not_found', message: 'mock 404' } })); + }); +} + +function fireRequest(port, method, urlPath) { + return new Promise((resolve, reject) => { + const req = http.request({ + hostname: 'localhost', port, path: urlPath, method, + headers: { + 'x-api-key': 'sk-fake', + 'chatgpt-account-id': '11111111-2222-3333-4444-555555555555', + }, + }, res => { + const chunks = []; + res.on('data', c => chunks.push(c)); + res.on('end', () => resolve({ status: res.statusCode })); + }); + req.on('error', reject); + req.end(); + }); +} + +function fetchEntries(port) { + return new Promise((resolve, reject) => { + http.get(`http://localhost:${port}/_api/entries`, res => { + const chunks = []; + res.on('data', c => chunks.push(c)); + res.on('end', () => { + try { resolve(JSON.parse(Buffer.concat(chunks).toString())); } + catch (e) { reject(e); } + }); + }).on('error', reject); + }); +} + +describe('codex platform noise paths are suppressed from dashboard entries', () => { + after(() => { + for (const d of tmpDirs) { try { fs.rmSync(d, { recursive: true, force: true }); } catch {} } + }); + + it('proxies noise paths but creates zero dashboard entries; keeps telemetry visible', async () => { + const upstreamPort = await findFreePort(); + const proxyPort = await findFreePort(); + const home = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-codex-noise-')); + tmpDirs.push(home); + + const upstream = makeMock404Upstream(); + await new Promise(resolve => upstream.listen(upstreamPort, '127.0.0.1', resolve)); + + // Route both anthropic and openai upstreams to the same mock 404 server. + // The classification fix moves /v1/ps/plugins from anthropic to openai; + // either way, the upstream just 404s. 
+ const child = spawn(process.execPath, [SERVER_SCRIPT, '--port', String(proxyPort), '--no-browser'], { + env: { + ...process.env, + ANTHROPIC_TEST_HOST: '127.0.0.1', + ANTHROPIC_TEST_PORT: String(upstreamPort), + ANTHROPIC_TEST_PROTOCOL: 'http', + OPENAI_TEST_HOST: '127.0.0.1', + OPENAI_TEST_PORT: String(upstreamPort), + OPENAI_TEST_PROTOCOL: 'http', + CHATGPT_BASE_URL: `http://127.0.0.1:${upstreamPort}/backend-api/codex`, + CCXRAY_HOME: home, + BROWSER: 'none', + RESTORE_DAYS: '0', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + try { + await waitForPort(proxyPort); + + // Sampled from a real codex 0.133-alpha startup capture. + const noisePaths = [ + ['GET', '/v1/plugins/featured?platform=codex'], + ['GET', '/v1/plugins/list'], + ['GET', '/v1/ps/plugins/installed?scope=GLOBAL'], + ['GET', '/v1/ps/plugins/installed?scope=WORKSPACE&includeDownloadUrls=true'], + ['POST', '/v1/api/codex/apps'], + ['GET', '/v1/api/codex/usage'], + ['GET', '/v1/connectors/directory/list?external_logos=true'], + ]; + for (const [method, urlPath] of noisePaths) { + const out = await fireRequest(proxyPort, method, urlPath); + assert.equal(out.status, 404, `${method} ${urlPath} should be proxied (got ${out.status})`); + } + + // Give the proxy a moment to settle any async writes. + await new Promise(r => setTimeout(r, 200)); + + let entries = (await fetchEntries(proxyPort)).entries || []; + const noiseEntries = entries.filter(e => noisePaths.some(([, p]) => + e.url === p.split('?')[0] || e.url === p + )); + assert.equal(noiseEntries.length, 0, + `noise paths created ${noiseEntries.length} entries: ${JSON.stringify(noiseEntries.map(e => e.url))}`); + + // Sanity: the telemetry endpoint is NOT in the noise list, so a request + // to it SHOULD record an entry (so a future Bonus PR can parse it). 
+ await fireRequest(proxyPort, 'POST', '/v1/codex/analytics-events/events'); + await new Promise(r => setTimeout(r, 200)); + entries = (await fetchEntries(proxyPort)).entries || []; + const telemetry = entries.find(e => e.url === '/v1/codex/analytics-events/events'); + assert.ok(telemetry, 'telemetry endpoint should still record an entry'); + } finally { + await killAndWait(child); + upstream.close(); + } + }); +}); diff --git a/test/config.test.js b/test/config.test.js index fee5daa..53a96fb 100644 --- a/test/config.test.js +++ b/test/config.test.js @@ -446,6 +446,35 @@ describe('self-loop detection', () => { }); }); +// Codex 0.133+ hits /v1/plugins, /v1/ps/plugins, /v1/connectors, /v1/api/codex/apps, +// /v1/api/codex/usage on startup. We still proxy them (so codex doesn't break), +// but they shouldn't be tagged 'anthropic' (classification bug) and shouldn't +// create dashboard entries (noise bug). The telemetry endpoint stays visible. +describe('codex platform noise routing and predicate', () => { + const { isCodexPlatformNoisePath, getProviderForRequest } = require('../server/config'); + + it('classifies /v1/ps/plugins/* as openai (was falling through to anthropic)', () => { + assert.equal(getProviderForRequest('/v1/ps/plugins/installed?scope=GLOBAL'), 'openai'); + assert.equal(getProviderForRequest('/v1/ps/plugins/installed?scope=WORKSPACE&includeDownloadUrls=true'), 'openai'); + assert.equal(getProviderForRequest('/v1/ps/plugins'), 'openai'); + }); + + it('marks codex startup polls as noise', () => { + assert.equal(isCodexPlatformNoisePath('/v1/plugins/featured?platform=codex'), true); + assert.equal(isCodexPlatformNoisePath('/v1/plugins/list'), true); + assert.equal(isCodexPlatformNoisePath('/v1/ps/plugins/installed?scope=GLOBAL'), true); + assert.equal(isCodexPlatformNoisePath('/v1/api/codex/apps'), true); + assert.equal(isCodexPlatformNoisePath('/v1/api/codex/usage'), true); + 
assert.equal(isCodexPlatformNoisePath('/v1/connectors/directory/list?external_logos=true'), true); + }); + + it('keeps the conversation and telemetry paths visible', () => { + assert.equal(isCodexPlatformNoisePath('/v1/responses'), false); + assert.equal(isCodexPlatformNoisePath('/v1/codex/analytics-events/events'), false); + assert.equal(isCodexPlatformNoisePath('/v1/messages'), false); + }); +}); + describe('model context fallback', () => { it('uses fallback context for Codex OpenAI models when dynamic pricing data is unavailable', () => { const { getMaxContext } = require('../server/config'); diff --git a/test/route-query-string.test.js b/test/route-query-string.test.js new file mode 100644 index 0000000..f4807e7 --- /dev/null +++ b/test/route-query-string.test.js @@ -0,0 +1,86 @@ +'use strict'; + +const { describe, it, beforeEach } = require('node:test'); +const assert = require('node:assert/strict'); + +const store = require('../server/store'); +const { handleApiRoutes } = require('../server/routes/api'); + +// Regression for the query-string-bypass bug class: when AUTH_TOKEN mode is on, +// the dashboard hits /_api/?token=... — every route matcher must compare +// against the pathname, not the raw URL, or auth-mode requests fall through to +// the upstream proxy (404 + dashboard pollution) or fail to look up records. + +function fakeRes() { + let status = 0; + let body = null; + return { + headersSent: false, + status: () => status, + body: () => body, + writeHead: (s) => { status = s; }, + end: (data) => { body = data; }, + }; +} + +function reset() { + store.entries.length = 0; + for (const k of Object.keys(store.sessionMeta)) delete store.sessionMeta[k]; +} + +describe('route matchers tolerate query strings (e.g. 
?token=… auth)', () => { + beforeEach(reset); + + it('/_api/entries?limit=3 returns 200 with entries (not 404 fallthrough)', () => { + const res = fakeRes(); + const handled = handleApiRoutes({ url: '/_api/entries?limit=3', method: 'GET' }, res); + assert.equal(handled, true); + assert.equal(res.status(), 200); + const payload = JSON.parse(res.body()); + assert.ok(Array.isArray(payload.entries)); + }); + + it('/_api/entry/?token=x finds the entry (regex must use pathname)', async () => { + const id = '2026-05-23T09-33-05-486'; + store.entries.push({ + id, + sessionId: 'sid-x', + provider: 'openai', + _loaded: true, + req: { foo: 'bar' }, + res: { ok: true }, + receivedAt: 1700000000000, + toolSources: null, + }); + + const res = fakeRes(); + const handled = handleApiRoutes( + { url: `/_api/entry/${id}?token=secret`, method: 'GET' }, + res, + ); + assert.equal(handled, true); + // Async body resolution — wait a tick for the inner async IIFE. + await new Promise(r => setImmediate(r)); + assert.equal(res.status(), 200, 'should not 404 when query string is present'); + const payload = JSON.parse(res.body()); + assert.deepEqual(payload.req, { foo: 'bar' }); + }); + + it('/_api/tokens/?token=x finds the entry (regex must use pathname)', async () => { + const id = '2026-05-23T09-33-05-486'; + store.entries.push({ + id, + sessionId: 'sid-y', + tokens: { contextBreakdown: { loadedSkills: [] } }, + }); + + const res = fakeRes(); + const handled = handleApiRoutes( + { url: `/_api/tokens/${id}?token=secret`, method: 'GET' }, + res, + ); + assert.equal(handled, true); + await new Promise(r => setImmediate(r)); + assert.equal(res.status(), 200, 'should not 404 when query string is present'); + }); +}); diff --git a/test/ws-shutdown-drain.e2e.test.js b/test/ws-shutdown-drain.e2e.test.js new file mode 100644 index 0000000..1af3495 --- /dev/null +++ b/test/ws-shutdown-drain.e2e.test.js @@ -0,0 +1,185 @@ +'use strict'; + +// Bug 1 from the codex Beta-readiness handoff: when ccxray 
exits while a WS +// upgrade is still being recorded, the storage writes for the WS entry race +// process.exit() and get truncated to 0-byte files. This test reproduces the +// race by spawning the proxy, opening a WS upgrade through it, then SIGTERMing +// the proxy. After exit, both _req.json and _res.json for the WS entry must +// contain valid JSON with the expected transport-only metadata. + +const { describe, it, after } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { spawn } = require('child_process'); +const WebSocket = require('ws'); + +const SERVER_SCRIPT = path.join(__dirname, '..', 'server', 'index.js'); +const tmpDirs = []; + +function findFreePort() { + return new Promise(resolve => { + const s = http.createServer(); + s.listen(0, () => { + const port = s.address().port; + s.close(() => resolve(port)); + }); + }); +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function waitForExit(child, timeoutMs = 5000) { + return new Promise((resolve, reject) => { + if (child.exitCode !== null) return resolve(child.exitCode); + const t = setTimeout(() => reject(new Error(`process did not exit within ${timeoutMs}ms`)), timeoutMs); + child.once('exit', code => { clearTimeout(t); resolve(code); }); + }); +} + +// Mock OpenAI/ChatGPT WS upstream 
that accepts the upgrade, sends one frame, +// and stays open until the proxy or client closes the socket. +function makeWsUpstream(port) { + return new Promise(resolve => { + const httpServer = http.createServer(); + const wss = new WebSocket.Server({ noServer: true }); + httpServer.on('upgrade', (req, socket, head) => { + wss.handleUpgrade(req, socket, head, ws => { + try { ws.send('hello-from-upstream'); } catch {} + }); + }); + httpServer.listen(port, '127.0.0.1', () => resolve(httpServer)); + }); +} + +describe('Bug 1: ccxray drains WS storage writes before process exit', () => { + after(() => { + for (const d of tmpDirs) { try { fs.rmSync(d, { recursive: true, force: true }); } catch {} } + }); + + it('produces non-empty WS _req.json and _res.json after proxy is SIGTERMed mid-session', async () => { + const upstreamPort = await findFreePort(); + const proxyPort = await findFreePort(); + const home = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-ws-drain-')); + tmpDirs.push(home); + + const upstream = await makeWsUpstream(upstreamPort); + + const child = spawn(process.execPath, [SERVER_SCRIPT, '--port', String(proxyPort), '--no-browser'], { + env: { + ...process.env, + OPENAI_BASE_URL: `http://127.0.0.1:${upstreamPort}/v1`, + CHATGPT_BASE_URL: `http://127.0.0.1:${upstreamPort}/backend-api/codex`, + CCXRAY_HOME: home, + BROWSER: 'none', + RESTORE_DAYS: '0', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + let stderr = ''; + child.stderr.on('data', d => { stderr += d.toString(); }); + + try { + await waitForPort(proxyPort); + + // Open a WS upgrade through ccxray. Codex sends openai-beta + + // chatgpt-account-id to trigger the ChatGPT-auth routing path. 
+ const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + 'chatgpt-account-id': '11111111-2222-3333-4444-555555555555', + 'session_id': 'shutdown-race-session', + }, + }); + + await new Promise((resolve, reject) => { + const t = setTimeout(() => reject(new Error('WS open timeout')), 4000); + ws.once('open', () => { clearTimeout(t); resolve(); }); + ws.once('error', e => { clearTimeout(t); reject(e); }); + }); + + // Push one frame so byte counts are non-zero — same shape as a real + // codex turn would generate before being interrupted. + ws.send('shutdown-race-payload'); + await new Promise(r => setTimeout(r, 150)); + + // SIGTERM the proxy while the WS is still open. This is the race: the + // WS close fires inside the proxy, queueing async storage writes; without + // gracefulExit, process.exit beats fs.writeFile to the disk. + child.kill('SIGTERM'); + + // Detach our WS so it doesn't hold the test process open. + try { ws.terminate(); } catch {} + + const code = await waitForExit(child); + assert.equal(code, 0, `proxy should exit cleanly (got ${code}); stderr: ${stderr.slice(-500)}`); + + // The WS entry's id is timestamp-based, so we just scan the logs dir. + const logsDir = path.join(home, 'logs'); + const files = fs.readdirSync(logsDir).filter(f => f.endsWith('_res.json')); + assert.ok(files.length >= 1, `expected at least one _res.json, found: ${files.join(', ')}`); + + // Find the WS entry (transport: websocket) — there should be exactly one. 
+ let wsResFile = null; + let wsResPayload = null; + for (const f of files) { + const text = fs.readFileSync(path.join(logsDir, f), 'utf8'); + assert.ok(text.length > 0, `${f} must not be 0-byte after gracefulExit`); + let parsed; + try { parsed = JSON.parse(text); } + catch (e) { throw new Error(`${f} is not valid JSON: ${text.slice(0, 80)}`); } + if (parsed && parsed.transport === 'websocket') { + wsResFile = f; + wsResPayload = parsed; + break; + } + } + + assert.ok(wsResFile, `WS res.json not found among: ${files.join(', ')}`); + assert.equal(wsResPayload.capture, 'transport-only'); + assert.ok(wsResPayload.frameCounts, 'frameCounts must be present'); + assert.ok(wsResPayload.byteCounts, 'byteCounts must be present'); + assert.ok(wsResPayload.byteCounts.clientToUpstream > 0, + `clientToUpstream bytes should reflect the sent frame, got ${wsResPayload.byteCounts.clientToUpstream}`); + + // The matching _req.json must also exist and be parseable. + const reqFile = wsResFile.replace('_res.json', '_req.json'); + const reqText = fs.readFileSync(path.join(logsDir, reqFile), 'utf8'); + assert.ok(reqText.length > 0, `${reqFile} must not be 0-byte`); + const reqParsed = JSON.parse(reqText); + assert.equal(reqParsed.transport, 'websocket'); + assert.equal(reqParsed.endpoint, '/v1/responses'); + + // The index.ndjson line for the WS entry must also be present. + const indexLines = fs.readFileSync(path.join(logsDir, 'index.ndjson'), 'utf8').trim().split('\n'); + const wsIndexLine = indexLines.find(l => l.includes('"transport":"websocket"')); + assert.ok(wsIndexLine, 'index.ndjson must contain the WS entry'); + } finally { + if (child.exitCode === null) { try { child.kill('SIGKILL'); } catch {} } + upstream.close(); + } + }); +});