
Commit 83efc6c

federatedresourcequota: add support for multi-component scheduling
Signed-off-by: zhzhuang-zju <[email protected]>
1 parent: 181bffc

2 files changed: +176 −0 lines

pkg/webhook/resourcebinding/validating.go

Lines changed: 19 additions & 0 deletions
@@ -356,6 +356,9 @@ func isQuotaRelevantFieldChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool
 	if isScheduledReplicasChanged(oldRB, newRB) {
 		return true
 	}
+	if (len(oldRB.Spec.Components) != 0 || len(newRB.Spec.Components) != 0) && isScheduledClusterChanged(oldRB, newRB) {
+		return true
+	}
 	return isComponentsChanged(oldRB, newRB)
 }
 
@@ -383,6 +386,22 @@ func isScheduledReplicasChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool
 	return oldScheduledReplicas != newScheduledReplicas
 }
 
+func isScheduledClusterChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool {
+	if len(oldRB.Spec.Clusters) != len(newRB.Spec.Clusters) {
+		return true
+	}
+	oldClusterMap := make(map[string]struct{})
+	for _, c := range oldRB.Spec.Clusters {
+		oldClusterMap[c.Name] = struct{}{}
+	}
+	for _, c := range newRB.Spec.Clusters {
+		if _, exists := oldClusterMap[c.Name]; !exists {
+			return true
+		}
+	}
+	return false
+}
+
 func isComponentsChanged(oldRB, newRB *workv1alpha2.ResourceBinding) bool {
 	if len(oldRB.Spec.Components) != len(newRB.Spec.Components) {
 		return true
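
The added condition treats a change of scheduled clusters as quota-relevant only when the binding actually carries components (multi-component workloads such as FlinkDeployment); single-component bindings keep relying on the existing replica checks. As a rough illustration of the set semantics, the following sketch is hypothetical in-package test code, not part of this commit, and it assumes the workv1alpha2.TargetCluster shape already used by ResourceBindingSpec.Clusters: reordering the scheduled clusters is not reported as a change, while swapping a member is.

	oldRB := &workv1alpha2.ResourceBinding{
		Spec: workv1alpha2.ResourceBindingSpec{
			Clusters: []workv1alpha2.TargetCluster{{Name: "member1"}, {Name: "member2"}},
		},
	}

	// Same membership, different order: lengths match and every new name is
	// found in the old set, so this is not reported as a change.
	newRB := oldRB.DeepCopy()
	newRB.Spec.Clusters = []workv1alpha2.TargetCluster{{Name: "member2"}, {Name: "member1"}}
	fmt.Println(isScheduledClusterChanged(oldRB, newRB)) // false

	// member2 swapped for member3: membership differs, so the change is
	// treated as quota-relevant by isQuotaRelevantFieldChanged.
	newRB.Spec.Clusters = []workv1alpha2.TargetCluster{{Name: "member1"}, {Name: "member3"}}
	fmt.Println(isScheduledClusterChanged(oldRB, newRB)) // true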

test/e2e/suites/base/federatedresourcequota_test.go

Lines changed: 157 additions & 0 deletions
@@ -27,14 +27,18 @@ import (
 	"github.com/onsi/gomega"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/rand"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/utils/ptr"
+	"sigs.k8s.io/yaml"
 
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
@@ -485,3 +489,156 @@ var _ = ginkgo.Describe("FederatedResourceQuota enforcement testing", func() {
 		})
 	})
 })
+
+var _ = ginkgo.Describe("Multi-Components: FederatedResourceQuota enforcement testing", func() {
+	ginkgo.Context("The federatedResourceQuota usage should be calculated correctly", func() {
+		var (
+			frqNamespace, frqName    string
+			flinkDeploymentCRD       apiextensionsv1.CustomResourceDefinition
+			flinkDeploymentNamespace string
+			flinkDeploymentName      string
+			flinkDeploymentObj       *unstructured.Unstructured
+			flinkDeploymentGVR       schema.GroupVersionResource
+			federatedResourceQuota   *policyv1alpha1.FederatedResourceQuota
+		)
+
+		ginkgo.BeforeEach(func() {
+			ginkgo.By("create FlinkDeployment CRD on karmada control plane", func() {
+				err := yaml.Unmarshal([]byte(flinkDeploymentCRDYAML), &flinkDeploymentCRD)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+				framework.CreateCRD(dynamicClient, &flinkDeploymentCRD)
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveCRD(dynamicClient, flinkDeploymentCRD.Name)
+				})
+			})
+
+			ginkgo.By("create FederatedResourceQuota for FlinkDeployment", func() {
+				// To avoid conflicts with other test cases, use random strings to generate unique namespaces instead of using testNamespace.
+				frqNamespace = fmt.Sprintf("karmadatest-%s", rand.String(RandomStrLength))
+				err := setupTestNamespace(frqNamespace, kubeClient)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveNamespace(kubeClient, frqNamespace)
+				})
+				frqName = federatedResourceQuotaPrefix + rand.String(RandomStrLength)
+				overall := corev1.ResourceList{
+					"cpu":    resource.MustParse("10"),
+					"memory": resource.MustParse("1Gi"),
+				}
+				federatedResourceQuota = helper.NewFederatedResourceQuotaWithOverall(frqNamespace, frqName, overall)
+				framework.CreateFederatedResourceQuota(karmadaClient, federatedResourceQuota)
+				framework.WaitFederatedResourceQuotaFitWith(karmadaClient, frqNamespace, frqName, func(frq *policyv1alpha1.FederatedResourceQuota) bool {
+					// To avoid a race condition, ensure the quota status has been initialized before the FlinkDeployment is created.
+					// More info: https://github.com/karmada-io/karmada/pull/6876#issuecomment-3455446833.
+					return frq.Status.Overall != nil
+				})
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveFederatedResourceQuota(karmadaClient, frqNamespace, frqName)
+				})
+			})
+
+			ginkgo.By("propagate FlinkDeployment CRD to all clusters", func() {
+				cpp := helper.NewClusterPropagationPolicy("flink-deployment-cpp", []policyv1alpha1.ResourceSelector{
+					{
+						APIVersion: flinkDeploymentCRD.APIVersion,
+						Kind:       flinkDeploymentCRD.Kind,
+						Name:       flinkDeploymentCRD.Name,
+					},
+				}, policyv1alpha1.Placement{
+					ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+						ClusterNames: framework.ClusterNames(),
+					},
+				})
+
+				framework.CreateClusterPropagationPolicy(karmadaClient, cpp)
+				framework.WaitCRDPresentOnClusters(karmadaClient, framework.ClusterNames(),
+					fmt.Sprintf("%s/%s", flinkDeploymentCRD.Spec.Group, "v1beta1"), flinkDeploymentCRD.Spec.Names.Kind)
+				ginkgo.DeferCleanup(func() {
+					framework.RemoveClusterPropagationPolicy(karmadaClient, cpp.Name)
+				})
+			})
+
+			ginkgo.By("create FlinkDeployment on karmada control plane", func() {
+				flinkDeploymentNamespace = frqNamespace
+				flinkDeploymentName = fmt.Sprintf("flinkdeployment-%s", rand.String(RandomStrLength))
+				flinkDeploymentObj = &unstructured.Unstructured{}
+				err := yaml.Unmarshal([]byte(flinkDeploymentCRYAML), flinkDeploymentObj)
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+				flinkDeploymentObj.SetNamespace(flinkDeploymentNamespace)
+				flinkDeploymentObj.SetName(flinkDeploymentName)
+				err = unstructured.SetNestedField(flinkDeploymentObj.Object, int64(3), "spec", "jobManager", "replicas")
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				err = unstructured.SetNestedField(flinkDeploymentObj.Object, map[string]interface{}{
+					"cpu":    int64(2),
+					"memory": "50Mi",
+				}, "spec", "jobManager", "resource")
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				err = unstructured.SetNestedField(flinkDeploymentObj.Object, map[string]interface{}{
+					"cpu":    int64(1),
+					"memory": "100Mi",
+				}, "spec", "taskManager", "resource")
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+
+				flinkDeploymentGVR = schema.GroupVersionResource{
+					Group:    flinkDeploymentObj.GroupVersionKind().Group,
+					Version:  flinkDeploymentObj.GroupVersionKind().Version,
+					Resource: "flinkdeployments",
+				}
+
+				_, err = dynamicClient.Resource(flinkDeploymentGVR).Namespace(flinkDeploymentNamespace).
+					Create(context.Background(), flinkDeploymentObj, metav1.CreateOptions{})
+				gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+				ginkgo.DeferCleanup(func() {
+					err := dynamicClient.Resource(flinkDeploymentGVR).Namespace(flinkDeploymentNamespace).
+						Delete(context.Background(), flinkDeploymentName, metav1.DeleteOptions{})
+					if err != nil && !apierrors.IsNotFound(err) {
+						// Ignore not-found errors during cleanup; any other error fails the test.
+						gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
+					}
+				})
+			})
+
+			ginkgo.By("propagate FlinkDeployment resource to one cluster", func() {
+				pp := helper.NewPropagationPolicy(flinkDeploymentNamespace, flinkDeploymentName, []policyv1alpha1.ResourceSelector{
+					{
+						APIVersion: flinkDeploymentObj.GetAPIVersion(),
+						Kind:       flinkDeploymentObj.GetKind(),
+						Name:       flinkDeploymentObj.GetName(),
+					},
+				}, policyv1alpha1.Placement{
+					ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+						ClusterNames: framework.ClusterNames(),
+					},
+					SpreadConstraints: []policyv1alpha1.SpreadConstraint{
+						{
+							SpreadByField: policyv1alpha1.SpreadByFieldCluster,
+							MaxGroups:     1,
+							MinGroups:     1,
+						},
+					},
+					ReplicaScheduling: &policyv1alpha1.ReplicaSchedulingStrategy{
+						ReplicaSchedulingType:     policyv1alpha1.ReplicaSchedulingTypeDivided,
+						ReplicaDivisionPreference: policyv1alpha1.ReplicaDivisionPreferenceAggregated,
+					},
+				})
+				framework.CreatePropagationPolicy(karmadaClient, pp)
+				ginkgo.DeferCleanup(func() {
+					framework.RemovePropagationPolicy(karmadaClient, pp.Namespace, pp.Name)
+				})
+			})
+		})
+
+		ginkgo.It("federatedResourceQuota usage should be calculated correctly", func() {
+			framework.WaitFederatedResourceQuotaFitWith(karmadaClient, frqNamespace, frqName, func(frq *policyv1alpha1.FederatedResourceQuota) bool {
+				expectedUsed := corev1.ResourceList{
+					"cpu":    resource.MustParse("7"),
+					"memory": resource.MustParse("250Mi"),
+				}
+				return frq.Status.OverallUsed != nil && frq.Status.OverallUsed.Cpu().Equal(*expectedUsed.Cpu()) &&
+					frq.Status.OverallUsed.Memory().Equal(*expectedUsed.Memory())
+			})
+		})
+	})
+})
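
For reference, the expected OverallUsed asserted above lines up with the per-component requests set on the FlinkDeployment: the jobManager contributes its per-replica request times its 3 replicas, and the taskManager contributes one replica's request (this sketch assumes a single taskManager replica, since spec.taskManager.replicas is never set in the test). A quick back-of-the-envelope check:

	expectedCPU := 3*2 + 1*1      // jobManager 3 x 2 CPU + taskManager 1 x 1 CPU = 7, matching resource.MustParse("7")
	expectedMemMi := 3*50 + 1*100 // jobManager 3 x 50Mi + taskManager 1 x 100Mi = 250, matching resource.MustParse("250Mi")
	fmt.Printf("cpu=%d memory=%dMi\n", expectedCPU, expectedMemMi) // cpu=7 memory=250Mi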
