• kube-scheduler framework


    调度框架用来保存配置的插件,并在调度算法中被执行遍历插件,其实现了如下接口

    type Framework interface {
    	//提供了插件获取调度框架里数据的接口和抢占接口,见下面注释
    	Handle
    
    	//返回排序函数,用在调度队列中,用来给pod排序
    	QueueSortFunc() LessFunc
    
    	//用来执行配置的PreFilter插件
    	RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) *Status
    
    	//用来执行配置的PostFilter插件
    	RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
    
    	//用来执行配置的PreBind插件
    	RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    	//用来执行配置的PostBind插件
    	RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
    	//用来执行配置的Reserve插件的Reserve方法
    	RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    	//用来执行配置的Reserve插件的Unreserve方法
    	RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
    	//用来执行配置的Permit插件
    	RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    	//如果pod是一个等待pod,则堵塞,直到等待pod返回拒绝或者允许
    	WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
    
    	//用来执行配置的Bind插件
    	RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    	//返回是否配置了Filter插件
    	HasFilterPlugins() bool
    
    	//返回是否配置了PostFilter插件
    	HasPostFilterPlugins() bool
    
    	//返回是否配置了Score插件
    	HasScorePlugins() bool
    
    	//获取map,key为扩展点名字,value为扩展点上配置的插件
    	ListPlugins() *config.Plugins
    
    	//返回此framework的名字,由此可见,一个framework对应一个profile
    	ProfileName() string
    }
    
    type Handle interface {
    	//抽象了操作提名pod的函数,会在抢占流程中讲解
    	PodNominator
    	//抽象了运行插件的函数
    	PluginsRunner
    	
    	SnapshotSharedLister() SharedLister
    
    	IterateOverWaitingPods(callback func(WaitingPod))
    
    	//根据pod UID获取WaitingPod,目前只在抢占插件中被调用
    	GetWaitingPod(uid types.UID) WaitingPod
    
    	//拒绝WaitingPod,即让WaitingPod不可调度,目前只在抢占插件中被调用
    	RejectWaitingPod(uid types.UID) bool
    
    	//获取和apiserver交互的接口,比如bind插件和apiserver交互来bind pod
    	ClientSet() clientset.Interface
    
    	KubeConfig() *restclient.Config
    
    	EventRecorder() events.EventRecorder
    
    	//有些插件需要从informer获取数据
    	SharedInformerFactory() informers.SharedInformerFactory
    
    	//为提名pod运行Filter插件,此函数在抢占流程和正常调度流程都会被调用,后面会详细讲解此函数
    	RunFilterPluginsWithNominatedPods(ctx context.Context, state *CycleState, pod *v1.Pod, info *NodeInfo) *Status
    
    	//暂时忽略
    	Extenders() []Extender
    
    	//获取并行个数
    	Parallelizer() parallelize.Parallelizer
    }
    
    //官网说此接口是抢占插件(PostFilter)用来评估当某些运行的pod被抢占后,调度成功的可行性。
    //但目前RunPreScorePlugins和RunScorePlugins只用在正常调度,RunFilterPlugins用在正常调度和抢占流程,其他两个只用在抢占流程
    type PluginsRunner interface {
    	//执行PreScore插件
    	RunPreScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) *Status
    	//执行Score插件
    	RunScorePlugins(context.Context, *CycleState, *v1.Pod, []*v1.Node) (PluginToNodeScores, *Status)
    	//执行Filter插件
    	RunFilterPlugins(context.Context, *CycleState, *v1.Pod, *NodeInfo) PluginToStatus
    	//调用插件的AddPod函数,只用在抢占流程 
    	RunPreFilterExtensionAddPod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToAdd *PodInfo, nodeInfo *NodeInfo) *Status
    	//调用插件的RemovePod函数,只用在抢占流程 
    	RunPreFilterExtensionRemovePod(ctx context.Context, state *CycleState, podToSchedule *v1.Pod, podInfoToRemove *PodInfo, nodeInfo *NodeInfo) *Status
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    frameworkImpl为调度框架的具体实现,其实现了上面的接口

    // frameworkImpl is the component responsible for initializing and running scheduler
    // plugins.
    type frameworkImpl struct {
    	//支持的所有插件,保存后好像也没用到
    	registry             Registry
    	snapshotSharedLister framework.SharedLister
    	//用来保存waitingPod
    	waitingPods          *waitingPodsMap
    	//保存score插件的权重
    	scorePluginWeight    map[string]int
    	//这里保存了每个扩展点上的插件
    	queueSortPlugins     []framework.QueueSortPlugin
    	preFilterPlugins     []framework.PreFilterPlugin
    	filterPlugins        []framework.FilterPlugin
    	postFilterPlugins    []framework.PostFilterPlugin
    	preScorePlugins      []framework.PreScorePlugin
    	scorePlugins         []framework.ScorePlugin
    	reservePlugins       []framework.ReservePlugin
    	preBindPlugins       []framework.PreBindPlugin
    	bindPlugins          []framework.BindPlugin
    	postBindPlugins      []framework.PostBindPlugin
    	permitPlugins        []framework.PermitPlugin
    
    	//和apiserver通信的接口
    	clientSet       clientset.Interface
    	kubeConfig      *restclient.Config
    	eventRecorder   events.EventRecorder
    	//informer获取数据接口
    	informerFactory informers.SharedInformerFactory
    
    	metricsRecorder *metricsRecorder
    	//一个profiler对应一个调用框架的实现frameworkImpl
    	profileName     string
    
    	extenders []framework.Extender
    	framework.PodNominator
    
    	parallelizer parallelize.Parallelizer
    
    	// Indicates that RunFilterPlugins should accumulate all failed statuses and not return
    	// after the first failure.
    	runAllFilters bool
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    调度框架初始化

    //r为所有的插件
    //profile为一个调度器的配置
    func NewFramework(r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
    	options := defaultFrameworkOptions()
    	for _, opt := range opts {
    		opt(&options)
    	}
    
    	f := &frameworkImpl{
    		registry:             r,
    		snapshotSharedLister: options.snapshotSharedLister,
    		scorePluginWeight:    make(map[string]int),
    		waitingPods:          newWaitingPodsMap(),
    		clientSet:            options.clientSet,
    		kubeConfig:           options.kubeConfig,
    		eventRecorder:        options.eventRecorder,
    		informerFactory:      options.informerFactory,
    		metricsRecorder:      options.metricsRecorder,
    		runAllFilters:        options.runAllFilters,
    		extenders:            options.extenders,
    		PodNominator:         options.podNominator,
    		parallelizer:         options.parallelizer,
    	}
    
    	if profile == nil {
    		return f, nil
    	}
    
    	f.profileName = profile.SchedulerName
    	if profile.Plugins == nil {
    		return f, nil
    	}
    	...
    	//获取每个扩展点上使能的插件。
    	//因为一个插件可能在多个扩展点上,这里通过map结构进行去重,map的key为插件名字,value为插件,
    	//为什么去重?因为后面会调用factory初始化插件对象,一个插件初始化一次即可
    	pg := f.pluginsNeeded(profile.Plugins)
    
    	//将插件参数保存到pluginConfig,key为插件名字,value为插件参数
    	pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
    	for i := range profile.PluginConfig {
    		name := profile.PluginConfig[i].Name
    		if _, ok := pluginConfig[name]; ok {
    			return nil, fmt.Errorf("repeated config for plugin %s", name)
    		}
    		pluginConfig[name] = profile.PluginConfig[i].Args
    	}
    	...
    	//pluginsMap用来保存初始化后的插件对象
    	pluginsMap := make(map[string]framework.Plugin)
    	//遍历所有插件
    	for name, factory := range r {
    		//跳过没使能的插件
    		if _, ok := pg[name]; !ok {
    			continue
    		}
    
    		args := pluginConfig[name]
    		...
    		//调用插件的初始化函数,将f作为参数传给插件,这样插件就可以获取调度框架的数据
    		//比如pkg/scheduler/framework/plugins/serviceaffinity/service_affinity.go:New()
    		p, err := factory(args, f)
    		if err != nil {
    			return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
    		}
    		pluginsMap[name] = p
    
    		// Update ClusterEventMap in place.
    		fillEventToPluginMap(p, options.clusterEventMap)
    	}
    
    	//因为一个插件可能在多个扩展点,这里将初始化后的插件对象保存到各个扩展点上
    	for _, e := range f.getExtensionPoints(profile.Plugins) {
    		if err := updatePluginList(e.slicePtr, *e.plugins, pluginsMap); err != nil {
    			return nil, err
    		}
    	}
    	...
    	//排序插件个数不能为0
    	if len(f.queueSortPlugins) == 0 {
    		return nil, fmt.Errorf("no queue sort plugin is enabled")
    	}
    	//排序插件个数不能大于1,所以一个调度器只能使能一个排序插件
    	if len(f.queueSortPlugins) > 1 {
    		return nil, fmt.Errorf("only one queue sort plugin can be enabled")
    	}
    	//bind插件个数不能为0
    	if len(f.bindPlugins) == 0 {
    		return nil, fmt.Errorf("at least one bind plugin is needed")
    	}
    	...
    
    	return f, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94

    RunPreFilterPlugins
    执行配置在PreFilter扩展点上的插件

    func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (status *framework.Status) {
    	startTime := time.Now()
    	defer func() {
    		metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
    	}()
    	//遍历扩展点上的插件
    	for _, pl := range f.preFilterPlugins {
    		status = f.runPreFilterPlugin(ctx, pl, state, pod)
    		if !status.IsSuccess() {
    			status.SetFailedPlugin(pl.Name())
    			if status.IsUnschedulable() {
    				return status
    			}
    			return framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), status.AsError())).WithFailedPlugin(pl.Name())
    		}
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    RunFilterPlugins
    执行配置在Filter扩展点上的插件

    func (f *frameworkImpl) RunFilterPlugins(
    	ctx context.Context,
    	state *framework.CycleState,
    	pod *v1.Pod,
    	nodeInfo *framework.NodeInfo,
    ) framework.PluginToStatus {
    	statuses := make(framework.PluginToStatus)
    	//遍历扩展点上的插件
    	for _, pl := range f.filterPlugins {
    		pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
    		if !pluginStatus.IsSuccess() {
    			if !pluginStatus.IsUnschedulable() {
    				// Filter plugins are not supposed to return any status other than
    				// Success or Unschedulable.
    				errStatus := framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), pluginStatus.AsError())).WithFailedPlugin(pl.Name())
    				return map[string]*framework.Status{pl.Name(): errStatus}
    			}
    			pluginStatus.SetFailedPlugin(pl.Name())
    			statuses[pl.Name()] = pluginStatus
    			if !f.runAllFilters {
    				// Exit early if we don't need to run all filters.
    				return statuses
    			}
    		}
    	}
    
    	return statuses
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    RunFilterPluginsWithNominatedPods
    这里先说几个概念
    successpod: 调度成功,并且bind到node成功的pod
    assumepod:调度成功,假定pod运行在node上,bind异步执行中
    failedpod:调度失败的pod
    NominatedPods:当一个pod调度失败后,在PostFilter中执行抢占插件流程,看是否能抢占某个node上的其他低优先级的pod,如果能抢占成功,则此pod在后面调度周期很可能会被调度到此node上,这种pod被称为NominatedPods,NominatedPods虽然还没真正运行在抢占的node上,但后续的调度周期必须考虑这些pod。NominatedPods肯定是failedpod
    victimpod:牺牲的pod,这些pod是被高优先级pod抢占的pod,在运行的node上被驱逐,再重新调度

    此函数在正常调度流程和抢占流程被调用,目的是确认目标pod(参数pod)是否能运行在指定node(参数info)上。
    正常调度被调用时:在nodeinfo(successpod+assumepod)中加上比目标pod优先级高的NominatedPods后,对目标pod执行Filter扩展点上的插件,如果返回失败,则目标pod肯定不能运行在指定node上。
    抢占流程被调用时:在nodeinfo(successpod+assumepod-victimpod)中加上比目标pod优先级高的NominatedPods后,对目标pod执行Filter扩展点上的插件,如果返回失败,则目标pod肯定不能运行在指定node上。

    // RunFilterPluginsWithNominatedPods runs the set of configured filter plugins
    // for nominated pod on the given node.
    // This function is called from two different places: Schedule and Preempt.
    // When it is called from Schedule, we want to test whether the pod is
    // schedulable on the node with all the existing pods on the node plus higher
    // and equal priority pods nominated to run on the node.
    // When it is called from Preempt, we should remove the victims of preemption
    // and add the nominated pods. Removal of the victims is done by
    // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and
    // NodeInfo before calling this function.
    func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
    	var status *framework.Status
    
    	podsAdded := false
    	//有些场景下需要执行两次filters. 如果node上有比目标pod优先级高或者相等的nominated pod,会将这些pod资源添加到node上,执行filters。
    	//如果所有的filters都通过了,还要不加这些nominatedpod资源,再次执行filters,因为有些filters(比如pod亲和性filter)在
    	//不加nominated pod时可能会失败。
    	
    	//如果没有比目标pod优先级高或者相等的nominated pod或者第一次执行失败了,不会再执行第二次。
    	
    	//这里为什么只考虑比目标pod优先级高或者相等的nominated pod呢,因为这些pod本身不占用node资源,并且在抢占流程中会将这些pod的提名node删除,
    	//从而进行正常调度,可参考函数prepareCandidate。
    	
    	//为什么要执行两次来保证目标pod在加和不加nominated pod时都能被调度呢,这是一个保守的决定:有些filter(比如pod间反亲和性filter)可能在
    	//加了nominated pod时失败,有些filter(比如pod间亲和性filter)可能在不加nominated pod时失败。我们不能假定nominated pod已经运行在node
    	//上了,因为他们还没真正调度到此node上,而且最终还有可能被调度到其他node上。
    	for i := 0; i < 2; i++ {
    		stateToUse := state
    		nodeInfoToUse := info
    		//第一轮执行,加上抢占成功的pod
    		if i == 0 {
    			var err error
    			//将node上优先级比目标pod高的抢占成功的pod的资源考虑进去保存到nodeInfoToUse
    			//如果有这样的pod,则podsAdded为true
    			podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
    			if err != nil {
    				return framework.AsStatus(err)
    			}
    		//如果没有抢占pod,或者第一轮执行失败
    		} else if !podsAdded || !status.IsSuccess() {
    			break
    		}
    
    		statusMap := f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
    		status = statusMap.Merge()
    		if !status.IsSuccess() && !status.IsUnschedulable() {
    			return status
    		}
    	}
    
    	return status
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
  • 相关阅读:
    剑指 Offer 10- I. 斐波那契数列
    【编程教室】PONG - 100行代码写一个弹球游戏
    计算机网络---第四章网络层---ipv4---应用题
    基于MetaTown构建数字资产平台
    金仓数据库KingbaseES 插件kdb_date_function
    误差卡尔曼中的四元数运动学-第二章
    Python函数
    Istio知识点
    unity 血条跟随
    java通过IO流下载保存文件
  • 原文地址:https://blog.csdn.net/fengcai_ke/article/details/127471160