Skip to content

Commit 6a6fb11

Browse files
authored
Merge pull request #6934 from zhzhuang-zju/frq-components
federatedresourcequota add the support to multi-components scheduling
2 parents 9da6343 + c012888 commit 6a6fb11

File tree

2 files changed

+187
-12
lines changed

2 files changed

+187
-12
lines changed

pkg/webhook/resourcebinding/validating.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,9 @@ func isQuotaRelevantFieldChanged(oldRB, newRB *workv1alpha2.ResourceBinding) boo
356356
if isScheduledReplicasChanged(oldRB, newRB) {
357357
return true
358358
}
359+
if (len(oldRB.Spec.Components) != 0 || len(newRB.Spec.Components) != 0) && isScheduledClusterChanged(oldRB, newRB) {
360+
return true
361+
}
359362
return isComponentsChanged(oldRB, newRB)
360363
}
361364

@@ -383,6 +386,22 @@ func isScheduledReplicasChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool
383386
return oldScheduledReplicas != newScheduledReplicas
384387
}
385388

389+
func isScheduledClusterChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool {
390+
if len(oldRB.Spec.Clusters) != len(newRB.Spec.Clusters) {
391+
return true
392+
}
393+
oldClusterMap := make(map[string]struct{})
394+
for _, c := range oldRB.Spec.Clusters {
395+
oldClusterMap[c.Name] = struct{}{}
396+
}
397+
for _, c := range newRB.Spec.Clusters {
398+
if _, exists := oldClusterMap[c.Name]; !exists {
399+
return true
400+
}
401+
}
402+
return false
403+
}
404+
386405
func isComponentsChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool {
387406
if len(oldRB.Spec.Components) != len(newRB.Spec.Components) {
388407
return true

test/e2e/suites/base/federatedresourcequota_test.go

Lines changed: 168 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,19 @@ import (
2727
"github.com/onsi/gomega"
2828
appsv1 "k8s.io/api/apps/v1"
2929
corev1 "k8s.io/api/core/v1"
30+
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
3031
apierrors "k8s.io/apimachinery/pkg/api/errors"
3132
"k8s.io/apimachinery/pkg/api/meta"
3233
"k8s.io/apimachinery/pkg/api/resource"
3334
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36+
"k8s.io/apimachinery/pkg/conversion"
37+
"k8s.io/apimachinery/pkg/runtime/schema"
3438
"k8s.io/apimachinery/pkg/types"
3539
"k8s.io/apimachinery/pkg/util/rand"
3640
"k8s.io/cli-runtime/pkg/genericclioptions"
3741
"k8s.io/utils/ptr"
42+
"sigs.k8s.io/yaml"
3843

3944
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
4045
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -53,6 +58,11 @@ const waitTimeout = 3 * time.Second
5358

5459
var admissionWebhookDenyMsgPrefix = "admission webhook \"resourcebinding.karmada.io\" denied the request"
5560

61+
var checker = conversion.EqualitiesOrDie(
62+
func(a, b resource.Quantity) bool {
63+
return a.Equal(b)
64+
})
65+
5666
var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing", func() {
5767
var frqNamespace, frqName string
5868
var federatedResourceQuota *policyv1alpha1.FederatedResourceQuota
@@ -67,15 +77,15 @@ var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing"
6777
f = cmdutil.NewFactory(defaultConfigFlags)
6878
})
6979

70-
ginkgo.It("CURD a federatedResourceQuota", func() {
80+
ginkgo.It("CURD a FederatedResourceQuota", func() {
7181
var clusters = framework.ClusterNames()
7282
federatedResourceQuota = helper.NewFederatedResourceQuota(frqNamespace, frqName, framework.ClusterNames())
7383
ginkgo.By("[Create] federatedResourceQuota should be propagated to member clusters", func() {
7484
framework.CreateFederatedResourceQuota(karmadaClient, federatedResourceQuota)
7585
framework.WaitResourceQuotaPresentOnClusters(clusters, frqNamespace, frqName)
7686
})
7787

78-
ginkgo.By("[Update] federatedResourceQuota should be propagated to member clusters according to the new staticAssignments", func() {
88+
ginkgo.By("[Update] FederatedResourceQuota should be propagated to member clusters according to the new staticAssignments", func() {
7989
clusters = []string{framework.ClusterNames()[0]}
8090
federatedResourceQuota = helper.NewFederatedResourceQuota(frqNamespace, frqName, clusters)
8191
patch := []map[string]interface{}{
@@ -90,7 +100,7 @@ var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing"
90100
framework.WaitResourceQuotaDisappearOnClusters(framework.ClusterNames()[1:], frqNamespace, frqName)
91101
})
92102

93-
ginkgo.By("[Delete] federatedResourceQuota should be removed from member clusters", func() {
103+
ginkgo.By("[Delete] FederatedResourceQuota should be removed from member clusters", func() {
94104
framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
95105
framework.WaitResourceQuotaDisappearOnClusters(clusters, frqNamespace, frqName)
96106
})
@@ -180,7 +190,7 @@ var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing"
180190
framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
181191
})
182192

183-
ginkgo.It("federatedResourceQuota should only be propagated to newly joined cluster if the new cluster is declared in the StaticAssignment", func() {
193+
ginkgo.It("FederatedResourceQuota should only be propagated to newly joined cluster if the new cluster is declared in the StaticAssignment", func() {
184194
ginkgo.By(fmt.Sprintf("Joining cluster: %s", clusterNameInStaticAssignment), func() {
185195
opts := join.CommandJoinOption{
186196
DryRun: false,
@@ -193,7 +203,7 @@ var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing"
193203
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
194204
})
195205

196-
ginkgo.By(fmt.Sprintf("waiting federatedResourceQuota(%s/%s) present on cluster: %s", frqNamespace, frqName, clusterNameInStaticAssignment), func() {
206+
ginkgo.By(fmt.Sprintf("waiting FederatedResourceQuota(%s/%s) present on cluster: %s", frqNamespace, frqName, clusterNameInStaticAssignment), func() {
197207
clusterClient, err := util.NewClusterClientSet(clusterNameInStaticAssignment, controlPlaneClient, nil)
198208
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
199209

@@ -216,7 +226,7 @@ var _ = framework.SerialDescribe("FederatedResourceQuota auto-provision testing"
216226
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
217227
})
218228

219-
ginkgo.By(fmt.Sprintf("check if federatedResourceQuota(%s/%s) present on cluster: %s", frqNamespace, frqName, clusterNameNotInStaticAssignment), func() {
229+
ginkgo.By(fmt.Sprintf("check if FederatedResourceQuota(%s/%s) present on cluster: %s", frqNamespace, frqName, clusterNameNotInStaticAssignment), func() {
220230
clusterClient, err := util.NewClusterClientSet(clusterNameNotInStaticAssignment, controlPlaneClient, nil)
221231
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
222232

@@ -238,12 +248,12 @@ var _ = framework.SerialDescribe("[FederatedResourceQuota] status collection tes
238248
federatedResourceQuota = helper.NewFederatedResourceQuota(frqNamespace, frqName, framework.ClusterNames())
239249
})
240250

241-
ginkgo.Context("collect federatedResourceQuota status", func() {
251+
ginkgo.Context("collect FederatedResourceQuota status", func() {
242252
ginkgo.AfterEach(func() {
243253
framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
244254
})
245255

246-
ginkgo.It("federatedResourceQuota status should be collect correctly", func() {
256+
ginkgo.It("FederatedResourceQuota status should be collect correctly", func() {
247257
framework.CreateFederatedResourceQuota(karmadaClient, federatedResourceQuota)
248258
framework.WaitFederatedResourceQuotaCollectStatus(karmadaClient, frqNamespace, frqName)
249259

@@ -297,17 +307,15 @@ var _ = ginkgo.Describe("FederatedResourceQuota enforcement testing", func() {
297307
deploymentNamespace = deployNamespace
298308
deploymentName = policyName
299309

300-
ginkgo.By("Creating federatedResourceQuota", func() {
310+
ginkgo.By("Creating FederatedResourceQuota", func() {
301311
overall = corev1.ResourceList{
302312
"cpu": resource.MustParse("10m"),
303313
"memory": resource.MustParse("100Mi"),
304314
}
305315
federatedResourceQuota = helper.NewFederatedResourceQuotaWithOverall(frqNamespace, frqName, overall)
306316
framework.CreateFederatedResourceQuota(karmadaClient, federatedResourceQuota)
307317
framework.WaitFederatedResourceQuotaFitWith(karmadaClient, frqNamespace, frqName, func(frq *policyv1alpha1.FederatedResourceQuota) bool {
308-
// To avoid race condition, ensure that OverallUsed is not nil first, and then deployment can be created.
309-
// MoreInfo can refer to https://github.com/karmada-io/karmada/pull/6876#issuecomment-3455446833.
310-
return frq.Status.Overall != nil
318+
return checker.DeepEqual(frq.Status.Overall, overall)
311319
})
312320
ginkgo.DeferCleanup(func() {
313321
framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
@@ -485,3 +493,151 @@ var _ = ginkgo.Describe("FederatedResourceQuota enforcement testing", func() {
485493
})
486494
})
487495
})
496+
497+
var _ = ginkgo.Describe("Multi-Components: FederatedResourceQuota enforcement testing", func() {
498+
ginkgo.Context("The FederatedResourceQuota usage should be calculated correctly", func() {
499+
var (
500+
frqNamespace, frqName string
501+
flinkDeploymentCRD apiextensionsv1.CustomResourceDefinition
502+
flinkDeploymentNamespace string
503+
flinkDeploymentName string
504+
flinkDeploymentObj *unstructured.Unstructured
505+
flinkDeploymentGVR schema.GroupVersionResource
506+
federatedResourceQuota *policyv1alpha1.FederatedResourceQuota
507+
)
508+
509+
ginkgo.BeforeEach(func() {
510+
ginkgo.By("create FlinkDeployment CRD on karmada control plane", func() {
511+
err := yaml.Unmarshal([]byte(flinkDeploymentCRDYAML), &flinkDeploymentCRD)
512+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
513+
514+
framework.CreateCRD(dynamicClient, &flinkDeploymentCRD)
515+
ginkgo.DeferCleanup(func() {
516+
framework.RemoveCRD(dynamicClient, flinkDeploymentCRD.Name)
517+
})
518+
})
519+
520+
ginkgo.By("create FederatedResourceQuota for FlinkDeployment", func() {
521+
// To avoid conflicts with other test cases, use random strings to generate unique namespaces instead of using testNamespace.
522+
frqNamespace = fmt.Sprintf("karmadatest-%s", rand.String(RandomStrLength))
523+
err := setupTestNamespace(frqNamespace, kubeClient)
524+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
525+
ginkgo.DeferCleanup(func() {
526+
framework.RemoveNamespace(kubeClient, frqNamespace)
527+
})
528+
frqName = federatedResourceQuotaPrefix + rand.String(RandomStrLength)
529+
overall := corev1.ResourceList{
530+
"cpu": resource.MustParse("10"),
531+
"memory": resource.MustParse("1Gi"),
532+
}
533+
federatedResourceQuota = helper.NewFederatedResourceQuotaWithOverall(frqNamespace, frqName, overall)
534+
framework.CreateFederatedResourceQuota(karmadaClient, federatedResourceQuota)
535+
framework.WaitFederatedResourceQuotaFitWith(karmadaClient, frqNamespace, frqName, func(frq *policyv1alpha1.FederatedResourceQuota) bool {
536+
return checker.DeepEqual(frq.Status.Overall, overall)
537+
})
538+
ginkgo.DeferCleanup(func() {
539+
framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
540+
})
541+
})
542+
543+
ginkgo.By("propagate FlinkDeployment CRD to all clusters", func() {
544+
cpp := helper.NewClusterPropagationPolicy("flink-deployment-cpp", []policyv1alpha1.ResourceSelector{
545+
{
546+
APIVersion: flinkDeploymentCRD.APIVersion,
547+
Kind: flinkDeploymentCRD.Kind,
548+
Name: flinkDeploymentCRD.Name,
549+
},
550+
}, policyv1alpha1.Placement{
551+
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
552+
ClusterNames: framework.ClusterNames(),
553+
},
554+
})
555+
556+
framework.CreateClusterPropagationPolicy(karmadaClient, cpp)
557+
framework.WaitCRDPresentOnClusters(karmadaClient, framework.ClusterNames(),
558+
fmt.Sprintf("%s/%s", flinkDeploymentCRD.Spec.Group, "v1beta1"), flinkDeploymentCRD.Spec.Names.Kind)
559+
ginkgo.DeferCleanup(func() {
560+
framework.RemoveClusterPropagationPolicy(karmadaClient, cpp.Name)
561+
})
562+
})
563+
564+
ginkgo.By("create FlinkDeployment on karmada control plane", func() {
565+
flinkDeploymentNamespace = frqNamespace
566+
flinkDeploymentName = fmt.Sprintf("flinkdeployment-%s", rand.String(RandomStrLength))
567+
flinkDeploymentObj = &unstructured.Unstructured{}
568+
err := yaml.Unmarshal([]byte(flinkDeploymentCRYAML), flinkDeploymentObj)
569+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
570+
571+
flinkDeploymentObj.SetNamespace(flinkDeploymentNamespace)
572+
flinkDeploymentObj.SetName(flinkDeploymentName)
573+
err = unstructured.SetNestedField(flinkDeploymentObj.Object, int64(3), "spec", "jobManager", "replicas")
574+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
575+
err = unstructured.SetNestedField(flinkDeploymentObj.Object, map[string]interface{}{
576+
"cpu": int64(2),
577+
"memory": "50Mi",
578+
}, "spec", "jobManager", "resource")
579+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
580+
err = unstructured.SetNestedField(flinkDeploymentObj.Object, map[string]interface{}{
581+
"cpu": int64(1),
582+
"memory": "100Mi",
583+
}, "spec", "taskManager", "resource")
584+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
585+
586+
flinkDeploymentGVR = schema.GroupVersionResource{
587+
Group: flinkDeploymentObj.GroupVersionKind().Group,
588+
Version: flinkDeploymentObj.GroupVersionKind().Version,
589+
Resource: "flinkdeployments",
590+
}
591+
592+
_, err = dynamicClient.Resource(flinkDeploymentGVR).Namespace(flinkDeploymentNamespace).
593+
Create(context.Background(), flinkDeploymentObj, metav1.CreateOptions{})
594+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
595+
ginkgo.DeferCleanup(func() {
596+
err := dynamicClient.Resource(flinkDeploymentGVR).Namespace(flinkDeploymentNamespace).
597+
Delete(context.Background(), flinkDeploymentName, metav1.DeleteOptions{})
598+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
599+
})
600+
})
601+
602+
ginkgo.By("propagate FlinkDeployment resource to one cluster", func() {
603+
pp := helper.NewPropagationPolicy(flinkDeploymentNamespace, flinkDeploymentName, []policyv1alpha1.ResourceSelector{
604+
{
605+
APIVersion: flinkDeploymentObj.GetAPIVersion(),
606+
Kind: flinkDeploymentObj.GetKind(),
607+
Name: flinkDeploymentObj.GetName(),
608+
},
609+
}, policyv1alpha1.Placement{
610+
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
611+
ClusterNames: framework.ClusterNames(),
612+
},
613+
SpreadConstraints: []policyv1alpha1.SpreadConstraint{
614+
{
615+
SpreadByField: policyv1alpha1.SpreadByFieldCluster,
616+
MaxGroups: 1,
617+
MinGroups: 1,
618+
},
619+
},
620+
ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
621+
ReplicaSchedulingType: policyv1alpha1.ReplicaSchedulingTypeDivided,
622+
ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceAggregated,
623+
},
624+
})
625+
framework.CreatePropagationPolicy(karmadaClient, pp)
626+
ginkgo.DeferCleanup(func() {
627+
framework.RemovePropagationPolicy(karmadaClient, pp.Namespace, pp.Name)
628+
})
629+
})
630+
})
631+
632+
ginkgo.It("FederatedResourceQuota usage should be calculated correctly", func() {
633+
framework.WaitFederatedResourceQuotaFitWith(karmadaClient, frqNamespace, frqName, func(frq *policyv1alpha1.FederatedResourceQuota) bool {
634+
expectedUsed := corev1.ResourceList{
635+
"cpu": resource.MustParse("7"),
636+
"memory": resource.MustParse("250Mi"),
637+
}
638+
return frq.Status.OverallUsed != nil && frq.Status.OverallUsed.Cpu().Equal(*expectedUsed.Cpu()) &&
639+
frq.Status.OverallUsed.Memory().Equal(*expectedUsed.Memory())
640+
})
641+
})
642+
})
643+
})

0 commit comments

Comments
 (0)