• kubelet Source Code Analysis (Part 4): syncLoopIteration
  • 1. syncLoop
  • 2. syncLoopIteration
    • 2.1. configCh
    • 2.2. plegCh
    • 2.3. syncCh
    • 2.4. livenessManager.Update
    • 2.5. housekeepingCh
  • 3. SyncHandler
    • 3.1. HandlePodAdditions
    • 3.2. HandlePodUpdates
    • 3.3. HandlePodRemoves
    • 3.4. HandlePodReconcile
    • 3.5. HandlePodSyncs
    • 3.6. HandlePodCleanups
  • 4. dispatchWork
  • 5. PodWorkers.UpdatePod
  • 6. managePodLoop
  • 7. Summary

    kubelet Source Code Analysis (Part 4): syncLoopIteration

    The following analysis is based on Kubernetes v1.12.0.

    This article analyzes the syncLoopIteration part of the kubelet. syncLoopIteration listens on several channels for different kinds of events and dispatches the corresponding add, update, delete, and sync handling.

    1. syncLoop

    syncLoop is the main loop for processing changes. It watches for changes from three kinds of channels (file, apiserver, and http). For any new change seen, it runs a sync against the desired state and the running state; if no configuration change is seen, it synchronizes the last known desired state every sync-frequency seconds.

    This code lives in pkg/kubelet/kubelet.go.

    // syncLoop is the main loop for processing changes. It watches for changes from
    // three channels (file, apiserver, and http) and creates a union of them. For
    // any new change seen, will run a sync against desired state and running state. If
    // no changes are seen to the configuration, will synchronize the last known desired
    // state every sync-frequency seconds. Never returns.
    func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
        glog.Info("Starting kubelet main sync loop.")
        // The resyncTicker wakes up kubelet to checks if there are any pod workers
        // that need to be sync'd. A one-second period is sufficient because the
        // sync interval is defaulted to 10s.
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        housekeepingTicker := time.NewTicker(housekeepingPeriod)
        defer housekeepingTicker.Stop()
        plegCh := kl.pleg.Watch()
        const (
            base   = 100 * time.Millisecond
            max    = 5 * time.Second
            factor = 2
        )
        duration := base
        for {
            if rs := kl.runtimeState.runtimeErrors(); len(rs) != 0 {
                glog.Infof("skipping pod synchronization - %v", rs)
                // exponential backoff
                time.Sleep(duration)
                duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
                continue
            }
            // reset backoff if we have a success
            duration = base

            kl.syncLoopMonitor.Store(kl.clock.Now())
            if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
                break
            }
            kl.syncLoopMonitor.Store(kl.clock.Now())
        }
    }

    syncLoop delegates the actual watching of pod changes to syncLoopIteration. Note the exponential backoff when the runtime reports errors: the loop sleeps 100ms, then 200ms, 400ms, and so on up to the 5s cap, and resets to 100ms once an iteration succeeds.

    2. syncLoopIteration

    syncLoopIteration listens on several channels for different kinds of events and handles them. The channels are: configCh, plegCh, syncCh, houseKeepingCh, and livenessManager.Updates().

    syncLoopIteration performs the actual pod operations. It reads from the following channels:

    • configCh: dispatches pods whose configuration changed to the handler callback appropriate for the event type.
    • plegCh: updates the runtime cache and syncs pods.
    • syncCh: syncs all pods waiting to be synced.
    • houseKeepingCh: triggers pod cleanup.
    • livenessManager.Updates(): syncs pods whose liveness checks have failed.

    The syncLoopIteration code lives in pkg/kubelet/kubelet.go.
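
    Before walking through each case, here is a minimal, self-contained sketch (not kubelet source; event, loopIteration, and the channel wiring are all invented for illustration) of the select-based fan-in pattern that syncLoopIteration is built on: a single select multiplexes all event sources, and the loop exits only when the config channel closes.

    package main

    import (
        "fmt"
        "time"
    )

    // event stands in for the pod updates and PLEG events kubelet receives.
    type event struct{ kind string }

    // loopIteration mirrors the shape of syncLoopIteration: one select over
    // all event sources; it returns false only when the config channel closes.
    func loopIteration(configCh <-chan event, syncCh, housekeepingCh <-chan time.Time) bool {
        select {
        case e, open := <-configCh:
            if !open {
                fmt.Println("config channel closed, exiting the loop")
                return false
            }
            fmt.Println("dispatching config event:", e.kind)
        case <-syncCh:
            fmt.Println("periodic sync tick")
        case <-housekeepingCh:
            fmt.Println("housekeeping tick")
        }
        return true
    }

    func main() {
        configCh := make(chan event, 1)
        syncTicker := time.NewTicker(time.Second)
        defer syncTicker.Stop()
        housekeepingTicker := time.NewTicker(2 * time.Second)
        defer housekeepingTicker.Stop()

        configCh <- event{kind: "ADD"}
        close(configCh)

        // Like kl.syncLoop: keep iterating until an iteration reports failure.
        for loopIteration(configCh, syncTicker.C, housekeepingTicker.C) {
        }
    }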

    2.1. configCh

    configCh dispatches pods whose configuration changed to the handler callback matching the event type; SyncHandler then performs the add, update, delete, or reconcile operation for the pod.

    func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
        syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
        select {
        case u, open := <-configCh:
            // Update from a config source; dispatch it to the right handler
            // callback.
            if !open {
                glog.Errorf("Update channel is closed. Exiting the sync loop.")
                return false
            }

            switch u.Op {
            case kubetypes.ADD:
                glog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
                // After restarting, kubelet will get all existing pods through
                // ADD as if they are new pods. These pods will then go through the
                // admission process and *may* be rejected. This can be resolved
                // once we have checkpointing.
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.UPDATE:
                glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.REMOVE:
                glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
                handler.HandlePodRemoves(u.Pods)
            case kubetypes.RECONCILE:
                glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
                handler.HandlePodReconcile(u.Pods)
            case kubetypes.DELETE:
                glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
                // DELETE is treated as a UPDATE because of graceful deletion.
                handler.HandlePodUpdates(u.Pods)
            case kubetypes.RESTORE:
                glog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods))
                // These are pods restored from the checkpoint. Treat them as new
                // pods.
                handler.HandlePodAdditions(u.Pods)
            case kubetypes.SET:
                // TODO: Do we want to support this?
                glog.Errorf("Kubelet does not support snapshot update")
            }
        ...
    }

    As shown, syncLoopIteration chooses the pod operation according to the PodUpdate's Op value:

    • ADD:HandlePodAdditions
    • UPDATE:HandlePodUpdates
    • REMOVE:HandlePodRemoves
    • RECONCILE:HandlePodReconcile
    • DELETE:HandlePodUpdates (graceful deletion treats DELETE as an UPDATE)
    • RESTORE:HandlePodAdditions (pods restored from a checkpoint are treated as new)
    • podsToSync (from syncCh, not an Op value):HandlePodSyncs

    The handler operations are performed through SyncHandler, an interface whose implementer is the kubelet itself; see the analysis below.

    2.2. plegCh

    plegCh: updates the runtime cache and syncs the pod. This case calls HandlePodSyncs.

    case e := <-plegCh:
        if isSyncPodWorthy(e) {
            // PLEG event for a pod; sync it.
            if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
                glog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e)
                handler.HandlePodSyncs([]*v1.Pod{pod})
            } else {
                // If the pod no longer exists, ignore the event.
                glog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
            }
        }

        if e.Type == pleg.ContainerDied {
            if containerID, ok := e.Data.(string); ok {
                kl.cleanUpContainersInPod(e.ID, containerID)
            }
        }
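
    isSyncPodWorthy (also in pkg/kubelet/kubelet.go) filters out PLEG events that do not warrant a sync; every event type except ContainerRemoved is considered sync-worthy, since a removed container no longer affects the pod's running state:

    // isSyncPodWorthy filters out events that are not worthy of pod syncing
    func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
        // ContainerRemoved doesn't affect pod state
        return event.Type != pleg.ContainerRemoved
    }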

    2.3. syncCh

    syncCh: syncs all pods waiting to be synced. This case also calls HandlePodSyncs.

    case <-syncCh:
        // Sync pods waiting for sync
        podsToSync := kl.getPodsToSync()
        if len(podsToSync) == 0 {
            break
        }
        glog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync))
        handler.HandlePodSyncs(podsToSync)
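
    getPodsToSync decides which pods are due. Roughly (a paraphrased sketch; see pkg/kubelet/kubelet.go for the exact v1.12 code), it drains the kubelet work queue, which pod workers re-populate after every sync via wrapUp (section 6), and also asks any registered PodSyncLoopHandlers whether additional pods should be synced:

    // getPodsToSync returns pods that require syncing: pods whose work-queue
    // entries are due, plus pods that a PodSyncLoopHandler wants synced.
    // (Paraphrased sketch; see pkg/kubelet/kubelet.go for the exact code.)
    func (kl *Kubelet) getPodsToSync() []*v1.Pod {
        allPods := kl.podManager.GetPods()
        podUIDs := kl.workQueue.GetWork() // UIDs whose next sync time has passed
        podUIDSet := sets.NewString()
        for _, podUID := range podUIDs {
            podUIDSet.Insert(string(podUID))
        }
        var podsToSync []*v1.Pod
        for _, pod := range allPods {
            if podUIDSet.Has(string(pod.UID)) {
                // The work of the pod is ready
                podsToSync = append(podsToSync, pod)
                continue
            }
            for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
                if podSyncLoopHandler.ShouldSync(pod) {
                    podsToSync = append(podsToSync, pod)
                    break
                }
            }
        }
        return podsToSync
    }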

    2.4. livenessManager.Update

    livenessManager.Updates(): syncs pods whose liveness checks have failed. This case calls HandlePodSyncs as well.

    case update := <-kl.livenessManager.Updates():
        if update.Result == proberesults.Failure {
            // The liveness manager detected a failure; sync the pod.

            // We should not use the pod from livenessManager, because it is never updated after
            // initialization.
            pod, ok := kl.podManager.GetPodByUID(update.PodUID)
            if !ok {
                // If the pod no longer exists, ignore the update.
                glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                break
            }
            glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
            handler.HandlePodSyncs([]*v1.Pod{pod})
        }

    2.5. housekeepingCh

    houseKeepingCh: triggers pod cleanup; the ticker fires every housekeepingPeriod (2s). This case calls HandlePodCleanups.

    case <-housekeepingCh:
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready or volume manager has not yet synced the states,
            // skip housekeeping, as we may accidentally delete pods from unready sources.
            glog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")
        } else {
            glog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
                glog.Errorf("Failed cleaning pods: %v", err)
            }
        }

    3. SyncHandler

    SyncHandler is an interface that defines the various pod handlers. It is implemented by the kubelet, and its methods are invoked mainly from syncLoopIteration. The interface is defined as follows:

    // SyncHandler is an interface implemented by Kubelet, for testability
    type SyncHandler interface {
        HandlePodAdditions(pods []*v1.Pod)
        HandlePodUpdates(pods []*v1.Pod)
        HandlePodRemoves(pods []*v1.Pod)
        HandlePodReconcile(pods []*v1.Pod)
        HandlePodSyncs(pods []*v1.Pod)
        HandlePodCleanups() error
    }

    The SyncHandler code lives in pkg/kubelet/kubelet.go.

    3.1. HandlePodAdditions

    HandlePodAdditions first sorts the pods by creation time, then iterates over the list and processes each pod.

    // HandlePodAdditions is the callback in SyncHandler for pods being added from
    // a config source.
    func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
        start := kl.clock.Now()
        sort.Sort(sliceutils.PodsByCreationTime(pods))
        for _, pod := range pods {
            ...
        }
    }

    Add the pod to the pod manager.

    for _, pod := range pods {
        // Responsible for checking limits in resolv.conf
        if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
            kl.dnsConfigurer.CheckLimitsForResolvConf()
        }
        existingPods := kl.podManager.GetPods()
        // Always add the pod to the pod manager. Kubelet relies on the pod
        // manager as the source of truth for the desired state. If a pod does
        // not exist in the pod manager, it means that it has been deleted in
        // the apiserver and no action (other than cleanup) is required.
        kl.podManager.AddPod(pod)
        ...
    }

    If it is a mirror pod, handle it separately.

    if kubepod.IsMirrorPod(pod) {
        kl.handleMirrorPod(pod, start)
        continue
    }

    If the pod is not in a terminated state, run it through admission; if the pod is rejected, rejectPod records a warning event and sets the pod's phase to Failed.

    if !kl.podIsTerminated(pod) {
        // Only go through the admission process if the pod is not
        // terminated.

        // We failed pods that we rejected, so activePods include all admitted
        // pods that are alive.
        activePods := kl.filterOutTerminatedPods(existingPods)

        // Check if we can admit the pod; if not, reject it.
        if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
            kl.rejectPod(pod, reason, message)
            continue
        }
    }
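
    canAdmitPod runs the pod through the kubelet's chain of admit handlers (eviction, node resources, and so on); the first handler that rejects wins. A paraphrased sketch (see pkg/kubelet/kubelet.go for the exact v1.12 code):

    // canAdmitPod determines if a pod can be admitted, and gives a reason if it
    // cannot. "pod" is the new pod, while "pods" are all admitted pods.
    // (Paraphrased sketch; see pkg/kubelet/kubelet.go for the exact code.)
    func (kl *Kubelet) canAdmitPod(pods []*v1.Pod, pod *v1.Pod) (bool, string, string) {
        // The kubelet invokes each pod admit handler in sequence;
        // if any handler rejects, the pod is rejected.
        attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: pods}
        for _, podAdmitHandler := range kl.admitHandlers {
            if result := podAdmitHandler.Admit(attrs); !result.Admit {
                return false, result.Reason, result.Message
            }
        }
        return true, "", ""
    }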

    Then call dispatchWork, the core function invoked by the sync handlers; it kicks off an asynchronous loop in a pod worker to carry out the pod operation. Its details are analyzed below.

    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)

    Finally, add the pod to the probe manager.

    kl.probeManager.AddPod(pod)

    3.2. HandlePodUpdates

    HandlePodUpdates likewise iterates over the pod list and processes each pod.

    // HandlePodUpdates is the callback in the SyncHandler interface for pods
    // being updated from a config source.
    func (kl *Kubelet) HandlePodUpdates(pods []*v1.Pod) {
        start := kl.clock.Now()
        for _, pod := range pods {
            ...
        }
    }

    Update the pod in the pod manager.

    for _, pod := range pods {
        // Responsible for checking limits in resolv.conf
        if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" {
            kl.dnsConfigurer.CheckLimitsForResolvConf()
        }
        kl.podManager.UpdatePod(pod)
        ...
    }

    If it is a mirror pod, handle it separately.

    if kubepod.IsMirrorPod(pod) {
        kl.handleMirrorPod(pod, start)
        continue
    }

    Then call dispatchWork.

    // TODO: Evaluate if we need to validate and reject updates.
    mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
    kl.dispatchWork(pod, kubetypes.SyncPodUpdate, mirrorPod, start)

    3.3. HandlePodRemoves

    HandlePodRemoves iterates over the pod list.

    // HandlePodRemoves is the callback in the SyncHandler interface for pods
    // being removed from a config source.
    func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
        start := kl.clock.Now()
        for _, pod := range pods {
            ...
        }
    }

    Delete the pod from the pod manager.

    for _, pod := range pods {
        kl.podManager.DeletePod(pod)
        ...
    }

    If it is a mirror pod, handle it separately.

    if kubepod.IsMirrorPod(pod) {
        kl.handleMirrorPod(pod, start)
        continue
    }

    Call the kubelet's deletePod function to delete the pod.

    // Deletion is allowed to fail because the periodic cleanup routine
    // will trigger deletion again.
    if err := kl.deletePod(pod); err != nil {
        glog.V(2).Infof("Failed to delete pod %q, err: %v", format.Pod(pod), err)
    }

    deletePod puts the pod to be deleted on the podKillingCh channel; the podKiller goroutine listens on that channel and performs the actual kill. The implementation:

    // deletePod deletes the pod from the internal state of the kubelet by:
    // 1.  stopping the associated pod worker asynchronously
    // 2.  signaling to kill the pod by sending on the podKillingCh channel
    //
    // deletePod returns an error if not all sources are ready or the pod is not
    // found in the runtime cache.
    func (kl *Kubelet) deletePod(pod *v1.Pod) error {
        if pod == nil {
            return fmt.Errorf("deletePod does not allow nil pod")
        }
        if !kl.sourcesReady.AllReady() {
            // If the sources aren't ready, skip deletion, as we may accidentally delete pods
            // for sources that haven't reported yet.
            return fmt.Errorf("skipping delete because sources aren't ready yet")
        }
        kl.podWorkers.ForgetWorker(pod.UID)

        // Runtime cache may not have been updated to with the pod, but it's okay
        // because the periodic cleanup routine will attempt to delete again later.
        runningPods, err := kl.runtimeCache.GetPods()
        if err != nil {
            return fmt.Errorf("error listing containers: %v", err)
        }
        runningPod := kubecontainer.Pods(runningPods).FindPod("", pod.UID)
        if runningPod.IsEmpty() {
            return fmt.Errorf("pod not found")
        }
        podPair := kubecontainer.PodPair{APIPod: pod, RunningPod: &runningPod}

        kl.podKillingCh <- &podPair
        // TODO: delete the mirror pod here?

        // We leave the volume/directory cleanup to the periodic cleanup routine.
        return nil
    }

    Finally, remove the pod from the probe manager.

    kl.probeManager.RemovePod(pod)

    3.4. HandlePodReconcile

    Iterate over the pod list.

    // HandlePodReconcile is the callback in the SyncHandler interface for pods
    // that should be reconciled.
    func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) {
        start := kl.clock.Now()
        for _, pod := range pods {
            ...
        }
    }

    Update the pod in the pod manager.

    for _, pod := range pods {
        // Update the pod in pod manager, status manager will do periodically reconcile according
        // to the pod manager.
        kl.podManager.UpdatePod(pod)
        ...
    }

    Reconcile the pod's Ready condition if necessary, dispatching a sync through dispatchWork.

    // Reconcile Pod "Ready" condition if necessary. Trigger sync pod for reconciliation.
    if status.NeedToReconcilePodReadiness(pod) {
        mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
        kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
    }

    If the pod has been evicted, remove all of its dead containers.

    // After an evicted pod is synced, all dead containers in the pod can be removed.
    if eviction.PodIsEvicted(pod.Status) {
        if podStatus, err := kl.podCache.Get(pod.UID); err == nil {
            kl.containerDeletor.deleteContainersInPod("", podStatus, true)
        }
    }

    3.5. HandlePodSyncs

    HandlePodSyncssyncHandler接口回调函数,调用dispatchWork,通过pod worker来执行任务。

    // HandlePodSyncs is the callback in the syncHandler interface for pods
    // that should be dispatched to pod workers for sync.
    func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
        start := kl.clock.Now()
        for _, pod := range pods {
            mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
            kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
        }
    }

    3.6. HandlePodCleanups

    HandlePodCleanups performs the pod cleanup work, covering terminating pods, orphaned pods, and so on.

    First, if cgroups-per-QoS is enabled, introspect the cgroup tree for the set of pods that still have cgroups.

    // HandlePodCleanups performs a series of cleanup work, including terminating
    // pod workers, killing unwanted pods, and removing orphaned volumes/pod
    // directories.
    // NOTE: This function is executed by the main sync loop, so it
    // should not contain any blocking calls.
    func (kl *Kubelet) HandlePodCleanups() error {
        // The kubelet lacks checkpointing, so we need to introspect the set of pods
        // in the cgroup tree prior to inspecting the set of pods in our pod manager.
        // this ensures our view of the cgroup tree does not mistakenly observe pods
        // that are added after the fact...
        var (
            cgroupPods map[types.UID]cm.CgroupName
            err        error
        )
        if kl.cgroupsPerQOS {
            pcm := kl.containerManager.NewPodContainerManager()
            cgroupPods, err = pcm.GetAllPodsFromCgroups()
            if err != nil {
                return fmt.Errorf("failed to get list of pods that still exist on cgroup mounts: %v", err)
            }
        }
        ...
    }

    List all pods, including mirror pods.

    allPods, mirrorPods := kl.podManager.GetPodsAndMirrorPods()
    // Pod phase progresses monotonically. Once a pod has reached a final state,
    // it should never leave regardless of the restart policy. The statuses
    // of such pods should not be changed, and there is no need to sync them.
    // TODO: the logic here does not handle two cases:
    //   1. If the containers were removed immediately after they died, kubelet
    //      may fail to generate correct statuses, let alone filtering correctly.
    //   2. If kubelet restarted before writing the terminated status for a pod
    //      to the apiserver, it could still restart the terminated pod (even
    //      though the pod was not considered terminated by the apiserver).
    // These two conditions could be alleviated by checkpointing kubelet.
    activePods := kl.filterOutTerminatedPods(allPods)

    desiredPods := make(map[types.UID]empty)
    for _, pod := range activePods {
        desiredPods[pod.UID] = empty{}
    }

    Stop the pod workers of pods that no longer exist, and clean those pods out of the probe manager.

    // Stop the workers for no-longer existing pods.
    // TODO: is here the best place to forget pod workers?
    kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods)
    kl.probeManager.CleanupPods(activePods)

    Put the pods that need to be killed on the podKillingCh channel; the podKiller task listens on this channel and kills the pods it receives (a sketch of that consumer follows the snippet below).

    runningPods, err := kl.runtimeCache.GetPods()
    if err != nil {
        glog.Errorf("Error listing containers: %#v", err)
        return err
    }
    for _, pod := range runningPods {
        if _, found := desiredPods[pod.ID]; !found {
            kl.podKillingCh <- &kubecontainer.PodPair{APIPod: nil, RunningPod: pod}
        }
    }
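
    The consumer side is not quoted in this article. The following self-contained sketch (all names invented; the real podKiller in pkg/kubelet/kubelet.go calls kl.killPod and guards its in-flight set the same way) models the pattern: drain podKillingCh, kill each pod in its own goroutine, and skip pods whose kill is still in flight:

    package main

    import (
        "fmt"
        "sync"
    )

    // podPair stands in for kubecontainer.PodPair.
    type podPair struct{ uid, name string }

    // podKillerSketch models kubelet's podKiller: drain the killing channel,
    // start one goroutine per pod, and deduplicate in-flight kills.
    func podKillerSketch(podKillingCh <-chan *podPair, killFn func(*podPair) error) {
        killing := map[string]bool{} // pods with an in-flight kill
        var mu sync.Mutex
        var wg sync.WaitGroup // sketch-only: lets us wait for in-flight kills
        for pair := range podKillingCh {
            mu.Lock()
            inFlight := killing[pair.uid]
            if !inFlight {
                killing[pair.uid] = true
            }
            mu.Unlock()
            if inFlight {
                continue
            }
            wg.Add(1)
            go func(p *podPair) {
                defer wg.Done()
                if err := killFn(p); err != nil {
                    fmt.Printf("failed killing pod %q: %v\n", p.name, err)
                }
                mu.Lock()
                delete(killing, p.uid)
                mu.Unlock()
            }(pair)
        }
        wg.Wait()
    }

    func main() {
        ch := make(chan *podPair, 2)
        ch <- &podPair{uid: "uid-1", name: "nginx"}
        ch <- &podPair{uid: "uid-1", name: "nginx"} // duplicate; skipped if the first kill is still in flight
        close(ch)
        podKillerSketch(ch, func(p *podPair) error {
            fmt.Printf("killing pod %q\n", p.name)
            return nil
        })
    }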

    Remove the statuses of pods that are no longer bound to this node; removeOrphanedPodStatuses ultimately calls the status manager's RemoveOrphanedStatuses method.

    kl.removeOrphanedPodStatuses(allPods, mirrorPods)

    Remove orphaned volumes and pod directories.

    // Remove any orphaned volumes.
    // Note that we pass all pods (including terminated pods) to the function,
    // so that we don't remove volumes associated with terminated but not yet
    // deleted pods.
    err = kl.cleanupOrphanedPodDirs(allPods, runningPods)
    if err != nil {
        // We want all cleanup tasks to be run even if one of them failed. So
        // we just log an error here and continue other cleanup tasks.
        // This also applies to the other clean up tasks.
        glog.Errorf("Failed cleaning up orphaned pod directories: %v", err)
    }

    Remove orphaned mirror pods.

    // Remove any orphaned mirror pods.
    kl.podManager.DeleteOrphanedMirrorPods()

    Remove the cgroups of pods that are no longer running.

    // Remove any cgroups in the hierarchy for pods that are no longer running.
    if kl.cgroupsPerQOS {
        kl.cleanupOrphanedPodCgroups(cgroupPods, activePods)
    }

    Finally, garbage-collect expired back-off entries.

    kl.backOff.GC()

    4. dispatchWork

    dispatchWork starts an asynchronous sync of the pod in a pod worker.

    The full code:

    // dispatchWork starts the asynchronous sync of the pod in a pod worker.
    // If the pod is terminated, dispatchWork
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        if kl.podIsTerminated(pod) {
            if pod.DeletionTimestamp != nil {
                // If the pod is in a terminated state, there is no pod worker to
                // handle the work item. Check if the DeletionTimestamp has been
                // set, and force a status update to trigger a pod deletion request
                // to the apiserver.
                kl.statusManager.TerminatePod(pod)
            }
            return
        }
        // Run the sync in an async worker.
        kl.podWorkers.UpdatePod(&UpdatePodOptions{
            Pod:        pod,
            MirrorPod:  mirrorPod,
            UpdateType: syncType,
            OnCompleteFunc: func(err error) {
                if err != nil {
                    metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
                }
            },
        })
        // Note the number of containers for new pods.
        if syncType == kubetypes.SyncPodCreate {
            metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
        }
    }

    Analyzed piece by piece below:

    If the pod is in a terminated state, invoke the status manager's TerminatePod (forcing a status update that triggers the pod deletion request to the apiserver) and return without dispatching work.

    // dispatchWork starts the asynchronous sync of the pod in a pod worker.
    // If the pod is terminated, dispatchWork
    func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
        if kl.podIsTerminated(pod) {
            if pod.DeletionTimestamp != nil {
                // If the pod is in a terminated state, there is no pod worker to
                // handle the work item. Check if the DeletionTimestamp has been
                // set, and force a status update to trigger a pod deletion request
                // to the apiserver.
                kl.statusManager.TerminatePod(pod)
            }
            return
        }
        ...
    }

    Otherwise, call the pod worker's UpdatePod function, the core entry point that carries out the pod operation. Its logic is analyzed below.

    // Run the sync in an async worker.
    kl.podWorkers.UpdatePod(&UpdatePodOptions{
        Pod:        pod,
        MirrorPod:  mirrorPod,
        UpdateType: syncType,
        OnCompleteFunc: func(err error) {
            if err != nil {
                metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
            }
        },
    })

    When the sync type is SyncPodCreate (i.e., pod creation), record the number of containers in the new pod.

    // Note the number of containers for new pods.
    if syncType == kubetypes.SyncPodCreate {
        metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
    }

    5. PodWorkers.UpdatePod

    PodWorkers is an interface type:

    // PodWorkers is an abstract interface for testability.
    type PodWorkers interface {
        UpdatePod(options *UpdatePodOptions)
        ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
        ForgetWorker(uid types.UID)
    }

    UpdatePod is its core method. It passes the pod to be processed over a per-pod podUpdates channel; for a pod seen for the first time, a dedicated goroutine is started that runs managePodLoop for it.

    This code lives in pkg/kubelet/pod_workers.go.

    // Apply the new setting to the specified pod.
    // If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
    // Update requests are ignored if a kill pod request is pending.
    func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
        pod := options.Pod
        uid := pod.UID
        var podUpdates chan UpdatePodOptions
        var exists bool

        p.podLock.Lock()
        defer p.podLock.Unlock()
        if podUpdates, exists = p.podUpdates[uid]; !exists {
            // We need to have a buffer here, because checkForUpdates() method that
            // puts an update into channel is called from the same goroutine where
            // the channel is consumed. However, it is guaranteed that in such case
            // the channel is empty, so buffer of size 1 is enough.
            podUpdates = make(chan UpdatePodOptions, 1)
            p.podUpdates[uid] = podUpdates

            // Creating a new pod worker either means this is a new pod, or that the
            // kubelet just restarted. In either case the kubelet is willing to believe
            // the status of the pod for the first pod worker sync. See corresponding
            // comment in syncPod.
            go func() {
                defer runtime.HandleCrash()
                p.managePodLoop(podUpdates)
            }()
        }
        if !p.isWorking[pod.UID] {
            p.isWorking[pod.UID] = true
            podUpdates <- *options
        } else {
            // if a request to kill a pod is pending, we do not let anything overwrite that request.
            update, found := p.lastUndeliveredWorkUpdate[pod.UID]
            if !found || update.UpdateType != kubetypes.SyncPodKill {
                p.lastUndeliveredWorkUpdate[pod.UID] = *options
            }
        }
    }

    6. managePodLoop

    managePodLoop reads updates from the podUpdates channel and runs syncPodFn for each one. syncPodFn is assigned in newPodWorkers and is kubelet.syncPod; the logic of kubelet.syncPod will be analyzed separately in a later article.

    // newPodWorkers is passed klet.syncPod as the sync function
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    For reference, newPodWorkers:

    func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
        resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
        return &podWorkers{
            podUpdates:                map[types.UID]chan UpdatePodOptions{},
            isWorking:                 map[types.UID]bool{},
            lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
            syncPodFn:                 syncPodFn, // klet.syncPod, passed in by the constructor caller
            recorder:                  recorder,
            workQueue:                 workQueue,
            resyncInterval:            resyncInterval,
            backOffPeriod:             backOffPeriod,
            podCache:                  podCache,
        }
    }

    The managePodLoop function, located in pkg/kubelet/pod_workers.go, is as follows:

    func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
        var lastSyncTime time.Time
        for update := range podUpdates {
            err := func() error {
                podUID := update.Pod.UID
                // This is a blocking call that would return only if the cache
                // has an entry for the pod that is newer than minRuntimeCache
                // Time. This ensures the worker doesn't start syncing until
                // after the cache is at least newer than the finished time of
                // the previous sync.
                status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
                if err != nil {
                    // This is the legacy event thrown by manage pod loop
                    // all other events are now dispatched from syncPodFn
                    p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err)
                    return err
                }
                err = p.syncPodFn(syncPodOptions{
                    mirrorPod:      update.MirrorPod,
                    pod:            update.Pod,
                    podStatus:      status,
                    killPodOptions: update.KillPodOptions,
                    updateType:     update.UpdateType,
                })
                lastSyncTime = time.Now()
                return err
            }()
            // notify the call-back function if the operation succeeded or not
            if update.OnCompleteFunc != nil {
                update.OnCompleteFunc(err)
            }
            if err != nil {
                // IMPORTANT: we do not log errors here, the syncPodFn is responsible for logging errors
                glog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err)
            }
            p.wrapUp(update.Pod.UID, err)
        }
    }
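
    The wrapUp call at the end of each iteration closes the loop. Paraphrased (see pkg/kubelet/pod_workers.go for the exact v1.12 code), it re-enqueues the pod into the work queue, after the resync interval on success or the back-off period on error, which is how pods later surface in getPodsToSync, and then delivers any update that arrived while the sync was running:

    // wrapUp is called after every sync. (Paraphrased sketch; see
    // pkg/kubelet/pod_workers.go for the exact code.)
    func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
        if syncErr == nil {
            // No error; requeue at the regular resync interval.
            p.workQueue.Enqueue(uid, wait.Jitter(p.resyncInterval, workerResyncIntervalJitterFactor))
        } else {
            // Error occurred during the sync; back off and then retry.
            p.workQueue.Enqueue(uid, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
        }
        p.checkForUpdates(uid)
    }

    // checkForUpdates hands the worker its pending update, if any, or marks it idle.
    func (p *podWorkers) checkForUpdates(uid types.UID) {
        p.podLock.Lock()
        defer p.podLock.Unlock()
        if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
            p.podUpdates[uid] <- workUpdate
            delete(p.lastUndeliveredWorkUpdate, uid)
        } else {
            p.isWorking[uid] = false
        }
    }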

    7. Summary

    The basic flow of syncLoopIteration:

    1. Several channels are watched for different kinds of events: configCh, plegCh, syncCh, houseKeepingCh, and livenessManager.Updates().
    2. The various SyncHandler callbacks carry out the corresponding add, update, delete, and sync operations.
    3. HandlePodAdditions, HandlePodUpdates, HandlePodReconcile, and HandlePodSyncs all end up calling dispatchWork; HandlePodCleanups instead hands the pods to clean up over a channel to podKiller.
    4. dispatchWork calls podWorkers.UpdatePod to do the work asynchronously.
    5. podWorkers.UpdatePod runs managePodLoop, the per-pod processing loop.

    Channel types and their roles:

    • configCh: dispatches pods whose configuration changed to the handler callback appropriate for the event type.
    • plegCh: updates the runtime cache and syncs pods.
    • syncCh: syncs all pods waiting to be synced.
    • houseKeepingCh: triggers pod cleanup.
    • livenessManager.Updates(): syncs pods whose liveness checks have failed.

    References:

    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/kubelet.go
    • https://github.com/kubernetes/kubernetes/blob/v1.12.0/pkg/kubelet/pod_workers.go