Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 109 additions & 38 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,17 @@ type PredicatesPlugin struct {

FilterPlugins map[string]fwk.FilterPlugin
StableFilterPlugins map[string]fwk.FilterPlugin // Subset of FilterPlugins for cache-stable filters
PrefilterPlugins map[string]fwk.PreFilterPlugin
PreFilterPlugins map[string]fwk.PreFilterPlugin
ReservePlugins map[string]fwk.ReservePlugin
PreBindPlugins map[string]fwk.PreBindPlugin
ScorePlugins map[string]nodescore.BaseScorePlugin
ScoreWeights map[string]int // Weight for each score plugin
FilterOrder []string
StableFilterOrder []string
PreFilterOrder []string
ReserveOrder []string
PreBindOrder []string
ScoreOrder []string
PredicateCache *predicateCache
Handle fwk.Handle
}
Expand Down Expand Up @@ -413,7 +419,11 @@ func (pp *PredicatesPlugin) PrePredicate(task *api.TaskInfo, state *k8sframework
}

// Run all PreFilter plugins
for name, plugin := range pp.PrefilterPlugins {
for _, name := range pp.PreFilterOrder {
plugin, exists := pp.PreFilterPlugins[name]
if !exists {
continue
}
_, status := plugin.PreFilter(context.TODO(), state, task.Pod, nodeInfoList)
if err := handleSkipPrePredicatePlugin(status, state, task, name); err != nil {
return err
Expand All @@ -431,14 +441,46 @@ func (pp *PredicatesPlugin) InitPlugin() {
scorePlugins := map[string]nodescore.BaseScorePlugin{}
preBindPlugins := map[string]fwk.PreBindPlugin{}
scoreWeights := map[string]int{} // Weight for each score plugin
var filterOrder []string
var stableFilterOrder []string
var preFilterOrder []string
var reserveOrder []string
var preBindOrder []string
var scoreOrder []string

addFilterPlugin := func(name string, plugin fwk.FilterPlugin) {
filterPlugins[name] = plugin
filterOrder = append(filterOrder, name)
}
addStableFilterPlugin := func(name string, plugin fwk.FilterPlugin) {
stableFilterPlugins[name] = plugin
stableFilterOrder = append(stableFilterOrder, name)
}
addPreFilterPlugin := func(name string, plugin fwk.PreFilterPlugin) {
prefilterPlugins[name] = plugin
preFilterOrder = append(preFilterOrder, name)
}
addReservePlugin := func(name string, plugin fwk.ReservePlugin) {
reservePlugins[name] = plugin
reserveOrder = append(reserveOrder, name)
}
addPreBindPlugin := func(name string, plugin fwk.PreBindPlugin) {
preBindPlugins[name] = plugin
preBindOrder = append(preBindOrder, name)
}
addScorePlugin := func(name string, plugin nodescore.BaseScorePlugin, weight int) {
scorePlugins[name] = plugin
scoreOrder = append(scoreOrder, name)
scoreWeights[name] = weight
}

// Initialize k8s plugins
// TODO: Add more predicates, k8s.io/kubernetes/pkg/scheduler/framework/plugins/legacy_registry.go
// 1. NodeUnschedulable (stable filter for cache)
if plugin, err := nodeunschedulable.New(context.TODO(), nil, pp.Handle, pp.features); err == nil {
nodeUnscheduleFilter := plugin.(*nodeunschedulable.NodeUnschedulable)
filterPlugins[nodeunschedulable.Name] = nodeUnscheduleFilter
stableFilterPlugins[nodeunschedulable.Name] = nodeUnscheduleFilter
addFilterPlugin(nodeunschedulable.Name, nodeUnscheduleFilter)
addStableFilterPlugin(nodeunschedulable.Name, nodeUnscheduleFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", nodeunschedulable.Name, err)
}
Expand All @@ -450,8 +492,8 @@ func (pp *PredicatesPlugin) InitPlugin() {
}
if plugin, err := nodeaffinity.New(context.TODO(), &nodeAffinityArgs, pp.Handle, pp.features); err == nil {
nodeAffinityFilter := plugin.(*nodeaffinity.NodeAffinity)
filterPlugins[nodeaffinity.Name] = nodeAffinityFilter
stableFilterPlugins[nodeaffinity.Name] = nodeAffinityFilter
addFilterPlugin(nodeaffinity.Name, nodeAffinityFilter)
addStableFilterPlugin(nodeaffinity.Name, nodeAffinityFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", nodeaffinity.Name, err)
}
Expand All @@ -460,8 +502,8 @@ func (pp *PredicatesPlugin) InitPlugin() {
if pp.enabledPredicates.nodePortEnable {
if plugin, err := nodeports.New(context.TODO(), nil, pp.Handle, pp.features); err == nil {
nodePortFilter := plugin.(*nodeports.NodePorts)
filterPlugins[nodeports.Name] = nodePortFilter
prefilterPlugins[nodeports.Name] = nodePortFilter
addFilterPlugin(nodeports.Name, nodePortFilter)
addPreFilterPlugin(nodeports.Name, nodePortFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", nodeports.Name, err)
}
Expand All @@ -470,8 +512,8 @@ func (pp *PredicatesPlugin) InitPlugin() {
if pp.enabledPredicates.taintTolerationEnable {
if plugin, err := tainttoleration.New(context.TODO(), nil, pp.Handle, pp.features); err == nil {
tolerationFilter := plugin.(*tainttoleration.TaintToleration)
filterPlugins[tainttoleration.Name] = tolerationFilter
stableFilterPlugins[tainttoleration.Name] = tolerationFilter
addFilterPlugin(tainttoleration.Name, tolerationFilter)
addStableFilterPlugin(tainttoleration.Name, tolerationFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", tainttoleration.Name, err)
}
Expand All @@ -481,8 +523,8 @@ func (pp *PredicatesPlugin) InitPlugin() {
plArgs := &config.InterPodAffinityArgs{}
if plugin, err := interpodaffinity.New(context.TODO(), plArgs, pp.Handle, pp.features); err == nil {
podAffinityFilter := plugin.(*interpodaffinity.InterPodAffinity)
filterPlugins[interpodaffinity.Name] = podAffinityFilter
prefilterPlugins[interpodaffinity.Name] = podAffinityFilter
addFilterPlugin(interpodaffinity.Name, podAffinityFilter)
addPreFilterPlugin(interpodaffinity.Name, podAffinityFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", interpodaffinity.Name, err)
}
Expand All @@ -491,7 +533,7 @@ func (pp *PredicatesPlugin) InitPlugin() {
if pp.enabledPredicates.nodeVolumeLimitsEnable {
if plugin, err := nodevolumelimits.NewCSI(context.TODO(), nil, pp.Handle, pp.features); err == nil {
nodeVolumeLimitsCSIFilter := plugin.(*nodevolumelimits.CSILimits)
filterPlugins[nodevolumelimits.CSIName] = nodeVolumeLimitsCSIFilter
addFilterPlugin(nodevolumelimits.CSIName, nodeVolumeLimitsCSIFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", nodevolumelimits.CSIName, err)
}
Expand All @@ -500,7 +542,7 @@ func (pp *PredicatesPlugin) InitPlugin() {
if pp.enabledPredicates.volumeZoneEnable {
if plugin, err := volumezone.New(context.TODO(), nil, pp.Handle, pp.features); err == nil {
volumeZoneFilter := plugin.(*volumezone.VolumeZone)
filterPlugins[volumezone.Name] = volumeZoneFilter
addFilterPlugin(volumezone.Name, volumeZoneFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", volumezone.Name, err)
}
Expand All @@ -511,8 +553,8 @@ func (pp *PredicatesPlugin) InitPlugin() {
ptsArgs := &config.PodTopologySpreadArgs{DefaultingType: config.SystemDefaulting}
if plugin, err := podtopologyspread.New(context.TODO(), ptsArgs, pp.Handle, pp.features); err == nil {
podTopologySpreadFilter := plugin.(*podtopologyspread.PodTopologySpread)
filterPlugins[podtopologyspread.Name] = podTopologySpreadFilter
prefilterPlugins[podtopologyspread.Name] = podTopologySpreadFilter
addFilterPlugin(podtopologyspread.Name, podTopologySpreadFilter)
addPreFilterPlugin(podtopologyspread.Name, podTopologySpreadFilter)
} else {
klog.Errorf("Failed to init %s plugin %v", podtopologyspread.Name, err)
}
Expand All @@ -534,12 +576,11 @@ func (pp *PredicatesPlugin) InitPlugin() {
volumeBindingPluginInstance = plugin.(*vbcap.VolumeBinding)
})

filterPlugins[vbcap.Name] = volumeBindingPluginInstance
prefilterPlugins[vbcap.Name] = volumeBindingPluginInstance
reservePlugins[vbcap.Name] = volumeBindingPluginInstance
preBindPlugins[vbcap.Name] = volumeBindingPluginInstance
scorePlugins[vbcap.Name] = volumeBindingPluginInstance
scoreWeights[vbcap.Name] = vbArgs.Weight // Set weight from plugin args
addFilterPlugin(vbcap.Name, volumeBindingPluginInstance)
addPreFilterPlugin(vbcap.Name, volumeBindingPluginInstance)
addReservePlugin(vbcap.Name, volumeBindingPluginInstance)
addPreBindPlugin(vbcap.Name, volumeBindingPluginInstance)
addScorePlugin(vbcap.Name, volumeBindingPluginInstance, vbArgs.Weight)
}
// 10. DRA
if pp.enabledPredicates.dynamicResourceAllocationEnable {
Expand All @@ -550,19 +591,25 @@ func (pp *PredicatesPlugin) InitPlugin() {
klog.Fatalf("failed to create dra plugin with err: %v", err)
}
dynamicResourceAllocationPlugin := plugin.(*dynamicresources.DynamicResources)
filterPlugins[dynamicresources.Name] = dynamicResourceAllocationPlugin
prefilterPlugins[dynamicresources.Name] = dynamicResourceAllocationPlugin
reservePlugins[dynamicresources.Name] = dynamicResourceAllocationPlugin
preBindPlugins[dynamicresources.Name] = dynamicResourceAllocationPlugin
addFilterPlugin(dynamicresources.Name, dynamicResourceAllocationPlugin)
addPreFilterPlugin(dynamicresources.Name, dynamicResourceAllocationPlugin)
addReservePlugin(dynamicresources.Name, dynamicResourceAllocationPlugin)
addPreBindPlugin(dynamicresources.Name, dynamicResourceAllocationPlugin)
}

pp.FilterPlugins = filterPlugins
pp.StableFilterPlugins = stableFilterPlugins
pp.PrefilterPlugins = prefilterPlugins
pp.PreFilterPlugins = prefilterPlugins
pp.ReservePlugins = reservePlugins
pp.PreBindPlugins = preBindPlugins
pp.ScorePlugins = scorePlugins
pp.ScoreWeights = scoreWeights
pp.FilterOrder = filterOrder
pp.StableFilterOrder = stableFilterOrder
pp.PreFilterOrder = preFilterOrder
pp.ReserveOrder = reserveOrder
pp.PreBindOrder = preBindOrder
pp.ScoreOrder = scoreOrder
}

// Predicate runs all Filter plugins for the given task and node.
Expand Down Expand Up @@ -596,7 +643,11 @@ func (pp *PredicatesPlugin) Predicate(task *api.TaskInfo, node *api.NodeInfo, st
// Run all stable filter plugins (for cache)
predicateStatus := make([]*api.Status, 0)

for name, plugin := range pp.StableFilterPlugins {
for _, name := range pp.StableFilterOrder {
plugin, exists := pp.StableFilterPlugins[name]
if !exists {
continue
}
status := plugin.Filter(context.TODO(), state, task.Pod, nodeInfo)
filterStatus := api.ConvertPredicateStatus(status)
if filterStatus.Code != api.Success {
Expand Down Expand Up @@ -636,7 +687,11 @@ func (pp *PredicatesPlugin) Predicate(task *api.TaskInfo, node *api.NodeInfo, st
}

// Run all Filter plugins (except those in StableFilterPlugins)
for name, plugin := range pp.FilterPlugins {
for _, name := range pp.FilterOrder {
plugin, exists := pp.FilterPlugins[name]
if !exists {
continue
}
Comment on lines 689 to +694
// Skip plugins that are already handled in predicateByStablefilter
if _, isStable := pp.StableFilterPlugins[name]; isStable {
continue
Expand Down Expand Up @@ -669,7 +724,11 @@ func (pp *PredicatesPlugin) BatchNodeOrder(task *api.TaskInfo, nodes []fwk.NodeI
nodeScores := make(map[string]float64, len(nodes))

// Run all Score plugins
for name, plugin := range pp.ScorePlugins {
for _, name := range pp.ScoreOrder {
plugin, exists := pp.ScorePlugins[name]
if !exists {
continue
}
// Get normalizer (most plugins don't need normalization, use EmptyNormalizer by default)
normalizer := &nodescore.EmptyNormalizer{}

Expand Down Expand Up @@ -699,7 +758,11 @@ func (pp *PredicatesPlugin) BatchNodeOrder(task *api.TaskInfo, nodes []fwk.NodeI
func (pp *PredicatesPlugin) runReservePlugins(ssn *framework.Session, event *framework.Event) {
state := ssn.GetCycleState(event.Task.UID)

for name, plugin := range pp.ReservePlugins {
for _, name := range pp.ReserveOrder {
plugin, exists := pp.ReservePlugins[name]
if !exists {
continue
}
status := plugin.Reserve(context.TODO(), state, event.Task.Pod, event.Task.Pod.Spec.NodeName)
if !status.IsSuccess() {
klog.Errorf("Reserve plugin %s failed for pod %s/%s: %v", name, event.Task.Namespace, event.Task.Name, status.AsError())
Expand All @@ -711,9 +774,16 @@ func (pp *PredicatesPlugin) runReservePlugins(ssn *framework.Session, event *fra

func (pp *PredicatesPlugin) runUnReservePlugins(ssn *framework.Session, event *framework.Event) {
state := ssn.GetCycleState(event.Task.UID)
pp.runUnreservePluginsWithState(context.TODO(), state, event.Task.Pod, event.Task.Pod.Spec.NodeName)
}

for _, plugin := range pp.ReservePlugins {
plugin.Unreserve(context.TODO(), state, event.Task.Pod, event.Task.Pod.Spec.NodeName)
func (pp *PredicatesPlugin) runUnreservePluginsWithState(ctx context.Context, state *k8sframework.CycleState, pod *v1.Pod, nodeName string) {
for i := len(pp.ReserveOrder) - 1; i >= 0; i-- {
plugin, exists := pp.ReservePlugins[pp.ReserveOrder[i]]
if !exists {
continue
}
plugin.Unreserve(ctx, state, pod, nodeName)
}
}

Expand Down Expand Up @@ -751,7 +821,11 @@ func (pp *PredicatesPlugin) PreBind(ctx context.Context, bindCtx *cache.BindCont
state := bindCtx.Extensions[pp.Name()].(*BindContextExtension).State

// Run all PreBind plugins
for name, plugin := range pp.PreBindPlugins {
for _, name := range pp.PreBindOrder {
plugin, exists := pp.PreBindPlugins[name]
if !exists {
continue
}
status := plugin.PreBind(ctx, state, bindCtx.TaskInfo.Pod, bindCtx.TaskInfo.Pod.Spec.NodeName)
if !status.IsSuccess() {
klog.Errorf("PreBind plugin %s failed for pod %s/%s: %v", name, bindCtx.TaskInfo.Namespace, bindCtx.TaskInfo.Name, status.AsError())
Expand All @@ -768,10 +842,7 @@ func (pp *PredicatesPlugin) PreBindRollBack(ctx context.Context, bindCtx *cache.
}

state := bindCtx.Extensions[pp.Name()].(*BindContextExtension).State

for _, plugin := range pp.ReservePlugins {
plugin.Unreserve(ctx, state, bindCtx.TaskInfo.Pod, bindCtx.TaskInfo.Pod.Spec.NodeName)
}
pp.runUnreservePluginsWithState(ctx, state, bindCtx.TaskInfo.Pod, bindCtx.TaskInfo.Pod.Spec.NodeName)
}

func (pp *PredicatesPlugin) SetupBindContextExtension(state *k8sframework.CycleState, bindCtx *cache.BindContext) {
Expand Down
Loading
Loading