// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
// 原生的註解寫得非常棒了,大致上意思為一個物件處理失敗,如果很快地物件在被處理一次失敗的可能性還是很高會造成
// hot-loop,所以讓物件等待一下在排隊進入 queue 就是 delaying queue 的用意
typeDelayingInterfaceinterface{Interface//嵌入了common work queue的interface,delaying queue 也是common queue 的一種
AddAfter(iteminterface{},durationtime.Duration)//表示物件需要等待多久才能被放入 queue 中
}
// delayingType wraps an Interface and provides delayed re-enquing
typedelayingTypestruct{Interface//嵌入了一個common queue
// clock tracks time for delayed firing
clockclock.Clock//用來比對物件延遲時間
// stopCh lets us signal a shutdown to the waiting loop
stopChchanstruct{}//異步退出用
// stopOnce guarantees we only signal shutdown a single time
stopOncesync.Once//異步退出用,保證退出只會被呼叫一次
// heartbeat ensures we wait no more than maxWait before firing
heartbeatclock.Ticker//定時器,定時喚醒thread處理物件
// waitingForAddCh is a buffered channel that feeds waitingForAdd
waitingForAddChchan*waitFor//用以添加延遲物件的channel
// metrics counts the number of retries
metricsretryMetrics//用以紀錄重試的metric
}
剛剛上面有一個疑點那就 type waitFor 到底是什麼
我們先來看看 type waitFor 的結構 source code
1
2
3
4
5
6
7
8
// waitFor holds the data to add and the time it should be added
// 如果需要延遲的物件都會被轉換成這個類型
typewaitForstruct{datat// t 在common queue介紹過,為一個泛行表示什麼都接受的物件
readyAttime.Time//在什麼時間加入到queue中的
// index in the priority queue (heap)
indexint// index會用在後面的排序,延遲時間較小的排前面(用heap排序)
}
// waitForPriorityQueue implements a priority queue for waitFor items.
//
// waitForPriorityQueue implements heap.Interface. The item occurring next in
// time (i.e., the item with the smallest readyAt) is at the root (index 0).
// Peek returns this minimum item at index 0. Pop returns the minimum item after
// it has been removed from the queue and placed at index Len()-1 by
// container/heap. Push adds an item at index Len(), and container/heap
// percolates it into the correct location.
typewaitForPriorityQueue[]*waitFor//waitForPriorityQueue 這個類型實作了 heap interface ,排序的物件為 waitFor
//實作heap interface 的len方法,取出heap當前的長度。
func(pqwaitForPriorityQueue)Len()int{returnlen(pq)}//實作 heap interface 的 Less 方法,確認在 waitForPriorityQueue 的第 i 個元素是否比第 j 個元素小
// 若是第 i 個元素比第 j 個元素小就交換,因為我們希望,因為我們希望越小的排越前面。
func(pqwaitForPriorityQueue)Less(i,jint)bool{returnpq[i].readyAt.Before(pq[j].readyAt)// 比的是時間
}//實作 heap interface 的 swap ,實作 i j 交換
func(pqwaitForPriorityQueue)Swap(i,jint){pq[i],pq[j]=pq[j],pq[i]pq[i].index=ipq[j].index=j}//實作 heap interface 的 Push ,向 heap 添加物件
func(pq*waitForPriorityQueue)Push(xinterface{}){n:=len(*pq)item:=x.(*waitFor)item.index=n//新加入的物件會記錄當前自己的位置
*pq=append(*pq,item)//新加入的物件排到heap的最後面
}//實作 heap interface 的 Pop ,從 heap 的尾巴彈出最後一個物件。
func(pq*waitForPriorityQueue)Pop()interface{}{n:=len(*pq)item:=(*pq)[n-1]item.index=-1*pq=(*pq)[0:(n-1)]//縮小heap,移除最後一個物件
returnitem}//回傳heap第一個物件,延遲時間最短的那一個物件
func(pqwaitForPriorityQueue)Peek()interface{}{returnpq[0]}
// AddAfter adds the given item to the work queue after the given delay
func(q*delayingType)AddAfter(iteminterface{},durationtime.Duration){// don't add if we're already shutting down
ifq.ShuttingDown(){//如果queue關閉了就不能放入
return}q.metrics.retry()//metric不解釋
// immediately add things with no delay
ifduration<=0{//如果延遲時間小於等於0表示不用延遲
q.Add(item)//直接丟入common queue中
return}select{case<-q.stopCh://因為可能會組塞在 waitingForAddCh 透過 stop 保證退出?
// unblock if ShutDown() is called
// 要延遲的物件會封裝成 waitFor 型態並且方入 channel 等待處理
caseq.waitingForAddCh<-&waitFor{data:item,readyAt:q.clock.Now().Add(duration)}:}}
// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
func(q*delayingType)waitingLoop(){deferutilruntime.HandleCrash()// Make a placeholder channel to use when there are no items in our list
never:=make(<-chantime.Time)//我不是很定他的用意...可以看到 nerver channel 又換一個別名表示
// Make a timer that expires when the item at the head of the waiting queue is ready
varnextReadyAtTimerclock.Timer//當 heap 吐出一個延遲物件時透過這個 timer 延遲
waitingForQueue:=&waitForPriorityQueue{}// heap 物件
heap.Init(waitingForQueue)// heap初始化
waitingEntryByData:=map[t]*waitFor{}//用來防止同一個物件重複放入,如果有重複的物件就更新延遲時間
for{//如果queue關閉就離開
ifq.Interface.ShuttingDown(){return}//標記現在時間
now:=q.clock.Now()// 如果在 heap 裡面有東西
forwaitingForQueue.Len()>0{//拿出第一個在 heap 的物件
entry:=waitingForQueue.Peek().(*waitFor)//如果現在時間還沒達到物件要等待的時間就退出
ifentry.readyAt.After(now){break}//如果現在時間達到物件要等到的時間,將物件從heap彈出
entry=heap.Pop(waitingForQueue).(*waitFor)//加到queue中
q.Add(entry.data)//刪除set儲存的物件
delete(waitingEntryByData,entry.data)}// Set up a wait for the first item's readyAt (if one exists)
nextReadyAt:=never//在上面有提到過nerver channel 只換成這個名字,不知道用意為何
// 如果在 heap 裡面有東西
ifwaitingForQueue.Len()>0{// 若是前一個物件的計時器有殘留物就清除前一個物件的計時器
ifnextReadyAtTimer!=nil{nextReadyAtTimer.Stop()}//拿出第一個在 heap 的物件
entry:=waitingForQueue.Peek().(*waitFor)//看物件延遲多久
nextReadyAtTimer=q.clock.NewTimer(entry.readyAt.Sub(now))//當物件延遲時間到了發通知
nextReadyAt=nextReadyAtTimer.C()}select{// queue關閉
case<-q.stopCh:return// 定時被心跳喚醒
case<-q.heartbeat.C():// continue the loop, which will add ready items
// 當收到物件延遲時間到了發通知
case<-nextReadyAt:// continue the loop, which will add ready items
//當有人放需要延遲的物件進queue中
casewaitEntry:=<-q.waitingForAddCh://如果新放入的物件還沒超過延遲時間
ifwaitEntry.readyAt.After(q.clock.Now()){//放入heap中
insert(waitingForQueue,waitingEntryByData,waitEntry)}else{//已經到了延遲時間直接放入queue
q.Add(waitEntry.data)}//一次取光用
drained:=falsefor!drained{select{// 一次把把延遲物件的channel取乾淨
casewaitEntry:=<-q.waitingForAddCh://如果新放入的物件還沒超過延遲時間
ifwaitEntry.readyAt.After(q.clock.Now()){//放入heap中
insert(waitingForQueue,waitingEntryByData,waitEntry)}else{//已經到了延遲時間直接放入queue
q.Add(waitEntry.data)}default:// 保證會退出這個取光的loop
drained=true}}}}}
// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
funcinsert(q*waitForPriorityQueue,knownEntriesmap[t]*waitFor,entry*waitFor){//先判斷加入的物件有沒有重複的
existing,exists:=knownEntries[entry.data]//若是有重複的話
ifexists{// 跟之前放入的物件比較哪個延遲時間比較短
// 若是現在要放入的物件比較短的話就更新 set 中的物件延遲時間
ifexisting.readyAt.After(entry.readyAt){existing.readyAt=entry.readyAtheap.Fix(q,existing.index)}return}//如果 set 沒有重複的話就直接加到 heap 中,以及使用 set 紀錄 heap 有這個物件。
heap.Push(q,entry)knownEntries[entry.data]=entry}
小結
本章講述了 kubernetes delaying work queue 的底層實作方法,接下來還會有幾篇介紹基於 common work queue 的 rate limiters work queue 以及 其他類型的 work queue ,從中我們可以了解 kubernetes controller 監聽到 etcd 變化的物件後如何把 變化的物件丟入 queue 中等待其他人取出並處理,相關業務邏輯,如果文中有錯希望大家不吝嗇提出,讓我們互相交流學習。