Skip to content
This repository was archived by the owner on Nov 3, 2025. It is now read-only.

Commit e3d92d7

Browse files
committed
Use CRC32 instead of comparing flows directly
Signed-off-by: Ricardo Zanini <[email protected]>
1 parent c12f3da commit e3d92d7

File tree

10 files changed

+81
-9
lines changed

10 files changed

+81
-9
lines changed

api/v1alpha08/sonataflow_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ type SonataFlowStatus struct {
200200
// Triggers list of triggers created for the SonataFlow
201201
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="triggers"
202202
Triggers []SonataFlowTriggerRef `json:"triggers,omitempty"`
203+
//+operator-sdk:csv:customresourcedefinitions:type=status,displayName="flowRevision"
204+
FlowCRC uint32 `json:"flowCRC,omitempty"`
203205
}
204206

205207
// SonataFlowTriggerRef defines a trigger created for the SonataFlow.

bundle/manifests/sonataflow.org_sonataflows.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10050,6 +10050,9 @@ spec:
1005010050
endpoint:
1005110051
description: Endpoint is an externally accessible URL of the workflow
1005210052
type: string
10053+
flowCRC:
10054+
format: int32
10055+
type: integer
1005310056
lastTimeRecoverAttempt:
1005410057
format: date-time
1005510058
type: string

config/crd/bases/sonataflow.org_sonataflows.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10050,6 +10050,9 @@ spec:
1005010050
endpoint:
1005110051
description: Endpoint is an externally accessible URL of the workflow
1005210052
type: string
10053+
flowCRC:
10054+
format: int32
10055+
type: integer
1005310056
lastTimeRecoverAttempt:
1005410057
format: date-time
1005510058
type: string

config/manifests/bases/sonataflow-operator.clusterserviceversion.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,8 @@ spec:
239239
- description: Endpoint is an externally accessible URL of the workflow
240240
displayName: endpoint
241241
path: endpoint
242+
- displayName: flowRevision
243+
path: flowCRC
242244
- displayName: lastTimeRecoverAttempt
243245
path: lastTimeRecoverAttempt
244246
- description: Platform displays which platform is being used by this workflow

internal/controller/profiles/common/reconciler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"context"
2424
"fmt"
2525

26+
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
27+
2628
"k8s.io/client-go/rest"
2729

