Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .air.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ root = "."
tmp_dir = "tmp"

[build]
poll = true
poll_interval = 500
cmd = "go build -o tmp/main cmd/outpost/main.go"
bin = "tmp/main"
delay = 100
Expand Down
7 changes: 6 additions & 1 deletion cmd/e2e/configs/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hookdeck/outpost/internal/config"
"github.com/hookdeck/outpost/internal/idgen"
"github.com/hookdeck/outpost/internal/infra"
"github.com/hookdeck/outpost/internal/logging"
"github.com/hookdeck/outpost/internal/redis"
"github.com/hookdeck/outpost/internal/util/testinfra"
"github.com/hookdeck/outpost/internal/util/testutil"
Expand Down Expand Up @@ -97,10 +98,14 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
if err != nil {
log.Println("Failed to create redis client:", err)
}
logger, err := logging.NewLogger(logging.WithLogLevel("warn"))
if err != nil {
log.Println("Failed to create logger:", err)
}
outpostInfra := infra.NewInfra(infra.Config{
DeliveryMQ: c.MQs.ToInfraConfig("deliverymq"),
LogMQ: c.MQs.ToInfraConfig("logmq"),
}, redisClient)
}, redisClient, logger, c.MQs.GetInfraType())
if err := outpostInfra.Teardown(context.Background()); err != nil {
log.Println("Teardown failed:", err)
}
Expand Down
25 changes: 23 additions & 2 deletions docs/pages/references/configuration.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `AZURE_SERVICEBUS_RESOURCE_GROUP` | Azure resource group name | `nil` | Yes |
| `AZURE_SERVICEBUS_SUBSCRIPTION_ID` | Azure subscription ID | `nil` | Yes |
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
| `CLICKHOUSE_ADDR` | Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud. | `nil` | No |
| `CLICKHOUSE_DATABASE` | Database name in ClickHouse to use. | `outpost` | No |
| `CLICKHOUSE_PASSWORD` | Password for ClickHouse authentication. | `nil` | No |
| `CLICKHOUSE_TLS_ENABLED` | Enable TLS for ClickHouse connection. | `false` | No |
| `CLICKHOUSE_USERNAME` | Username for ClickHouse authentication. | `nil` | No |
| `DELIVERY_IDEMPOTENCY_KEY_TTL` | Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 3600 (1 hour). | `3600` | No |
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
| `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No |
Expand Down Expand Up @@ -96,7 +101,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
| `PORTAL_ORGANIZATION_NAME` | Organization name displayed in the Outpost Portal. | `nil` | No |
| `PORTAL_PROXY_URL` | URL to proxy the Outpost Portal through. If set, Outpost serves the portal assets, and this URL is used as the base. Must be a valid URL. | `nil` | No |
| `PORTAL_REFERER_URL` | The URL where the user is redirected when the JWT token is expired or when the user clicks 'back'. Required if the Outpost Portal is enabled/used. | `nil` | Conditional |
| `POSTGRES_URL` | Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'. | `nil` | Yes |
| `POSTGRES_URL` | Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'. | `nil` | No |
| `PUBLISH_AWS_SQS_ACCESS_KEY_ID` | AWS Access Key ID for the SQS publish queue. Required if AWS SQS is the chosen publish MQ provider. | `nil` | Conditional |
| `PUBLISH_AWS_SQS_ENDPOINT` | Custom AWS SQS endpoint URL for the publish queue. Optional. | `nil` | No |
| `PUBLISH_AWS_SQS_QUEUE` | Name of the SQS queue for publishing events. Required if AWS SQS is the chosen publish MQ provider. | `nil` | Conditional |
Expand Down Expand Up @@ -172,6 +177,23 @@ alert:
# Enables or disables audit logging for significant events.
audit_log: true

clickhouse:
# Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud.
addr: ""

# Database name in ClickHouse to use.
database: "outpost"

# Password for ClickHouse authentication.
password: ""

# Enable TLS for ClickHouse connection.
tls_enabled: false

# Username for ClickHouse authentication.
username: ""


# Time-to-live in seconds for delivery queue idempotency keys. Controls how long processed deliveries are remembered to prevent duplicate delivery attempts. Default: 3600 (1 hour).
delivery_idempotency_key_ttl: 3600

Expand Down Expand Up @@ -466,7 +488,6 @@ portal:


# Connection URL for PostgreSQL, used for log storage. Example: 'postgres://user:pass@host:port/dbname?sslmode=disable'.
# Required: Y
postgres: ""

# Time-to-live in seconds for publish queue idempotency keys. Controls how long processed events are remembered to prevent duplicate processing. Default: 3600 (1 hour).
Expand Down
8 changes: 8 additions & 0 deletions internal/apirouter/logger_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ func getErrorFields(err error) []zap.Field {
originalErr = err
}

// Handle nil error case (e.g., ErrorResponse with nil Err field)
if originalErr == nil {
return []zap.Field{
zap.String("error", "unknown error"),
zap.String("error_type", "nil"),
}
}

fields := []zap.Field{
zap.String("error", originalErr.Error()),
zap.String("error_type", fmt.Sprintf("%T", originalErr)),
Expand Down
1 change: 1 addition & 0 deletions internal/apirouter/tenant_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (h *TenantHandlers) List(c *gin.Context) {
// Map errors to HTTP status codes
if errors.Is(err, models.ErrListTenantNotSupported) {
AbortWithError(c, http.StatusNotImplemented, ErrorResponse{
Err: err,
Code: http.StatusNotImplemented,
Message: err.Error(),
})
Expand Down
6 changes: 4 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -49,14 +50,15 @@ func (a *App) Run(ctx context.Context) error {
}

// PreRun initializes all dependencies before starting the application
func (a *App) PreRun(ctx context.Context) error {
func (a *App) PreRun(ctx context.Context) (err error) {
if err := a.setupLogger(); err != nil {
return err
}

defer func() {
if r := recover(); r != nil {
a.logger.Error("panic during PreRun", zap.Any("panic", r))
err = fmt.Errorf("panic during PreRun: %v", r)
}
}()

Expand Down Expand Up @@ -217,7 +219,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error {
DeliveryMQ: a.config.MQs.ToInfraConfig("deliverymq"),
LogMQ: a.config.MQs.ToInfraConfig("logmq"),
AutoProvision: a.config.MQs.AutoProvision,
}, a.redisClient); err != nil {
}, a.redisClient, a.logger, a.config.MQs.GetInfraType()); err != nil {
a.logger.Error("infrastructure initialization failed", zap.Error(err))
return err
}
Expand Down
21 changes: 15 additions & 6 deletions internal/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clickhouse

import (
"crypto/tls"

"github.com/ClickHouse/clickhouse-go/v2"
chdriver "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
Expand All @@ -12,14 +14,15 @@ type (
)

type ClickHouseConfig struct {
Addr string
Username string
Password string
Database string
Addr string
Username string
Password string
Database string
TLSEnabled bool
}

func New(config *ClickHouseConfig) (DB, error) {
conn, err := clickhouse.Open(&clickhouse.Options{
opts := &clickhouse.Options{
Addr: []string{config.Addr},
Auth: clickhouse.Auth{
Database: config.Database,
Expand All @@ -31,6 +34,12 @@ func New(config *ClickHouseConfig) (DB, error) {
// Debugf: func(format string, v ...any) {
// fmt.Printf(format+"\n", v...)
// },
})
}

if config.TLSEnabled {
opts.TLS = &tls.Config{}
}

conn, err := clickhouse.Open(opts)
return conn, err
}
19 changes: 11 additions & 8 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,21 +380,23 @@ func (c *RedisConfig) ToConfig() *redis.RedisConfig {
}

type ClickHouseConfig struct {
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000'." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
Addr string `yaml:"addr" env:"CLICKHOUSE_ADDR" desc:"Address (host:port) of the ClickHouse server. Example: 'localhost:9000' or 'host.clickhouse.cloud:9440' for ClickHouse Cloud." required:"N"`
Username string `yaml:"username" env:"CLICKHOUSE_USERNAME" desc:"Username for ClickHouse authentication." required:"N"`
Password string `yaml:"password" env:"CLICKHOUSE_PASSWORD" desc:"Password for ClickHouse authentication." required:"N"`
Database string `yaml:"database" env:"CLICKHOUSE_DATABASE" desc:"Database name in ClickHouse to use." required:"N"`
TLSEnabled bool `yaml:"tls_enabled" env:"CLICKHOUSE_TLS_ENABLED" desc:"Enable TLS for ClickHouse connection." required:"N"`
}

func (c *ClickHouseConfig) ToConfig() *clickhouse.ClickHouseConfig {
if c.Addr == "" {
return nil
}
return &clickhouse.ClickHouseConfig{
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
Addr: c.Addr,
Username: c.Username,
Password: c.Password,
Database: c.Database,
TLSEnabled: c.TLSEnabled,
}
}

Expand Down Expand Up @@ -478,6 +480,7 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts {
Password: c.ClickHouse.Password,
Database: c.ClickHouse.Database,
DeploymentID: c.DeploymentID,
TLSEnabled: c.ClickHouse.TLSEnabled,
},
}
}
7 changes: 7 additions & 0 deletions internal/config/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ func (c *Config) LogConfigurationSummary() []zap.Field {
zap.Bool("postgres_configured", c.PostgresURL != ""),
zap.String("postgres_host", maskPostgresURLHost(c.PostgresURL)),

// ClickHouse
zap.Bool("clickhouse_configured", c.ClickHouse.Addr != ""),
zap.String("clickhouse_addr", c.ClickHouse.Addr),
zap.String("clickhouse_database", c.ClickHouse.Database),
zap.Bool("clickhouse_password_configured", c.ClickHouse.Password != ""),
zap.Bool("clickhouse_tls_enabled", c.ClickHouse.TLSEnabled),

// Message Queue
zap.String("mq_type", c.MQs.GetInfraType()),

Expand Down
20 changes: 20 additions & 0 deletions internal/deliverymq/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (h *messageHandler) Handle(ctx context.Context, msg *mqs.Message) error {
}

h.logger.Ctx(ctx).Info("processing delivery event",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand Down Expand Up @@ -200,6 +201,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli

h.logger.Ctx(ctx).Error("failed to publish event",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -220,6 +223,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
if err := h.retryScheduler.Cancel(ctx, deliveryEvent.GetRetryID()); err != nil {
h.logger.Ctx(ctx).Error("failed to cancel scheduled retry",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -228,6 +233,8 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, err)
}
logger.Audit("scheduled retry canceled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -244,6 +251,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m
deliveryEvent.Delivery = delivery

logger.Audit("event delivered",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -256,6 +265,8 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m
if logErr := h.logMQ.Publish(ctx, *deliveryEvent); logErr != nil {
logger.Error("failed to publish delivery log",
zap.Error(logErr),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand Down Expand Up @@ -328,6 +339,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent *
if monitorErr := h.alertMonitor.HandleAttempt(ctx, attempt); monitorErr != nil {
h.logger.Ctx(ctx).Error("failed to handle alert attempt",
zap.Error(monitorErr),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", destination.TenantID),
zap.String("destination_id", destination.ID),
Expand All @@ -336,6 +349,8 @@ func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent *
}

h.logger.Ctx(ctx).Info("alert attempt handled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("delivery_id", deliveryEvent.Delivery.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", destination.TenantID),
zap.String("destination_id", destination.ID),
Expand Down Expand Up @@ -412,6 +427,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models
if err := h.retryScheduler.Schedule(ctx, retryMessageStr, backoffDuration, scheduler.WithTaskID(deliveryEvent.GetRetryID())); err != nil {
h.logger.Ctx(ctx).Error("failed to schedule retry",
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand All @@ -421,6 +437,7 @@ func (h *messageHandler) scheduleRetry(ctx context.Context, deliveryEvent models
}

h.logger.Ctx(ctx).Audit("retry scheduled",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand Down Expand Up @@ -462,6 +479,7 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv
logger := h.logger.Ctx(ctx)
fields := []zap.Field{
zap.Error(err),
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID),
Expand All @@ -477,13 +495,15 @@ func (h *messageHandler) ensurePublishableDestination(ctx context.Context, deliv
}
if destination == nil {
h.logger.Ctx(ctx).Info("destination not found",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", deliveryEvent.DestinationID))
return nil, models.ErrDestinationNotFound
}
if destination.DisabledAt != nil {
h.logger.Ctx(ctx).Info("skipping disabled destination",
zap.String("delivery_event_id", deliveryEvent.ID),
zap.String("event_id", deliveryEvent.Event.ID),
zap.String("tenant_id", deliveryEvent.Event.TenantID),
zap.String("destination_id", destination.ID),
Expand Down
Loading