// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after
// Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
typeIndexerinterface{//嵌入了stroe,擁有stroe全部的方法
Store// 回傳在indexName內與給定的Object有交集的所有object
Index(indexNamestring,objinterface{})([]interface{},error)// 回傳在indexname與透過indexed取的所有關聯的Object Key
IndexKeys(indexName,indexedValuestring)([]string,error)// 回傳在indexName所有的indexed
ListIndexFuncValues(indexNamestring)[]string// 回傳在indexName與透過indexed回傳所有關聯的object
ByIndex(indexName,indexedValuestring)([]interface{},error)// 回傳indexer
GetIndexers()Indexers// 新增indexer
AddIndexers(newIndexersIndexers)error}
typecachestruct{// cacheStorage bears the burden of thread safety for the cache
cacheStorageThreadSafeStore//一個 thread safe 的 interface ,稍後會解釋。
// keyFunc is used to make the key for objects stored in and retrieved from items, and
// should be deterministic.
keyFuncKeyFunc//計算object key的方法,稍後會解釋
}
typeThreadSafeStoreinterface{Add(keystring,objinterface{})Update(keystring,objinterface{})Delete(keystring)Get(keystring)(iteminterface{},existsbool)List()[]interface{}ListKeys()[]stringReplace(map[string]interface{},string)Index(indexNamestring,objinterface{})([]interface{},error)IndexKeys(indexName,indexKeystring)([]string,error)ListIndexFuncValues(namestring)[]stringByIndex(indexName,indexKeystring)([]interface{},error)GetIndexers()Indexers// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(newIndexersIndexers)error// Resync is a no-op and is deprecated
Resync()error}
快速地看過一次應該會覺得跟上面提到的 Store Interface 非常相似,我認為大致上的差異在於 ThreadSafeStroe 主要針對 Index 做處理,我們看一下UML會更加清楚。
// threadSafeMap implements ThreadSafeStore
typethreadSafeMapstruct{locksync.RWMutex//讀寫鎖(可能讀多寫少?)
itemsmap[string]interface{}//儲存Object Key:Object
// indexers maps a name to an IndexFunc
indexersIndexers//用以計算indexed的function
// indices maps a name to an Index
indicesIndices//indexed map透過這個表獲取Onject Key
}
我知道頭很痛,又出現 Index , indexed 這一坨東西,我會盡力講解些東西的關聯,先看看在 Indexers Indices 這些東西在 client go 裡面的定義吧,從中我們可以獲取一些靈感。
// IndexFunc knows how to compute the set of indexed values for an object.
typeIndexFuncfunc(objinterface{})([]string,error)//傳入Obaject會幫你算出多種[]indexed-name
// Index maps the indexed value to a set of keys in the store that match on that value
typeIndexmap[string]sets.String//每個indexed-name會對應到一個[set]object key set
//inedxed->object key set
// Indexers maps a name to a IndexFunc
typeIndexersmap[string]IndexFunc//計算 indexed 的 function 們(透過 name 去分類)
//index name->index function->index
// Indices maps a name to an Index
typeIndicesmap[string]Index//index name(function name)->index[indexed-name]->[set]object key
把上面的source code轉換成圖形應該會變成這樣子,
從最上面的部分往下看
Indexer 裡面有許多 index-name 分別對應著不同的 index function
index function 算出多個 indexed-name
從 Indices 內透過 index-name 可以拿到一個 Index-name 所對應的 index
// NewStore returns a Store implemented simply with a map and a lock.
funcNewStore(keyFuncKeyFunc)Store{return&cache{cacheStorage:NewThreadSafeStore(Indexers{},Indices{}),keyFunc:keyFunc,}}
// ListKeys returns a list of all the keys of the objects currently
// in the threadSafeMap.
func(c*threadSafeMap)ListKeys()[]string{c.lock.RLock()//讀操作,用讀鎖
deferc.lock.RUnlock()//解鎖
//for跑一次map全部加入到slice就好
list:=make([]string,0,len(c.items))forkey:=rangec.items{list=append(list,key)}returnlist//回傳slice
}
func(c*threadSafeMap)Replace(itemsmap[string]interface{},resourceVersionstring){c.lock.Lock()//鎖不解釋
deferc.lock.Unlock()//退出解鎖
c.items=items//覆蓋整個紀錄
// rebuild any index
c.indices=Indices{}//indexed 索引重建
forkey,item:=rangec.items{//更新indexed索引列表
c.updateIndices(nil,item,key)}}
接下來的部分會著重在 indexed 的計算
Index
透過 index-name 取得indexer中的 object key 計算 function 算出對應的 indexed value , 透過對應的 indices-name 像 indices 取的對應的 index 。
使用對應的 index 以及計算出來的 indexed value取得 object key set ,遞迴跑一是 key set 從 map 中一一取得對應的物件。 source-code
// Index returns a list of items that match the given object on the index function.
// Index is thread-safe so long as you treat all items as immutable.
//透過index name取得indexed後
func(c*threadSafeMap)Index(indexNamestring,objinterface{})([]interface{},error){c.lock.Lock()//鎖不解釋
deferc.lock.Unlock()//退出解鎖
indexFunc:=c.indexers[indexName]//透過index name從indexer中取得對應的infexed function
ifindexFunc==nil{returnnil,fmt.Errorf("Index with name %s does not exist",indexName)}indexedValues,err:=indexFunc(obj)//透過indexed function 計算indexed
iferr!=nil{returnnil,err}index:=c.indices[indexName]//透過index name 從 indices 拿對應的 index
varstoreKeySetsets.String//object key的集合
iflen(indexedValues)==1{//大多數情狂indexed 只會有一個
// In majority of cases, there is exactly one value matching.
// Optimize the most common path - deduping is not needed here.
storeKeySet=index[indexedValues[0]]//透過indexed從index拿到對應的 object key set
}else{//不知道什麼情況會跑進去這裡,我用dlv跑測試沒有跑進這裡過...以下是看註解猜測的
// Need to de-dupe the return list.
// Since multiple keys are allowed, this can happen.
storeKeySet=sets.String{}for_,indexedValue:=rangeindexedValues{//遞迴index所有的indexed,並且將數值插入set中
forkey:=rangeindex[indexedValue]{storeKeySet.Insert(key)}}}list:=make([]interface{},0,storeKeySet.Len())//Objest set 取出所有的 Object key
//將map有對應到的 Object 放入list 回傳
forstoreKey:=rangestoreKeySet{list=append(list,c.items[storeKey])}returnlist,nil}
看完了Index的實作,我是這樣猜的透過一個物件算出 Indexed-name,在 Indices 中 index name 對應到一組 Index,最後再從這個 Index 中透過 Indexed name拿到關聯的Object。
dlv 實際拿到的數值 … todo
IndexKeys
透過 index name 從 indexer 中拿到計算 indexe value 的 function ,接著從 indices 使用 index name 拿到到對應的 index 。
使用算出來的 index value 從 index 拿到所有的 object key set 。 source-code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
// IndexKeys is thread-safe so long as you treat all items as immutable.
func(c*threadSafeMap)IndexKeys(indexName,indexedValuestring)([]string,error){c.lock.RLock()//讀操作,用讀鎖
deferc.lock.RUnlock()//解鎖
indexFunc:=c.indexers[indexName]//透過 index name從indexer中取得對應的infexed function
ifindexFunc==nil{returnnil,fmt.Errorf("Index with name %s does not exist",indexName)}index:=c.indices[indexName]//透過 index name 從 indices 拿對應的 index
set:=index[indexedValue]//透過indexed 從 index 拿到所有的 Object key
returnset.List(),nil}
ListIndexFuncValues
透過 index name 從 indices 取得對應的 index,遞迴跑過 index 取得所有的 index key。 source-code
1
2
3
4
5
6
7
8
9
10
11
12
func(c*threadSafeMap)ListIndexFuncValues(indexNamestring)[]string{c.lock.RLock()//讀操作,用讀鎖
deferc.lock.RUnlock()//解鎖
index:=c.indices[indexName]//透過 index name 從 indices 拿對應的 index
names:=make([]string,0,len(index))//拿出 index 中所有的 indexed key
forkey:=rangeindex{names=append(names,key)}returnnames}
ByIndex
透過 index name 從 indexer 取得計算 indexed value 的 function ,以及使用 index name 從 indices 中拿到對應的 index 。
使用對應的 index 以及計算出來的 indexed value 取得對應的 object key set 。
遞迴object key set 把 object key set 對應到的 object 回傳出來。 source-code
// ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
func(c*threadSafeMap)ByIndex(indexName,indexedValuestring)([]interface{},error){c.lock.RLock()//讀操作,用讀鎖
deferc.lock.RUnlock()//解鎖
indexFunc:=c.indexers[indexName]//透過index name從indexer 中取的 indexed function
ifindexFunc==nil{returnnil,fmt.Errorf("Index with name %s does not exist",indexName)}index:=c.indices[indexName]//透過index name從Indices 中取得 index
set:=index[indexedValue]//透過indexed name從index取的 Object key set
list:=make([]interface{},0,set.Len())//遞迴儲存的object key set 對應的object倒出
forkey:=rangeset{list=append(list,c.items[key])}returnlist,nil}
func(c*threadSafeMap)Resync()error{// Nothing to do
returnnil//沒幹啥
}
最後幾個 function 加油!
updateIndices
更新 indices ,如果有就的物件就需要刪除舊的 index
遞迴所有 indexer 拿到計算 indexed value 的 function ,透過 indexed function 計算出 indexed value。
使用 index name 從 indices 拿到 index。
如果 index 不存在建立新的index,並且建立 index name 與 indices 的連結
透過 indexed function 計算出 indexed value
遞迴所有 indexed value ,從 index 中到對應的 Object key set ,若是找不到 Set ,則建立一個 Object ket set 並且讓 indexed value 與 Object key set 建立連結。
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
//傳入舊的 object 為了確認有沒有這個 index (更新用)
//傳入新的 object
//私有方法只能從add 或是 update呼叫,add/update 有上鎖,保證thread safe
func(c*threadSafeMap)updateIndices(oldObjinterface{},newObjinterface{},keystring){// if we got an old object, we need to remove it before we add it again
//如果有代舊的object的話需要先刪除舊的index,下一個段落會說到如何刪除
ifoldObj!=nil{c.deleteFromIndices(oldObj,key)}//將所有的indexer跑過一次,拿到所有的index name以及indexed function
forname,indexFunc:=rangec.indexers{//透過indexed function 算出indexed
indexValues,err:=indexFunc(newObj)iferr!=nil{panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v",key,name,err))}//透過 index name 從 indices 拿到對應的 index
index:=c.indices[name]//確認 index 是否存在,不存在表示這個 indices 還沒有這個 index
//建立 index 並且對應到 indecies 上
ifindex==nil{index=Index{}c.indices[name]=index}for_,indexValue:=rangeindexValues{//透過 indexed 檢查 index 對應的 Object key set
set:=index[indexValue]//若是set 為空表示 index 還沒建立object key set
//建立 set 並且對應到 index 上
ifset==nil{set=sets.String{}index[indexValue]=set}//object set set 插入 object key
set.Insert(key)}}}
deleteFromIndices
當更新物件或是刪除物件的時候會被呼叫為了更新 Indices
遞迴 indexer 取出所有的 indexed value function ,並且從 indices 確認 index 是否存在
若是不存在,表示物件已經被清理且 indices 已更新
若是存在,需要進一步確認 indexed value function 算出來的所有 indexed value 在 index 中 Object key set 的情況
若是 Object key set 存在就刪除 Object 內對應的 Object key,另外 Obeset set 若是為空 需要回收 Object 減少記憶體使用。 source-code
// deleteFromIndices removes the object from each of the managed indexes
// it is intended to be called from a function that already has a lock on the cache
func(c*threadSafeMap)deleteFromIndices(objinterface{},keystring){//將indexer 中全部 indexed function 取出
forname,indexFunc:=rangec.indexers{//透過indexed function 算出 indexed
indexValues,err:=indexFunc(obj)iferr!=nil{panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v",key,name,err))}//透過index name 從indices拿到對應的index
index:=c.indices[name]//有可能 index name 在indeces中找不到(不太可能...)
ifindex==nil{continue}//將indexed 全部跑一次
for_,indexValue:=rangeindexValues{//從index拿到對應的object key set
set:=index[indexValue]//如果set 存在的話就刪除對應的 Object key
ifset!=nil{set.Delete(key)//刪除 object key 為零的 set ,為了避免佔用空間(kubernetes 上有相關的issuekubernetes/kubernetes/issues/84959)
iflen(set)==0{delete(index,indexValue)}}}}}
小結
我想先介紹到這邊把 threadSafeMap 的部分介紹完,讓大家先了解 cache 底層是怎麼做的,以及 indexer 、 indexed-name 、 Indices 、 index 以及 Object set 之間的關聯。
因為 cahce 是基於 threadSafeMap 往上蓋的,所以先建立最底層的概念是非常重要,下一篇我將會繼續介紹 cache 的部分。