diff --git a/pkg/ddc/alluxio/const.go b/pkg/ddc/alluxio/const.go index 833988ec453..57d6a101bc0 100644 --- a/pkg/ddc/alluxio/const.go +++ b/pkg/ddc/alluxio/const.go @@ -57,6 +57,10 @@ const ( defaultGracefulShutdownLimits int32 = 3 defaultCleanCacheGracePeriodSeconds int32 = 60 + // defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the + // runtime spec does not override alluxio.worker.rpc.port. + defaultWorkerRPCPort = 29999 + MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE" ConfigmapStorageName = "configmap" ) diff --git a/pkg/ddc/alluxio/operations/decommission.go b/pkg/ddc/alluxio/operations/decommission.go new file mode 100644 index 00000000000..146311c9be6 --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission.go @@ -0,0 +1,79 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import "strings" + +// DecommissionWorkers signals the Alluxio master to decommission the given +// workers. Each address must be in "<host>:<port>" form. +// The call is idempotent: re-issuing it against an already-decommissioned +// worker is safe. 
+func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error { + if len(addresses) == 0 { + return nil + } + command := []string{ + "alluxio", "fsadmin", "decommission", + "--addresses", strings.Join(addresses, ","), + } + _, _, err := a.exec(command, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses) + } + return err +} + +// CountActiveWorkers returns the number of workers currently tracked by the +// Alluxio master according to "alluxio fsadmin report capacity". +func (a AlluxioFileUtils) CountActiveWorkers() (int, error) { + report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity"}, false) + if err != nil { + a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed") + return 0, err + } + return parseActiveWorkerCount(report), nil +} + +// parseActiveWorkerCount counts workers in the capacity report produced by +// "alluxio fsadmin report capacity". Worker entries begin at the non-indented +// line after the "Worker Name" header; the indented line that follows each +// entry contains the used-capacity detail. +// +// Worker Name Last Heartbeat Storage MEM +// 192.168.1.147 0 capacity 2048.00MB <- worker entry +// used 443.89MB (21%) <- detail, indented +// 192.168.1.146 0 capacity 2048.00MB <- worker entry +// used 0B (0%) +func parseActiveWorkerCount(report string) int { + inWorkerSection := false + count := 0 + for _, line := range strings.Split(report, "\n") { + if strings.HasPrefix(line, "Worker Name") { + inWorkerSection = true + continue + } + if !inWorkerSection || strings.TrimSpace(line) == "" { + continue + } + // Non-indented lines are new worker entries; indented lines are + // the used-capacity continuation for the previous entry. 
+ if line[0] != ' ' && line[0] != '\t' { + count++ + } + } + return count +} diff --git a/pkg/ddc/alluxio/operations/decommission_test.go b/pkg/ddc/alluxio/operations/decommission_test.go new file mode 100644 index 00000000000..e5c85f2a2d0 --- /dev/null +++ b/pkg/ddc/alluxio/operations/decommission_test.go @@ -0,0 +1,203 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operations + +import ( + "errors" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" +) + +func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("empty address list is a no-op", func(t *testing.T) { + if err := a.DecommissionWorkers(nil); err != nil { + t.Fatalf("want nil, got: %v", err) + } + if err := a.DecommissionWorkers([]string{}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + }) + + t.Run("exec error is propagated", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil { + t.Error("want error, got nil") + } + }) + + t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ 
AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + addr := "192.168.1.1:29999" + if err := a.DecommissionWorkers([]string{addr}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == addr { + found = true + break + } + } + if !found { + t.Errorf("address %q not found in command: %v", addr, capturedCmd) + } + }) + + t.Run("multiple addresses are joined with commas", func(t *testing.T) { + var capturedCmd []string + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) { + capturedCmd = cmd + return "", "", nil + }) + defer patches.Reset() + + if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil { + t.Fatalf("want nil, got: %v", err) + } + found := false + for _, arg := range capturedCmd { + if arg == "10.0.0.1:29999,10.0.0.2:29999" { + found = true + break + } + } + if !found { + t.Errorf("joined addresses not found in command: %v", capturedCmd) + } + }) +} + +func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) { + a := &AlluxioFileUtils{log: fake.NullLogger()} + + t.Run("exec error returns zero and the error", func(t *testing.T) { + patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return "", "", errors.New("exec failed") + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err == nil { + t.Error("want error, got nil") + } + if count != 0 { + t.Errorf("want 0 on error, got %d", count) + } + }) + + t.Run("two active workers", func(t *testing.T) { + report := `Capacity information for all workers: + Total Capacity: 4096.00MB + Used Capacity: 443.89MB + +Worker Name Last Heartbeat Storage MEM +192.168.1.147 0 capacity 2048.00MB + used 443.89MB (21%) +192.168.1.146 0 capacity 2048.00MB + used 0B (0%) +` + 
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec, + func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) { + return report, "", nil + }) + defer patches.Reset() + + count, err := a.CountActiveWorkers() + if err != nil { + t.Fatalf("want nil, got: %v", err) + } + if count != 2 { + t.Errorf("want 2, got %d", count) + } + }) +} + +func TestParseActiveWorkerCount(t *testing.T) { + cases := []struct { + name string + input string + expect int + }{ + { + name: "empty report", + input: "", + expect: 0, + }, + { + name: "no worker section header", + input: "Capacity information for all workers:\n Total Capacity: 0B\n", + expect: 0, + }, + { + name: "single worker", + input: `Worker Name Last Heartbeat Storage MEM +192.168.1.1 0 capacity 1024.00MB + used 0B (0%) +`, + expect: 1, + }, + { + name: "three workers", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 2048.00MB + used 100MB (5%) +10.0.0.2 0 capacity 2048.00MB + used 0B (0%) +10.0.0.3 0 capacity 2048.00MB + used 500MB (25%) +`, + expect: 3, + }, + { + name: "trailing blank lines are ignored", + input: `Worker Name Last Heartbeat Storage MEM +10.0.0.1 0 capacity 1024.00MB + used 0B (0%) + + +`, + expect: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := parseActiveWorkerCount(tc.input) + if got != tc.expect { + t.Errorf("want %d, got %d", tc.expect, got) + } + }) + } +} diff --git a/pkg/ddc/alluxio/replicas.go b/pkg/ddc/alluxio/replicas.go index f149578869f..7c6c3efe452 100644 --- a/pkg/ddc/alluxio/replicas.go +++ b/pkg/ddc/alluxio/replicas.go @@ -23,8 +23,11 @@ import ( data "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/ctrl" + "github.com/fluid-cloudnative/fluid/pkg/ddc/alluxio/operations" + "github.com/fluid-cloudnative/fluid/pkg/features" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils" + utilfeature 
"github.com/fluid-cloudnative/fluid/pkg/utils/feature" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" @@ -88,6 +91,26 @@ func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err return err } runtimeToUpdate := runtime.DeepCopy() + + // When the AdvancedStatefulSet feature is enabled and we detect a scale-in, + // decommission the targeted workers before the StatefulSet controller + // terminates them. This gives the Alluxio master a chance to migrate their + // cached blocks to the surviving workers. The reconciler requeues until the + // active worker count has dropped to the desired level. + if utilfeature.DefaultFeatureGate.Enabled(features.AdvancedStatefulSet) && + workers.Spec.Replicas != nil && + runtime.Replicas() < *workers.Spec.Replicas { + + drained, drainErr := e.drainScalingDownWorkers(runtime.Replicas(), *workers.Spec.Replicas) + if drainErr != nil { + return drainErr + } + if !drained { + return fmt.Errorf("workers not yet drained; scale-in to %d replicas will resume on next reconcile", + runtime.Replicas()) + } + } + err = e.Helper.SyncReplicas(ctx, runtimeToUpdate, runtimeToUpdate.Status, workers) return err }) @@ -97,3 +120,74 @@ func (e *AlluxioEngine) SyncReplicas(ctx cruntime.ReconcileRequestContext) (err return } + +// drainScalingDownWorkers decommissions the Alluxio workers that are about to be +// removed when scaling from currentReplicas down to desiredReplicas. +// +// A standard StatefulSet removes the highest-ordinal pods first, so the targets +// are ordinals [desiredReplicas, currentReplicas). The function issues a +// decommission request via the master and returns whether Alluxio's active +// worker count has already dropped to the desired level. 
+func (e *AlluxioEngine) drainScalingDownWorkers(desiredReplicas, currentReplicas int32) (bool, error) { + masterPodName, masterContainerName := e.getMasterPodInfo() + fileUtils := operations.NewAlluxioFileUtils(masterPodName, masterContainerName, e.namespace, e.Log) + + workerRPCPort := e.getWorkerRPCPort() + workerStsName := e.getWorkerName() + + // Collect RPC addresses of the pods that will be terminated on scale-down. + var toDecommission []string + for ord := desiredReplicas; ord < currentReplicas; ord++ { + podName := fmt.Sprintf("%s-%d", workerStsName, ord) + pod := &corev1.Pod{} + if err := e.Client.Get(context.TODO(), + types.NamespacedName{Name: podName, Namespace: e.namespace}, pod); err != nil { + if errors.IsNotFound(err) { + // Pod is already gone; nothing to decommission here. + continue + } + return false, err + } + if pod.Status.PodIP == "" { + e.Log.Info("Worker pod has no IP yet, will retry", "pod", podName) + return false, nil + } + toDecommission = append(toDecommission, + fmt.Sprintf("%s:%d", pod.Status.PodIP, workerRPCPort)) + } + + if len(toDecommission) == 0 { + // All targeted pods are already gone from the cluster. + return true, nil + } + + if err := fileUtils.DecommissionWorkers(toDecommission); err != nil { + return false, err + } + + activeCount, err := fileUtils.CountActiveWorkers() + if err != nil { + return false, err + } + + if int32(activeCount) > desiredReplicas { + e.Log.Info("Workers are still draining, will retry", + "activeWorkers", activeCount, "desired", desiredReplicas) + return false, nil + } + + return true, nil +} + +// getWorkerRPCPort returns the configured Alluxio worker RPC port, falling back +// to the Alluxio default when the runtime does not override it. 
+func (e *AlluxioEngine) getWorkerRPCPort() int { + runtime, err := e.getRuntime() + if err != nil { + return defaultWorkerRPCPort + } + if port, ok := runtime.Spec.Worker.Ports["rpc"]; ok && port > 0 { + return port + } + return defaultWorkerRPCPort +} diff --git a/pkg/features/features.go b/pkg/features/features.go new file mode 100644 index 00000000000..ca02819f5f3 --- /dev/null +++ b/pkg/features/features.go @@ -0,0 +1,45 @@ +/* +Copyright 2024 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package features + +import ( + utilfeature "github.com/fluid-cloudnative/fluid/pkg/utils/feature" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/component-base/featuregate" +) + +const ( + // AdvancedStatefulSet gates graceful worker scale-down for AlluxioRuntime. + // + // When enabled, workers targeted for removal are decommissioned from the + // Alluxio cluster before the pod is terminated, giving the master time to + // migrate their cached blocks to the surviving workers. Without this gate, + // cached data held on removed workers is lost immediately on scale-in. + // + // Selective deletion of non-highest-ordinal pods additionally requires + // OpenKruise to be installed in the cluster; without it the standard + // StatefulSet scale-down order (highest ordinal first) applies. 
+ AdvancedStatefulSet featuregate.Feature = "AdvancedStatefulSet" +) + +var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ + AdvancedStatefulSet: {Default: false, PreRelease: featuregate.Alpha}, +} + +func init() { + runtime.Must(utilfeature.DefaultMutableFeatureGate.Add(defaultFeatureGates)) +} diff --git a/test/gha-e2e/curvine/read_job.yaml b/test/gha-e2e/curvine/read_job.yaml index e7d584f682f..bb96ee806e1 100644 --- a/test/gha-e2e/curvine/read_job.yaml +++ b/test/gha-e2e/curvine/read_job.yaml @@ -24,7 +24,14 @@ spec: command: ['sh'] args: - -c - - set -ex; test -n "$(cat /data/minio/bar)" + - | + for i in $(seq 1 12); do + content=$(cat /data/minio/bar 2>/dev/null) && [ -n "$content" ] && exit 0 + echo "Attempt $i: /data/minio/bar not readable yet, retrying in 5s..." + sleep 5 + done + echo "ERROR: /data/minio/bar is not readable after 60 seconds" + exit 1 volumeMounts: - name: data-vol mountPath: /data