• kube-scheduler源码分析(一)之 NewSchedulerCommand
  • 1. Main 函数
  • 2. NewSchedulerCommand
    • 2.1. NewOptions
    • 2.2. Options.Config
    • 2.3. AddFlags
  • 3. Run
    • 3.1. NewSchedulerConfig
    • 3.2. InformerFactory.Start
    • 3.3. WaitForCacheSync
      • 3.3.1. InformerFactory.WaitForCacheSync
      • 3.3.2. controller.WaitForCacheSync
    • 3.4. LeaderElection
    • 3.5. Scheduler.Run

    kube-scheduler源码分析(一)之 NewSchedulerCommand

    以下代码分析基于 kubernetes v1.12.0 版本。

    scheduler的cmd代码目录结构如下:

    1. kube-scheduler
    2. ├── BUILD
    3. ├── OWNERS
    4. ├── app # app的目录下主要为运行scheduler相关的对象
    5. ├── BUILD
    6. ├── config
    7. ├── BUILD
    8. └── config.go # Scheduler的配置对象config
    9. ├── options # options主要记录 Scheduler 使用到的参数
    10. ├── BUILD
    11. ├── configfile.go
    12. ├── deprecated.go
    13. ├── deprecated_test.go
    14. ├── insecure_serving.go
    15. ├── insecure_serving_test.go
    16. ├── options.go # 主要包括Options、NewOptions、AddFlags、Config等函数
    17. └── options_test.go
    18. └── server.go # 主要包括 NewSchedulerCommand、NewSchedulerConfig、Run等函数
    19. └── scheduler.go # main入口函数

    1. Main函数

    此部分的代码为/cmd/kube-scheduler/scheduler.go

    kube-scheduler的入口函数Main函数,仍然是采用统一的代码风格,使用Cobra命令行框架。

    1. func main() {
    2. rand.Seed(time.Now().UTC().UnixNano())
    3. command := app.NewSchedulerCommand()
    4. // TODO: once we switch everything over to Cobra commands, we can go back to calling
    5. // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    6. // normalize func and add the go flag set by hand.
    7. pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
    8. pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
    9. // utilflag.InitFlags()
    10. logs.InitLogs()
    11. defer logs.FlushLogs()
    12. if err := command.Execute(); err != nil {
    13. fmt.Fprintf(os.Stderr, "%v\n", err)
    14. os.Exit(1)
    15. }
    16. }

    核心代码:

    1. // 初始化scheduler命令结构体
    2. command := app.NewSchedulerCommand()
    3. // 执行Execute
    4. err := command.Execute()

    2. NewSchedulerCommand

    此部分的代码为/cmd/kube-scheduler/app/server.go

    NewSchedulerCommand主要用来构造和初始化SchedulerCommand结构体。

    1. // NewSchedulerCommand creates a *cobra.Command object with default parameters
    2. func NewSchedulerCommand() *cobra.Command {
    3. opts, err := options.NewOptions()
    4. if err != nil {
    5. glog.Fatalf("unable to initialize command options: %v", err)
    6. }
    7. cmd := &cobra.Command{
    8. Use: "kube-scheduler",
    9. Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
    10. workload-specific function that significantly impacts availability, performance,
    11. and capacity. The scheduler needs to take into account individual and collective
    12. resource requirements, quality of service requirements, hardware/software/policy
    13. constraints, affinity and anti-affinity specifications, data locality, inter-workload
    14. interference, deadlines, and so on. Workload-specific requirements will be exposed
    15. through the API as necessary.`,
    16. Run: func(cmd *cobra.Command, args []string) {
    17. verflag.PrintAndExitIfRequested()
    18. utilflag.PrintFlags(cmd.Flags())
    19. if len(args) != 0 {
    20. fmt.Fprint(os.Stderr, "arguments are not supported\n")
    21. }
    22. if errs := opts.Validate(); len(errs) > 0 {
    23. fmt.Fprintf(os.Stderr, "%v\n", utilerrors.NewAggregate(errs))
    24. os.Exit(1)
    25. }
    26. if len(opts.WriteConfigTo) > 0 {
    27. if err := options.WriteConfigFile(opts.WriteConfigTo, &opts.ComponentConfig); err != nil {
    28. fmt.Fprintf(os.Stderr, "%v\n", err)
    29. os.Exit(1)
    30. }
    31. glog.Infof("Wrote configuration to: %s\n", opts.WriteConfigTo)
    32. return
    33. }
    34. c, err := opts.Config()
    35. if err != nil {
    36. fmt.Fprintf(os.Stderr, "%v\n", err)
    37. os.Exit(1)
    38. }
    39. stopCh := make(chan struct{})
    40. if err := Run(c.Complete(), stopCh); err != nil {
    41. fmt.Fprintf(os.Stderr, "%v\n", err)
    42. os.Exit(1)
    43. }
    44. },
    45. }
    46. opts.AddFlags(cmd.Flags())
    47. cmd.MarkFlagFilename("config", "yaml", "yml", "json")
    48. return cmd
    49. }

    核心代码:

    1. // 构造option
    2. opts, err := options.NewOptions()
    3. // 初始化config对象
    4. c, err := opts.Config()
    5. // 执行run函数
    6. err := Run(c.Complete(), stopCh)
    7. // 添加参数
    8. opts.AddFlags(cmd.Flags())

    2.1. NewOptions

    NewOptions主要用来构造SchedulerServer使用的参数和上下文,其中核心参数是KubeSchedulerConfiguration。

    1. opts, err := options.NewOptions()

    NewOptions:

    1. // NewOptions returns default scheduler app options.
    2. func NewOptions() (*Options, error) {
    3. cfg, err := newDefaultComponentConfig()
    4. if err != nil {
    5. return nil, err
    6. }
    7. hhost, hport, err := splitHostIntPort(cfg.HealthzBindAddress)
    8. if err != nil {
    9. return nil, err
    10. }
    11. o := &Options{
    12. ComponentConfig: *cfg,
    13. SecureServing: nil, // TODO: enable with apiserveroptions.NewSecureServingOptions()
    14. CombinedInsecureServing: &CombinedInsecureServingOptions{
    15. Healthz: &apiserveroptions.DeprecatedInsecureServingOptions{
    16. BindNetwork: "tcp",
    17. },
    18. Metrics: &apiserveroptions.DeprecatedInsecureServingOptions{
    19. BindNetwork: "tcp",
    20. },
    21. BindPort: hport,
    22. BindAddress: hhost,
    23. },
    24. Authentication: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthenticationOptions()
    25. Authorization: nil, // TODO: enable with apiserveroptions.NewDelegatingAuthorizationOptions()
    26. Deprecated: &DeprecatedOptions{
    27. UseLegacyPolicyConfig: false,
    28. PolicyConfigMapNamespace: metav1.NamespaceSystem,
    29. },
    30. }
    31. return o, nil
    32. }

    2.2. Options.Config

    Config初始化调度器的配置对象。

    1. c, err := opts.Config()

    Config函数主要执行以下操作:

    • 构建scheduler client、leaderElectionClient、eventClient。
    • 创建event recorder
    • 设置leader选举
    • 创建informer对象,主要函数有NewSharedInformerFactory和NewPodInformer。

    Config具体代码如下:

    1. // Config return a scheduler config object
    2. func (o *Options) Config() (*schedulerappconfig.Config, error) {
    3. c := &schedulerappconfig.Config{}
    4. if err := o.ApplyTo(c); err != nil {
    5. return nil, err
    6. }
    7. // prepare kube clients.
    8. client, leaderElectionClient, eventClient, err := createClients(c.ComponentConfig.ClientConnection, o.Master, c.ComponentConfig.LeaderElection.RenewDeadline.Duration)
    9. if err != nil {
    10. return nil, err
    11. }
    12. // Prepare event clients.
    13. eventBroadcaster := record.NewBroadcaster()
    14. recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: c.ComponentConfig.SchedulerName})
    15. // Set up leader election if enabled.
    16. var leaderElectionConfig *leaderelection.LeaderElectionConfig
    17. if c.ComponentConfig.LeaderElection.LeaderElect {
    18. leaderElectionConfig, err = makeLeaderElectionConfig(c.ComponentConfig.LeaderElection, leaderElectionClient, recorder)
    19. if err != nil {
    20. return nil, err
    21. }
    22. }
    23. c.Client = client
    24. c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
    25. c.PodInformer = factory.NewPodInformer(client, 0)
    26. c.EventClient = eventClient
    27. c.Recorder = recorder
    28. c.Broadcaster = eventBroadcaster
    29. c.LeaderElection = leaderElectionConfig
    30. return c, nil
    31. }

    2.3. AddFlags

    AddFlags为SchedulerServer添加指定的参数。

    1. opts.AddFlags(cmd.Flags())

    AddFlags函数的具体代码如下:

    1. // AddFlags adds flags for the scheduler options.
    2. func (o *Options) AddFlags(fs *pflag.FlagSet) {
    3. fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
    4. fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
    5. fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
    6. o.SecureServing.AddFlags(fs)
    7. o.CombinedInsecureServing.AddFlags(fs)
    8. o.Authentication.AddFlags(fs)
    9. o.Authorization.AddFlags(fs)
    10. o.Deprecated.AddFlags(fs, &o.ComponentConfig)
    11. leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, fs)
    12. utilfeature.DefaultFeatureGate.AddFlag(fs)
    13. }

    3. Run

    此部分的代码为/cmd/kube-scheduler/app/server.go

    1. err := Run(c.Complete(), stopCh)

    Run运行一个不退出的常驻进程,来执行scheduler的相关操作。

    Run函数的主要内容如下:

    • 通过scheduler config来创建scheduler的结构体。
    • 运行event broadcaster、healthz server、metrics server。
    • 运行所有的informer并在调度前等待cache的同步(重点)。
    • 执行sched.Run()来运行scheduler的调度逻辑。
    • 如果存在多个scheduler实例并开启了LeaderElect,则执行leader选举。

    以下对重点代码分开分析:

    3.1. NewSchedulerConfig

    NewSchedulerConfig初始化SchedulerConfig(此部分具体逻辑待后续专门分析),最后初始化生成scheduler结构体。

    1. // Build a scheduler config from the provided algorithm source.
    2. schedulerConfig, err := NewSchedulerConfig(c)
    3. if err != nil {
    4. return err
    5. }
    6. // Create the scheduler.
    7. sched := scheduler.NewFromConfig(schedulerConfig)

    3.2. InformerFactory.Start

    运行PodInformer,并运行InformerFactory。此部分的逻辑为client-go的informer机制,在Informer机制中有详细分析。

    1. // Start all informers.
    2. go c.PodInformer.Informer().Run(stopCh)
    3. c.InformerFactory.Start(stopCh)

    3.3. WaitForCacheSync

    在调度前等待cache同步。

    1. // Wait for all caches to sync before scheduling.
    2. c.InformerFactory.WaitForCacheSync(stopCh)
    3. controller.WaitForCacheSync("scheduler", stopCh, c.PodInformer.Informer().HasSynced)

    3.3.1. InformerFactory.WaitForCacheSync

    InformerFactory.WaitForCacheSync等待所有已启动的informer的cache完成同步,保证本地store中的信息与etcd中的信息保持一致。

    1. // WaitForCacheSync waits for all started informers' cache were synced.
    2. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    3. informers := func() map[reflect.Type]cache.SharedIndexInformer {
    4. f.lock.Lock()
    5. defer f.lock.Unlock()
    6. informers := map[reflect.Type]cache.SharedIndexInformer{}
    7. for informerType, informer := range f.informers {
    8. if f.startedInformers[informerType] {
    9. informers[informerType] = informer
    10. }
    11. }
    12. return informers
    13. }()
    14. res := map[reflect.Type]bool{}
    15. for informType, informer := range informers {
    16. res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    17. }
    18. return res
    19. }

    接着调用cache.WaitForCacheSync

    1. // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
    2. // if the controller should shutdown
    3. func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
    4. err := wait.PollUntil(syncedPollPeriod,
    5. func() (bool, error) {
    6. for _, syncFunc := range cacheSyncs {
    7. if !syncFunc() {
    8. return false, nil
    9. }
    10. }
    11. return true, nil
    12. },
    13. stopCh)
    14. if err != nil {
    15. glog.V(2).Infof("stop requested")
    16. return false
    17. }
    18. glog.V(4).Infof("caches populated")
    19. return true
    20. }

    3.3.2. controller.WaitForCacheSync

    controller.WaitForCacheSync是对cache.WaitForCacheSync的一层封装,通过传入的controller名字,记录不同controller等待cache同步的日志。

    1. controller.WaitForCacheSync("scheduler", stop, s.PodInformer.Informer().HasSynced)

    controller.WaitForCacheSync具体代码如下:

    1. // WaitForCacheSync is a wrapper around cache.WaitForCacheSync that generates log messages
    2. // indicating that the controller identified by controllerName is waiting for syncs, followed by
    3. // either a successful or failed sync.
    4. func WaitForCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...cache.InformerSynced) bool {
    5. glog.Infof("Waiting for caches to sync for %s controller", controllerName)
    6. if !cache.WaitForCacheSync(stopCh, cacheSyncs...) {
    7. utilruntime.HandleError(fmt.Errorf("Unable to sync caches for %s controller", controllerName))
    8. return false
    9. }
    10. glog.Infof("Caches are synced for %s controller", controllerName)
    11. return true
    12. }

    3.4. LeaderElection

    如果有多个scheduler,并开启leader选举,则运行LeaderElector直到选举结束或退出。

    1. // If leader election is enabled, run via LeaderElector until done and exit.
    2. if c.LeaderElection != nil {
    3. c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
    4. OnStartedLeading: run,
    5. OnStoppedLeading: func() {
    6. utilruntime.HandleError(fmt.Errorf("lost master"))
    7. },
    8. }
    9. leaderElector, err := leaderelection.NewLeaderElector(*c.LeaderElection)
    10. if err != nil {
    11. return fmt.Errorf("couldn't create leader elector: %v", err)
    12. }
    13. leaderElector.Run(ctx)
    14. return fmt.Errorf("lost lease")
    15. }

    3.5. Scheduler.Run

    1. // Prepare a reusable run function.
    2. run := func(ctx context.Context) {
    3. sched.Run()
    4. <-ctx.Done()
    5. }
    6. ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
    7. defer cancel()
    8. go func() {
    9. select {
    10. case <-stopCh:
    11. cancel()
    12. case <-ctx.Done():
    13. }
    14. }()
    15. ...
    16. run(ctx)

    Scheduler.Run先等待cache同步,然后开启调度逻辑的goroutine。

    Scheduler.Run的具体代码如下:

    1. // Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
    2. func (sched *Scheduler) Run() {
    3. if !sched.config.WaitForCacheSync() {
    4. return
    5. }
    6. go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
    7. }

    以上是对/cmd/kube-scheduler/scheduler.go部分代码的分析。Scheduler.Run后续的具体代码位于pkg/scheduler/scheduler.go,待后续文章分析。

    参考:

    • https://github.com/kubernetes/kubernetes/tree/v1.12.0/cmd/kube-scheduler
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-scheduler/scheduler.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/cmd/kube-scheduler/app/server.go