// Type is a work queue (see the package comment).
typeTypestruct{// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue[]t// queue 的 item 用 slice 儲存,item 的類型為 t 等等會看到 t 是什麼
// dirty defines all of the items that need to be processed.
dirtyset// 用來判斷我們哪些 item 還沒處理,是一個 set 的集合
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processingset// 儲存我們現在哪些 item 正在處理,是一個 set 的集合。
cond*sync.Cond//同步鎖,用來通知其他thread可以取item
shuttingDownbool//標記 queue是否關閉
metricsqueueMetrics// metric不解釋
unfinishedWorkUpdatePeriodtime.Duration// 給 metric 用確認 queue 是否還活著
clockclock.Clock// 給 metric 用確認 queue 是否還活
}
剛剛上面有一個疑點那就 type t 到底是什麼,以及 type set 到底是什麼。
我們先來看看 type t 的結構 source code
typeemptystruct{}typesetmap[t]empty// type set 就是一個 map ,key 是一個泛型物件 值就是空結構
// set 資料結構的,判斷有沒有 t 類型的物件
func(sset)has(itemt)bool{_,exists:=s[item]returnexists}// set 資料結構的,插入 t 類型的物件
func(sset)insert(itemt){s[item]=empty{}}// set 資料結構的,刪除 t 類型的物件
func(sset)delete(itemt){delete(s,item)}
看完了資料結構我們接著來看 common work queue 實作的方法,與初始化方法。(看到common work queue的型態為Type讓我一直搞混….)
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func(q*Type)Get()(iteminterface{},shutdownbool){// lock 鎖
q.cond.L.Lock()// 離開function 還回lock
deferq.cond.L.Unlock()// 如果 queue 長度=0代表 queue 中沒東西,而且 queue 狀態為啟動的狀態,這裡會開始睡覺等別人通知說可以來取貨再醒來
forlen(q.queue)==0&&!q.shuttingDown{q.cond.Wait()}//如果醒來後發現 queue 中長度為0而且 queue 狀態為關閉的那就退出告訴 caller 說 queue 已經關閉了
iflen(q.queue)==0{// We must be shutting down.
returnnil,true}//取出 queue 列隊中第一個元素,並且讓後面的元素往前推進一個
item,q.queue=q.queue[0],q.queue[1:]//metric不解釋
q.metrics.get(item)// 把拿出來item丟入processing set 裡面,表示正在處理。
q.processing.insert(item)// dirty set 刪除 item 表示有人已經把 item 拿走,准許再把相同的 item 放進入
q.dirty.delete(item)// 回傳 queue 列隊中第一個元素,並且告知使用者 queue 有沒有關閉
returnitem,false}
這裏 Get 的邏輯十分簡單,使用者透過 Get 就能拿到 queue 裡面第一筆資料,若是 queue 中沒有資料會 pedding 等到有人通知來拿資料才會醒來 ,此外若是 Get function 告訴使用者說 queue 已經關閉了,那麼 使用者應該把處理這段的 thread 關掉。
Done
這邊是告訴 common work queue 說我們已經做完了,可以把 processing set 所標記的資料刪除囉,那我們就深入 source code 來看看 kubernetes 是怎麼處理的吧!
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func(q*Type)Done(iteminterface{}){// lock 鎖
q.cond.L.Lock()// 離開function 還回lock
deferq.cond.L.Unlock()//metric不進行說明
q.metrics.done(item)//從processing set 刪除,表示我們已經做完了!
q.processing.delete(item)//如果 dirty set 裡面還有的話我們就把他加入 queue 中讓他回去排隊等待處理
ifq.dirty.has(item){q.queue=append(q.queue,item)// 告訴其他thread該醒了,來拿新的item囉!
q.cond.Signal()}}
流程大致上如下圖所示
Len
主要回傳給使用者,現在 queue 長度為多少。
1
2
3
4
5
6
7
8
9
10
11
12
13
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func(q*Type)Len()int{// lock 鎖
q.cond.L.Lock()// 離開function 還回lock
deferq.cond.L.Unlock()//回傳目前 queue 的長度
returnlen(q.queue)}
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func(q*Type)ShutDown(){// lock 鎖
q.cond.L.Lock()// 離開function 還回lock
deferq.cond.L.Unlock()//設定queue為關閉狀態
q.shuttingDown=true//告知所有要取 queue 的人說,我要關了,快來取值。
q.cond.Broadcast()}