
Commit 7db7a6f

Merge pull request #6896 from zhzhuang-zju/components
implement the noderesource plugin for multi-component scheduling estimator
2 parents 670c679 + 891e87c commit 7db7a6f

File tree

3 files changed (+690 −0 lines)

3 files changed

+690
-0
lines changed

pkg/estimator/server/framework/plugins/noderesource/noderesource.go

Lines changed: 168 additions & 0 deletions
@@ -48,6 +48,7 @@ type nodeResourceEstimator struct {
 }
 
 var _ framework.EstimateReplicasPlugin = &nodeResourceEstimator{}
+var _ framework.EstimateComponentsPlugin = &nodeResourceEstimator{}
 
 // New initializes a new plugin and returns it.
 func New(fh framework.Handle) (framework.Plugin, error) {
@@ -105,3 +106,170 @@ func (pl *nodeResourceEstimator) nodeMaxAvailableReplica(node *schedulerframewor
     rest.AllowedPodNumber = util.MaxInt64(rest.AllowedPodNumber-int64(len(node.Pods)), 0)
     return int32(rest.MaxDivided(rl)) // #nosec G115: integer overflow conversion int64 -> int32
 }
+
+// EstimateComponents estimates the number of complete component sets allowed
+// by the node resources for the given pb.Components.
+func (pl *nodeResourceEstimator) EstimateComponents(_ context.Context, snapshot *schedcache.Snapshot, components []pb.Component) (int32, *framework.Result) {
+    if !pl.enabled {
+        klog.V(5).InfoS("Estimator Plugin", "name", Name, "enabled", pl.enabled)
+        return math.MaxInt32, framework.NewResult(framework.Noopperation, fmt.Sprintf("%s is disabled", pl.Name()))
+    }
+
+    if len(components) == 0 {
+        return 0, framework.AsResult(fmt.Errorf("no components specified"))
+    }
+
+    nodes, err := getNodeRestResource(snapshot)
+    if err != nil {
+        return 0, framework.AsResult(err)
+    }
+
+    var sets int32
+    for canAssignOneComponentSets(newTasks(components), nodes) {
+        sets++
+    }
+
+    if sets == 0 {
+        return 0, framework.NewResult(framework.Unschedulable, "not enough resources")
+    }
+    return sets, framework.NewResult(framework.Success)
+}
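
In effect, the result counts whole component sets rather than replicas of a single template: each pass of the loop tries to place one replica of every component, and counting stops at the first set that no longer fits. Because newTasks is called per iteration, replica counts reset on every pass while consumed node capacity carries over; a runnable sketch of this counting loop follows canAssignOneComponentSets below.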
+
+// getNodeRestResource calculates the remaining available resources for each node in the cluster.
+// It clones each node and subtracts the already requested resources and existing pod count
+// to determine how much capacity is left for new workloads.
+// Returns a slice of NodeInfo with updated allocatable resources representing available capacity.
+func getNodeRestResource(snapshot *schedcache.Snapshot) ([]*schedulerframework.NodeInfo, error) {
+    allNodes, err := snapshot.NodeInfos().List()
+    if err != nil {
+        return nil, err
+    }
+
+    rest := make([]*schedulerframework.NodeInfo, 0, len(allNodes))
+    for _, node := range allNodes {
+        n := node.Clone()
+        n.Allocatable.SubResource(n.Requested)
+        n.Allocatable.AllowedPodNumber = util.MaxInt64(n.Allocatable.AllowedPodNumber-int64(len(node.Pods)), 0)
+        rest = append(rest, n)
+    }
+
+    return rest, nil
+}
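
The arithmetic here is plain subtraction with a zero floor on pod headroom. A tiny stand-alone illustration, using a hypothetical pared-down capacity type rather than the scheduler framework's NodeInfo:

// capacity is a hypothetical, simplified stand-in for a node's resources.
type capacity struct {
    milliCPU    int64
    allowedPods int64
}

// remaining mirrors the clone-and-subtract step above: free = allocatable
// minus requested, with allowed-pod headroom clamped at zero.
func remaining(alloc, requested capacity, runningPods int) capacity {
    pods := alloc.allowedPods - int64(runningPods)
    if pods < 0 {
        pods = 0
    }
    return capacity{
        milliCPU:    alloc.milliCPU - requested.milliCPU,
        allowedPods: pods,
    }
}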
+
+// canAssignOneComponentSets attempts to schedule one complete set of components across the available nodes.
+// It returns true if all components in the set can be successfully scheduled, false otherwise.
+// The function modifies the node resources as it assigns replicas to simulate actual scheduling.
+func canAssignOneComponentSets(ts *tasks, allNodes []*schedulerframework.NodeInfo) bool {
+    for !ts.done() {
+        i, t := ts.getTask()
+        if i == -1 {
+            // No more tasks to schedule, but done() returned false - this shouldn't happen.
+            return false
+        }
+
+        scheduled := false
+        for _, node := range allNodes {
+            if !matchNode(t, node) {
+                continue
+            }
+            needResource := util.NewResource(t.ResourceRequest)
+            needResource.AllowedPodNumber = 1
+            if node.Allocatable.Allocatable(needResource) {
+                // Assign one replica to this node.
+                node.Allocatable.SubResource(needResource)
+                ts.scheduleOne(i)
+                scheduled = true
+                break
+            }
+        }
+
+        if !scheduled {
+            // No node can fit this task, cannot complete the component set.
+            return false
+        }
+    }
+
+    return ts.done()
+}
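
The following self-contained sketch reproduces the counting loop from EstimateComponents together with this first-fit pass, using simplified, hypothetical types (CPU only; node matching and pod-count limits omitted). It is illustrative only and not part of this change:

package main

import "fmt"

// componentSpec is a hypothetical stand-in for pb.Component: a replica
// count plus a per-replica CPU request.
type componentSpec struct {
    replicas int
    cpu      int64
}

// nodeCap is a hypothetical stand-in for a node's remaining CPU capacity.
type nodeCap struct{ cpu int64 }

// assignOneSet mirrors canAssignOneComponentSets: place every replica of
// every component once, first-fit across nodes, mutating capacity as it
// goes; false means a full set no longer fits.
func assignOneSet(components []componentSpec, nodes []nodeCap) bool {
    for _, c := range components {
        for r := 0; r < c.replicas; r++ {
            placed := false
            for i := range nodes {
                if nodes[i].cpu >= c.cpu {
                    nodes[i].cpu -= c.cpu
                    placed = true
                    break
                }
            }
            if !placed {
                return false
            }
        }
    }
    return true
}

// countSets mirrors the loop in EstimateComponents: keep carving out
// whole sets until one fails to fit.
func countSets(components []componentSpec, nodes []nodeCap) int32 {
    var sets int32
    for assignOneSet(components, nodes) {
        sets++
    }
    return sets
}

func main() {
    // One set = a 3-CPU replica plus a 1-CPU replica (4 CPU per set),
    // against 3+3+2 = 8 free CPUs spread over three nodes.
    components := []componentSpec{{replicas: 1, cpu: 3}, {replicas: 1, cpu: 1}}
    nodes := []nodeCap{{cpu: 3}, {cpu: 3}, {cpu: 2}}
    // Prints 1: 8 CPUs naively suggest two 4-CPU sets, but after the first
    // set no node has 3 contiguous CPUs left for the second 3-CPU replica.
    fmt.Println(countSets(components, nodes))
}

This is why the set-by-set simulation matters: dividing total free capacity by per-set cost would overestimate whenever fragmentation across nodes prevents a large replica from landing anywhere.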
+
+// task represents a single component type with its scheduling requirements and remaining replicas.
+type task struct {
+    // replicaRequirements defines the resource and scheduling constraints for each replica.
+    replicaRequirements pb.ReplicaRequirements
+    // toBeScheduled tracks how many replicas of this component still need to be scheduled.
+    toBeScheduled int32
+}
+
+// tasks manages a collection of component tasks for scheduling estimation.
+// It tracks the remaining replicas for each component type that need to be scheduled.
+type tasks struct {
+    // items contains all component tasks to be scheduled.
+    items []task
+}
+
+// newTasks creates a new task collection from the given components.
+// Each component is converted to a task with its replica requirements and count.
+func newTasks(components []pb.Component) *tasks {
+    ts := make([]task, 0, len(components))
+    for _, component := range components {
+        ts = append(ts, task{
+            replicaRequirements: component.ReplicaRequirements,
+            toBeScheduled:       component.Replicas,
+        })
+    }
+
+    return &tasks{
+        items: ts,
+    }
+}
+
+// getTask returns the index and replica requirements of the first task that still needs to be scheduled.
+// It scans through all tasks to find one with remaining replicas to schedule.
+// Returns (-1, empty ReplicaRequirements) if no unfinished tasks are found.
+func (t *tasks) getTask() (int, pb.ReplicaRequirements) {
+    for i := 0; i < len(t.items); i++ {
+        if t.items[i].toBeScheduled > 0 {
+            return i, t.items[i].replicaRequirements
+        }
+    }
+
+    return -1, pb.ReplicaRequirements{}
+}
+
+// done returns true if all tasks have been completely scheduled (no replicas remaining).
+// This indicates that a complete component set has been successfully allocated.
+func (t *tasks) done() bool {
+    for _, tk := range t.items {
+        if tk.toBeScheduled > 0 {
+            return false
+        }
+    }
+    return true
+}
+
+// scheduleOne decrements the replica count for the task at the specified index.
+// This should be called when a replica has been successfully scheduled on a node.
+func (t *tasks) scheduleOne(index int) {
+    if index < 0 || index >= len(t.items) {
+        // Invalid index - defensive programming.
+        return
+    }
+    if t.items[index].toBeScheduled > 0 {
+        t.items[index].toBeScheduled--
+    }
+}
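
Taken together, the three helpers implement a simple countdown. A short usage walkthrough (only the Replicas field of pb.Component is shown; its other fields are omitted for brevity):

ts := newTasks([]pb.Component{
    {Replicas: 2}, // two replicas of the first component
    {Replicas: 1}, // one replica of the second
})
for !ts.done() {
    i, _ := ts.getTask() // index of the first task with replicas left
    ts.scheduleOne(i)    // record one placed replica
}
// After three scheduleOne calls, ts.done() reports true.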
+// matchNode checks whether the node matches the replicaRequirements' node affinity and tolerations.
+func matchNode(replicaRequirements pb.ReplicaRequirements, node *schedulerframework.NodeInfo) bool {
+    affinity := nodeutil.GetRequiredNodeAffinity(replicaRequirements)
+    var tolerations []corev1.Toleration
+
+    if replicaRequirements.NodeClaim != nil {
+        tolerations = replicaRequirements.NodeClaim.Tolerations
+    }
+
+    if !nodeutil.IsNodeAffinityMatched(node.Node(), affinity) || !nodeutil.IsTolerationMatched(node.Node(), tolerations) {
+        return false
+    }
+
+    return true
+}
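
For reference, a hand-rolled version of the toleration side of this check, using standard Kubernetes taint semantics (the real helpers live in Karmada's nodeutil package; skipping PreferNoSchedule taints is an assumption mirroring default scheduler behavior, not a statement about nodeutil.IsTolerationMatched):

import corev1 "k8s.io/api/core/v1"

// tolerated reports whether every hard taint on the node is covered by
// some toleration. PreferNoSchedule taints are treated as soft and
// skipped (an assumption, see above).
func tolerated(taints []corev1.Taint, tolerations []corev1.Toleration) bool {
    for i := range taints {
        if taints[i].Effect == corev1.TaintEffectPreferNoSchedule {
            continue
        }
        matched := false
        for j := range tolerations {
            if tolerations[j].ToleratesTaint(&taints[i]) {
                matched = true
                break
            }
        }
        if !matched {
            return false
        }
    }
    return true
}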
