• kube-controller-manager Source Code Analysis (Part 3): The Informer Mechanism
  • 0. Architecture diagrams
    • 0.1. client-go components
    • 0.2. Custom controller components
  • 1. sharedInformerFactory.Start
  • 2. sharedIndexInformer.Run
    • 2.1. NewDeltaFIFO
    • 2.2. Config
    • 2.3. controller
    • 2.4. cacheMutationDetector.Run
    • 2.5. processor.run
    • 2.6. controller.Run
  • 3. Reflector
    • 3.1. Reflector
    • 3.2. NewReflector
    • 3.3. Reflector.Run
    • 3.4. ListAndWatch
      • 3.4.1. List
      • 3.4.2. store.Resync
      • 3.4.3. Watch
      • 3.4.4. watchHandler
  • 4. DeltaFIFO
    • 4.1. NewDeltaFIFO
    • 4.2. DeltaFIFO
    • 4.3. Queue & Store
  • 5. store
    • 6.1. cache
    • 6.2. ThreadSafeStore
  • 6. processLoop
    • 5.1. DeltaFIFO.Pop
    • 5.2. HandleDeltas
    • 5.3. sharedProcessor.distribute
  • 7. processor
    • 7.1. sharedProcessor.Run
      • 7.1.1. listener.pop
      • 7.1.2. listener.run
    • 7.2. ResourceEventHandler
      • 7.2.1. addDeployment
  • 8. Summary
    • 8.1. Reflector
    • 8.2. ListAndWatch
    • 8.3. DeltaFIFO
    • 8.4. store
    • 8.5. processor
    • 8.6. Main steps

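    kube-controller-manager Source Code Analysis (Part 3): The Informer Mechanism

    The following analysis is based on Kubernetes v1.12.0.

    This article analyzes the Informer mechanism (that is, List-Watch), which the core Kubernetes components use heavily. The code lives mainly in the vendored client-go package.

    The logic is mainly in the /vendor/k8s.io/client-go/tools/cache package, whose directory layout is: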

    cache
    ├── controller.go  # contains: Config, Run, processLoop, NewInformer, NewIndexerInformer
    ├── delta_fifo.go  # contains: NewDeltaFIFO, DeltaFIFO, AddIfNotPresent
    ├── expiration_cache.go
    ├── expiration_cache_fakes.go
    ├── fake_custom_store.go
    ├── fifo.go  # contains: Queue, FIFO, NewFIFO
    ├── heap.go
    ├── index.go  # contains: Indexer, MetaNamespaceIndexFunc
    ├── listers.go
    ├── listwatch.go  # contains: ListerWatcher, ListWatch, List, Watch
    ├── mutation_cache.go
    ├── mutation_detector.go
    ├── reflector.go  # contains: Reflector, NewReflector, Run, ListAndWatch
    ├── reflector_metrics.go
    ├── shared_informer.go  # contains: NewSharedInformer, WaitForCacheSync, Run, HasSynced
    ├── store.go  # contains: Store, MetaNamespaceKeyFunc, SplitMetaNamespaceKey
    ├── testing
    │   └── fake_controller_source.go
    ├── thread_safe_store.go  # contains: ThreadSafeStore, threadSafeMap
    └── undelta_store.go

    0. Architecture diagrams

    Diagram 1:

    [Figure 1: Informer mechanism]

    Diagram 2:

    [Figure 2: Informer mechanism]

    0.1. client-go components

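    • Reflector: a reflector watches a specific kind of Kubernetes API resource, either built-in or custom. It does this in its ListAndWatch method. When the reflector is notified through the watch API that a new resource instance exists, it fetches the newly created object with the corresponding list API and puts it into the Delta FIFO queue inside the watchHandler function.

    • Informer: the informer pops objects from the Delta FIFO queue; the function that does this is processLoop. The job of this base controller is to save the object for later retrieval and to invoke our controller, passing it the object.

    • Indexer: the indexer provides indexing over objects. A typical use case is an index based on object labels. An Indexer can maintain indexes based on several indexing functions, and it uses a thread-safe data store to hold objects and their keys. Store defines a default function named MetaNamespaceKeyFunc that generates an object's key as the <namespace>/<name> combination for that object.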
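
    The <namespace>/<name> key format can be sketched without client-go. The helper names below (namespaceKey, splitNamespaceKey) are hypothetical stand-ins, not the library functions; the sketch assumes that cluster-scoped objects, which have no namespace, are keyed by name alone.

```go
package main

import (
	"fmt"
	"strings"
)

// namespaceKey builds a "<namespace>/<name>" key.
// Assumption: an empty namespace yields just the name.
func namespaceKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitNamespaceKey reverses namespaceKey.
func splitNamespaceKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	key := namespaceKey("kube-system", "coredns")
	ns, name, _ := splitNamespaceKey(key)
	fmt.Println(key, ns, name)
}
```

    Because the key is a plain string, it can be placed on a work queue and later used to look the object up again in the Indexer.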

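    0.2. Custom controller components

    • Informer reference: a reference to the Informer instance that knows how to work with your custom resource objects. Custom controller code needs to create the appropriate Informer.

    • Indexer reference: the custom controller's reference to the Indexer instance. Custom controller code needs to create the appropriate Indexer.

    client-go provides the NewIndexerInformer function, which creates both the Informer and the Indexer.

    • Resource Event Handlers: the callback functions that the Informer calls when it wants to deliver an object to your controller. The typical pattern for writing these functions is to obtain the dispatched object's key and enqueue that key in a work queue for further processing.

    • Work queue: the task queue. Resource event handler functions are written to extract the delivered object's key and add it to the work queue.

    • Process Item: the function that processes items from the work queue. These functions typically use the Indexer reference or a Listing wrapper to retrieve the object corresponding to the key.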
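
    The handler, work queue, and process-item pattern described above can be sketched with the standard library only. Everything here (workQueue, indexer, onEvent, processAll) is a hypothetical simplification: a real controller would use client-go's work queue and Indexer, and a replica count stands in for the cached object.

```go
package main

import "fmt"

// workQueue is a tiny FIFO of keys that collapses duplicates still waiting.
type workQueue struct {
	keys    []string
	pending map[string]bool
}

func newWorkQueue() *workQueue {
	return &workQueue{pending: map[string]bool{}}
}

func (q *workQueue) add(key string) {
	if q.pending[key] {
		return // already queued, no need to process it twice
	}
	q.pending[key] = true
	q.keys = append(q.keys, key)
}

func (q *workQueue) get() (string, bool) {
	if len(q.keys) == 0 {
		return "", false
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	delete(q.pending, key)
	return key, true
}

// indexer maps key -> latest known object (here just a replica count).
type indexer map[string]int

// onEvent plays the role of a resource event handler: it only enqueues the key.
func onEvent(q *workQueue, namespace, name string) {
	q.add(namespace + "/" + name)
}

// processAll plays the role of "Process Item": it drains the queue and
// retrieves the current object for each key from the indexer.
func processAll(q *workQueue, idx indexer) []string {
	var out []string
	for {
		key, ok := q.get()
		if !ok {
			return out
		}
		replicas, exists := idx[key]
		if !exists {
			out = append(out, key+": deleted")
			continue
		}
		out = append(out, fmt.Sprintf("%s: replicas=%d", key, replicas))
	}
}

func main() {
	idx := indexer{"default/web": 3}
	q := newWorkQueue()
	onEvent(q, "default", "web")
	onEvent(q, "default", "web") // collapsed with the previous event
	onEvent(q, "default", "gone")
	for _, line := range processAll(q, idx) {
		fmt.Println(line)
	}
}
```

    Queuing keys rather than objects means the worker always acts on the latest cached state, and several events for the same object collapse into a single unit of work.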

    1. sharedInformerFactory.Start

    The Run function of controller-manager calls InformerFactory.Start.

    This code is in /cmd/kube-controller-manager/app/controllermanager.go:

    // Run runs the KubeControllerManagerOptions. This should never exit.
    func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
        ...
        controllerContext.InformerFactory.Start(controllerContext.Stop)
        close(controllerContext.InformersStarted)
        ...
    }

    InformerFactory is a SharedInformerFactory interface, defined as follows.

    This code is in vendor/k8s.io/client-go/informers/internalinterfaces/factory_interfaces.go:

    // SharedInformerFactory a small interface to allow for adding an informer without an import cycle
    type SharedInformerFactory interface {
        Start(stopCh <-chan struct{})
        InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
    }

    The Start method starts every registered informer type that has not been started yet, running informer.Run in its own goroutine for each type.

    This code is in vendor/k8s.io/client-go/informers/factory.go:

    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
        f.lock.Lock()
        defer f.lock.Unlock()

        for informerType, informer := range f.informers {
            if !f.startedInformers[informerType] {
                go informer.Run(stopCh)
                f.startedInformers[informerType] = true
            }
        }
    }
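
    Because of the startedInformers map, calling Start more than once is safe: each informer is launched only once. The following stand-alone sketch illustrates that property. The factory and countingInformer types are hypothetical, and unlike the real method this Start returns how many goroutines it launched so the effect is observable.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runner is the only part of an informer this sketch needs.
type runner interface {
	Run(stopCh <-chan struct{})
}

// factory is a hypothetical, trimmed-down stand-in for sharedInformerFactory.
type factory struct {
	lock      sync.Mutex
	informers map[string]runner
	started   map[string]bool
}

// Start launches one goroutine per informer that has not been started yet
// and reports how many it launched (the real Start returns nothing).
func (f *factory) Start(stopCh <-chan struct{}) int {
	f.lock.Lock()
	defer f.lock.Unlock()
	launched := 0
	for name, informer := range f.informers {
		if !f.started[name] {
			go informer.Run(stopCh)
			f.started[name] = true
			launched++
		}
	}
	return launched
}

// countingInformer records each Run call and then blocks until stopped.
type countingInformer struct {
	runs    *int32
	running *sync.WaitGroup
}

func (c countingInformer) Run(stopCh <-chan struct{}) {
	atomic.AddInt32(c.runs, 1)
	c.running.Done()
	<-stopCh
}

// startTwice calls Start twice and returns what each call launched plus
// the total number of Run invocations.
func startTwice() (first, second int, runs int32) {
	var count int32
	var running sync.WaitGroup
	running.Add(2)
	f := &factory{
		informers: map[string]runner{
			"pods":        countingInformer{&count, &running},
			"deployments": countingInformer{&count, &running},
		},
		started: map[string]bool{},
	}
	stopCh := make(chan struct{})
	defer close(stopCh)
	first = f.Start(stopCh)
	second = f.Start(stopCh)
	running.Wait() // both goroutines have entered Run
	return first, second, atomic.LoadInt32(&count)
}

func main() {
	first, second, runs := startTwice()
	fmt.Println(first, second, runs)
}
```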

    2. sharedIndexInformer.Run

    This code is in /vendor/k8s.io/client-go/tools/cache/shared_informer.go:

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()

        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

        cfg := &Config{
            Queue:            fifo,
            ListerWatcher:    s.listerWatcher,
            ObjectType:       s.objectType,
            FullResyncPeriod: s.resyncCheckPeriod,
            RetryOnError:     false,
            ShouldResync:     s.processor.shouldResync,

            Process: s.HandleDeltas,
        }

        func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()

            s.controller = New(cfg)
            s.controller.(*controller).clock = s.clock
            s.started = true
        }()

        // Separate stop channel because Processor should be stopped strictly after controller
        processorStopCh := make(chan struct{})
        var wg wait.Group
        defer wg.Wait()              // Wait for Processor to stop
        defer close(processorStopCh) // Tell Processor to stop
        wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
        wg.StartWithChannel(processorStopCh, s.processor.run)

        defer func() {
            s.startedLock.Lock()
            defer s.startedLock.Unlock()
            s.stopped = true // Don't want any new listeners
        }()
        s.controller.Run(stopCh)
    }
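
    The shutdown order in Run relies on Go running deferred calls in last-in, first-out order: close(processorStopCh) fires only after s.controller.Run returns, and wg.Wait() runs last. A minimal sketch of that ordering follows; runInformer is a hypothetical stand-in in which the controller is reduced to a blocking receive on stopCh.

```go
package main

import (
	"fmt"
	"sync"
)

// runInformer mimics the shape of sharedIndexInformer.Run and returns the
// order in which the "controller" and the "processor" stopped.
func runInformer(stopCh <-chan struct{}) []string {
	var (
		mu    sync.Mutex
		order []string
	)
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		order = append(order, s)
	}

	func() {
		processorStopCh := make(chan struct{})
		var wg sync.WaitGroup
		defer wg.Wait()              // runs last: wait for the processor
		defer close(processorStopCh) // runs first: tell the processor to stop

		wg.Add(1)
		go func() {
			defer wg.Done()
			<-processorStopCh
			record("processor stopped")
		}()

		// Stand-in for s.controller.Run(stopCh): block until asked to stop.
		<-stopCh
		record("controller stopped")
	}()
	return order
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh)
	fmt.Println(runInformer(stopCh))
}
```

    The processor therefore keeps delivering notifications until the controller has finished, which is why a separate stop channel is used.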

    2.1. NewDeltaFIFO

    DeltaFIFO is a first-in, first-out queue that stores object changes (deltas). The process function consumes the objects returned by the queue's Pop method.

    fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)

    2.2. Config

    Build the controller's Config. Its Process field is set to HandleDeltas, which is the process function used later.

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    2.3. controller

    Call New(cfg) to build the controller of the sharedIndexInformer.

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    2.4. cacheMutationDetector.Run

    Call s.cacheMutationDetector.Run, which periodically checks whether cached objects have been mutated.

    wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)

    defaultCacheMutationDetector.Run

    func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
        // we DON'T want protection from panics. If we're running this code, we want to die
        for {
            d.CompareObjects()

            select {
            case <-stopCh:
                return
            case <-time.After(d.period):
            }
        }
    }

    CompareObjects

    func (d *defaultCacheMutationDetector) CompareObjects() {
        d.lock.Lock()
        defer d.lock.Unlock()

        altered := false
        for i, obj := range d.cachedObjs {
            if !reflect.DeepEqual(obj.cached, obj.copied) {
                fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectDiff(obj.cached, obj.copied))
                altered = true
            }
        }

        if altered {
            msg := fmt.Sprintf("cache %s modified", d.name)
            if d.failureFunc != nil {
                d.failureFunc(msg)
                return
            }
            panic(msg)
        }
    }
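
    The idea behind CompareObjects is to keep a deep copy next to each cached object and compare the two later: any difference means someone modified an object that should be treated as read-only. A minimal sketch under that reading, with hypothetical pod and detector types in place of the real ones:

```go
package main

import (
	"fmt"
	"reflect"
)

// pod is a hypothetical cached object.
type pod struct {
	Name   string
	Labels map[string]string
}

// deepCopy returns an independent copy, preserving a nil Labels map.
func (p *pod) deepCopy() *pod {
	c := &pod{Name: p.Name}
	if p.Labels != nil {
		c.Labels = make(map[string]string, len(p.Labels))
		for k, v := range p.Labels {
			c.Labels[k] = v
		}
	}
	return c
}

// cacheEntry pairs the shared cached object with a private pristine copy.
type cacheEntry struct {
	cached *pod
	copied *pod
}

type detector struct {
	entries []cacheEntry
}

// addObject remembers the object together with a snapshot of its state.
func (d *detector) addObject(p *pod) {
	d.entries = append(d.entries, cacheEntry{cached: p, copied: p.deepCopy()})
}

// mutated returns the names of cached objects that differ from their snapshot.
func (d *detector) mutated() []string {
	var names []string
	for _, e := range d.entries {
		if !reflect.DeepEqual(e.cached, e.copied) {
			names = append(names, e.copied.Name)
		}
	}
	return names
}

func main() {
	d := &detector{}
	p := &pod{Name: "web-0", Labels: map[string]string{"app": "web"}}
	d.addObject(p)
	fmt.Println(len(d.mutated())) // nothing changed yet

	p.Labels["app"] = "oops" // a consumer mutates the shared cache object
	fmt.Println(d.mutated())
}
```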

    2.5. processor.run

    Call s.processor.run, that is, sharedProcessor.run, which starts listener.run and listener.pop for every listener to process the queued notifications.

    wg.StartWithChannel(processorStopCh, s.processor.run)

    sharedProcessor.run

    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
        func() {
            p.listenersLock.RLock()
            defer p.listenersLock.RUnlock()
            for _, listener := range p.listeners {
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
        }()
        <-stopCh
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
        }
        p.wg.Wait() // Wait for all .pop() and .run() to stop
    }

    This logic is analyzed later.

    2.6. controller.Run

    Call s.controller.Run, which builds the Reflector and maintains a local cache of the data stored in etcd (obtained through the API server).

    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true // Don't want any new listeners
    }()
    s.controller.Run(stopCh)

    controller.Run

    This code is in /vendor/k8s.io/client-go/tools/cache/controller.go:

    // Run begins processing items, and will continue until a value is sent down stopCh.
    // It's an error to call Run more than once.
    // Run blocks; call via go.
    func (c *controller) Run(stopCh <-chan struct{}) {
        defer utilruntime.HandleCrash()
        go func() {
            <-stopCh
            c.config.Queue.Close()
        }()
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        r.ShouldResync = c.config.ShouldResync
        r.clock = c.clock

        c.reflectorMutex.Lock()
        c.reflector = r
        c.reflectorMutex.Unlock()

        var wg wait.Group
        defer wg.Wait()

        wg.StartWithChannel(stopCh, r.Run)

        wait.Until(c.processLoop, time.Second, stopCh)
    }

    Core code:

    // Build the Reflector
    r := NewReflector(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        c.config.FullResyncPeriod,
    )
    // Run the Reflector
    wg.StartWithChannel(stopCh, r.Run)
    // Run processLoop
    wait.Until(c.processLoop, time.Second, stopCh)

    3. Reflector

    3.1. Reflector

    The main job of a Reflector is to watch a specified Kubernetes resource and reflect all changes into the local store. A Reflector only puts objects of the expectedType into the store, unless expectedType is nil. If resyncPeriod is non-zero, the Reflector triggers a resync once every resyncPeriod, so you can use a Reflector to periodically process all objects as well as incrementally process the objects that change.

    Commonly used fields:

    • expectedType: the type of resource expected to be placed in the store.
    • store: the local cache for the watched resource.
    • listerWatcher: the interface used to list and watch.
    • period: the interval between one watch ending and the next one beginning; NewNamedReflector sets it to 1 second.
    • resyncPeriod: the resync period; when non-zero, a resync (store.Resync, see 3.4.2) is performed at this interval.
    • lastSyncResourceVersion: the most recently observed resource version, used mainly when watching.

    // Reflector watches a specified resource and causes all changes to be reflected in the given store.
    type Reflector struct {
        // name identifies this reflector. By default it will be a file:line if possible.
        name string
        // metrics tracks basic metric information about the reflector
        metrics *reflectorMetrics

        // The type of object we expect to place in the store.
        expectedType reflect.Type
        // The destination to sync up with the watch source
        store Store
        // listerWatcher is used to perform lists and watches.
        listerWatcher ListerWatcher
        // period controls timing between one watch ending and
        // the beginning of the next one.
        period       time.Duration
        resyncPeriod time.Duration
        ShouldResync func() bool
        // clock allows tests to manipulate time
        clock clock.Clock
        // lastSyncResourceVersion is the resource version token last
        // observed when doing a sync with the underlying store
        // it is thread safe, but not synchronized with the underlying store
        lastSyncResourceVersion string
        // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
        lastSyncResourceVersionMutex sync.RWMutex
    }

    3.2. NewReflector

    NewReflector constructs the Reflector struct.

    This code is in /vendor/k8s.io/client-go/tools/cache/reflector.go:

    // NewReflector creates a new Reflector object which will keep the given store up to
    // date with the server's contents for the given resource. Reflector promises to
    // only put things in the store that have the type of expectedType, unless expectedType
    // is nil. If resyncPeriod is non-zero, then lists will be executed after every
    // resyncPeriod, so that you can use reflectors to periodically process everything as
    // well as incrementally processing the things that change.
    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
    }

    // reflectorDisambiguator is used to disambiguate started reflectors.
    // initialized to an unstable value to ensure meaning isn't attributed to the suffix.
    var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)

    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
        r := &Reflector{
            name: name,
            // we need this to be unique per process (some names are still the same)but obvious who it belongs to
            metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
            listerWatcher: lw,
            store:         store,
            expectedType:  reflect.TypeOf(expectedType),
            period:        time.Second,
            resyncPeriod:  resyncPeriod,
            clock:         &clock.RealClock{},
        }
        return r
    }

    3.3. Reflector.Run

    Reflector.Run mainly runs ListAndWatch, restarting it every period until stopCh is closed.

    // Run starts a watch and handles watch events. Will restart the watch if it is closed.
    // Run will exit when stopCh is closed.
    func (r *Reflector) Run(stopCh <-chan struct{}) {
        glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
        wait.Until(func() {
            if err := r.ListAndWatch(stopCh); err != nil {
                utilruntime.HandleError(err)
            }
        }, r.period, stopCh)
    }
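
    Both controller.Run and Reflector.Run use wait.Until to re-run a function until the stop channel is closed. The following is a rough stand-alone approximation of that loop, not the apimachinery implementation; until and countRuns are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f, waits for period, and repeats until stopCh is closed.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		// Check for stop before each run.
		select {
		case <-stopCh:
			return
		default:
		}

		f()

		// Wait for the next period, or stop early.
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// countRuns lets f run n times (n must be at least 1) and then stops the
// loop from inside f by closing the stop channel.
func countRuns(n int) int {
	stopCh := make(chan struct{})
	runs := 0
	until(func() {
		runs++
		if runs == n {
			close(stopCh)
		}
	}, time.Millisecond, stopCh)
	return runs
}

func main() {
	fmt.Println(countRuns(3))
}
```

    In the Reflector, f is the call to ListAndWatch, so a failed or finished watch is retried after period (one second here) rather than ending the informer.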

    3.4. ListAndWatch

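    ListAndWatch first lists all objects and obtains the resource version at that moment, then watches from that resource version to pick up later changes. The initial List sets the resource version to "0", so it may be served from a cache and lag behind the contents of etcd. The Reflector then catches up through Watch, so that the local cache eventually becomes consistent with etcd.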
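
    The list-then-watch handshake can be modeled with an in-memory server that numbers every change. This is only a sketch of the idea: fakeServer, reflector, and integer versions are hypothetical simplifications, whereas the real resource version is an opaque string and the watch is a stream.

```go
package main

import "fmt"

// event is one recorded change on the fake server.
type event struct {
	kind    string // "ADDED", "MODIFIED" or "DELETED"
	key     string
	value   string
	version int
}

// fakeServer keeps current state plus a history of versioned changes.
type fakeServer struct {
	state   map[string]string
	version int
	history []event
}

func newFakeServer() *fakeServer {
	return &fakeServer{state: map[string]string{}}
}

func (s *fakeServer) apply(kind, key, value string) {
	s.version++
	if kind == "DELETED" {
		delete(s.state, key)
	} else {
		s.state[key] = value
	}
	s.history = append(s.history, event{kind, key, value, s.version})
}

// list returns a snapshot and the version it corresponds to.
func (s *fakeServer) list() (map[string]string, int) {
	snapshot := make(map[string]string, len(s.state))
	for k, v := range s.state {
		snapshot[k] = v
	}
	return snapshot, s.version
}

// watch returns every change newer than the given version.
func (s *fakeServer) watch(since int) []event {
	var out []event
	for _, ev := range s.history {
		if ev.version > since {
			out = append(out, ev)
		}
	}
	return out
}

// reflector mirrors server state into a local store.
type reflector struct {
	store       map[string]string
	lastVersion int
}

// list replaces the whole store and remembers the list's version.
func (r *reflector) list(s *fakeServer) {
	r.store, r.lastVersion = s.list()
}

// catchUp applies all changes after lastVersion, advancing it per event.
func (r *reflector) catchUp(s *fakeServer) {
	for _, ev := range s.watch(r.lastVersion) {
		if ev.kind == "DELETED" {
			delete(r.store, ev.key)
		} else {
			r.store[ev.key] = ev.value
		}
		r.lastVersion = ev.version
	}
}

// demoListAndWatch lists, lets the server change, then catches up.
func demoListAndWatch() (map[string]string, int) {
	s := newFakeServer()
	s.apply("ADDED", "a", "v1")
	s.apply("ADDED", "b", "v1")

	r := &reflector{}
	r.list(s) // snapshot at version 2

	s.apply("MODIFIED", "a", "v2")
	s.apply("DELETED", "b", "")
	s.apply("ADDED", "c", "v1")

	r.catchUp(s) // replay versions 3 to 5
	return r.store, r.lastVersion
}

func main() {
	store, version := demoListAndWatch()
	fmt.Println(store, version)
}
```

    Watching from the version returned by the list is what guarantees that no change between the list and the watch is missed.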

    3.4.1. List

    // ListAndWatch first lists all items and get the resource version at the moment of call,
    // and then use the resource version to watch.
    // It returns error if ListAndWatch didn't even try to initialize watch.
    func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
        glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
        var resourceVersion string

        // Explicitly set "0" as resource version - it's fine for the List()
        // to be served from cache and potentially be delayed relative to
        // etcd contents. Reflector framework will catch up via Watch() eventually.
        options := metav1.ListOptions{ResourceVersion: "0"}
        r.metrics.numberOfLists.Inc()
        start := r.clock.Now()
        list, err := r.listerWatcher.List(options)
        if err != nil {
            return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
        }
        r.metrics.listDuration.Observe(time.Since(start).Seconds())
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
        }
        resourceVersion = listMetaInterface.GetResourceVersion()
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
        }
        r.metrics.numberOfItemsInList.Observe(float64(len(items)))
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
        }
        r.setLastSyncResourceVersion(resourceVersion)
        ...
    }

    First set the resource version to "0", then call listerWatcher.List(options) to list all objects.

    // Set the resource version to "0"
    options := metav1.ListOptions{ResourceVersion: "0"}
    // Call the list interface
    list, err := r.listerWatcher.List(options)

    Get the resource version and extract the list result into a slice of objects.

    // Get the resource version
    resourceVersion = listMetaInterface.GetResourceVersion()
    // Extract the list result into a slice of objects
    items, err := meta.ExtractList(list)

    Store the listed objects and the resource version in the local store, replacing its existing contents in full.

    err := r.syncWith(items, resourceVersion)

    syncWith calls the store's Replace method to replace the data already in the store.

    // syncWith replaces the store's items with the given list.
    func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
        found := make([]interface{}, 0, len(items))
        for _, item := range items {
            found = append(found, item)
        }
        return r.store.Replace(found, resourceVersion)
    }

    Store.Replace is defined as follows:

    type Store interface {
        ...
        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        Replace([]interface{}, string) error
        ...
    }

    Finally, record the latest resource version.

    r.setLastSyncResourceVersion(resourceVersion)

    setLastSyncResourceVersion:

    func (r *Reflector) setLastSyncResourceVersion(v string) {
        r.lastSyncResourceVersionMutex.Lock()
        defer r.lastSyncResourceVersionMutex.Unlock()
        r.lastSyncResourceVersion = v

        rv, err := strconv.Atoi(v)
        if err == nil {
            r.metrics.lastResourceVersion.Set(float64(rv))
        }
    }

    3.4.2. store.Resync

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                glog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    Core code:

    err := r.store.Resync()

    The concrete store is the DeltaFIFO, so this calls DeltaFIFO.Resync:

    // Resync will send a sync event for each item
    func (f *DeltaFIFO) Resync() error {
        f.lock.Lock()
        defer f.lock.Unlock()

        if f.knownObjects == nil {
            return nil
        }

        keys := f.knownObjects.ListKeys()
        for _, k := range keys {
            if err := f.syncKeyLocked(k); err != nil {
                return err
            }
        }
        return nil
    }
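
    The body of syncKeyLocked is not shown above. The sketch below assumes, as one plausible reading, that a resync queues a Sync delta for every known key unless that key already has deltas waiting in the queue. The syncFIFO type and its methods are hypothetical and hold only keys and delta kinds.

```go
package main

import "fmt"

// delta is one queued change for a key.
type delta struct {
	kind string
	key  string
}

// syncFIFO is a hypothetical, lock-free miniature of a delta queue.
type syncFIFO struct {
	items map[string][]delta // pending deltas per key
	queue []string           // keys in arrival order
	known []string           // keys the consumer already holds in its store
}

func newSyncFIFO(known ...string) *syncFIFO {
	return &syncFIFO{items: map[string][]delta{}, known: known}
}

// enqueue appends a delta, adding the key to the queue on first use.
func (f *syncFIFO) enqueue(kind, key string) {
	if _, queued := f.items[key]; !queued {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], delta{kind, key})
}

// resync queues a "Sync" delta for each known key that has nothing pending.
// Assumption: keys with pending deltas are skipped, since they will be
// processed anyway.
func (f *syncFIFO) resync() {
	for _, key := range f.known {
		if len(f.items[key]) > 0 {
			continue
		}
		f.enqueue("Sync", key)
	}
}

func main() {
	f := newSyncFIFO("default/a", "default/b")
	f.enqueue("Updated", "default/a") // a real change is already waiting
	f.resync()
	fmt.Println(f.queue)
	fmt.Println(f.items["default/a"], f.items["default/b"])
}
```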

    3.4.3. Watch

    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timemoutseconds,
        }

        r.metrics.numberOfWatches.Inc()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }

    Set the watch timeout. minWatchTimeout is 5 minutes, and the actual timeout is a random value between one and two times minWatchTimeout.

    timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
    options = metav1.ListOptions{
        ResourceVersion: resourceVersion,
        // We want to avoid situations of hanging watchers. Stop any wachers that do not
        // receive any events within the timeout window.
        TimeoutSeconds: &timemoutseconds,
    }
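
    The randomized timeout spreads watch reconnects over time so that many reflectors do not all reconnect at once. The arithmetic can be checked in isolation. In this sketch the helper names are hypothetical, and minWatchTimeout is redeclared locally with the 5 minute value assumed above.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// minWatchTimeout is redeclared here for the sketch (assumed to be 5 minutes).
const minWatchTimeout = 5 * time.Minute

// watchTimeoutSeconds scales the minimum by (1 + jitter), with jitter in [0, 1),
// and truncates to whole seconds.
func watchTimeoutSeconds(minSeconds, jitter float64) int64 {
	return int64(minSeconds * (jitter + 1.0))
}

// randomWatchTimeoutSeconds picks a timeout in [300, 600) seconds.
func randomWatchTimeoutSeconds() int64 {
	return watchTimeoutSeconds(minWatchTimeout.Seconds(), rand.Float64())
}

func main() {
	fmt.Println(watchTimeoutSeconds(300, 0))   // lower bound
	fmt.Println(watchTimeoutSeconds(300, 0.5)) // midpoint
	t := randomWatchTimeoutSeconds()
	fmt.Println(t >= 300 && t < 600)
}
```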

    Call listerWatcher.Watch(options).

    w, err := r.listerWatcher.Watch(options)

    Call watchHandler.

    err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh)

    3.4.4. watchHandler

    watchHandler consumes the events delivered by the watch, applies them to the store, and keeps the current resource version up to date.

    // watchHandler watches w and keeps *resourceVersion up to date.
    func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
        start := r.clock.Now()
        eventCount := 0

        // Stopping the watcher should be idempotent and if we return from this function there's no way
        // we're coming back in with the same watch interface.
        defer w.Stop()
        // update metrics
        defer func() {
            r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
            r.metrics.watchDuration.Observe(time.Since(start).Seconds())
        }()

    loop:
        for {
            select {
            case <-stopCh:
                return errorStopRequested
            case err := <-errc:
                return err
            case event, ok := <-w.ResultChan():
                if !ok {
                    break loop
                }
                if event.Type == watch.Error {
                    return apierrs.FromObject(event.Object)
                }
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
                meta, err := meta.Accessor(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    continue
                }
                newResourceVersion := meta.GetResourceVersion()
                switch event.Type {
                case watch.Added:
                    err := r.store.Add(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Modified:
                    err := r.store.Update(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                    }
                case watch.Deleted:
                    // TODO: Will any consumers need access to the "last known
                    // state", which is passed in event.Object? If so, may need
                    // to change this.
                    err := r.store.Delete(event.Object)
                    if err != nil {
                        utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                    }
                default:
                    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                }
                *resourceVersion = newResourceVersion
                r.setLastSyncResourceVersion(newResourceVersion)
                eventCount++
            }
        }

        watchDuration := r.clock.Now().Sub(start)
        if watchDuration < 1*time.Second && eventCount == 0 {
            r.metrics.numberOfShortWatches.Inc()
            return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
        }
        glog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
        return nil
    }

    Read events from the channel returned by the watch interface's ResultChan method.

    for {
        select {
        ...
        case event, ok := <-w.ResultChan():
        ...
        }
    }

    When an Added, Modified, or Deleted event arrives, apply the corresponding object to the local store.

    switch event.Type {
    case watch.Added:
        err := r.store.Add(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
    case watch.Modified:
        err := r.store.Update(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
        }
    case watch.Deleted:
        // TODO: Will any consumers need access to the "last known
        // state", which is passed in event.Object? If so, may need
        // to change this.
        err := r.store.Delete(event.Object)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
        }
    default:
        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
    }

    Update the latest resource version.

    newResourceVersion := meta.GetResourceVersion()
    *resourceVersion = newResourceVersion
    r.setLastSyncResourceVersion(newResourceVersion)
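
    The shape of the watchHandler loop (select on a stop channel and an event channel, dispatch on event type, advance the resource version after every event) can be reproduced with plain channels. The sketch below is a simplified stand-in: watchEvent, handleWatch, and replay are hypothetical, the store is a map, and error events and type checks are left out.

```go
package main

import (
	"errors"
	"fmt"
)

var errStopRequested = errors.New("stop requested")

// watchEvent is a hypothetical, flattened watch event.
type watchEvent struct {
	kind            string // "ADDED", "MODIFIED" or "DELETED"
	key             string
	value           string
	resourceVersion string
}

// handleWatch applies events to store until the channel is closed or a stop
// is requested, keeping *resourceVersion at the last event seen.
// It returns the number of events handled.
func handleWatch(events <-chan watchEvent, stopCh <-chan struct{}, store map[string]string, resourceVersion *string) (int, error) {
	count := 0
	for {
		select {
		case <-stopCh:
			return count, errStopRequested
		case ev, ok := <-events:
			if !ok {
				return count, nil // watch closed by the server side
			}
			switch ev.kind {
			case "ADDED", "MODIFIED":
				store[ev.key] = ev.value
			case "DELETED":
				delete(store, ev.key)
			}
			*resourceVersion = ev.resourceVersion
			count++
		}
	}
}

// replay feeds a fixed event sequence through handleWatch.
func replay() (map[string]string, string, int) {
	events := make(chan watchEvent, 4)
	events <- watchEvent{"ADDED", "default/a", "v1", "10"}
	events <- watchEvent{"ADDED", "default/b", "v1", "11"}
	events <- watchEvent{"MODIFIED", "default/a", "v2", "12"}
	events <- watchEvent{"DELETED", "default/b", "", "13"}
	close(events)

	store := map[string]string{}
	resourceVersion := "9" // version returned by the preceding list
	n, _ := handleWatch(events, make(chan struct{}), store, &resourceVersion)
	return store, resourceVersion, n
}

func main() {
	store, rv, n := replay()
	fmt.Println(store, rv, n)
}
```

    Because the resource version is passed by pointer, the caller can start the next watch from the last event that was handled instead of listing again.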

    The analysis of the Reflector shows that it uses the local store repeatedly, and that the value assigned to this store is the DeltaFIFO. The following sections analyze DeltaFIFO and store.

    4. DeltaFIFO

    DeltaFIFO is created by NewDeltaFIFO and assigned to config.Queue.

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, nil, s.indexer)
        cfg := &Config{
            Queue: fifo,
            ...
        }
        ...
    }

    4.1. NewDeltaFIFO

    // NewDeltaFIFO returns a Store which can be used process changes to items.
    //
    // keyFunc is used to figure out what key an object should have. (It's
    // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
    //
    // 'compressor' may compress as many or as few items as it wants
    // (including returning an empty slice), but it should do what it
    // does quickly since it is called while the queue is locked.
    // 'compressor' may be nil if you don't want any delta compression.
    //
    // 'keyLister' is expected to return a list of keys that the consumer of
    // this queue "knows about". It is used to decide which items are missing
    // when Replace() is called; 'Deleted' deltas are produced for these items.
    // It may be nil if you don't need to detect all deletions.
    // TODO: consider merging keyLister with this object, tracking a list of
    //       "known" keys when Pop() is called. Have to think about how that
    //       affects error retrying.
    // TODO(lavalamp): I believe there is a possible race only when using an
    //                 external known object source that the above TODO would
    //                 fix.
    //
    // Also see the comment on DeltaFIFO.
    func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjects KeyListerGetter) *DeltaFIFO {
        f := &DeltaFIFO{
            items:           map[string]Deltas{},
            queue:           []string{},
            keyFunc:         keyFunc,
            deltaCompressor: compressor,
            knownObjects:    knownObjects,
        }
        f.cond.L = &f.lock
        return f
    }

    controller.Run calls NewReflector.

    func (c *controller) Run(stopCh <-chan struct{}) {
        ...
        r := NewReflector(
            c.config.ListerWatcher,
            c.config.ObjectType,
            c.config.Queue,
            c.config.FullResyncPeriod,
        )
        ...
    }

    The NewReflector constructor assigns c.config.Queue to the Reflector's store field.

    func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        return NewNamedReflector(getDefaultReflectorName(internalPackages...), lw, expectedType, store, resyncPeriod)
    }

    // NewNamedReflector same as NewReflector, but with a specified name for logging
    func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
        reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
        r := &Reflector{
            name: name,
            // we need this to be unique per process (some names are still the same)but obvious who it belongs to
            metrics:       newReflectorMetrics(makeValidPromethusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
            listerWatcher: lw,
            store:         store,
            expectedType:  reflect.TypeOf(expectedType),
            period:        time.Second,
            resyncPeriod:  resyncPeriod,
            clock:         &clock.RealClock{},
        }
        return r
    }

    4.2. DeltaFIFO

    DeltaFIFO是一个生产者与消费者的队列,其中Reflector是生产者,消费者调用Pop()的方法。

    DeltaFIFO主要用在以下场景:

    • 希望对象变更最多处理一次
    • 处理对象时,希望查看自上次处理对象以来发生的所有事情
    • 要处理对象的删除
    • 希望定期重新处理对象

    // DeltaFIFO is like FIFO, but allows you to process deletes.
    //
    // DeltaFIFO is a producer-consumer queue, where a Reflector is
    // intended to be the producer, and the consumer is whatever calls
    // the Pop() method.
    //
    // DeltaFIFO solves this use case:
    //  * You want to process every object change (delta) at most once.
    //  * When you process an object, you want to see everything
    //    that's happened to it since you last processed it.
    //  * You want to process the deletion of objects.
    //  * You might want to periodically reprocess objects.
    //
    // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
    // interface{} to satisfy the Store/Queue interfaces, but it
    // will always return an object of type Deltas.
    //
    // A note on threading: If you call Pop() in parallel from multiple
    // threads, you could end up with multiple threads processing slightly
    // different versions of the same object.
    //
    // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
    // to list keys that are "known", for the purpose of figuring out which
    // items have been deleted when Replace() or Delete() are called. The deleted
    // object will be included in the DeleteFinalStateUnknown markers. These objects
    // could be stale.
    //
    // You may provide a function to compress deltas (e.g., represent a
    // series of Updates as a single Update).
    type DeltaFIFO struct {
        // lock/cond protects access to 'items' and 'queue'.
        lock sync.RWMutex
        cond sync.Cond

        // We depend on the property that items in the set are in
        // the queue and vice versa, and that all Deltas in this
        // map have at least one Delta.
        items map[string]Deltas
        queue []string

        // populated is true if the first batch of items inserted by Replace() has been populated
        // or Delete/Add/Update was called first.
        populated bool
        // initialPopulationCount is the number of items inserted by the first call of Replace()
        initialPopulationCount int

        // keyFunc is used to make the key used for queued item
        // insertion and retrieval, and should be deterministic.
        keyFunc KeyFunc

        // deltaCompressor tells us how to combine two or more
        // deltas. It may be nil.
        deltaCompressor DeltaCompressor

        // knownObjects list keys that are "known", for the
        // purpose of figuring out which items have been deleted
        // when Replace() or Delete() is called.
        knownObjects KeyListerGetter

        // Indication the queue is closed.
        // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
        // Currently, not used to gate any of CRED operations.
        closed     bool
        closedLock sync.Mutex
    }
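
    The relationship between items and queue can be illustrated with a much smaller structure. The following is a minimal sketch, not the client-go implementation: miniDeltaFIFO, enqueue and pop are hypothetical names, objects are plain strings, and locking and blocking are left out. It only shows that a key enters the queue once while its deltas accumulate until the key is popped.

```go
package main

import "fmt"

// DeltaType and Delta borrow their names from client-go, but this is a
// simplified, single-goroutine illustration (no locking, no blocking pop).
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object string
}

// miniDeltaFIFO is a hypothetical type used for illustration only.
type miniDeltaFIFO struct {
	items map[string][]Delta // key -> deltas accumulated since the last pop
	queue []string           // keys in first-seen order
}

func newMiniDeltaFIFO() *miniDeltaFIFO {
	return &miniDeltaFIFO{items: map[string][]Delta{}}
}

// enqueue appends a delta for key; the key joins the queue only once.
func (f *miniDeltaFIFO) enqueue(key string, d Delta) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// pop removes the oldest key and returns every delta recorded for it.
func (f *miniDeltaFIFO) pop() (string, []Delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return key, deltas, true
}

func main() {
	f := newMiniDeltaFIFO()
	f.enqueue("default/web", Delta{Added, "web v1"})
	f.enqueue("default/db", Delta{Added, "db v1"})
	f.enqueue("default/web", Delta{Updated, "web v2"})
	f.enqueue("default/web", Delta{Deleted, "web v2"})

	for {
		key, deltas, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(key, len(deltas))
	}
}
```

    In this sketch the consumer sees "default/web" first, together with all three of its deltas, which corresponds to the "see everything that has happened since you last processed it" use case.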

    4.3. Queue & Store

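    DeltaFIFO implements the Queue interface, while Reflector.store is typed as the Store interface. Queue is a Store that additionally works as a queue: the Process function is applied to each object that Queue.Pop takes off the queue.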

    // Queue is exactly like a Store, but has a Pop() method too.
    type Queue interface {
        Store

        // Pop blocks until it has something to process.
        // It returns the object that was process and the result of processing.
        // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
        // should be requeued before releasing the lock on the queue.
        Pop(PopProcessFunc) (interface{}, error)

        // AddIfNotPresent adds a value previously
        // returned by Pop back into the queue as long
        // as nothing else (presumably more recent)
        // has since been added.
        AddIfNotPresent(interface{}) error

        // Return true if the first batch of items has been popped
        HasSynced() bool

        // Close queue
        Close()
    }

    5. store

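    Store is a generic object storage interface. Reflector watches the server and writes the changes into a store. Depending on the implementation behind it, the store lets Reflector act as a local cache or work like a queue of items that are yet to be processed.

    A Store implementation is responsible for keying objects correctly and for defining how objects are written and retrieved by key.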

    // Store is a generic object storage interface. Reflector knows how to watch a server
    // and update a store. A generic store is provided, which allows Reflector to be used
    // as a local caching system, and an LRU store, which allows Reflector to work like a
    // queue of items yet to be processed.
    //
    // Store makes no assumptions about stored object identity; it is the responsibility
    // of a Store implementation to provide a mechanism to correctly key objects and to
    // define the contract for obtaining objects by some arbitrary key type.
    type Store interface {
        Add(obj interface{}) error
        Update(obj interface{}) error
        Delete(obj interface{}) error
        List() []interface{}
        ListKeys() []string
        Get(obj interface{}) (item interface{}, exists bool, err error)
        GetByKey(key string) (item interface{}, exists bool, err error)

        // Replace will delete the contents of the store, using instead the
        // given list. Store takes ownership of the list, you should not reference
        // it after calling this function.
        Replace([]interface{}, string) error
        Resync() error
    }

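    Replace deletes the existing contents of the store and stores the given list instead, that is, it replaces the data completely.

    5.1. cache

    cache implements the Store interface, and it does so by delegating to the ThreadSafeStore interface.

    cache has two main responsibilities:

    • Computing the key of an object via keyFunc
    • Invoking the methods of the ThreadSafeStore interface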

    // cache responsibilities are limited to:
    //  1. Computing keys for objects via keyFunc
    //  2. Invoking methods of a ThreadSafeStorage interface
    type cache struct {
        // cacheStorage bears the burden of thread safety for the cache
        cacheStorage ThreadSafeStore
        // keyFunc is used to make the key for objects stored in and retrieved from items, and
        // should be deterministic.
        keyFunc KeyFunc
    }

    ListAndWatch mainly relies on the following Store methods, shown here as implemented by cache:

    cache.Replace

    // Replace will delete the contents of 'c', using instead the given list.
    // 'c' takes ownership of the list, you should not reference the list again
    // after calling this function.
    func (c *cache) Replace(list []interface{}, resourceVersion string) error {
        items := map[string]interface{}{}
        for _, item := range list {
            key, err := c.keyFunc(item)
            if err != nil {
                return KeyError{item, err}
            }
            items[key] = item
        }
        c.cacheStorage.Replace(items, resourceVersion)
        return nil
    }

    cache.Add

    // Add inserts an item into the cache.
    func (c *cache) Add(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Add(key, obj)
        return nil
    }

    cache.Update

    // Update sets an item in the cache to its updated state.
    func (c *cache) Update(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Update(key, obj)
        return nil
    }

    cache.Delete

    // Delete removes an item from the cache.
    func (c *cache) Delete(obj interface{}) error {
        key, err := c.keyFunc(obj)
        if err != nil {
            return KeyError{obj, err}
        }
        c.cacheStorage.Delete(key)
        return nil
    }
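
    The division of labor between cache (key computation) and its storage backend (thread safety) can be sketched with the standard library alone. The code below is an illustration under simplifying assumptions: object, safeMap and miniCache are hypothetical types, and keyFunc only imitates the "namespace/name" key format.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// object is a hypothetical stand-in for a Kubernetes object.
type object struct {
	Namespace, Name string
}

// keyFunc derives a "namespace/name" key (an assumption made for this sketch).
func keyFunc(obj interface{}) (string, error) {
	o, ok := obj.(object)
	if !ok {
		return "", errors.New("unsupported object type")
	}
	if o.Namespace == "" {
		return o.Name, nil
	}
	return o.Namespace + "/" + o.Name, nil
}

// safeMap plays the role of the storage backend: it only knows keys and
// values and takes care of locking.
type safeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}
}

func (m *safeMap) add(key string, obj interface{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.items[key] = obj
}

func (m *safeMap) replace(items map[string]interface{}) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.items = items
}

func (m *safeMap) get(key string) (interface{}, bool) {
	m.lock.RLock()
	defer m.lock.RUnlock()
	obj, ok := m.items[key]
	return obj, ok
}

// miniCache computes keys and delegates the storage itself.
type miniCache struct {
	storage *safeMap
}

func newMiniCache() *miniCache {
	return &miniCache{storage: &safeMap{items: map[string]interface{}{}}}
}

func (c *miniCache) Add(obj interface{}) error {
	key, err := keyFunc(obj)
	if err != nil {
		return err
	}
	c.storage.add(key, obj)
	return nil
}

// Replace drops everything currently stored and keeps only the given list.
func (c *miniCache) Replace(list []interface{}) error {
	items := make(map[string]interface{}, len(list))
	for _, obj := range list {
		key, err := keyFunc(obj)
		if err != nil {
			return err
		}
		items[key] = obj
	}
	c.storage.replace(items)
	return nil
}

func (c *miniCache) GetByKey(key string) (interface{}, bool) {
	return c.storage.get(key)
}

func main() {
	c := newMiniCache()
	_ = c.Add(object{"default", "old"})
	_ = c.Replace([]interface{}{object{"default", "web"}, object{"kube-system", "dns"}})

	_, oldExists := c.GetByKey("default/old")
	_, webExists := c.GetByKey("default/web")
	fmt.Println(oldExists, webExists)
}
```

    After Replace, the object added earlier is gone and only the listed objects remain, which is the "replace the data completely" behavior described for Store.Replace.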

    5.2. ThreadSafeStore

    cache delegates the actual storage to a ThreadSafeStore.

    // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
    // TL;DR caveats: you must not modify anything returned by Get or List as it will break
    // the indexing feature in addition to not being thread safe.
    //
    // The guarantees of thread safety provided by List/Get are only valid if the caller
    // treats returned items as read-only. For example, a pointer inserted in the store
    // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
    // on the same key and modify the pointer in a non-thread-safe way. Also note that
    // modifying objects stored by the indexers (if any) will *not* automatically lead
    // to a re-index. So it's not a good idea to directly modify the objects returned by
    // Get/List, in general.
    type ThreadSafeStore interface {
        Add(key string, obj interface{})
        Update(key string, obj interface{})
        Delete(key string)
        Get(key string) (item interface{}, exists bool)
        List() []interface{}
        ListKeys() []string
        Replace(map[string]interface{}, string)
        Index(indexName string, obj interface{}) ([]interface{}, error)
        IndexKeys(indexName, indexKey string) ([]string, error)
        ListIndexFuncValues(name string) []string
        ByIndex(indexName, indexKey string) ([]interface{}, error)
        GetIndexers() Indexers

        // AddIndexers adds more indexers to this store. If you call this after you already have data
        // in the store, the results are undefined.
        AddIndexers(newIndexers Indexers) error
        Resync() error
    }

    threadSafeMap

    // threadSafeMap implements ThreadSafeStore
    type threadSafeMap struct {
        lock  sync.RWMutex
        items map[string]interface{}

        // indexers maps a name to an IndexFunc
        indexers Indexers
        // indices maps a name to an Index
        indices Indices
    }
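
    How indexers and indices cooperate is easier to see in a reduced form. The sketch below is an assumption-laden miniature rather than the real threadSafeMap: indexedStore, indexFunc and pod are hypothetical, locking is omitted, and byIndex returns keys instead of objects.

```go
package main

import (
	"fmt"
	"sort"
)

type pod struct{ Namespace, Name string }

// indexFunc returns the index values an object should be filed under.
type indexFunc func(obj pod) []string

// indexedStore is a hypothetical, non-thread-safe miniature of threadSafeMap.
type indexedStore struct {
	items    map[string]pod
	indexers map[string]indexFunc                  // index name -> function
	indices  map[string]map[string]map[string]bool // index name -> value -> keys
}

func newIndexedStore(indexers map[string]indexFunc) *indexedStore {
	s := &indexedStore{
		items:    map[string]pod{},
		indexers: indexers,
		indices:  map[string]map[string]map[string]bool{},
	}
	for name := range indexers {
		s.indices[name] = map[string]map[string]bool{}
	}
	return s
}

// add stores the object and files its key under every index value.
func (s *indexedStore) add(key string, obj pod) {
	s.remove(key) // drop stale index entries if the key already exists
	s.items[key] = obj
	for name, fn := range s.indexers {
		for _, value := range fn(obj) {
			if s.indices[name][value] == nil {
				s.indices[name][value] = map[string]bool{}
			}
			s.indices[name][value][key] = true
		}
	}
}

// remove deletes the object and its index entries.
func (s *indexedStore) remove(key string) {
	old, ok := s.items[key]
	if !ok {
		return
	}
	for name, fn := range s.indexers {
		for _, value := range fn(old) {
			delete(s.indices[name][value], key)
		}
	}
	delete(s.items, key)
}

// byIndex returns the sorted keys filed under indexName/value.
func (s *indexedStore) byIndex(indexName, value string) []string {
	keys := []string{}
	for key := range s.indices[indexName][value] {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	s := newIndexedStore(map[string]indexFunc{
		"namespace": func(p pod) []string { return []string{p.Namespace} },
	})
	s.add("default/web", pod{"default", "web"})
	s.add("default/db", pod{"default", "db"})
	s.add("kube-system/dns", pod{"kube-system", "dns"})
	s.remove("default/db")
	fmt.Println(s.byIndex("namespace", "default"))
}
```

    Because the index is only maintained inside add and remove, changing a stored object behind the store's back would leave the index stale. This is the reason the ThreadSafeStore comment asks callers to treat returned objects as read-only.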

    6. processLoop

    func (c *controller) Run(stopCh <-chan struct{}) {
        ...
        wait.Until(c.processLoop, time.Second, stopCh)
    }

    controller.Run calls processLoop. The processing logic of processLoop is analyzed below.

    // processLoop drains the work queue.
    // TODO: Consider doing the processing in parallel. This will require a little thought
    // to make sure that we don't end up processing the same object multiple times
    // concurrently.
    //
    // TODO: Plumb through the stopCh here (and down to the queue) so that this can
    // actually exit when the controller is stopped. Or just give up on this stuff
    // ever being stoppable. Converting this whole package to use Context would
    // also be helpful.
    func (c *controller) processLoop() {
        for {
            obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
            if err != nil {
                if err == FIFOClosedError {
                    return
                }
                if c.config.RetryOnError {
                    // This is the safe way to re-enqueue.
                    c.config.Queue.AddIfNotPresent(obj)
                }
            }
        }
    }

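    processLoop drains the work queue. The actual processing is done by the configured ProcessFunc, and the core line is:

    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))

    6.1. DeltaFIFO.Pop

    Pop blocks until an object has been added to the queue. If several objects are ready, they are handled in first-in, first-out order. If the process function returns ErrRequeue for an object, the object is put back into the queue; otherwise the caller has to add it back itself with AddIfNotPresent().

    Pop calls the given process function to handle the object.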

    // Pop blocks until an item is added to the queue, and then returns it. If
    // multiple items are ready, they are returned in the order in which they were
    // added/updated. The item is removed from the queue (and the store) before it
    // is returned, so if you don't successfully process it, you need to add it back
    // with AddIfNotPresent().
    // process function is called under lock, so it is safe update data structures
    // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
    // may return an instance of ErrRequeue with a nested error to indicate the current
    // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
    //
    // Pop returns a 'Deltas', which has a complete list of all the things
    // that happened to the object (deltas) while it was sitting in the queue.
    func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
        f.lock.Lock()
        defer f.lock.Unlock()
        for {
            for len(f.queue) == 0 {
                // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
                // When Close() is called, the f.closed is set and the condition is broadcasted.
                // Which causes this loop to continue and return from the Pop().
                if f.IsClosed() {
                    return nil, FIFOClosedError
                }

                f.cond.Wait()
            }
            id := f.queue[0]
            f.queue = f.queue[1:]
            item, ok := f.items[id]
            if f.initialPopulationCount > 0 {
                f.initialPopulationCount--
            }
            if !ok {
                // Item may have been deleted subsequently.
                continue
            }
            delete(f.items, id)
            err := process(item)
            if e, ok := err.(ErrRequeue); ok {
                f.addIfNotPresent(id, item)
                err = e.Err
            }
            // Don't need to copyDeltas here, because we're transferring
            // ownership to the caller.
            return item, err
        }
    }

    Core code:

    for {
        ...
        item, ok := f.items[id]
        ...
        err := process(item)
        if e, ok := err.(ErrRequeue); ok {
            f.addIfNotPresent(id, item)
            err = e.Err
        }
        // Don't need to copyDeltas here, because we're transferring
        // ownership to the caller.
        return item, err
    }
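
    The blocking and requeue behavior of Pop can be reproduced with sync.Cond in a few lines. What follows is a hedged sketch and not the DeltaFIFO code: miniQueue, errRequeue and errClosed are hypothetical names, and values are plain strings instead of Deltas.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRequeue is a hypothetical counterpart of client-go's ErrRequeue.
type errRequeue struct{ Err error }

func (e errRequeue) Error() string { return e.Err.Error() }

var errClosed = errors.New("queue is closed")

type miniQueue struct {
	lock   sync.Mutex
	cond   *sync.Cond
	items  map[string]string
	queue  []string
	closed bool
}

func newMiniQueue() *miniQueue {
	q := &miniQueue{items: map[string]string{}}
	q.cond = sync.NewCond(&q.lock)
	return q
}

func (q *miniQueue) Add(key, value string) {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.addLocked(key, value, true)
	q.cond.Broadcast()
}

// addLocked must be called with q.lock held. With overwrite=false it behaves
// like an "add if not present": a value already queued for key wins.
func (q *miniQueue) addLocked(key, value string, overwrite bool) {
	if _, exists := q.items[key]; exists {
		if overwrite {
			q.items[key] = value
		}
		return
	}
	q.queue = append(q.queue, key)
	q.items[key] = value
}

func (q *miniQueue) Close() {
	q.lock.Lock()
	defer q.lock.Unlock()
	q.closed = true
	q.cond.Broadcast()
}

// Pop blocks until an item is available, runs process under the lock, and
// requeues the item if process returns errRequeue.
func (q *miniQueue) Pop(process func(value string) error) (string, error) {
	q.lock.Lock()
	defer q.lock.Unlock()
	for len(q.queue) == 0 {
		if q.closed {
			return "", errClosed
		}
		q.cond.Wait()
	}
	key := q.queue[0]
	q.queue = q.queue[1:]
	value := q.items[key]
	delete(q.items, key)
	err := process(value)
	if e, ok := err.(errRequeue); ok {
		q.addLocked(key, value, false)
		err = e.Err
	}
	return value, err
}

func main() {
	q := newMiniQueue()
	q.Add("default/web", "web v1")

	attempts := 0
	process := func(value string) error {
		attempts++
		if attempts == 1 {
			return errRequeue{Err: errors.New("transient failure")}
		}
		return nil
	}

	_, err := q.Pop(process) // fails, so the item goes back into the queue
	fmt.Println("first pop error:", err)
	value, err := q.Pop(process) // succeeds on the second attempt
	fmt.Println(value, err)

	q.Close()
	_, err = q.Pop(process) // the queue is empty and closed
	fmt.Println(err)
}
```

    Running process while the lock is held keeps the requeue atomic with respect to concurrent Add calls, at the price of blocking producers for as long as process runs.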

    6.2. HandleDeltas

    cfg := &Config{
        Queue:            fifo,
        ListerWatcher:    s.listerWatcher,
        ObjectType:       s.objectType,
        FullResyncPeriod: s.resyncCheckPeriod,
        RetryOnError:     false,
        ShouldResync:     s.processor.shouldResync,

        Process: s.HandleDeltas,
    }

    The process function is HandleDeltas, which is assigned to config.Process in the sharedIndexInformer.Run method.

    func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
        s.blockDeltas.Lock()
        defer s.blockDeltas.Unlock()

        // from oldest to newest
        for _, d := range obj.(Deltas) {
            switch d.Type {
            case Sync, Added, Updated:
                isSync := d.Type == Sync
                s.cacheMutationDetector.AddObject(d.Object)
                if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                    if err := s.indexer.Update(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                } else {
                    if err := s.indexer.Add(d.Object); err != nil {
                        return err
                    }
                    s.processor.distribute(addNotification{newObj: d.Object}, isSync)
                }
            case Deleted:
                if err := s.indexer.Delete(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
            }
        }
        return nil
    }

    Core code:

    switch d.Type {
    case Sync, Added, Updated:
        ...
        if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
            ...
            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
        } else {
            ...
            s.processor.distribute(addNotification{newObj: d.Object}, isSync)
        }
    case Deleted:
        ...
        s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }

    Depending on the delta type and on whether the object already exists in the indexer, HandleDeltas calls processor.distribute with an update, add, or delete notification. distribute puts the notification into the channel of each processorListener.
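
    The decision between "add" and "update" depends on the state of the local index, not on the delta type alone. The following sketch makes this explicit under simplifying assumptions: handleDeltas is a hypothetical function, the index is a plain map, each Delta carries its key directly, and notifications are returned as strings instead of being passed to distribute.

```go
package main

import "fmt"

type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

// Delta carries its key directly to keep the sketch short; the real code
// derives the key from the object.
type Delta struct {
	Type  DeltaType
	Key   string
	Value string
}

// handleDeltas applies deltas (oldest first) to the local index and returns
// the notifications that would be handed to the listeners.
func handleDeltas(index map[string]string, deltas []Delta) []string {
	var notifications []string
	for _, d := range deltas {
		switch d.Type {
		case Sync, Added, Updated:
			if old, exists := index[d.Key]; exists {
				index[d.Key] = d.Value
				notifications = append(notifications,
					fmt.Sprintf("update %s: %s -> %s", d.Key, old, d.Value))
			} else {
				index[d.Key] = d.Value
				notifications = append(notifications,
					fmt.Sprintf("add %s: %s", d.Key, d.Value))
			}
		case Deleted:
			delete(index, d.Key)
			notifications = append(notifications, fmt.Sprintf("delete %s", d.Key))
		}
	}
	return notifications
}

func main() {
	index := map[string]string{}
	deltas := []Delta{
		{Added, "default/web", "v1"},
		{Added, "default/web", "v2"}, // already indexed, so reported as an update
		{Deleted, "default/web", "v2"},
	}
	for _, n := range handleDeltas(index, deltas) {
		fmt.Println(n)
	}
}
```

    Note that a second Added delta for a key that is already indexed is reported as an update, and that a Sync delta for a known key also produces an update notification.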

    6.3. sharedProcessor.distribute

    func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()

        if sync {
            for _, listener := range p.syncingListeners {
                listener.add(obj)
            }
        } else {
            for _, listener := range p.listeners {
                listener.add(obj)
            }
        }
    }

    processorListener.add:

    func (p *processorListener) add(notification interface{}) {
        p.addCh <- notification
    }

    Putting the analysis above together: processLoop calls HandleDeltas, which calls distribute, which calls processorListener.add. In the end, notifications of the different update types are put into the processorListener's channel, where they are consumed by processorListener.pop and processorListener.run. These are analyzed below.

    7. processor

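    The main job of processor is to record all callback instances (that is, ResourceEventHandler instances) and to trigger them. processor.run is called from sharedIndexInformer.Run.

    Flow:

    1. The listener's add function sends the notification to addCh, from where pop moves it into pendingNotifications if another notification is already waiting to be dispatched.
    2. pop takes the oldest pending notification and sends it to the nextCh channel.
    3. run receives the notification and, depending on its type (add, delete, update), triggers the corresponding handler function. These functions are registered in the various NewXxxController constructors.

    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
        ...
        wg.StartWithChannel(processorStopCh, s.processor.run)
        ...
    }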

    7.1. sharedProcessor.Run

    func (p *sharedProcessor) run(stopCh <-chan struct{}) {
        func() {
            p.listenersLock.RLock()
            defer p.listenersLock.RUnlock()
            for _, listener := range p.listeners {
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
        }()
        <-stopCh
        p.listenersLock.RLock()
        defer p.listenersLock.RUnlock()
        for _, listener := range p.listeners {
            close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
        }
        p.wg.Wait() // Wait for all .pop() and .run() to stop
    }

    7.1.1. listener.pop

    pop takes the oldest notification out of pendingNotifications and sends it to the nextCh channel.

    func (p *processorListener) pop() {
        defer utilruntime.HandleCrash()
        defer close(p.nextCh) // Tell .run() to stop

        var nextCh chan<- interface{}
        var notification interface{}
        for {
            select {
            case nextCh <- notification:
                // Notification dispatched
                var ok bool
                notification, ok = p.pendingNotifications.ReadOne()
                if !ok { // Nothing to pop
                    nextCh = nil // Disable this select case
                }
            case notificationToAdd, ok := <-p.addCh:
                if !ok {
                    return
                }
                if notification == nil { // No notification to pop (and pendingNotifications is empty)
                    // Optimize the case - skip adding to pendingNotifications
                    notification = notificationToAdd
                    nextCh = p.nextCh
                } else { // There is already a notification waiting to be dispatched
                    p.pendingNotifications.WriteOne(notificationToAdd)
                }
            }
        }
    }
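
    pop relies on a Go idiom: a send on a nil channel blocks forever, so setting nextCh to nil switches that select case off. The standalone sketch below shows the same pattern with integers. relay is a hypothetical function, and a slice stands in for the pendingNotifications buffer.

```go
package main

import "fmt"

// relay forwards values from in to out without making the sender on in wait
// for the consumer: values that out is not ready to take are kept in memory.
func relay(in <-chan int, out chan<- int) {
	defer close(out) // tell the consumer to stop
	var pending []int
	var next chan<- int // nil disables the send case below
	var current int
	for {
		select {
		case next <- current:
			// current was delivered; line up the next pending value, if any.
			if len(pending) == 0 {
				next = nil
			} else {
				current, pending = pending[0], pending[1:]
			}
		case v, ok := <-in:
			if !ok {
				return
			}
			if next == nil {
				// Nothing is waiting: offer v directly.
				current, next = v, out
			} else {
				pending = append(pending, v)
			}
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)

	// The consumer is not reading yet, but these sends still complete
	// because relay buffers the values.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	for i := 0; i < 5; i++ {
		fmt.Println(<-out)
	}
	close(in) // relay returns and closes out
	_, open := <-out
	fmt.Println("out open:", open)
}
```

    The design decouples a slow handler from the producer: distribute never waits for a handler to finish, at the cost of a buffer that can grow without bound. As in pop, values still pending when the input channel is closed are dropped.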

    7.1.2. listener.run

    listener.run calls a different handler function depending on the type of the notification.

    func (p *processorListener) run() {
        defer utilruntime.HandleCrash()

        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
            }
        }
    }

    The concrete handler is assigned in NewDeploymentController (the other controller types work in a similar way). handler itself is an interface, defined as follows:

    // ResourceEventHandler can handle notifications for events that happen to a
    // resource. The events are informational only, so you can't return an
    // error.
    //  * OnAdd is called when an object is added.
    //  * OnUpdate is called when an object is modified. Note that oldObj is the
    //      last known state of the object-- it is possible that several changes
    //      were combined together, so you can't use this to see every single
    //      change. OnUpdate is also called when a re-list happens, and it will
    //      get called even if nothing changed. This is useful for periodically
    //      evaluating or syncing something.
    //  * OnDelete will get the final state of the item if it is known, otherwise
    //      it will get an object of type DeletedFinalStateUnknown. This can
    //      happen if the watch is closed and misses the delete event and we don't
    //      notice the deletion until the subsequent re-list.
    type ResourceEventHandler interface {
        OnAdd(obj interface{})
        OnUpdate(oldObj, newObj interface{})
        OnDelete(obj interface{})
    }
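
    Controllers usually do not implement this interface by hand; they fill in a struct of optional functions. The sketch below imitates that adapter with hypothetical names (eventHandler, handlerFuncs, dispatch) and is only modeled on cache.ResourceEventHandlerFuncs; it is not the library code.

```go
package main

import "fmt"

// eventHandler has the same shape as the interface shown above.
type eventHandler interface {
	OnAdd(obj interface{})
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

// handlerFuncs is an adapter: any field may be left nil, in which case the
// corresponding event is ignored.
type handlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

func (h handlerFuncs) OnAdd(obj interface{}) {
	if h.AddFunc != nil {
		h.AddFunc(obj)
	}
}

func (h handlerFuncs) OnUpdate(oldObj, newObj interface{}) {
	if h.UpdateFunc != nil {
		h.UpdateFunc(oldObj, newObj)
	}
}

func (h handlerFuncs) OnDelete(obj interface{}) {
	if h.DeleteFunc != nil {
		h.DeleteFunc(obj)
	}
}

// Notification types, named after the ones used by the listener.
type addNotification struct{ newObj interface{} }
type updateNotification struct{ oldObj, newObj interface{} }
type deleteNotification struct{ oldObj interface{} }

// dispatch is a hypothetical helper that plays the role of listener.run.
func dispatch(h eventHandler, notifications ...interface{}) {
	for _, next := range notifications {
		switch n := next.(type) {
		case updateNotification:
			h.OnUpdate(n.oldObj, n.newObj)
		case addNotification:
			h.OnAdd(n.newObj)
		case deleteNotification:
			h.OnDelete(n.oldObj)
		}
	}
}

func main() {
	// Only deletions are of interest here, so the other fields stay nil.
	podHandler := handlerFuncs{
		DeleteFunc: func(obj interface{}) { fmt.Println("pod deleted:", obj) },
	}
	dispatch(podHandler,
		addNotification{newObj: "pod-a"},
		updateNotification{oldObj: "pod-a", newObj: "pod-a'"},
		deleteNotification{oldObj: "pod-a'"},
	)
}
```

    The adapter lets a caller subscribe to a subset of events without writing empty methods.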

    7.2. ResourceEventHandler

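    The following uses the processing logic of DeploymentController as an example.

    NewDeploymentController registers the event handler functions for deployments. Handlers are registered on three informers: dInformer, rsInformer and podInformer.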

    // NewDeploymentController creates a new DeploymentController.
    func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
        ...
        dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addDeployment,
            UpdateFunc: dc.updateDeployment,
            // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
            DeleteFunc: dc.deleteDeployment,
        })
        rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    dc.addReplicaSet,
            UpdateFunc: dc.updateReplicaSet,
            DeleteFunc: dc.deleteReplicaSet,
        })
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: dc.deletePod,
        })
        ...
    }

    7.2.1. addDeployment

    Taking addDeployment as an example: addDeployment hands the object to enqueueDeployment, which adds its key to the controller's work queue.

    func (dc *DeploymentController) addDeployment(obj interface{}) {
        d := obj.(*extensions.Deployment)
        glog.V(4).Infof("Adding deployment %s", d.Name)
        dc.enqueueDeployment(d)
    }

    Definition of enqueueDeployment:

    type DeploymentController struct {
        ...
        enqueueDeployment func(deployment *extensions.Deployment)
        ...
    }

    dc.enqueue is assigned to dc.enqueueDeployment:

    dc.enqueueDeployment = dc.enqueue

    dc.enqueue calls dc.queue.Add(key):

    func (dc *DeploymentController) enqueue(deployment *extensions.Deployment) {
        key, err := controller.KeyFunc(deployment)
        if err != nil {
            utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
            return
        }

        dc.queue.Add(key)
    }

    dc.queue records the keys of the deployments that need to be synced, for use by syncDeployment.

    dc := &DeploymentController{
        ...
        queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
    }

    NewNamedRateLimitingQueue

    func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface {
        return &rateLimitingType{
            DelayingInterface: NewNamedDelayingQueue(name),
            rateLimiter:       rateLimiter,
        }
    }

    The analysis above shows that processor records the event handler functions of the different types. The handlers are registered in the NewXxxController constructors. A handler typically just adds the object that needs processing to the work queue of its controller; a sync function such as syncDeployment then carries out the logic that keeps the actual state in line with the desired state.
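
    The "handlers enqueue keys, a sync function does the work" split can be sketched without client-go. In the example below, keyQueue, controller and processAll are hypothetical names; the queue is single-goroutine and has no rate limiting, and it only imitates the property that a key which is already waiting is not queued a second time.

```go
package main

import "fmt"

// keyQueue is a hypothetical, single-goroutine miniature of a work queue:
// keys that are waiting to be processed are deduplicated.
type keyQueue struct {
	order []string
	dirty map[string]bool
}

func newKeyQueue() *keyQueue {
	return &keyQueue{dirty: map[string]bool{}}
}

// Add enqueues key unless it is already waiting.
func (q *keyQueue) Add(key string) {
	if q.dirty[key] {
		return
	}
	q.dirty[key] = true
	q.order = append(q.order, key)
}

// Get returns the next key, or false when the queue is empty.
func (q *keyQueue) Get() (string, bool) {
	if len(q.order) == 0 {
		return "", false
	}
	key := q.order[0]
	q.order = q.order[1:]
	delete(q.dirty, key)
	return key, true
}

type deployment struct{ Namespace, Name string }

type controller struct {
	queue  *keyQueue
	synced []string // keys handed to the (stand-in) sync function
}

// The event handlers only enqueue the key; they do no real work.
func (c *controller) addDeployment(obj interface{}) {
	d := obj.(deployment)
	c.queue.Add(d.Namespace + "/" + d.Name)
}

func (c *controller) updateDeployment(oldObj, newObj interface{}) {
	c.addDeployment(newObj)
}

// processAll drains the queue, calling the sync step once per key.
func (c *controller) processAll() {
	for {
		key, ok := c.queue.Get()
		if !ok {
			return
		}
		c.synced = append(c.synced, key) // stand-in for syncDeployment(key)
	}
}

func main() {
	c := &controller{queue: newKeyQueue()}
	web := deployment{"default", "web"}
	c.addDeployment(web)
	c.updateDeployment(web, web) // same key, still waiting: not queued twice
	c.addDeployment(deployment{"default", "db"})
	c.processAll()
	fmt.Println(c.synced)
}
```

    Queuing keys rather than objects means the sync function always reads the latest state from the local store, so a burst of events for one object collapses into a single sync.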

    8. Summary

    This article analyzed the informer mechanism of k8s, that is, the List-Watch mechanism.

    8.1. Reflector

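    Reflector watches the specified k8s resource and syncs the changes into the local store. Reflector only puts objects of the specified expectedType into the store, unless expectedType is nil. If resyncPeriod is non-zero, Reflector performs a list every resyncPeriod, so Reflector can be used both to process all objects periodically and to process changed objects incrementally.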

    8.2. ListAndWatch

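    On its first run, ListAndWatch lists all objects and records the resource version of the result, then watches from that resource version to learn about changes. The initial list is issued with resource version 0, so the local cache may lag behind the contents of etcd after list(). Reflector fills in the missing changes through the watch, so that the local cache ends up consistent with the data in etcd.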

    8.3. DeltaFIFO

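    DeltaFIFO is a producer-consumer queue in which Reflector is the producer and the consumer is whatever calls the Pop() method.

    DeltaFIFO is mainly intended for the following use cases:

    • You want to process every object change (delta) at most once.
    • When you process an object, you want to see everything that has happened to it since you last processed it.
    • You want to process the deletion of objects.
    • You might want to periodically reprocess objects.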

    8.4. store

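    Store is a generic object storage interface. Reflector watches the server and writes the changes into a store. Depending on the implementation behind it, the store lets Reflector act as a local cache or work like a queue of items that are yet to be processed.

    A Store implementation is responsible for keying objects correctly and for defining how objects are written and retrieved by key.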

    8.5. processor

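    The main job of processor is to record all callback instances (that is, ResourceEventHandler instances) and to trigger them. processor.run is called from sharedIndexInformer.Run.

    Flow:

    1. The listener's add function sends the notification to addCh, from where pop moves it into pendingNotifications if another notification is already waiting to be dispatched.
    2. pop takes the oldest pending notification and sends it to the nextCh channel.
    3. run receives the notification and, depending on its type (add, delete, update), triggers the corresponding handler function. These functions are registered in the various NewXxxController constructors.

    processor records the event handler functions of the different types. The handlers are registered in the NewXxxController constructors. A handler typically just adds the object that needs processing to the work queue of its controller; a sync function such as syncDeployment then carries out the logic that keeps the actual state in line with the desired state.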

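    8.6. Main steps

    1. The Run function of controller-manager calls InformerFactory.Start. Start initializes the informers of the various types and starts one informer.Run goroutine per type.
    2. informer.Run first creates a DeltaFIFO queue to store the data about object changes. It then calls processor.run and controller.Run.
    3. controller.Run creates a Reflector. Reflector watches the specified k8s resource and syncs the changes into the local store. If resyncPeriod is non-zero, Reflector performs a list every resyncPeriod, so Reflector can be used both to process all objects periodically and to process changed objects incrementally.
    4. Reflector then runs ListAndWatch. On its first run, ListAndWatch lists all objects and records the resource version of the result, then watches from that resource version to learn about changes. The initial list is issued with resource version 0, so the local cache may lag behind the contents of etcd after list(). Reflector fills in the missing changes through the watch, so that the local cache ends up consistent with the data in etcd.
    5. controller.Run also calls processLoop. processLoop calls HandleDeltas, which calls distribute, which calls processorListener.add. In the end, notifications of the different update types are put into the processorListener's channel, where they are consumed by processorListener.pop and processorListener.run.
    6. The main job of processor is to record all callback instances (that is, ResourceEventHandler instances) and to trigger them. processor records the event handler functions of the different types. The handlers are registered in the NewXxxController constructors. A handler typically just adds the object that needs processing to the work queue of its controller; a sync function such as syncDeployment then carries out the logic that keeps the actual state in line with the desired state.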

    References:

    • https://github.com/kubernetes/client-go/tree/master/tools/cache

    • https://github.com/kubernetes/sample-controller/blob/master/docs/controller-client-go.md

    • https://github.com/kubernetes/client-go/blob/master/examples/workqueue/main.go