Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,11 +675,15 @@ var (
"client_requests",
WithDescription("The number of requests sent by the client to an individual service, keyed by `service_role` and `operation`."),
)
ClientFailures = NewCounterDef("client_errors")
ClientLatency = NewTimerDef("client_latency")
ClientRedirectionRequests = NewCounterDef("client_redirection_requests")
ClientRedirectionFailures = NewCounterDef("client_redirection_errors")
ClientRedirectionLatency = NewTimerDef("client_redirection_latency")
ClientFailures = NewCounterDef("client_errors")
ClientLatency = NewTimerDef("client_latency")
ClientRedirectionRequests = NewCounterDef("client_redirection_requests")
ClientRedirectionFailures = NewCounterDef("client_redirection_errors")
ClientRedirectionLatency = NewTimerDef("client_redirection_latency")
ClientDuplicatedRedirects = NewCounterDef(
"client_duplicated_redirects",
WithDescription("The number of requests that would result in multiple redirect hops, indicating potential redirect loops or excessive redirection."),
)
StateTransitionCount = NewDimensionlessHistogramDef("state_transition_count")
HistorySize = NewBytesHistogramDef("history_size")
HistoryCount = NewDimensionlessHistogramDef("history_count")
Expand Down
11 changes: 11 additions & 0 deletions common/rpc/interceptor/redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ func (i *Redirection) handleRedirectAPIInvocation(
if targetClusterName == i.currentClusterName {
resp, err = handler(ctx, req)
} else {
// Check if request already has redirection header, indicating multiple redirect hops
values := metadata.ValueFromIncomingContext(ctx, DCRedirectionApiHeaderName)
if len(values) > 0 && values[0] == "true" {
// Emit too-many-redirects metric
metricHandler := scope.WithTags(
metrics.TargetClusterTag(targetClusterName),
metrics.NamespaceTag(namespaceName.String()),
)
metrics.ClientDuplicatedRedirects.With(metricHandler).Record(1)
}

remoteClient, _, err := i.clientBean.GetRemoteFrontendClient(targetClusterName)
if err != nil {
return err
Expand Down
139 changes: 139 additions & 0 deletions common/rpc/interceptor/redirection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,3 +679,142 @@
s.Len(failureMetrics, 1)
s.Equal(int64(1), failureMetrics[0].Value)
}

func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_TooManyRedirects_EmitsMetric() {
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
defer metricsHandler.StopCapture(capture)

redirector := NewRedirection(
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
s.namespaceCache,
config.DCRedirectionPolicy{Policy: DCRedirectionPolicyAllAPIsForwarding},
log.NewNoopLogger(),
s.clientBean,
metricsHandler,
clock.NewRealTimeSource(),
s.clusterMetadata,
)

// Create context with DCRedirectionApiHeaderName already present (indicating this is a second redirect)
ctx := metadata.NewIncomingContext(
context.Background(),
metadata.New(map[string]string{
DCRedirectionApiHeaderName: "true",
}),
)
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{}
info := &grpc.UnaryServerInfo{
FullMethod: "/temporal.api.workflowservice.v1.WorkflowService/SignalWithStartWorkflowExecution",
}
namespaceName := namespace.Name("test-namespace")

// Setup namespace to be active in remote cluster (triggers redirect)
namespaceEntry := namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: uuid.NewString(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName, // Active in remote cluster
Clusters: []string{
cluster.TestCurrentClusterName,
cluster.TestAlternativeClusterName,
},
},
1,
)
s.namespaceCache.EXPECT().GetNamespace(namespaceName).Return(namespaceEntry, nil).AnyTimes()
methodName := "SignalWithStartWorkflowExecution"

// Setup mock remote client
grpcConn := &mockClientConnInterface{
Suite: &s.Suite,
targetMethod: info.FullMethod,
targetResponse: &workflowservice.SignalWithStartWorkflowExecutionResponse{},
}
s.clientBean.EXPECT().GetRemoteFrontendClient(cluster.TestAlternativeClusterName).Return(grpcConn, nil, nil).Times(1)

// Execute
_, err := redirector.handleRedirectAPIInvocation(
ctx, req, info, nil, methodName,
globalAPIResponses[methodName], namespaceName,
)
s.NoError(err)

// Verify too-many-redirects metric was emitted
snapshot := capture.Snapshot()
tooManyRedirectsMetrics := snapshot[metrics.ClientRedirectionTooManyRedirects.Name()]

Check failure on line 746 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / fmt

undefined: metrics.ClientRedirectionTooManyRedirects

Check failure on line 746 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: metrics.ClientRedirectionTooManyRedirects

Check failure on line 746 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / Unit test

undefined: metrics.ClientRedirectionTooManyRedirects
s.NotEmpty(tooManyRedirectsMetrics, "ClientRedirectionTooManyRedirects should be emitted when redirect header is present")
s.Len(tooManyRedirectsMetrics, 1)
s.Equal(int64(1), tooManyRedirectsMetrics[0].Value)

// Verify normal redirection metrics are also emitted
s.NotEmpty(snapshot[metrics.ClientRedirectionRequests.Name()], "ClientRedirectionRequests should still be emitted")
s.NotEmpty(snapshot[metrics.ClientRedirectionLatency.Name()], "ClientRedirectionLatency should still be emitted")
}

func (s *redirectionInterceptorSuite) TestHandleGlobalAPIInvocation_NoRedirectHeader_NoTooManyRedirectsMetric() {
metricsHandler := metricstest.NewCaptureHandler()
capture := metricsHandler.StartCapture()
defer metricsHandler.StopCapture(capture)

redirector := NewRedirection(
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(true),
dynamicconfig.GetBoolPropertyFnFilteredByNamespace(false),
s.namespaceCache,
config.DCRedirectionPolicy{Policy: DCRedirectionPolicyAllAPIsForwarding},
log.NewNoopLogger(),
s.clientBean,
metricsHandler,
clock.NewRealTimeSource(),
s.clusterMetadata,
)

// Create context WITHOUT the redirect header (first redirect scenario)
ctx := context.Background()
req := &workflowservice.SignalWithStartWorkflowExecutionRequest{}
info := &grpc.UnaryServerInfo{
FullMethod: "/temporal.api.workflowservice.v1.WorkflowService/SignalWithStartWorkflowExecution",
}
namespaceName := namespace.Name("test-namespace")

// Setup namespace to be active in remote cluster (triggers redirect)
namespaceEntry := namespace.NewGlobalNamespaceForTest(
&persistencespb.NamespaceInfo{Id: uuid.NewString(), Name: namespaceName.String()},
&persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)},
&persistencespb.NamespaceReplicationConfig{
ActiveClusterName: cluster.TestAlternativeClusterName, // Active in remote cluster
Clusters: []string{
cluster.TestCurrentClusterName,
cluster.TestAlternativeClusterName,
},
},
1,
)
s.namespaceCache.EXPECT().GetNamespace(namespaceName).Return(namespaceEntry, nil).AnyTimes()
methodName := "SignalWithStartWorkflowExecution"

