Skip to content

Commit 5646605

Browse files
committed
feat(broker): Emit K8s Warning events for event loop detection in ingress
1 parent be8d89c commit 5646605

File tree

1 file changed

+76
-0
lines changed

1 file changed

+76
-0
lines changed

pkg/broker/ingress/ingress_handler.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ import (
3535
"github.com/cloudevents/sdk-go/v2/client"
3636
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
3737
"go.uber.org/zap"
38+
corev1 "k8s.io/api/core/v1"
3839
"k8s.io/apimachinery/pkg/types"
40+
"k8s.io/client-go/tools/record"
3941
corev1listers "k8s.io/client-go/listers/core/v1"
4042
"k8s.io/client-go/tools/cache"
4143

@@ -80,6 +82,10 @@ type Handler struct {
8082
withContext func(ctx context.Context) context.Context
8183
tracer trace.Tracer
8284
dispatchDuration metric.Float64Histogram
85+
86+
// eventRecorder is used to emit Kubernetes events when event loops are detected
87+
// (when TTL reaches 0, indicating an infinite loop)
88+
eventRecorder record.EventRecorder
8389
}
8490

8591
func NewHandler(
@@ -166,6 +172,10 @@ func NewHandler(
166172

167173
return h, nil
168174
}
175+
// SetEventRecorder sets the event recorder for emitting Kubernetes events when event loops are detected.
176+
func (h *Handler) SetEventRecorder(recorder record.EventRecorder) {
177+
h.eventRecorder = recorder
178+
}
169179

170180
func (h *Handler) getBroker(name, namespace string) (*eventingv1.Broker, error) {
171181
broker, err := h.BrokerLister.Brokers(namespace).Get(name)
@@ -347,6 +357,12 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
347357

348358
if ttl, err := broker.GetTTL(event.Context); err != nil || ttl <= 0 {
349359
h.Logger.Debug("dropping event based on TTL status.", zap.Int32("TTL", ttl), zap.String("event.id", event.ID()), zap.Error(err))
360+
// Emit K8s Warning event for event loop detection.
361+
// TTL <= 0 indicates the event has been looping through the broker/trigger chain.
362+
if ttl == 0 {
363+
h.emitEventLoopWarning(ctx, event, brokerObj)
364+
}
365+
350366
return http.StatusBadRequest, kncloudevents.NoDuration
351367
}
352368

@@ -374,3 +390,63 @@ func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloud
374390

375391
return dispatchInfo.ResponseCode, dispatchInfo.Duration
376392
}
393+
// emitEventLoopWarning creates a Kubernetes Warning event for event loop detection.
394+
// This is called when TTL reaches 0, which indicates that an event has been cycling
395+
// through the broker/trigger chain infinitely, likely due to misconfiguration.
396+
func (h *Handler) emitEventLoopWarning(ctx context.Context, event *cloudevents.Event, broker *eventingv1.Broker) {
397+
if h.eventRecorder == nil {
398+
h.Logger.Warn("Event recorder not available, cannot emit K8s event for event loop")
399+
return
400+
}
401+
402+
// Skip if broker has no proper identity
403+
if broker.Name == "" || broker.Namespace == "" || broker.UID == "" {
404+
h.Logger.Warn("Broker missing identity, cannot emit K8s event",
405+
zap.String("name", broker.Name),
406+
zap.String("namespace", broker.Namespace))
407+
return
408+
}
409+
410+
// Get event metadata for proper labeling
411+
eventType := event.Type()
412+
eventSource := event.Source()
413+
eventID := event.ID()
414+
415+
// Create the event message
416+
message := fmt.Sprintf("Event loop detected (TTL reached 0): type=%s, source=%s, id=%s. "+
417+
"This likely indicates a misconfiguration where events are cycling through broker/trigger chain infinitely. "+
418+
"Check your trigger filters and reply configurations.",
419+
eventType, eventSource, eventID)
420+
421+
// Create annotations with event-type and event-source for Knative reconciler.
422+
// These annotations enable the reconciler to properly aggregate events and bump series.count.
423+
annotations := map[string]string{
424+
"event-type": eventType,
425+
"event-source": eventSource,
426+
"event-id": eventID,
427+
}
428+
429+
// Emit the Warning event.
430+
// Using a stable reason ("EventLoop") ensures proper event aggregation.
431+
// K8s will automatically aggregate events with same reason, source, and involved object.
432+
h.eventRecorder.AnnotatedEventf(
433+
&corev1.ObjectReference{
434+
APIVersion: eventingv1.SchemeGroupVersion.String(),
435+
Kind: "Broker",
436+
Name: broker.Name,
437+
Namespace: broker.Namespace,
438+
UID: broker.UID,
439+
},
440+
annotations,
441+
corev1.EventTypeWarning,
442+
"EventLoop",
443+
message,
444+
)
445+
446+
h.Logger.Info("Emitted K8s Warning event for event loop",
447+
zap.String("broker", fmt.Sprintf("%s/%s", broker.Namespace, broker.Name)),
448+
zap.String("eventType", eventType),
449+
zap.String("eventSource", eventSource),
450+
zap.String("eventID", eventID),
451+
)
452+
}

0 commit comments

Comments
 (0)