diff --git a/internal/xelon/load_balancers.go b/internal/xelon/load_balancers.go index c72ef20..9b9038a 100644 --- a/internal/xelon/load_balancers.go +++ b/internal/xelon/load_balancers.go @@ -415,92 +415,100 @@ func (l *loadBalancers) updateLoadBalancer(ctx context.Context, xlb *xelonLoadBa patcher := newServicePatcher(l.client.k8s, service) defer func() { _ = patcher.Patch(ctx) }() - var definedForwardingRuleIDs []string + // get current state + var currentForwardingRules []xelon.LoadBalancerClusterForwardingRule + var currentForwardingRuleIDs []string if forwardingRuleIDs, ok := service.Annotations[serviceAnnotationLoadBalancerClusterForwardingRuleIDs]; ok && forwardingRuleIDs != "" { - definedForwardingRuleIDs = strings.Split(forwardingRuleIDs, ",") + currentForwardingRuleIDs = strings.Split(forwardingRuleIDs, ",") } - existingForwardingRules, _, err := l.client.xelon.LoadBalancerClusters.ListForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID) if err != nil { return err } - var definedForwardingRules []xelon.LoadBalancerClusterForwardingRule for _, existingForwardingRule := range existingForwardingRules { - if slices.Contains(definedForwardingRuleIDs, existingForwardingRule.Frontend.ID) { - definedForwardingRules = append(definedForwardingRules, existingForwardingRule) + if slices.Contains(currentForwardingRuleIDs, existingForwardingRule.Frontend.ID) { + currentForwardingRules = append(currentForwardingRules, existingForwardingRule) } } + logger.Info("Fetched current state for forwarding rules", "current_forwarding_rules", currentForwardingRules) - xlbUsedPorts := make(map[int]bool, len(definedForwardingRules)) - for _, definedForwardingRule := range definedForwardingRules { - if definedForwardingRule.Frontend == nil { - continue + // get desired state + var desiredForwardingRules []xelon.LoadBalancerClusterForwardingRule + for _, port := range service.Spec.Ports { + portNo := int(port.Port) + forwardingRule := xelon.LoadBalancerClusterForwardingRule{ + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: int(port.NodePort)}, + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: portNo}, } - xlbUsedPorts[definedForwardingRule.Frontend.Port] = true + desiredForwardingRules = append(desiredForwardingRules, forwardingRule) } + logger.Info("Calculated desired state for forwarding rules", "desired_forwarding_rules", desiredForwardingRules) - var frontendRules []string - for _, port := range service.Spec.Ports { - portNo := int(port.Port) - portExists := xlbUsedPorts[portNo] - delete(xlbUsedPorts, portNo) - - if portExists { - forwardingRuleID := "" - for _, definedForwardingRule := range definedForwardingRules { - if slices.Contains(definedForwardingRuleIDs, definedForwardingRule.Frontend.ID) { - forwardingRuleID = definedForwardingRule.Frontend.ID - } + // calculate diff (reconcile) + reconcileDiff := reconcile(currentForwardingRules, desiredForwardingRules) + logger.Info("Calculate reconcile state", + "rules_to_create", reconcileDiff.rulesToCreate, + "rules_to_update", reconcileDiff.rulesToUpdate, + "rules_to_delete", reconcileDiff.rulesToDelete, + ) + var frontendRuleIDs []string + if len(reconcileDiff.rulesToCreate) > 0 { + logger.Info("Creating new forwarding rules", "payload", reconcileDiff.rulesToCreate) + rules, _, err := l.client.xelon.LoadBalancerClusters.CreateForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID, reconcileDiff.rulesToCreate) + if err != nil { + return err + } + for _, rule := range rules { + if rule.Frontend == nil { + continue } - logger.Info("Update existing forwarding rule", "port", portNo, "id", forwardingRuleID) - updateRequest := &xelon.LoadBalancerClusterForwardingRuleConfiguration{ - Port: portNo, - } - _, _, err := l.client.xelon.LoadBalancerClusters.UpdateForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, forwardingRuleID, updateRequest) + frontendRuleIDs = append(frontendRuleIDs, rule.Frontend.ID) + } + } + + if len(reconcileDiff.rulesToUpdate) > 0 { + for _, ruleToUpdate := range reconcileDiff.rulesToUpdate { + logger.Info("Updating existing forwarding backend rule", "payload", ruleToUpdate.Backend) + _, _, err := l.client.xelon.LoadBalancerClusters.UpdateForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, ruleToUpdate.Backend.ID, ruleToUpdate.Backend) if err != nil { return err } - frontendRules = append(frontendRules, forwardingRuleID) - } else { - logger.Info("Create new forwarding rule", "port", portNo) - - forwardingRule := []xelon.LoadBalancerClusterForwardingRule{{ - Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: int(port.NodePort)}, - Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: portNo}, - }} + } + } - rules, _, err := l.client.xelon.LoadBalancerClusters.CreateForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID, forwardingRule) - if err != nil { - return err + if len(reconcileDiff.rulesToDelete) > 0 { + logger.Info("Deleting forwarding rules", "payload", reconcileDiff.rulesToDelete) + for _, ruleToDelete := range reconcileDiff.rulesToDelete { + if ruleToDelete.Frontend == nil { + continue } - for _, rule := range rules { - if rule.Frontend != nil { - frontendRules = append(frontendRules, rule.Frontend.ID) + resp, err := l.client.xelon.LoadBalancerClusters.DeleteForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, ruleToDelete.Frontend.ID) + if err != nil { + if resp != nil && resp.StatusCode == http.StatusNotFound { + logger.Info("Skipped removing not existing forwarding rule", "forwarding_rule_id", ruleToDelete.Frontend.ID) + } else { + return err } } } } - // remove any leftovers ports - for port := range xlbUsedPorts { - for _, existingForwardingRule := range existingForwardingRules { - if existingForwardingRule.Frontend.Port == port { - logger.Info("Deleting leftover forwarding rule", "port", port, "frontend_id", existingForwardingRule.Frontend.ID) - resp, err := l.client.xelon.LoadBalancerClusters.DeleteForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, existingForwardingRule.Frontend.ID) - if err != nil { - if resp != nil && resp.StatusCode == http.StatusNotFound { - logger.Info("Skipping not existing forwarding rule", "forwarding_rule_id", existingForwardingRule.Frontend.ID) - } else { - return err - } - } + // normalize ids + forwardingRuleIDs := slices.Concat(currentForwardingRuleIDs, frontendRuleIDs) + slices.Sort(forwardingRuleIDs) + forwardingRuleIDs = slices.Compact(forwardingRuleIDs) + if len(reconcileDiff.rulesToDelete) > 0 { + for _, ruleToDelete := range reconcileDiff.rulesToDelete { + if ruleToDelete.Frontend == nil { + continue } + forwardingRuleIDs = slices.DeleteFunc(forwardingRuleIDs, deleteByFrontendID(ruleToDelete.Frontend.ID)) } } - logger.Info("Applying forwarding rules annotation", "forwarding_rules_ids", strings.Join(frontendRules, ",")) - updateServiceAnnotation(service, serviceAnnotationLoadBalancerClusterForwardingRuleIDs, strings.Join(frontendRules, ",")) + logger.Info("Applying forwarding rules annotation", "forwarding_rules_ids", strings.Join(forwardingRuleIDs, ",")) + updateServiceAnnotation(service, serviceAnnotationLoadBalancerClusterForwardingRuleIDs, strings.Join(forwardingRuleIDs, ",")) return nil } @@ -524,16 +532,16 @@ func isVirtualIPAvailable(virtualIP *xelon.LoadBalancerClusterVirtualIP, forward } // combine all frontend ports, so we can check it later - var frontedPorts []int32 + var frontendPorts []int32 for _, forwardingRule := range forwardingRules { if forwardingRule.Frontend != nil { - frontedPorts = append(frontedPorts, int32(forwardingRule.Frontend.Port)) + frontendPorts = append(frontendPorts, int32(forwardingRule.Frontend.Port)) } } // check if service's ports are already configured in forwarding rules for _, servicePort := range service.Spec.Ports { - if slices.Contains(frontedPorts, servicePort.Port) { + if slices.Contains(frontendPorts, servicePort.Port) { return false } } diff --git a/internal/xelon/load_balancers_reconciler.go b/internal/xelon/load_balancers_reconciler.go new file mode 100644 index 0000000..8c6f47c --- /dev/null +++ b/internal/xelon/load_balancers_reconciler.go @@ -0,0 +1,80 @@ +package xelon + +import ( + "slices" + + "github.com/Xelon-AG/xelon-sdk-go/xelon" +) + +type ReconcileDiff struct { + rulesToCreate []xelon.LoadBalancerClusterForwardingRule + rulesToUpdate []xelon.LoadBalancerClusterForwardingRule + rulesToDelete []xelon.LoadBalancerClusterForwardingRule +} + +func reconcile(currentRules []xelon.LoadBalancerClusterForwardingRule, desiredRules []xelon.LoadBalancerClusterForwardingRule) ReconcileDiff { + reconcileDiff := ReconcileDiff{} + + // first find rules to create + for _, desiredRule := range desiredRules { + found := false + for _, currentRule := range currentRules { + if currentRule.Frontend == nil || desiredRule.Frontend == nil { + continue + } + if currentRule.Frontend.Port == desiredRule.Frontend.Port { + found = true + } + } + if !found { + reconcileDiff.rulesToCreate = append(reconcileDiff.rulesToCreate, desiredRule) + } + } + + // update case: iterate over current rules and find rules with the same frontend port but different backend port + for _, currentRule := range currentRules { + for _, desiredRule := range desiredRules { + if currentRule.Frontend == nil || desiredRule.Frontend == nil { + continue + } + if currentRule.Frontend.Port == desiredRule.Frontend.Port && + currentRule.Backend.Port != desiredRule.Backend.Port { + desiredRule.Frontend.ID = currentRule.Frontend.ID + desiredRule.Backend.ID = currentRule.Backend.ID + reconcileDiff.rulesToUpdate = append(reconcileDiff.rulesToUpdate, desiredRule) + break + } + } + } + + // delete + for _, currentRule := range currentRules { + if slices.ContainsFunc(reconcileDiff.rulesToCreate, compareByFrontendPorts(currentRule)) { + continue + } + if slices.ContainsFunc(reconcileDiff.rulesToUpdate, compareByFrontendPorts(currentRule)) { + continue + } + if slices.ContainsFunc(desiredRules, compareByFrontendPorts(currentRule)) { + continue + } + reconcileDiff.rulesToDelete = append(reconcileDiff.rulesToDelete, currentRule) + } + + return reconcileDiff +} + +func compareByFrontendPorts(first xelon.LoadBalancerClusterForwardingRule) func(xelon.LoadBalancerClusterForwardingRule) bool { + return func(second xelon.LoadBalancerClusterForwardingRule) bool { + if first.Frontend == nil || second.Frontend == nil { + return false + } + return first.Frontend.Port == second.Frontend.Port + } +} + +func deleteByFrontendID(firstID string) func(string) bool { + return func(secondID string) bool { + return firstID == secondID + } +} diff --git a/internal/xelon/load_balancers_reconciler_test.go b/internal/xelon/load_balancers_reconciler_test.go new file mode 100644 index 0000000..79d071e --- /dev/null +++ b/internal/xelon/load_balancers_reconciler_test.go @@ -0,0 +1,139 @@ +package xelon + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/Xelon-AG/xelon-sdk-go/xelon" +) + +func TestReconcile_createRules(t *testing.T) { + type testCase struct { + current []xelon.LoadBalancerClusterForwardingRule + desired []xelon.LoadBalancerClusterForwardingRule + expected []xelon.LoadBalancerClusterForwardingRule + } + tests := map[string]testCase{ + "nil": { + current: nil, + desired: nil, + expected: nil, + }, + "nil current": { + current: nil, + desired: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + expected: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + }, + "nil desired": { + current: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + desired: nil, + expected: nil, + }, + "add rule from desired": { + current: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + desired: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }, { + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900}, + }}, + expected: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900}, + }}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actual := reconcile(test.current, test.desired) + assert.Equal(t, test.expected, actual.rulesToCreate) + }) + } +} + +func TestReconcile_updateRules(t *testing.T) { + type testCase struct { + current []xelon.LoadBalancerClusterForwardingRule + desired []xelon.LoadBalancerClusterForwardingRule + expected []xelon.LoadBalancerClusterForwardingRule + } + tests := map[string]testCase{ + "nil": { + current: nil, + desired: nil, + expected: nil, + }, + "update with new backend port": { + current: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800, ID: "u0gkddw9rr"}, + }}, + desired: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 99999}, + }}, + expected: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 99999, ID: "u0gkddw9rr"}, + }}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actual := reconcile(test.current, test.desired) + assert.Equal(t, test.expected, actual.rulesToUpdate) + }) + } +} + +func TestReconcile_deleteRules(t *testing.T) { + type testCase struct { + current []xelon.LoadBalancerClusterForwardingRule + desired []xelon.LoadBalancerClusterForwardingRule + expected []xelon.LoadBalancerClusterForwardingRule + } + tests := map[string]testCase{ + "nil": { + current: nil, + desired: nil, + expected: nil, + }, + "remove non-used existed rule": { + current: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + desired: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900}, + }}, + expected: []xelon.LoadBalancerClusterForwardingRule{{ + Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"}, + Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800}, + }}, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + actual := reconcile(test.current, test.desired) + assert.Equal(t, test.expected, actual.rulesToDelete) + }) + } +}