// objectCacheItem is a single item stored in objectCache.
typeobjectCacheItemstruct{refCountint//用來紀錄有多少人引用這個物件,當沒有人引用的時候就可以把 reflector 關掉了
storecache.Store//用來儲存 kubernetes 物件的 local stroage(這裡的 stroe 就是 controller/operaoter 會用到的 store 囉)
hasSyncedfunc()(bool,error)//用來確認 local stroage 有沒有同步
locksync.Mutex//用來防止stop channel重複被呼叫
stopChchanstruct{}//用來傳遞關閉 reflector 的 channel
}
func(i*objectCacheItem)stop()bool{//避免競爭上鎖
i.lock.Lock()deferi.lock.Unlock()//當使用者呼叫關閉 objectCacheItem.stop 時,第一次會進 default 的 select case 。
//關閉 stop channel ,此時 stop channel 會發出訊號給關聯的 reflector 關閉對 kubernetes resource 的追蹤。
select{case<-i.stopCh:// This means that channel is already closed.
returnfalsedefault:close(i.stopCh)returntrue}}
new function
透過 NewObjectCache function 產出符合 store interface 的物件也就是會建立一個 ObjectCache 物件,這裡會把一些 function 帶入,例如要監控什麼物件他的 list 條件是什麼、他的 watch 條件是什麼以及 GVS 是什麼。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NewObjectCache returns a new watch-based instance of Store interface.
funcNewObjectCache(listObjectlistObjectFunc,//使用者會傳入要怎麼列出物件
watchObjectwatchObjectFunc,//使用者會傳入要怎麼監控物件
newObjectnewObjectFunc,//reflector 要用的 object store是哪一個
isImmutableisImmutableFunc,//使用者會傳入怎麼判斷這個物件是不是 Immutable
groupResourceschema.GroupResource)Store{//使用者會傳入物件的GVS
//最後 new fucntion 回傳實作 store interface 的 objectCache 物件
return&objectCache{listObject:listObject,watchObject:watchObject,newObject:newObject,isImmutable:isImmutable,groupResource:groupResource,items:make(map[objectKey]*objectCacheItem),}}
impliment
看完了 objectCache 物件的參數定義與如何 new 出物件後,接著就可以來了解實作的部分囉!
還記得我們在 newObjectCache 的時候有把 list object function 帶入嗎?那一個 list object function 到這裡才派上用, reflector 在用的時候會像是這樣的呼叫練。
當 reflector 呼叫 list function 會先經由-------->newReflector裡面的listFunc<加了 field selector >進行處理
newReflector裡面的listFunc<加了 field selector >處理完後會再丟給--------> 使用者定義的 list watcher function 做最後的處理。
DeleteReference
透過這個 function 減少 reflector 關聯到的物件數量,當 reflector 沒有跟其他物件有所關聯時需要停止 reflector ~
我們先來複習一下 cacheBasedManager 是在哪一段呼叫 objectCache 的 DeleteReference function 如何解除關聯的。
cacheBasedManager 主要透過 UnregisterPod 這個 function 來刪除 reflector 關聯的數量。
func(c*cacheBasedManager)UnregisterPod(pod*v1.Pod){varprev*v1.Pod//以 pod spc 中的 namespace 與 pod name 封裝為 object key
key:=objectKey{namespace:pod.Namespace,name:pod.Name}//防止競爭加鎖
c.lock.Lock()deferc.lock.Unlock()//透過 pod spec 中的 namespace 與 pod name 作為 key 取得 registeredPods map 中對應的資料
prev=c.registeredPods[key]//透過 pod spec 中的 namespace 與 pod name 作為 key 刪除 registeredPods map 中對應的資料
delete(c.registeredPods,key)//如果有資料的話,要刪除對應的 `Reflector` ,這裡也是用到 getReferencedObjects 去解析 pod spec 的每個欄位
ifprev!=nil{forname:=rangec.getReferencedObjects(prev){c.objectStore.DeleteReference(prev.Namespace,name)}}}
func(c*objectCache)Get(namespace,namestring)(runtime.Object,error){//透過 namespace 與物件名稱 name 建立一個 object key 用以後續從 map 中查詢對應的 reference
key:=objectKey{namespace:namespace,name:name}//有可能同時多個 thread 在進行 get 操作所以要上鎖避免競爭
c.lock.RLock()//先把資料讀出來就可以解鎖囉
item,exists:=c.items[key]c.lock.RUnlock()//如果透過 object key 找不到對應的資料表示...阿就還沒加過 reference xD
if!exists{returnnil,fmt.Errorf("object %q/%q not registered",namespace,name)}//會立刻嘗試檢查 reflector 是不是已經同步了,關於怎麼確認是不是已經同步可以參考本篇文章 https://blog.jjmengze.website/posts/kubernetes/source-code/controller/deltafifo/kubernetes-delta-fifo-queue/#impliment
iferr:=wait.PollImmediate(10*time.Millisecond,time.Second,item.hasSynced);err!=nil{returnnil,fmt.Errorf("failed to sync %s cache: %v",c.groupResource.String(),err)}//透過 object key 取得 object 怎麼取得的可以參考之前的文章,會從reflector 的 storage 透過 object key 把物件取出。
//這裡有個小 tip 就是我們 reflector 再把物件放入 store 的時候是透過 indexed function 計算後放入 store 的
//所以我們再取出的時候一樣要先透過 indexed function 算出物件的位置。
obj,exists,err:=item.store.GetByKey(c.key(namespace,name))iferr!=nil{returnnil,err}if!exists{returnnil,apierrors.NewNotFound(c.groupResource,name)}//因為 storage 儲存的是 interface 什麼東西都可以放進去,我們要先判物件是不是 runtime.Object 型態。
ifobject,ok:=obj.(runtime.Object);ok{// If the returned object is immutable, stop the reflector.
//
// NOTE: we may potentially not even start the reflector if the object is
// already immutable. However, given that:
// - we want to also handle the case when object is marked as immutable later
// - Secrets and ConfigMaps are periodically fetched by volumemanager anyway
// - doing that wouldn't provide visible scalability/performance gain - we
// already have it from here
// - doing that would require significant refactoring to reflector
// we limit ourselves to just quickly stop the reflector here.
//會先檢查有沒有開啟 FeatureGate ImmutableEphemeralVolumes
//以及物件是否有明確標注處於 immutable = true ,若是有這兩種情況同時存在
//就停止監控 kubernetes 物件,因為物件已經處於 immutable 狀態
ifutilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes)&&c.isImmutable(object){ifitem.stop(){klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable",namespace,name)}}returnobject,nil}returnnil,fmt.Errorf("unexpected object type: %v",obj)}// 因為我們 storage 用的 index function 是 MetaNamespaceKeyFunc 所以我們要從 storage 把 object 拿出來的時候要符合 MetaNamespaceKeyFunc 的格式。
func(c*objectCache)key(namespace,namestring)string{iflen(namespace)>0{returnnamespace+"/"+name}returnname}
回收伏筆
上面再說明 objectCache 時有賣一個關子,從註解來看 objectCache 是一個透過獨立 watcher 傳播物件的 local cache 。
// objectCache is a local cache of objects propagated via individual watches.
typeobjectCachestruct{listObjectlistObjectFunc//列出 kubernetes 物件的過濾方法
watchObjectwatchObjectFunc//監控 kubernetes 物件的過濾方法
...
這裡終於可以回收為什麼 objectCache 是一個透過獨立 watcher 傳播物件的 local cache 了!
isImmutableFunc
最後要討論的就是在 Get function 會把 isImmutableFunc 作為判斷 object 是否為 Immutable 物件的方法。
對應到的 code 是以下這一段。 soure code
1
2
3
4
5
6
7
ifutilfeature.DefaultFeatureGate.Enabled(features.ImmutableEphemeralVolumes)&&c.isImmutable(object){ifitem.stop(){klog.V(4).Infof("Stopped watching for changes of %q/%q - object is immutable",namespace,name)}}returnobject,nil}