Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ No build step. No linting. Restart to apply changes.
| `server/hub.js` | Multi-project hub: lockfile (`~/.ccxray/hub.json`), discovery (with orphan port probe fallback), client registration, idle shutdown (injectable via setOnShutdown), crash auto-recovery |
| `server/auth.js` | API key auth middleware (enabled via `AUTH_TOKEN` env) |
| `server/openai-session.js` | Shared OpenAI/Codex header + session helpers (session id extraction, agent type, turn-metadata sidecar) |
| `server/ws-proxy.js` | OpenAI WebSocket transport proxy for `/v1/responses` and `/v1/realtime` upgrades. Tunables: `CCXRAY_WS_IDLE_TIMEOUT_MS` (default 60s), `CCXRAY_WS_MAX_QUEUE_BYTES` (default 4 MiB; caps client→upstream buffer while upstream is connecting) |
| `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. `supportsDelta` flag gates delta-write eligibility |
| `server/ws-proxy.js` | OpenAI WebSocket transport proxy for `/v1/responses` and `/v1/realtime` upgrades. Tracks active sessions + pending `recordWebSocketEntry` promises so `drainWebSocketProxy()` can force-finalize stragglers and await writes on shutdown. Tunables: `CCXRAY_WS_IDLE_TIMEOUT_MS` (default 60s), `CCXRAY_WS_MAX_QUEUE_BYTES` (default 4 MiB; caps client→upstream buffer while upstream is connecting) |
| `server/storage/` | Storage adapters (local filesystem, S3/R2). `statShared()` for file mtime. `supportsDelta` flag gates delta-write eligibility. The factory wraps every adapter with a write-tracker that exposes `drain()` for graceful shutdown |

### Client (`public/`)

Expand Down Expand Up @@ -90,6 +90,8 @@ ccxray claude (2nd) → discover hub via ~/.ccxray/hub.json → connect as clie
- Extra user args pass through unchanged after ccxray's injected launcher config.
- `--no-browser` only suppresses browser auto-open. The dashboard remains available on the proxy port.
- Codex's main session traffic upgrades to a WebSocket on `POST /v1/responses` (with `openai-beta: responses_websockets=*`), not `/v1/realtime`. `/v1/realtime` exists for the older Realtime API but is not what current codex uses for normal `/goal` / chat turns. When ChatGPT auth is active, codex also sends `chatgpt-account-id`, which `getUpstreamForRequestAndHeaders` (see `server/config.js`) uses to route to `CHATGPT_BASE_URL` instead of `OPENAI_BASE_URL`.
- Codex 0.133+ pings ~10 platform endpoints on startup (`/v1/plugins/*`, `/v1/ps/plugins/*`, `/v1/connectors/*`, `/v1/api/codex/apps`, `/v1/api/codex/usage`). `isCodexPlatformNoisePath` in `server/config.js` flags them; `server/index.js` forwards them with `skipEntry: true` so they don't pollute the dashboard. `/v1/codex/analytics-events/events` (the telemetry endpoint) is intentionally kept visible — a follow-up will parse turn metadata out of it.
- Graceful shutdown: `spawnStandaloneAgent`, hub idle shutdown, and SIGTERM/SIGINT handlers route through `gracefulExit(code)` in `server/index.js`. It awaits `drainWebSocketProxy()` (force-finalizes any open WS sessions, awaits their `recordWebSocketEntry` promises) then `config.storage.drain()` (awaits pending fs writes) before calling `process.exit`, bounded by a 5s safety timeout. Without this, async storage writes for WS entries lose to `process.exit` and leave 0-byte log files.

### Data Flow

Expand Down
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ Connected clients (2):

Use `--port` to opt out and run an independent server instead.

## Codex support (Beta)

```bash
npx ccxray codex
```

Works for both API-key and ChatGPT-auth codex sessions. ChatGPT routing to `chatgpt.com/backend-api/codex` triggers automatically on codex's `chatgpt-account-id` header — no extra config. Codex's startup platform polls (plugin lists, connector directory, app metadata) are proxied but hidden from the dashboard so the timeline shows only conversation traffic.

**Beta caveats:**
- WebSocket transport (`/v1/responses`, `/v1/realtime`) captures connection-level metadata only: frame counts, byte counts, close status. Per-frame content is not decoded — codex turns show less detail in the dashboard than Claude turns do.
- Token counts, model, and duration are not yet extracted from codex's telemetry payload; a follow-up will surface these.
- Limited real-world testing compared to the Claude path.

Env vars for tuning: `OPENAI_BASE_URL`, `CHATGPT_BASE_URL`, `CCXRAY_WS_IDLE_TIMEOUT_MS`, `CCXRAY_WS_MAX_QUEUE_BYTES`. Details in [CLAUDE.md](CLAUDE.md).

File issues on [GitHub](https://github.com/lis186/ccxray/issues) — Beta means we want the rough edges reported.

## Features

### Timeline
Expand Down
19 changes: 19 additions & 0 deletions server/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,28 @@ function isChatGPTCodexPath(pathname) {
|| pathname.startsWith('/v1/codex/')
|| pathname === '/v1/plugins'
|| pathname.startsWith('/v1/plugins/')
|| pathname === '/v1/ps/plugins'
|| pathname.startsWith('/v1/ps/plugins/')
|| pathname === '/v1/connectors'
|| pathname.startsWith('/v1/connectors/');
}

// Codex 0.133+ hits a flurry of platform endpoints on startup (plugin lists,
// connector directory, app metadata, usage). They're not conversation data —
// just codex's internal RPC — and would otherwise create ~30 dashboard entries
// before the first user turn. We still proxy them; only entry creation is
// skipped. The telemetry endpoint (/v1/codex/analytics-events/events) is kept
// because a follow-up may parse turn metadata out of it.
function isCodexPlatformNoisePath(urlPath) {
const pathname = (urlPath || '').split('?')[0];
if (pathname === '/v1/plugins' || pathname.startsWith('/v1/plugins/')) return true;
if (pathname === '/v1/ps/plugins' || pathname.startsWith('/v1/ps/plugins/')) return true;
if (pathname === '/v1/connectors' || pathname.startsWith('/v1/connectors/')) return true;
if (pathname === '/v1/api/codex/apps' || pathname.startsWith('/v1/api/codex/apps/')) return true;
if (pathname === '/v1/api/codex/usage' || pathname.startsWith('/v1/api/codex/usage/')) return true;
return false;
}

function getUpstreamForRequestAndHeaders(urlPath, headers = {}) {
const pathname = (urlPath || '').split('?')[0];
if (isChatGPTCodexPath(pathname)) {
Expand Down Expand Up @@ -322,5 +340,6 @@ module.exports = {
getUpstream,
getUpstreamForRequest,
getUpstreamForRequestAndHeaders,
isCodexPlatformNoisePath,
joinUpstreamPath,
};
10 changes: 6 additions & 4 deletions server/hub.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,19 +368,21 @@ function getHubStatus() {
// ── Hub route handler (mounted in server) ───────────────────────────

function handleHubRoutes(clientReq, clientRes) {
if (clientReq.url === '/_api/health' && clientReq.method === 'GET') {
const pathname = clientReq.url.split('?')[0];

if (pathname === '/_api/health' && clientReq.method === 'GET') {
clientRes.writeHead(200, { 'Content-Type': 'application/json' });
clientRes.end(JSON.stringify({ ok: true }));
return true;
}

if (clientReq.url === '/_api/hub/status' && clientReq.method === 'GET') {
if (pathname === '/_api/hub/status' && clientReq.method === 'GET') {
clientRes.writeHead(200, { 'Content-Type': 'application/json' });
clientRes.end(JSON.stringify(getHubStatus()));
return true;
}

if (clientReq.url === '/_api/hub/register' && clientReq.method === 'POST') {
if (pathname === '/_api/hub/register' && clientReq.method === 'POST') {
let body = '';
clientReq.on('data', c => { body += c; });
clientReq.on('end', () => {
Expand All @@ -398,7 +400,7 @@ function handleHubRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.url === '/_api/hub/unregister' && clientReq.method === 'POST') {
if (pathname === '/_api/hub/unregister' && clientReq.method === 'POST') {
let body = '';
clientReq.on('data', c => { body += c; });
clientReq.on('end', () => {
Expand Down
39 changes: 36 additions & 3 deletions server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const { authMiddleware } = require('./auth');
const { extractAgentType, extractPromptAgentType, splitB2IntoBlocks } = require('./system-prompt');
const { findSharedPrefix } = require('./delta-helpers');
const providers = require('./providers');
const { handleWebSocketUpgrade } = require('./ws-proxy');
const { handleWebSocketUpgrade, drainWebSocketProxy } = require('./ws-proxy');
const {
getCodexRawSessionId,
isOpenAISubagent,
Expand Down Expand Up @@ -227,6 +227,17 @@ const server = http.createServer((clientReq, clientRes) => {
return;
}

// Codex platform RPC: forward but don't create dashboard entries. These
// are codex's internal startup polls (plugin lists, connectors, apps,
// usage) — not conversation data. Without this guard, codex 0.133+
// pollutes the timeline with ~30 entries before the first user prompt.
if (config.isCodexPlatformNoisePath(clientReq.url)) {
const upstream = config.getUpstreamForRequestAndHeaders(clientReq.url, clientReq.headers);
const fwdHeaders = buildForwardHeaders(clientReq.headers, upstream);
forwardRequest({ id, ts, startTime, parsedBody, rawBody, clientReq, clientRes, fwdHeaders, reqSessionId: null, reqWritePromise: null, skipEntry: true, upstream });
return;
}

const upstream = config.getUpstreamForRequestAndHeaders(clientReq.url, clientReq.headers);
const provider = upstream.provider || 'anthropic';
if (provider === 'openai' && parsedBody) {
Expand Down Expand Up @@ -447,10 +458,25 @@ function spawnAgent(command, port, args, onExit) {
process.on('SIGTERM', () => child.kill('SIGTERM'));
}

// Drain pending WS finalize promises and storage writes before process.exit.
// Without this, codex (or the user) killing the agent races with async
// fs.writeFile calls in ws-proxy/forward, leaving 0-byte _req.json/_res.json
// for the WS upgrade entry. Bounded by a 5s safety timeout so a stuck write
// can never block shutdown.
async function gracefulExit(code) {
const deadline = new Promise(resolve => setTimeout(resolve, 5000));
const drain = (async () => {
try { await drainWebSocketProxy(); } catch (e) { console.error('WS drain failed:', e.message); }
try { await config.storage.drain(); } catch (e) { console.error('Storage drain failed:', e.message); }
})();
await Promise.race([drain, deadline]);
process.exit(code);
}

function spawnStandaloneAgent(port, command, args) {
spawnAgent(command, port, args, (code) => {
server.close();
process.exit(code);
gracefulExit(code);
});
}

Expand Down Expand Up @@ -665,7 +691,14 @@ async function startServer() {
hub.setHubPort(actualPort);
hub.writeHubLock(actualPort, process.pid);
hub.startDeadClientCheck();
const cleanup = () => { hub.deleteHubLock(); process.exit(0); };
hub.setOnShutdown(() => gracefulExit(0));
const cleanup = () => { hub.deleteHubLock(); gracefulExit(0); };
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);
} else if (!agentMode) {
// Standalone mode (dashboard only, no agent): drain on signal so any WS
// entries flush. Agent mode handles signals via the child exit path.
const cleanup = () => gracefulExit(0);
process.on('SIGTERM', cleanup);
process.on('SIGINT', cleanup);
}
Expand Down
16 changes: 9 additions & 7 deletions server/routes/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ function computeSettings() {
}

function handleApiRoutes(clientReq, clientRes) {
if (clientReq.url === '/_api/entries') {
const pathname = clientReq.url.split('?')[0];

if (pathname === '/_api/entries') {
const entries = store.entries.map(summarizeEntry);
const sessionTitles = Object.fromEntries(
Object.entries(store.sessionMeta).filter(([, m]) => m.title).map(([sid, m]) => [sid, m.title])
Expand All @@ -45,7 +47,7 @@ function handleApiRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.method === 'GET' && clientReq.url === '/_api/settings') {
if (clientReq.method === 'GET' && pathname === '/_api/settings') {
const s = readSettings();
clientRes.writeHead(200, { 'Content-Type': 'application/json' });
clientRes.end(JSON.stringify({ ...computeSettings(), statusLine: s.statusLine }));
Expand Down Expand Up @@ -113,7 +115,7 @@ function handleApiRoutes(clientReq, clientRes) {
}

// Full entry data (req + res) — lazy loaded
const entryMatch = clientReq.url.match(/^\/_api\/entry\/(.+)$/);
const entryMatch = pathname.match(/^\/_api\/entry\/(.+)$/);
if (entryMatch) {
const id = decodeURIComponent(entryMatch[1]);
const entry = store.entries.find(e => e.id === id);
Expand All @@ -132,7 +134,7 @@ function handleApiRoutes(clientReq, clientRes) {
}

// Lazy tokenization endpoint
const tokMatch = clientReq.url.match(/^\/_api\/tokens\/(.+)$/);
const tokMatch = pathname.match(/^\/_api\/tokens\/(.+)$/);
if (tokMatch) {
const id = decodeURIComponent(tokMatch[1]);
const entry = store.entries.find(e => e.id === id);
Expand All @@ -153,14 +155,14 @@ function handleApiRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.method === 'GET' && clientReq.url === '/_api/stars') {
if (clientReq.method === 'GET' && pathname === '/_api/stars') {
const s = readSettings();
clientRes.writeHead(200, { 'Content-Type': 'application/json' });
clientRes.end(JSON.stringify(serializeStars(s)));
return true;
}

if (clientReq.method === 'POST' && clientReq.url === '/_api/stars') {
if (clientReq.method === 'POST' && pathname === '/_api/stars') {
let body = '';
clientReq.on('data', c => { body += c; });
clientReq.on('end', () => {
Expand Down Expand Up @@ -200,7 +202,7 @@ function handleApiRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.method === 'POST' && clientReq.url === '/_api/settings')
if (clientReq.method === 'POST' && pathname === '/_api/settings')
{
let body = '';
clientReq.on('data', c => { body += c; });
Expand Down
10 changes: 6 additions & 4 deletions server/routes/costs.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ function sendLoadingOrData(clientRes, dataFn) {
}

function handleCostRoutes(clientReq, clientRes) {
if (clientReq.url === '/_api/costs/current-block') {
const pathname = clientReq.url.split('?')[0];

if (pathname === '/_api/costs/current-block') {
sendLoadingOrData(clientRes, data => {
const now = Date.now();
const activeBlock = data.blocks.find(b => b.isActive);
Expand Down Expand Up @@ -84,15 +86,15 @@ function handleCostRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.url === '/_api/costs/daily') {
if (pathname === '/_api/costs/daily') {
sendLoadingOrData(clientRes, data => {
clientRes.writeHead(200, { 'Content-Type': 'application/json' });
clientRes.end(JSON.stringify(data.daily));
});
return true;
}

if (clientReq.url === '/_api/costs/monthly') {
if (pathname === '/_api/costs/monthly') {
sendLoadingOrData(clientRes, data => {
const now = new Date();
const currentMonth = `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, '0')}`;
Expand All @@ -103,7 +105,7 @@ function handleCostRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.url === '/_api/pricing') {
if (pathname === '/_api/pricing') {
const result = {};
for (const [model, rates] of Object.entries(pricingTable)) {
result[model] = {
Expand Down
10 changes: 6 additions & 4 deletions server/routes/intercept.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ const { broadcastInterceptToggle, broadcastInterceptRemoved, broadcastSessionSta
const { forwardRequest } = require('../forward');

function handleInterceptRoutes(clientReq, clientRes) {
if (clientReq.url === '/_api/intercept/toggle' && clientReq.method === 'POST') {
const pathname = clientReq.url.split('?')[0];

if (pathname === '/_api/intercept/toggle' && clientReq.method === 'POST') {
const chunks = []; clientReq.on('data', c => chunks.push(c));
clientReq.on('end', () => {
try {
Expand All @@ -23,7 +25,7 @@ function handleInterceptRoutes(clientReq, clientRes) {
return true;
}

const approveMatch = clientReq.url.match(/^\/_api\/intercept\/(.+)\/approve$/);
const approveMatch = pathname.match(/^\/_api\/intercept\/(.+)\/approve$/);
if (approveMatch && clientReq.method === 'POST') {
const reqId = decodeURIComponent(approveMatch[1]);
const pending = store.pendingRequests.get(reqId);
Expand All @@ -48,7 +50,7 @@ function handleInterceptRoutes(clientReq, clientRes) {
return true;
}

const rejectMatch = clientReq.url.match(/^\/_api\/intercept\/(.+)\/reject$/);
const rejectMatch = pathname.match(/^\/_api\/intercept\/(.+)\/reject$/);
if (rejectMatch && clientReq.method === 'POST') {
const reqId = decodeURIComponent(rejectMatch[1]);
const pending = store.pendingRequests.get(reqId);
Expand All @@ -70,7 +72,7 @@ function handleInterceptRoutes(clientReq, clientRes) {
return true;
}

if (clientReq.url === '/_api/intercept/timeout' && clientReq.method === 'POST') {
if (pathname === '/_api/intercept/timeout' && clientReq.method === 'POST') {
const chunks = []; clientReq.on('data', c => chunks.push(c));
clientReq.on('end', () => {
try {
Expand Down
3 changes: 2 additions & 1 deletion server/routes/sse.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
const store = require('../store');

function handleSSERoute(clientReq, clientRes) {
if (clientReq.url !== '/_events') return false;
const pathname = clientReq.url.split('?')[0];
if (pathname !== '/_events') return false;

clientRes.writeHead(200, {
'Content-Type': 'text/event-stream',
Expand Down
34 changes: 32 additions & 2 deletions server/storage/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@

const { createLocalStorage } = require('./local');

// Wraps a storage adapter so every async write is tracked in an in-flight Set.
// drain() awaits all pending writes — used on shutdown so process.exit doesn't
// kill the event loop while fs.writeFile is mid-flight, leaving 0-byte files.
// Loops until the set is empty in case a tracked promise spawns another write.
function withWriteTracking(adapter) {
const pending = new Set();
const track = (promise) => {
pending.add(promise);
const cleanup = () => pending.delete(promise);
promise.then(cleanup, cleanup);
return promise;
};
return {
...adapter,
write: (id, suffix, data) => track(adapter.write(id, suffix, data)),
appendIndex: (line) => track(adapter.appendIndex(line)),
writeSharedIfAbsent: (filename, data) => track(adapter.writeSharedIfAbsent(filename, data)),
deleteFile: (filename) => track(adapter.deleteFile(filename)),
drain: async () => {
while (pending.size > 0) {
await Promise.allSettled([...pending]);
}
},
};
}

/**
* Create the appropriate storage adapter based on STORAGE_BACKEND env var.
*
Expand All @@ -13,24 +39,28 @@ const { createLocalStorage } = require('./local');
function createStorage() {
const backend = (process.env.STORAGE_BACKEND || 'local').toLowerCase();

let adapter;
switch (backend) {
case 's3': {
const { createS3Storage } = require('./s3');
return createS3Storage({
adapter = createS3Storage({
bucket: process.env.S3_BUCKET,
region: process.env.S3_REGION || 'auto',
endpoint: process.env.S3_ENDPOINT || undefined,
prefix: process.env.S3_PREFIX || 'logs/',
});
break;
}
case 'local':
default: {
const path = require('path');
const os = require('os');
const logsDir = process.env.LOGS_DIR || path.join(process.env.CCXRAY_HOME || path.join(os.homedir(), '.ccxray'), 'logs');
return createLocalStorage(logsDir);
adapter = createLocalStorage(logsDir);
break;
}
}
return withWriteTracking(adapter);
}

module.exports = { createStorage };
Loading
Loading