-
Notifications
You must be signed in to change notification settings - Fork 2.3k
sqldb+lncfg: add retry logic for postgres #10564
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -186,6 +186,56 @@ func (s *PostgresStore) ExecuteMigrations(target MigrationTarget) error { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // WaitForPostgresReady waits for the Postgres database to become available by | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // pinging it in a retry loop with a fixed delay. This is useful in environments | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // like Kubernetes where the database container may not be ready when LND | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // starts. If StartupMaxRetries is zero, this function returns immediately | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // without pinging. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func WaitForPostgresReady(ctx context.Context, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg *PostgresConfig) error { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cfg.StartupMaxRetries <= 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| sanitizedDSN, err := replacePasswordInDSN(cfg.Dsn) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| db, err := sql.Open("pgx", cfg.Dsn) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("error creating postgres connection: %w", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| defer db.Close() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for attempt := 0; attempt < cfg.StartupMaxRetries; attempt++ { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| err = db.PingContext(pingCtx) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cancel() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if err == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Infof("Postgres is ready at '%s'", sanitizedDSN) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| log.Warnf("Failed to connect to postgres at '%s' "+ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "(attempt %d/%d): %v", sanitizedDSN, attempt+1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cfg.StartupMaxRetries, err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| select { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-time.After(cfg.StartupRetryDelay): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case <-ctx.Done(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("context canceled while waiting "+ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "for postgres: %w", ctx.Err()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return fmt.Errorf("failed to connect to postgres at '%s' after %d "+ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "attempts: %w", sanitizedDSN, cfg.StartupMaxRetries, err) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+201
to
+236
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // GetSchemaVersion returns the current schema version of the Postgres database. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func (s *PostgresStore) GetSchemaVersion() (int, bool, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| driver, err := pgx_migrate.WithInstance(s.DB, &pgx_migrate.Config{}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| package sqldb | ||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/stretchr/testify/require" | ||
| ) | ||
|
|
||
| // TestWaitForPostgresReadySkipsWhenDisabled verifies that WaitForPostgresReady | ||
| // returns immediately when StartupMaxRetries is set to zero. | ||
| func TestWaitForPostgresReadySkipsWhenDisabled(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cfg := &PostgresConfig{ | ||
| Dsn: "postgres://localhost:1/testdb", | ||
| StartupMaxRetries: 0, | ||
| StartupRetryDelay: 1 * time.Second, | ||
| } | ||
|
|
||
| err := WaitForPostgresReady(context.Background(), cfg) | ||
| require.NoError(t, err) | ||
| } | ||
|
|
||
| // TestWaitForPostgresReadyExhaustsRetries verifies that WaitForPostgresReady | ||
| // returns an error after exhausting all retry attempts against an unreachable | ||
| // endpoint. | ||
| func TestWaitForPostgresReadyExhaustsRetries(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cfg := &PostgresConfig{ | ||
| Dsn: "postgres://localhost:1/testdb", | ||
| StartupMaxRetries: 3, | ||
| StartupRetryDelay: 10 * time.Millisecond, | ||
| } | ||
|
|
||
| err := WaitForPostgresReady(context.Background(), cfg) | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), "failed to connect to postgres") | ||
| require.Contains(t, err.Error(), "3 attempts") | ||
| } | ||
|
|
||
| // TestWaitForPostgresReadyContextCancel verifies that WaitForPostgresReady | ||
| // respects context cancellation and stops retrying early. | ||
| func TestWaitForPostgresReadyContextCancel(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cfg := &PostgresConfig{ | ||
| Dsn: "postgres://localhost:1/testdb", | ||
| StartupMaxRetries: 100, | ||
| StartupRetryDelay: 1 * time.Second, | ||
| } | ||
|
|
||
| ctx, cancel := context.WithCancel(context.Background()) | ||
|
|
||
| // Cancel the context after a short delay. | ||
| go func() { | ||
| time.Sleep(100 * time.Millisecond) | ||
| cancel() | ||
| }() | ||
|
|
||
| start := time.Now() | ||
| err := WaitForPostgresReady(ctx, cfg) | ||
| elapsed := time.Since(start) | ||
|
|
||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), "context canceled") | ||
|
|
||
| // Ensure we didn't wait for all 100 retries. | ||
| require.Less(t, elapsed, 10*time.Second) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exponential back off?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intentionally used a fixed delay since the primary use case is a startup race in which Postgres is ready within seconds. The retry delay is configurable if users need a different interval.