diff --git a/.gitignore b/.gitignore index 8d6e7a4..a13ecbf 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ skills-lock.json videos/ ccxray-init.png docs/src/ +.tmp/ diff --git a/AGENTS.md b/AGENTS.md index 11d9145..f79c17a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -2,117 +2,15 @@ Guidance for Codex when working with this repository. -## What is ccxray +## Source of Truth -A transparent HTTP proxy that sits between Codex and the Anthropic API. It records every request/response, serves a real-time Miller-column dashboard at the same port, and supports request interception/editing. Zero config, zero dependencies beyond Node.js. +Use `CLAUDE.md` as the canonical repository instructions file. -## Commands +Codex should read and follow `CLAUDE.md` for: +- project overview +- commands +- architecture notes +- launcher behavior +- data flow and storage details -```bash -ccxray # Proxy + dashboard (set ANTHROPIC_BASE_URL=http://localhost:5577 before running Codex) -ccxray --port 8080 # Custom port -ccxray status # Show hub info and connected clients -npm run dev # Dev mode (auto-restart on server/public changes) -npm test # Run tests -``` - -No build step. No linting. Restart to apply changes. - -> **Note for Codex**: ccxray proxies the Anthropic API. It does not spawn Codex itself. Start ccxray first, then ensure `ANTHROPIC_BASE_URL=http://localhost:5577` is set in the environment where Codex runs. 
- -## Architecture - -### Server (`server/`) - -| Module | Purpose | -|--------|---------| -| `server/index.js` | Entry point: HTTP server, request routing, startup | -| `server/config.js` | PORT, ANTHROPIC_HOST/PORT/PROTOCOL, LOGS_DIR, MAX_ENTRIES, model context windows | -| `server/pricing.js` | LiteLLM price fetch, 24h cache, fallback rates, cost calculation | -| `server/cost-budget.js` | Cost data orchestration: cache, warm-up, grouping | -| `server/cost-worker.js` | Child process: scans `~/.claude/projects` and `~/.config/claude/projects` JSONL files without blocking event loop | -| `server/store.js` | In-memory state: entries[] (capped at MAX_ENTRIES), sseClients[], sessions, intercept, versionIndex (keyed by `agentKey::coreHash`). Session detection with subagent inference (inflight + temporal heuristic) | -| `server/sse-broadcast.js` | SSE broadcast to dashboard clients, entry summarization | -| `server/helpers.js` | Tokenization, context breakdown, SSE parsing, formatting | -| `server/system-prompt.js` | KNOWN_AGENTS registry, agent type detection, B2 block splitting, unified diff | -| `server/restore.js` | Startup log restoration, lazy-load req/res from disk, delta chain reconstruction | -| `server/forward.js` | HTTP/HTTPS proxy to Anthropic, SSE capture, response logging, proxyRes error handling | -| `server/routes/api.js` | REST endpoints for entries, tokens, system prompt | -| `server/routes/sse.js` | SSE endpoint | -| `server/routes/intercept.js` | Intercept toggle/approve/reject/timeout | -| `server/routes/costs.js` | Cost budget endpoints | -| `server/hub.js` | Multi-project hub: lockfile (`~/.ccxray/hub.json`), discovery (with orphan port probe fallback), client registration, idle shutdown (injectable via setOnShutdown), crash auto-recovery | -| `server/auth.js` | API key auth middleware (enabled via `AUTH_TOKEN` env) | -| `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. 
`supportsDelta` flag gates delta-write eligibility | - -### Client (`public/`) - -| File | Purpose | -|------|---------| -| `public/index.html` | Dashboard shell | -| `public/style.css` | Dark theme, Miller column layout | -| `public/app.js` | App initialization | -| `public/miller-columns.js` | Projects → Sessions → Turns → Sections → Timeline → Detail | -| `public/entry-rendering.js` | Turn rendering, session/project tracking | -| `public/messages.js` | Merged steps: thinking + tool groups, timeline detail, minimap rendering + layout | -| `public/cost-budget-ui.js` | Cost analysis page, heatmap, burn rate | -| `public/intercept-ui.js` | Pause/edit/approve/reject requests | -| `public/system-prompt-ui.js` | Multi-agent browsing (3-column Miller), version history, unified diffs | -| `public/keyboard-nav.js` | Arrow keys, Enter, Escape | -| `public/quota-ticker.js` | Topbar quota ticker | - -### Hub Mode (multi-project) - -Hub mode is activated by `ccxray claude` (spawns Claude Code as a child). When using Codex, run `ccxray` directly and share the hub by pointing all Codex instances at the same `ANTHROPIC_BASE_URL`. 
- -``` -ccxray claude (1st) → fork detached hub → connect as client → spawn claude -ccxray claude (2nd) → discover hub via ~/.ccxray/hub.json → connect as client → spawn claude - ↓ - Hub (detached process) - ├── HTTP proxy on :5577 - ├── Dashboard (same port) - ├── Client registry (register/unregister/health) - └── Idle shutdown (5s after last client exits) -``` - -- Hub lockfile: `~/.ccxray/hub.json` (written after `listen()` succeeds = readiness signal) -- Hub log: `~/.ccxray/hub.log` (stdout/stderr of detached process) -- `--port` opts out of hub mode entirely (independent server) -- Crash recovery: clients monitor hub pid every 5s, auto-fork new hub using port as mutex -- Version check: semver major mismatch → reject, minor → warn, patch → silent - -### Data Flow - -``` -Codex → proxy receives request → detect session (explicit or inferred) - → [intercept check] → log {id}_req.json → forward to Anthropic - → capture SSE response → log {id}_res.json → calculate cost - → broadcast via SSE (includes sessionInferred flag) → dashboard updates -``` - -Logs stored in `~/.ccxray/logs/` (not package-relative). Respects `CCXRAY_HOME` env var. - -### Delta Log Storage - -Each `_req.json` normally stores the full `messages` array. For long sessions this wastes 85–90% of disk space (each turn re-stores the entire conversation history). Delta storage writes only new messages and a pointer to the previous turn. - -**Format** (delta turn): -```json -{ "model": "...", "max_tokens": 8096, "prevId": "2026-05-01T11-47-17-808", "msgOffset": 18, - "messages": [ /* only messages[18..] */ ], "sysHash": "...", "toolsHash": "..." } -``` - -**Format** (full / anchor turn): -```json -{ "model": "...", "max_tokens": 8096, "messages": [ /* all */ ], "sysHash": "...", "toolsHash": "..." } -``` - -Rules: -- Delta only applies to sessions with an explicit `session_id` (main orchestrator turns). Subagents and inferred sessions always write full format. 
-- First turn of a session = always full (chain anchor). -- Compaction (messages shrinks) = always full (resets chain). -- `supportsDelta: false` on the storage adapter (e.g. S3) disables delta entirely. -- `CCXRAY_DELTA_SNAPSHOT_N=N` forces a full snapshot every N delta writes (default `0` = only session-start anchor). Use `5` for S3-backed setups. - -**Read side**: `loadEntryReqRes` detects `prevId`, recursively loads the chain, and splices `prevMessages[0..msgOffset]` + delta messages. Results are cached in memory (per entry). If `prevId` entry has been pruned, gracefully degrades to showing only the delta portion. +Keep this file intentionally small so Codex and Claude do not drift. When repository instructions change, update `CLAUDE.md` first and only adjust this routing note if the routing policy itself changes. diff --git a/CLAUDE.md b/CLAUDE.md index 59f2b57..d4a55a5 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,6 +43,8 @@ No build step. No linting. Restart to apply changes. | `server/routes/costs.js` | Cost budget endpoints | | `server/hub.js` | Multi-project hub: lockfile (`~/.ccxray/hub.json`), discovery (with orphan port probe fallback), client registration, idle shutdown (injectable via setOnShutdown), crash auto-recovery | | `server/auth.js` | API key auth middleware (enabled via `AUTH_TOKEN` env) | +| `server/openai-session.js` | Shared OpenAI/Codex header + session helpers (session id extraction, agent type, turn-metadata sidecar) | +| `server/ws-proxy.js` | OpenAI WebSocket transport proxy for `/v1/responses` and `/v1/realtime` upgrades. Tunables: `CCXRAY_WS_IDLE_TIMEOUT_MS` (default 60s), `CCXRAY_WS_MAX_QUEUE_BYTES` (default 4 MiB; caps client→upstream buffer while upstream is connecting) | | `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. 
`supportsDelta` flag gates delta-write eligibility | ### Client (`public/`) @@ -84,9 +86,10 @@ ccxray claude (2nd) → discover hub via ~/.ccxray/hub.json → connect as clie - Launchers are registered in `server/providers.js`. Add future providers there with one entry for command name, display name, upstream family, launch args/env, and install hint; avoid adding new `if provider` branches in `server/index.js`. - Claude mode sets `ANTHROPIC_BASE_URL=http://localhost:` in the spawned Claude process. -- Codex mode spawns `codex -c 'openai_base_url="http://localhost:/v1"' ...args` and logs raw OpenAI Responses request/response JSON. +- Codex mode spawns `codex -c 'openai_base_url="http://localhost:/v1"' -c 'chatgpt_base_url="http://localhost:/v1"' ...args`, covering both API-key and ChatGPT-auth Codex transports. - Extra user args pass through unchanged after ccxray's injected launcher config. - `--no-browser` only suppresses browser auto-open. The dashboard remains available on the proxy port. +- Codex's main session traffic upgrades to a WebSocket on `GET /v1/responses` (with `openai-beta: responses_websockets=*`), not `/v1/realtime`; a WebSocket opening handshake is always a `GET` request. `/v1/realtime` exists for the older Realtime API but is not what current Codex uses for normal `/goal` / chat turns. When ChatGPT auth is active, Codex also sends `chatgpt-account-id`, which `getUpstreamForRequestAndHeaders` (see `server/config.js`) uses to route to `CHATGPT_BASE_URL` instead of `OPENAI_BASE_URL`. 
### Data Flow diff --git a/server/config.js b/server/config.js index 901ad27..839a05a 100644 --- a/server/config.js +++ b/server/config.js @@ -137,7 +137,9 @@ function getUpstream(provider) { function getProviderForRequest(urlPath) { const pathname = (urlPath || '').split('?')[0]; if (pathname === '/v1/responses' || pathname.startsWith('/v1/responses/')) return 'openai'; + if (pathname === '/v1/realtime' || pathname.startsWith('/v1/realtime/')) return 'openai'; if (pathname === '/v1/models' || pathname.startsWith('/v1/models/')) return 'openai'; + if (isChatGPTCodexPath(pathname)) return 'openai'; return 'anthropic'; } @@ -145,7 +147,22 @@ function getUpstreamForRequest(urlPath) { return getUpstream(getProviderForRequest(urlPath)); } +function isChatGPTCodexPath(pathname) { + return pathname === '/v1/api/codex' + || pathname.startsWith('/v1/api/codex/') + || pathname === '/v1/codex' + || pathname.startsWith('/v1/codex/') + || pathname === '/v1/plugins' + || pathname.startsWith('/v1/plugins/') + || pathname === '/v1/connectors' + || pathname.startsWith('/v1/connectors/'); +} + function getUpstreamForRequestAndHeaders(urlPath, headers = {}) { + const pathname = (urlPath || '').split('?')[0]; + if (isChatGPTCodexPath(pathname)) { + return UPSTREAMS.openaiChatGPT; + } const upstream = getUpstreamForRequest(urlPath); if (upstream.provider === 'openai' && headers['chatgpt-account-id']) { return UPSTREAMS.openaiChatGPT; diff --git a/server/forward.js b/server/forward.js index e1449cb..a16e3a1 100644 --- a/server/forward.js +++ b/server/forward.js @@ -10,6 +10,7 @@ const helpers = require('./helpers'); const { broadcast, broadcastSessionStatus, broadcastSessionTitleUpdate } = require('./sse-broadcast'); const { appendSample, collectRatelimitHeaders } = require('./ratelimit-log'); const hub = require('./hub'); +const { stripAuthParams } = require('./url-sanitize'); // For title-generator subagent responses, extract the clean title from the // JSON payload and (when 
attribution succeeds) stamp it onto the parent @@ -332,7 +333,7 @@ function forwardRequest(ctx) { reqId: id, }); helpers.printSeparator(); - console.log(`\x1b[36m📤 [${ts}] ${ctx.attribPrefix} ${clientReq.method} ${clientReq.url}\x1b[0m`); + console.log(`\x1b[36m📤 [${ts}] ${ctx.attribPrefix} ${clientReq.method} ${stripAuthParams(clientReq.url)}\x1b[0m`); console.log(helpers.summarizeRequest(parsedBody)); } @@ -345,7 +346,7 @@ function forwardRequest(ctx) { const tunnelAgent = getTunnelAgent(upstream); const proxyReq = transport.request({ hostname: upstream.host, port: upstream.port, - path: config.joinUpstreamPath(upstream, clientReq.url), method: clientReq.method, + path: config.joinUpstreamPath(upstream, stripAuthParams(clientReq.url)), method: clientReq.method, headers: { ...fwdHeaders, 'content-length': bodyToSend.length }, ...(tunnelAgent ? { agent: tunnelAgent } : {}), }, (proxyRes) => { @@ -581,7 +582,7 @@ function handleSSEResponse(ctx, proxyRes, clientRes) { const currMsgCount = parsedBody?.messages?.length || 0; const thinkingStripped = computeThinkingStripped(isSubagent, reqSessionId, currMsgCount, parsedBody); const entry = { - id, ts: ctx.ts, sessionId, method: ctx.clientReq.method, url: ctx.clientReq.url, + id, ts: ctx.ts, sessionId, method: ctx.clientReq.method, url: stripAuthParams(ctx.clientReq.url), provider: 'anthropic', agent: 'claude', req: parsedBody, res: events, @@ -714,7 +715,7 @@ function handleOpenAISSE(ctx, proxyRes, clientRes) { const usage = response?.usage || null; const entry = { - id, ts: ctx.ts, sessionId: reqSessionId, method: ctx.clientReq.method, url: ctx.clientReq.url, + id, ts: ctx.ts, sessionId: reqSessionId, method: ctx.clientReq.method, url: stripAuthParams(ctx.clientReq.url), provider: 'openai', agent: 'codex', req: parsedBody, res: events, @@ -856,7 +857,7 @@ function handleNonSSEResponse(ctx, proxyRes, clientRes) { const responseMetadata = buildResponseMetadata(provider, openAIResponse || resData, proxyRes); if 
(openAIEvents) responseMetadata.streaming = true; const entry = { - id, ts: ctx.ts, sessionId, method: ctx.clientReq.method, url: ctx.clientReq.url, + id, ts: ctx.ts, sessionId, method: ctx.clientReq.method, url: stripAuthParams(ctx.clientReq.url), provider, agent: provider === 'openai' ? 'codex' : 'claude', req: parsedBody, res: resData, diff --git a/server/index.js b/server/index.js index c47ce40..3c7b6f6 100755 --- a/server/index.js +++ b/server/index.js @@ -18,6 +18,13 @@ const { authMiddleware } = require('./auth'); const { extractAgentType, extractPromptAgentType, splitB2IntoBlocks } = require('./system-prompt'); const { findSharedPrefix } = require('./delta-helpers'); const providers = require('./providers'); +const { handleWebSocketUpgrade } = require('./ws-proxy'); +const { + getCodexRawSessionId, + isOpenAISubagent, + detectOpenAISession, + withCodexMetadata, +} = require('./openai-session'); // ── CLI: parse flags and detect provider launchers ── const portIdx = process.argv.indexOf('--port'); @@ -135,53 +142,6 @@ function buildForwardHeaders(clientHeaders, upstream) { return fwdHeaders; } -function getCodexRawSessionId() { - return 'codex-raw'; -} - -function firstHeader(headers, name) { - const value = headers?.[name.toLowerCase()]; - return Array.isArray(value) ? 
value[0] : value; -} - -function getCodexSessionId(headers, parsedBody) { - const direct = firstHeader(headers, 'session_id') || firstHeader(headers, 'x-openai-session-id'); - if (direct) return String(direct); - return parsedBody?.metadata?.session_id || null; -} - -function isOpenAISubagent(headers, parsedBody) { - const raw = firstHeader(headers, 'x-openai-subagent'); - if (raw != null) { - const text = String(raw).toLowerCase(); - return text !== '0' && text !== 'false' && text !== 'no'; - } - return Boolean(parsedBody?.metadata?.is_subagent || parsedBody?.metadata?.isSubagent); -} - -function getOpenAIAgentTypeFromHeaders(headers) { - const subagent = firstHeader(headers, 'x-openai-subagent'); - const direct = firstHeader(headers, 'x-openai-agent-type') || firstHeader(headers, 'x-codex-agent-type'); - const value = direct || subagent; - if (!value) return null; - const normalized = String(value).toLowerCase(); - if (normalized === 'explorer' || normalized === 'worker' || normalized === 'default') return normalized; - return null; -} - -function detectOpenAISession(headers, parsedBody) { - if (!parsedBody) return { sessionId: getCodexRawSessionId(), isNewSession: false, inferred: true }; - if (!getCodexSessionId(headers, parsedBody)) { - return { sessionId: getCodexRawSessionId(), isNewSession: false, inferred: true }; - } - const detected = store.detectSession(parsedBody); - return { - sessionId: detected.sessionId || getCodexRawSessionId(), - isNewSession: detected.isNewSession || false, - inferred: detected.inferred || false, - }; -} - function getCodexCwdFallback() { return hub.lookupClientCwd() || (agentCommand === 'codex' ? 
process.cwd() : null); } @@ -190,19 +150,6 @@ function getOpenAICwd(parsedBody) { return parsedBody?.metadata?.cwd || getCodexCwdFallback(); } -function withCodexMetadata(parsedBody, headers) { - if (!parsedBody || typeof parsedBody !== 'object') return parsedBody; - const sessionId = getCodexSessionId(headers, parsedBody); - const agentType = getOpenAIAgentTypeFromHeaders(headers); - if (!sessionId && !agentType) return parsedBody; - const metadata = parsedBody.metadata && typeof parsedBody.metadata === 'object' - ? { ...parsedBody.metadata } - : {}; - if (sessionId && !metadata.session_id) metadata.session_id = sessionId; - if (agentType && !metadata.agent_type) metadata.agent_type = agentType; - return { ...parsedBody, metadata }; -} - function registerPromptVersion({ provider, parsedBody, sharedFile, promptText, firstSeen, notify = true }) { if (!promptText) return null; const { key: agentKey, label: agentLabel } = extractPromptAgentType(provider, parsedBody); @@ -317,6 +264,12 @@ const server = http.createServer((clientReq, clientRes) => { sharedFile: `openai_instructions_${sysHash}.json`, promptText: parsedBody.instructions, }) : null; + if (promptInfo) { + config.storage.writeSharedIfAbsent(`openai_prompt_meta_${sysHash}.json`, JSON.stringify({ + agentKey: promptInfo.agentKey, + agentLabel: promptInfo.agentLabel, + })).catch(e => console.error('Write OpenAI prompt metadata failed:', e.message)); + } coreHash = promptInfo?.coreHash || null; } if (toolsHash) config.storage.writeSharedIfAbsent(`openai_tools_${toolsHash}.json`, JSON.stringify(parsedBody.tools)) @@ -452,6 +405,10 @@ const server = http.createServer((clientReq, clientRes) => { }); }); +server.on('upgrade', (req, socket, head) => { + handleWebSocketUpgrade(req, socket, head); +}); + // ── Spawn agent CLI with proxy routing ── function spawnAgent(command, port, args, onExit) { @@ -646,14 +603,25 @@ async function startServer() { if (!allowUpstreamLoop) { const localHosts = new Set(['localhost', 
'127.0.0.1', '::1']); const upstreamFamily = providers.getAgentProvider(agentCommand)?.upstream ?? 'anthropic'; - const upstream = config.UPSTREAMS[upstreamFamily]; - if (upstream && localHosts.has(upstream.host) && upstream.port === config.PORT) { - const url = `${upstream.protocol}://${upstream.host}:${upstream.port}`; - const envVar = upstreamFamily === 'openai' ? 'OPENAI_BASE_URL' : 'ANTHROPIC_BASE_URL'; - throw new Error( - `${envVar} points back to ccxray (${url}); unset it before starting ccxray.\n` + - 'Pass --allow-upstream-loop or set CCXRAY_ALLOW_UPSTREAM_LOOP=1 to allow this.' - ); + // Check all upstreams that could loop back: the agent's primary upstream + // plus any user-configured ChatGPT upstream (since PR #6 promoted + // chatgpt_base_url to first-class, a misconfigured CHATGPT_BASE_URL would + // otherwise silently loop with only a startup warn). + const candidates = [ + { key: upstreamFamily, upstream: config.UPSTREAMS[upstreamFamily], envVar: upstreamFamily === 'openai' ? 'OPENAI_BASE_URL' : 'ANTHROPIC_BASE_URL' }, + ]; + const chatgpt = config.UPSTREAMS.openaiChatGPT; + if (chatgpt && chatgpt.source !== 'chatgpt-default') { + candidates.push({ key: 'openaiChatGPT', upstream: chatgpt, envVar: chatgpt.source || 'CHATGPT_BASE_URL' }); + } + for (const { upstream, envVar } of candidates) { + if (upstream && localHosts.has(upstream.host) && upstream.port === config.PORT) { + const url = `${upstream.protocol}://${upstream.host}:${upstream.port}`; + throw new Error( + `${envVar} points back to ccxray (${url}); unset it before starting ccxray.\n` + + 'Pass --allow-upstream-loop or set CCXRAY_ALLOW_UPSTREAM_LOOP=1 to allow this.' 
+ ); + } } } diff --git a/server/openai-session.js b/server/openai-session.js new file mode 100644 index 0000000..59368b6 --- /dev/null +++ b/server/openai-session.js @@ -0,0 +1,94 @@ +'use strict'; + +const store = require('./store'); + +function getCodexRawSessionId() { + return 'codex-raw'; +} + +function firstHeader(headers, name) { + const value = headers?.[name.toLowerCase()]; + return Array.isArray(value) ? value[0] : value; +} + +function parseCodexTurnMetadata(headers) { + const raw = firstHeader(headers, 'x-codex-turn-metadata'); + if (!raw) return null; + try { + const parsed = JSON.parse(String(raw)); + return parsed && typeof parsed === 'object' ? parsed : null; + } catch { + return null; + } +} + +function getCodexSessionId(headers, parsedBody) { + const direct = firstHeader(headers, 'session_id') || firstHeader(headers, 'x-openai-session-id'); + if (direct) return String(direct); + const turnMetadata = parseCodexTurnMetadata(headers); + if (typeof turnMetadata?.session_id === 'string') return turnMetadata.session_id; + return parsedBody?.metadata?.session_id || null; +} + +function getOpenAIAgentTypeFromHeaders(headers) { + const subagent = firstHeader(headers, 'x-openai-subagent'); + const direct = firstHeader(headers, 'x-openai-agent-type') || firstHeader(headers, 'x-codex-agent-type'); + const turnMetadata = parseCodexTurnMetadata(headers); + const value = direct || turnMetadata?.agent_type || subagent; + if (!value) return null; + const normalized = String(value).toLowerCase(); + if (normalized === 'explorer' || normalized === 'worker' || normalized === 'default') return normalized; + return null; +} + +function isOpenAISubagent(headers, parsedBody) { + const raw = firstHeader(headers, 'x-openai-subagent'); + if (raw != null) { + const text = String(raw).toLowerCase(); + return text !== '0' && text !== 'false' && text !== 'no'; + } + return Boolean(parsedBody?.metadata?.is_subagent || parsedBody?.metadata?.isSubagent); +} + +// Used by both HTTP 
(parsedBody present) and WebSocket upgrade (no body) paths. +// When parsedBody is null but headers carry session_id, we synthesize a minimal +// body so store.detectSession honors header-derived sessions consistently — +// otherwise WS upgrades and body-less HTTP retries would collapse into the +// `codex-raw` bucket. +function detectOpenAISession(headers, parsedBody) { + const sessionId = getCodexSessionId(headers, parsedBody); + if (!sessionId) { + return { sessionId: getCodexRawSessionId(), isNewSession: false, inferred: true }; + } + const bodyForDetection = parsedBody || { metadata: { session_id: sessionId } }; + const detected = store.detectSession(bodyForDetection); + return { + sessionId: detected.sessionId || sessionId || getCodexRawSessionId(), + isNewSession: detected.isNewSession || false, + inferred: detected.inferred || false, + }; +} + +function withCodexMetadata(parsedBody, headers) { + if (!parsedBody || typeof parsedBody !== 'object') return parsedBody; + const sessionId = getCodexSessionId(headers, parsedBody); + const agentType = getOpenAIAgentTypeFromHeaders(headers); + if (!sessionId && !agentType) return parsedBody; + const metadata = parsedBody.metadata && typeof parsedBody.metadata === 'object' + ? 
{ ...parsedBody.metadata } + : {}; + if (sessionId && !metadata.session_id) metadata.session_id = sessionId; + if (agentType && !metadata.agent_type) metadata.agent_type = agentType; + return { ...parsedBody, metadata }; +} + +module.exports = { + getCodexRawSessionId, + firstHeader, + parseCodexTurnMetadata, + getCodexSessionId, + getOpenAIAgentTypeFromHeaders, + isOpenAISubagent, + detectOpenAISession, + withCodexMetadata, +}; diff --git a/server/providers.js b/server/providers.js index 4c812d2..334845f 100644 --- a/server/providers.js +++ b/server/providers.js @@ -28,9 +28,16 @@ const AGENT_PROVIDERS = Object.freeze({ upstream: 'openai', installHint: ' npm install -g @openai/codex', createLaunch({ port, args, env }) { + const proxyBaseUrl = `http://localhost:${port}/v1`; return { bin: 'codex', - args: ['-c', `openai_base_url="http://localhost:${port}/v1"`, ...args], + args: [ + '-c', + `openai_base_url="${proxyBaseUrl}"`, + '-c', + `chatgpt_base_url="${proxyBaseUrl}"`, + ...args, + ], env: { ...env }, }; }, diff --git a/server/restore.js b/server/restore.js index da389cb..4159a60 100644 --- a/server/restore.js +++ b/server/restore.js @@ -217,10 +217,19 @@ async function buildVersionIndex() { const b0 = isOpenAI ? '' : (sys[0]?.text || ''); const b2 = isOpenAI ? (typeof sys === 'string' ? sys : JSON.stringify(sys, null, 2)) : (sys[2]?.text || ''); const m = b0.match(/cc_version=(\S+?)[; ]/); - const { key: agentKey, label: agentLabel } = isOpenAI - ? 
extractPromptAgentType('openai', { instructions: b2 }) - : extractAgentType(sys); const sysHash = filename.replace(/^sys_/, '').replace(/^openai_instructions_/, '').replace(/\.json$/, ''); + let agentInfo = null; + if (isOpenAI) { + const meta = await config.storage.readShared(`openai_prompt_meta_${sysHash}.json`) + .then(JSON.parse) + .catch(() => null); + if (meta?.agentKey) { + agentInfo = { key: meta.agentKey, label: meta.agentLabel || meta.agentKey }; + } + } + const { key: agentKey, label: agentLabel } = agentInfo || (isOpenAI + ? extractPromptAgentType('openai', { instructions: b2 }) + : extractAgentType(sys)); if (sysHash && agentKey) sysHashToAgentKey.set(sysHash, agentKey); if (b2.length >= (isOpenAI ? 1 : 500)) { const coreText = isOpenAI ? b2 : (splitB2IntoBlocks(b2).coreInstructions || ''); diff --git a/server/url-sanitize.js b/server/url-sanitize.js new file mode 100644 index 0000000..3d2312e --- /dev/null +++ b/server/url-sanitize.js @@ -0,0 +1,47 @@ +'use strict'; + +/** + * Strip ccxray's own auth query parameters from URLs before: + * - forwarding to upstream (would leak ccxray's AUTH_TOKEN to OpenAI/Anthropic) + * - writing to entry logs on disk (~/.ccxray/logs/{id}_req.json) + * - broadcasting to dashboard via SSE + * + * Only strips ccxray-recognized auth params (currently `?token=`). Upstream API + * keys travel in Authorization headers, not query params, so this never affects + * upstream authentication. + * + * See server/auth.js — `?token=` is the supported query-param auth. + */ + +const AUTH_QUERY_PARAMS = Object.freeze(['token']); + +function stripAuthParams(url) { + if (typeof url !== 'string' || url.length === 0) return url; + const qIdx = url.indexOf('?'); + if (qIdx === -1) return url; + + const pathname = url.slice(0, qIdx); + const query = url.slice(qIdx + 1); + + // Cheap pre-check: if none of the auth param names appear at all, return as-is. 
+ let mightContain = false; + for (const name of AUTH_QUERY_PARAMS) { + if (query.indexOf(name) !== -1) { mightContain = true; break; } + } + if (!mightContain) return url; + + const params = new URLSearchParams(query); + let modified = false; + for (const name of AUTH_QUERY_PARAMS) { + if (params.has(name)) { + params.delete(name); + modified = true; + } + } + if (!modified) return url; + + const remaining = params.toString(); + return remaining ? pathname + '?' + remaining : pathname; +} + +module.exports = { stripAuthParams, AUTH_QUERY_PARAMS }; diff --git a/server/ws-proxy.js b/server/ws-proxy.js new file mode 100644 index 0000000..e9b32c5 --- /dev/null +++ b/server/ws-proxy.js @@ -0,0 +1,474 @@ +'use strict'; + +const WebSocket = require('ws'); +const config = require('./config'); +const store = require('./store'); +const helpers = require('./helpers'); +const { broadcast, broadcastSessionStatus } = require('./sse-broadcast'); +const { AUTH_TOKEN } = require('./auth'); +const { stripAuthParams } = require('./url-sanitize'); +const { + detectOpenAISession, + getCodexSessionId, + getOpenAIAgentTypeFromHeaders, + parseCodexTurnMetadata, +} = require('./openai-session'); + +const HOP_BY_HOP_HEADERS = new Set([ + 'connection', + 'keep-alive', + 'proxy-authenticate', + 'proxy-authorization', + 'te', + 'trailer', + 'transfer-encoding', + 'upgrade', +]); + +const WS_HANDSHAKE_HEADERS = new Set([ + 'sec-websocket-accept', + 'sec-websocket-extensions', + 'sec-websocket-key', + 'sec-websocket-protocol', + 'sec-websocket-version', +]); + +const wss = new WebSocket.Server({ + noServer: true, + handleProtocols(protocols) { + return protocols.values().next().value || false; + }, +}); + +const DEFAULT_IDLE_TIMEOUT_MS = parseInt(process.env.CCXRAY_WS_IDLE_TIMEOUT_MS || '60000', 10); +const IDLE_TIMEOUT_MS = Number.isFinite(DEFAULT_IDLE_TIMEOUT_MS) && DEFAULT_IDLE_TIMEOUT_MS > 0 + ? 
DEFAULT_IDLE_TIMEOUT_MS + : 60000; +const DEFAULT_MAX_QUEUE_BYTES = parseInt(process.env.CCXRAY_WS_MAX_QUEUE_BYTES || String(4 * 1024 * 1024), 10); +const MAX_QUEUE_BYTES = Number.isFinite(DEFAULT_MAX_QUEUE_BYTES) && DEFAULT_MAX_QUEUE_BYTES > 0 + ? DEFAULT_MAX_QUEUE_BYTES + : 4 * 1024 * 1024; +const OPENAI_WS_PATHS = new Set(['/v1/responses', '/v1/realtime']); +const WS_CLOSE_REASON_MAX_BYTES = 120; // WS spec caps reason at 123 bytes; leave headroom. + +function isUpgradeRequest(req) { + return String(req.headers.upgrade || '').toLowerCase() === 'websocket'; +} + +function isOpenAIWebSocket(req, upstream) { + const pathname = (req.url || '').split('?')[0]; + return upstream?.provider === 'openai' && OPENAI_WS_PATHS.has(pathname) && isUpgradeRequest(req); +} + +function writeSocketResponse(socket, statusCode, reason) { + if (socket.destroyed) return; + socket.write( + `HTTP/1.1 ${statusCode} ${reason}\r\n` + + 'Connection: close\r\n' + + 'Content-Length: 0\r\n' + + '\r\n' + ); + socket.destroy(); +} + +function isAuthorized(req) { + if (!AUTH_TOKEN) return true; + const authHeader = req.headers.authorization || ''; + if (authHeader === `Bearer ${AUTH_TOKEN}`) return true; + try { + const url = new URL(req.url, `http://${req.headers.host || 'localhost'}`); + return url.searchParams.get('token') === AUTH_TOKEN; + } catch { + return false; + } +} + +function buildWebSocketHeaders(clientHeaders, upstream) { + const headers = {}; + const connectionTokens = String(clientHeaders.connection || '') + .split(',') + .map(token => token.trim().toLowerCase()) + .filter(Boolean); + + for (const [name, value] of Object.entries(clientHeaders)) { + const lower = name.toLowerCase(); + if (HOP_BY_HOP_HEADERS.has(lower)) continue; + if (WS_HANDSHAKE_HEADERS.has(lower)) continue; + if (connectionTokens.includes(lower)) continue; + if (lower === 'host') continue; + headers[name] = value; + } + headers.host = upstream.host; + return headers; +} + +function 
getWebSocketProtocols(clientHeaders) { + const raw = clientHeaders['sec-websocket-protocol']; + if (!raw) return undefined; + return String(raw).split(',').map(v => v.trim()).filter(Boolean); +} + +function getWebSocketUrl(upstream, requestUrl) { + const protocol = upstream.protocol === 'https' ? 'wss' : 'ws'; + const path = config.joinUpstreamPath(upstream, requestUrl); + return `${protocol}://${upstream.host}:${upstream.port}${path}`; +} + +function getWorkspaceCwd(turnMetadata) { + const workspaces = turnMetadata?.workspaces; + if (!workspaces || typeof workspaces !== 'object') return null; + if (typeof workspaces.cwd === 'string') return workspaces.cwd; + if (typeof workspaces.current === 'string') return workspaces.current; + const first = Object.values(workspaces).find(v => typeof v === 'string'); + if (first) return first; + const nested = Object.values(workspaces).find(v => v && typeof v === 'object' && typeof v.cwd === 'string'); + return nested?.cwd || null; +} + +function safeSend(target, data, isBinary) { + if (target.readyState === WebSocket.OPEN) { + target.send(data, { binary: isBinary }, () => {}); + } +} + +// Send immediately when the target is OPEN; buffer the frame while it is CONNECTING. +// Returns `{ overflow: true, size }` when the queue cap would be exceeded so the +// caller can shut the pair down instead of growing memory unboundedly. 
+function bufferOrSend(target, state, data, isBinary) { + if (target.readyState === WebSocket.OPEN) { + target.send(data, { binary: isBinary }, () => {}); + return { overflow: false }; + } + if (target.readyState !== WebSocket.CONNECTING) { + return { overflow: false }; + } + const size = frameSize(data); + if (state.bufferedBytes + size > state.maxBytes) { + return { overflow: true, size }; + } + state.queue.push({ data, isBinary }); + state.bufferedBytes += size; + return { overflow: false }; +} + +function flushQueue(target, state) { + while (state.queue.length && target.readyState === WebSocket.OPEN) { + const item = state.queue.shift(); + state.bufferedBytes = Math.max(0, state.bufferedBytes - frameSize(item.data)); + target.send(item.data, { binary: item.isBinary }, () => {}); + } +} + +function frameSize(data) { + if (Buffer.isBuffer(data)) return data.length; + if (typeof data === 'string') return Buffer.byteLength(data); + if (data instanceof ArrayBuffer) return data.byteLength; + if (ArrayBuffer.isView(data)) return data.byteLength; + return 0; +} + +// WS close reason field is capped at 123 bytes by RFC 6455; the ws library +// throws RangeError when it overflows. Clamp by codepoint to stay UTF-8 safe. +function clampWsReason(reason) { + const str = typeof reason === 'string' ? 
reason : String(reason || ''); + if (Buffer.byteLength(str) <= WS_CLOSE_REASON_MAX_BYTES) return str; + let bytes = 0; + let out = ''; + for (const ch of str) { + const len = Buffer.byteLength(ch); + if (bytes + len > WS_CLOSE_REASON_MAX_BYTES) break; + bytes += len; + out += ch; + } + return out; +} + +function normalizeCloseCode(code) { + if (!Number.isInteger(code)) return 1000; + if (code >= 3000 && code <= 4999) return code; + if (code >= 1000 && code <= 1014 && ![1004, 1005, 1006].includes(code)) return code; + return 1000; +} + +async function recordWebSocketEntry(ctx, result) { + const elapsed = ((Date.now() - ctx.startTime) / 1000).toFixed(1); + const reqLog = { + transport: 'websocket', + capture: 'transport-only', + method: ctx.req.method, + url: stripAuthParams(ctx.req.url), + endpoint: ctx.endpoint, + headers: { + openaiBeta: ctx.req.headers['openai-beta'] || null, + sessionId: ctx.sessionId, + agentType: ctx.agentType, + }, + metadata: ctx.turnMetadata || null, + }; + const resLog = { + transport: 'websocket', + capture: 'transport-only', + frameCounts: ctx.frameCounts, + byteCounts: ctx.byteCounts, + close: result.close || null, + error: result.error || null, + }; + + const reqWritePromise = config.storage.write(ctx.id, '_req.json', JSON.stringify(reqLog)) + .catch(e => console.error('Write ws req.json failed:', e.message)); + const resWritePromise = config.storage.write(ctx.id, '_res.json', JSON.stringify(resLog)) + .catch(e => console.error('Write ws res.json failed:', e.message)); + + const responseMetadata = { + transport: 'websocket', + capture: 'transport-only', + endpoint: ctx.endpoint, + frameCounts: ctx.frameCounts, + byteCounts: ctx.byteCounts, + close: result.close || null, + error: result.error || null, + }; + const entry = { + id: ctx.id, + ts: ctx.ts, + sessionId: ctx.sessionId, + method: ctx.req.method, + url: stripAuthParams(ctx.req.url), + provider: 'openai', + agent: 'codex', + req: reqLog, + res: resLog, + elapsed, + status: 
result.status, + isSSE: false, + tokens: null, + usage: null, + cost: null, + responseMetadata, + maxContext: null, + cwd: store.sessionMeta[ctx.sessionId]?.cwd || null, + receivedAt: ctx.startTime, + duplicateToolCalls: null, + model: null, + msgCount: 0, + toolCount: 0, + toolCalls: {}, + isSubagent: ctx.agentType === 'explorer' || ctx.agentType === 'worker', + sessionInferred: ctx.sessionInferred, + title: 'Codex WebSocket session', + stopReason: result.close?.reason || result.error?.message || null, + toolFail: false, + sysHash: null, + toolsHash: null, + coreHash: null, + thinkingStripped: undefined, + }; + entry.hasCredential = helpers.entryHasCredential(entry) || undefined; + entry.toolSources = helpers.buildToolSources(entry) || undefined; + entry._writePromise = Promise.all([reqWritePromise, resWritePromise]); + store.entries.push(entry); + store.trimEntries(); + broadcast(entry); + + const indexLine = JSON.stringify({ + id: entry.id, + ts: entry.ts, + sessionId: entry.sessionId, + provider: entry.provider, + agent: entry.agent, + model: entry.model, + msgCount: entry.msgCount, + toolCount: entry.toolCount, + toolCalls: entry.toolCalls, + isSubagent: entry.isSubagent, + sessionInferred: entry.sessionInferred, + cwd: entry.cwd, + isSSE: entry.isSSE, + usage: entry.usage, + cost: entry.cost, + maxContext: entry.maxContext, + responseMetadata, + stopReason: entry.stopReason, + title: entry.title, + thinkingDuration: null, + toolFail: entry.toolFail, + elapsed, + status: entry.status, + receivedAt: entry.receivedAt, + sysHash: null, + toolsHash: null, + coreHash: null, + thinkingStripped: entry.thinkingStripped, + hasCredential: entry.hasCredential, + toolSources: entry.toolSources, + }); + config.storage.appendIndex(indexLine + '\n').catch(e => console.error('Write ws index failed:', e.message)); + entry.req = null; + entry.res = null; + entry._loaded = false; +} + +function handleWebSocketUpgrade(req, socket, head) { + const upstream = 
config.getUpstreamForRequestAndHeaders(req.url, req.headers); + if (!isOpenAIWebSocket(req, upstream)) { + writeSocketResponse(socket, 404, 'Not Found'); + return true; + } + if (!isAuthorized(req)) { + writeSocketResponse(socket, 401, 'Unauthorized'); + return true; + } + + const id = helpers.timestamp(); + const ts = helpers.taipeiTime(); + const startTime = Date.now(); + const detected = detectOpenAISession(req.headers, null); + const sessionId = detected.sessionId; + const turnMetadata = parseCodexTurnMetadata(req.headers); + const agentType = getOpenAIAgentTypeFromHeaders(req.headers); + const cwd = getWorkspaceCwd(turnMetadata); + const endpoint = (req.url || '').split('?')[0]; + + if (!store.sessionMeta[sessionId]) store.sessionMeta[sessionId] = {}; + store.sessionMeta[sessionId].provider = 'openai'; + store.sessionMeta[sessionId].lastSeenAt = Date.now(); + if (cwd) store.sessionMeta[sessionId].cwd = cwd; + if (agentType) store.sessionMeta[sessionId].agentType = agentType; + store.activeRequests[sessionId] = (store.activeRequests[sessionId] || 0) + 1; + broadcastSessionStatus(sessionId); + if (detected.isNewSession) store.printSessionBanner(sessionId); + + wss.handleUpgrade(req, socket, head, clientWs => { + const upstreamUrl = getWebSocketUrl(upstream, stripAuthParams(req.url)); + const upstreamWs = new WebSocket(upstreamUrl, getWebSocketProtocols(req.headers), { + headers: buildWebSocketHeaders(req.headers, upstream), + }); + const ctx = { + id, + ts, + startTime, + req, + sessionId, + agentType, + turnMetadata, + endpoint, + sessionInferred: detected.inferred || !getCodexSessionId(req.headers, null), + frameCounts: { clientToUpstream: 0, upstreamToClient: 0 }, + byteCounts: { clientToUpstream: 0, upstreamToClient: 0 }, + }; + // Only client→upstream needs queueing; clientWs is already OPEN inside this + // callback so upstream→client can always send directly via safeSend(). 
+ const upstreamBuffer = { queue: [], bufferedBytes: 0, maxBytes: MAX_QUEUE_BYTES }; + let finalized = false; + let idleTimer = null; + + function refreshIdleTimer() { + clearTimeout(idleTimer); + idleTimer = setTimeout(() => { + closeBoth(1011, 'idle timeout'); + finalize({ status: 504, error: { message: `WebSocket idle timeout after ${IDLE_TIMEOUT_MS}ms` } }); + }, IDLE_TIMEOUT_MS); + if (typeof idleTimer.unref === 'function') idleTimer.unref(); + } + + function finalize(result) { + if (finalized) return; + finalized = true; + clearTimeout(idleTimer); + store.activeRequests[sessionId] = Math.max(0, (store.activeRequests[sessionId] || 1) - 1); + if (store.sessionMeta[sessionId]) store.sessionMeta[sessionId].lastStopReason = null; + broadcastSessionStatus(sessionId); + recordWebSocketEntry(ctx, result).catch(e => console.error('Record ws entry failed:', e.message)); + } + + function closeBoth(code, reason) { + const closeCode = normalizeCloseCode(code); + const closeReason = clampWsReason(reason); + if (clientWs.readyState === WebSocket.OPEN || clientWs.readyState === WebSocket.CONNECTING) { + clientWs.close(closeCode, closeReason); + } + if (upstreamWs.readyState === WebSocket.OPEN || upstreamWs.readyState === WebSocket.CONNECTING) { + upstreamWs.close(closeCode, closeReason); + } + } + + // Arm the idle timer before the upstream handshake completes so a stalled + // upstream (accepts TCP, never sends 101) is bounded by IDLE_TIMEOUT_MS. 
+ refreshIdleTimer(); + + clientWs.on('message', (data, isBinary) => { + refreshIdleTimer(); + ctx.frameCounts.clientToUpstream += 1; + ctx.byteCounts.clientToUpstream += frameSize(data); + const result = bufferOrSend(upstreamWs, upstreamBuffer, data, isBinary); + if (result.overflow) { + closeBoth(1009, 'client buffer exceeded'); + finalize({ status: 507, error: { message: `Client send buffer exceeded ${MAX_QUEUE_BYTES} bytes while upstream was connecting` } }); + } + }); + upstreamWs.on('message', (data, isBinary) => { + refreshIdleTimer(); + ctx.frameCounts.upstreamToClient += 1; + ctx.byteCounts.upstreamToClient += frameSize(data); + safeSend(clientWs, data, isBinary); + }); + clientWs.on('ping', data => { + refreshIdleTimer(); + if (upstreamWs.readyState === WebSocket.OPEN) upstreamWs.ping(data, undefined, () => {}); + }); + upstreamWs.on('ping', data => { + refreshIdleTimer(); + if (clientWs.readyState === WebSocket.OPEN) clientWs.ping(data, undefined, () => {}); + }); + clientWs.on('pong', data => { + refreshIdleTimer(); + if (upstreamWs.readyState === WebSocket.OPEN) upstreamWs.pong(data, undefined, () => {}); + }); + upstreamWs.on('pong', data => { + refreshIdleTimer(); + if (clientWs.readyState === WebSocket.OPEN) clientWs.pong(data, undefined, () => {}); + }); + upstreamWs.on('open', () => { + refreshIdleTimer(); + flushQueue(upstreamWs, upstreamBuffer); + }); + upstreamWs.on('unexpected-response', (request, response) => { + // ws gives us ownership when a listener is present: drain and destroy + // both ends so the underlying HTTP socket doesn't leak. 
+ const statusCode = response.statusCode || 502; + try { response.resume(); } catch {} + try { request.destroy(); } catch {} + closeBoth(1011, `upstream ${statusCode}`); + finalize({ status: statusCode, error: { message: `Upstream WebSocket rejected handshake: ${statusCode}` } }); + }); + clientWs.on('close', (code, reason) => { + const reasonStr = reason.toString(); + if (upstreamWs.readyState === WebSocket.OPEN || upstreamWs.readyState === WebSocket.CONNECTING) { + upstreamWs.close(normalizeCloseCode(code), clampWsReason(reasonStr)); + } + finalize({ status: 101, close: { side: 'client', code, reason: reasonStr } }); + }); + upstreamWs.on('close', (code, reason) => { + const reasonStr = reason.toString(); + if (clientWs.readyState === WebSocket.OPEN || clientWs.readyState === WebSocket.CONNECTING) { + clientWs.close(normalizeCloseCode(code), clampWsReason(reasonStr)); + } + finalize({ status: 101, close: { side: 'upstream', code, reason: reasonStr } }); + }); + clientWs.on('error', err => { + closeBoth(1011, 'client error'); + finalize({ status: 502, error: { side: 'client', message: err.message } }); + }); + upstreamWs.on('error', err => { + closeBoth(1011, 'upstream error'); + finalize({ status: 502, error: { side: 'upstream', message: err.message } }); + }); + }); + return true; +} + +module.exports = { + handleWebSocketUpgrade, + buildWebSocketHeaders, + isOpenAIWebSocket, + normalizeCloseCode, +}; diff --git a/test/auth-token-strip.e2e.test.js b/test/auth-token-strip.e2e.test.js new file mode 100644 index 0000000..0d1f250 --- /dev/null +++ b/test/auth-token-strip.e2e.test.js @@ -0,0 +1,201 @@ +'use strict'; + +const { describe, it, after } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { spawn } = require('child_process'); + +const SERVER_SCRIPT = path.join(__dirname, '..', 'server', 'index.js'); +const tmpDirs = 
[]; + +async function findFreePort() { + return new Promise(resolve => { + const server = http.createServer(); + server.listen(0, () => { + const port = server.address().port; + server.close(() => resolve(port)); + }); + }); +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function killAndWait(child) { + return new Promise(resolve => { + if (!child || child.exitCode !== null) return resolve(); + child.on('exit', resolve); + child.kill('SIGTERM'); + setTimeout(() => { + try { child.kill('SIGKILL'); } catch {} + resolve(); + }, 3000); + }); +} + +function makeTmpHome() { + const home = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-auth-strip-')); + tmpDirs.push(home); + return home; +} + +function postJson(port, urlPath, body, headers = {}) { + return new Promise((resolve, reject) => { + const data = JSON.stringify(body); + const req = http.request({ + hostname: 'localhost', port, path: urlPath, method: 'POST', + headers: { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(data), + 'x-api-key': 'sk-fake', + 'anthropic-version': '2023-06-01', + ...headers, + }, + }, res => { + const chunks = []; + res.on('data', c => chunks.push(c)); + res.on('end', () => resolve({ statusCode: res.statusCode, body: Buffer.concat(chunks).toString() })); + }); + req.on('error', reject); + req.write(data); req.end(); + }); +} + +function subscribeSSE(port, token, durationMs) { + return new 
Promise((resolve, reject) => { + const req = http.get({ + hostname: 'localhost', port, path: '/_events', + headers: { Authorization: `Bearer ${token}` }, + }, res => { + if (res.statusCode !== 200) { res.resume(); return reject(new Error(`SSE status ${res.statusCode}`)); } + const events = []; + let buf = ''; + res.on('data', chunk => { + buf += chunk.toString(); + let idx; + while ((idx = buf.indexOf('\n\n')) !== -1) { + events.push(buf.slice(0, idx)); + buf = buf.slice(idx + 2); + } + }); + setTimeout(() => { req.destroy(); resolve(events); }, durationMs); + }); + req.on('error', reject); + }); +} + +function recursiveFiles(dir) { + const out = []; + for (const entry of fs.readdirSync(dir, { withFileTypes: true })) { + const full = path.join(dir, entry.name); + if (entry.isDirectory()) out.push(...recursiveFiles(full)); + else if (entry.isFile()) out.push(full); + } + return out; +} + +describe('AUTH_TOKEN ?token= query param strip (a5d28f0)', () => { + after(() => { + for (const d of tmpDirs) fs.rmSync(d, { recursive: true, force: true }); + }); + + it('strips ?token= from upstream URL, SSE broadcasts, disk logs, and console', async () => { + const SECRET = 'verify-secret-d4f7'; + const upstreamPort = await findFreePort(); + const proxyPort = await findFreePort(); + const home = makeTmpHome(); + + const upstreamRequests = []; + const upstream = http.createServer((req, res) => { + upstreamRequests.push({ url: req.url }); + res.writeHead(200, { 'content-type': 'application/json' }); + res.end(JSON.stringify({ + id: 'msg_fake', type: 'message', role: 'assistant', + model: 'claude-3-haiku-20240307', stop_reason: 'end_turn', stop_sequence: null, + content: [{ type: 'text', text: 'ok' }], + usage: { input_tokens: 1, output_tokens: 1 }, + })); + }); + await new Promise(resolve => upstream.listen(upstreamPort, '127.0.0.1', resolve)); + + let stdout = ''; + let stderr = ''; + const child = spawn(process.execPath, [SERVER_SCRIPT, '--port', String(proxyPort), 
'--no-browser'], { + env: { + ...process.env, + ANTHROPIC_TEST_HOST: '127.0.0.1', + ANTHROPIC_TEST_PORT: String(upstreamPort), + ANTHROPIC_TEST_PROTOCOL: 'http', + AUTH_TOKEN: SECRET, + CCXRAY_HOME: home, + BROWSER: 'none', + RESTORE_DAYS: '0', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + child.stdout.on('data', d => { stdout += d.toString(); }); + child.stderr.on('data', d => { stderr += d.toString(); }); + + try { + await waitForPort(proxyPort); + + const ssePromise = subscribeSSE(proxyPort, SECRET, 2500); + await new Promise(r => setTimeout(r, 150)); + + const resp = await postJson(proxyPort, `/v1/messages?token=${SECRET}&trace=keepme`, { + model: 'claude-3-haiku-20240307', + max_tokens: 8, + messages: [{ role: 'user', content: 'hello' }], + }); + assert.equal(resp.statusCode, 200, 'proxy should forward and respond 200'); + + const sseEvents = await ssePromise; + await new Promise(r => setTimeout(r, 200)); + + assert.equal(upstreamRequests.length, 1, 'upstream should receive exactly one request'); + assert.equal(upstreamRequests[0].url, '/v1/messages?trace=keepme', + 'upstream URL must have ?token= stripped and other params preserved'); + + for (const e of sseEvents) { + assert.ok(!e.includes(SECRET), `SSE event must not contain secret: ${e.slice(0, 200)}`); + } + const sawEntryUrl = sseEvents.some(e => e.includes('"url"') && e.includes('/v1/messages')); + assert.ok(sawEntryUrl, 'expected at least one SSE event with the entry url'); + + const allFiles = recursiveFiles(home); + assert.ok(allFiles.length >= 3, `expected logs on disk, found ${allFiles.length}`); + for (const f of allFiles) { + const txt = fs.readFileSync(f, 'utf8'); + assert.ok(!txt.includes(SECRET), `secret leaked to disk file: ${f}`); + } + + assert.ok(!stdout.includes(SECRET), 'secret leaked to ccxray stdout'); + assert.ok(!stderr.includes(SECRET), 'secret leaked to ccxray stderr'); + } finally { + upstream.close(); + await killAndWait(child); + } + }); +}); diff --git a/test/config.test.js 
b/test/config.test.js index 94b2c63..fee5daa 100644 --- a/test/config.test.js +++ b/test/config.test.js @@ -221,7 +221,12 @@ describe('provider-aware OpenAI upstream configuration', () => { providers: { messages: c.getProviderForRequest('/v1/messages'), responses: c.getProviderForRequest('/v1/responses'), + realtime: c.getProviderForRequest('/v1/realtime?model=gpt-realtime'), models: c.getProviderForRequest('/v1/models?client_version=0.125.0'), + chatgptCodexApps: c.getProviderForRequest('/v1/api/codex/apps'), + chatgptAnalytics: c.getProviderForRequest('/v1/codex/analytics-events/events'), + chatgptPlugins: c.getProviderForRequest('/v1/plugins/featured?platform=codex'), + chatgptConnectors: c.getProviderForRequest('/v1/connectors/directory/list?external_logos=true'), }, chatgpt: { source: c.UPSTREAMS.openaiChatGPT.source, @@ -235,6 +240,7 @@ describe('provider-aware OpenAI upstream configuration', () => { responses: c.joinUpstreamPath(c.getUpstream('openai'), '/v1/responses'), completions: c.joinUpstreamPath(c.getUpstream('openai'), '/chat/completions'), chatgptResponses: c.joinUpstreamPath(c.getUpstreamForRequestAndHeaders('/v1/responses', {'chatgpt-account-id': 'acct'}), '/v1/responses'), + chatgptApps: c.joinUpstreamPath(c.getUpstreamForRequestAndHeaders('/v1/api/codex/apps'), '/v1/api/codex/apps'), }, })); `; @@ -258,7 +264,12 @@ describe('provider-aware OpenAI upstream configuration', () => { }); assert.equal(result.providers.messages, 'anthropic'); assert.equal(result.providers.responses, 'openai'); + assert.equal(result.providers.realtime, 'openai'); assert.equal(result.providers.models, 'openai'); + assert.equal(result.providers.chatgptCodexApps, 'openai'); + assert.equal(result.providers.chatgptAnalytics, 'openai'); + assert.equal(result.providers.chatgptPlugins, 'openai'); + assert.equal(result.providers.chatgptConnectors, 'openai'); assert.deepEqual(result.chatgpt, { host: 'chatgpt.com', port: 443, @@ -268,6 +279,7 @@ describe('provider-aware OpenAI 
upstream configuration', () => { stripPathPrefix: '/v1', }); assert.equal(result.paths.chatgptResponses, '/backend-api/codex/responses'); + assert.equal(result.paths.chatgptApps, '/backend-api/codex/api/codex/apps'); }); it('OPENAI_BASE_URL overrides only the OpenAI upstream and preserves /v1 request paths', async () => { @@ -298,6 +310,7 @@ describe('provider-aware OpenAI upstream configuration', () => { assert.equal(result.chatgpt.source, 'CHATGPT_BASE_URL'); assert.equal(result.chatgpt.basePath, '/backend-api/codex'); assert.equal(result.paths.chatgptResponses, '/backend-api/codex/responses'); + assert.equal(result.paths.chatgptApps, '/backend-api/codex/api/codex/apps'); }); it('OPENAI_TEST_* overrides OPENAI_BASE_URL', async () => { diff --git a/test/providers.test.js b/test/providers.test.js index c0a6af5..ca44a29 100644 --- a/test/providers.test.js +++ b/test/providers.test.js @@ -40,6 +40,8 @@ describe('agent provider registry', () => { assert.deepEqual(launch.args, [ '-c', 'openai_base_url="http://localhost:5577/v1"', + '-c', + 'chatgpt_base_url="http://localhost:5577/v1"', 'exec', 'hello', ]); @@ -57,6 +59,8 @@ describe('agent provider registry', () => { assert.deepEqual(launch.args, [ '-c', 'openai_base_url="http://localhost:5577/v1"', + '-c', + 'chatgpt_base_url="http://localhost:5577/v1"', 'app', '/repo', ]); diff --git a/test/socket-error-survival.e2e.test.js b/test/socket-error-survival.e2e.test.js new file mode 100644 index 0000000..c70df97 --- /dev/null +++ b/test/socket-error-survival.e2e.test.js @@ -0,0 +1,261 @@ +'use strict'; + +const { describe, it, after } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { spawn } = require('child_process'); + +const SERVER_SCRIPT = path.join(__dirname, '..', 'server', 'index.js'); +const tmpDirs = []; + +async function findFreePort() { + return new Promise(resolve => 
{ + const server = http.createServer(); + server.listen(0, () => { + const port = server.address().port; + server.close(() => resolve(port)); + }); + }); +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function killAndWait(child) { + return new Promise(resolve => { + if (!child || child.exitCode !== null) return resolve(); + child.on('exit', resolve); + child.kill('SIGTERM'); + setTimeout(() => { + try { child.kill('SIGKILL'); } catch {} + resolve(); + }, 3000); + }); +} + +// Slow SSE responder that can optionally self-destruct after N chunks +// (simulates upstream EPIPE). Controlled via ?destroy=N on the request URL. 
+function makeSlowUpstream() { + return http.createServer((req, res) => { + const url = new URL(req.url, 'http://x'); + const destroyAfter = parseInt(url.searchParams.get('destroy') || '0', 10); + res.writeHead(200, { + 'content-type': 'text/event-stream', + 'cache-control': 'no-cache', + 'connection': 'keep-alive', + }); + res.write('event: message_start\n'); + res.write('data: ' + JSON.stringify({ + type: 'message_start', + message: { + id: 'msg_fake', type: 'message', role: 'assistant', model: 'claude-3-haiku-20240307', + content: [], stop_reason: null, stop_sequence: null, + usage: { input_tokens: 1, output_tokens: 0 }, + }, + }) + '\n\n'); + res.write('event: content_block_start\n'); + res.write('data: ' + JSON.stringify({ + type: 'content_block_start', index: 0, + content_block: { type: 'text', text: '' }, + }) + '\n\n'); + + let i = 0; + const interval = setInterval(() => { + if (res.destroyed || res.writableEnded) { clearInterval(interval); return; } + try { + res.write('event: content_block_delta\n'); + res.write('data: ' + JSON.stringify({ + type: 'content_block_delta', index: 0, + delta: { type: 'text_delta', text: `chunk${i}` }, + }) + '\n\n'); + } catch (_) { clearInterval(interval); return; } + i++; + if (destroyAfter > 0 && i >= destroyAfter) { + clearInterval(interval); + try { res.socket.destroy(); } catch (_) {} + return; + } + if (i > 50) { + clearInterval(interval); + try { + res.write('event: message_stop\ndata: {"type":"message_stop"}\n\n'); + res.end(); + } catch (_) {} + } + }, 25); + }); +} + +function probe(port) { + return new Promise(resolve => { + const body = JSON.stringify({ + model: 'claude-3-haiku-20240307', + max_tokens: 8, + messages: [{ role: 'user', content: 'probe' }], + }); + const req = http.request({ + hostname: 'localhost', port, path: '/v1/messages', method: 'POST', + headers: { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(body), + 'x-api-key': 'sk-fake', + 'anthropic-version': '2023-06-01', 
+ }, + }, res => { + let len = 0; + res.on('data', c => { len += c.length; }); + res.on('end', () => resolve({ statusCode: res.statusCode, len })); + }); + req.on('error', e => resolve({ error: e.message })); + req.write(body); req.end(); + setTimeout(() => resolve({ error: 'timeout' }), 8000); + }); +} + +function abortMidStream(port) { + return new Promise(resolve => { + const body = JSON.stringify({ + model: 'claude-3-haiku-20240307', + max_tokens: 256, + stream: true, + messages: [{ role: 'user', content: 'abort me' }], + }); + const req = http.request({ + hostname: 'localhost', port, path: '/v1/messages', method: 'POST', + headers: { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(body), + 'x-api-key': 'sk-fake', + 'anthropic-version': '2023-06-01', + 'accept': 'text/event-stream', + }, + }, res => { + let count = 0; + res.on('data', chunk => { + count += (chunk.toString().match(/\n\n/g) || []).length; + if (count >= 2) { + req.destroy(); + resolve('aborted'); + } + }); + res.on('error', () => resolve('res-error')); + res.on('end', () => resolve('ended')); + }); + req.on('error', () => resolve('req-error')); + req.write(body); req.end(); + setTimeout(() => { try { req.destroy(); } catch (_) {} resolve('timeout'); }, 5000); + }); +} + +function upstreamDestroyRequest(port) { + return new Promise(resolve => { + const body = JSON.stringify({ + model: 'claude-3-haiku-20240307', + max_tokens: 256, + stream: true, + messages: [{ role: 'user', content: 'destroy me' }], + }); + const req = http.request({ + hostname: 'localhost', port, path: '/v1/messages?destroy=3', method: 'POST', + headers: { + 'content-type': 'application/json', + 'content-length': Buffer.byteLength(body), + 'x-api-key': 'sk-fake', + 'anthropic-version': '2023-06-01', + 'accept': 'text/event-stream', + }, + }, res => { + let bytes = 0; + res.on('data', c => { bytes += c.length; }); + res.on('end', () => resolve({ outcome: 'ended', bytes })); + res.on('error', () => 
resolve({ outcome: 'res-error', bytes })); + }); + req.on('error', e => resolve({ outcome: 'req-error', error: e.code || e.message })); + req.write(body); req.end(); + setTimeout(() => { try { req.destroy(); } catch (_) {} resolve({ outcome: 'timeout' }); }, 5000); + }); +} + +describe('proxy survives client and upstream socket errors (efd4a70)', () => { + after(() => { + for (const d of tmpDirs) fs.rmSync(d, { recursive: true, force: true }); + }); + + it('survives both client mid-stream abort and upstream socket destroy', async () => { + const upstreamPort = await findFreePort(); + const proxyPort = await findFreePort(); + const home = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-socket-survival-')); + tmpDirs.push(home); + + const upstream = makeSlowUpstream(); + await new Promise(resolve => upstream.listen(upstreamPort, '127.0.0.1', resolve)); + + let stderr = ''; + const child = spawn(process.execPath, [SERVER_SCRIPT, '--port', String(proxyPort), '--no-browser'], { + env: { + ...process.env, + ANTHROPIC_TEST_HOST: '127.0.0.1', + ANTHROPIC_TEST_PORT: String(upstreamPort), + ANTHROPIC_TEST_PROTOCOL: 'http', + CCXRAY_HOME: home, + BROWSER: 'none', + RESTORE_DAYS: '0', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + child.stderr.on('data', d => { stderr += d.toString(); }); + + try { + await waitForPort(proxyPort); + + // Case 1: client aborts mid-stream + const abortOutcome = await abortMidStream(proxyPort); + assert.equal(abortOutcome, 'aborted', `expected aborted outcome, got ${abortOutcome}`); + await new Promise(r => setTimeout(r, 500)); + assert.equal(child.exitCode, null, 'proxy must be alive after client abort'); + const probe1 = await probe(proxyPort); + assert.equal(probe1.statusCode, 200, `probe-1 should be HTTP 200, got ${JSON.stringify(probe1)}`); + + // Case 2: upstream destroys TCP socket mid-response + const upstreamOutcome = await upstreamDestroyRequest(proxyPort); + assert.ok( + upstreamOutcome.outcome === 'ended' || upstreamOutcome.outcome 
=== 'res-error', + `expected ended/res-error after upstream destroy, got ${JSON.stringify(upstreamOutcome)}` + ); + await new Promise(r => setTimeout(r, 500)); + assert.equal(child.exitCode, null, 'proxy must be alive after upstream destroy'); + const probe2 = await probe(proxyPort); + assert.equal(probe2.statusCode, 200, `probe-2 should be HTTP 200, got ${JSON.stringify(probe2)}`); + + assert.ok( + !/uncaughtException|^TypeError|^Error:.*\n\s+at /m.test(stderr), + `stderr should not contain uncaught exception traces. Tail: ${stderr.slice(-500)}` + ); + } finally { + upstream.close(); + await killAndWait(child); + } + }); +}); diff --git a/test/startup.test.js b/test/startup.test.js index 8d7c983..2aaa987 100644 --- a/test/startup.test.js +++ b/test/startup.test.js @@ -407,12 +407,15 @@ describe('codex desktop app launcher mode', () => { "const fs = require('fs');", "const http = require('http');", "const argv = process.argv.slice(2);", - "const configIdx = argv.indexOf('-c');", - "const configArg = configIdx === -1 ? null : argv[configIdx + 1];", - "const match = configArg && configArg.match(/openai_base_url=\"([^\"]+)\"/);", - "const openaiBaseUrl = match ? match[1] : null;", + "const configArgs = argv.flatMap((arg, idx) => arg === '-c' ? [argv[idx + 1]] : []).filter(Boolean);", + "const configArg = configArgs.find(arg => arg.includes('openai_base_url')) || null;", + "const chatgptConfigArg = configArgs.find(arg => arg.includes('chatgpt_base_url')) || null;", + "const openaiMatch = configArg && configArg.match(/openai_base_url=\"([^\"]+)\"/);", + "const chatgptMatch = chatgptConfigArg && chatgptConfigArg.match(/chatgpt_base_url=\"([^\"]+)\"/);", + "const openaiBaseUrl = openaiMatch ? openaiMatch[1] : null;", + "const chatgptBaseUrl = chatgptMatch ? 
chatgptMatch[1] : null;", "function writeCapture(extra = {}) {", - " fs.writeFileSync(process.env.CCXRAY_TEST_CODEX_CAPTURE, JSON.stringify({ argv, configArg, openaiBaseUrl, cwd: process.cwd(), ...extra }));", + " fs.writeFileSync(process.env.CCXRAY_TEST_CODEX_CAPTURE, JSON.stringify({ argv, configArgs, configArg, chatgptConfigArg, openaiBaseUrl, chatgptBaseUrl, cwd: process.cwd(), ...extra }));", "}", "function probeHealth(baseUrl) {", " return new Promise(resolve => {", @@ -463,10 +466,13 @@ describe('codex desktop app launcher mode', () => { assert.deepEqual(capture.argv, [ '-c', `openai_base_url="http://localhost:${port}/v1"`, + '-c', + `chatgpt_base_url="http://localhost:${port}/v1"`, 'app', workspacePath, ]); assert.equal(capture.openaiBaseUrl, `http://localhost:${port}/v1`); + assert.equal(capture.chatgptBaseUrl, `http://localhost:${port}/v1`); assert.equal(capture.healthOk, true); } finally { fs.rmSync(fakeBin, { recursive: true, force: true }); @@ -918,6 +924,37 @@ describe('OpenAI Responses raw capture', () => { assert.ok(explorer.coreHash); assert.ok(explorer.b2Len > 0); }); + + it('restores Codex prompt agent type from metadata sidecar instead of instruction text', async () => { + const sessionId = 'codex-system-prompt-restore-001'; + const requestBody = JSON.stringify({ + model: 'gpt-5.5', + instructions: 'Inspect the codebase and report findings.', + input: 'inspect prompt restore', + }); + + await sendOpenAIResponsesRequest(proxyPort, requestBody, '/v1/responses?trace=sysprompt-restore', { + session_id: sessionId, + 'x-openai-subagent': 'worker', + }); + await new Promise(r => setTimeout(r, 500)); + + await killAndWait(proxyChild); + proxyChild = spawnServer(['--port', String(proxyPort)], { + env: { + OPENAI_TEST_HOST: 'localhost', + OPENAI_TEST_PORT: String(mockPort), + OPENAI_TEST_PROTOCOL: 'http', + }, + }); + await waitForPort(proxyPort); + await new Promise(r => setTimeout(r, 500)); + + const data = await httpGet(proxyPort, 
'/_api/sysprompt/versions'); + const worker = data.versions.find(v => v.agentKey === 'worker' && v.b2Len === 'Inspect the codebase and report findings.'.length); + assert.ok(worker, 'expected restored Codex worker prompt version'); + assert.equal(worker.agentLabel, 'Codex Worker'); + }); }); // ── Intercept lifecycle E2E ────────────────────────────────────────── @@ -1763,4 +1800,39 @@ describe('Proxy loop startup guard', () => { await killAndWait(proxyChild); } }); + + it('exits when CHATGPT_BASE_URL points back to itself (codex ChatGPT-auth)', async () => { + const proxyPort = await findFreePort(); + const { stderr, code } = await spawnAndCollect(['--port', String(proxyPort), 'codex'], 10000, { + CHATGPT_BASE_URL: `http://localhost:${proxyPort}/v1`, + }); + + assert.equal(code, 1); + assert.ok(stderr.includes('CHATGPT_BASE_URL points back to ccxray'), `Expected loop error, got: ${stderr}`); + assert.ok(stderr.includes('--allow-upstream-loop'), `Expected override hint, got: ${stderr}`); + }); + + it('does NOT exit when CHATGPT_BASE_URL is left at the built-in default (chatgpt.com)', async () => { + // Sanity check: built-in default must never self-loop. 
+ const proxyPort = await findFreePort(); + + const stubBinDir = path.join(TEST_HOME, 'stub-bin-chatgpt-default'); + fs.mkdirSync(stubBinDir, { recursive: true }); + const stubCodex = path.join(stubBinDir, 'codex'); + fs.writeFileSync(stubCodex, '#!/bin/sh\nsleep 60\n', { mode: 0o755 }); + + const proxyChild = spawnServer(['--port', String(proxyPort), 'codex'], { + env: { + PATH: `${stubBinDir}${path.delimiter}${process.env.PATH}`, + }, + }); + + try { + await waitForPort(proxyPort); + const health = await httpGet(proxyPort, '/_api/health'); + assert.deepEqual(health, { ok: true }); + } finally { + await killAndWait(proxyChild); + } + }); }); diff --git a/test/url-sanitize.test.js b/test/url-sanitize.test.js new file mode 100644 index 0000000..3d7953e --- /dev/null +++ b/test/url-sanitize.test.js @@ -0,0 +1,77 @@ +'use strict'; + +const { describe, it } = require('node:test'); +const assert = require('node:assert/strict'); + +const { stripAuthParams, AUTH_QUERY_PARAMS } = require('../server/url-sanitize'); + +describe('stripAuthParams', () => { + it('returns the URL unchanged when no query string is present', () => { + assert.equal(stripAuthParams('/v1/messages'), '/v1/messages'); + assert.equal(stripAuthParams('/v1/responses'), '/v1/responses'); + assert.equal(stripAuthParams('/'), '/'); + }); + + it('returns the URL unchanged when no auth param is present', () => { + assert.equal(stripAuthParams('/v1/models?client_version=0.125.0'), '/v1/models?client_version=0.125.0'); + assert.equal(stripAuthParams('/v1/realtime?model=gpt-realtime'), '/v1/realtime?model=gpt-realtime'); + }); + + it('strips ?token= as the sole query param and drops the question mark', () => { + assert.equal(stripAuthParams('/v1/messages?token=secret'), '/v1/messages'); + assert.equal(stripAuthParams('/_api/entries?token=abc123'), '/_api/entries'); + }); + + it('strips token when mixed with other params, preserving the rest', () => { + assert.equal( + 
stripAuthParams('/v1/realtime?model=gpt-realtime&token=secret'), + '/v1/realtime?model=gpt-realtime' + ); + assert.equal( + stripAuthParams('/v1/realtime?token=secret&model=gpt-realtime'), + '/v1/realtime?model=gpt-realtime' + ); + assert.equal( + stripAuthParams('/path?a=1&token=secret&b=2'), + '/path?a=1&b=2' + ); + }); + + it('strips an empty token value as well', () => { + assert.equal(stripAuthParams('/path?token='), '/path'); + assert.equal(stripAuthParams('/path?token=&other=1'), '/path?other=1'); + }); + + it('handles repeated token params (deletes all occurrences)', () => { + assert.equal(stripAuthParams('/path?token=a&token=b'), '/path'); + assert.equal(stripAuthParams('/path?token=a&keep=1&token=b'), '/path?keep=1'); + }); + + it('does NOT strip params whose names merely contain the substring "token"', () => { + // Upstream API keys may live in headers, but if any future query param + // contained "token" as a substring (e.g. continuation_token), we must + // leave it alone. + assert.equal(stripAuthParams('/path?continuation_token=xyz'), '/path?continuation_token=xyz'); + assert.equal(stripAuthParams('/path?access_token_hint=1'), '/path?access_token_hint=1'); + }); + + it('keeps retained values intact, re-encoding %20 as + when a token is stripped', () => { + const url = '/path?greeting=hello%20world&token=secret'; + assert.equal(stripAuthParams(url), '/path?greeting=hello+world'); + // URLSearchParams normalizes %20 → + on round-trip; that's fine for forwarded + // paths because + is also a valid space encoding in query strings.
+ }); + + it('returns non-string inputs unchanged', () => { + assert.equal(stripAuthParams(undefined), undefined); + assert.equal(stripAuthParams(null), null); + assert.equal(stripAuthParams(''), ''); + assert.equal(stripAuthParams(123), 123); + }); + + it('exports a frozen AUTH_QUERY_PARAMS list', () => { + assert.ok(Array.isArray(AUTH_QUERY_PARAMS)); + assert.ok(AUTH_QUERY_PARAMS.includes('token')); + assert.ok(Object.isFrozen(AUTH_QUERY_PARAMS)); + }); +}); diff --git a/test/websocket-headers-forward.e2e.test.js b/test/websocket-headers-forward.e2e.test.js new file mode 100644 index 0000000..c71b69f --- /dev/null +++ b/test/websocket-headers-forward.e2e.test.js @@ -0,0 +1,136 @@ +'use strict'; + +const { describe, it, after } = require('node:test'); +const assert = require('node:assert/strict'); +const fs = require('fs'); +const http = require('http'); +const os = require('os'); +const path = require('path'); +const { spawn } = require('child_process'); +const WebSocket = require('ws'); + +const SERVER_SCRIPT = path.join(__dirname, '..', 'server', 'index.js'); +const tmpDirs = []; + +async function findFreePort() { + return new Promise(resolve => { + const server = http.createServer(); + server.listen(0, () => { + const port = server.address().port; + server.close(() => resolve(port)); + }); + }); +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('proxy did not start')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function killAndWait(child) { + return new 
Promise(resolve => { + if (!child || child.exitCode !== null) return resolve(); + child.on('exit', resolve); + child.kill('SIGTERM'); + setTimeout(() => { + try { child.kill('SIGKILL'); } catch {} + resolve(); + }, 3000); + }); +} + +describe('WebSocket upgrade header forwarding + ChatGPT routing (PR #29, 0ff5507)', () => { + after(() => { + for (const d of tmpDirs) fs.rmSync(d, { recursive: true, force: true }); + }); + + it('forwards chatgpt-account-id and custom headers to ChatGPT upstream when header is present', async () => { + const upstreamPort = await findFreePort(); + const proxyPort = await findFreePort(); + const home = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-ws-headers-')); + tmpDirs.push(home); + + let capturedHeaders = null; + let capturedPath = null; + const upstreamHttp = http.createServer(); + const upstreamWss = new WebSocket.Server({ noServer: true }); + upstreamHttp.on('upgrade', (req, socket, head) => { + capturedHeaders = { ...req.headers }; + capturedPath = req.url; + upstreamWss.handleUpgrade(req, socket, head, ws => { + setTimeout(() => { try { ws.send('hello'); ws.close(1000, 'bye'); } catch (_) {} }, 50); + }); + }); + await new Promise(resolve => upstreamHttp.listen(upstreamPort, '127.0.0.1', resolve)); + + const child = spawn(process.execPath, [SERVER_SCRIPT, '--port', String(proxyPort), '--no-browser'], { + env: { + ...process.env, + // Route both OpenAI and ChatGPT upstream to our fake server. + // When the client sends chatgpt-account-id, config.js routes to CHATGPT_BASE_URL. 
+ OPENAI_BASE_URL: `http://127.0.0.1:${upstreamPort}/v1`, + CHATGPT_BASE_URL: `http://127.0.0.1:${upstreamPort}/backend-api/codex`, + CCXRAY_HOME: home, + BROWSER: 'none', + RESTORE_DAYS: '0', + }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + try { + await waitForPort(proxyPort); + + const customHeaders = { + 'chatgpt-account-id': 'acct-test-12345', + 'openai-beta': 'realtime=v1', + 'x-mark': 'verify-canary', + }; + const wsUrl = `ws://localhost:${proxyPort}/v1/realtime?model=gpt-4o-realtime-preview`; + const ws = new WebSocket(wsUrl, { headers: customHeaders }); + + await new Promise((resolve, reject) => { + const t = setTimeout(() => reject(new Error('WS open timeout')), 4000); + ws.once('open', () => { clearTimeout(t); resolve(); }); + ws.once('error', e => { clearTimeout(t); reject(e); }); + }); + await new Promise(r => setTimeout(r, 200)); + ws.close(); + await new Promise(r => setTimeout(r, 300)); + + assert.ok(capturedHeaders, 'upstream must have received a WS upgrade'); + assert.equal(capturedHeaders['chatgpt-account-id'], 'acct-test-12345', + 'chatgpt-account-id must be forwarded intact'); + assert.equal(capturedHeaders['openai-beta'], 'realtime=v1', + 'openai-beta must be forwarded intact'); + assert.equal(capturedHeaders['x-mark'], 'verify-canary', + 'custom headers must pass through'); + assert.equal(capturedHeaders['host'], '127.0.0.1', + 'host must be rewritten to upstream hostname (no port — buildWebSocketHeaders sets host = upstream.host)'); + + // ChatGPT routing: when chatgpt-account-id is present, ccxray strips /v1 + // prefix and prepends /backend-api/codex (the CHATGPT_BASE_URL path). 
+ assert.match(capturedPath, /^\/backend-api\/codex\/realtime\?model=gpt-4o-realtime-preview$/, + `upstream path should be ChatGPT-rewritten, got ${capturedPath}`); + } finally { + upstreamHttp.close(); + await killAndWait(child); + } + }); +}); diff --git a/test/websocket-proxy.test.js b/test/websocket-proxy.test.js new file mode 100644 index 0000000..8b822b9 --- /dev/null +++ b/test/websocket-proxy.test.js @@ -0,0 +1,459 @@ +'use strict'; + +const { describe, it, beforeEach, afterEach } = require('node:test'); +const assert = require('node:assert/strict'); +const { spawn } = require('child_process'); +const http = require('http'); +const fs = require('fs'); +const path = require('path'); +const os = require('os'); +const net = require('net'); +const WebSocket = require('ws'); + +const SERVER_SCRIPT = path.resolve(__dirname, '..', 'server', 'index.js'); + +async function findFreePort() { + return new Promise((resolve, reject) => { + const srv = net.createServer(); + srv.listen(0, () => { + const port = srv.address().port; + srv.close(() => resolve(port)); + }); + srv.on('error', reject); + }); +} + +function spawnServer(args, env) { + const child = spawn(process.execPath, [SERVER_SCRIPT, ...args], { + env: { ...process.env, BROWSER: 'none', ...env }, + stdio: ['ignore', 'pipe', 'pipe'], + }); + let stdout = ''; + let stderr = ''; + child.stdout.on('data', d => { stdout += d; }); + child.stderr.on('data', d => { stderr += d; }); + child.getOutput = () => ({ stdout, stderr }); + return child; +} + +function waitForPort(port, timeoutMs = 8000) { + return new Promise((resolve, reject) => { + const start = Date.now(); + const check = () => { + const req = http.get(`http://localhost:${port}/_api/health`, { timeout: 1000 }, res => { + res.resume(); + res.on('end', () => resolve()); + }); + req.on('error', () => { + if (Date.now() - start > timeoutMs) return reject(new Error('timeout waiting for proxy')); + setTimeout(check, 100); + }); + req.on('timeout', () => { + 
req.destroy(); + if (Date.now() - start > timeoutMs) return reject(new Error('timeout waiting for proxy')); + setTimeout(check, 100); + }); + }; + check(); + }); +} + +function killAndWait(child) { + return new Promise(resolve => { + if (!child || child.exitCode !== null) return resolve(); + child.on('exit', resolve); + child.kill('SIGTERM'); + setTimeout(() => { + try { child.kill('SIGKILL'); } catch {} + resolve(); + }, 3000); + }); +} + +async function waitForIndexEntry(logsDir, predicate, timeoutMs = 4000) { + const indexPath = path.join(logsDir, 'index.ndjson'); + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if (fs.existsSync(indexPath)) { + const entries = fs.readFileSync(indexPath, 'utf8') + .trim() + .split('\n') + .filter(Boolean) + .map(line => JSON.parse(line)); + const match = entries.find(predicate); + if (match) return match; + } + await new Promise(r => setTimeout(r, 100)); + } + throw new Error('timeout waiting for index entry'); +} + +describe('OpenAI Responses WebSocket proxy', () => { + let testHome; + let upstreamServer; + let upstreamWss; + let upstreamPort; + let proxyChild; + let proxyPort; + + beforeEach(async () => { + testHome = fs.mkdtempSync(path.join(os.tmpdir(), 'ccxray-ws-test-')); + upstreamServer = http.createServer(); + await new Promise(resolve => upstreamServer.listen(0, resolve)); + upstreamPort = upstreamServer.address().port; + proxyPort = await findFreePort(); + }); + + afterEach(async () => { + await killAndWait(proxyChild); + if (upstreamWss) await new Promise(resolve => upstreamWss.close(resolve)); + if (upstreamServer?.listening) await new Promise(resolve => upstreamServer.close(resolve)); + fs.rmSync(testHome, { recursive: true, force: true }); + }); + + async function startProxy(extraEnv = {}) { + proxyChild = spawnServer(['--port', String(proxyPort)], { + CCXRAY_HOME: testHome, + OPENAI_TEST_HOST: 'localhost', + OPENAI_TEST_PORT: String(upstreamPort), + OPENAI_TEST_PROTOCOL: 'http', + 
...extraEnv, + }); + await waitForPort(proxyPort); + } + + it('forwards text and binary frames and records a transport entry by session_id header', async () => { + const received = { headers: null, text: null, binary: null }; + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/responses' }); + upstreamWss.on('connection', (ws, req) => { + received.headers = req.headers; + ws.on('message', (data, isBinary) => { + if (isBinary) { + received.binary = Buffer.from(data); + ws.send(data, { binary: true }); + } else { + received.text = data.toString(); + ws.send(`echo:${received.text}`); + } + }); + }); + await startProxy(); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea943'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + authorization: 'Bearer test-openai-key', + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + 'x-codex-turn-metadata': JSON.stringify({ + session_id: sessionId, + agent_type: 'worker', + workspaces: { cwd: '/tmp/ccxray-ws' }, + }), + }, + }); + + const messages = []; + ws.on('message', data => messages.push(Buffer.isBuffer(data) ? 
data : Buffer.from(data))); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + ws.send('hello'); + ws.send(Buffer.from([1, 2, 3]), { binary: true }); + await new Promise(resolve => setTimeout(resolve, 200)); + ws.close(1000, 'done'); + await new Promise(resolve => ws.on('close', resolve)); + + assert.equal(received.headers['openai-beta'], 'responses_websockets=2026-02-06'); + assert.equal(received.headers.session_id, sessionId); + assert.equal(received.headers.authorization, 'Bearer test-openai-key'); + assert.equal(received.text, 'hello'); + assert.deepEqual(received.binary, Buffer.from([1, 2, 3])); + assert.ok(messages.some(msg => msg.toString() === 'echo:hello')); + assert.ok(messages.some(msg => msg.equals(Buffer.from([1, 2, 3])))); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.provider, 'openai'); + assert.equal(entry.agent, 'codex'); + assert.equal(entry.isSubagent, true); + assert.equal(entry.cwd, '/tmp/ccxray-ws'); + assert.equal(entry.responseMetadata.transport, 'websocket'); + assert.equal(entry.responseMetadata.capture, 'transport-only'); + assert.equal(entry.responseMetadata.frameCounts.clientToUpstream, 2); + assert.equal(entry.responseMetadata.frameCounts.upstreamToClient, 2); + + const reqLog = JSON.parse(fs.readFileSync(path.join(testHome, 'logs', `${entry.id}_req.json`), 'utf8')); + assert.equal(reqLog.transport, 'websocket'); + assert.equal(reqLog.headers.sessionId, sessionId); + assert.equal(reqLog.headers.agentType, 'worker'); + }); + + it('closes the client and records an error entry when upstream rejects the handshake', async () => { + upstreamServer.on('upgrade', (_req, socket) => { + socket.write('HTTP/1.1 401 Unauthorized\r\nConnection: close\r\nContent-Length: 0\r\n\r\n'); + socket.destroy(); + }); + await startProxy(); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea944'; + const ws = new 
WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + }, + }); + + const close = await new Promise(resolve => { + ws.on('close', (code, reason) => resolve({ code, reason: reason.toString() })); + }); + assert.equal(close.code, 1011); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.status, 401); + assert.match(entry.responseMetadata.error.message, /rejected handshake: 401/); + }); + + it('routes /v1/realtime WebSocket upgrades to the OpenAI upstream', async () => { + const received = { url: null, text: null }; + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/realtime' }); + upstreamWss.on('connection', (ws, req) => { + received.url = req.url; + ws.on('message', data => { + received.text = data.toString(); + ws.send('realtime-ok'); + }); + }); + await startProxy(); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea945'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/realtime?model=gpt-realtime`, { + headers: { + 'openai-beta': 'realtime=v1', + session_id: sessionId, + }, + }); + const messages = []; + ws.on('message', data => messages.push(data.toString())); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + ws.send('hello realtime'); + await new Promise(resolve => setTimeout(resolve, 200)); + ws.close(1000, 'done'); + await new Promise(resolve => ws.on('close', resolve)); + + assert.equal(received.url, '/v1/realtime?model=gpt-realtime'); + assert.equal(received.text, 'hello realtime'); + assert.ok(messages.includes('realtime-ok')); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.provider, 'openai'); + assert.equal(entry.responseMetadata.transport, 'websocket'); + assert.equal(entry.responseMetadata.endpoint, '/v1/realtime'); + 
const reqLog = JSON.parse(fs.readFileSync(path.join(testHome, 'logs', `${entry.id}_req.json`), 'utf8')); + assert.equal(reqLog.url, '/v1/realtime?model=gpt-realtime'); + assert.equal(reqLog.endpoint, '/v1/realtime'); + }); + + it('records the entry and keeps running when the client disconnects abnormally', async () => { + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/responses' }); + upstreamWss.on('connection', ws => { + ws.on('message', data => ws.send(`echo:${data.toString()}`)); + }); + await startProxy(); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea946'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + }, + }); + + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + ws.send('hello'); + await new Promise(resolve => setTimeout(resolve, 100)); + ws.terminate(); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.status, 101); + assert.equal(entry.responseMetadata.close.side, 'client'); + assert.equal(entry.responseMetadata.close.code, 1006); + + await waitForPort(proxyPort); + assert.equal(proxyChild.exitCode, null); + }); + + it('closes idle WebSocket pairs and records a timeout entry', async () => { + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/responses' }); + upstreamWss.on('connection', () => {}); + await startProxy({ CCXRAY_WS_IDLE_TIMEOUT_MS: '100' }); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea947'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + }, + }); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + const close = await new Promise(resolve => { + ws.on('close', (code, 
reason) => resolve({ code, reason: reason.toString() })); + }); + + assert.equal(close.code, 1011); + assert.equal(close.reason, 'idle timeout'); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.status, 504); + assert.match(entry.responseMetadata.error.message, /idle timeout/); + }); + + it('fires idle timeout when upstream stalls before completing handshake', async () => { + // Accept TCP but never reply: simulates a slowloris-style upstream. Track + // the raw sockets so afterEach's upstreamServer.close() doesn't block on + // upgraded connections that we own directly. + const stalledSockets = []; + upstreamServer.on('upgrade', (_req, socket) => { + stalledSockets.push(socket); + }); + try { + await startProxy({ CCXRAY_WS_IDLE_TIMEOUT_MS: '150' }); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea947'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + }, + }); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + const close = await new Promise(resolve => { + ws.on('close', (code, reason) => resolve({ code, reason: reason.toString() })); + }); + + assert.equal(close.code, 1011); + assert.equal(close.reason, 'idle timeout'); + + const entry = await waitForIndexEntry(path.join(testHome, 'logs'), e => e.sessionId === sessionId); + assert.equal(entry.status, 504); + assert.match(entry.responseMetadata.error.message, /idle timeout/); + } finally { + for (const socket of stalledSockets) { + try { socket.destroy(); } catch {} + } + } + }); + + it('rejects WebSocket upgrade with 401 when AUTH_TOKEN is set and credentials are missing', async () => { + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/responses' }); + upstreamWss.on('connection', () => { + throw new Error('upstream should not be reached when 
auth fails'); + }); + await startProxy({ AUTH_TOKEN: 'sekret' }); + + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: 'auth-fail-001', + }, + }); + const result = await new Promise(resolve => { + ws.on('unexpected-response', (_req, res) => { + res.resume(); + resolve({ statusCode: res.statusCode }); + }); + ws.on('error', err => resolve({ error: err.message })); + }); + assert.equal(result.statusCode, 401); + }); + + it('accepts WebSocket upgrade when AUTH_TOKEN is set and bearer matches', async () => { + upstreamWss = new WebSocket.Server({ server: upstreamServer, path: '/v1/responses' }); + upstreamWss.on('connection', (ws) => { + ws.on('message', data => ws.send(`echo:${data.toString()}`)); + }); + await startProxy({ AUTH_TOKEN: 'sekret' }); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea948'; + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/responses`, { + headers: { + authorization: 'Bearer sekret', + 'openai-beta': 'responses_websockets=2026-02-06', + session_id: sessionId, + }, + }); + const messages = []; + ws.on('message', d => messages.push(d.toString())); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + ws.send('ping'); + await new Promise(r => setTimeout(r, 200)); + ws.close(1000, 'done'); + await new Promise(r => ws.on('close', r)); + assert.ok(messages.includes('echo:ping')); + }); + + it('returns 404 for upgrades on non-OpenAI WebSocket paths', async () => { + await startProxy(); + const ws = new WebSocket(`ws://localhost:${proxyPort}/v1/messages`); + const result = await new Promise(resolve => { + ws.on('unexpected-response', (_req, res) => { + res.resume(); + resolve({ statusCode: res.statusCode }); + }); + ws.on('error', err => resolve({ error: err.message })); + }); + assert.equal(result.statusCode, 404); + }); + + it('forwards Sec-WebSocket-Protocol selection through to the 
upstream', async () => { + let upstreamProtocolHeader = null; + upstreamWss = new WebSocket.Server({ + server: upstreamServer, + path: '/v1/responses', + handleProtocols(protocols) { + return protocols.values().next().value || false; + }, + }); + const upstreamConnected = new Promise(resolve => { + upstreamWss.on('connection', (_ws, req) => { + upstreamProtocolHeader = req.headers['sec-websocket-protocol']; + resolve(); + }); + }); + await startProxy(); + + const sessionId = '019e0ab2-bcc2-7b72-a1bf-980edc2ea949'; + const ws = new WebSocket( + `ws://localhost:${proxyPort}/v1/responses`, + ['codex-v1', 'codex-v2'], + { headers: { session_id: sessionId } }, + ); + await new Promise((resolve, reject) => { + ws.on('open', resolve); + ws.on('error', reject); + }); + // Proxy → upstream handshake races the proxy → client one; wait for it. + await upstreamConnected; + + assert.equal(ws.protocol, 'codex-v1'); + // ws joins protocols with ", " when constructing the upstream request header. + assert.match(upstreamProtocolHeader || '', /codex-v1.*codex-v2/); + ws.close(1000, 'done'); + await new Promise(r => ws.on('close', r)); + }); +});
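
Reviewer note on `test/url-sanitize.test.js`: `server/url-sanitize.js` itself is not part of this section of the diff, so the following is only a minimal sketch of a function that would satisfy the assertions above, not the actual implementation. It assumes the module is built on `URLSearchParams` (the test comment about `%20` becoming `+` suggests as much) and that `token` is the only entry in `AUTH_QUERY_PARAMS`; both points are assumptions.

```javascript
'use strict';

// Hypothetical sketch: the real server/url-sanitize.js may differ.
// Assumption: 'token' is the only auth-bearing query parameter.
const AUTH_QUERY_PARAMS = Object.freeze(['token']);

function stripAuthParams(url) {
  // Non-string inputs and the empty string pass through untouched.
  if (typeof url !== 'string' || url === '') return url;

  const q = url.indexOf('?');
  if (q === -1) return url; // no query string at all

  const params = new URLSearchParams(url.slice(q + 1));

  // Return the URL byte-for-byte when there is nothing to strip, so
  // unrelated URLs are never re-encoded. Matching is on the exact
  // parameter name, so continuation_token and friends are left alone.
  if (!AUTH_QUERY_PARAMS.some(name => params.has(name))) return url;

  // URLSearchParams.delete removes every occurrence of the name.
  for (const name of AUTH_QUERY_PARAMS) params.delete(name);

  // toString() re-serializes the survivors (a space becomes '+').
  const rest = params.toString();
  return rest ? `${url.slice(0, q)}?${rest}` : url.slice(0, q);
}

module.exports = { stripAuthParams, AUTH_QUERY_PARAMS };
```

The early return for URLs without an auth parameter is what keeps the "returns the URL unchanged" cases exact; re-serializing happens only on URLs that actually carried a `token`, which is also the only case where the `%20` to `+` normalization can appear.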