Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 68 additions & 60 deletions internal/xelon/load_balancers.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,92 +415,100 @@ func (l *loadBalancers) updateLoadBalancer(ctx context.Context, xlb *xelonLoadBa
patcher := newServicePatcher(l.client.k8s, service)
defer func() { _ = patcher.Patch(ctx) }()

var definedForwardingRuleIDs []string
// get current state
var currentForwardingRules []xelon.LoadBalancerClusterForwardingRule
var currentForwardingRuleIDs []string
if forwardingRuleIDs, ok := service.Annotations[serviceAnnotationLoadBalancerClusterForwardingRuleIDs]; ok && forwardingRuleIDs != "" {
definedForwardingRuleIDs = strings.Split(forwardingRuleIDs, ",")
currentForwardingRuleIDs = strings.Split(forwardingRuleIDs, ",")
}

existingForwardingRules, _, err := l.client.xelon.LoadBalancerClusters.ListForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID)
if err != nil {
return err
}
var definedForwardingRules []xelon.LoadBalancerClusterForwardingRule
for _, existingForwardingRule := range existingForwardingRules {
if slices.Contains(definedForwardingRuleIDs, existingForwardingRule.Frontend.ID) {
definedForwardingRules = append(definedForwardingRules, existingForwardingRule)
if slices.Contains(currentForwardingRuleIDs, existingForwardingRule.Frontend.ID) {
currentForwardingRules = append(currentForwardingRules, existingForwardingRule)
}
}
logger.Info("Fetched current state for forwarding rules", "current_forwarding_rules", currentForwardingRules)

xlbUsedPorts := make(map[int]bool, len(definedForwardingRules))
for _, definedForwardingRule := range definedForwardingRules {
if definedForwardingRule.Frontend == nil {
continue
// get desired state
var desiredForwardingRules []xelon.LoadBalancerClusterForwardingRule
for _, port := range service.Spec.Ports {
portNo := int(port.Port)
forwardingRule := xelon.LoadBalancerClusterForwardingRule{
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: int(port.NodePort)},
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: portNo},
}
xlbUsedPorts[definedForwardingRule.Frontend.Port] = true
desiredForwardingRules = append(desiredForwardingRules, forwardingRule)
}
logger.Info("Calculated desired state for forwarding rules", "desired_forwarding_rules", desiredForwardingRules)

var frontendRules []string
for _, port := range service.Spec.Ports {
portNo := int(port.Port)
portExists := xlbUsedPorts[portNo]
delete(xlbUsedPorts, portNo)

if portExists {
forwardingRuleID := ""
for _, definedForwardingRule := range definedForwardingRules {
if slices.Contains(definedForwardingRuleIDs, definedForwardingRule.Frontend.ID) {
forwardingRuleID = definedForwardingRule.Frontend.ID
}
// calculate diff (reconcile)
reconcileDiff := reconcile(currentForwardingRules, desiredForwardingRules)
logger.Info("Calculate reconcile state",
"rules_to_create", reconcileDiff.rulesToCreate,
"rules_to_update", reconcileDiff.rulesToUpdate,
"rules_to_delete", reconcileDiff.rulesToDelete,
)

var frontendRuleIDs []string
if len(reconcileDiff.rulesToCreate) > 0 {
logger.Info("Creating new forwarding rules", "payload", reconcileDiff.rulesToCreate)
rules, _, err := l.client.xelon.LoadBalancerClusters.CreateForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID, reconcileDiff.rulesToCreate)
if err != nil {
return err
}
for _, rule := range rules {
if rule.Frontend == nil {
continue
}
logger.Info("Update existing forwarding rule", "port", portNo, "id", forwardingRuleID)
updateRequest := &xelon.LoadBalancerClusterForwardingRuleConfiguration{
Port: portNo,
}
_, _, err := l.client.xelon.LoadBalancerClusters.UpdateForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, forwardingRuleID, updateRequest)
frontendRuleIDs = append(frontendRuleIDs, rule.Frontend.ID)
}
}

if len(reconcileDiff.rulesToUpdate) > 0 {
for _, ruleToUpdate := range reconcileDiff.rulesToUpdate {
logger.Info("Updating existing forwarding backend rule", "payload", ruleToUpdate.Backend)
_, _, err := l.client.xelon.LoadBalancerClusters.UpdateForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, ruleToUpdate.Backend.ID, ruleToUpdate.Backend)
if err != nil {
return err
}
frontendRules = append(frontendRules, forwardingRuleID)
} else {
logger.Info("Create new forwarding rule", "port", portNo)

forwardingRule := []xelon.LoadBalancerClusterForwardingRule{{
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: int(port.NodePort)},
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: portNo},
}}
}
}

