Kubernetes common work queue 設計真d不錯

 ·  ☕ 7 

首先本文所以 source code 基於 kubernetes 1.19 版本,所有 source code 的為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

kubernetes work queue

Kubernetes controller/operator 是一個非常精彩的設計模式,在了解Kubernetes controller/operator 怎麼撰寫之前,了解kubernetes work queue的實作模式是非常重要的,下面引用了How to Create a Kubernetes Custom Controller Using client-go的 controller 架構圖可以看到在 sharedindexinformer 內有引用到這個元件,這個元件實際被定義在 kubernetes 的 client-go library 中。


圖片來源:How to Create a Kubernetes Custom Controller Using client-go

Kubernetes 為什麼要實踐一個 work queue 呢?就我們所知 kubernetes 是用 go 撰寫應該可以使用 channel 的機制直接將物件送給要用的元件(thread)啊,原因其實非常簡單,go channel 的設計功能非常單一無法滿足 kubernetes 所要的場景,例如帶有延遲時間物件 queue 就需要根據延遲時間排序 ,例如限制物件取出速度的 queue 。

剛剛提到了兩種 queue ,kubernetes 實作上稱為 rate limiters queue 以及 delaying queue ,此外還有一種通用的 common queue ,本篇文章會先從 common queue 開始探討。

common queue

kubernetes source code 設計得非常精美,我們可以先從 interface 定義了哪些方法來推敲實作這個 interface 的物件可能有什麼功能。

interface

source code

1
2
3
4
5
6
7
8
type Interface interface {
	Add(item interface{})     //向 queue 送出 interface 類型的物件
	Len() int                 //拿到目前儲存在 queue 中的物件的數量
	Get() (item interface{}, shutdown bool) //拿到 queue 中第 0 個 item,另外 queue 是否已經關閉
	Done(item interface{})    //告知 queue 某個 item 已經處理完成
	ShutDown()                //關閉 queue
	ShuttingDown() bool       //查詢 queue 是否關閉
}

看完了抽象的定義之後,必須要回過來看 common queue 實際物件定義了哪些屬性

struct

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t    // queue 的 item 用 slice 儲存,item 的類型為 t 等等會看到 t 是什麼

	// dirty defines all of the items that need to be processed.
	dirty set    // 用來判斷我們哪些 item 還沒處理,是一個 set 的集合

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set    // 儲存我們現在哪些 item 正在處理,是一個 set 的集合。

	cond *sync.Cond    //同步鎖,用來通知其他thread可以取item

	shuttingDown bool    //標記 queue是否關閉

	metrics queueMetrics    // metric不解釋

	unfinishedWorkUpdatePeriod time.Duration // 給 metric 用確認 queue 是否還活著
	clock                      clock.Clock    // 給 metric 用確認 queue 是否還活
}

剛剛上面有一個疑點那就 type t 到底是什麼,以及 type set 到底是什麼。
我們先來看看 type t 的結構
source code

1
type t interface{}    //其實也沒什麼好說的,就是一個泛形表示什麼類型的物件我都能吃

接下來看看 type set 的結構
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19

type empty struct{}
type set map[t]empty    // type set 就是一個 map ,key 是一個泛型物件 值就是空結構

// set 資料結構的,判斷有沒有 t 類型的物件
func (s set) has(item t) bool {    
	_, exists := s[item]
	return exists
}

// set 資料結構的,插入 t 類型的物件
func (s set) insert(item t) {
	s[item] = empty{}
}

// set 資料結構的,刪除 t 類型的物件
func (s set) delete(item t) {
	delete(s, item)
}

看完了資料結構我們接著來看 common work queue 實作的方法,與初始化方法。(看到common work queue的型態為Type讓我一直搞混….)

new function

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//透過 NewQueue function 建立一個common work queue。
//其中需要帶入 clock , metric , updatePeriod 參數
func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
	t := &Type{
		clock:                      c,
		dirty:                      set{},
		processing:                 set{},
		cond:                       sync.NewCond(&sync.Mutex{}),
        //metric物件
		metrics:                    metrics,
        //定期檢測時間
		unfinishedWorkUpdatePeriod: updatePeriod,
	}
    // 啟動一個 thread 檢測 queue 是否關閉,並且定期回報 metric 
	go t.updateUnfinishedWorkLoop()
	return t
}

const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond

// 不同的封裝方式,不提 (設定 metric 的 name 為空)
func New() *Type {
	return NewNamed("")
}

// 不同的封裝方式,提供 name (設定 metric 的 name )
func NewNamed(name string) *Type {
	rc := clock.RealClock{}
	return newQueue(
		rc,
		globalMetricsFactory.newQueueMetrics(name, rc),
		defaultUnfinishedWorkUpdatePeriod,
	)
}

//
func (q *Type) updateUnfinishedWorkLoop() {
    //設置了一個定時器
	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
    //在function結束的時候停止計時器
	defer t.Stop()
    //當受到計時器訊號時
	for range t.C() {
		if !func() bool {
            // Lock 鎖
			q.cond.L.Lock()
            // 當離開時解鎖
			defer q.cond.L.Unlock()
            // 判斷 queue 是否關閉(沒有關閉)
			if !q.shuttingDown {
                //告訴 metric work queue 還沒關閉
				q.metrics.updateUnfinishedWork()
                // 回傳繼續等在 計時器訊號
				return true
			}
            // queue 關閉了 跳出整個 updateUnfinishedWorkLoop() function
			return false

		}() {
            // queue 關閉了 跳出整個 updateUnfinishedWorkLoop() function
			return
		}
	}
}

