DeltaFIFO 還實作了 KeyListerGetter interface ,對 ListKeys() 以及 GetByKey() 這兩個 function 還有印象的地方應該是在 Store interface 裡面,如果忘記了可以回去複習一下。 source code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
typeKeyListerGetterinterface{KeyLister//繼承 KeyLister
KeyGetter//繼承 KeyGetter
}// A KeyLister is anything that knows how to list its keys.
typeKeyListerinterface{ListKeys()[]string//回傳所有的 object key
}// A KeyGetter is anything that knows how to get the value stored under a given key.
typeKeyGetterinterface{// GetByKey returns the value associated with the key, or sets exists=false.
GetByKey(keystring)(valueinterface{},existsbool,errerror)//透過object key 取的 object
}
//Deltas 是一個 slice 的資料結構,再往下看 slice 裡面存了什麼
typeDeltas[]Delta// slice 裡面存了 DeltaType 以及 Object
typeDeltastruct{TypeDeltaTypeObjectinterface{}}// DeltaType 表示 Object 的變化型態為 string
typeDeltaTypestring//事件總共會有五種,分別是 Add 事件,updated 事件, Deleted 事件 , Replaced 事件 以及 Sync事件。
const(AddedDeltaType="Added"UpdatedDeltaType="Updated"DeletedDeltaType="Deleted"// Replaced is emitted when we encountered watch errors and had to do a
// relist. We don't know if the replaced object has changed.
//
// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
// as well. Hence, Replaced is only emitted when the option
// EmitDeltaTypeReplaced is true.
ReplacedDeltaType="Replaced"// Sync is for synthetic events during a periodic resync.
SyncDeltaType="Sync")//取出最舊一筆資料
func(dDeltas)Oldest()*Delta{iflen(d)>0{return&d[0]}returnnil}//取出最新一筆資料
func(dDeltas)Newest()*Delta{ifn:=len(d);n>0{return&d[n-1]}returnnil}//複製一組 deltas 的資料(因為是複製的所以就算更改了複製的資料也跟原先那組 deltas 沒有關係)
funccopyDeltas(dDeltas)Deltas{d2:=make(Deltas,len(d))copy(d2,d)returnd2}
// MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make
// keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>.
funcMetaNamespaceKeyFunc(objinterface{})(string,error){ifkey,ok:=obj.(ExplicitKey);ok{returnstring(key),nil}meta,err:=meta.Accessor(obj)iferr!=nil{return"",fmt.Errorf("object has no meta: %v",err)}iflen(meta.GetNamespace())>0{returnmeta.GetNamespace()+"/"+meta.GetName(),nil}returnmeta.GetName(),nil}
impliment
Close
1
2
3
4
5
6
7
// Close the queue.
func(f*DeltaFIFO)Close(){f.lock.Lock()//鎖不解釋
deferf.lock.Unlock()//退出解鎖不解釋
f.closed=true//設置關閉標記
f.cond.Broadcast()//通知所有等待的工人
}
KeyOf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
// DeletedFinalStateUnknown objects.
// Keyof 就是把 DeltaFIFO 的私有變數 keyfunc 變形一個暴露,並且檢測 Object 轉換成Deltas 型態與 DeletedFinalStateUnknown 是否會出現問題
func(f*DeltaFIFO)KeyOf(objinterface{})(string,error){//嘗試轉換型態
ifd,ok:=obj.(Deltas);ok{iflen(d)==0{return"",KeyError{obj,ErrZeroLengthDeltasObject}}//取得 Deltas 中最新一筆資料
obj=d.Newest().Object}//嘗試轉換型態,如果是 DeletedFinalStateUnknown 型態的話,就直接回傳Object key,關於 DeletedFinalStateUnknown 是什麼後面會提到。
ifd,ok:=obj.(DeletedFinalStateUnknown);ok{returnd.Key,nil}returnf.keyFunc(obj)}
func(f*DeltaFIFO)queueActionLocked(actionTypeDeltaType,objinterface{})error{//計算 object key
id,err:=f.KeyOf(obj)iferr!=nil{returnKeyError{obj,err}}//從 deltas item map 中取出舊的 deltas 資料
oldDeltas:=f.items[id]//把目標物件轉換成 delta 型態並且加入到既有的 deltas 資料後面
newDeltas:=append(oldDeltas,Delta{actionType,obj})//去除冗余的 deltas 資料
newDeltas=dedupDeltas(newDeltas)//去除冗余的 deltass 資料後,還要判斷 deltas 資料是不是被清空了(不過soure code 的註解寫不會發生這種情況XD)
iflen(newDeltas)>0{//判斷 deltas item map 有沒有資料,如果沒有資料的話,就要加到 fifo queue 中
if_,exists:=f.items[id];!exists{f.queue=append(f.queue,id)}// 如果 deltas item map 有資料表示 queue 裡面已經有了(還沒取走),只要更新 delta item map就好
f.items[id]=newDeltas//發出果廣播告知卡在pop的人可以醒來取貨囉!
f.cond.Broadcast()}else{// This never happens, because dedupDeltas never returns an empty list
// when given a non-empty list (as it is here).
// If somehow it happens anyway, deal with it but complain.
//上面原先的註解我就不拿掉,表示去除冗余資料後不會把 deltas slice 清空~如果真的發生的清空的話舊回報給社群然後還是要做處理xD
ifoldDeltas==nil{klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring",id,oldDeltas,obj)returnnil}klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas",id,oldDeltas,obj)f.items[id]=newDeltasreturnfmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas",id,oldDeltas,obj)}returnnil}//去除 delta 冗余資料
funcdedupDeltas(deltasDeltas)Deltas{//判斷 deltas slice 長度 ,若是小 2 沒什麼好去除的啊
n:=len(deltas)ifn<2{returndeltas}//拿出倒數兩個來處理
a:=&deltas[n-1]b:=&deltas[n-2]//進行冗余比較
ifout:=isDup(a,b);out!=nil{// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if all we do to dedup is compare the new delta with the last element in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d:=append(Deltas{},deltas[:n-2]...)returnappend(d,*out)}returndeltas}//確認 a b delta 事件是否一樣,目前只能判斷 delete delta 事件
funcisDup(a,b*Delta)*Delta{// 判斷 deletea delta 事件
ifout:=isDeletionDup(a,b);out!=nil{returnout}// TODO: Detect other duplicate situations? Are there any?
returnnil}// 判斷 delete delta 事件
funcisDeletionDup(a,b*Delta)*Delta{//只要兩個其中一個不是 delete delta 事件,那就不是一樣的事件
ifb.Type!=Deleted||a.Type!=Deleted{returnnil}// 如果最後一個狀態是 DeletedFinalStateUnknown ,就回傳在奧數第二個狀態 a
if_,ok:=b.Object.(DeletedFinalStateUnknown);ok{returna}returnb}
這裡有幾個部分直得我們思考
為什麼 deltas 要做冗余合併?
什麼樣的 deltas 資料可以進行冗余合併?
我自己認為上述兩個問題得答案分別是
由於加入 delta fifo 的速度與拿出 delta fifo 的速度是不一致的
watch kubernetes api (e.g. pod , deployment , service e.t.c)視為放入 delta fifo
// Replace atomically does two things: (1) it adds the given objects using the Sync or Replace DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
// of K. If `f.knownObjects == nil` then the pre-existing keys are
// those in `f.items` and the current object of K is the `.Newest()`
// of the Deltas associated with K. Otherwise the pre-existing keys
// are those listed by `f.knownObjects` and the current object of K is
// what `f.knownObjects.GetByKey(K)` returns.
// replace做的有兩件事:(1)使用 Sync 或 Replace DeltaType 事件(2)刪除一些就的物件。
func(f*DeltaFIFO)Replace(list[]interface{},resourceVersionstring)error{f.lock.Lock()deferf.lock.Unlock()//建立key set
keys:=make(sets.String,len(list))// 使用 sync 標記
action:=Sync//若是設定為可以用replace,那標記成replace
iff.emitDeltaTypeReplaced{action=Replaced}// 為每個Object 轉換成 replace/sync delta 型態
for_,item:=rangelist{//計算 object key
key,err:=f.KeyOf(item)iferr!=nil{returnKeyError{item,err}}//插入object key
keys.Insert(key)//處理 object key enqueue 以及設定 object ket map 的 delta object 對應
iferr:=f.queueActionLocked(action,item);err!=nil{returnfmt.Errorf("couldn't enqueue object: %v",err)}}//如果沒有設定indexer的話,就要對自己儲存的進行檢查
iff.knownObjects==nil{// Do deletion detection against our own list.
queuedDeletions:=0//遞迴所有的 deltas items map
fork,oldItem:=rangef.items{//如果新的 key set 裡面有 deltas map 所包含的 key 的話,就忽略。(deltas items map 有 新的 key set 的 key 下次只要更新就好)
ifkeys.Has(k){continue}// Delete pre-existing items not in the new list.
// This could happen if watch deletion event was missed while
// disconnected from apiserver.
//刪除不再新的 object key sey 內的 object ,並將其標記成 DeletedFinalStateUnknown。
vardeletedObjinterface{}ifn:=oldItem.Newest();n!=nil{deletedObj=n.Object}//累加不在新的 object key sey 內的 object,總共有多少個
queuedDeletions++//處理 object key enqueue 以及設定 object ket map 的 delta object 對應(設定成DeletedFinalStateUnknown)
iferr:=f.queueActionLocked(Deleted,DeletedFinalStateUnknown{k,deletedObj});err!=nil{returnerr}}//如果populated 為 false 表示還沒有人操作(add delete update)過 delta fifo,這是第一次 replace。
if!f.populated{//標記inqueue
f.populated=true// While there shouldn't be any queued deletions in the initial population of the queue, it's better to be on the safe side.
//initialPopulationCount 表示初始化要pop多少數量是由queue的長度決定
//這邊比較弔詭的是如果沒有 delete 操作過()也就是 populated=false 那應該不會有 queuedDeletions(也就是queuedDeletions=0) ,註解是寫為了保險~
f.initialPopulationCount=len(list)+queuedDeletions}returnnil}// 從 indexer 中取得所有的 object key
knownKeys:=f.knownObjects.ListKeys()queuedDeletions:=0// 遞迴 indexer 中所有的 object key
for_,k:=rangeknownKeys{//如果新的object key set 有indexer object key 的話就忽略不處理(表示已經有delta item maps 已經有資料了)
ifkeys.Has(k){continue}// 新的 object key set 沒有 indexer object key 的話
// 從 indexer 透過 object key 取的 object
deletedObj,exists,err:=f.knownObjects.GetByKey(k)iferr!=nil{deletedObj=nilklog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object",err,k)}elseif!exists{deletedObj=nilklog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object",k)}// 計數 indexer 有的 key ,但新的 object key set 沒有的
queuedDeletions++//處理 object key enqueue 以及設定 object ket map 的 delta object 對應(設定成DeletedFinalStateUnknown)
iferr:=f.queueActionLocked(Deleted,DeletedFinalStateUnknown{k,deletedObj});err!=nil{returnerr}}//如果populated 為 false 表示還沒有人操作(add delete update)過 delta fifo,這是第一次 replace。
if!f.populated{f.populated=true//這邊比較弔詭的是如果沒有 delete 操作過()也就是 populated=false 那應該不會有 queuedDeletions(也就是queuedDeletions=0) ,註解是寫為了保險~
f.initialPopulationCount=len(list)+queuedDeletions}returnnil}
以上就是 store 的實作,說簡單不簡單說困難倒也還好。就是要花點時間整理思路了解到底儲存了什麼,怎麼存的已經批次取代要標記成DeletedFinalStateUnknown的狀態。
func(f*DeltaFIFO)Pop(processPopProcessFunc)(interface{},error){f.lock.Lock()deferf.lock.Unlock()//不斷循環等有資料可以取
for{//如果 fifo queue 沒有資料了
forlen(f.queue)==0{//需要先判斷 fifo queue 是不是已經關閉了
iff.closed{returnnil,ErrFIFOClosed}//工人需要在這裡等有資料,在加入資料時工人會被喚醒
f.cond.Wait()}//把第一筆資料pop出來
id:=f.queue[0]f.queue=f.queue[1:]//todo
iff.initialPopulationCount>0{f.initialPopulationCount--}//從 deltas item map 找到對應的 delta slice
item,ok:=f.items[id]//註解也寫了這不會發生xD,deltas items map一定找得到資料的意思(但也不處理錯誤讓他跳過去)
if!ok{// This should never happen
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.",id)continue}//pop出一個 delta 資料後 需要把 delta 從 deltas items map刪除
delete(f.items,id)//外部處理 delta 資料的 function
err:=process(item)//如果有錯就需要從新加入 queue 中
ife,ok:=err.(ErrRequeue);ok{f.addIfNotPresent(id,item)err=e.Err}// Don't need to copyDeltas here, because we're transferring ownership to the caller.
returnitem,err}}
在 process delta 資料的時候若是發生錯誤,需要再把資料加入 fifo queue 中,我們來看一下怎麼再把資料重新加到 fifo queue 中。
// AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
// present in the set, it is neither enqueued nor added to the set.
//
// This is useful in a single producer/consumer scenario so that the consumer can
// safely retry items without contending with the producer and potentially enqueueing
// stale items.
//
// Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
// different from the Add/Update/Delete functions.
func(f*DeltaFIFO)AddIfNotPresent(objinterface{})error{//把物件轉換成 deltas 型態
deltas,ok:=obj.(Deltas)if!ok{returnfmt.Errorf("object must be of type deltas, but got: %#v",obj)}//取得 deltas 物件最新的 object key
id,err:=f.KeyOf(deltas.Newest().Object)iferr!=nil{returnKeyError{obj,err}}f.lock.Lock()deferf.lock.Unlock()//透過私有方法重新enque物件
f.addIfNotPresent(id,deltas)returnnil}// addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
// already holds the fifo lock.
func(f*DeltaFIFO)addIfNotPresent(idstring,deltasDeltas){f.populated=true//標記有 object inqueue
if_,exists:=f.items[id];exists{//需要先確認 object 使否存在於 deltas map 中,如果存在表示 fifo queue 裡面還有資料
return}//重新加入 fifo queue
f.queue=append(f.queue,id)//對應deltas 與 deltas map 關係
f.items[id]=deltas//通知pop工人可以來取資料
f.cond.Broadcast()}
func(f*DeltaFIFO)Resync()error{f.lock.Lock()deferf.lock.Unlock()//如果沒有 indexer 就不需要處理
iff.knownObjects==nil{returnnil}//從 indexer 中取出每一個 object key set
keys:=f.knownObjects.ListKeys()//遞迴 indexer object key set
for_,k:=rangekeys{//取出 object key 透過 syncKeyLocked ,下面會看到~
iferr:=f.syncKeyLocked(k);err!=nil{returnerr}}returnnil}//實作 object sync 的方法~
func(f*DeltaFIFO)syncKeyLocked(keystring)error{//透過 indexer 使用 object 取得對應的 object key
obj,exists,err:=f.knownObjects.GetByKey(key)iferr!=nil{klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync",err,key)returnnil}elseif!exists{klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync",key)returnnil}// If we are doing Resync() and there is already an event queued for that object,
// we ignore the Resync for it. This is to avoid the race, in which the resync
// comes with the previous value of object (since queueing an event for the object
// doesn't trigger changing the underlying store <knownObjects>.
//透過key func 計算 object key
id,err:=f.KeyOf(obj)iferr!=nil{returnKeyError{obj,err}}//如果 deltas item map 已經有資料了就不處理
iflen(f.items[id])>0{returnnil}//標記已sync的方式加入fifo queue中
iferr:=f.queueActionLocked(Sync,obj);err!=nil{returnfmt.Errorf("couldn't queue object: %v",err)}returnnil}
funcTestDeltaFIFO_ReplaceMakesDeletionsReplaced(t*testing.T){//先建立一個 delta fifo queue,怎麼計算 object key 的不重要,另外還放入了類似 indexer 的物件(實作 KeyListerGetter interface)
//上面有提過可以再回去看一下
//另外這邊打開了 EmitDeltaTypeReplaced
f:=NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction:testFifoObjectKeyFunc,KnownObjects:literalListerGetter(func()[]testFifoObject{return[]testFifoObject{mkFifoObj("foo",5),mkFifoObj("bar",6),mkFifoObj("baz",7)}}),EmitDeltaTypeReplaced:true,})//加入一個刪除的事件
f.Delete(mkFifoObj("baz",10))//觸發replace事件,帶入要replace的物件
f.Replace([]interface{}{mkFifoObj("foo",6)},"0")//預期replace後的結果,基本上就是把指定要replace的物件 inqueue
//indexer 內 replace 有的就標記成replaced
//indexer 內 replace 沒有的就標記成deleted
//中間如有 add 或是 update 過就會有兩筆資料,一筆是 add/update 一筆是 deleted~
expectedList:=[]Deltas{{{Deleted,mkFifoObj("baz",10)}},{{Replaced,mkFifoObj("foo",6)}},// Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted,DeletedFinalStateUnknown{Key:"bar",Obj:mkFifoObj("bar",6)}}},}for_,expected:=rangeexpectedList{cur:=Pop(f).(Deltas)ife,a:=expected,cur;!reflect.DeepEqual(e,a){t.Errorf("Expected %#v, got %#v",e,a)}}}
從上述幾個 test case ,可以看到 DeltaFIFO 基本上怎麼使用,需要承上啟下了解 DeltaFIFO 在 controler/operator 的作用還需要了解其他元件如 Reflector怎麼把資料送進來的,所以這裡怎麼使用 Delta FIFO 使用 test case 來呈現。