2830
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/discovery"
@@ -56,6 +58,10 @@ func (s *StateSupport) PerformStatusUpdate(ctx context.Context, workflow *operat
5658
return false, err
5759
}
5860
workflow.Status.ObservedGeneration = workflow.Generation
61+
workflow.Status.FlowCRC, err = utils.Crc32Checksum(workflow.Spec.Flow)
62+
if err != nil {
63+
return false, err
64+
}
5965
services.SetServiceUrlsInWorkflowStatus(pl, workflow)
6066
if workflow.Status.Platform == nil {
6167
workflow.Status.Platform = &operatorapi.SonataFlowPlatformRef{}

internal/controller/profiles/preview/states_preview.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ package preview
2222
import (
2323
"context"
2424
"fmt"
25-
"reflect"
2625

26+
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
2727
corev1 "k8s.io/api/core/v1"
2828
"k8s.io/apimachinery/pkg/api/errors"
2929
"k8s.io/klog/v2"
@@ -199,7 +199,11 @@ func (h *deployWithBuildWorkflowState) Do(ctx context.Context, workflow *operato
199199
return ctrl.Result{}, nil, err
200200
}
201201

202-
if h.isWorkflowChanged(workflow) { // Let's check that the 2 resWorkflowDef definition are different
202+
hasChanged, err := h.isWorkflowChanged(workflow)
203+
if err != nil {
204+
return ctrl.Result{}, nil, err
205+
}
206+
if hasChanged { // Let's check that the 2 resWorkflowDef definition are different
203207
if err = buildManager.MarkToRestart(build); err != nil {
204208
return ctrl.Result{}, nil, err
205209
}
@@ -226,10 +230,15 @@ func (h *deployWithBuildWorkflowState) PostReconcile(ctx context.Context, workfl
226230
}
227231

228232
// isWorkflowChanged checks whether the contents of .spec.flow of the given workflow has changed.
229-
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) bool {
230-
serverlessWorkflow := &operatorapi.SonataFlow{}
231-
if err := h.C.Get(context.TODO(), client.ObjectKeyFromObject(workflow), serverlessWorkflow); err != nil {
232-
klog.V(log.E).ErrorS(err, "unable to retrieve SonataFlow definition")
233+
func (h *deployWithBuildWorkflowState) isWorkflowChanged(workflow *operatorapi.SonataFlow) (bool, error) {
234+
// Added this guard for backward compatibility for workflows deployed with a previous operator version, so we won't kick thousands of builds on users' cluster.
235+
// After this reconciliation cycle, the CRC should be updated
236+
if workflow.Status.FlowCRC == 0 {
237+
return false, nil
238+
}
239+
actualCRC, err := utils.Crc32Checksum(workflow.Spec.Flow)
240+
if err != nil {
241+
return false, err
233242
}
234-
return !reflect.DeepEqual(&serverlessWorkflow.Spec.Flow, &workflow.Spec.Flow)
243+
return actualCRC != workflow.Status.FlowCRC, nil
235244
}

internal/controller/profiles/preview/states_preview_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package preview
2020
import (
2121
"testing"
2222

23+
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
24+
2325
"github.com/apache/incubator-kie-kogito-serverless-operator/internal/controller/profiles/common"
2426
"github.com/apache/incubator-kie-kogito-serverless-operator/test"
2527
"github.com/serverlessworkflow/sdk-go/v2/model"
@@ -29,11 +31,15 @@ import (
2931
func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
3032
workflow1 := test.GetBaseSonataFlow(t.Name())
3133
workflow2 := test.GetBaseSonataFlow(t.Name())
34+
workflow1.Status.FlowCRC, _ = utils.Crc32Checksum(workflow1.Spec.Flow)
35+
workflow2.Status.FlowCRC, _ = utils.Crc32Checksum(workflow2.Spec.Flow)
3236
deployWithBuildWorkflowState := &deployWithBuildWorkflowState{
3337
StateSupport: &common.StateSupport{C: test.NewSonataFlowClientBuilder().WithRuntimeObjects(workflow1).Build()},
3438
}
3539

36-
assert.False(t, deployWithBuildWorkflowState.isWorkflowChanged(workflow2))
40+
hasChanged, err := deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
41+
assert.NoError(t, err)
42+
assert.False(t, hasChanged)
3743

3844
// change workflow2
3945
workflow2.Spec.Flow.Metadata = model.Metadata{
@@ -42,5 +48,7 @@ func Test_deployWithBuildWorkflowState_isWorkflowChanged(t *testing.T) {
4248
},
4349
}
4450

45-
assert.True(t, deployWithBuildWorkflowState.isWorkflowChanged(workflow2))
51+
hasChanged, err = deployWithBuildWorkflowState.isWorkflowChanged(workflow2)
52+
assert.NoError(t, err)
53+
assert.True(t, hasChanged)
4654
}

operator.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27477,6 +27477,9 @@ spec:
2747727477
endpoint:
2747827478
description: Endpoint is an externally accessible URL of the workflow
2747927479
type: string
27480+
flowCRC:
27481+
format: int32
27482+
type: integer
2748027483
lastTimeRecoverAttempt:
2748127484
format: date-time
2748227485
type: string

test/yaml.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"runtime"
2828
"strings"
2929

30+
"github.com/apache/incubator-kie-kogito-serverless-operator/utils"
31+
3032
"github.com/apache/incubator-kie-kogito-serverless-operator/api"
3133
operatorapi "github.com/apache/incubator-kie-kogito-serverless-operator/api/v1alpha08"
3234
"github.com/apache/incubator-kie-kogito-serverless-operator/log"
@@ -71,6 +73,7 @@ func GetSonataFlow(testFile, namespace string) *operatorapi.SonataFlow {
7173
GetKubernetesResource(testFile, ksw)
7274
klog.V(log.D).InfoS("Successfully read KSW", "ksw", spew.Sprint(ksw))
7375
ksw.Namespace = namespace
76+
ksw.Status.FlowCRC, _ = utils.Crc32Checksum(ksw.Spec.Flow)
7477
return ksw
7578
}
7679

utils/crc.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package utils
19+
20+
import (
21+
"bytes"
22+
"encoding/gob"
23+
"hash/crc32"
24+
)
25+
26+
func Crc32Checksum(v interface{}) (uint32, error) {
27+
var buf bytes.Buffer
28+
enc := gob.NewEncoder(&buf)
29+
if err := enc.Encode(v); err != nil {
30+
return 0, err
31+
}
32+
return crc32.ChecksumIEEE(buf.Bytes()), nil
33+
}

0 commit comments

Comments
 (0)