Postgres drainer: Valkey Stream → billing_event (system of record) #3

Open: hhuuggoo wants to merge 3 commits into main from postgres-drainer
@hhuuggoo hhuuggoo commented Jun 7, 2026

What

A standalone drainer service (cmd/drainer + internal/drain) — the consumer side of the metering durability ladder. The interceptor XADDs metering events to a Valkey Stream (hot buffer); the drainer reads that stream via a consumer group and writes each event durably to Postgres (the system of record, raw pre-rating counts).

This completes the "Valkey + Postgres" durability design from DESIGN.md (#2): Valkey is the hot buffer, Postgres is where the unbilled dollar lives.

Key invariants

  • XACK only after the Postgres commit — at-least-once delivery; crash-before-ACK redelivers.
  • Idempotent upsert keyed on request_id (INSERT ... ON CONFLICT DO NOTHING). At-least-once + idempotent = effectively-once. DO NOTHING because an Event is immutable.
  • XAUTOCLAIM reclaims entries stranded by a dead consumer; poison-pill entries are ACK'd-and-dropped (logged) so they can't wedge the group.
  • Empty identity strings bind NULL (not "") so GROUP BY auth_id has no spurious bucket.

Table

billing_event follows Atlas conventions exactly (snake_case singular, pk_billing_event, <table>_<cols>_ix index naming, varchar(32) hex-id columns, timestamptz). request_id PK is varchar(255) (it's an engine/OpenAI request id, not an atlas hex id). Mirrors metering.Event. No collision with the existing hourly_usage_record (that's rated; this is raw counts).

Migration ownership (flagged decision — see DESIGN.md §10)

The drainer is Go; Atlas migrations are Alembic. Default taken: no Go migrator. Schema shipped as reviewable DDL (migrations/0001_billing_event.sql) plus a ready-to-copy Alembic file following Atlas style (migrations/atlas/...), whose down_revision is the verified current Atlas head (c1d2e3f4a5b6). The drainer assumes the table exists and fails with a clear error if not. migrations/README.md documents the single-migration-system rationale and apply path. Keeps one migration system on the shared Atlas DB.

Deps (both go-1.23 compatible; go directive unchanged)

  • jackc/pgx/v5 v5.7.5 — latest v5 whose go.mod is still go 1.23 (v5.10 bumps to 1.25).
  • DATA-DOG/go-sqlmock v1.5.2; transitive rogpeppe/go-internal pinned to v1.14.1 to keep go mod tidy from bumping the toolchain.

Tests

11 tests, race-clean: batch consume→store→ACK; idempotent redelivery; store-failure→no-ACK→redeliver; graceful shutdown; XAUTOCLAIM reclaim; poison-pill drop; SQL shape + NULL-binding + rollback (miniredis + sqlmock, no real Postgres/Valkey needed).

hhuuggoo and others added 3 commits June 5, 2026 00:05
Capture the token / API-key identity (X-Saturn-Auth-Id, the JWT sub /
IdentityAuth.id) plus every other X-Saturn-* header verbatim, and record them
on the metering Event. AuthID is the primary attribution key; org/user/group
are resolved downstream (out of band, at rating time) from the IdentityAuth
record, so the hot path never resolves a multi-org user's active-org context.

- identity: add HeaderAuthID + Identity.AuthID; FromRequest reads it.
- metering.Event: add AuthID, ResourceID, ResourceType (all omitempty) so no
  identity the edge gave us is dropped; reorganize fields with clearer docs.
- proxy.emit: populate the new fields.
- tests: identity capture (all-headers + all-empty), e2e proxy asserts
  auth_id + resource_type flow into the event.

Security: Phoebe blindly trusts these headers (it does not authenticate). That
is safe only because Traefik's atlas-auth ForwardAuth allowlist overwrites any
client-supplied X-Saturn-* copy. X-Saturn-Auth-Id is added to that allowlist in
saturn-k8s#976, and emitted by auth-server#85 — both must be live before this
header is relied on for billing.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

A billing product must not serve traffic it can't attribute. Add an early
billing-identity gate in handleProxy that rejects with 400 if X-Saturn-Auth-Id
(the token / API-key attribution key) or X-Saturn-Resource-Id (the model being
billed) is absent. The error names every missing field so a header
misconfiguration — auth-server not emitting it, or Traefik not allowlisting it
— is immediately obvious.

This replaces the prior lone resource-id check with missingBillingFields(),
which reports all missing required fields at once. UserID/GroupID are NOT
required here — they're resolved downstream from AuthID.

A rejected request emits no billing event (verified). Tests cover each missing
combination, the both-present happy path, and updated existing proxy/abort
tests to supply auth-id.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

The interceptor's emitter XADDs metering events to a Valkey Stream as the
hot buffer. This adds the consumer side of that durability ladder: a
standalone service that reads the same stream via a consumer group and
writes each event durably to Postgres, the system of record for raw,
pre-rating token counts.

internal/drain:
- Drainer.Run drives XREADGROUP in batches; XACK happens ONLY after the
  Postgres write commits (at-least-once delivery). XGROUP CREATE MKSTREAM
  at "0" so a fresh group drains existing backlog. XAUTOCLAIM reclaims
  entries stranded by a dead consumer. Malformed entries are ACK'd and
  dropped so a poison pill can't wedge the group.
- Store interface (Upsert/Ping/Close) is the mockable seam. PostgresStore
  does a single parameterised multi-row INSERT ... ON CONFLICT
  (request_id) DO NOTHING per batch, in one transaction. At-least-once +
  idempotent upsert = effectively-once. Empty identity strings bind NULL
  so billing GROUP BY auth_id never gets a spurious "" bucket.
- Config mirrors emit.Config's wire-at-call-site pattern (no shared config
  dependency). database/sql + pgx stdlib driver; pool 90/20, Ping on start.
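
The multi-row statement and NULL binding described for PostgresStore might be built roughly as follows. This is a sketch: the column list is cut down to three columns for illustration (the real billing_event table mirrors all of metering.Event), and `Event`, `nullIfEmpty`, and `buildInsert` are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// Event is a trimmed, hypothetical stand-in for metering.Event.
type Event struct {
	RequestID  string
	AuthID     string
	ResourceID string
}

// nullIfEmpty binds SQL NULL instead of "" so that GROUP BY auth_id does not
// produce an empty-string bucket.
func nullIfEmpty(s string) any {
	if s == "" {
		return nil
	}
	return s
}

// buildInsert returns one parameterised statement for the whole batch, using
// pgx-style $N placeholders, plus the flattened argument list.
func buildInsert(events []Event) (string, []any) {
	const cols = 3
	var sb strings.Builder
	sb.WriteString("INSERT INTO billing_event (request_id, auth_id, resource_id) VALUES ")
	args := make([]any, 0, len(events)*cols)
	for i, e := range events {
		if i > 0 {
			sb.WriteString(", ")
		}
		n := i * cols
		fmt.Fprintf(&sb, "($%d, $%d, $%d)", n+1, n+2, n+3)
		args = append(args, e.RequestID, nullIfEmpty(e.AuthID), nullIfEmpty(e.ResourceID))
	}
	// An Event is immutable, so a conflicting row is simply skipped.
	sb.WriteString(" ON CONFLICT (request_id) DO NOTHING")
	return sb.String(), args
}

func main() {
	q, args := buildInsert([]Event{
		{RequestID: "req-1", AuthID: "", ResourceID: "model-a"},
		{RequestID: "req-2", AuthID: "tok_123", ResourceID: "model-a"},
	})
	fmt.Println(q)
	fmt.Println(len(args))
}
```

The statement and arguments would then be passed to a single `ExecContext` call inside one transaction; that part is omitted here because it needs a live database or sqlmock.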

cmd/drainer: -f YAML settings (Valkey/group knobs) + DATABASE_URL env
(Atlas convention) for Postgres. SIGTERM -> finish in-flight batch, exit
clean; un-ACK'd work redelivers safely.

migrations: the table lives in the shared Atlas Postgres, owned by the
Alembic chain — the drainer does NOT migrate at startup. Ships the schema
as reviewable DDL (0001_billing_event.sql) plus a ready-to-copy Alembic
file (atlas/b1f0c2d3e4a5_add_billing_event.py, down_revision = current
Atlas head c1d2e3f4a5b6) following Atlas pk_/ix naming. README explains
the single-migration-system rationale.

Tests (11, race-clean): batch consume -> store -> ACK; idempotent
redelivery (same request_id -> one row); store failure -> no ACK
(redelivers); graceful shutdown; XAUTOCLAIM reclaim; poison-pill drop;
sqlmock SQL-shape + NULL-binding assertions. miniredis for Valkey, fake
Store + sqlmock for Postgres — no real Postgres required.

Deps (both go 1.23-compatible, go directive unchanged at 1.23.0):
jackc/pgx/v5 v5.7.5 (latest v5 whose go directive is still 1.23.0; v5.10
needs go 1.25), DATA-DOG/go-sqlmock v1.5.2 (go 1.15).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>