diff --git a/pkg/ddc/thin/engine_test.go b/pkg/ddc/thin/engine_test.go index 79373882d08..0307bf8683e 100644 --- a/pkg/ddc/thin/engine_test.go +++ b/pkg/ddc/thin/engine_test.go @@ -17,320 +17,247 @@ package thin import ( - "testing" + "context" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" ) -func TestBuild(t *testing.T) { - var namespace = v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fluid", - }, - } - testObjs := []runtime.Object{} - testObjs = append(testObjs, namespace.DeepCopy()) - - var dataset = datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - } - testObjs = append(testObjs, dataset.DeepCopy()) - var runtimeProfile = datav1alpha1.ThinRuntimeProfile{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-profile", - }, - Spec: datav1alpha1.ThinRuntimeProfileSpec{FileSystemType: "test-fstype"}, - } - testObjs = append(testObjs, runtimeProfile.DeepCopy()) - - var runtime = datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - Spec: datav1alpha1.ThinRuntimeSpec{ - ThinRuntimeProfileName: "test-profile", - Fuse: datav1alpha1.ThinFuseSpec{}, - }, - Status: datav1alpha1.RuntimeStatus{ - CacheStates: map[common.CacheStateName]string{ - common.Cached: "true", - }, - }, - } - testObjs = append(testObjs, runtime.DeepCopy()) - var runtime2 = datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - Namespace: "fluid", - }, - Spec: datav1alpha1.ThinRuntimeSpec{ - ThinRuntimeProfileName: "test-profile", - Fuse: datav1alpha1.ThinFuseSpec{}, - }, - Status: datav1alpha1.RuntimeStatus{ - CacheStates: map[common.CacheStateName]string{ - common.Cached: "true", - }, - }, - } - - var sts = appsv1.StatefulSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-worker", - Namespace: "fluid", - }, - } - testObjs = append(testObjs, sts.DeepCopy()) - client := fake.NewFakeClientWithScheme(testScheme, testObjs...) - - var ctx = cruntime.ReconcileRequestContext{ +func buildEngineTestContext(client ctrlclient.Client, runtimeObj ctrlclient.Object, name, namespace string) cruntime.ReconcileRequestContext { + return cruntime.ReconcileRequestContext{ NamespacedName: types.NamespacedName{ - Name: "hbase", - Namespace: "fluid", + Name: name, + Namespace: namespace, }, Client: client, Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: &runtime, + RuntimeType: common.ThinRuntime, + EngineImpl: common.ThinEngineImpl, + Runtime: runtimeObj, } +} - engine, err := Build("testId", ctx) - if err != nil || engine == nil { - t.Errorf("fail to exec the build function with the eror %v", err) - } +var _ = Describe("Thin Engine Build", Label("pkg.ddc.thin.engine_test.go"), func() { + Describe("Build", func() { + It("should return an error when runtime is nil", func() { + client := fake.NewFakeClientWithScheme(testScheme) - var errCtx = cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ - Name: "hbase", - Namespace: "fluid", - }, - Client: client, - Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: nil, - } + engine, err := Build("test-id", cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Name: "hbase", Namespace: "fluid"}, + Client: client, + Log: fake.NullLogger(), + RuntimeType: common.ThinRuntime, + Runtime: nil, + }) - got, err := Build("testId", errCtx) - if err == nil { - t.Errorf("expect err, but no err got %v", got) - } + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("runtime is nil")) + Expect(engine).To(BeNil()) + }) - var errrCtx = cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ - Name: "test", - Namespace: "fluid", - }, - Client: client, - Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: &runtime2, - } + It("should return an error when runtime type conversion fails", func() { + alluxioRuntime := &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + } + client := fake.NewFakeClientWithScheme(testScheme, alluxioRuntime) - gott, err := Build("testId", errrCtx) - if err == nil { - t.Errorf("expect err, but no err got %v", gott) - } -} + engine, err := Build("test-id", buildEngineTestContext(client, alluxioRuntime, "hbase", "fluid")) -func TestBuildReferenceDatasetEngine(t *testing.T) { - var namespace = v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fluid", - }, - } - testObjs := []runtime.Object{} - testObjs = append(testObjs, namespace.DeepCopy()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("type conversion")) + Expect(engine).To(BeNil()) + }) - var dataset = datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "done", - Namespace: "big-data", - }, - Status: datav1alpha1.DatasetStatus{ - Runtimes: []datav1alpha1.Runtime{ - { - Name: "done", - Namespace: "big-data", - Type: common.AlluxioRuntime, + It("should build a template engine for a normal thin runtime when runtime and profile exist", func() { + namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "fluid"}} + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + } + runtimeProfile := &datav1alpha1.ThinRuntimeProfile{ + ObjectMeta: metav1.ObjectMeta{Name: "test-profile"}, + Spec: datav1alpha1.ThinRuntimeProfileSpec{FileSystemType: "test-fstype"}, + } + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + Spec: datav1alpha1.ThinRuntimeSpec{ + ThinRuntimeProfileName: "test-profile", + Fuse: datav1alpha1.ThinFuseSpec{}, }, - }, - }, - } - var runtime = datav1alpha1.AlluxioRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "done", - Namespace: "big-data", - }, - } - - var refRuntime = datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - } - var refDataset = datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - Spec: datav1alpha1.DatasetSpec{ - Mounts: []datav1alpha1.Mount{ - { - MountPoint: "dataset://big-data/done", + Status: datav1alpha1.RuntimeStatus{ + CacheStates: map[common.CacheStateName]string{common.Cached: "true"}, }, - }, - }, - } - - testObjs = append(testObjs, &dataset, &refDataset) - - testObjs = append(testObjs, &runtime, &refRuntime) - client := fake.NewFakeClientWithScheme(testScheme, testObjs...) - - var ctx = cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ - Name: "hbase", - Namespace: "fluid", - }, - Client: client, - Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: &refRuntime, - } - - engine, err := Build("testId", ctx) - if err != nil || engine == nil { - t.Errorf("fail to exec the build function with the eror %v", err) - } + } + worker := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase-worker", Namespace: "fluid"}, + } + client := fake.NewFakeClientWithScheme(testScheme, namespace, dataset, runtimeProfile, thinRuntime, worker) - var errCtx = cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ - Name: "hbase", - Namespace: "fluid", - }, - Client: client, - Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: &runtime, - } + engine, err := Build("test-id", buildEngineTestContext(client, thinRuntime, "hbase", "fluid")) - got, err := Build("testId", errCtx) - if err == nil { - t.Errorf("expect err, but no err got %v", got) - } -} + Expect(err).NotTo(HaveOccurred()) + Expect(engine).NotTo(BeNil()) + }) -func TestCheckReferenceDatasetRuntime(t *testing.T) { - tests := []struct { - name string - dataset *datav1alpha1.Dataset - runtime *datav1alpha1.ThinRuntime - ctx cruntime.ReconcileRequestContext - want bool - wantErr bool - }{ - { - name: "ref-dataset", - dataset: &datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", + It("should delegate to the reference dataset branch when profile name is empty", func() { + namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "fluid"}} + physicalDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "done", Namespace: "big-data"}, + Status: datav1alpha1.DatasetStatus{ + Runtimes: []datav1alpha1.Runtime{{Name: "done", Namespace: "big-data", Type: common.AlluxioRuntime}}, }, + } + physicalRuntime := &datav1alpha1.AlluxioRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "done", Namespace: "big-data"}, + } + refRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + } + refDataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, Spec: datav1alpha1.DatasetSpec{ - Mounts: []datav1alpha1.Mount{ - { - MountPoint: "dataset://test/test", - }, - }, - }, - }, - runtime: &datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - Spec: datav1alpha1.ThinRuntimeSpec{}, - }, - want: true, - wantErr: false, - }, - { - name: "not-ref-dataset", - dataset: &datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", - }, - }, - runtime: &datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", + Mounts: []datav1alpha1.Mount{{MountPoint: "dataset://big-data/done"}}, }, + } + client := fake.NewFakeClientWithScheme(testScheme, namespace, physicalDataset, physicalRuntime, refRuntime, refDataset) + + engine, err := Build("test-id", buildEngineTestContext(client, refRuntime, "hbase", "fluid")) + + Expect(err).NotTo(HaveOccurred()) + Expect(engine).NotTo(BeNil()) + }) + + It("should return an error when thin runtime profile lookup fails", func() { + namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "fluid"}} + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + } + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, Spec: datav1alpha1.ThinRuntimeSpec{ - ThinRuntimeProfileName: "1", + ThinRuntimeProfileName: "missing-profile", }, - }, - want: false, - wantErr: false, - }, - { - name: "dataset-not-exist", - dataset: &datav1alpha1.Dataset{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase-no-use", - Namespace: "fluid", + } + client := fake.NewFakeClientWithScheme(testScheme, namespace, dataset, thinRuntime) + + engine, err := Build("test-id", buildEngineTestContext(client, thinRuntime, "hbase", "fluid")) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("error when getting thinruntime profile missing-profile")) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + Expect(engine).To(BeNil()) + }) + + It("should return an error when the request runtime has a different name than the stored thin runtime", func() { + namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "fluid"}} + dataset := &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + } + runtimeProfile := &datav1alpha1.ThinRuntimeProfile{ + ObjectMeta: metav1.ObjectMeta{Name: "test-profile"}, + Spec: datav1alpha1.ThinRuntimeProfileSpec{FileSystemType: "test-fstype"}, + } + storedRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + Spec: datav1alpha1.ThinRuntimeSpec{ + ThinRuntimeProfileName: "test-profile", + Fuse: datav1alpha1.ThinFuseSpec{}, }, - }, - runtime: &datav1alpha1.ThinRuntime{ - ObjectMeta: metav1.ObjectMeta{ - Name: "hbase", - Namespace: "fluid", + Status: datav1alpha1.RuntimeStatus{ + CacheStates: map[common.CacheStateName]string{common.Cached: "true"}, }, + } + runtimeWithDifferentName := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "fluid"}, Spec: datav1alpha1.ThinRuntimeSpec{ - ThinRuntimeProfileName: "1", + ThinRuntimeProfileName: "test-profile", + Fuse: datav1alpha1.ThinFuseSpec{}, }, - }, - want: false, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - fakeClient := fake.NewFakeClientWithScheme(testScheme, tt.dataset, tt.runtime) - var ctx = cruntime.ReconcileRequestContext{ - NamespacedName: types.NamespacedName{ - Name: "hbase", - Namespace: "fluid", + Status: datav1alpha1.RuntimeStatus{ + CacheStates: map[common.CacheStateName]string{common.Cached: "true"}, }, - Client: fakeClient, - Log: fake.NullLogger(), - RuntimeType: "thin", - Runtime: tt.runtime, } - isRef, err := CheckReferenceDatasetRuntime(ctx, tt.runtime) + worker := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase-worker", Namespace: "fluid"}, + } + client := fake.NewFakeClientWithScheme(testScheme, namespace, dataset, runtimeProfile, storedRuntime, worker) + + engine, err := Build("test-id", buildEngineTestContext(client, runtimeWithDifferentName, "test", "fluid")) - if (err != nil) != tt.wantErr { - t.Errorf("expect has error %t, but get error %v", tt.wantErr, err) - return + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("engine test failed to get runtime info")) + Expect(engine).To(BeNil()) + }) + + It("should return an error when runtime info bootstrap cannot fetch the thin runtime", func() { + namespace := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "fluid"}} + runtimeProfile := &datav1alpha1.ThinRuntimeProfile{ + ObjectMeta: metav1.ObjectMeta{Name: "test-profile"}, + Spec: datav1alpha1.ThinRuntimeProfileSpec{FileSystemType: "test-fstype"}, + } + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + Spec: datav1alpha1.ThinRuntimeSpec{ + ThinRuntimeProfileName: "test-profile", + }, } + client := fake.NewFakeClientWithScheme(testScheme, namespace, runtimeProfile) + + engine, err := Build("test-id", buildEngineTestContext(client, thinRuntime, "hbase", "fluid")) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(Equal("engine hbase failed to get runtime info")) + Expect(engine).To(BeNil()) + }) + }) - if isRef != tt.want { - t.Errorf(" expect is ref dataset %t, but get %t", tt.want, err) + Describe("Precheck", func() { + It("should return found true when the thin runtime exists", func() { + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, } + client := fake.NewFakeClientWithScheme(testScheme, thinRuntime) + + found, err := Precheck(client, types.NamespacedName{Name: "hbase", Namespace: "fluid"}) + + Expect(err).NotTo(HaveOccurred()) + Expect(found).To(BeTrue()) }) - } -} + + It("should return found false when the thin runtime does not exist", func() { + client := fake.NewFakeClientWithScheme(testScheme) + + found, err := Precheck(client, types.NamespacedName{Name: "missing", Namespace: "fluid"}) + + Expect(err).NotTo(HaveOccurred()) + Expect(found).To(BeFalse()) + }) + }) + + Describe("CheckReferenceDatasetRuntime", func() { + DescribeTable("should match the current thin runtime profile-name contract", + func(profileName string, expected bool) { + thinRuntime := &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{Name: "hbase", Namespace: "fluid"}, + Spec: datav1alpha1.ThinRuntimeSpec{ThinRuntimeProfileName: profileName}, + } + + isRef, err := CheckReferenceDatasetRuntime(cruntime.ReconcileRequestContext{Context: context.TODO()}, thinRuntime) + + Expect(err).NotTo(HaveOccurred()) + Expect(isRef).To(Equal(expected)) + }, + Entry("empty profile name means reference dataset runtime", "", true), + Entry("non-empty profile name means normal thin runtime", "test-profile", false), + ) + }) +}) diff --git a/pkg/ddc/thin/shutdown_test.go b/pkg/ddc/thin/shutdown_test.go index ede2bf3300a..5f1eb3f0933 100644 --- a/pkg/ddc/thin/shutdown_test.go +++ b/pkg/ddc/thin/shutdown_test.go @@ -19,7 +19,6 @@ package thin import ( "errors" "reflect" - "testing" . "github.com/agiledragon/gomonkey/v2" datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" @@ -29,317 +28,419 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/fake" "github.com/fluid-cloudnative/fluid/pkg/utils/helm" "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" - "github.com/go-logr/logr" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) -func TestDestroyWorker(t *testing.T) { - // runtimeInfoSpark tests destroy Worker in exclusive mode. - runtimeInfoSpark, err := base.BuildRuntimeInfo("spark", "fluid", common.ThinRuntime) - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) +func buildShutdownRuntime(name, namespace string, placementMode datav1alpha1.PlacementMode, nodeSelector map[string]string) *datav1alpha1.ThinRuntime { + return &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: datav1alpha1.ThinRuntimeSpec{ + Fuse: datav1alpha1.ThinFuseSpec{NodeSelector: nodeSelector}, + }, + Status: datav1alpha1.RuntimeStatus{}, } - runtimeInfoSpark.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ExclusiveMode}, - }) +} - // runtimeInfoSpark tests destroy Worker in shareMode mode. - runtimeInfoHadoop, err := base.BuildRuntimeInfo("hadoop", "fluid", common.ThinRuntime) - if err != nil { - t.Errorf("fail to create the runtimeInfo with error %v", err) - } - runtimeInfoHadoop.SetupWithDataset(&datav1alpha1.Dataset{ - Spec: datav1alpha1.DatasetSpec{PlacementMode: datav1alpha1.ShareMode}, - }) - nodeSelector := map[string]string{ - "node-select": "true", - } - runtimeInfoHadoop.SetFuseNodeSelector(nodeSelector) - - var nodeInputs = []*corev1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-spark", - Labels: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-thin-fluid-spark": "true", - "fluid.io/s-fluid-spark": "true", - "fluid.io/s-h-thin-d-fluid-spark": "5B", - "fluid.io/s-h-thin-m-fluid-spark": "1B", - "fluid.io/s-h-thin-t-fluid-spark": "6B", - "fluid_exclusive": "fluid_spark", - }, - }, +func buildShutdownDataset(name, namespace string, placementMode datav1alpha1.PlacementMode) *datav1alpha1.Dataset { + return &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-share", - Labels: map[string]string{ - "fluid.io/dataset-num": "2", - "fluid.io/s-thin-fluid-hadoop": "true", - "fluid.io/s-fluid-hadoop": "true", - "fluid.io/s-h-thin-d-fluid-hadoop": "5B", - "fluid.io/s-h-thin-m-fluid-hadoop": "1B", - "fluid.io/s-h-thin-t-fluid-hadoop": "6B", - "fluid.io/s-thin-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-thin-d-fluid-hbase": "5B", - "fluid.io/s-h-thin-m-fluid-hbase": "1B", - "fluid.io/s-h-thin-t-fluid-hbase": "6B", - }, - }, + Spec: datav1alpha1.DatasetSpec{ + PlacementMode: placementMode, }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-hadoop", - Labels: map[string]string{ - "fluid.io/dataset-num": "1", - "fluid.io/s-thin-fluid-hadoop": "true", - "fluid.io/s-fluid-hadoop": "true", - "fluid.io/s-h-thin-d-fluid-hadoop": "5B", - "fluid.io/s-h-thin-m-fluid-hadoop": "1B", - "fluid.io/s-h-thin-t-fluid-hadoop": "6B", - "node-select": "true", - }, + } +} + +func buildShutdownEngineWithWorkerTeardownFixture(name, namespace string) *ThinEngine { + runtimeInfo, err := base.BuildRuntimeInfo(name, namespace, common.ThinRuntime) + Expect(err).NotTo(HaveOccurred()) + runtimeInfo.SetupWithDataset(buildShutdownDataset(name, namespace, datav1alpha1.ExclusiveMode)) + + workerLabelName := runtimeInfo.GetCommonLabelName() + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "shutdown-node", + Labels: map[string]string{ + runtimeInfo.GetDatasetNumLabelName(): "1", + workerLabelName: "true", + runtimeInfo.GetRuntimeLabelName(): "true", }, }, } + runtimeObj := buildShutdownRuntime(name, namespace, datav1alpha1.ExclusiveMode, nil) + datasetObj := buildShutdownDataset(name, namespace, datav1alpha1.ExclusiveMode) + valueConfigMapName := name + "-" + common.ThinEngineImpl + "-values" + fakeClient := fake.NewFakeClientWithScheme( + testScheme, + runtimeObj, + datasetObj, + node, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name + "-config", Namespace: namespace}}, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: valueConfigMapName, Namespace: namespace}}, + ) - testNodes := []runtime.Object{} - for _, nodeInput := range nodeInputs { - testNodes = append(testNodes, nodeInput.DeepCopy()) + engine := &ThinEngine{ + name: name, + namespace: namespace, + runtime: runtimeObj, + Client: fakeClient, + Log: fake.NullLogger(), + engineImpl: common.ThinEngineImpl, + runtimeInfo: runtimeInfo, + runtimeType: common.ThinRuntime, } + engine.Helper = ctrl.BuildHelper(runtimeInfo, fakeClient, engine.Log) + + return engine +} - client := fake.NewFakeClientWithScheme(testScheme, testNodes...) - - var testCase = []struct { - expectedWorkers int32 - runtimeInfo base.RuntimeInfoInterface - wantedNodeNumber int32 - wantedNodeLabels map[string]map[string]string - }{ - { - expectedWorkers: -1, - runtimeInfo: runtimeInfoSpark, - wantedNodeNumber: 0, - wantedNodeLabels: map[string]map[string]string{ - "test-node-spark": {}, - "test-node-share": { - "fluid.io/dataset-num": "2", - "fluid.io/s-thin-fluid-hadoop": "true", - "fluid.io/s-fluid-hadoop": "true", - "fluid.io/s-h-thin-d-fluid-hadoop": "5B", - "fluid.io/s-h-thin-m-fluid-hadoop": "1B", - "fluid.io/s-h-thin-t-fluid-hadoop": "6B", - "fluid.io/s-thin-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-thin-d-fluid-hbase": "5B", - "fluid.io/s-h-thin-m-fluid-hbase": "1B", - "fluid.io/s-h-thin-t-fluid-hbase": "6B", +var _ = Describe("ThinEngine shutdown", Label("pkg.ddc.thin.shutdown_test.go"), func() { + Describe("destroyWorkers", func() { + It("should tear down worker labels after seeding runtime objects required by runtime info", func() { + runtimeInfoSpark, err := base.BuildRuntimeInfo("spark", "fluid", common.ThinRuntime) + Expect(err).NotTo(HaveOccurred()) + runtimeInfoSpark.SetupWithDataset(buildShutdownDataset("spark", "fluid", datav1alpha1.ExclusiveMode)) + + nodeSelector := map[string]string{"node-select": "true"} + runtimeInfoHadoop, err := base.BuildRuntimeInfo("hadoop", "fluid", common.ThinRuntime) + Expect(err).NotTo(HaveOccurred()) + runtimeInfoHadoop.SetupWithDataset(buildShutdownDataset("hadoop", "fluid", datav1alpha1.ShareMode)) + runtimeInfoHadoop.SetFuseNodeSelector(nodeSelector) + + nodeInputs := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-spark", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-thin-fluid-spark": "true", + "fluid.io/s-fluid-spark": "true", + "fluid.io/s-h-thin-d-fluid-spark": "5B", + "fluid.io/s-h-thin-m-fluid-spark": "1B", + "fluid.io/s-h-thin-t-fluid-spark": "6B", + "fluid_exclusive": "fluid_spark", + }, + }, }, - "test-node-hadoop": { - "fluid.io/dataset-num": "1", - "fluid.io/s-thin-fluid-hadoop": "true", - "fluid.io/s-fluid-hadoop": "true", - "fluid.io/s-h-thin-d-fluid-hadoop": "5B", - "fluid.io/s-h-thin-m-fluid-hadoop": "1B", - "fluid.io/s-h-thin-t-fluid-hadoop": "6B", - "node-select": "true", + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-share", + Labels: map[string]string{ + "fluid.io/dataset-num": "2", + "fluid.io/s-thin-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-thin-d-fluid-hadoop": "5B", + "fluid.io/s-h-thin-m-fluid-hadoop": "1B", + "fluid.io/s-h-thin-t-fluid-hadoop": "6B", + "fluid.io/s-thin-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-thin-d-fluid-hbase": "5B", + "fluid.io/s-h-thin-m-fluid-hbase": "1B", + "fluid.io/s-h-thin-t-fluid-hbase": "6B", + }, + }, }, - }, - }, - { - expectedWorkers: -1, - runtimeInfo: runtimeInfoHadoop, - wantedNodeNumber: 0, - wantedNodeLabels: map[string]map[string]string{ - "test-node-spark": {}, - "test-node-share": { - "fluid.io/dataset-num": "1", - "fluid.io/s-thin-fluid-hbase": "true", - "fluid.io/s-fluid-hbase": "true", - "fluid.io/s-h-thin-d-fluid-hbase": "5B", - "fluid.io/s-h-thin-m-fluid-hbase": "1B", - "fluid.io/s-h-thin-t-fluid-hbase": "6B", + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-hadoop", + Labels: map[string]string{ + "fluid.io/dataset-num": "1", + "fluid.io/s-thin-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-thin-d-fluid-hadoop": "5B", + "fluid.io/s-h-thin-m-fluid-hadoop": "1B", + "fluid.io/s-h-thin-t-fluid-hadoop": "6B", + "node-select": "true", + }, + }, }, - "test-node-hadoop": { - "node-select": "true", + } + + testObjects := []runtime.Object{ + buildShutdownRuntime("spark", "fluid", datav1alpha1.ExclusiveMode, nil), + buildShutdownDataset("spark", "fluid", datav1alpha1.ExclusiveMode), + buildShutdownRuntime("hadoop", "fluid", datav1alpha1.ShareMode, nodeSelector), + buildShutdownDataset("hadoop", "fluid", datav1alpha1.ShareMode), + } + for _, nodeInput := range nodeInputs { + testObjects = append(testObjects, nodeInput.DeepCopy()) + } + + fakeClient := fake.NewFakeClientWithScheme(testScheme, testObjects...) + + tests := []struct { + runtimeInfo base.RuntimeInfoInterface + wantedNodeLabels map[string]map[string]string + }{ + { + runtimeInfo: runtimeInfoSpark, + wantedNodeLabels: map[string]map[string]string{ + "test-node-spark": nil, + "test-node-share": { + "fluid.io/dataset-num": "2", + "fluid.io/s-thin-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-thin-d-fluid-hadoop": "5B", + "fluid.io/s-h-thin-m-fluid-hadoop": "1B", + "fluid.io/s-h-thin-t-fluid-hadoop": "6B", + "fluid.io/s-thin-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-thin-d-fluid-hbase": "5B", + "fluid.io/s-h-thin-m-fluid-hbase": "1B", + "fluid.io/s-h-thin-t-fluid-hbase": "6B", + }, + "test-node-hadoop": { + "fluid.io/dataset-num": "1", + "fluid.io/s-thin-fluid-hadoop": "true", + "fluid.io/s-fluid-hadoop": "true", + "fluid.io/s-h-thin-d-fluid-hadoop": "5B", + "fluid.io/s-h-thin-m-fluid-hadoop": "1B", + "fluid.io/s-h-thin-t-fluid-hadoop": "6B", + "node-select": "true", + }, + }, + }, + { + runtimeInfo: runtimeInfoHadoop, + wantedNodeLabels: map[string]map[string]string{ + "test-node-spark": nil, + "test-node-share": { + "fluid.io/dataset-num": "1", + "fluid.io/s-thin-fluid-hbase": "true", + "fluid.io/s-fluid-hbase": "true", + "fluid.io/s-h-thin-d-fluid-hbase": "5B", + "fluid.io/s-h-thin-m-fluid-hbase": "1B", + "fluid.io/s-h-thin-t-fluid-hbase": "6B", + }, + "test-node-hadoop": { + "node-select": "true", + }, + }, }, - }, - }, - } - for _, test := range testCase { - engine := &ThinEngine{Log: fake.NullLogger(), runtimeInfo: test.runtimeInfo} - engine.Client = client - engine.Helper = ctrl.BuildHelper(test.runtimeInfo, client, engine.Log) - engine.name = test.runtimeInfo.GetName() - engine.namespace = test.runtimeInfo.GetNamespace() - if err != nil { - t.Errorf("fail to exec the function with the error %v", err) - } - err := engine.destroyWorkers() - if err != nil { - t.Errorf("fail to exec the function with the error %v", err) - } - for _, node := range nodeInputs { - newNode, err := kubeclient.GetNode(client, node.Name) - if err != nil { - t.Errorf("fail to get the node with the error %v", err) } - if len(newNode.Labels) != len(test.wantedNodeLabels[node.Name]) { - t.Errorf("fail to decrease the labels") + for _, test := range tests { + engine := &ThinEngine{ + Log: fake.NullLogger(), + Client: fakeClient, + Helper: ctrl.BuildHelper(test.runtimeInfo, fakeClient, fake.NullLogger()), + runtimeInfo: test.runtimeInfo, + name: test.runtimeInfo.GetName(), + namespace: test.runtimeInfo.GetNamespace(), + runtimeType: common.ThinRuntime, + } + + Expect(engine.destroyWorkers()).To(Succeed()) + + for _, node := range nodeInputs { + newNode, err := kubeclient.GetNode(fakeClient, node.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(newNode.Labels).To(Equal(test.wantedNodeLabels[node.Name])) + } } - if len(newNode.Labels) != 0 && !reflect.DeepEqual(newNode.Labels, test.wantedNodeLabels[node.Name]) { - t.Errorf("fail to decrease the labels") + }) + }) + + Describe("destroyMaster", func() { + It("should delete the release when the helm release exists", func() { + client := fake.NewFakeClientWithScheme(testScheme) + engine := &ThinEngine{ + name: "test", + namespace: "fluid", + Log: fake.NullLogger(), + Client: client, + runtime: &datav1alpha1.ThinRuntime{ + Spec: datav1alpha1.ThinRuntimeSpec{Fuse: datav1alpha1.ThinFuseSpec{}}, + }, } - } - } -} + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + Expect(name).To(Equal("test")) + Expect(namespace).To(Equal("fluid")) + return true, nil + }) + defer checkReleasePatch.Reset() -func TestThinEngine_destroyMaster(t *testing.T) { - mockExecCheckReleaseCommonFound := func(name string, namespace string) (exist bool, err error) { - return true, nil - } - mockExecCheckReleaseCommonNotFound := func(name string, namespace string) (exist bool, err error) { - return false, nil - } - mockExecCheckReleaseErr := func(name string, namespace string) (exist bool, err error) { - return false, errors.New("fail to check release") - } - mockExecDeleteReleaseCommon := func(name string, namespace string) error { - return nil - } - mockExecDeleteReleaseErr := func(name string, namespace string) error { - return errors.New("fail to delete chart") - } + deleteReleasePatch := ApplyFunc(helm.DeleteRelease, func(name string, namespace string) error { + Expect(name).To(Equal("test")) + Expect(namespace).To(Equal("fluid")) + return nil + }) + defer deleteReleasePatch.Reset() - orphanedCm := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "fluid", - Name: "test-runtimeset", - }, - } - client := fake.NewFakeClientWithScheme(testScheme, orphanedCm) - - engine := ThinEngine{ - name: "test", - namespace: "fluid", - Log: fake.NullLogger(), - Client: client, - runtime: &datav1alpha1.ThinRuntime{ - Spec: datav1alpha1.ThinRuntimeSpec{ - Fuse: datav1alpha1.ThinFuseSpec{}, - }, - }, - } + Expect(engine.destroyMaster()).To(Succeed()) + }) - // check release found & delete common - checkReleasePatch := ApplyFunc(helm.CheckRelease, mockExecCheckReleaseCommonFound) - deleteReleasePatc := ApplyFunc(helm.DeleteRelease, mockExecDeleteReleaseCommon) - err := engine.destroyMaster() - if err != nil { - t.Errorf("fail to exec check helm release: %v", err) - } - checkReleasePatch.Reset() - deleteReleasePatc.Reset() - - // check release not found - checkReleasePatch.ApplyFunc(helm.CheckRelease, mockExecCheckReleaseCommonNotFound) - err = engine.destroyMaster() - if err != nil { - t.Errorf("fail to exec check helm release: %v", err) - } + It("should clean orphaned resources when the helm release does not exist", func() { + orphanedCm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "fluid", + Name: "test-runtimeset", + }, + } + client := fake.NewFakeClientWithScheme(testScheme, orphanedCm) + engine := &ThinEngine{ + name: "test", + namespace: "fluid", + Log: fake.NullLogger(), + Client: client, + runtime: &datav1alpha1.ThinRuntime{ + Spec: datav1alpha1.ThinRuntimeSpec{Fuse: datav1alpha1.ThinFuseSpec{}}, + }, + } - if cm, err := kubeclient.GetConfigmapByName(engine.Client, orphanedCm.Name, orphanedCm.Namespace); err != nil { - t.Errorf("fail to delete orphaned resources: %v", err) - } else if cm != nil { - t.Errorf("orphaned configmap should be cleaned up") - } - checkReleasePatch.Reset() + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + return false, nil + }) + defer checkReleasePatch.Reset() - // check release error - checkReleasePatch.ApplyFunc(helm.CheckRelease, mockExecCheckReleaseErr) - err = engine.destroyMaster() - if err == nil { - t.Errorf("fail to exec check helm release: %v", err) - } - checkReleasePatch.Reset() - - // check release found & delete common error - checkReleasePatch.ApplyFunc(helm.CheckRelease, mockExecCheckReleaseCommonFound) - deleteReleasePatc.ApplyFunc(helm.DeleteRelease, mockExecDeleteReleaseErr) - err = engine.destroyMaster() - if err == nil { - t.Errorf("fail to exec check helm release: %v", err) - } - checkReleasePatch.Reset() - deleteReleasePatc.Reset() -} + Expect(engine.destroyMaster()).To(Succeed()) -func TestThinEngine_cleanAll(t *testing.T) { - configMaps := []corev1.ConfigMap{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-config", - Namespace: "fluid", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-fluid-value", - Namespace: "fluid", - }, - }, - } - testObjs := []runtime.Object{} - for _, cm := range configMaps { - testObjs = append(testObjs, cm.DeepCopy()) - } + cm, err := kubeclient.GetConfigmapByName(engine.Client, orphanedCm.Name, orphanedCm.Namespace) + Expect(err).NotTo(HaveOccurred()) + Expect(cm).To(BeNil()) + }) - fakeClient := fake.NewFakeClientWithScheme(testScheme, testObjs...) - type fields struct { - name string - namespace string - Client client.Client - log logr.Logger - } - tests := []struct { - name string - fields fields - wantErr bool - }{ - { - name: "test", - fields: fields{ + It("should return an error when checking the helm release fails", func() { + client := fake.NewFakeClientWithScheme(testScheme) + engine := &ThinEngine{ name: "test", namespace: "fluid", - Client: fakeClient, - log: fake.NullLogger(), - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + Log: fake.NullLogger(), + Client: client, + runtime: &datav1alpha1.ThinRuntime{ + Spec: datav1alpha1.ThinRuntimeSpec{Fuse: datav1alpha1.ThinFuseSpec{}}, + }, + } + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + return false, errors.New("fail to check release") + }) + defer checkReleasePatch.Reset() + + Expect(engine.destroyMaster()).To(MatchError("fail to check release")) + }) + + It("should return an error when deleting an installed release fails", func() { + client := fake.NewFakeClientWithScheme(testScheme) + engine := &ThinEngine{ + name: "test", + namespace: "fluid", + Log: fake.NullLogger(), + Client: client, + runtime: &datav1alpha1.ThinRuntime{ + Spec: datav1alpha1.ThinRuntimeSpec{Fuse: datav1alpha1.ThinFuseSpec{}}, + }, + } + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + return true, nil + }) + defer checkReleasePatch.Reset() + + deleteReleasePatch := ApplyFunc(helm.DeleteRelease, func(name string, namespace string) error { + return errors.New("fail to delete chart") + }) + defer deleteReleasePatch.Reset() + + Expect(engine.destroyMaster()).To(MatchError("fail to delete chart")) + }) + }) + + Describe("cleanAll", func() { + It("should clean fuse artifacts and configmaps", func() { + configMaps := []runtime.Object{ + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-config", Namespace: "fluid"}}, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test-thin-values", Namespace: "fluid"}}, + } + fakeClient := fake.NewFakeClientWithScheme(testScheme, configMaps...) + helper := &ctrl.Helper{} patches := ApplyMethod(reflect.TypeOf(helper), "CleanUpFuse", func(_ *ctrl.Helper) (int, error) { return 0, nil }) defer patches.Reset() - j := &ThinEngine{ - name: tt.fields.name, - namespace: tt.fields.namespace, - Client: fakeClient, - Log: tt.fields.log, + + engine := &ThinEngine{ + name: "test", + namespace: "fluid", + engineImpl: common.ThinEngineImpl, + Client: fakeClient, + Log: fake.NullLogger(), + Helper: helper, + } + + Expect(engine.cleanAll()).To(Succeed()) + + for _, name := range []string{"test-config", "test-thin-values"} { + cm, err := kubeclient.GetConfigmapByName(fakeClient, name, "fluid") + Expect(err).NotTo(HaveOccurred()) + Expect(cm).To(BeNil()) } - if err := j.cleanAll(); (err != nil) != tt.wantErr { - t.Errorf("cleanAll() error = %v, wantErr %v", err, tt.wantErr) + }) + }) + + Describe("Shutdown", func() { + It("should return the destroyWorkers error after the cache retry limit is reached", func() { + engine := &ThinEngine{ + name: "missing-runtime", + namespace: "fluid", + Log: fake.NullLogger(), + Client: fake.NewFakeClientWithScheme(testScheme), + gracefulShutdownLimits: 1, + retryShutdown: 1, + runtimeType: common.ThinRuntime, } + + err := engine.Shutdown() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("not found")) }) - } -} + + It("should return the destroyMaster error after workers are torn down successfully", func() { + engine := buildShutdownEngineWithWorkerTeardownFixture("shutdown-master-error", "fluid") + engine.gracefulShutdownLimits = 1 + engine.retryShutdown = 1 + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + return false, errors.New("check release failed") + }) + defer checkReleasePatch.Reset() + + Expect(engine.Shutdown()).To(MatchError("check release failed")) + }) + + It("should complete shutdown successfully after the cache retry limit is reached", func() { + engine := buildShutdownEngineWithWorkerTeardownFixture("shutdown-success", "fluid") + engine.gracefulShutdownLimits = 1 + engine.retryShutdown = 1 + + checkReleasePatch := ApplyFunc(helm.CheckRelease, func(name string, namespace string) (bool, error) { + return false, nil + }) + defer checkReleasePatch.Reset() + + cleanUpFusePatch := ApplyMethod(reflect.TypeOf(&ctrl.Helper{}), "CleanUpFuse", func(_ *ctrl.Helper) (int, error) { + return 0, nil + }) + defer cleanUpFusePatch.Reset() + + Expect(engine.Shutdown()).To(Succeed()) + + for _, cmName := range []string{"shutdown-success-config", "shutdown-success-thin-values"} { + cm, err := kubeclient.GetConfigmapByName(engine.Client, cmName, "fluid") + Expect(err).NotTo(HaveOccurred()) + Expect(cm).To(BeNil()) + } + }) + }) +}) diff --git a/pkg/ddc/thin/status_ginkgo_test.go b/pkg/ddc/thin/status_ginkgo_test.go new file mode 100644 index 00000000000..a015c82736b --- /dev/null +++ b/pkg/ddc/thin/status_ginkgo_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package thin + +import ( + "context" + "time" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" +) + +var _ = Describe("ThinEngine CheckAndUpdateRuntimeStatus", func() { + newEngine := func(thinRuntime *datav1alpha1.ThinRuntime, objs ...k8sruntime.Object) *ThinEngine { + allObjects := append([]k8sruntime.Object{thinRuntime}, objs...) + return &ThinEngine{ + name: thinRuntime.Name, + namespace: thinRuntime.Namespace, + runtime: thinRuntime, + engineImpl: common.ThinEngineImpl, + Client: fake.NewFakeClientWithScheme(testScheme, allObjects...), + Log: fake.NullLogger(), + } + } + + newDataset := func(name string, mounts ...datav1alpha1.Mount) *datav1alpha1.Dataset { + return &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + }, + Spec: datav1alpha1.DatasetSpec{Mounts: mounts}, + Status: datav1alpha1.DatasetStatus{Mounts: mounts}, + } + } + + newRuntime := func(name string) *datav1alpha1.ThinRuntime { + return &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + CreationTimestamp: metav1.NewTime(time.Now().Add(-5 * time.Minute)), + }, + Status: datav1alpha1.RuntimeStatus{ + FusePhase: datav1alpha1.RuntimePhaseNone, + }, + } + } + + It("initializes fuse-only runtime status and strips mount options from dataset status", func() { + runtime := newRuntime("fuse-only") + engine := newEngine( + runtime, + newDataset("fuse-only", datav1alpha1.Mount{ + Name: "demo", + MountPoint: "s3://bucket", + Options: map[string]string{"fs.s3a.endpoint": "endpoint"}, + EncryptOptions: []datav1alpha1.EncryptOption{{Name: "fs.s3a.secret.key"}}, + }), + ) + + ready, err := engine.CheckAndUpdateRuntimeStatus() + + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + + updatedRuntime := &datav1alpha1.ThinRuntime{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "fuse-only", Namespace: "fluid"}, updatedRuntime)).To(Succeed()) + Expect(updatedRuntime.Status.FusePhase).To(Equal(datav1alpha1.RuntimePhaseReady)) + Expect(updatedRuntime.Status.ValueFileConfigmap).To(Equal("fuse-only-thin-values")) + Expect(updatedRuntime.Status.SetupDuration).NotTo(BeEmpty()) + Expect(updatedRuntime.Status.CacheStates).To(Equal(common.CacheStateList{ + common.CacheCapacity: "N/A", + common.CachedPercentage: "N/A", + common.Cached: "N/A", + common.CacheHitRatio: "N/A", + common.CacheThroughputRatio: "N/A", + })) + Expect(updatedRuntime.Status.Mounts).To(HaveLen(1)) + Expect(updatedRuntime.Status.Mounts[0].MountPoint).To(Equal("s3://bucket")) + Expect(updatedRuntime.Status.Mounts[0].Options).To(BeNil()) + Expect(updatedRuntime.Status.Mounts[0].EncryptOptions).To(BeNil()) + _, condition := utils.GetRuntimeCondition(updatedRuntime.Status.Conditions, datav1alpha1.RuntimeFusesInitialized) + Expect(condition).NotTo(BeNil()) + }) + + It("updates cache affinity and leaves setup duration empty until workers are ready", func() { + runtime := newRuntime("workers") + runtime.Spec.Replicas = 2 + runtime.Spec.Worker.Enabled = true + + workers := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "workers-worker", + Namespace: "fluid", + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + NodeSelector: map[string]string{"topology.kubernetes.io/zone": "zone-a"}, + }, + }, + }, + Status: appsv1.StatefulSetStatus{ReadyReplicas: 0}, + } + + engine := newEngine(runtime, newDataset("workers", datav1alpha1.Mount{Name: "demo", MountPoint: "oss://bucket"}), workers) + + ready, err := engine.CheckAndUpdateRuntimeStatus() + + Expect(err).NotTo(HaveOccurred()) + Expect(ready).To(BeTrue()) + + updatedRuntime := &datav1alpha1.ThinRuntime{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "workers", Namespace: "fluid"}, updatedRuntime)).To(Succeed()) + Expect(updatedRuntime.Status.FusePhase).To(Equal(datav1alpha1.RuntimePhaseReady)) + Expect(updatedRuntime.Status.SetupDuration).To(BeEmpty()) + Expect(updatedRuntime.Status.CacheAffinity).NotTo(BeNil()) + Expect(updatedRuntime.Status.CacheAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).To(HaveLen(1)) + Expect(updatedRuntime.Status.CacheAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(ContainElement(corev1.NodeSelectorRequirement{ + Key: "topology.kubernetes.io/zone", + Operator: corev1.NodeSelectorOpIn, + Values: []string{"zone-a"}, + })) + }) +}) diff --git a/pkg/ddc/thin/transform_ginkgo_test.go b/pkg/ddc/thin/transform_ginkgo_test.go new file mode 100644 index 00000000000..41feb039f13 --- /dev/null +++ b/pkg/ddc/thin/transform_ginkgo_test.go @@ -0,0 +1,112 @@ +/* + Copyright 2022 The Fluid Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package thin + +import ( + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("ThinEngine transform", Label("pkg.ddc.thin.transform_ginkgo_test.go"), func() { + It("returns an error when the runtime is nil", func() { + engine := &ThinEngine{Log: fake.NullLogger()} + + value, err := engine.transform(nil, nil) + + Expect(err).To(MatchError("the thinRuntime is null")) + Expect(value).To(BeNil()) + }) + + It("merges runtime and profile state while enabling worker orchestration", func() { + dataset, runtime, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + engine := mockThinEngineForTests(dataset, runtime, profile) + engine.Client = fake.NewFakeClientWithScheme(datav1alpha1.UnitTestScheme, dataset, runtime, profile) + engine.runtimeInfo.SetOwnerDatasetUID(dataset.UID) + + runtime.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "runtime-secret"}} + runtime.Spec.Worker.Enabled = true + runtime.Spec.Worker.Image = "runtime-worker-image" + runtime.Spec.Worker.ImageTag = "runtime-worker-tag" + runtime.Spec.Worker.ImagePullPolicy = string(corev1.PullIfNotPresent) + runtime.Spec.Worker.ImagePullSecrets = []corev1.LocalObjectReference{{Name: "runtime-worker-secret"}} + runtime.Spec.Worker.Env = []corev1.EnvVar{{Name: "RUNTIME_ENV", Value: "runtime-value"}} + runtime.Spec.Worker.Ports = []corev1.ContainerPort{{Name: "rpc", ContainerPort: 19998}} + runtime.Spec.Worker.NodeSelector = map[string]string{"disk": "ssd"} + runtime.Spec.Worker.NetworkMode = datav1alpha1.ContainerNetworkMode + runtime.Spec.TieredStore.Levels = []datav1alpha1.Level{{Path: "/runtime/cache"}} + runtime.Spec.Fuse.Image = "runtime-fuse-image" + + profile.Spec.Worker = datav1alpha1.ThinCompTemplateSpec{ + Image: "profile-worker-image", + ImageTag: "profile-worker-tag", + ImagePullPolicy: string(corev1.PullAlways), + ImagePullSecrets: []corev1.LocalObjectReference{{Name: "profile-worker-secret"}}, + Env: []corev1.EnvVar{{Name: "PROFILE_ENV", Value: "profile-value"}}, + Ports: []corev1.ContainerPort{{Name: "metrics", ContainerPort: 8080}}, + NodeSelector: map[string]string{"from": "profile"}, + NetworkMode: datav1alpha1.HostNetworkMode, + } + + value, err := engine.transform(runtime, profile) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.ImagePullSecrets).To(Equal(runtime.Spec.ImagePullSecrets)) + Expect(value.Worker.Image).To(Equal(runtime.Spec.Worker.Image)) + Expect(value.Worker.ImageTag).To(Equal(runtime.Spec.Worker.ImageTag)) + Expect(value.Worker.ImagePullPolicy).To(Equal(runtime.Spec.Worker.ImagePullPolicy)) + Expect(value.Worker.ImagePullSecrets).To(Equal(runtime.Spec.Worker.ImagePullSecrets)) + Expect(value.Worker.Envs).To(ContainElement(runtime.Spec.Worker.Env[0])) + Expect(value.Worker.Ports).To(ContainElement(runtime.Spec.Worker.Ports[0])) + Expect(value.Worker.NodeSelector).To(Equal(runtime.Spec.Worker.NodeSelector)) + Expect(value.Worker.HostNetwork).To(BeFalse()) + Expect(value.Worker.CacheDir).To(Equal("/runtime/cache")) + Expect(value.RuntimeIdentity.Namespace).To(Equal(runtime.Namespace)) + Expect(value.RuntimeIdentity.Name).To(Equal(runtime.Name)) + Expect(value.OwnerDatasetId).To(Equal(utils.GetDatasetId(engine.namespace, engine.name, string(dataset.UID)))) + Expect(value.Owner).NotTo(BeNil()) + Expect(value.Owner.Name).To(Equal(runtime.Name)) + Expect(value.Fuse.Image).To(Equal(runtime.Spec.Fuse.Image)) + Expect(value.Fuse.NodeSelector).To(HaveKeyWithValue(utils.GetFuseLabelName(runtime.Namespace, runtime.Name, string(dataset.UID)), "true")) + }) + + It("defaults placement mode to exclusive when the dataset does not set one", func() { + dataset, runtime, profile := mockFluidObjectsForTests(types.NamespacedName{Name: "test-dataset", Namespace: "default"}) + dataset.Spec.PlacementMode = "" + engine := mockThinEngineForTests(dataset, runtime, profile) + engine.Client = fake.NewFakeClientWithScheme(datav1alpha1.UnitTestScheme, dataset, runtime, profile) + engine.runtimeInfo.SetOwnerDatasetUID(dataset.UID) + runtime.Spec.Fuse.Image = "runtime-fuse-image" + + value, err := engine.transform(runtime, profile) + + Expect(err).NotTo(HaveOccurred()) + Expect(value.PlacementMode).To(Equal(string(datav1alpha1.ExclusiveMode))) + Expect(value.Tolerations).To(BeEmpty()) + Expect(value.ImagePullSecrets).To(Equal(profile.Spec.ImagePullSecrets)) + Expect(value.Fuse.Enabled).To(BeTrue()) + Expect(value.OwnerDatasetId).To(Equal(utils.GetDatasetId(engine.namespace, engine.name, string(dataset.UID)))) + Expect(value.FullnameOverride).To(Equal(engine.name)) + Expect(value.Owner.Name).To(Equal(runtime.Name)) + Expect(value.Owner.UID).To(Equal(string(runtime.UID))) + }) +}) diff --git a/pkg/ddc/thin/ufs_ginkgo_test.go b/pkg/ddc/thin/ufs_ginkgo_test.go new file mode 100644 index 00000000000..4898862b0d7 --- /dev/null +++ b/pkg/ddc/thin/ufs_ginkgo_test.go @@ -0,0 +1,351 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package thin + +import ( + "context" + "os" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/common" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("ThinEngine genFuseMountOptions", func() { + newEngine := func(objs ...k8sruntime.Object) *ThinEngine { + return &ThinEngine{ + name: "dataset", + namespace: "fluid", + Client: fake.NewFakeClientWithScheme(testScheme, objs...), + Log: fake.NullLogger(), + } + } + + newSecret := func() *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "creds", + Namespace: "fluid", + }, + Data: map[string][]byte{ + "shared-key": []byte("shared-secret"), + "mount-key": []byte("mount-secret"), + }, + } + } + + It("merges mount options with shared and mount encrypt options", func() { + engine := newEngine(newSecret()) + mount := datav1alpha1.Mount{ + Name: "demo-mount", + Options: map[string]string{"fs.mount": "mount-option"}, + EncryptOptions: []datav1alpha1.EncryptOption{{ + Name: "fs.mount.secret", + ValueFrom: datav1alpha1.EncryptOptionSource{SecretKeyRef: datav1alpha1.SecretKeySelector{ + Name: "creds", + Key: "mount-key", + }}, + }}, + } + + options, err := engine.genFuseMountOptions( + mount, + map[string]string{"fs.shared": "shared-option"}, + []datav1alpha1.EncryptOption{{ + Name: "fs.shared.secret", + ValueFrom: datav1alpha1.EncryptOptionSource{SecretKeyRef: datav1alpha1.SecretKeySelector{ + Name: "creds", + Key: "shared-key", + }}, + }}, + true, + ) + + Expect(err).NotTo(HaveOccurred()) + Expect(options).To(Equal(map[string]string{ + "fs.shared": "shared-option", + "fs.mount": "mount-option", + "fs.shared.secret": "shared-secret", + "fs.mount.secret": "mount-secret", + })) + }) + + It("skips encrypt option extraction when disabled", func() { + engine := newEngine(newSecret()) + mount := datav1alpha1.Mount{ + Name: "demo-mount", + Options: map[string]string{"fs.mount": "mount-option"}, + EncryptOptions: []datav1alpha1.EncryptOption{{Name: "fs.mount.secret"}}, + } + + options, err := engine.genFuseMountOptions( + mount, + map[string]string{"fs.shared": "shared-option"}, + []datav1alpha1.EncryptOption{{Name: "fs.shared.secret"}}, + false, + ) + + Expect(err).NotTo(HaveOccurred()) + Expect(options).To(Equal(map[string]string{ + "fs.shared": "shared-option", + "fs.mount": "mount-option", + })) + }) + + It("returns an error when an encrypt option duplicates an existing option", func() { + engine := newEngine(newSecret()) + mount := datav1alpha1.Mount{ + Name: "demo-mount", + Options: map[string]string{"fs.mount.secret": "already-set"}, + EncryptOptions: []datav1alpha1.EncryptOption{{ + Name: "fs.mount.secret", + ValueFrom: datav1alpha1.EncryptOptionSource{SecretKeyRef: datav1alpha1.SecretKeySelector{ + Name: "creds", + Key: "mount-key", + }}, + }}, + } + + _, err := engine.genFuseMountOptions(mount, nil, nil, true) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("set more than one times")) + }) +}) + +var _ = Describe("ThinEngine updateFusePod", func() { + newEngine := func(objs ...k8sruntime.Object) *ThinEngine { + return &ThinEngine{ + name: "dataset", + namespace: "fluid", + Client: fake.NewFakeClientWithScheme(testScheme, objs...), + Log: fake.NullLogger(), + } + } + + newFuseDaemonSet := func() *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dataset-fuse", + Namespace: "fluid", + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "fuse"}}, + }, + } + } + + newFusePod := func(name string, annotations map[string]string, ready bool) *corev1.Pod { + conditionStatus := corev1.ConditionFalse + if ready { + conditionStatus = corev1.ConditionTrue + } + + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + Labels: map[string]string{"app": "fuse"}, + Annotations: annotations, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: conditionStatus, + }}, + }, + } + } + + annotationKey := common.LabelAnnotationFusePrefix + "update-fuse-config" + + It("adds and increments the fuse config annotation on running pods", func() { + engine := newEngine( + newFuseDaemonSet(), + newFusePod("fuse-0", map[string]string{"existing": "value"}, true), + newFusePod("fuse-1", map[string]string{annotationKey: "3"}, true), + newFusePod("fuse-2", map[string]string{annotationKey: "9"}, false), + ) + + Expect(engine.updateFusePod()).To(Succeed()) + + updatedReadyPod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "fuse-0", Namespace: "fluid"}, updatedReadyPod)).To(Succeed()) + Expect(updatedReadyPod.Annotations).To(HaveKeyWithValue(annotationKey, "1")) + + updatedIncrementedPod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "fuse-1", Namespace: "fluid"}, updatedIncrementedPod)).To(Succeed()) + Expect(updatedIncrementedPod.Annotations).To(HaveKeyWithValue(annotationKey, "4")) + + notReadyPod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "fuse-2", Namespace: "fluid"}, notReadyPod)).To(Succeed()) + Expect(notReadyPod.Annotations).To(HaveKeyWithValue(annotationKey, "9")) + }) + + It("returns nil when no running fuse pod is found", func() { + engine := newEngine(newFuseDaemonSet()) + + Expect(engine.updateFusePod()).To(Succeed()) + }) + + It("returns an error when an existing annotation is not numeric", func() { + engine := newEngine( + newFuseDaemonSet(), + newFusePod("fuse-0", map[string]string{annotationKey: "invalid"}, true), + ) + + err := engine.updateFusePod() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("invalid syntax")) + + pod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "fuse-0", Namespace: "fluid"}, pod)).To(Succeed()) + Expect(pod.Annotations).To(HaveKeyWithValue(annotationKey, "invalid")) + }) +}) + +var _ = Describe("ThinEngine ShouldUpdateUFS", func() { + newEngine := func(runtime *datav1alpha1.ThinRuntime, objs ...k8sruntime.Object) *ThinEngine { + allObjects := append([]k8sruntime.Object{runtime}, objs...) + return &ThinEngine{ + name: runtime.Name, + namespace: runtime.Namespace, + runtime: runtime, + Client: fake.NewFakeClientWithScheme(testScheme, allObjects...), + Log: fake.NullLogger(), + } + } + + newRuntime := func(name string) *datav1alpha1.ThinRuntime { + return &datav1alpha1.ThinRuntime{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + }, + } + } + + newDataset := func(name string, mounts ...datav1alpha1.Mount) *datav1alpha1.Dataset { + return &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + }, + Spec: datav1alpha1.DatasetSpec{Mounts: mounts}, + } + } + + newFuseConfigMap := func(name, config string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-fuse-conf", + Namespace: "fluid", + }, + Data: map[string]string{"config.json": config}, + } + } + + newFuseDaemonSet := func(name string) *appsv1.DaemonSet { + return &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-fuse", + Namespace: "fluid", + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": name + "-fuse"}}, + }, + } + } + + newFusePod := func(runtimeName, podName string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Namespace: "fluid", + Labels: map[string]string{"app": runtimeName + "-fuse"}, + Annotations: map[string]string{"existing": "value"}, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{{ + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }}, + }, + } + } + + BeforeEach(func() { + Expect(os.Setenv(EnvFuseConfigStorage, "configmap")).To(Succeed()) + }) + + AfterEach(func() { + Expect(os.Unsetenv(EnvFuseConfigStorage)).To(Succeed()) + }) + + It("updates the fuse configmap and nudges fuse pods when dataset mounts change", func() { + runtime := newRuntime("sync") + dataset := newDataset( + "sync", + datav1alpha1.Mount{Name: "zookeeper", MountPoint: "https://mirrors.bit.edu.cn/apache/zookeeper/stable/"}, + datav1alpha1.Mount{Name: "hbase", MountPoint: "https://mirrors.bit.edu.cn/apache/hbase/stable/"}, + ) + configMap := newFuseConfigMap("sync", "{\"mounts\":[{\"mountPoint\":\"https://mirrors.bit.edu.cn/apache/zookeeper/stable/\",\"name\":\"zookeeper\"}],\"targetPath\":\"/thin/fluid/sync/thin-fuse\",\"accessModes\":[\"ReadOnlyMany\"]}") + engine := newEngine(runtime, dataset, configMap, newFuseDaemonSet("sync"), newFusePod("sync", "sync-fuse-0")) + + Expect(engine.ShouldUpdateUFS()).To(BeNil()) + + updatedConfigMap := &corev1.ConfigMap{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "sync-fuse-conf", Namespace: "fluid"}, updatedConfigMap)).To(Succeed()) + Expect(updatedConfigMap.Data["config.json"]).To(ContainSubstring("apache/hbase/stable")) + + updatedPod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "sync-fuse-0", Namespace: "fluid"}, updatedPod)).To(Succeed()) + Expect(updatedPod.Annotations).To(HaveKeyWithValue(common.LabelAnnotationFusePrefix+"update-fuse-config", "1")) + }) + + It("leaves fuse pods untouched when the generated config is unchanged", func() { + runtime := newRuntime("steady") + dataset := newDataset( + "steady", + datav1alpha1.Mount{Name: "zookeeper", MountPoint: "https://mirrors.bit.edu.cn/apache/zookeeper/stable/"}, + ) + config := "{\"mounts\":[{\"mountPoint\":\"https://mirrors.bit.edu.cn/apache/zookeeper/stable/\",\"name\":\"zookeeper\"}],\"targetPath\":\"/thin/fluid/steady/thin-fuse\",\"accessModes\":[\"ReadOnlyMany\"]}" + engine := newEngine(runtime, dataset, newFuseConfigMap("steady", config), newFuseDaemonSet("steady"), newFusePod("steady", "steady-fuse-0")) + + Expect(engine.ShouldUpdateUFS()).To(BeNil()) + + pod := &corev1.Pod{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "steady-fuse-0", Namespace: "fluid"}, pod)).To(Succeed()) + Expect(pod.Annotations).NotTo(HaveKey(common.LabelAnnotationFusePrefix + "update-fuse-config")) + }) + + It("returns nil when the dataset is not found", func() { + runtime := newRuntime("missing") + engine := newEngine(runtime) + + Expect(engine.ShouldUpdateUFS()).To(BeNil()) + }) +}) diff --git a/pkg/ddc/thin/wrap_pvc_ginkgo_test.go b/pkg/ddc/thin/wrap_pvc_ginkgo_test.go new file mode 100644 index 00000000000..02878dcfe3a --- /dev/null +++ b/pkg/ddc/thin/wrap_pvc_ginkgo_test.go @@ -0,0 +1,120 @@ +/* +Copyright 2026 The Fluid Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package thin + +import ( + "context" + + datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1" + "github.com/fluid-cloudnative/fluid/pkg/utils/fake" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +var _ = Describe("ThinEngine bindDatasetToMountedPersistentVolumeClaim", func() { + newEngine := func(objs ...runtime.Object) *ThinEngine { + return &ThinEngine{ + name: "dataset", + namespace: "fluid", + Client: fake.NewFakeClientWithScheme(testScheme, objs...), + Log: fake.NullLogger(), + } + } + + newDataset := func(mounts ...datav1alpha1.Mount) *datav1alpha1.Dataset { + return &datav1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dataset", + Namespace: "fluid", + }, + Spec: datav1alpha1.DatasetSpec{Mounts: mounts}, + } + } + + newPVC := func(name string) *corev1.PersistentVolumeClaim { + return &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "fluid", + UID: types.UID(name + "-uid"), + }, + } + } + + It("adds the mounted pvc as a dataset owner reference", func() { + pvc := newPVC("mounted-pvc") + engine := newEngine( + newDataset(datav1alpha1.Mount{Name: "native", MountPoint: "pvc://mounted-pvc"}), + pvc, + ) + + Expect(engine.bindDatasetToMountedPersistentVolumeClaim()).To(Succeed()) + + updatedDataset := &datav1alpha1.Dataset{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "dataset", Namespace: "fluid"}, updatedDataset)).To(Succeed()) + Expect(updatedDataset.OwnerReferences).To(ContainElement(metav1.OwnerReference{ + APIVersion: pvc.APIVersion, + Kind: pvc.Kind, + Name: pvc.Name, + UID: pvc.UID, + })) + }) + + It("does not duplicate an existing pvc owner reference", func() { + pvc := newPVC("mounted-pvc") + ownerReference := metav1.OwnerReference{ + APIVersion: pvc.APIVersion, + Kind: pvc.Kind, + Name: pvc.Name, + UID: pvc.UID, + } + dataset := newDataset(datav1alpha1.Mount{Name: "native", MountPoint: "pvc://mounted-pvc"}) + dataset.OwnerReferences = []metav1.OwnerReference{ownerReference} + + engine := newEngine(dataset, pvc) + + Expect(engine.bindDatasetToMountedPersistentVolumeClaim()).To(Succeed()) + + updatedDataset := &datav1alpha1.Dataset{} + Expect(engine.Get(context.TODO(), types.NamespacedName{Name: "dataset", Namespace: "fluid"}, updatedDataset)).To(Succeed()) + Expect(updatedDataset.OwnerReferences).To(HaveLen(1)) + Expect(updatedDataset.OwnerReferences[0]).To(Equal(ownerReference)) + }) + + It("returns an error when more than one pvc mount is declared", func() { + engine := newEngine( + newDataset( + datav1alpha1.Mount{Name: "first", MountPoint: "pvc://mounted-pvc-1"}, + datav1alpha1.Mount{Name: "second", MountPoint: "pvc://mounted-pvc-2"}, + ), + newPVC("mounted-pvc-1"), + newPVC("mounted-pvc-2"), + ) + + err := engine.bindDatasetToMountedPersistentVolumeClaim() + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("can only contain one pvc:// mount point")) + }) +})