• kube-scheduler source code analysis (Part 3): scheduleOne
  • 1. Scheduler.Run
  • 2. Scheduler.scheduleOne
  • 3. config.NextPod
    • 3.1. getNextPod
  • 4. Scheduler.schedule
    • 4.1. ScheduleAlgorithm
  • 5. genericScheduler.Schedule
    • 5.1. podPassesBasicChecks
    • 5.2. findNodesThatFit
    • 5.3. PrioritizeNodes
    • 5.4. selectHost
      • 5.4.1. findMaxScores
  • 6. Scheduler.preempt
  • 7. Scheduler.assume
  • 8. Scheduler.bind
  • 9. Summary

    kube-scheduler source code analysis (Part 3): scheduleOne

    The following code analysis is based on Kubernetes v1.12.0.

    This article focuses on the basic scheduling flow in /pkg/scheduler/. The detailed predicate (filtering) logic, priority (scoring) logic, and node preemption logic will each be analyzed in separate follow-up articles.

    The package layout of the scheduler code is as follows:

    scheduler
    ├── algorithm                    # scheduling algorithms
    │   ├── predicates               # predicate (filtering) policies
    │   ├── priorities               # priority (scoring) policies
    │   ├── scheduler_interface.go   # ScheduleAlgorithm and SchedulerExtender interface definitions
    │   └── types.go                 # type definitions used by the algorithms
    ├── algorithmprovider
    │   └── defaults
    │       └── defaults.go          # initialization of the default algorithms, including predicates and priorities
    ├── cache                        # cache used by the scheduler
    │   ├── cache.go                 # schedulerCache
    │   ├── interface.go
    │   ├── node_info.go
    │   └── node_tree.go
    ├── core                         # core scheduling logic
    │   ├── equivalence
    │   │   └── eqivalence.go        # caches scheduling results for equivalent pods, used mainly by predicates
    │   ├── extender.go
    │   ├── generic_scheduler.go     # genericScheduler, the default scheduling logic
    │   └── scheduling_queue.go      # queue that holds the pods waiting to be scheduled
    ├── factory
    │   └── factory.go               # NewConfigFactory, NewPodInformer, etc.; watches pod events to update the scheduling queue
    ├── metrics
    │   └── metrics.go               # metrics exposed to Prometheus
    ├── scheduler.go                 # Run entry point of the pkg part (core code): Run, scheduleOne, schedule, preempt, etc.
    └── volumebinder
        └── volume_binder.go         # volume binding

    1. Scheduler.Run

    This part of the code is located in pkg/scheduler/scheduler.go.

    This is the entry point of the concrete scheduling logic.

    // Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
    func (sched *Scheduler) Run() {
        if !sched.config.WaitForCacheSync() {
            return
        }
        go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
    }
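
    The scheduling loop is driven by wait.Until from k8s.io/apimachinery/pkg/util/wait: with a period of 0 it re-invokes sched.scheduleOne as soon as the previous call returns, until the StopEverything channel is closed. The following is a minimal, self-contained sketch of that pattern; the worker function and the one-second stop timer are made up purely for illustration.

    package main

    import (
        "fmt"
        "time"

        "k8s.io/apimachinery/pkg/util/wait"
    )

    func main() {
        stopCh := make(chan struct{})

        // Close the stop channel after one second, the way closing
        // StopEverything stops the scheduler's loop.
        go func() {
            time.Sleep(1 * time.Second)
            close(stopCh)
        }()

        // With period 0, wait.Until calls the function again as soon as it
        // returns, which is exactly how scheduleOne is driven.
        wait.Until(func() {
            fmt.Println("scheduling one pod...")
            time.Sleep(200 * time.Millisecond) // stand-in for the work done by scheduleOne
        }, 0, stopCh)
    }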

    2. Scheduler.scheduleOne

    This part of the code is located in pkg/scheduler/scheduler.go.

    scheduleOne selects a suitable node for a single pod; it is the core function of the scheduling logic.

    The basic flow for scheduling a single pod is as follows:

    1. Pop the next pod to be scheduled from the podQueue of pending pods.
    2. Run the scheduling algorithm to pick a suitable node for the pod; the algorithm consists of the two steps of predicates (filtering) and priorities (scoring).
    3. If scheduling fails, attempt preemption: evict lower-priority pods so that the higher-priority pod can be scheduled.
    4. Assume the pod onto the selected node and store the assumption in the scheduler cache, so that the actual binding can be performed asynchronously.
    5. Perform the actual binding, which writes the node name into the pod's node-related field.

    The complete code is as follows:

    // scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
    func (sched *Scheduler) scheduleOne() {
        pod := sched.config.NextPod()
        if pod.DeletionTimestamp != nil {
            sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
            return
        }
        glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
        // Synchronously attempt to find a fit for the pod.
        start := time.Now()
        suggestedHost, err := sched.schedule(pod)
        if err != nil {
            // schedule() may have failed because the pod would not fit on any host, so we try to
            // preempt, with the expectation that the next time the pod is tried for scheduling it
            // will fit due to the preemption. It is also possible that a different pod will schedule
            // into the resources that were preempted, but this is harmless.
            if fitError, ok := err.(*core.FitError); ok {
                preemptionStartTime := time.Now()
                sched.preempt(pod, fitError)
                metrics.PreemptionAttempts.Inc()
                metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
                metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
            }
            return
        }
        metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
        // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
        // This allows us to keep scheduling without waiting on binding to occur.
        assumedPod := pod.DeepCopy()
        // Assume volumes first before assuming the pod.
        //
        // If all volumes are completely bound, then allBound is true and binding will be skipped.
        //
        // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
        //
        // This function modifies 'assumedPod' if volume binding is required.
        allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
        if err != nil {
            return
        }
        // assume modifies `assumedPod` by setting NodeName=suggestedHost
        err = sched.assume(assumedPod, suggestedHost)
        if err != nil {
            return
        }
        // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
        go func() {
            // Bind volumes first before Pod
            if !allBound {
                err = sched.bindVolumes(assumedPod)
                if err != nil {
                    return
                }
            }
            err := sched.bind(assumedPod, &v1.Binding{
                ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
                Target: v1.ObjectReference{
                    Kind: "Node",
                    Name: suggestedHost,
                },
            })
            metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
            if err != nil {
                glog.Errorf("Internal error binding pod: (%v)", err)
            }
        }()
    }

    The important parts of this code are analyzed separately below.

    3. config.NextPod

    The pods waiting to be scheduled are stored in the podQueue; NextPod pops the next pod to be scheduled.

    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }
    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    The concrete NextPod function is defined in the CreateFromKeys function in factory.go, as follows:

    func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
        ...
        return &scheduler.Config{
            ...
            NextPod: func() *v1.Pod {
                return c.getNextPod()
            },
            ...
        }, nil
    }

    3.1. getNextPod

    A podQueue stores the queue of pods that need to be scheduled; the next pod to be scheduled is taken off this queue with Pop.

    func (c *configFactory) getNextPod() *v1.Pod {
        pod, err := c.podQueue.Pop()
        if err == nil {
            glog.V(4).Infof("About to try and schedule pod %v/%v", pod.Namespace, pod.Name)
            return pod
        }
        glog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
        return nil
    }
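
    podQueue.Pop blocks until a pod is available, so scheduleOne always has work to do when it runs. As a rough illustration of this producer/consumer pattern (not the actual scheduling_queue.go implementation, which in v1.12 can also be a priority queue when pod priority is enabled), a minimal FIFO with a blocking Pop might look like the sketch below; all names here are made up.

    package main

    import (
        "fmt"
        "sync"
    )

    // blockingFIFO is a toy stand-in for the scheduler's pod queue:
    // Pop blocks until an item has been added.
    type blockingFIFO struct {
        lock  sync.Mutex
        cond  *sync.Cond
        items []string // the real queue stores *v1.Pod
    }

    func newBlockingFIFO() *blockingFIFO {
        q := &blockingFIFO{}
        q.cond = sync.NewCond(&q.lock)
        return q
    }

    func (q *blockingFIFO) Add(item string) {
        q.lock.Lock()
        defer q.lock.Unlock()
        q.items = append(q.items, item)
        q.cond.Signal()
    }

    func (q *blockingFIFO) Pop() string {
        q.lock.Lock()
        defer q.lock.Unlock()
        for len(q.items) == 0 {
            q.cond.Wait() // block until Add signals
        }
        item := q.items[0]
        q.items = q.items[1:]
        return item
    }

    func main() {
        q := newBlockingFIFO()
        go q.Add("default/nginx-1") // the informer event handlers in factory.go play this producer role
        fmt.Println("popped:", q.Pop())
    }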

    4. Scheduler.schedule

    This part of the code is located in pkg/scheduler/scheduler.go.

    This is the core of the scheduling logic: the scheduling algorithm picks the most suitable node for the given pod.

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
            metrics.PreemptionAttempts.Inc()
            metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
            metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
        }
        return
    }

    schedule runs the scheduling algorithm and returns the suggested node.

    // schedule implements the scheduling algorithm and returns the suggested host.
    func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
        host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
        if err != nil {
            pod = pod.DeepCopy()
            sched.config.Error(pod, err)
            sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
            sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
                Type:    v1.PodScheduled,
                Status:  v1.ConditionFalse,
                Reason:  v1.PodReasonUnschedulable,
                Message: err.Error(),
            })
            return "", err
        }
        return host, err
    }

    4.1. ScheduleAlgorithm

    ScheduleAlgorithm is the interface for scheduling algorithms. Its main implementation is genericScheduler; genericScheduler.Schedule is analyzed below.

    The ScheduleAlgorithm interface is defined as follows:

    // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
    // onto machines.
    type ScheduleAlgorithm interface {
        Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
        // Preempt receives scheduling errors for a pod and tries to create room for
        // the pod by preempting lower priority pods if possible.
        // It returns the node where preemption happened, a list of preempted pods, a
        // list of pods whose nominated node name should be removed, and error if any.
        Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
        // Predicates() returns a pointer to a map of predicate functions. This is
        // exposed for testing.
        Predicates() map[string]FitPredicate
        // Prioritizers returns a slice of priority config. This is exposed for
        // testing.
        Prioritizers() []PriorityConfig
    }
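
    Because Scheduler.schedule only talks to this interface (via sched.config.Algorithm), any type that satisfies it can drive the scheduling loop. The following toy sketch mimics that decoupling with simplified stand-in types; simpleAlgorithm, nodeLister, firstFit and staticNodes are all hypothetical and much smaller than the real interface and its types.

    package main

    import (
        "errors"
        "fmt"
    )

    // nodeLister is a simplified stand-in for algorithm.NodeLister.
    type nodeLister interface {
        List() ([]string, error)
    }

    // simpleAlgorithm is a simplified stand-in for the Schedule method of
    // ScheduleAlgorithm; the real interface also has Preempt, Predicates
    // and Prioritizers.
    type simpleAlgorithm interface {
        Schedule(pod string, nodes nodeLister) (selectedMachine string, err error)
    }

    // firstFit is a trivial implementation: it picks the first listed node.
    type firstFit struct{}

    func (firstFit) Schedule(pod string, nodes nodeLister) (string, error) {
        all, err := nodes.List()
        if err != nil {
            return "", err
        }
        if len(all) == 0 {
            return "", errors.New("no nodes available")
        }
        return all[0], nil
    }

    type staticNodes []string

    func (s staticNodes) List() ([]string, error) { return s, nil }

    func main() {
        // The scheduler holds the algorithm behind the interface, just as
        // scheduler.Config holds Algorithm.
        var alg simpleAlgorithm = firstFit{}
        host, err := alg.Schedule("default/nginx", staticNodes{"node-1", "node-2"})
        fmt.Println(host, err)
    }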

    5. genericScheduler.Schedule

    This part of the code is located in /pkg/scheduler/core/generic_scheduler.go.

    genericScheduler.Schedule implements the basic scheduling logic. Given the pod to schedule and the list of nodes, it returns the name of the chosen node on success, or an error with reasons on failure. The scheduling decision is made in the two steps of predicates (filtering) and priorities (scoring).

    The basic flow is as follows:

    1. Run basic sanity checks on the pod; currently this mainly checks its PVCs.
    2. Use the findNodesThatFit predicate step to filter out the nodes that can run the pod.
    3. Use the PrioritizeNodes priority step to score every node in the filtered list.
    4. Select the node with the highest score from the scored list as the scheduling target.

    The complete code is as follows:

    // Schedule tries to schedule the given pod to one of the nodes in the node list.
    // If it succeeds, it will return the name of the node.
    // If it fails, it will return a FitError error with reasons.
    func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
        trace := utiltrace.New(fmt.Sprintf("Scheduling %s/%s", pod.Namespace, pod.Name))
        defer trace.LogIfLong(100 * time.Millisecond)
        if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
            return "", err
        }
        nodes, err := nodeLister.List()
        if err != nil {
            return "", err
        }
        if len(nodes) == 0 {
            return "", ErrNoNodesAvailable
        }
        // Used for all fit and priority funcs.
        err = g.cache.UpdateNodeNameToInfoMap(g.cachedNodeInfoMap)
        if err != nil {
            return "", err
        }
        trace.Step("Computing predicates")
        startPredicateEvalTime := time.Now()
        filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
        if err != nil {
            return "", err
        }
        if len(filteredNodes) == 0 {
            return "", &FitError{
                Pod:              pod,
                NumAllNodes:      len(nodes),
                FailedPredicates: failedPredicateMap,
            }
        }
        metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
        trace.Step("Prioritizing")
        startPriorityEvalTime := time.Now()
        // When only one node after predicate, just use it.
        if len(filteredNodes) == 1 {
            metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
            return filteredNodes[0].Name, nil
        }
        metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
        priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
        if err != nil {
            return "", err
        }
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))
        trace.Step("Selecting host")
        return g.selectHost(priorityList)
    }

    5.1. podPassesBasicChecks

    podPassesBasicChecks performs some basic sanity checks; currently it mainly checks the pod's PVCs.

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return "", err
    }

    podPassesBasicChecks is implemented as follows:

    // podPassesBasicChecks makes sanity checks on the pod if it can be scheduled.
    func podPassesBasicChecks(pod *v1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) error {
        // Check PVCs used by the pod
        namespace := pod.Namespace
        manifest := &(pod.Spec)
        for i := range manifest.Volumes {
            volume := &manifest.Volumes[i]
            if volume.PersistentVolumeClaim == nil {
                // Volume is not a PVC, ignore
                continue
            }
            pvcName := volume.PersistentVolumeClaim.ClaimName
            pvc, err := pvcLister.PersistentVolumeClaims(namespace).Get(pvcName)
            if err != nil {
                // The error has already enough context ("persistentvolumeclaim "myclaim" not found")
                return err
            }
            if pvc.DeletionTimestamp != nil {
                return fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
            }
        }
        return nil
    }

    5.2. findNodesThatFit

    Predicates (filtering): the predicate functions decide, for each node, whether the pod can be scheduled onto it.

    The implementation details of findNodesThatFit will be analyzed in a separate follow-up article.

    genericScheduler.Schedule calls findNodesThatFit as follows:

    trace.Step("Computing predicates")
    startPredicateEvalTime := time.Now()
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    if err != nil {
        return "", err
    }
    if len(filteredNodes) == 0 {
        return "", &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }
    metrics.SchedulingAlgorithmPredicateEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPredicateEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))
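
    Conceptually, findNodesThatFit runs every registered predicate against every candidate node, keeps the nodes for which all predicates pass, and records the failure reasons for the rest (in v1.12 this per-node check is fanned out across worker goroutines). Below is a simplified, self-contained sketch of that filtering pattern; the fitPredicate type, findNodesThatFitSketch function, and node names are all made up for illustration.

    package main

    import (
        "fmt"
        "sync"
    )

    // fitPredicate is a simplified stand-in for a predicate function:
    // it reports whether pod fits on node and, if not, why.
    type fitPredicate func(pod, node string) (fits bool, reason string)

    // findNodesThatFitSketch filters nodes by running every predicate on every
    // node, checking the nodes in parallel.
    func findNodesThatFitSketch(pod string, nodes []string, predicates map[string]fitPredicate) ([]string, map[string][]string) {
        var (
            mu        sync.Mutex
            wg        sync.WaitGroup
            filtered  []string
            failedMap = map[string][]string{}
        )
        for _, node := range nodes {
            wg.Add(1)
            go func(node string) {
                defer wg.Done()
                var reasons []string
                for name, pred := range predicates {
                    if fits, reason := pred(pod, node); !fits {
                        reasons = append(reasons, name+": "+reason)
                    }
                }
                mu.Lock()
                defer mu.Unlock()
                if len(reasons) == 0 {
                    filtered = append(filtered, node)
                } else {
                    failedMap[node] = reasons
                }
            }(node)
        }
        wg.Wait()
        return filtered, failedMap
    }

    func main() {
        predicates := map[string]fitPredicate{
            "NoDiskPressure": func(pod, node string) (bool, string) {
                return node != "node-2", "node has disk pressure"
            },
        }
        fit, failed := findNodesThatFitSketch("default/nginx", []string{"node-1", "node-2"}, predicates)
        fmt.Println(fit, failed)
    }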

    5.3. PrioritizeNodes

    Priorities (scoring): the best node is chosen from the nodes that passed the predicates.

    The scoring works as follows (a worked sketch is given after this list):

    • PrioritizeNodes runs the individual priority functions in parallel to rank the nodes.
    • Each priority function gives every node a score in the range 0-10.
    • 0 means the lowest priority, 10 means the highest priority.
    • Each priority function also has its own weight.
    • The score a priority function returns for a node is multiplied by that function's weight to get a weighted score.
    • Finally, the weighted scores from all priority functions are added up to get each node's total weighted score.
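
    For example, with two priority functions such as LeastRequestedPriority at weight 1 and SelectorSpreadPriority at weight 2 (the scores and weights below are made up purely for illustration), each node's total is simply the weighted sum of its per-function scores:

    package main

    import "fmt"

    func main() {
        // Hypothetical raw scores (0-10) returned by each priority function.
        raw := map[string]map[string]int{
            "node-1": {"LeastRequestedPriority": 8, "SelectorSpreadPriority": 5},
            "node-2": {"LeastRequestedPriority": 6, "SelectorSpreadPriority": 9},
        }
        // Hypothetical weights configured for each priority function.
        weights := map[string]int{
            "LeastRequestedPriority": 1,
            "SelectorSpreadPriority": 2,
        }

        for node, scores := range raw {
            total := 0
            for name, score := range scores {
                total += score * weights[name] // weighted score per priority function
            }
            fmt.Printf("%s: total weighted score = %d\n", node, total)
            // node-1: 8*1 + 5*2 = 18, node-2: 6*1 + 9*2 = 24, so node-2 wins.
        }
    }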

    The implementation details of PrioritizeNodes will be analyzed in a separate follow-up article.

    genericScheduler.Schedule calls PrioritizeNodes as follows:

    trace.Step("Prioritizing")
    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return filteredNodes[0].Name, nil
    }
    metaPrioritiesInterface := g.priorityMetaProducer(pod, g.cachedNodeInfoMap)
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
    metrics.SchedulingLatency.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    5.4. selectHost

    Finally, the scheduler picks the node with the highest score from priorityList.

    trace.Step("Selecting host")
    return g.selectHost(priorityList)

    selectHost takes the prioritized list of nodes and picks one of the nodes with the highest score in a round-robin manner.

    The code is as follows:

    // selectHost takes a prioritized list of nodes and then picks one
    // in a round-robin manner from the nodes that had the highest score.
    func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
        if len(priorityList) == 0 {
            return "", fmt.Errorf("empty priorityList")
        }
        maxScores := findMaxScores(priorityList)
        ix := int(g.lastNodeIndex % uint64(len(maxScores)))
        g.lastNodeIndex++
        return priorityList[maxScores[ix]].Host, nil
    }

    5.4.1. findMaxScores

    findMaxScores returns the indexes of the nodes in priorityList that have the highest Score.

    // findMaxScores returns the indexes of nodes in the "priorityList" that has the highest "Score".
    func findMaxScores(priorityList schedulerapi.HostPriorityList) []int {
        maxScoreIndexes := make([]int, 0, len(priorityList)/2)
        maxScore := priorityList[0].Score
        for i, hp := range priorityList {
            if hp.Score > maxScore {
                maxScore = hp.Score
                maxScoreIndexes = maxScoreIndexes[:0]
                maxScoreIndexes = append(maxScoreIndexes, i)
            } else if hp.Score == maxScore {
                maxScoreIndexes = append(maxScoreIndexes, i)
            }
        }
        return maxScoreIndexes
    }
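
    To illustrate how findMaxScores and the round-robin index in selectHost work together: if two nodes are tied for the highest score, successive scheduling attempts alternate between them. The sketch below re-implements the same tie-breaking logic against a simplified local hostPriority type (a stand-in for schedulerapi.HostPriority) to show that behaviour; all values are made up.

    package main

    import "fmt"

    // hostPriority is a simplified stand-in for schedulerapi.HostPriority.
    type hostPriority struct {
        Host  string
        Score int
    }

    // findMaxScores mirrors the scheduler's helper: indexes of the top-scored hosts.
    func findMaxScores(list []hostPriority) []int {
        maxIdx := []int{0}
        maxScore := list[0].Score
        for i, hp := range list[1:] {
            switch {
            case hp.Score > maxScore:
                maxScore = hp.Score
                maxIdx = []int{i + 1}
            case hp.Score == maxScore:
                maxIdx = append(maxIdx, i+1)
            }
        }
        return maxIdx
    }

    func main() {
        list := []hostPriority{
            {Host: "node-1", Score: 24},
            {Host: "node-2", Score: 24}, // tied with node-1
            {Host: "node-3", Score: 18},
        }
        maxIdx := findMaxScores(list)

        // lastNodeIndex plays the role of genericScheduler.lastNodeIndex.
        var lastNodeIndex uint64
        for i := 0; i < 4; i++ {
            ix := int(lastNodeIndex % uint64(len(maxIdx)))
            lastNodeIndex++
            fmt.Println("selected:", list[maxIdx[ix]].Host) // alternates node-1, node-2, node-1, ...
        }
    }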

    6. Scheduler.preempt

    If predicate and priority scheduling fails for the pod, preemption is attempted: lower-priority pods are evicted to free up room for the higher-priority pod waiting to be scheduled.

    The implementation of Scheduler.preempt will be analyzed in a separate follow-up article.

    suggestedHost, err := sched.schedule(pod)
    if err != nil {
        // schedule() may have failed because the pod would not fit on any host, so we try to
        // preempt, with the expectation that the next time the pod is tried for scheduling it
        // will fit due to the preemption. It is also possible that a different pod will schedule
        // into the resources that were preempted, but this is harmless.
        if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
            metrics.PreemptionAttempts.Inc()
            metrics.SchedulingAlgorithmPremptionEvaluationDuration.Observe(metrics.SinceInMicroseconds(preemptionStartTime))
            metrics.SchedulingLatency.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
        }
        return
    }

    7. Scheduler.assume

    The pod is assumed onto the selected node and the assumption is stored in the scheduler cache. This lets the scheduler keep scheduling other pods without waiting for the binding to happen; the actual binding is performed asynchronously.

    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    assumedPod := pod.DeepCopy()
    // Assume volumes first before assuming the pod.
    //
    // If all volumes are completely bound, then allBound is true and binding will be skipped.
    //
    // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
    //
    // This function modifies 'assumedPod' if volume binding is required.
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    if err != nil {
        return
    }
    // assume modifies `assumedPod` by setting NodeName=suggestedHost
    err = sched.assume(assumedPod, suggestedHost)
    if err != nil {
        return
    }

    The scheduler optimistically assumes that the binding will succeed and sends the binding request to the apiserver in the background; if the binding fails, the scheduler immediately releases the resources allocated to the assumed pod.

    The implementation of the assume method:

    // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
    // assume modifies `assumed`.
    func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
        // Optimistically assume that the binding will succeed and send it to apiserver
        // in the background.
        // If the binding fails, scheduler will release resources allocated to assumed pod
        // immediately.
        assumed.Spec.NodeName = host
        // NOTE: Because the scheduler uses snapshots of SchedulerCache and the live
        // version of Ecache, updates must be written to SchedulerCache before
        // invalidating Ecache.
        if err := sched.config.SchedulerCache.AssumePod(assumed); err != nil {
            glog.Errorf("scheduler cache AssumePod failed: %v", err)
            // This is most probably result of a BUG in retrying logic.
            // We report an error here so that pod scheduling can be retried.
            // This relies on the fact that Error will check if the pod has been bound
            // to a node and if so will not add it back to the unscheduled pods queue
            // (otherwise this would cause an infinite loop).
            sched.config.Error(assumed, err)
            sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "AssumePod failed: %v", err)
            sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
                Type:    v1.PodScheduled,
                Status:  v1.ConditionFalse,
                Reason:  "SchedulerError",
                Message: err.Error(),
            })
            return err
        }
        // Optimistically assume that the binding will succeed, so we need to invalidate affected
        // predicates in equivalence cache.
        // If the binding fails, these invalidated item will not break anything.
        if sched.config.Ecache != nil {
            sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(assumed, host)
        }
        return nil
    }

    8. Scheduler.bind

    The pod is bound to the selected node asynchronously.

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        // Bind volumes first before Pod
        if !allBound {
            err = sched.bindVolumes(assumedPod)
            if err != nil {
                return
            }
        }
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            glog.Errorf("Internal error binding pod: (%v)", err)
        }
    }()

    bind is implemented as follows:

    // bind binds a pod to a given node defined in a binding object. We expect this to run asynchronously, so we
    // handle binding metrics internally.
    func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
        bindingStart := time.Now()
        // If binding succeeded then PodScheduled condition will be updated in apiserver so that
        // it's atomic with setting host.
        err := sched.config.GetBinder(assumed).Bind(b)
        if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
            glog.Errorf("scheduler cache FinishBinding failed: %v", err)
        }
        if err != nil {
            glog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
            if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
                glog.Errorf("scheduler cache ForgetPod failed: %v", err)
            }
            sched.config.Error(assumed, err)
            sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
            sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
                Type:   v1.PodScheduled,
                Status: v1.ConditionFalse,
                Reason: "BindingRejected",
            })
            return err
        }
        metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
        metrics.SchedulingLatency.WithLabelValues(metrics.Binding).Observe(metrics.SinceInSeconds(bindingStart))
        sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, b.Target.Name)
        return nil
    }
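
    GetBinder(assumed).Bind(b) ultimately sends the binding request to the apiserver (a POST to the pod's binding subresource). Assuming a client-go version contemporary with v1.12, where the pod client exposes Bind(binding *v1.Binding) without a context argument, a binder could be sketched roughly as below; the binderSketch type and the out-of-cluster setup are hypothetical, for illustration only.

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
    )

    // binderSketch is a hypothetical stand-in for the binder returned by
    // sched.config.GetBinder: it forwards the Binding object to the apiserver.
    type binderSketch struct {
        client kubernetes.Interface
    }

    func (b *binderSketch) Bind(binding *v1.Binding) error {
        // POSTs to the pod's "binding" subresource.
        return b.client.CoreV1().Pods(binding.Namespace).Bind(binding)
    }

    func main() {
        // Out-of-cluster kubeconfig loading, purely for illustration.
        cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            panic(err)
        }
        client := kubernetes.NewForConfigOrDie(cfg)

        b := &binderSketch{client: client}
        err = b.Bind(&v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "nginx"},
            Target:     v1.ObjectReference{Kind: "Node", Name: "node-1"},
        })
        fmt.Println("bind error:", err)
    }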

    9. Summary

    This article analyzed the scheduling process for a single pod. The flow is as follows:

    1. Pop the next pod to be scheduled from the podQueue of pending pods.
    2. Run the scheduling algorithm to pick a suitable node for the pod; the algorithm consists of the two steps of predicates (filtering) and priorities (scoring).
    3. If scheduling fails, attempt preemption: evict lower-priority pods so that the higher-priority pod can be scheduled.
    4. Assume the pod onto the selected node and store the assumption in the scheduler cache, so that the actual binding can be performed asynchronously.
    5. Perform the actual binding, which writes the node name into the pod's node-related field.

    The core of the process is selecting the node with the scheduling algorithm, i.e. the implementation of genericScheduler.Schedule, which consists of the predicate and priority steps.

    The basic flow of genericScheduler.Schedule is as follows:

    1. Run basic sanity checks on the pod; currently this mainly checks its PVCs.
    2. Use the findNodesThatFit predicate step to filter out the nodes that can run the pod.
    3. Use the PrioritizeNodes priority step to score every node in the filtered list.
    4. Select the node with the highest score from the scored list as the scheduling target.

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/scheduler.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/scheduler/core/generic_scheduler.go