rules, _, err := l.client.xelon.LoadBalancerClusters.CreateForwardingRules(ctx, xlb.clusterID, xlb.virtualIPID, forwardingRule)
if err != nil {
return err
if len(reconcileDiff.rulesToDelete) > 0 {
logger.Info("Deleting forwarding rules", "payload", reconcileDiff.rulesToDelete)
for _, ruleToDelete := range reconcileDiff.rulesToDelete {
if ruleToDelete.Frontend == nil {
continue
}
for _, rule := range rules {
if rule.Frontend != nil {
frontendRules = append(frontendRules, rule.Frontend.ID)
resp, err := l.client.xelon.LoadBalancerClusters.DeleteForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, ruleToDelete.Frontend.ID)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
logger.Info("Skipped removing not existing forwarding rule", "forwarding_rule_id", ruleToDelete.Frontend.ID)
} else {
return err
}
}
}
}

// remove any leftovers ports
for port := range xlbUsedPorts {
for _, existingForwardingRule := range existingForwardingRules {
if existingForwardingRule.Frontend.Port == port {
logger.Info("Deleting leftover forwarding rule", "port", port, "frontend_id", existingForwardingRule.Frontend.ID)
resp, err := l.client.xelon.LoadBalancerClusters.DeleteForwardingRule(ctx, xlb.clusterID, xlb.virtualIPID, existingForwardingRule.Frontend.ID)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
logger.Info("Skipping not existing forwarding rule", "forwarding_rule_id", existingForwardingRule.Frontend.ID)
} else {
return err
}
}
// normalize ids
forwardingRuleIDs := slices.Concat(currentForwardingRuleIDs, frontendRuleIDs)
slices.Sort(forwardingRuleIDs)
forwardingRuleIDs = slices.Compact(forwardingRuleIDs)
if len(reconcileDiff.rulesToDelete) > 0 {
for _, ruleToDelete := range reconcileDiff.rulesToDelete {
if ruleToDelete.Frontend == nil {
continue
}
forwardingRuleIDs = slices.DeleteFunc(forwardingRuleIDs, deleteByFrontendID(ruleToDelete.Frontend.ID))
}
}

logger.Info("Applying forwarding rules annotation", "forwarding_rules_ids", strings.Join(frontendRules, ","))
updateServiceAnnotation(service, serviceAnnotationLoadBalancerClusterForwardingRuleIDs, strings.Join(frontendRules, ","))
logger.Info("Applying forwarding rules annotation", "forwarding_rules_ids", strings.Join(forwardingRuleIDs, ","))
updateServiceAnnotation(service, serviceAnnotationLoadBalancerClusterForwardingRuleIDs, strings.Join(forwardingRuleIDs, ","))

return nil
}
Expand All @@ -524,16 +532,16 @@ func isVirtualIPAvailable(virtualIP *xelon.LoadBalancerClusterVirtualIP, forward
}

// combine all frontend ports, so we can check it later
var frontedPorts []int32
var frontendPorts []int32
for _, forwardingRule := range forwardingRules {
if forwardingRule.Frontend != nil {
frontedPorts = append(frontedPorts, int32(forwardingRule.Frontend.Port))
frontendPorts = append(frontendPorts, int32(forwardingRule.Frontend.Port))
}
}

