From efeec8a34a8f9b085309682efbcc6a79af1359b9 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 21:18:24 +0700 Subject: [PATCH 01/13] feat(config): add ClickHouse settings to startup log output Adds clickhouse_configured, clickhouse_addr, clickhouse_database, clickhouse_password_configured, and clickhouse_tls_enabled to the configuration summary logged at startup for easier debugging. Co-Authored-By: Claude Opus 4.5 --- internal/config/logging.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/config/logging.go b/internal/config/logging.go index e877eb77..408baf5c 100644 --- a/internal/config/logging.go +++ b/internal/config/logging.go @@ -56,6 +56,13 @@ func (c *Config) LogConfigurationSummary() []zap.Field { zap.Bool("postgres_configured", c.PostgresURL != ""), zap.String("postgres_host", maskPostgresURLHost(c.PostgresURL)), + // ClickHouse + zap.Bool("clickhouse_configured", c.ClickHouse.Addr != ""), + zap.String("clickhouse_addr", c.ClickHouse.Addr), + zap.String("clickhouse_database", c.ClickHouse.Database), + zap.Bool("clickhouse_password_configured", c.ClickHouse.Password != ""), + zap.Bool("clickhouse_tls_enabled", c.ClickHouse.TLSEnabled), + // Message Queue zap.String("mq_type", c.MQs.GetInfraType()), From a2ac297111bfa44af0fa0015a167be1ae5394777 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 21:18:33 +0700 Subject: [PATCH 02/13] feat(clickhouse): add TLS support for ClickHouse connections Adds CLICKHOUSE_TLS_ENABLED env var to enable TLS connections, required for ClickHouse Cloud which only accepts secure connections on port 9440. Co-Authored-By: Claude Opus 4.5 --- internal/clickhouse/clickhouse.go | 21 +++++++++++++++------ internal/config/config.go | 19 +++++++++++-------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/internal/clickhouse/clickhouse.go b/internal/clickhouse/clickhouse.go index 68e8b6b0..067fbf00 100644 --- a/internal/clickhouse/clickhouse.go +++ b/internal/clickhouse/clickhouse.go @@ -1,6 +1,8 @@ package clickhouse import ( + "crypto/tls" + "github.com/ClickHouse/clickhouse-go/v2" chdriver "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" @@ -12,14 +14,15 @@ type ( ) type ClickHouseConfig struct { - Addr string - Username string - Password string - Database string + Addr string + Username string + Password string + Database string + TLSEnabled bool } func New(config *ClickHouseConfig) (DB, error) { - conn, err := clickhouse.Open(&clickhouse.Options{ + opts := &clickhouse.Options{ Addr: []string{config.Addr}, Auth: clickhouse.Auth{ Database: config.Database, @@ -31,6 +34,12 @@ func New(config *ClickHouseConfig) (DB, error) { // Debugf: func(format string, v ...any) { // fmt.Printf(format+"\n", v...) // }, - }) + } + + if config.TLSEnabled { + opts.TLS = &tls.Config{} + } + + conn, err := clickhouse.Open(opts) return conn, err } diff --git a/internal/config/config.go b/internal/config/config.go index 1b7eac20..98c5a86a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -380,10 +380,11 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig { } type ClickHouseConfig struct { - Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"` - Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." 
required:"N"` - Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"` - Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"` + Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud." required:"N"` + Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"` + Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"` + Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"` + TLSEnabled bool `yaml:"tls_enabled" env:"CLICKHOUSE_TLS_ENABLED" desc:"Enable TLS for ClickHouse connection." required:"N"` } func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig { @@ -391,10 +392,11 @@ func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig { return nil } return &clickhouse.ClickHouseConfig{ - Addr: c.Addr, - Username: c.Username, - Password: c.Password, - Database: c.Database, + Addr: c.Addr, + Username: c.Username, + Password: c.Password, + Database: c.Database, + TLSEnabled: c.TLSEnabled, } } @@ -478,6 +480,7 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts { Password: c.ClickHouse.Password, Database: c.ClickHouse.Database, DeploymentID: c.DeploymentID, + TLSEnabled: c.ClickHouse.TLSEnabled, }, } } From e719bfb6b10e6a197306a72bd5bf1c696b76581a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 22:49:37 +0700 Subject: [PATCH 03/13] fix: clickhouse conn url for golang-migrate --- internal/migrator/migrator.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/internal/migrator/migrator.go b/internal/migrator/migrator.go index f92997a0..597ad51f 100644 --- a/internal/migrator/migrator.go +++ b/internal/migrator/migrator.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/fs" + "net/url" "strings" "time" @@ -143,6 +144,7 @@ type MigrationOptsCH struct { Password string Database string DeploymentID string + TLSEnabled bool } type MigrationOpts struct { @@ -197,11 +199,20 @@ func (opts *MigrationOpts) databaseURL() string { } if opts.CH.Addr != "" { - url := fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true", opts.CH.Username, opts.CH.Password, opts.CH.Addr, opts.CH.Database) + // clickhouse-go v1 (used by golang-migrate) expects credentials as query params. + // MergeTree engine is used for broader compatibility (TinyLog is not supported everywhere). 
+ connURL := fmt.Sprintf("clickhouse://%s/%s?username=%s&password=%s&x-multi-statement=true&x-migrations-table-engine=MergeTree", + opts.CH.Addr, + opts.CH.Database, + url.QueryEscape(opts.CH.Username), + url.QueryEscape(opts.CH.Password)) + if opts.CH.TLSEnabled { + connURL += "&secure=true" + } if opts.CH.DeploymentID != "" { - url += "&x-migrations-table=" + opts.CH.DeploymentID + "_schema_migrations" + connURL += "&x-migrations-table=" + url.QueryEscape(opts.CH.DeploymentID) + "_schema_migrations" } - return url + return connURL } return "" From 62e69c784727919c5bdd7d3a090f832c15bb70e3 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 22:54:46 +0700 Subject: [PATCH 04/13] fix: resolve nil pointer panics in error handling and RSMQ - Add nil check in getErrorFields() for ErrorResponse with nil Err field - Add missing Err field to ErrorResponse in tenant list handler - Handle nil RSMQ response as 'no message' instead of error Co-Authored-By: Claude Opus 4.5 --- internal/apirouter/logger_middleware.go | 8 ++++++++ internal/apirouter/tenant_handlers.go | 1 + internal/rsmq/rsmq.go | 11 +++++++++++ 3 files changed, 20 insertions(+) diff --git a/internal/apirouter/logger_middleware.go b/internal/apirouter/logger_middleware.go index be3f72aa..8b92f492 100644 --- a/internal/apirouter/logger_middleware.go +++ b/internal/apirouter/logger_middleware.go @@ -143,6 +143,14 @@ func getErrorFields(err error) []zap.Field { originalErr = err } + // Handle nil error case (e.g., ErrorResponse with nil Err field) + if originalErr == nil { + return []zap.Field{ + zap.String("error", "unknown error"), + zap.String("error_type", "nil"), + } + } + fields := []zap.Field{ zap.String("error", originalErr.Error()), zap.String("error_type", fmt.Sprintf("%T", originalErr)), diff --git a/internal/apirouter/tenant_handlers.go b/internal/apirouter/tenant_handlers.go index db225d4e..0221c722 100644 --- a/internal/apirouter/tenant_handlers.go +++ b/internal/apirouter/tenant_handlers.go @@ -132,6 +132,7 @@ func (h *TenantHandlers) List(c *gin.Context) { // Map errors to HTTP status codes if errors.Is(err, models.ErrListTenantNotSupported) { AbortWithError(c, http.StatusNotImplemented, ErrorResponse{ + Err: err, Code: http.StatusNotImplemented, Message: err.Error(), }) diff --git a/internal/rsmq/rsmq.go b/internal/rsmq/rsmq.go index f91fb318..653ce797 100644 --- a/internal/rsmq/rsmq.go +++ b/internal/rsmq/rsmq.go @@ -556,8 +556,19 @@ func (rsmq *RedisSMQ) PopMessage(qname string) (*QueueMessage, error) { } func (rsmq *RedisSMQ) createQueueMessage(cmd *redis.Cmd) (*QueueMessage, error) { + // Check for command error first + if err := cmd.Err(); err != nil { + return nil, fmt.Errorf("rsmq command failed: %w", err) + } + val := cmd.Val() + // Handle nil response - some Redis-compatible databases (e.g., Dragonfly) + // may return nil instead of empty array when no message is available + if val == nil { + return nil, nil + } + // Try different type assertions for cluster vs regular client compatibility var vals []any if v, ok := val.([]any); ok { From 67af43e6050f121376a7665889faf7103e65b034 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 23:42:34 +0700 Subject: [PATCH 05/13] test: fix migrator ch conn url tests --- internal/migrator/migrator_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/migrator/migrator_test.go b/internal/migrator/migrator_test.go index 75aad906..51762911 100644 --- a/internal/migrator/migrator_test.go +++ 
b/internal/migrator/migrator_test.go @@ -2,6 +2,7 @@ package migrator import ( "context" + "net/url" "strings" "testing" @@ -140,7 +141,8 @@ func TestMigrator_CredentialExposure_Integration(t *testing.T) { } else if tt.opts.CH.Addr != "" { assert.Contains(t, dbURL, "clickhouse://", "Expected ClickHouse URL") - assert.Contains(t, dbURL, tt.opts.CH.Password, + // Password is URL-encoded in the query string + assert.Contains(t, dbURL, url.QueryEscape(tt.opts.CH.Password), "Test setup: password should be in the database URL") } @@ -195,7 +197,7 @@ func TestMigrator_DatabaseURLGeneration(t *testing.T) { Database: "outpost", }, }, - expectedURL: "clickhouse://admin:secret123@localhost:9000/outpost?x-multi-statement=true", + expectedURL: "clickhouse://localhost:9000/outpost?username=admin&password=secret123&x-multi-statement=true&x-migrations-table-engine=MergeTree", hasPassword: true, }, { From bf86d9a1bf1120fda3c5f3399004f68b7d97b37a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 20 Jan 2026 23:42:47 +0700 Subject: [PATCH 06/13] chore: air.toml polling --- .air.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.air.toml b/.air.toml index b9236004..ff4bf252 100644 --- a/.air.toml +++ b/.air.toml @@ -2,6 +2,8 @@ root = "." tmp_dir = "tmp" [build] + poll = true + poll_interval = 500 cmd = "go build -o tmp/main cmd/outpost/main.go" bin = "tmp/main" delay = 100 From 75b1101429ff641b65f5fd79b0674347250bddc6 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 00:41:21 +0700 Subject: [PATCH 07/13] fix: portal using new api pagination & error schema --- internal/portal/src/app.tsx | 44 +++++++++++++++++-- .../RetryDeliveryButton.tsx | 10 ++--- .../RetryEventButton/RetryEventButton.tsx | 10 ++--- .../CreateDestination/CreateDestination.tsx | 7 +-- .../DestinationSettings.tsx | 37 ++++------------ .../scenes/Destination/Events/Deliveries.tsx | 27 ++++++------ internal/portal/src/typings/Event.ts | 19 +++++--- 7 files changed, 84 insertions(+), 70 deletions(-) diff --git a/internal/portal/src/app.tsx b/internal/portal/src/app.tsx index 8a53180d..5c775984 100644 --- a/internal/portal/src/app.tsx +++ b/internal/portal/src/app.tsx @@ -17,6 +17,40 @@ type ApiClient = { fetch: (path: string, init?: RequestInit) => Promise; }; +// API error response from the server +export class ApiError extends Error { + status: number; + data?: string[]; + + constructor(message: string, status: number, data?: string[]) { + super(message); + this.name = "ApiError"; + this.status = status; + this.data = data; + } + + // Format error for display - includes validation details if present + toDisplayString(): string { + if (this.data && this.data.length > 0) { + return this.data + .map((d) => d.charAt(0).toUpperCase() + d.slice(1)) + .join(". 
"); + } + return this.message.charAt(0).toUpperCase() + this.message.slice(1); + } +} + +// Helper to format any error for display +export function formatError(error: unknown): string { + if (error instanceof ApiError) { + return error.toDisplayString(); + } + if (error instanceof Error) { + return error.message.charAt(0).toUpperCase() + error.message.slice(1); + } + return String(error); +} + export const ApiContext = createContext({} as ApiClient); type TenantResponse = { @@ -73,12 +107,16 @@ function AuthenticatedApp({ }, }).then(async (res) => { if (!res.ok) { - let error; + let error: ApiError; try { const data = await res.json(); - error = new Error(data.message); + error = new ApiError( + data.message || res.statusText, + data.status || res.status, + Array.isArray(data.data) ? data.data : undefined, + ); } catch (e) { - error = new Error(res.statusText); + error = new ApiError(res.statusText, res.status); } throw error; } diff --git a/internal/portal/src/common/RetryDeliveryButton/RetryDeliveryButton.tsx b/internal/portal/src/common/RetryDeliveryButton/RetryDeliveryButton.tsx index 288ca32e..8746eaca 100644 --- a/internal/portal/src/common/RetryDeliveryButton/RetryDeliveryButton.tsx +++ b/internal/portal/src/common/RetryDeliveryButton/RetryDeliveryButton.tsx @@ -2,7 +2,7 @@ import React, { useCallback, useContext, useState, MouseEvent } from "react"; import Button from "../Button/Button"; import { ReplayIcon } from "../Icons"; import { showToast } from "../Toast/Toast"; -import { ApiContext } from "../../app"; +import { ApiContext, formatError } from "../../app"; interface RetryDeliveryButtonProps { deliveryId: string; @@ -34,12 +34,8 @@ const RetryDeliveryButton: React.FC = ({ }); showToast("success", "Retry successful."); completed(true); - } catch (error: any) { - showToast( - "error", - "Retry failed. " + - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + } catch (error: unknown) { + showToast("error", "Retry failed. " + formatError(error)); completed(false); } diff --git a/internal/portal/src/common/RetryEventButton/RetryEventButton.tsx b/internal/portal/src/common/RetryEventButton/RetryEventButton.tsx index d937f667..c6435488 100644 --- a/internal/portal/src/common/RetryEventButton/RetryEventButton.tsx +++ b/internal/portal/src/common/RetryEventButton/RetryEventButton.tsx @@ -2,7 +2,7 @@ import React, { useCallback, useContext, useState, MouseEvent } from "react"; import Button from "../Button/Button"; import { ReplayIcon } from "../Icons"; import { showToast } from "../Toast/Toast"; -import { ApiContext } from "../../app"; +import { ApiContext, formatError } from "../../app"; interface RetryEventButtonProps { eventId: string; @@ -35,12 +35,8 @@ const RetryEventButton: React.FC = ({ ); showToast("success", "Retry successful."); completed(true); - } catch (error: any) { - showToast( - "error", - "Retry failed. " + - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + } catch (error: unknown) { + showToast("error", "Retry failed. 
" + formatError(error)); completed(false); } diff --git a/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx b/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx index 0deac6a0..4948f2fd 100644 --- a/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx +++ b/internal/portal/src/scenes/CreateDestination/CreateDestination.tsx @@ -10,7 +10,7 @@ import { import Badge from "../../common/Badge/Badge"; import { useNavigate } from "react-router-dom"; import { useContext, useEffect, useState } from "react"; -import { ApiContext } from "../../app"; +import { ApiContext, formatError } from "../../app"; import { showToast } from "../../common/Toast/Toast"; import useSWR, { mutate } from "swr"; import TopicPicker from "../../common/TopicPicker/TopicPicker"; @@ -337,10 +337,7 @@ export default function CreateDestination() { navigate(`/destinations/${data.id}`); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsCreating(false); diff --git a/internal/portal/src/scenes/Destination/DestinationSettings/DestinationSettings.tsx b/internal/portal/src/scenes/Destination/DestinationSettings/DestinationSettings.tsx index d3e5fae2..f9c9f07b 100644 --- a/internal/portal/src/scenes/Destination/DestinationSettings/DestinationSettings.tsx +++ b/internal/portal/src/scenes/Destination/DestinationSettings/DestinationSettings.tsx @@ -15,7 +15,7 @@ import { } from "../../../common/Icons"; import { FilterSyntaxGuide } from "../../../common/FilterSyntaxGuide/FilterSyntaxGuide"; import { useSidebar } from "../../../common/Sidebar/Sidebar"; -import { ApiContext } from "../../../app"; +import { ApiContext, formatError } from "../../../app"; import { mutate } from "swr"; import { showToast } from "../../../common/Toast/Toast"; import { @@ -77,10 +77,7 @@ const DestinationSettings = ({ mutate(`destinations/${destination.id}`, data, false); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsDisabling(false); @@ -102,10 +99,7 @@ const DestinationSettings = ({ mutate(`destinations/${destination.id}`, data, false); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsTopicsSaving(false); @@ -144,10 +138,7 @@ const DestinationSettings = ({ setIsConfigFormValid(false); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsConfigSaving(false); @@ -174,10 +165,7 @@ const DestinationSettings = ({ mutate(`destinations/${destination.id}`, data, false); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsFilterSaving(false); @@ -213,10 +201,7 @@ const DestinationSettings = ({ setFilter(null); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsFilterSaving(false); @@ -239,10 +224,7 @@ const DestinationSettings = ({ mutate(`destinations/${destination.id}`, data, false); }) 
.catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsRotatingSecret(false); @@ -275,10 +257,7 @@ const DestinationSettings = ({ navigate("/"); }) .catch((error) => { - showToast( - "error", - `${error.message.charAt(0).toUpperCase() + error.message.slice(1)}`, - ); + showToast("error", formatError(error)); }) .finally(() => { setIsDeleting(false); diff --git a/internal/portal/src/scenes/Destination/Events/Deliveries.tsx b/internal/portal/src/scenes/Destination/Events/Deliveries.tsx index 92b422e5..3cfb8b8b 100644 --- a/internal/portal/src/scenes/Destination/Events/Deliveries.tsx +++ b/internal/portal/src/scenes/Destination/Events/Deliveries.tsx @@ -3,11 +3,7 @@ import Badge from "../../../common/Badge/Badge"; import Button from "../../../common/Button/Button"; import "./Deliveries.scss"; import Table from "../../../common/Table/Table"; -import { - DeliveryListResponse, - Delivery, - EventSummary, -} from "../../../typings/Event"; +import { DeliveryListResponse, EventSummary } from "../../../typings/Event"; import useSWR from "swr"; import Dropdown from "../../../common/Dropdown/Dropdown"; import { @@ -87,8 +83,8 @@ const Deliveries: React.FC = ({ const topicsList = CONFIGS.TOPICS.split(","); - const table_rows = deliveriesList?.data - ? deliveriesList.data.map((delivery) => { + const table_rows = deliveriesList?.models + ? deliveriesList.models.map((delivery) => { const event = typeof delivery.event === "object" ? (delivery.event as EventSummary) @@ -135,7 +131,8 @@ const Deliveries: React.FC = ({

- Deliveries
+ Deliveries{" "}
+

= ({
- {deliveriesList?.data.length ?? 0} deliveries
+ {deliveriesList?.models.length ?? 0} deliveries
@@ -267,16 +264,20 @@ const Deliveries: React.FC = ({ diff --git a/internal/portal/src/typings/Event.ts b/internal/portal/src/typings/Event.ts index 0dd56bc4..bedf367b 100644 --- a/internal/portal/src/typings/Event.ts +++ b/internal/portal/src/typings/Event.ts @@ -35,16 +35,22 @@ interface Delivery { destination: string; } +interface SeekPagination { + order_by: string; + dir: "asc" | "desc"; + limit: number; + next: string | null; + prev: string | null; +} + interface DeliveryListResponse { - data: Delivery[]; - next?: string; - prev?: string; + models: Delivery[]; + pagination: SeekPagination; } interface EventListResponse { - data: Event[]; - next?: string; - prev?: string; + models: Event[]; + pagination: SeekPagination; } export type { @@ -52,6 +58,7 @@ export type { EventSummary, EventFull, Delivery, + SeekPagination, DeliveryListResponse, EventListResponse, }; From a35347316455af1e934628d3d806e1509573f761 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 17:17:17 +0700 Subject: [PATCH 08/13] feat: idempotence supports deployment_id --- internal/idempotence/idempotence.go | 43 ++++++--- internal/idempotence/idempotence_test.go | 108 +++++++++++++++++++++++ internal/services/builder.go | 2 + 3 files changed, 142 insertions(+), 11 deletions(-) diff --git a/internal/idempotence/idempotence.go b/internal/idempotence/idempotence.go index 6e6825ec..56469764 100644 --- a/internal/idempotence/idempotence.go +++ b/internal/idempotence/idempotence.go @@ -24,14 +24,16 @@ type Idempotence interface { } type IdempotenceImpl struct { - redisClient redis.Cmdable - options IdempotenceImplOptions - tracer trace.Tracer + redisClient redis.Cmdable + deploymentID string + options IdempotenceImplOptions + tracer trace.Tracer } type IdempotenceImplOptions struct { Timeout time.Duration SuccessfulTTL time.Duration + DeploymentID string } func WithTimeout(timeout time.Duration) func(opts *IdempotenceImplOptions) { @@ -46,6 +48,12 @@ func WithSuccessfulTTL(successfulTTL time.Duration) func(opts *IdempotenceImplOp } } +func WithDeploymentID(deploymentID string) func(opts *IdempotenceImplOptions) { + return func(opts *IdempotenceImplOptions) { + opts.DeploymentID = deploymentID + } +} + func New(redisClient redis.Cmdable, opts ...func(opts *IdempotenceImplOptions)) Idempotence { options := &IdempotenceImplOptions{ Timeout: DefaultTimeout, @@ -57,21 +65,34 @@ func New(redisClient redis.Cmdable, opts ...func(opts *IdempotenceImplOptions)) } return &IdempotenceImpl{ - redisClient: redisClient, - options: *options, - tracer: otel.GetTracerProvider().Tracer("github.com/hookdeck/outpost/internal/idempotence"), + redisClient: redisClient, + deploymentID: options.DeploymentID, + options: *options, + tracer: otel.GetTracerProvider().Tracer("github.com/hookdeck/outpost/internal/idempotence"), } } var _ Idempotence = (*IdempotenceImpl)(nil) +func (i *IdempotenceImpl) deploymentPrefix() string { + if i.deploymentID == "" { + return "" + } + return i.deploymentID + ":" +} + +func (i *IdempotenceImpl) prefixKey(key string) string { + return i.deploymentPrefix() + key +} + func (i *IdempotenceImpl) Exec(ctx context.Context, key string, exec func(context.Context) error) error { - isIdempotent, err := i.checkIdempotency(ctx, key) + prefixedKey := i.prefixKey(key) + isIdempotent, err := i.checkIdempotency(ctx, prefixedKey) if err != nil { return err } if !isIdempotent { - processingStatus, err := i.getIdempotencyStatus(ctx, key) + processingStatus, err := i.getIdempotencyStatus(ctx, prefixedKey) if err != nil { 
// TODO: Question: // What if err == redis.Nil here? It happens @@ -87,7 +108,7 @@ func (i *IdempotenceImpl) Exec(ctx context.Context, key string, exec func(contex } if processingStatus == StatusProcessing { time.Sleep(i.options.Timeout) - status, err := i.getIdempotencyStatus(ctx, key) + status, err := i.getIdempotencyStatus(ctx, prefixedKey) if err != nil { if err == redis.Nil { // The previous consumer has err-ed and removed the processing key. We should also err @@ -107,7 +128,7 @@ func (i *IdempotenceImpl) Exec(ctx context.Context, key string, exec func(contex execCtx, span := i.tracer.Start(ctx, "Idempotence.Exec") err = exec(execCtx) if err != nil { - clearErr := i.clearIdempotency(ctx, key) + clearErr := i.clearIdempotency(ctx, prefixedKey) if clearErr != nil { finalErr := errors.Join(err, clearErr) span.RecordError(finalErr) @@ -121,7 +142,7 @@ func (i *IdempotenceImpl) Exec(ctx context.Context, key string, exec func(contex span.End() } - err = i.markProcessedIdempotency(ctx, key) + err = i.markProcessedIdempotency(ctx, prefixedKey) if err != nil { // TODO: Question: how to properly handle this error? return err diff --git a/internal/idempotence/idempotence_test.go b/internal/idempotence/idempotence_test.go index ca1422bb..9bc81c05 100644 --- a/internal/idempotence/idempotence_test.go +++ b/internal/idempotence/idempotence_test.go @@ -134,6 +134,114 @@ func TestIdempotence_Success(t *testing.T) { }) } +func TestIdempotence_WithDeploymentID(t *testing.T) { + t.Parallel() + + i := idempotence.New(testutil.CreateTestRedisClient(t), + idempotence.WithTimeout(1*time.Second), + idempotence.WithSuccessfulTTL(24*time.Hour), + idempotence.WithDeploymentID("dp_test_001"), + ) + + t.Run("executes successfully with deployment prefix", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + executed := false + err := i.Exec(ctx, "test-key-1", func(ctx context.Context) error { + executed = true + return nil + }) + + assert.NoError(t, err) + assert.True(t, executed, "function should have been executed") + }) + + t.Run("returns nil on duplicate execution with deployment prefix", func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + key := testutil.RandomString(5) + execCount := 0 + + // First execution + err := i.Exec(ctx, key, func(ctx context.Context) error { + execCount++ + return nil + }) + assert.NoError(t, err) + + // Second execution with same key should be idempotent + err = i.Exec(ctx, key, func(ctx context.Context) error { + execCount++ + return nil + }) + assert.NoError(t, err) + assert.Equal(t, 1, execCount, "function should only execute once") + }) +} + +func TestIdempotenceIsolation(t *testing.T) { + t.Parallel() + + redisClient := testutil.CreateTestRedisClient(t) + + // Create two idempotence instances with different deployment IDs + i1 := idempotence.New(redisClient, + idempotence.WithTimeout(1*time.Second), + idempotence.WithSuccessfulTTL(24*time.Hour), + idempotence.WithDeploymentID("dp_001"), + ) + i2 := idempotence.New(redisClient, + idempotence.WithTimeout(1*time.Second), + idempotence.WithSuccessfulTTL(24*time.Hour), + idempotence.WithDeploymentID("dp_002"), + ) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + // Use the same key for both + sharedKey := "shared-idempotency-key" + + exec1Count := 0 + exec2Count := 0 + + // Execute on instance 1 + err := i1.Exec(ctx, sharedKey, 
func(ctx context.Context) error { + exec1Count++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 1, exec1Count) + + // Execute on instance 2 with same key - should execute (different deployment) + err = i2.Exec(ctx, sharedKey, func(ctx context.Context) error { + exec2Count++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 1, exec2Count, "Instance 2 should execute independently") + + // Execute on instance 1 again - should be idempotent + err = i1.Exec(ctx, sharedKey, func(ctx context.Context) error { + exec1Count++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 1, exec1Count, "Instance 1 should remain idempotent") + + // Execute on instance 2 again - should be idempotent + err = i2.Exec(ctx, sharedKey, func(ctx context.Context) error { + exec2Count++ + return nil + }) + require.NoError(t, err) + assert.Equal(t, 1, exec2Count, "Instance 2 should remain idempotent") +} + func TestIdempotence_Failure(t *testing.T) { t.Parallel() diff --git a/internal/services/builder.go b/internal/services/builder.go index d782ab06..8b3809c2 100644 --- a/internal/services/builder.go +++ b/internal/services/builder.go @@ -189,6 +189,7 @@ func (b *ServiceBuilder) BuildAPIWorkers(baseRouter *gin.Engine) error { publishIdempotence := idempotence.New(svc.redisClient, idempotence.WithTimeout(5*time.Second), idempotence.WithSuccessfulTTL(time.Duration(b.cfg.PublishIdempotencyKeyTTL)*time.Second), + idempotence.WithDeploymentID(b.cfg.DeploymentID), ) eventHandler := publishmq.NewEventHandler(b.logger, svc.deliveryMQ, svc.entityStore, svc.eventTracer, b.cfg.Topics, publishIdempotence) @@ -298,6 +299,7 @@ func (b *ServiceBuilder) BuildDeliveryWorker(baseRouter *gin.Engine) error { deliveryIdempotence := idempotence.New(svc.redisClient, idempotence.WithTimeout(5*time.Second), idempotence.WithSuccessfulTTL(time.Duration(b.cfg.DeliveryIdempotencyKeyTTL)*time.Second), + idempotence.WithDeploymentID(b.cfg.DeploymentID), ) // Get retry configuration From 2cf53cc0b770409e2d1992f1e4dc8937bc894a2b Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 19:59:36 +0700 Subject: [PATCH 09/13] chore: add infra-related logs during startup --- cmd/e2e/configs/basic.go | 7 ++- internal/app/app.go | 6 ++- internal/infra/infra.go | 84 ++++++++++++++++++++++++++++++------ internal/infra/infra_test.go | 5 ++- 4 files changed, 86 insertions(+), 16 deletions(-) diff --git a/cmd/e2e/configs/basic.go b/cmd/e2e/configs/basic.go index bf6629d5..4e2c4bf8 100644 --- a/cmd/e2e/configs/basic.go +++ b/cmd/e2e/configs/basic.go @@ -11,6 +11,7 @@ import ( "github.com/hookdeck/outpost/internal/config" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/infra" + "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/util/testinfra" "github.com/hookdeck/outpost/internal/util/testutil" @@ -97,10 +98,14 @@ func Basic(t *testing.T, opts BasicOpts) config.Config { if err != nil { log.Println("Failed to create redis client:", err) } + logger, err := logging.NewLogger(logging.WithLogLevel("warn")) + if err != nil { + log.Println("Failed to create logger:", err) + } outpostInfra := infra.NewInfra(infra.Config{ DeliveryMQ: c.MQs.ToInfraConfig("deliverymq"), LogMQ: c.MQs.ToInfraConfig("logmq"), - }, redisClient) + }, redisClient, logger, c.MQs.GetInfraType()) if err := outpostInfra.Teardown(context.Background()); err != nil { log.Println("Teardown failed:", err) } diff --git a/internal/app/app.go 
b/internal/app/app.go index c547b80c..7d0ced4f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -3,6 +3,7 @@ package app import ( "context" "errors" + "fmt" "os" "os/signal" "syscall" @@ -49,7 +50,7 @@ func (a *App) Run(ctx context.Context) error { } // PreRun initializes all dependencies before starting the application -func (a *App) PreRun(ctx context.Context) error { +func (a *App) PreRun(ctx context.Context) (err error) { if err := a.setupLogger(); err != nil { return err } @@ -57,6 +58,7 @@ func (a *App) PreRun(ctx context.Context) error { defer func() { if r := recover(); r != nil { a.logger.Error("panic during PreRun", zap.Any("panic", r)) + err = fmt.Errorf("panic during PreRun: %v", r) } }() @@ -217,7 +219,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error { DeliveryMQ: a.config.MQs.ToInfraConfig("deliverymq"), LogMQ: a.config.MQs.ToInfraConfig("logmq"), AutoProvision: a.config.MQs.AutoProvision, - }, a.redisClient); err != nil { + }, a.redisClient, a.logger, a.config.MQs.GetInfraType()); err != nil { a.logger.Error("infrastructure initialization failed", zap.Error(err)) return err } diff --git a/internal/infra/infra.go b/internal/infra/infra.go index 92412608..354232c9 100644 --- a/internal/infra/infra.go +++ b/internal/infra/infra.go @@ -6,16 +6,18 @@ import ( "fmt" "time" + "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/mqinfra" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/redislock" + "go.uber.org/zap" ) const ( lockKey = "outpost:lock" lockAttempts = 5 lockDelay = 5 * time.Second - lockTTL = 10 * time.Second + lockTTL = 60 * time.Second // 1 minute to allow for slow cloud provider operations ) var ( @@ -27,6 +29,7 @@ type Infra struct { lock redislock.Lock provider InfraProvider shouldManage bool + logger *logging.Logger } // InfraProvider handles the actual infrastructure operations @@ -51,32 +54,58 @@ func (cfg *Config) SetSensiblePolicyDefaults() { type infraProvider struct { deliveryMQ mqinfra.MQInfra logMQ mqinfra.MQInfra + logger *logging.Logger + mqType string } func (p *infraProvider) Exist(ctx context.Context) (bool, error) { + p.logger.Debug("checking if deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) if exists, err := p.deliveryMQ.Exist(ctx); err != nil { return false, err } else if !exists { + p.logger.Debug("deliverymq infrastructure does not exist", zap.String("mq_type", p.mqType)) return false, nil } + p.logger.Debug("deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) + p.logger.Debug("checking if logmq infrastructure exists", zap.String("mq_type", p.mqType)) if exists, err := p.logMQ.Exist(ctx); err != nil { return false, err } else if !exists { + p.logger.Debug("logmq infrastructure does not exist", zap.String("mq_type", p.mqType)) return false, nil } + p.logger.Debug("logmq infrastructure exists", zap.String("mq_type", p.mqType)) return true, nil } func (p *infraProvider) Declare(ctx context.Context) error { + p.logger.Info("declaring deliverymq infrastructure", zap.String("mq_type", p.mqType)) + start := time.Now() if err := p.deliveryMQ.Declare(ctx); err != nil { + p.logger.Error("failed to declare deliverymq infrastructure", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) return err } + p.logger.Info("deliverymq infrastructure declared", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start))) + p.logger.Info("declaring logmq 
infrastructure", zap.String("mq_type", p.mqType)) + start = time.Now() if err := p.logMQ.Declare(ctx); err != nil { + p.logger.Error("failed to declare logmq infrastructure", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) return err } + p.logger.Info("logmq infrastructure declared", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start))) return nil } @@ -93,12 +122,14 @@ func (p *infraProvider) Teardown(ctx context.Context) error { return nil } -func NewInfra(cfg Config, redisClient redis.Cmdable) Infra { +func NewInfra(cfg Config, redisClient redis.Cmdable, logger *logging.Logger, mqType string) Infra { cfg.SetSensiblePolicyDefaults() provider := &infraProvider{ deliveryMQ: mqinfra.New(cfg.DeliveryMQ), logMQ: mqinfra.New(cfg.LogMQ), + logger: logger, + mqType: mqType, } // Default shouldManage to true if not set (backward compatible) @@ -111,20 +142,26 @@ func NewInfra(cfg Config, redisClient redis.Cmdable) Infra { lock: redislock.New(redisClient, redislock.WithKey(lockKey), redislock.WithTTL(lockTTL)), provider: provider, shouldManage: shouldManage, + logger: logger, } } // Init initializes and verifies infrastructure based on configuration. // If AutoProvision is true (default), it will create infrastructure if needed. // If AutoProvision is false, it will only verify infrastructure exists. -func Init(ctx context.Context, cfg Config, redisClient redis.Cmdable) error { - infra := NewInfra(cfg, redisClient) +func Init(ctx context.Context, cfg Config, redisClient redis.Cmdable, logger *logging.Logger, mqType string) error { + infra := NewInfra(cfg, redisClient, logger, mqType) + + logger.Info("initializing mq infrastructure", + zap.String("mq_type", mqType), + zap.Bool("auto_provision", infra.shouldManage)) if infra.shouldManage { return infra.Declare(ctx) } // shouldManage is false, only verify existence + logger.Debug("auto_provision disabled, verifying infrastructure exists", zap.String("mq_type", mqType)) exists, err := infra.provider.Exist(ctx) if err != nil { return fmt.Errorf("failed to verify infrastructure exists: %w", err) @@ -132,38 +169,56 @@ func Init(ctx context.Context, cfg Config, redisClient redis.Cmdable) error { if !exists { return ErrInfraNotFound } + logger.Info("mq infrastructure verified", zap.String("mq_type", mqType)) return nil } // NewInfraWithProvider creates an Infra instance with custom lock and provider (for testing) -func NewInfraWithProvider(lock redislock.Lock, provider InfraProvider, shouldManage bool) *Infra { +func NewInfraWithProvider(lock redislock.Lock, provider InfraProvider, shouldManage bool, logger *logging.Logger) *Infra { return &Infra{ lock: lock, provider: provider, shouldManage: shouldManage, + logger: logger, } } func (infra *Infra) Declare(ctx context.Context) error { for attempt := 0; attempt < lockAttempts; attempt++ { + infra.logger.Debug("checking if infrastructure declaration needed", + zap.Int("attempt", attempt+1), + zap.Int("max_attempts", lockAttempts)) + shouldDeclare, hasLocked, err := infra.shouldDeclareAndAcquireLock(ctx) if err != nil { return err } if !shouldDeclare { + infra.logger.Info("infrastructure already exists, skipping declaration") return nil } if hasLocked { + infra.logger.Info("acquired infrastructure lock, declaring infrastructure", + zap.Duration("lock_ttl", lockTTL)) + // We got the lock, declare infrastructure + declareStart := time.Now() defer func() { - // TODO: improve error handling - unlocked, err := infra.lock.Unlock(ctx) - 
if err != nil { - panic(err) - } - if !unlocked { - panic("failed to unlock lock") + // Best-effort unlock. If it fails (e.g., lock expired due to slow + // infrastructure provisioning), that's acceptable - the lock will + // expire on its own and the infrastructure was still declared successfully. + unlocked, unlockErr := infra.lock.Unlock(ctx) + if unlockErr != nil { + infra.logger.Warn("failed to unlock infrastructure lock", + zap.Error(unlockErr), + zap.Duration("declaration_duration", time.Since(declareStart))) + } else if !unlocked { + infra.logger.Warn("infrastructure lock already expired before unlock", + zap.Duration("declaration_duration", time.Since(declareStart)), + zap.Duration("lock_ttl", lockTTL)) + } else { + infra.logger.Debug("infrastructure lock released") } }() @@ -171,10 +226,15 @@ func (infra *Infra) Declare(ctx context.Context) error { return err } + infra.logger.Info("infrastructure declaration completed", + zap.Duration("duration", time.Since(declareStart))) return nil } // We didn't get the lock, wait before retry + infra.logger.Debug("infrastructure lock held by another instance, waiting", + zap.Duration("delay", lockDelay), + zap.Int("attempt", attempt+1)) if attempt < lockAttempts-1 { time.Sleep(lockDelay) } diff --git a/internal/infra/infra_test.go b/internal/infra/infra_test.go index 205df314..b488adcb 100644 --- a/internal/infra/infra_test.go +++ b/internal/infra/infra_test.go @@ -11,6 +11,7 @@ import ( "github.com/alicebob/miniredis/v2" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/infra" + "github.com/hookdeck/outpost/internal/logging" "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/redislock" "github.com/hookdeck/outpost/internal/util/testutil" @@ -74,7 +75,9 @@ func newTestInfra(t *testing.T, provider infra.InfraProvider, lockKey string, sh // Helper to create test infra with specific Redis client func newTestInfraWithRedis(t *testing.T, provider infra.InfraProvider, lockKey string, client redis.Cmdable, shouldManage bool) *infra.Infra { lock := redislock.New(client, redislock.WithKey(lockKey)) - return infra.NewInfraWithProvider(lock, provider, shouldManage) + logger, err := logging.NewLogger(logging.WithLogLevel("warn")) + require.NoError(t, err) + return infra.NewInfraWithProvider(lock, provider, shouldManage, logger) } func TestInfra_SingleNode(t *testing.T) { From 4f9675cac7ff8ad29c0c855966f20c79c440e684 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 20:05:39 +0700 Subject: [PATCH 10/13] docs: configuration --- docs/pages/references/configuration.mdx | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/docs/pages/references/configuration.mdx b/docs/pages/references/configuration.mdx index 8d7ddb52..4ec1524e 100644 --- a/docs/pages/references/configuration.mdx +++ b/docs/pages/references/configuration.mdx @@ -41,6 +41,11 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes | | `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes | | `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes | +| `CLICKHOUSE_ADDR` | Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud. | `nil` | No | +| `CLICKHOUSE_DATABASE` | Database name in ClickHouse to use. 
| `outpost` | No | +| `CLICKHOUSE_PASSWORD` | Password for ClickHouse authentication. | `nil` | No | +| `CLICKHOUSE_TLS_ENABLED` | Enable TLS for ClickHouse connection. | `false` | No | +| `CLICKHOUSE_USERNAME` | Username for ClickHouse authentication. | `nil` | No | | `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 3600 (1 hour). | `3600` | No | | `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No | | `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No | @@ -96,7 +101,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM | `PORTAL_ORGANIZATION_NAME` | Organization name displayed in the Outpost Portal. | `nil` | No | | `PORTAL_PROXY_URL` | URL to proxy the Outpost Portal through. If set, Outpost serves the portal assets, and this URL is used as the base. Must be a valid URL. | `nil` | No | | `PORTAL_REFERER_URL` | The URL where the user is redirected when the JWT token is expired or when the user clicks 'back'. Required if the Outpost Portal is enabled/used. | `nil` | Conditional | -| `POSTGRES_URL` | Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'. | `nil` | Yes | +| `POSTGRES_URL` | Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'. | `nil` | No | | `PUBLISH_AWS_SQS_ACCESS_KEY_ID` | AWS Access Key ID for the SQS publish queue. Required if AWS SQS is the chosen publish MQ provider. | `nil` | Conditional | | `PUBLISH_AWS_SQS_ENDPOINT` | Custom AWS SQS endpoint URL for the publish queue. Optional. | `nil` | No | | `PUBLISH_AWS_SQS_QUEUE` | Name of the SQS queue for publishing events. Required if AWS SQS is the chosen publish MQ provider. | `nil` | Conditional | @@ -172,6 +177,23 @@ alert: # Enables or disables audit logging for significant events. audit_log: true +clickhouse: + # Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud. + addr: "" + + # Database name in ClickHouse to use. + database: "outpost" + + # Password for ClickHouse authentication. + password: "" + + # Enable TLS for ClickHouse connection. + tls_enabled: false + + # Username for ClickHouse authentication. + username: "" + + # Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 3600 (1 hour). delivery_idempotency_key_ttl: 3600 @@ -466,7 +488,6 @@ portal: # Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'. -# Required: Y postgres: "" # Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 3600 (1 hour). 
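For reference, a minimal ClickHouse Cloud setup using the options documented in this patch might look like the sketch below; the hostname, username, and password are placeholders, while tls_enabled and port 9440 follow the ClickHouse Cloud requirement noted in the earlier TLS patch.

clickhouse:
  # Placeholder host; ClickHouse Cloud exposes the secure native protocol on port 9440.
  addr: "abc123.us-east-1.aws.clickhouse.cloud:9440"
  database: "outpost"
  # Placeholder credentials; use the user provisioned for your service.
  username: "default"
  password: "<secret>"
  # ClickHouse Cloud only accepts secure connections, so TLS must be enabled.
  tls_enabled: true
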
From 23724daee2b3006b826b5fdeaf92dbcb17dc531f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 20:20:45 +0700 Subject: [PATCH 11/13] chore: include delivery_event & delivery ids in logs --- internal/deliverymq/messagehandler.go | 20 ++++++++++++++++++++ internal/publishmq/eventhandler.go | 2 ++ 2 files changed, 22 insertions(+) diff --git a/internal/deliverymq/messagehandler.go b/internal/deliverymq/messagehandler.go index e5cf7054..f9625bd9 100644 --- a/internal/deliverymq/messagehandler.go +++ b/internal/deliverymq/messagehandler.go @@ -145,6 +145,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error { } h.logger.Ctx(ctx).Info("processing delivery event", + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID), @@ -200,6 +201,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli h.logger.Ctx(ctx).Error("failed to publish event", zap.Error(err), + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), @@ -220,6 +223,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli if err := h.retryScheduler.Cancel(ctx, deliveryEvent.GetRetryID()); err != nil { h.logger.Ctx(ctx).Error("failed to cancel scheduled retry", zap.Error(err), + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), @@ -228,6 +233,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, err) } logger.Audit("scheduled retry canceled", + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), @@ -244,6 +251,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m deliveryEvent.Delivery = delivery logger.Audit("event delivered", + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", deliveryEvent.Delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), @@ -256,6 +265,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m if logErr := h.logMQ.Publish(ctx, *deliveryEvent); logErr != nil { logger.Error("failed to publish delivery log", zap.Error(logErr), + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", deliveryEvent.Delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), @@ -328,6 +339,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent * if monitorErr := h.alertMonitor.HandleAttempt(ctx, attempt); monitorErr != nil { h.logger.Ctx(ctx).Error("failed to handle alert attempt", zap.Error(monitorErr), + zap.String("delivery_event_id", deliveryEvent.ID), + 
zap.String("delivery_id", deliveryEvent.Delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), @@ -336,6 +349,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent * } h.logger.Ctx(ctx).Info("alert attempt handled", + zap.String("delivery_event_id", deliveryEvent.ID), + zap.String("delivery_id", deliveryEvent.Delivery.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", destination.TenantID), zap.String("destination_id", destination.ID), @@ -412,6 +427,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models if err := h.retryScheduler.Schedule(ctx, retryMessageStr, backoffDuration, scheduler.WithTaskID(deliveryEvent.GetRetryID())); err != nil { h.logger.Ctx(ctx).Error("failed to schedule retry", zap.Error(err), + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID), @@ -421,6 +437,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models } h.logger.Ctx(ctx).Audit("retry scheduled", + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID), @@ -462,6 +479,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv logger := h.logger.Ctx(ctx) fields := []zap.Field{ zap.Error(err), + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID), @@ -477,6 +495,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv } if destination == nil { h.logger.Ctx(ctx).Info("destination not found", + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID)) @@ -484,6 +503,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv } if destination.DisabledAt != nil { h.logger.Ctx(ctx).Info("skipping disabled destination", + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", destination.ID), diff --git a/internal/publishmq/eventhandler.go b/internal/publishmq/eventhandler.go index 81bfc093..a7bdac40 100644 --- a/internal/publishmq/eventhandler.go +++ b/internal/publishmq/eventhandler.go @@ -181,6 +181,7 @@ func (h *eventHandler) enqueueDeliveryEvent(ctx context.Context, deliveryEvent m if err := h.deliveryMQ.Publish(ctx, deliveryEvent); err != nil { h.logger.Ctx(ctx).Error("failed to enqueue delivery event", zap.Error(err), + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID)) @@ -190,6 +191,7 @@ func (h *eventHandler) enqueueDeliveryEvent(ctx context.Context, deliveryEvent m } h.logger.Ctx(ctx).Audit("delivery event enqueued", + zap.String("delivery_event_id", deliveryEvent.ID), zap.String("event_id", 
deliveryEvent.Event.ID), zap.String("tenant_id", deliveryEvent.Event.TenantID), zap.String("destination_id", deliveryEvent.DestinationID)) From 0b4ada50f603261055409f091106a612e6d03c5f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 21 Jan 2026 20:43:27 +0700 Subject: [PATCH 12/13] chore: fix lint --- internal/models/destination.go | 2 +- internal/models/entity.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/models/destination.go b/internal/models/destination.go index f9da63c3..5bd41136 100644 --- a/internal/models/destination.go +++ b/internal/models/destination.go @@ -175,7 +175,7 @@ func (ds *DestinationSummary) MatchFilter(event Event) bool { // matchFilter is the shared implementation for filter matching. // Returns true if no filter is set (nil or empty) or if the event matches the filter. func matchFilter(filter Filter, event Event) bool { - if filter == nil || len(filter) == 0 { + if len(filter) == 0 { return true } // Build the filter input from the event diff --git a/internal/models/entity.go b/internal/models/entity.go index d60547bd..0c969249 100644 --- a/internal/models/entity.go +++ b/internal/models/entity.go @@ -505,7 +505,7 @@ func (s *entityStoreImpl) fetchTenants(ctx context.Context, baseFilter string, q // parseSearchResult parses the FT.SEARCH result into a list of tenants. // Supports both RESP2 (array) and RESP3 (map) formats. -func (s *entityStoreImpl) parseSearchResult(ctx context.Context, result interface{}) ([]Tenant, int, error) { +func (s *entityStoreImpl) parseSearchResult(_ context.Context, result interface{}) ([]Tenant, int, error) { // RESP3 format (go-redis v9): map with "total_results", "results", etc. if resultMap, ok := result.(map[interface{}]interface{}); ok { return s.parseResp3SearchResult(resultMap) @@ -815,7 +815,7 @@ func (s *entityStoreImpl) UpsertDestination(ctx context.Context, destination Des } // Store filter if present - if destination.Filter != nil && len(destination.Filter) > 0 { + if len(destination.Filter) > 0 { pipe.HSet(ctx, key, "filter", &destination.Filter) } else { pipe.HDel(ctx, key, "filter") From 57b7b405ba2a696e647e28236611106bb409aced Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Thu, 22 Jan 2026 02:09:54 +0700 Subject: [PATCH 13/13] chore: parallelize infra provisioning --- internal/infra/infra.go | 107 +++++++++++++++++++++++++--------------- 1 file changed, 68 insertions(+), 39 deletions(-) diff --git a/internal/infra/infra.go b/internal/infra/infra.go index 354232c9..ea28ec3a 100644 --- a/internal/infra/infra.go +++ b/internal/infra/infra.go @@ -11,6 +11,7 @@ import ( "github.com/hookdeck/outpost/internal/redis" "github.com/hookdeck/outpost/internal/redislock" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -59,55 +60,83 @@ type infraProvider struct { } func (p *infraProvider) Exist(ctx context.Context) (bool, error) { - p.logger.Debug("checking if deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) - if exists, err := p.deliveryMQ.Exist(ctx); err != nil { - return false, err - } else if !exists { - p.logger.Debug("deliverymq infrastructure does not exist", zap.String("mq_type", p.mqType)) - return false, nil - } - p.logger.Debug("deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) + var deliveryExists, logExists bool + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + p.logger.Debug("checking if deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) + exists, err := p.deliveryMQ.Exist(ctx) + if err != nil { 
+ return err + } + deliveryExists = exists + if exists { + p.logger.Debug("deliverymq infrastructure exists", zap.String("mq_type", p.mqType)) + } else { + p.logger.Debug("deliverymq infrastructure does not exist", zap.String("mq_type", p.mqType)) + } + return nil + }) + + g.Go(func() error { + p.logger.Debug("checking if logmq infrastructure exists", zap.String("mq_type", p.mqType)) + exists, err := p.logMQ.Exist(ctx) + if err != nil { + return err + } + logExists = exists + if exists { + p.logger.Debug("logmq infrastructure exists", zap.String("mq_type", p.mqType)) + } else { + p.logger.Debug("logmq infrastructure does not exist", zap.String("mq_type", p.mqType)) + } + return nil + }) - p.logger.Debug("checking if logmq infrastructure exists", zap.String("mq_type", p.mqType)) - if exists, err := p.logMQ.Exist(ctx); err != nil { + if err := g.Wait(); err != nil { return false, err - } else if !exists { - p.logger.Debug("logmq infrastructure does not exist", zap.String("mq_type", p.mqType)) - return false, nil } - p.logger.Debug("logmq infrastructure exists", zap.String("mq_type", p.mqType)) - return true, nil + return deliveryExists && logExists, nil } func (p *infraProvider) Declare(ctx context.Context) error { - p.logger.Info("declaring deliverymq infrastructure", zap.String("mq_type", p.mqType)) - start := time.Now() - if err := p.deliveryMQ.Declare(ctx); err != nil { - p.logger.Error("failed to declare deliverymq infrastructure", + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + p.logger.Info("declaring deliverymq infrastructure", zap.String("mq_type", p.mqType)) + start := time.Now() + if err := p.deliveryMQ.Declare(ctx); err != nil { + p.logger.Error("failed to declare deliverymq infrastructure", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return err + } + p.logger.Info("deliverymq infrastructure declared", zap.String("mq_type", p.mqType), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - return err - } - p.logger.Info("deliverymq infrastructure declared", - zap.String("mq_type", p.mqType), - zap.Duration("duration", time.Since(start))) - - p.logger.Info("declaring logmq infrastructure", zap.String("mq_type", p.mqType)) - start = time.Now() - if err := p.logMQ.Declare(ctx); err != nil { - p.logger.Error("failed to declare logmq infrastructure", + zap.Duration("duration", time.Since(start))) + return nil + }) + + g.Go(func() error { + p.logger.Info("declaring logmq infrastructure", zap.String("mq_type", p.mqType)) + start := time.Now() + if err := p.logMQ.Declare(ctx); err != nil { + p.logger.Error("failed to declare logmq infrastructure", + zap.String("mq_type", p.mqType), + zap.Duration("duration", time.Since(start)), + zap.Error(err)) + return err + } + p.logger.Info("logmq infrastructure declared", zap.String("mq_type", p.mqType), - zap.Duration("duration", time.Since(start)), - zap.Error(err)) - return err - } - p.logger.Info("logmq infrastructure declared", - zap.String("mq_type", p.mqType), - zap.Duration("duration", time.Since(start))) + zap.Duration("duration", time.Since(start))) + return nil + }) - return nil + return g.Wait() } func (p *infraProvider) Teardown(ctx context.Context) error {
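
For readers less familiar with golang.org/x/sync/errgroup, the fan-out/fail-fast pattern adopted in this patch boils down to the sketch below; provisionAll, the step names, and the package name are illustrative only, not symbols from the codebase.

// Minimal sketch of the errgroup pattern used above, under the assumption
// that each provisioning step is independent and should fail fast.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// provisionAll runs independent provisioning steps concurrently and returns
// the first error; the derived context is canceled as soon as any step fails.
func provisionAll(ctx context.Context, steps map[string]func(context.Context) error) error {
	g, ctx := errgroup.WithContext(ctx)
	for name, step := range steps {
		name, step := name, step // capture loop variables for the closure
		g.Go(func() error {
			if err := step(ctx); err != nil {
				return fmt.Errorf("%s: %w", name, err)
			}
			return nil
		})
	}
	// Wait blocks until every step returns and reports the first non-nil error.
	return g.Wait()
}

func main() {
	steps := map[string]func(context.Context) error{
		"deliverymq": func(ctx context.Context) error { return nil },
		"logmq":      func(ctx context.Context) error { return nil },
	}
	if err := provisionAll(context.Background(), steps); err != nil {
		fmt.Println("provisioning failed:", err)
	}
}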