Postgres drainer: Valkey Stream → billing_event (system of record)#3
Open
hhuuggoo wants to merge 3 commits into
Open
Postgres drainer: Valkey Stream → billing_event (system of record)#3hhuuggoo wants to merge 3 commits into
hhuuggoo wants to merge 3 commits into
Conversation
Capture the token / API-key identity (X-Saturn-Auth-Id, the JWT sub / IdentityAuth.id) plus every other X-Saturn-* header verbatim, and record them on the metering Event. AuthID is the primary attribution key; org/user/group are resolved downstream (out of band, at rating time) from the IdentityAuth record, so the hot path never resolves a multi-org user's active-org context. - identity: add HeaderAuthID + Identity.AuthID; FromRequest reads it. - metering.Event: add AuthID, ResourceID, ResourceType (all omitempty) so no identity the edge gave us is dropped; reorganize fields with clearer docs. - proxy.emit: populate the new fields. - tests: identity capture (all-headers + all-empty), e2e proxy asserts auth_id + resource_type flow into the event. Security: Phoebe blindly trusts these headers (it does not authenticate). That is safe only because Traefik's atlas-auth ForwardAuth allowlist overwrites any client-supplied X-Saturn-* copy. X-Saturn-Auth-Id is added to that allowlist in saturn-k8s#976, and emitted by auth-server#85 — both must be live before this header is relied on for billing. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A billing product must not serve traffic it can't attribute. Add an early billing-identity gate in handleProxy that rejects with 400 if X-Saturn-Auth-Id (the token / API-key attribution key) or X-Saturn-Resource-Id (the model being billed) is absent. The error names every missing field so a header misconfiguration — auth-server not emitting it, or Traefik not allowlisting it — is immediately obvious. This replaces the prior lone resource-id check with missingBillingFields(), which reports all missing required fields at once. UserID/GroupID are NOT required here — they're resolved downstream from AuthID. A rejected request emits no billing event (verified). Tests cover each missing combination, the both-present happy path, and updated existing proxy/abort tests to supply auth-id. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The interceptor's emitter XADDs metering events to a Valkey Stream as the hot buffer. This adds the consumer side of that durability ladder: a standalone service that reads the same stream via a consumer group and writes each event durably to Postgres, the system of record for raw, pre-rating token counts. internal/drain: - Drainer.Run drives XREADGROUP in batches; XACK happens ONLY after the Postgres write commits (at-least-once delivery). XGROUP CREATE MKSTREAM at "0" so a fresh group drains existing backlog. XAUTOCLAIM reclaims entries stranded by a dead consumer. Malformed entries are ACK'd and dropped so a poison pill can't wedge the group. - Store interface (Upsert/Ping/Close) is the mockable seam. PostgresStore does a single parameterised multi-row INSERT ... ON CONFLICT (request_id) DO NOTHING per batch, in one transaction. At-least-once + idempotent upsert = effectively-once. Empty identity strings bind NULL so billing GROUP BY auth_id never gets a spurious "" bucket. - Config mirrors emit.Config's wire-at-call-site pattern (no shared config dependency). database/sql + pgx stdlib driver; pool 90/20, Ping on start. cmd/drainer: -f YAML settings (Valkey/group knobs) + DATABASE_URL env (Atlas convention) for Postgres. SIGTERM -> finish in-flight batch, exit clean; un-ACK'd work redelivers safely. migrations: the table lives in the shared Atlas Postgres, owned by the Alembic chain — the drainer does NOT migrate at startup. Ships the schema as reviewable DDL (0001_billing_event.sql) plus a ready-to-copy Alembic file (atlas/b1f0c2d3e4a5_add_billing_event.py, down_revision = current Atlas head c1d2e3f4a5b6) following Atlas pk_/ix naming. README explains the single-migration-system rationale. Tests (11, race-clean): batch consume -> store -> ACK; idempotent redelivery (same request_id -> one row); store failure -> no ACK (redelivers); graceful shutdown; XAUTOCLAIM reclaim; poison-pill drop; sqlmock SQL-shape + NULL-binding assertions. miniredis for Valkey, fake Store + sqlmock for Postgres — no real Postgres required. Deps (both go 1.23-compatible, go directive unchanged at 1.23.0): jackc/pgx/v5 v5.7.5 (latest v5 whose go directive is still 1.23.0; v5.10 needs go 1.25), DATA-DOG/go-sqlmock v1.5.2 (go 1.15). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This was referenced Jun 7, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
A standalone drainer service (
cmd/drainer+internal/drain) — the consumer side of the metering durability ladder. The interceptor XADDs metering events to a Valkey Stream (hot buffer); the drainer reads that stream via a consumer group and writes each event durably to Postgres (the system of record, raw pre-rating counts).This completes the "Valkey + Postgres" durability design from DESIGN.md (#2): Valkey is the hot buffer, Postgres is where the unbilled dollar lives.
Key invariants
request_id(INSERT ... ON CONFLICT DO NOTHING). At-least-once + idempotent = effectively-once.DO NOTHINGbecause an Event is immutable."") soGROUP BY auth_idhas no spurious bucket.Table
billing_eventfollows Atlas conventions exactly (snake_case singular,pk_billing_event,<table>_<cols>_ixindex naming,varchar(32)hex-id columns,timestamptz).request_idPK isvarchar(255)(it's an engine/OpenAI request id, not an atlas hex id). Mirrorsmetering.Event. No collision with the existinghourly_usage_record(that's rated; this is raw counts).Migration ownership (flagged decision — see DESIGN.md §10)
The drainer is Go; Atlas migrations are Alembic. Default taken: no Go migrator. Schema shipped as reviewable DDL (
migrations/0001_billing_event.sql) plus a ready-to-copy Alembic file following Atlas style (migrations/atlas/...), whosedown_revisionis the verified current Atlas head (c1d2e3f4a5b6). The drainer assumes the table exists and fails with a clear error if not.migrations/README.mddocuments the single-migration-system rationale and apply path. Keeps one migration system on the shared Atlas DB.Deps (both go-1.23 compatible; go directive unchanged)
jackc/pgx/v5 v5.7.5— latest v5 whose go.mod is stillgo 1.23(v5.10 bumps to 1.25).DATA-DOG/go-sqlmock v1.5.2; transitiverogpeppe/go-internalpinned to v1.14.1 to keepgo mod tidyfrom bumping the toolchain.Tests
11 tests, race-clean: batch consume→store→ACK; idempotent redelivery; store-failure→no-ACK→redeliver; graceful shutdown; XAUTOCLAIM reclaim; poison-pill drop; SQL shape + NULL-binding + rollback (miniredis + sqlmock, no real Postgres/Valkey needed).