// Setup mock remote client
grpcConn := &mockClientConnInterface{
Suite: &s.Suite,
targetMethod: info.FullMethod,
targetResponse: &workflowservice.SignalWithStartWorkflowExecutionResponse{},
}
s.clientBean.EXPECT().GetRemoteFrontendClient(cluster.TestAlternativeClusterName).Return(grpcConn, nil, nil).Times(1)

// Execute
_, err := redirector.handleRedirectAPIInvocation(
ctx, req, info, nil, methodName,
globalAPIResponses[methodName], namespaceName,
)
s.NoError(err)

// Verify too-many-redirects metric was NOT emitted
snapshot := capture.Snapshot()
s.Empty(snapshot[metrics.ClientRedirectionTooManyRedirects.Name()],

Check failure on line 814 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / fmt

undefined: metrics.ClientRedirectionTooManyRedirects

Check failure on line 814 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / golangci

undefined: metrics.ClientRedirectionTooManyRedirects (typecheck)

Check failure on line 814 in common/rpc/interceptor/redirection_test.go

View workflow job for this annotation

GitHub Actions / Unit test

undefined: metrics.ClientRedirectionTooManyRedirects
"ClientRedirectionTooManyRedirects should NOT be emitted for first redirect")

// Verify normal redirection metrics ARE emitted
s.NotEmpty(snapshot[metrics.ClientRedirectionRequests.Name()], "ClientRedirectionRequests should be emitted")
s.NotEmpty(snapshot[metrics.ClientRedirectionLatency.Name()], "ClientRedirectionLatency should be emitted")
}
Loading