Skip to content

Commit 10a5f0f

Browse files
authored
Merge pull request #6812 from mszacillo/replica-estimator
Implement MaxAvailableComponentSets for general estimator
2 parents 64428fa + c2dd2a7 commit 10a5f0f

File tree

2 files changed

+344
-9
lines changed

2 files changed

+344
-9
lines changed

pkg/estimator/client/general.go

Lines changed: 138 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,6 @@ func (ge *GeneralEstimator) MaxAvailableReplicas(_ context.Context, clusters []*
5353
return availableTargetClusters, nil
5454
}
5555

56-
// MaxAvailableComponentSets returns the maximum number of complete multi-component sets (in terms of replicas) that each cluster can host.
57-
func (ge *GeneralEstimator) MaxAvailableComponentSets(
58-
_ context.Context,
59-
_ *ComponentSetEstimationRequest) ([]ComponentSetEstimationResponse, error) {
60-
// Dummy implementation: return nothing for now
61-
// TODO: Implement as part of #6734
62-
return nil, nil
63-
}
64-
6556
func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluster, replicaRequirements *workv1alpha2.ReplicaRequirements) int32 {
6657
//Note: resourceSummary must be deep-copied before using in the function to avoid modifying the original data structure.
6758
resourceSummary := cluster.Status.ResourceSummary.DeepCopy()
@@ -102,6 +93,144 @@ func (ge *GeneralEstimator) maxAvailableReplicas(cluster *clusterv1alpha1.Cluste
10293
return int32(maximumReplicas) // #nosec G115: integer overflow conversion int64 -> int32
10394
}
10495

96+
// MaxAvailableComponentSets (generic estimator) – resourceSummary only.
97+
func (ge *GeneralEstimator) MaxAvailableComponentSets(_ context.Context, req *ComponentSetEstimationRequest) ([]ComponentSetEstimationResponse, error) {
98+
responses := make([]ComponentSetEstimationResponse, len(req.Clusters))
99+
for i, cluster := range req.Clusters {
100+
maxComponentSets := ge.maxAvailableComponentSets(cluster, req.Components)
101+
responses[i] = ComponentSetEstimationResponse{Name: cluster.Name, Sets: maxComponentSets}
102+
}
103+
return responses, nil
104+
}
105+
106+
// maxAvailableComponentSets estimates how many complete sets of the given
// components this cluster can host, using only the cluster's ResourceSummary.
// The result is the minimum of the pod-count bound and every per-resource
// bound (available / per-set demand), optionally tightened by the resource
// models when that feature gate is enabled.
func (ge *GeneralEstimator) maxAvailableComponentSets(cluster *clusterv1alpha1.Cluster, components []*workv1alpha2.Component) int32 {
	// Deep-copy the summary so the arithmetic below can never mutate the
	// cluster's status object.
	resourceSummary := cluster.Status.ResourceSummary.DeepCopy()
	if resourceSummary == nil {
		return 0
	}

	// Aggregate per-set resource requirements (sum over components of
	// per-replica request × replica count).
	perSet := perSetRequirement(components)

	// Check pod constraint: if the cluster cannot schedule a single extra
	// pod, no set fits.
	available := availableResourceMap(resourceSummary)
	allowedPods := getAllowedPodNumber(resourceSummary)
	if allowedPods <= 0 {
		return 0
	}

	podsPerSet := podsInSet(components)
	if podsPerSet <= 0 {
		// No components (or no replicas) are defined, so there is no per-set
		// demand to divide by; return the max pod allowance as the estimate.
		return int32(allowedPods) // #nosec G115: integer overflow conversion int64 -> int32
	}

	// Upper bound imposed purely by schedulable pod slots.
	podBound := allowedPods / podsPerSet
	if len(perSet) == 0 || allZero(perSet) {
		// Components declare no resource requests; only the pod count limits us.
		return int32(podBound) // #nosec G115: integer overflow conversion int64 -> int32
	}

	// Find limiting resource requirement, which will bound maxSet calculation.
	// Note: a requested resource absent from the summary reads as 0 available
	// and conservatively yields 0 sets.
	maxSets := podBound
	for resName, req := range perSet {
		if req <= 0 {
			continue
		}

		resAvail := available[resName]
		if resAvail <= 0 {
			return 0 // no capacity for this resource
		}

		resBound := resAvail / req
		if resBound < maxSets {
			maxSets = resBound
		}
	}

	// Optionally refine the bound using customized resource models; a failure
	// there is logged and ignored so the summary-based estimate still stands.
	if features.FeatureGate.Enabled(features.CustomizedClusterResourceModeling) && len(cluster.Status.ResourceSummary.AllocatableModelings) > 0 {
		num, err := getMaximumSetsBasedOnResourceModels(cluster, components)
		if err != nil {
			klog.Warningf("Failed to get maximum sets based on resource models, skipping: %v", err)
		} else if num < maxSets {
			maxSets = num
		}
	}

	return int32(maxSets) // #nosec G115: integer overflow conversion int64 -> int32
}
162+
163+
// getMaximumSetsBasedOnResourceModels is a placeholder for future implementation.
// It should refine the maximum sets based on cluster resource models, similar
// to getMaximumReplicasBasedOnResourceModels but adapted to full component sets.
// Returning math.MaxInt64 makes it a no-op in the caller's min() comparison,
// so it never reduces the summary-based upper bound today.
func getMaximumSetsBasedOnResourceModels(_ *clusterv1alpha1.Cluster, _ []*workv1alpha2.Component) (int64, error) {
	// TODO: implement logic based on cluster.Spec.ResourceModels
	// For now, just return MaxInt64 so it never reduces the upper bound.
	return math.MaxInt64, nil
}
171+
172+
// podsInSet computes the total number of pods in the CRD
173+
func podsInSet(components []*workv1alpha2.Component) int64 {
174+
var sum int64
175+
for _, c := range components {
176+
sum += int64(c.Replicas)
177+
}
178+
return sum
179+
}
180+
181+
// perSetRequirement computes the aggregate resource(such as CPU, Memory, GPU, etc) demand of one set of components.
182+
func perSetRequirement(components []*workv1alpha2.Component) map[corev1.ResourceName]int64 {
183+
resourceRequirements := map[corev1.ResourceName]int64{}
184+
for _, c := range components {
185+
if c.ReplicaRequirements == nil || c.ReplicaRequirements.ResourceRequest == nil {
186+
continue
187+
}
188+
replicas := int64(c.Replicas)
189+
for resName, qty := range c.ReplicaRequirements.ResourceRequest {
190+
baseAmount := quantityAsInt64(qty)
191+
resourceRequirements[resName] += baseAmount * replicas
192+
}
193+
}
194+
return resourceRequirements
195+
}
196+
197+
// availableResourceMap parses the cluster resourceSummary and returns map of resourceName -> availableQuantity (int64)
198+
func availableResourceMap(resourceSummary *clusterv1alpha1.ResourceSummary) map[corev1.ResourceName]int64 {
199+
available := make(map[corev1.ResourceName]int64, len(resourceSummary.Allocatable))
200+
for key, allocatable := range resourceSummary.Allocatable {
201+
a := allocatable.DeepCopy()
202+
if allocated, ok := resourceSummary.Allocated[key]; ok {
203+
a.Sub(allocated)
204+
}
205+
if allocating, ok := resourceSummary.Allocating[key]; ok {
206+
a.Sub(allocating)
207+
}
208+
available[key] = quantityAsInt64(a)
209+
}
210+
return available
211+
}
212+
213+
// Converts quantity into an int representation depending on format
214+
func quantityAsInt64(q resource.Quantity) int64 {
215+
switch q.Format {
216+
case resource.DecimalSI, resource.DecimalExponent:
217+
return q.MilliValue()
218+
case resource.BinarySI:
219+
return q.Value()
220+
default:
221+
return q.Value()
222+
}
223+
}
224+
225+
func allZero(m map[corev1.ResourceName]int64) bool {
226+
for _, v := range m {
227+
if v != 0 {
228+
return false
229+
}
230+
}
231+
return true
232+
}
233+
105234
func getAllowedPodNumber(resourceSummary *clusterv1alpha1.ResourceSummary) int64 {
106235
var allocatable, allocated, allocating int64
107236
if resourceSummary.Allocatable != nil {

pkg/estimator/client/general_test.go

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,3 +800,209 @@ func TestMinimumModelIndex(t *testing.T) {
800800
})
801801
}
802802
}
803+
804+
// TestGetMaxAvailableComponentSetsGeneral exercises the summary-based
// per-cluster estimator (maxAvailableComponentSets) across pod-bound,
// resource-bound, extended-resource, and degenerate (nil/empty) inputs.
func TestGetMaxAvailableComponentSetsGeneral(t *testing.T) {
	tests := []struct {
		name       string
		cluster    *clusterv1alpha1.Cluster
		components []*workv1alpha2.Component
		expected   int32
	}{
		{
			// A cluster with no ResourceSummary can host nothing.
			name: "nil resource summary",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{},
			},
			expected: 0,
		},
		{
			// All pod slots already allocated (10 of 10) -> zero sets.
			name: "no allowed pods",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods: resource.MustParse("10"),
						},
						Allocated: corev1.ResourceList{
							corev1.ResourcePods: resource.MustParse("10"),
						},
					},
				},
			},
			expected: 0,
		},
		{
			// With no components there is no per-set demand; the estimator
			// falls back to the pod allowance (10).
			name: "empty component list should return max pod allowance",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods: resource.MustParse("10"),
						},
					},
				},
			},
			expected: 10,
		},
		{
			// Per-set CPU demand: 1×1 + 2×1.5 = 4 CPU; 10 CPU available
			// -> 10/4 = 2 sets (CPU is the only requested resource).
			name: "basic resource estimation",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("100"),
							corev1.ResourceCPU:    resource.MustParse("10"),
							corev1.ResourceMemory: resource.MustParse("8Gi"),
						},
						Allocated: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("20"),
							corev1.ResourceCPU:    resource.MustParse("0"),
							corev1.ResourceMemory: resource.MustParse("2Gi"),
						},
					},
				},
			},
			components: []*workv1alpha2.Component{
				{
					Name:     "jobmanager",
					Replicas: 1,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU: resource.MustParse("1"),
						},
					},
				},
				{
					Name:     "taskmanager",
					Replicas: 2,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU: resource.MustParse("1.5"),
						},
					},
				},
			},
			expected: 2,
		},
		{
			name: "resource estimation with mixed components",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("100"),
							corev1.ResourceCPU:    resource.MustParse("10"),
							corev1.ResourceMemory: resource.MustParse("8Gi"),
						},
						Allocated: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("20"),
							corev1.ResourceCPU:    resource.MustParse("0"),
							corev1.ResourceMemory: resource.MustParse("2Gi"),
						},
					},
				},
			},
			components: []*workv1alpha2.Component{
				{
					Name:     "jobmanager",
					Replicas: 1,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("1"),
							corev1.ResourceMemory: resource.MustParse("2Gi"),
						},
					},
				},
				{
					Name:     "taskmanager",
					Replicas: 2,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("2000m"),
							corev1.ResourceMemory: resource.MustParse("2Gi"),
						},
					},
				},
			},
			// Per-set demand: 2 replicas × (2 CPU, 2Gi) + 1 replica × (1 CPU, 2Gi) = (5 CPU, 6Gi)
			// Cluster available: 10 CPU, 6Gi (8Gi allocatable − 2Gi allocated)
			// 10/5 = 2 sets (CPU), 6Gi/6Gi = 1 set (Mem)
			// min(2,1) = 1 set total
			expected: 1,
		},
		{
			// Resources are abundant; only 3 pod slots exist and each set
			// needs 3 pods -> 1 set.
			name: "estimation limited by pod count",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("3"),
							corev1.ResourceCPU:    resource.MustParse("100"),
							corev1.ResourceMemory: resource.MustParse("1Ti"),
						},
					},
				},
			},
			components: []*workv1alpha2.Component{
				{
					Name:     "small-component",
					Replicas: 3,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("10m"),
							corev1.ResourceMemory: resource.MustParse("1Mi"),
						},
					},
				},
			},
			expected: 1, // limited by pods
		},
		{
			// Extended (non-core) resources participate in the bound too.
			name: "custom resource estimation with GPUs",
			cluster: &clusterv1alpha1.Cluster{
				Status: clusterv1alpha1.ClusterStatus{
					ResourceSummary: &clusterv1alpha1.ResourceSummary{
						Allocatable: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("20"),
							corev1.ResourceCPU:    resource.MustParse("40"),
							corev1.ResourceMemory: resource.MustParse("64Gi"),
							"nvidia.com/gpu":      resource.MustParse("8"),
						},
						Allocated: corev1.ResourceList{
							corev1.ResourcePods:   resource.MustParse("0"),
							corev1.ResourceCPU:    resource.MustParse("0"),
							corev1.ResourceMemory: resource.MustParse("0Gi"),
							"nvidia.com/gpu":      resource.MustParse("0"),
						},
					},
				},
			},
			components: []*workv1alpha2.Component{
				{
					Name:     "gpu-worker",
					Replicas: 2,
					ReplicaRequirements: &workv1alpha2.ComponentReplicaRequirements{
						ResourceRequest: corev1.ResourceList{
							corev1.ResourceCPU:    resource.MustParse("4"),
							corev1.ResourceMemory: resource.MustParse("8Gi"),
							"nvidia.com/gpu":      resource.MustParse("1"),
						},
					},
				},
			},
			// Per-set demand: 2 replicas × (4 CPU, 8Gi, 1 GPU) = (8 CPU, 16Gi, 2 GPUs)
			// Cluster allocatable: 40 CPU, 64Gi, 8 GPUs
			// 40/8 = 5 sets (CPU), 64/16 = 4 sets (Mem), 8/2 = 4 sets (GPU)
			// min(5, 4, 4) = 4 sets total
			expected: 4,
		},
	}

	estimator := NewGeneralEstimator()
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Exercise the unexported per-cluster estimator directly.
			result := estimator.maxAvailableComponentSets(tt.cluster, tt.components)
			assert.Equal(t, tt.expected, result)
		})
	}
}

0 commit comments

Comments
 (0)