implement function

看完了初始化common work queue function 後接下來看看核心的功能。
source code

Add

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
    // lock鎖
	q.cond.L.Lock()
    // 離開時解鎖
	defer q.cond.L.Unlock()
    // 如果queue 關閉就離開
	if q.shuttingDown {
		return
	}
    // 如果 dirty set 裡面已經有了表示物件已經儲存過但還沒被處理那就離開
	if q.dirty.has(item) {
		return
	}
    // metric 不解釋
	q.metrics.add(item)
    // 放入 dirty set ,表示物件等待處理
	q.dirty.insert(item)
    // 物件如果在 processing set 就退出
	if q.processing.has(item) {
		return
	}
    //加入queue
	q.queue = append(q.queue, item)
    //告知其他thread可以來取物件了
	q.cond.Signal()
}

這邊看似很簡單實際上有點複雜,有幾種狀態需要特別用圖片來解釋。

  1. 如果 queue 關閉了就不能放入物件
  2. 如果 dirty set 內有這個物件,表示誰都還沒處理過物件,那這個物件就不能放到 dirty 裡面裡面
    • 換句話說 diry set 表示物件放進去 queue 過但還沒有被處理
  3. 如果 processing set 內有這個物件表示,有人正在處理這個物件,新進來的就先丟到 dirty set 裡面吧

其中 2 , 3 較難理解,我透過圖片來幫助我們了解狀況。

基本上 2 , 3 的邏輯如圖所示,比較有問題的應該是在 processing set 有東西的時候 ,還要放入 item 為什麼不會放到 queue 裡面去呢?這會在後面的分析中看到!

Get

source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
    // lock 鎖
	q.cond.L.Lock()
    // 離開function 還回lock
	defer q.cond.L.Unlock()
    // 如果 queue 長度=0代表 queue 中沒東西,而且 queue 狀態為啟動的狀態,這裡會開始睡覺等別人通知說可以來取貨再醒來
	for len(q.queue) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
    //如果醒來後發現 queue 中長度為0而且 queue 狀態為關閉的那就退出告訴 caller 說 queue 已經關閉了
	if len(q.queue) == 0 {
		// We must be shutting down.
		return nil, true
	}
    
    //取出 queue 列隊中第一個元素,並且讓後面的元素往前推進一個
	item, q.queue = q.queue[0], q.queue[1:]
    
    //metric不解釋
	q.metrics.get(item)

    // 把拿出來item丟入processing set 裡面,表示正在處理。
	q.processing.insert(item)
    // dirty set 刪除 item 表示有人已經把 item 拿走,准許再把相同的 item 放進入
	q.dirty.delete(item)
    // 回傳 queue 列隊中第一個元素,並且告知使用者 queue 有沒有關閉
	return item, false
}

這裏 Get 的邏輯十分簡單,使用者透過 Get 就能拿到 queue 裡面第一筆資料,若是 queue 中沒有資料會 pedding 等到有人通知來拿資料才會醒來 ,此外若是 Get function 告訴使用者說 queue 已經關閉了,那麼 使用者應該把處理這段的 thread 關掉。

Done

這邊是告訴 common work queue 說我們已經做完了,可以把 processing set 所標記的資料刪除囉,那我們就深入 source code 來看看 kubernetes 是怎麼處理的吧!

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {

	// lock 鎖
	q.cond.L.Lock()
    
    // 離開function 還回lock
	defer q.cond.L.Unlock()

    //metric不進行說明
	q.metrics.done(item)

    //從processing set 刪除,表示我們已經做完了!
	q.processing.delete(item)
    //如果 dirty set 裡面還有的話我們就把他加入 queue 中讓他回去排隊等待處理
	if q.dirty.has(item) {
		q.queue = append(q.queue, item)
        // 告訴其他thread該醒了,來拿新的item囉!
		q.cond.Signal()
	}
}

流程大致上如下圖所示

Len

主要回傳給使用者,現在 queue 長度為多少。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Len returns the current queue length, for informational purposes only. You
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
// value, that can't be synchronized properly.
func (q *Type) Len() int {
    // lock 鎖
	q.cond.L.Lock()
    
    // 離開function 還回lock
	defer q.cond.L.Unlock()
    
    //回傳目前 queue 的長度
	return len(q.queue)
}

ShutDown

關閉 queue , queue 不再接受 add() ,使用者只能從 queue 中取值直到取完為止。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// ShutDown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
	// lock 鎖
	q.cond.L.Lock()
    
    // 離開function 還回lock
	defer q.cond.L.Unlock()
    
    //設定queue為關閉狀態
	q.shuttingDown = true
    
    //告知所有要取 queue 的人說,我要關了,快來取值。
	q.cond.Broadcast()
}

ShuttingDown

查詢當前 queue 是否關閉

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
func (q *Type) ShuttingDown() bool {
    // lock 鎖
	q.cond.L.Lock()
    
    // 離開function 還回lock
	defer q.cond.L.Unlock()

    // 回傳queue是否關閉
	return q.shuttingDown
}

小結

本章講述了 kubernetes common work queue 的底層實作方法,接下來還會有兩篇介紹基於 common work queue的 rate limiters queue 以及 delaying queue ,從中我們可以了解 kubernetes controller 監聽到 etcd 變化的物件後如何把 變化的物件丟入 queue 中等待其他人取出並處理,相關業務邏輯,如果文中有錯希望大家不吝嗇提出,讓我們互相交流學習。


Meng Ze Li
Meng Ze Li
Kubernetes / DevOps / Backend