indexer,informer:=cache.NewIndexerInformer(podListWatcher,&v1.Pod{},0,cache.ResourceEventHandlerFuncs{...},cache.Indexers{})funcNewIndexerInformer(lwListerWatcher,objTyperuntime.Object,resyncPeriodtime.Duration,hResourceEventHandler,indexersIndexers,)(Indexer,Controller){// This will hold the client state, as we know it.
clientState:=NewIndexer(DeletionHandlingMetaNamespaceKeyFunc,indexers)returnclientState,newInformer(lw,objType,resyncPeriod,h,clientState)}
從上述的source code中可以看到透過 NewIndexerInformer function 最後會回傳 indexer 以及 controller , indexer 在先前的章節已經討論過,還不了解的小夥伴可以到本篇文章Kubernetes Indexers local cache 之美 (I)複習相關知識,以下會針對 controller 的實作進行琢磨。
Controller
我們先從 Controller 定義了什麼行為開始探討
interface
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer.
typeControllerinterface{// Run does two things. One is to construct and run a Reflector
// to pump objects/notifications from the Config's ListerWatcher
// to the Config's Queue and possibly invoke the occasional Resync
// on that Queue. The other is to repeatedly Pop from the Queue
// and process with the Config's ProcessFunc. Both of these
// continue until `stopCh` is closed.
Run(stopCh<-chanstruct{})//執行一個函數,並且透過stopchannel來決定是否跳出
// HasSynced delegates to the Config's Queue
HasSynced()bool//檢查觀測到 object 是不是同步到 indexer 了
// LastSyncResourceVersion delegates to the Reflector when there
// is one, otherwise returns the empty string
LastSyncResourceVersion()string//觀測到最新的 object version
}
//傳入的參數我們在上一章節有看過,這邊再來複習一次
//傳入監控物件變化的觀察者
//傳入要觀察的物件資料型態
//傳入多久要同步一次
//傳入事件處理器
//傳入本地儲存器(local storage)
funcnewInformer(lwListerWatcher,objTyperuntime.Object,resyncPeriodtime.Duration,hResourceEventHandler,clientStateStore,)Controller{//建立delta fifo queue ,之前章節有探討過delta fifo queue
//不了解的讀者可以回去複習一次
fifo:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{KnownObjects:clientState,EmitDeltaTypeReplaced:true,})//建立controller config的設定檔
cfg:=&Config{Queue:fifo,//使用delta fifo queue
ListerWatcher:lw,//使用觀測哪個物件的 listwatch(e.g. pod configmap e.t.c)
ObjectType:objType,//觀測物件的資料型態(e.g. pod configmap e.t.c)
FullResyncPeriod:resyncPeriod,//多久要重同步一次
RetryOnError:false,//錯誤是否要重試
Process:func(objinterface{})error{//事件處理器
// from oldest to newest
...},}returnNew(cfg)//建立一個實作 controller interface 的物件
}// New makes a new Controller from the given Config.
funcNew(c*Config)Controller{ctlr:=&controller{//建立一個實作 controller 的物件
config:*c,clock:&clock.RealClock{},}returnctlr}
// `*controller` implements Controller
typecontrollerstruct{configConfig//contrller 相關設定
reflector*Reflector//下一章節會展開來看,本章節暫時用不到
reflectorMutexsync.RWMutex//Reflector 讀寫鎖,本章節暫時用不到
clockclock.Clock//同步用
}// Config contains all the settings for one of these low-level controllers.
typeConfigstruct{Queue//這裡要設定 controller 用的 DeltaFIFO Queue
//前幾個章節有詳細的說明,不了解的朋友可以回去複習。
ListerWatcher//前一個章節有帶到,實作監視以及列出特定的資源的物件
ProcessProcessFunc//當物件從 DeltaFIFO Queue 彈出,處理事件的function
// ObjectType is an example object of the type this controller is
// expected to handle. Only the type needs to be right, except
// that when that is `unstructured.Unstructured` the object's
// `"apiVersion"` and `"kind"` must also be right.
ObjectTyperuntime.Object//這個我認為很難理解,要告訴controller即將到來的物件是什麼例如 pod , deployment e.t.c.
FullResyncPeriodtime.Duration//多久要 resync 一次
ShouldResyncShouldResyncFunc//reflector會定期透過ShouldResync function來確定是否重新同步queue,
RetryOnErrorbool//如果為true,則Process()返回錯誤時,需要requeue object。
//看註解這是有爭議的,有些開發者認為要拉到更該高的層次決掉 error的處理方式
WatchErrorHandlerWatchErrorHandler//每當ListAndWatch斷開連接並出現錯誤時會呼叫這個 function 處理。
WatchListPageSizeint64//初始化時設定list watch 的 chunk size.
}//reflector會定期透過ShouldResync function來確定是否重新同步queue,
typeShouldResyncFuncfunc()bool//每當ListAndWatch斷開連接並出現錯誤時調用。
typeWatchErrorHandlerfunc(r*Reflector,errerror)
// Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
// It's an error to call Run more than once.
// Run blocks; call via go.
func(c*controller)Run(stopCh<-chanstruct{}){//錯誤處理,有機會再來談,先不理他
deferutilruntime.HandleCrash()//當收到stop 訊號關閉 delta fifof queue
gofunc(){<-stopChc.config.Queue.Close()}()//建立一個 Reflector ,下一章節會展開討論 Reflector 先不理他
r:=NewReflector(c.config.ListerWatcher,c.config.ObjectType,c.config.Queue,c.config.FullResyncPeriod,)//Reflector 設定 resync 時間
r.ShouldResync=c.config.ShouldResync//Reflector 設定list watch 的 chunk size.
r.WatchListPageSize=c.config.WatchListPageSize//Reflector 設定時間
r.clock=c.clock//Reflector 套用錯誤處理
ifc.config.WatchErrorHandler!=nil{r.watchErrorHandler=c.config.WatchErrorHandler}// todo 我不知道為什麼 controller 綁定 reflector 的時候要加鎖
c.reflectorMutex.Lock()// controller 綁定 Reflector
c.reflector=rc.reflectorMutex.Unlock()//wait.Group 預計未來還會拉出來再講一篇
//簡單來就是被這個 wg 管理的 thread 全部都 done 了之後才會退出 wait
varwgwait.Group//這個function 會啟動一個 thread 並且在裡面呼叫 剛剛建立的 reflector.run 並且傳入 stop channel
//stop channel用來終止 thread
wg.StartWithChannel(stopCh,r.Run)//規律性的呼叫processLoop(),若是收到 stop channel 的訊號就退出
wait.Until(c.processLoop,time.Second,stopCh)//等待所有 wait.Group 的 thread done 才能離開,不然會一直卡在這裡~
wg.Wait()}//會被wait.Until 規律性的呼叫
func(c*controller)processLoop(){for{//從 deltafifo pop 出物件,
//pop 出的事件會交給 config.Process function 處理
obj,err:=c.config.Queue.Pop(PopProcessFunc(c.config.Process))iferr!=nil{//如果 deltafifo queue關閉就退出
iferr==ErrFIFOClosed{return}//如果處理發生錯誤就重新加回 delta fifo queue 中
ifc.config.RetryOnError{// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)}}}}
HasSynced
1
2
3
4
5
6
7
// Returns true once this controller has completed an initial resource listing
//簡單來說就是看Delta FIFO 是不是把資料同步完了
func(c*controller)HasSynced()bool{//委任給Delta FIFO QUEUE的HasSynced()
//不了解的部分可以到前面的章節看一下 Delta FIFO Queue 是怎麼做的
returnc.config.Queue.HasSynced()}