Skip to content

Commit 0571391

Browse files
authored
feat(serviceintegration): adopt existing integrations instead of failing (#1042)
1 parent f92707e commit 0571391

File tree

8 files changed

+534
-27
lines changed

8 files changed

+534
-27
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD
44

5+
- Change `ServiceIntegration`: operator now adopts existing integrations instead of failing with conflict
56
- Fix `PostgreSQL`: added retry logic for errors during upgrade task
67

78
## v0.33.1 - 2025-10-08

api/v1alpha1/serviceintegration_types.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ type ServiceIntegrationStatus struct {
103103
// +kubebuilder:object:root=true
104104
// +kubebuilder:subresource:status
105105

106-
// ServiceIntegration is the Schema for the serviceintegrations API
106+
// ServiceIntegration is the Schema for the serviceintegrations API.
107+
//
108+
// info "Adoption of existing integrations": If a ServiceIntegration resource is created with configuration matching an existing Aiven integration (created outside the operator), the operator will adopt the existing integration.
107109
// +kubebuilder:printcolumn:name="Project",type="string",JSONPath=".spec.project"
108110
// +kubebuilder:printcolumn:name="Type",type="string",JSONPath=".spec.integrationType"
109111
// +kubebuilder:printcolumn:name="Source Service Name",type="string",JSONPath=".spec.sourceServiceName"

charts/aiven-operator-crds/templates/aiven.io_serviceintegrations.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ spec:
3636
name: v1alpha1
3737
schema:
3838
openAPIV3Schema:
39-
description:
40-
ServiceIntegration is the Schema for the serviceintegrations
41-
API
39+
description: |-
40+
ServiceIntegration is the Schema for the serviceintegrations API.
41+
42+
43+
info "Adoption of existing integrations": If a ServiceIntegration resource is created with configuration matching an existing Aiven integration (created outside the operator), the operator will adopt the existing integration.
4244
properties:
4345
apiVersion:
4446
description: |-

config/crd/bases/aiven.io_serviceintegrations.yaml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ spec:
3636
name: v1alpha1
3737
schema:
3838
openAPIV3Schema:
39-
description:
40-
ServiceIntegration is the Schema for the serviceintegrations
41-
API
39+
description: |-
40+
ServiceIntegration is the Schema for the serviceintegrations API.
41+
42+
43+
info "Adoption of existing integrations": If a ServiceIntegration resource is created with configuration matching an existing Aiven integration (created outside the operator), the operator will adopt the existing integration.
4244
properties:
4345
apiVersion:
4446
description: |-

controllers/serviceintegration_controller.go

Lines changed: 109 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
avngen "github.com/aiven/go-client-codegen"
1212
"github.com/aiven/go-client-codegen/handler/service"
1313
"github.com/avast/retry-go"
14+
"github.com/google/go-cmp/cmp"
1415
corev1 "k8s.io/api/core/v1"
1516
"k8s.io/apimachinery/pkg/api/meta"
1617
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -57,31 +58,43 @@ func (h ServiceIntegrationHandler) createOrUpdate(ctx context.Context, avnGen av
5758
}
5859

5960
if si.Status.ID == "" {
60-
userConfigMap, err := CreateUserConfiguration(userConfig)
61+
existingIntegration, err := h.findExistingIntegration(ctx, avnGen, si)
6162
if err != nil {
62-
return err
63+
return fmt.Errorf("failed to check for existing integration: %w", err)
6364
}
6465

65-
integration, err := avnGen.ServiceIntegrationCreate(
66-
ctx,
67-
si.Spec.Project,
68-
&service.ServiceIntegrationCreateIn{
69-
DestEndpointId: NilIfZero(si.Spec.DestinationEndpointID),
70-
DestService: NilIfZero(si.Spec.DestinationServiceName),
71-
DestProject: NilIfZero(si.Spec.DestinationProjectName),
72-
IntegrationType: si.Spec.IntegrationType,
73-
SourceEndpointId: NilIfZero(si.Spec.SourceEndpointID),
74-
SourceService: NilIfZero(si.Spec.SourceServiceName),
75-
SourceProject: NilIfZero(si.Spec.SourceProjectName),
76-
UserConfig: &userConfigMap,
77-
},
78-
)
79-
if err != nil {
80-
return fmt.Errorf("cannot createOrUpdate service integration: %w", err)
66+
if existingIntegration != nil {
67+
si.Status.ID = existingIntegration.ServiceIntegrationId // adopt existing
68+
} else {
69+
userConfigMap, err := CreateUserConfiguration(userConfig)
70+
if err != nil {
71+
return err
72+
}
73+
74+
integration, err := avnGen.ServiceIntegrationCreate(
75+
ctx,
76+
si.Spec.Project,
77+
&service.ServiceIntegrationCreateIn{
78+
DestEndpointId: NilIfZero(si.Spec.DestinationEndpointID),
79+
DestService: NilIfZero(si.Spec.DestinationServiceName),
80+
DestProject: NilIfZero(si.Spec.DestinationProjectName),
81+
IntegrationType: si.Spec.IntegrationType,
82+
SourceEndpointId: NilIfZero(si.Spec.SourceEndpointID),
83+
SourceService: NilIfZero(si.Spec.SourceServiceName),
84+
SourceProject: NilIfZero(si.Spec.SourceProjectName),
85+
UserConfig: &userConfigMap,
86+
},
87+
)
88+
if err != nil {
89+
return fmt.Errorf("cannot createOrUpdate service integration: %w", err)
90+
}
91+
92+
si.Status.ID = integration.ServiceIntegrationId
93+
return nil
8194
}
95+
}
8296

83-
si.Status.ID = integration.ServiceIntegrationId
84-
} else {
97+
if si.Status.ID != "" {
8598
if !si.HasUserConfig() {
8699
return nil
87100
}
@@ -199,3 +212,79 @@ func (h ServiceIntegrationHandler) convert(i client.Object) (*v1alpha1.ServiceIn
199212

200213
return si, nil
201214
}
215+
216+
// findExistingIntegration checks if an integration with matching configuration already exists on Aiven.
217+
func (h ServiceIntegrationHandler) findExistingIntegration(
218+
ctx context.Context,
219+
avnGen avngen.Client,
220+
si *v1alpha1.ServiceIntegration,
221+
) (*service.ServiceIntegrationOut, error) {
222+
if si.Spec.SourceServiceName == "" {
223+
return nil, nil // integration with only endpoints, cannot list integrations
224+
}
225+
226+
sourceProject := si.Spec.SourceProjectName
227+
if sourceProject == "" {
228+
sourceProject = si.Spec.Project
229+
}
230+
231+
svc, err := avnGen.ServiceGet(ctx, sourceProject, si.Spec.SourceServiceName)
232+
if err != nil {
233+
return nil, fmt.Errorf("failed to get service %s/%s: %w", sourceProject, si.Spec.SourceServiceName, err)
234+
}
235+
236+
for _, integration := range svc.ServiceIntegrations {
237+
if h.integrationMatches(&integration, si) {
238+
return &integration, nil
239+
}
240+
}
241+
242+
return nil, nil
243+
}
244+
245+
type integrationKey struct {
246+
IntegrationType service.IntegrationType
247+
SourceService string
248+
SourceProject string
249+
SourceEndpointID *string
250+
DestService *string
251+
DestProject string
252+
DestEndpointID *string
253+
}
254+
255+
func (h ServiceIntegrationHandler) integrationMatches(
256+
existing *service.ServiceIntegrationOut,
257+
desired *v1alpha1.ServiceIntegration,
258+
) bool {
259+
sourceProject := desired.Spec.SourceProjectName
260+
if sourceProject == "" {
261+
sourceProject = desired.Spec.Project
262+
}
263+
264+
destProject := desired.Spec.DestinationProjectName
265+
if destProject == "" {
266+
destProject = desired.Spec.Project
267+
}
268+
269+
existingKey := integrationKey{
270+
IntegrationType: existing.IntegrationType,
271+
SourceService: existing.SourceService,
272+
SourceProject: existing.SourceProject,
273+
SourceEndpointID: existing.SourceEndpointId,
274+
DestService: existing.DestService,
275+
DestProject: existing.DestProject,
276+
DestEndpointID: existing.DestEndpointId,
277+
}
278+
279+
desiredKey := integrationKey{
280+
IntegrationType: desired.Spec.IntegrationType,
281+
SourceService: desired.Spec.SourceServiceName,
282+
SourceProject: sourceProject,
283+
SourceEndpointID: NilIfZero(desired.Spec.SourceEndpointID),
284+
DestService: NilIfZero(desired.Spec.DestinationServiceName),
285+
DestProject: destProject,
286+
DestEndpointID: NilIfZero(desired.Spec.DestinationEndpointID),
287+
}
288+
289+
return cmp.Equal(existingKey, desiredKey)
290+
}

0 commit comments

Comments
 (0)