Skip to content

Commit 0fbe945

Browse files
fix(flagd): use provider params to configure grpc retry policy
Signed-off-by: Alexandra Oberaigner <[email protected]>
1 parent 725545d commit 0fbe945

File tree

13 files changed

+395
-195
lines changed

13 files changed

+395
-195
lines changed

providers/flagd/flagd-testbed

providers/flagd/pkg/configuration.go

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const (
2626
defaultHost = "localhost"
2727
defaultResolver = rpc
2828
defaultGracePeriod = 5
29+
defaultRetryBackoffMs = 1000
30+
defaultRetryBackoffMaxMs = 120000
31+
defaultFatalStatusCodes = "UNAUTHENTICATED,PERMISSION_DENIED"
2932

3033
rpc ResolverType = "rpc"
3134
inProcess ResolverType = "in-process"
@@ -45,6 +48,9 @@ const (
4548
flagdOfflinePathEnvironmentVariableName = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
4649
flagdTargetUriEnvironmentVariableName = "FLAGD_TARGET_URI"
4750
flagdGracePeriodVariableName = "FLAGD_RETRY_GRACE_PERIOD"
51+
flagdRetryBackoffMsVariableName = "FLAGD_RETRY_BACKOFF_MS"
52+
flagdRetryBackoffMaxMsVariableName = "FLAGD_RETRY_BACKOFF_MAX_MS"
53+
flagdFatalStatusCodesVariableName = "FLAGD_FATAL_STATUS_CODES"
4854
)
4955

5056
type ProviderConfiguration struct {
@@ -66,6 +72,9 @@ type ProviderConfiguration struct {
6672
CustomSyncProviderUri string
6773
GrpcDialOptionsOverride []grpc.DialOption
6874
RetryGracePeriod int
75+
RetryBackoffMs int
76+
RetryBackoffMaxMs int
77+
FatalStatusCodes []string
6978

7079
log logr.Logger
7180
}
@@ -80,6 +89,9 @@ func newDefaultConfiguration(log logr.Logger) *ProviderConfiguration {
8089
Resolver: defaultResolver,
8190
Tls: defaultTLS,
8291
RetryGracePeriod: defaultGracePeriod,
92+
RetryBackoffMs: defaultRetryBackoffMs,
93+
RetryBackoffMaxMs: defaultRetryBackoffMaxMs,
94+
FatalStatusCodes: strings.Split(defaultFatalStatusCodes, ","),
8395
}
8496

8597
p.updateFromEnvVar()
@@ -130,6 +142,7 @@ func validateProviderConfiguration(p *ProviderConfiguration) error {
130142

131143
// updateFromEnvVar is a utility to update configurations based on current environment variables
132144
func (cfg *ProviderConfiguration) updateFromEnvVar() {
145+
133146
portS := os.Getenv(flagdPortEnvironmentVariableName)
134147
if portS != "" {
135148
port, err := strconv.Atoi(portS)
@@ -159,17 +172,7 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
159172
cfg.CertPath = certificatePath
160173
}
161174

162-
if maxCacheSizeS := os.Getenv(flagdMaxCacheSizeEnvironmentVariableName); maxCacheSizeS != "" {
163-
maxCacheSizeFromEnv, err := strconv.Atoi(maxCacheSizeS)
164-
if err != nil {
165-
cfg.log.Error(err,
166-
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
167-
flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize,
168-
))
169-
} else {
170-
cfg.MaxCacheSize = maxCacheSizeFromEnv
171-
}
172-
}
175+
cfg.MaxCacheSize = getIntFromEnvVarOrDefault(flagdMaxCacheSizeEnvironmentVariableName, defaultMaxCacheSize, cfg.log)
173176

174177
if cacheValue := os.Getenv(flagdCacheEnvironmentVariableName); cacheValue != "" {
175178
switch cache.Type(cacheValue) {
@@ -185,18 +188,8 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
185188
}
186189
}
187190

188-
if maxEventStreamRetriesS := os.Getenv(
189-
flagdMaxEventStreamRetriesEnvironmentVariableName); maxEventStreamRetriesS != "" {
190-
191-
maxEventStreamRetries, err := strconv.Atoi(maxEventStreamRetriesS)
192-
if err != nil {
193-
cfg.log.Error(err,
194-
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
195-
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries))
196-
} else {
197-
cfg.EventStreamConnectionMaxAttempts = maxEventStreamRetries
198-
}
199-
}
191+
cfg.EventStreamConnectionMaxAttempts = getIntFromEnvVarOrDefault(
192+
flagdMaxEventStreamRetriesEnvironmentVariableName, defaultMaxEventStreamRetries, cfg.log)
200193

201194
if resolver := os.Getenv(flagdResolverEnvironmentVariableName); resolver != "" {
202195
switch strings.ToLower(resolver) {
@@ -227,17 +220,34 @@ func (cfg *ProviderConfiguration) updateFromEnvVar() {
227220
if targetUri := os.Getenv(flagdTargetUriEnvironmentVariableName); targetUri != "" {
228221
cfg.TargetUri = targetUri
229222
}
230-
if gracePeriod := os.Getenv(flagdGracePeriodVariableName); gracePeriod != "" {
231-
if seconds, err := strconv.Atoi(gracePeriod); err == nil {
232-
cfg.RetryGracePeriod = seconds
223+
224+
cfg.RetryGracePeriod = getIntFromEnvVarOrDefault(flagdGracePeriodVariableName, defaultGracePeriod, cfg.log)
225+
cfg.RetryBackoffMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMsVariableName, defaultRetryBackoffMs, cfg.log)
226+
cfg.RetryBackoffMaxMs = getIntFromEnvVarOrDefault(flagdRetryBackoffMaxMsVariableName, defaultRetryBackoffMaxMs, cfg.log)
227+
228+
if fatalStatusCodes := os.Getenv(flagdFatalStatusCodesVariableName); fatalStatusCodes != "" {
229+
cfg.FatalStatusCodes = strings.Split(fatalStatusCodes, ",")
230+
}
231+
}
232+
233+
// Helper
234+
235+
func getIntFromEnvVarOrDefault(envVarName string, defaultValue int, log logr.Logger) int {
236+
if valueFromEnv := os.Getenv(envVarName); valueFromEnv != "" {
237+
intValue, err := strconv.Atoi(valueFromEnv)
238+
if err != nil {
239+
log.Error(err,
240+
fmt.Sprintf("invalid env config for %s provided, using default value: %d",
241+
envVarName, defaultValue,
242+
))
233243
} else {
234-
// Handle parsing error
235-
cfg.log.Error(err, fmt.Sprintf("invalid grace period '%s'", gracePeriod))
244+
return intValue
236245
}
237246
}
238-
247+
return defaultValue
239248
}
240249

250+
241251
// ProviderOptions
242252

243253
type ProviderOption func(*ProviderConfiguration)
@@ -415,3 +425,25 @@ func WithRetryGracePeriod(gracePeriod int) ProviderOption {
415425
p.RetryGracePeriod = gracePeriod
416426
}
417427
}
428+
429+
// WithRetryBackoffMs sets the initial backoff duration (in milliseconds) for retrying failed connections
430+
func WithRetryBackoffMs(retryBackoffMs int) ProviderOption {
431+
return func(p *ProviderConfiguration) {
432+
p.RetryBackoffMs = retryBackoffMs
433+
}
434+
}
435+
436+
// WithRetryBackoffMaxMs sets the maximum backoff duration (in milliseconds) for retrying failed connections
437+
func WithRetryBackoffMaxMs(retryBackoffMaxMs int) ProviderOption {
438+
return func(p *ProviderConfiguration) {
439+
p.RetryBackoffMaxMs = retryBackoffMaxMs
440+
}
441+
}
442+
443+
// WithFatalStatusCodes allows to set a list of gRPC status codes, which will cause streams to give up
444+
// and put the provider in a PROVIDER_FATAL state
445+
func WithFatalStatusCodes(fatalStatusCodes []string) ProviderOption {
446+
return func(p *ProviderConfiguration) {
447+
p.FatalStatusCodes = fatalStatusCodes
448+
}
449+
}

providers/flagd/pkg/provider.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ func NewProvider(opts ...ProviderOption) (*Provider, error) {
7474
CustomSyncProviderUri: provider.providerConfiguration.CustomSyncProviderUri,
7575
GrpcDialOptionsOverride: provider.providerConfiguration.GrpcDialOptionsOverride,
7676
RetryGracePeriod: provider.providerConfiguration.RetryGracePeriod,
77+
RetryBackOffMs: provider.providerConfiguration.RetryBackoffMs,
78+
RetryBackOffMaxMs: provider.providerConfiguration.RetryBackoffMaxMs,
79+
FatalStatusCodes: provider.providerConfiguration.FatalStatusCodes,
7780
})
7881
default:
7982
service = process.NewInProcessService(process.Configuration{
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package process
2+
3+
import (
	"encoding/json"
	"fmt"
	"strings"
	"time"
)
8+
9+
const (
10+
// Default timeouts and retry intervals
11+
defaultKeepaliveTime = 30 * time.Second
12+
defaultKeepaliveTimeout = 5 * time.Second
13+
)
14+
15+
type RetryPolicy struct {
16+
MaxAttempts int `json:"MaxAttempts"`
17+
InitialBackoff string `json:"InitialBackoff"`
18+
MaxBackoff string `json:"MaxBackoff"`
19+
BackoffMultiplier float64 `json:"BackoffMultiplier"`
20+
RetryableStatusCodes []string `json:"RetryableStatusCodes"`
21+
}
22+
23+
func (g *Sync) buildRetryPolicy() string {
24+
var policy = map[string]interface{}{
25+
"methodConfig": []map[string]interface{}{
26+
{
27+
"name": []map[string]string{
28+
{"service": "flagd.sync.v1.FlagSyncService"},
29+
},
30+
"retryPolicy": RetryPolicy{
31+
MaxAttempts: 3,
32+
InitialBackoff: (time.Duration(g.RetryBackOffMs) * time.Millisecond).String(),
33+
MaxBackoff: (time.Duration(g.RetryBackOffMaxMs) * time.Millisecond).String(),
34+
BackoffMultiplier: 2.0,
35+
RetryableStatusCodes: []string{"UNKNOWN","UNAVAILABLE"},
36+
},
37+
},
38+
},
39+
}
40+
retryPolicyBytes, _ := json.Marshal(policy)
41+
retryPolicy := string(retryPolicyBytes)
42+
43+
return retryPolicy
44+
}
45+
46+
// Set of non-retryable gRPC status codes for faster lookup
47+
var nonRetryableCodes map[string]struct{}
48+
49+
// initNonRetryableStatusCodesSet initializes the set of non-retryable gRPC status codes for quick lookup
50+
func (g *Sync) initNonRetryableStatusCodesSet() {
51+
nonRetryableCodes = make(map[string]struct{})
52+
for _, code := range g.FatalStatusCodes {
53+
normalized := toCamelCase(code)
54+
nonRetryableCodes[normalized] = struct{}{}
55+
}
56+
}
57+
58+
// toCamelCase converts a SNAKE_CASE string to CamelCase, e.g.
// "PERMISSION_DENIED" becomes "PermissionDenied".
//
// Surrounding whitespace is ignored. The input is lower-cased, split on "_",
// and each non-empty segment gets its first character upper-cased; empty
// segments (leading, trailing or doubled underscores) contribute nothing.
func toCamelCase(s string) string {
	parts := strings.Split(strings.ToLower(strings.TrimSpace(s)), "_")
	for i, part := range parts {
		if part == "" {
			continue
		}
		// Capitalize by rune, not by byte, so a multi-byte first character is
		// not cut in half.
		runes := []rune(part)
		parts[i] = strings.ToUpper(string(runes[:1])) + string(runes[1:])
	}
	return strings.Join(parts, "")
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package process
2+
3+
import (
4+
"encoding/json"
5+
"reflect"
6+
"strings"
7+
"testing"
8+
)
9+
10+
11+
func TestBuildRetryPolicy(t *testing.T) {
12+
g := &Sync{
13+
RetryBackOffMs: 100,
14+
RetryBackOffMaxMs: 500,
15+
}
16+
17+
result := g.buildRetryPolicy()
18+
19+
// Unmarshal to check structure
20+
var policy map[string]interface{}
21+
if err := json.Unmarshal([]byte(result), &policy); err != nil {
22+
t.Fatalf("Failed to unmarshal result: %v", err)
23+
}
24+
25+
methodConfig, ok := policy["methodConfig"].([]interface{})
26+
if !ok || len(methodConfig) == 0 {
27+
t.Fatalf("methodConfig missing or empty")
28+
}
29+
30+
config := methodConfig[0].(map[string]interface{})
31+
retryPolicy, ok := config["retryPolicy"].(map[string]interface{})
32+
if !ok {
33+
t.Fatalf("retryPolicy missing")
34+
}
35+
36+
if retryPolicy["MaxAttempts"].(float64) != 3 {
37+
t.Errorf("MaxAttempts = %v; want 3", retryPolicy["MaxAttempts"])
38+
}
39+
if retryPolicy["InitialBackoff"].(string) != "100ms" {
40+
t.Errorf("InitialBackoff = %v; want 100ms", retryPolicy["InitialBackoff"])
41+
}
42+
if retryPolicy["MaxBackoff"].(string) != "500ms" {
43+
t.Errorf("MaxBackoff = %v; want 500ms", retryPolicy["MaxBackoff"])
44+
}
45+
if retryPolicy["BackoffMultiplier"].(float64) != 2.0 {
46+
t.Errorf("BackoffMultiplier = %v; want 2.0", retryPolicy["BackoffMultiplier"])
47+
}
48+
codes := retryPolicy["RetryableStatusCodes"].([]interface{})
49+
expectedCodes := []string{"UNKNOWN", "UNAVAILABLE"}
50+
for i, code := range expectedCodes {
51+
if codes[i].(string) != code {
52+
t.Errorf("RetryableStatusCodes[%d] = %v; want %v", i, codes[i], code)
53+
}
54+
}
55+
56+
// Also check that the result is valid JSON and contains expected substrings
57+
if !strings.Contains(result, `"MaxAttempts":3`) {
58+
t.Error("Result does not contain MaxAttempts")
59+
}
60+
if !strings.Contains(result, `"InitialBackoff":"100ms"`) {
61+
t.Error("Result does not contain InitialBackoff")
62+
}
63+
if !strings.Contains(result, `"MaxBackoff":"500ms"`) {
64+
t.Error("Result does not contain MaxBackoff")
65+
}
66+
if !strings.Contains(result, `"RetryableStatusCodes":["UNKNOWN","UNAVAILABLE"]`) {
67+
t.Error("Result does not contain RetryableStatusCodes")
68+
}
69+
}
70+
71+
type syncTestCase struct {
72+
input []string
73+
expected map[string]struct{}
74+
}
75+
76+
func TestInitNonRetryableStatusCodesSet(t *testing.T) {
77+
testCases := []syncTestCase{
78+
{
79+
input: []string{"PERMISSION_DENIED", "UNKNOWN"},
80+
expected: map[string]struct{}{"PermissionDenied": {}, "Unknown": {}},
81+
},
82+
{
83+
input: []string{"ALREADY_EXISTS"},
84+
expected: map[string]struct{}{"AlreadyExists": {}},
85+
},
86+
{
87+
input: []string{},
88+
expected: map[string]struct{}{},
89+
},
90+
}
91+
92+
for _, tc := range testCases {
93+
g := &Sync{FatalStatusCodes: tc.input}
94+
nonRetryableCodes = nil // reset global
95+
g.initNonRetryableStatusCodesSet()
96+
if !reflect.DeepEqual(nonRetryableCodes, tc.expected) {
97+
t.Errorf("input: %v, got: %v, want: %v", tc.input, nonRetryableCodes, tc.expected)
98+
}
99+
}
100+
}
101+
102+
func TestToCamelCase(t *testing.T) {
103+
testCases := []struct {
104+
input string
105+
expected string
106+
}{
107+
{"INVALID_ARGUMENT", "InvalidArgument"},
108+
{"NOT_FOUND", "NotFound"},
109+
{"ALREADY_EXISTS", "AlreadyExists"},
110+
{"UNKNOWN", "Unknown"},
111+
{"", ""},
112+
{"SINGLE", "Single"},
113+
{"MULTI_WORD_EXAMPLE", "MultiWordExample"},
114+
{"_LEADING_UNDERSCORE", "LeadingUnderscore"},
115+
{"TRAILING_UNDERSCORE_", "TrailingUnderscore"},
116+
{"__DOUBLE__UNDERSCORES__", "DoubleUnderscores"},
117+
}
118+
119+
for _, tc := range testCases {
120+
got := toCamelCase(tc.input)
121+
if got != tc.expected {
122+
t.Errorf("toCamelCase(%q) = %q; want %q", tc.input, got, tc.expected)
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)