Skip to content

Commit 2468aea

Browse files
committed
Merge remote-tracking branch 'origin/main' into aws-default-creds
2 parents fb160b5 + bfd6957 commit 2468aea

File tree

12 files changed

+82
-48
lines changed

12 files changed

+82
-48
lines changed

pkg/adapter/apiserver/adapter.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -157,14 +157,16 @@ func (a *apiServerAdapter) startFailFast(ctx context.Context, stopCh <-chan stru
157157
wg.Add(1)
158158
go func() {
159159
defer wg.Done()
160-
if err := reflector.ListAndWatchWithContext(watchCtx); err != nil {
161-
a.logger.Errorf("reflector failed: %v", err)
162-
select {
163-
case errorChan <- fmt.Errorf("failed to watch: %v", err):
164-
default:
165-
}
166-
cancelWatchers()
160+
// ListAndWatch could exit with `nil` under some circumstances, it shouldn't
161+
// ever stop listening and watching, so we will treat any exit as a reason to restart
162+
err := reflector.ListAndWatchWithContext(watchCtx)
163+
a.logger.Error("reflector exited: %v", err)
164+
165+
select {
166+
case errorChan <- fmt.Errorf("failed to watch: %v", err):
167+
default:
167168
}
169+
cancelWatchers()
168170
}()
169171
}
170172
}

pkg/channel/event_receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
306306
return
307307
}
308308