// check if service's ports are already configured in forwarding rules
for _, servicePort := range service.Spec.Ports {
if slices.Contains(frontedPorts, servicePort.Port) {
if slices.Contains(frontendPorts, servicePort.Port) {
return false
}
}
Expand Down
80 changes: 80 additions & 0 deletions internal/xelon/load_balancers_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package xelon

import (
"slices"

"github.com/Xelon-AG/xelon-sdk-go/xelon"
)

type ReconcileDiff struct {
rulesToCreate []xelon.LoadBalancerClusterForwardingRule
rulesToUpdate []xelon.LoadBalancerClusterForwardingRule
rulesToDelete []xelon.LoadBalancerClusterForwardingRule
}

func reconcile(currentRules []xelon.LoadBalancerClusterForwardingRule, desiredRules []xelon.LoadBalancerClusterForwardingRule) ReconcileDiff {
reconcileDiff := ReconcileDiff{}

// first find rules to create
for _, desiredRule := range desiredRules {
found := false
for _, currentRule := range currentRules {
if currentRule.Frontend == nil || desiredRule.Frontend == nil {
continue
}
if currentRule.Frontend.Port == desiredRule.Frontend.Port {
found = true
}
}
if !found {
reconcileDiff.rulesToCreate = append(reconcileDiff.rulesToCreate, desiredRule)
}
}

// update case: iterate over current rules and find rules with the same frontend port but different backend port
for _, currentRule := range currentRules {
for _, desiredRule := range desiredRules {
if currentRule.Frontend == nil || desiredRule.Frontend == nil {
continue
}
if currentRule.Frontend.Port == desiredRule.Frontend.Port &&
currentRule.Backend.Port != desiredRule.Backend.Port {
desiredRule.Frontend.ID = currentRule.Frontend.ID
desiredRule.Backend.ID = currentRule.Backend.ID
reconcileDiff.rulesToUpdate = append(reconcileDiff.rulesToUpdate, desiredRule)
break
}
}
}

// delete
for _, currentRule := range currentRules {
if slices.ContainsFunc(reconcileDiff.rulesToCreate, compareByFrontendPorts(currentRule)) {
continue
}
if slices.ContainsFunc(reconcileDiff.rulesToUpdate, compareByFrontendPorts(currentRule)) {
continue
}
if slices.ContainsFunc(desiredRules, compareByFrontendPorts(currentRule)) {
continue
}
reconcileDiff.rulesToDelete = append(reconcileDiff.rulesToDelete, currentRule)
}

return reconcileDiff
}

func compareByFrontendPorts(first xelon.LoadBalancerClusterForwardingRule) func(xelon.LoadBalancerClusterForwardingRule) bool {
return func(second xelon.LoadBalancerClusterForwardingRule) bool {
if first.Frontend == nil || second.Frontend == nil {
return false
}
return first.Frontend.Port == second.Frontend.Port
}
}

func deleteByFrontendID(firstID string) func(string) bool {
return func(secondID string) bool {
return firstID == secondID
}
}
139 changes: 139 additions & 0 deletions internal/xelon/load_balancers_reconciler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package xelon

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/Xelon-AG/xelon-sdk-go/xelon"
)

func TestReconcile_createRules(t *testing.T) {
type testCase struct {
current []xelon.LoadBalancerClusterForwardingRule
desired []xelon.LoadBalancerClusterForwardingRule
expected []xelon.LoadBalancerClusterForwardingRule
}
tests := map[string]testCase{
"nil": {
current: nil,
desired: nil,
expected: nil,
},
"nil current": {
current: nil,
desired: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
expected: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
},
"nil desired": {
current: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
desired: nil,
expected: nil,
},
"add rule from desired": {
current: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
desired: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}, {
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900},
}},
expected: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900},
}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
actual := reconcile(test.current, test.desired)
assert.Equal(t, test.expected, actual.rulesToCreate)
})
}
}

func TestReconcile_updateRules(t *testing.T) {
type testCase struct {
current []xelon.LoadBalancerClusterForwardingRule
desired []xelon.LoadBalancerClusterForwardingRule
expected []xelon.LoadBalancerClusterForwardingRule
}
tests := map[string]testCase{
"nil": {
current: nil,
desired: nil,
expected: nil,
},
"update with new backend port": {
current: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800, ID: "u0gkddw9rr"},
}},
desired: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 99999},
}},
expected: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 99999, ID: "u0gkddw9rr"},
}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
actual := reconcile(test.current, test.desired)
assert.Equal(t, test.expected, actual.rulesToUpdate)
})
}
}

func TestReconcile_deleteRules(t *testing.T) {
type testCase struct {
current []xelon.LoadBalancerClusterForwardingRule
desired []xelon.LoadBalancerClusterForwardingRule
expected []xelon.LoadBalancerClusterForwardingRule
}
tests := map[string]testCase{
"nil": {
current: nil,
desired: nil,
expected: nil,
},
"remove non-used existed rule": {
current: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
desired: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8090},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80900},
}},
expected: []xelon.LoadBalancerClusterForwardingRule{{
Frontend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 8080, ID: "5qggn9mtbz"},
Backend: &xelon.LoadBalancerClusterForwardingRuleConfiguration{Port: 80800},
}},
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
actual := reconcile(test.current, test.desired)
assert.Equal(t, test.expected, actual.rulesToDelete)
})
}
}
Loading