309-
/// Here we do the OIDC audience verification
309+
// Here we do the OIDC audience verification
310310
features := feature.FromContext(ctx)
311311
if features.IsOIDCAuthentication() {
312312
r.logger.Debug("OIDC authentication is enabled")

pkg/channel/event_receiver_test.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
8888
},
8989
"path based channel reference": {
9090
path: "/new-namespace/new-channel",
91-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
91+
host: host(),
9292
receiverFunc: func(ctx context.Context, r ChannelReference, m event.Event, additionalHeaders nethttp.Header) error {
9393
if r.Namespace != "new-namespace" || r.Name != "new-channel" {
9494
return fmt.Errorf("bad channel reference %v", r)
@@ -107,9 +107,9 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
107107
"x-requEst-id": {"1234"},
108108
"knatIve-will-pass-through": {"true", "always"},
109109
},
110-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
110+
host: host(),
111111
receiverFunc: func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
112-
if r.Namespace != "test-namespace" || r.Name != "test-name" {
112+
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
113113
return fmt.Errorf("test receiver func -- bad reference: %v", r)
114114
}
115115

@@ -138,7 +138,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
138138
},
139139
"OPTIONS okay": {
140140
method: nethttp.MethodOptions,
141-
host: "test-name.test-namespace.svc." + network.GetClusterDomainName(),
141+
host: host(),
142142
expected: nethttp.StatusOK,
143143
responseValidator: func(res httptest.ResponseRecorder) error {
144144
expectedHeaders := nethttp.Header{
@@ -163,7 +163,7 @@ func TestEventReceiver_ServeHTTP(t *testing.T) {
163163
tc.path = "/"
164164
}
165165
if tc.host == "" {
166-
tc.host = "test-channel.test-namespace.svc." + network.GetClusterDomainName()
166+
tc.host = host()
167167
}
168168

169169
f := tc.receiverFunc
@@ -238,7 +238,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
238238
done := make(chan struct{}, 1)
239239

240240
receiverFunc := func(ctx context.Context, r ChannelReference, e event.Event, additionalHeaders nethttp.Header) error {
241-
if r.Namespace != "test-namespace" || r.Name != "test-name" {
241+
if r.Namespace != "test-namespace" || r.Name != "test-channel" {
242242
return fmt.Errorf("test receiver func -- bad reference: %v", r)
243243
}
244244

@@ -253,8 +253,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
253253

254254
// Default the common things.
255255
method := nethttp.MethodPost
256-
host := "test-name.test-namespace.svc." + network.GetClusterDomainName()
257-
258256
logger, _ := zap.NewDevelopment()
259257

260258
r, err := NewEventReceiver(receiverFunc, logger)
@@ -276,7 +274,7 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
276274
),
277275
))
278276
require.NoError(t, err)
279-
p.RequestTemplate.Host = host
277+
p.RequestTemplate.Host = host()
280278

281279
c, err := cloudevents.NewClient(p)
282280
require.NoError(t, err)
@@ -291,8 +289,6 @@ func TestEventReceiver_ServerStart_trace_propagation(t *testing.T) {
291289
}
292290

293291
func TestEventReceiver_WrongRequest(t *testing.T) {
294-
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"
295-
296292
f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
297293
return errors.New("test induced receiver function error")
298294
}
@@ -301,7 +297,7 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
301297
t.Fatalf("Error creating new event receiver. Error:%s", err)
302298
}
303299

304-
req := httptest.NewRequest(nethttp.MethodPost, host, bytes.NewReader([]byte("{}")))
300+
req := httptest.NewRequest(nethttp.MethodPost, "http://"+host()+"/", bytes.NewReader([]byte("{}")))
305301
req.Header.Set("content-type", "application/json")
306302

307303
res := httptest.ResponseRecorder{}
@@ -313,8 +309,6 @@ func TestEventReceiver_WrongRequest(t *testing.T) {
313309
}
314310

315311
func TestEventReceiver_UnknownHost(t *testing.T) {
316-
host := "http://test-channel.test-namespace.svc." + network.GetClusterDomainName() + "/"
317-
318312
f := func(_ context.Context, _ ChannelReference, _ event.Event, _ nethttp.Header) error {
319313
return errors.New("test induced receiver function error")
320314
}
@@ -335,7 +329,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
335329
}
336330

337331
req := httptest.NewRequest("POST", "http://localhost:8080/", nil)
338-
req.Host = host
332+
req.Host = "http://" + host() + "/"
339333

340334
err = http.WriteRequest(context.TODO(), binding.ToMessage(&event), req)
341335
if err != nil {
@@ -349,3 +343,7 @@ func TestEventReceiver_UnknownHost(t *testing.T) {
349343
t.Fatal("Unexpected status code. Expected 404. Actual", res.Code)
350344
}
351345
}
346+
347+
func host() string {
348+
return fmt.Sprintf("test-channel%s.test-namespace.svc.%s", K8ServiceNameSuffix, network.GetClusterDomainName())
349+
}

pkg/channel/references.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ import (
2121
"strings"
2222
)
2323

24+
const (
25+
// K8ServiceNameSuffix is added to the k8 service name which is owned by the channel
26+
K8ServiceNameSuffix = "-kn-channel"
27+
)
28+
2429
// ChannelReference references a Channel within the cluster by name and
2530
// namespace.
2631
type ChannelReference struct {
@@ -38,6 +43,11 @@ func ParseChannelFromHost(host string) (ChannelReference, error) {
3843
if len(chunks) < 2 {
3944
return ChannelReference{}, BadRequestError(fmt.Sprintf("bad host format %q", host))
4045
}
46+
47+
if channelName, found := strings.CutSuffix(chunks[0], K8ServiceNameSuffix); found {
48+
chunks[0] = channelName
49+
}
50+
4151
return ChannelReference{
4252
Name: chunks[0],
4353
Namespace: chunks[1],

pkg/channel/references_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,24 @@ func TestParseChannelFromHost(t *testing.T) {
4646
wantErr bool
4747
expectedChannelRef ChannelReference
4848
}{
49+
"host based with channel k8 service suffix": {
50+
host: fmt.Sprintf("%s%s.%s.svc.cluster.local", referencesTestChannelName, K8ServiceNameSuffix, referencesTestNamespace),
51+
wantErr: false,
52+
expectedChannelRef: ChannelReference{
53+
Namespace: referencesTestNamespace,
54+
Name: referencesTestChannelName,
55+
},
56+
},
4957
"host based": {
50-
host: "test-channel.test-namespace.svc.cluster.local",
58+
host: fmt.Sprintf("%s.%s.svc.cluster.local", referencesTestChannelName, referencesTestNamespace),
5159
wantErr: false,
5260
expectedChannelRef: ChannelReference{
53-
Namespace: "test-namespace",
54-
Name: "test-channel",
61+
Namespace: referencesTestNamespace,
62+
Name: referencesTestChannelName,
5563
},
5664
},
5765
"bad host format should return error": {
58-
host: "test-channel",
66+
host: referencesTestChannelName,
5967
wantErr: true,
6068
},
6169
}
@@ -91,11 +99,11 @@ func TestParseChannelFromPath(t *testing.T) {
9199
expectedChannelRef ChannelReference
92100
}{
93101
"path based": {
94-
path: "/new-namespace/new-channel/",
102+
path: fmt.Sprintf("/%s/%s/", referencesTestNamespace, referencesTestChannelName),
95103
wantErr: false,
96104
expectedChannelRef: ChannelReference{
97-
Namespace: "new-namespace",
98-
Name: "new-channel",
105+
Namespace: referencesTestNamespace,
106+
Name: referencesTestChannelName,
99107
},
100108
},
101109

pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"knative.dev/pkg/resolver"
3434

3535
"knative.dev/eventing/pkg/apis/feature"
36+
"knative.dev/eventing/pkg/channel"
3637
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
3738
"knative.dev/eventing/pkg/eventingtls"
3839
"knative.dev/eventing/pkg/eventingtls/eventingtlstesting"
@@ -70,6 +71,7 @@ const (
7071
dlsName = "test-dls"
7172
testNS = "test-namespace"
7273
imcName = "test-imc"
74+
imcSvcName = imcName + channel.K8ServiceNameSuffix
7375
imageName = "test-image"
7476
maxIdleConns = 2000
7577
maxIdleConnsPerHost = 200
@@ -83,7 +85,7 @@ const (
8385
var (
8486
channelServiceAddress = duckv1.Addressable{
8587
Name: pointer.String("http"),
86-
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
88+
URL: apis.HTTP(fmt.Sprintf("%s.test-namespace.svc.cluster.local", imcSvcName)),
8789
}
8890

8991
channelAudience = fmt.Sprintf("messaging.knative.dev/inmemorychannel/%s/%s", testNS, imcName)
@@ -426,11 +428,11 @@ func TestAllCases(t *testing.T) {
426428
WithInMemoryChannelDeploymentReady(),
427429
WithInMemoryChannelServiceReady(),
428430
WithInMemoryChannelEndpointsReady(),
429-
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", `Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
431+
WithInMemoryChannelChannelServiceNotReady("ChannelServiceFailed", fmt.Sprintf(`Channel Service failed: inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
430432
),
431433
}},
432434
WantEvents: []string{
433-
Eventf(corev1.EventTypeWarning, "InternalError", `inmemorychannel: test-namespace/test-imc does not own Service: "test-imc-kn-channel"`),
435+
Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf(`inmemorychannel: test-namespace/test-imc does not own Service: "%s"`, imcSvcName)),
434436
},
435437
}, {
436438
Name: "Works, channel exists with subscribers",
@@ -986,7 +988,7 @@ func makeChannelService(imc *v1.InMemoryChannel) *corev1.Service {
986988
},
987989
ObjectMeta: metav1.ObjectMeta{
988990
Namespace: testNS,
989-
Name: fmt.Sprintf("%s-kn-channel", imcName),
991+
Name: imcSvcName,
990992
Labels: map[string]string{
991993
resources.MessagingRoleLabel: resources.MessagingRole,
992994
},
@@ -1016,7 +1018,7 @@ func makeChannelServiceNotOwnedByUs(imc *v1.InMemoryChannel) *corev1.Service {
10161018
},
10171019
ObjectMeta: metav1.ObjectMeta{
10181020
Namespace: testNS,
1019-
Name: fmt.Sprintf("%s-kn-channel", imcName),
1021+
Name: imcSvcName,
10201022
Labels: map[string]string{
10211023
resources.MessagingRoleLabel: resources.MessagingRole,
10221024
},

pkg/reconciler/inmemorychannel/controller/resources/service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
corev1 "k8s.io/api/core/v1"
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
23+
"knative.dev/eventing/pkg/channel"
2324
"knative.dev/pkg/kmeta"
2425
"knative.dev/pkg/network"
2526
)
@@ -35,7 +36,7 @@ const (
3536
type K8sServiceOption func(*corev1.Service) error
3637

3738
func CreateChannelServiceName(name string) string {
38-
return kmeta.ChildName(name, "-kn-channel")
39+
return kmeta.ChildName(name, channel.K8ServiceNameSuffix)
3940
}
4041

4142
// ExternalService is a functional option for CreateK8sService to create a K8s service of type ExternalName

pkg/reconciler/inmemorychannel/controller/resources/service_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ package resources
1818

1919
import (
2020
"errors"
21-
"fmt"
2221
"testing"
2322

2423
"github.com/google/go-cmp/cmp"
2524
corev1 "k8s.io/api/core/v1"
2625
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2726
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
27+
"knative.dev/eventing/pkg/channel"
2828
"knative.dev/pkg/kmeta"
2929
"knative.dev/pkg/network"
3030
)
3131

3232
const (
3333
serviceName = "my-test-service"
3434
imcName = "my-test-imc"
35+
imcSvcName = imcName + channel.K8ServiceNameSuffix
3536
testNS = "my-test-ns"
3637
dispatcherNS = "dispatcher-namespace"
3738
dispatcherName = "dispatcher-name"
@@ -44,7 +45,7 @@ func TestCreateExternalServiceAddress(t *testing.T) {
4445
}
4546

4647
func TestCreateChannelServiceAddress(t *testing.T) {
47-
if want, got := "my-test-imc-kn-channel", CreateChannelServiceName(imcName); want != got {
48+
if want, got := imcSvcName, CreateChannelServiceName(imcName); want != got {
4849
t.Errorf("Want: %q got %q", want, got)
4950
}
5051
}
@@ -62,7 +63,7 @@ func TestNewK8sService(t *testing.T) {
6263
Kind: "Service",
6364
},
6465
ObjectMeta: metav1.ObjectMeta{
65-
Name: fmt.Sprintf("%s-kn-channel", imcName),
66+
Name: CreateChannelServiceName(imcName),
6667
Namespace: testNS,
6768
Labels: map[string]string{
6869
MessagingRoleLabel: MessagingRole,
@@ -105,7 +106,7 @@ func TestNewK8sServiceWithExternal(t *testing.T) {
105106
Kind: "Service",
106107
},
107108
ObjectMeta: metav1.ObjectMeta{
108-
Name: fmt.Sprintf("%s-kn-channel", imcName),
109+
Name: CreateChannelServiceName(imcName),
109110
Namespace: testNS,
110111
Labels: map[string]string{
111112
MessagingRoleLabel: MessagingRole,

pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package dispatcher
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"net/http"
2223
"reflect"
2324
"testing"
@@ -48,6 +49,7 @@ import (
4849
"knative.dev/eventing/pkg/apis/feature"
4950
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
5051
"knative.dev/eventing/pkg/auth"
52+
"knative.dev/eventing/pkg/channel"
5153
"knative.dev/eventing/pkg/channel/fanout"
5254
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
5355
"knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/inmemorychannel"
@@ -68,7 +70,7 @@ const (
6870

6971
var (
7072
channelServiceAddress = duckv1.Addressable{
71-
URL: apis.HTTP("test-imc-kn-channel.test-namespace.svc.cluster.local"),
73+
URL: apis.HTTP(fmt.Sprintf("%s%s.%s.svc.cluster.local", imcName, channel.K8ServiceNameSuffix, testNS)),
7274
}
7375

7476
linear = eventingduckv1.BackoffPolicyLinear

pkg/reconciler/integration/sink/resources/container_image.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func MakeDeploymentSpec(sink *v1alpha1.IntegrationSink, authProxyImage string, f
124124
deploy.Spec.Template.Spec.Containers = append(deploy.Spec.Template.Spec.Containers, corev1.Container{
125125
Name: "auth-proxy",
126126
Image: authProxyImage,
127-
ImagePullPolicy: corev1.PullAlways,
127+
ImagePullPolicy: corev1.PullIfNotPresent,
128128
Ports: []corev1.ContainerPort{
129129
{
130130
ContainerPort: 3128,

0 commit comments

Comments
 (0)