Kubernetes error handle 好吃驚

 ·  ☕ 6 

首先本文所有的 source code 基於 kubernetes 1.19 版本,所有 source code 為了版面的整潔會精簡掉部分 log 相關的程式碼,僅保留核心邏輯,如果有見解錯誤的地方,還麻煩觀看本文的大大們提出,感謝!

換換口味今天我們來分析一下 kubernetes 如何處理錯誤, golang 的錯誤處理總是令人詬病,看看開源的大專案面對這種語言本身的缺陷(算嗎xD?)是怎麼處理的,從中學習到一些妙招~

本篇只涵蓋到 kubernetes 錯誤處理的一部分,還有許多錯誤處理的方法等待我們去挖掘~未來會持續解析相關議題!

errors

kubernetes 定義了一個 interface ,來描述錯誤訊息要如何被封裝,先來看看這個 interface 是如何定義的。

interface

source code

1
2
3
4
5
6
// 聚合 error
type Aggregate interface {
	error                    //繼承了golang builtin package 的 error interface
	Errors() []error         //列出聚合所有的 error
	Is(error) bool           //檢查聚合的內容有沒有包含指定的 error
}

看完了 interface 後可以來看看哪個 struct 實作了這個 interface

struct

聚合的意思就是把一群東西放在一起,error 的聚合就是把 error 放在一起很自然地就會想到 slice ~

沒錯!kubernetes也是這麼做的,透過 error slice 把 error 聚合再一起。
source code

1
type aggregate []error

看完了資料結構後我們來看看要怎麼把這個物件建立起來

New function

傳入一組 error slice 並且轉換成 Aggregate 物件回傳出去
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func NewAggregate(errlist []error) Aggregate {
    // error 是空的表示沒有錯誤要處理
	if len(errlist) == 0 {
		return nil
	}
	// In case of input error list contains nil
	var errs []error
    // 把 error list 的東西全部倒出來,檢查有沒有偷藏 nil
    // 沒有 nil 的加入到 errs 的 slice 中
	for _, e := range errlist {
		if e != nil {
			errs = append(errs, e)
		}
	}
    //檢查轉換完的 errs 是不是空的
	if len(errs) == 0 {
		return nil
	}
    //轉換物件型態~轉成有實作 Aggregate interface 的物件
	return aggregate(errs)
}

impliment

看完了 aggregate 的資料結構以及如何建立一個實作 Aggregate interface 的物件後,我們來接著看實作得部分。

Error

Error 的部分是實作 golang builtin package 的 error
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// Error is part of the error interface.
func (agg aggregate) Error() string {
    //確認一下 error slice 長度,在 new function 的時候就確認過理論上不會出錯
	if len(agg) == 0 {
		// This should never happen, really.
		return ""
	}
    //如果錯誤只有一個的話就回傳那一個
	if len(agg) == 1 {
		return agg[0].Error()
	}
    // new 一個errs set 
	seenerrs := sets.NewString()
	result := ""
    //需要搭配下面一點的 visit function 來看~
    //visit 代表訪問每一個 item
	agg.visit(func(err error) bool {
        // 印出錯誤
		msg := err.Error()
        // 如果errs set  有包含這個錯誤的話就回傳 false(表示錯誤重複了)
		if seenerrs.Has(msg) {
			return false
		}
        //把錯誤加入 error set
		seenerrs.Insert(msg)
        //如果錯誤大於一個需要用,分割錯誤
		if len(seenerrs) > 1 {
			result += ", "
		}
		result += msg
		return false
	})
    //只有一個錯誤,直接回傳錯誤不需要加[]
	if len(seenerrs) == 1 {
		return result
	}
    //錯誤是一個陣列,需要加[]
	return "[" + result + "]"
}

//訪問每一個錯誤透過傳入的 function 來處理每個錯誤
func (agg aggregate) visit(f func(err error) bool) bool {
    // 遞迴全部的錯誤
	for _, err := range agg {
        //判斷錯誤類型
		switch err := err.(type) {
            //如錯錯誤是一個 aggregate 的物件的話
		case aggregate:
                    
                    //表示這個錯誤裡面包了其他錯誤,需要再展開遞迴的處理
			if match := err.visit(f); match {
				return match
			}
            //如錯錯誤是一個實作 aggregate interface 的物件的話
		case Aggregate:
                    //等等會看到 Errors ,簡單來說就是把錯誤展開成一個slice
                    //檢查每一個錯誤
			for _, nestedErr := range err.Errors() {
				if match := f(nestedErr); match {
					return match
				}
			}
            //如果是一般錯誤的話
		default:
                //檢查個錯誤
			if match := f(err); match {
				return match
			}
		}
	}

	return false
}

Is

透過golang errors packages 的 is 來輔助判斷 aggregate slice 內有無相同的error
source code

1
2
3
4
5
6
7
8
func (agg aggregate) Is(target error) bool {
    //需要搭配剛剛提到的visit function 來看~
    //visit 代表訪問每一個 item
	return agg.visit(func(err error) bool {
                //透過errors packages 來輔助判斷 aggregate slice 內有無相同的error
		return errors.Is(err, target)
	})
}

Errors

顯示所有error,簡單來說就是把 aggregate 轉換成 []error
source code

1
2
3
4
// Errors is part of the Aggregate interface.
func (agg aggregate) Errors() []error {
	return []error(agg)
}

以上透過 aggregate 簡單的封裝了 error,讓 golang 的錯誤處理有較好的處理方式。

我認為 kubernetes 處理 error 的靖華在於下面這幾個 function

Flatten

比如有一個巢狀的 aggregate 可以用 Flatten 把 aggregate 攤平。

1
2
3
4
5
6
7
8
aggregate{
    aggregate{
        fmt.Errorf("abc"),
            aggregate{
                fmt.Errorf("def")
            }
    }
}

我們想把上面這個結構攤平,就可以透過 Flatten 來幫忙輸出的結果會變成

1
2
3
aggregate{
    fmt.Errorf("abc"), fmt.Errorf("def")
}

我們來看看他怎麼實作的
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Flatten takes an Aggregate, which may hold other Aggregates in arbitrary
// nesting, and flattens them all into a single Aggregate, recursively.
func Flatten(agg Aggregate) Aggregate {
    //建立一個 error slice
	result := []error{}
    //判斷 Aggregate 存不存在,不存在可以直接回傳
	if agg == nil {
		return nil
	}
    //把 Aggregate 展開成 errors slice  並且遞迴所有error
	for _, err := range agg.Errors() {
            //如果判斷到 error 是 Aggregate 就要繼續遞迴展開
		if a, ok := err.(Aggregate); ok {
                    //遞迴展開的error結果加入到result內
			r := Flatten(a)
			if r != nil {
				result = append(result, r.Errors()...)
			}
		} else {
                    //error結果加入到result內
			if err != nil {
				result = append(result, err)
			}
		}
	}
    //回傳一個 Aggregate error slice
	return NewAggregate(result)
}

CreateAggregateFromMessageCountMap

有些情況我們會紀錄錯誤訊息發生的次數,我們可以透過 CreateAggregateFromMessageCountMap 幫助我們把這些訊息轉成 Aggregate error slice 。
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// MessageCountMap contains occurrence for each error message.
// 計數錯誤訊息出現次數
type MessageCountMap map[string]int

// CreateAggregateFromMessageCountMap converts MessageCountMap Aggregate
// 輸入計數訊息錯誤次數的 map 轉換成 Aggregate 輸出
func CreateAggregateFromMessageCountMap(m MessageCountMap) Aggregate {
    //簡單的判斷一下輸入,沒有輸入就不做事
	if m == nil {
		return nil
	}
    //建立一個error slice 長度為 MessageCountMap 的大小
	result := make([]error, 0, len(m))
    //遞迴 MessageCountMap
	for errStr, count := range m {
		var countStr string
        //錯誤出現超過一次的才放入 error slice 中,並且在錯誤訊息中標示 錯誤訊息與錯誤次數
		if count > 1 {
			countStr = fmt.Sprintf(" (repeated %v times)", count)
		}
		result = append(result, fmt.Errorf("%v%v", errStr, countStr))
	}
    //回傳一個 Aggregate error slice
	return NewAggregate(result)
}

FilterOut

把錯誤進行過濾~有通過 Matcher function 才能輸出
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
type Matcher func(error) bool

func FilterOut(err error, fns ...Matcher) error {
    //沒有輸入就不需要做事直接退出
	if err == nil {
		return nil
	}
    //如果 error 有實作 Aggregate 
	if agg, ok := err.(Aggregate); ok {
            //把 error 展開成 error slice 
            //再把 Matcher 與 error slice 交給 filterErrors 處理(下面有解釋~)
		return NewAggregate(filterErrors(agg.Errors(), fns...))
	}
    // 判斷error 有沒有通過 matcher function
	if !matchesError(err, fns...) {
		return err
	}
	return nil
}

// matchesError returns true if any Matcher returns true
//透過 matcher function 檢查輸入的error
func matchesError(err error, fns ...Matcher) bool {
    //遞迴全部的 Matcher function
	for _, fn := range fns {
            //檢查所有error
		if fn(err) {
			return true
		}
	}
	return false
}

//檢查errors slice 
func filterErrors(list []error, fns ...Matcher) []error {
    //建立result error slice 
	result := []error{}
    //遞迴輸入的 error slice 
	for _, err := range list {
            //這裡會回到 FilterOut 確認 err 是不是有實作 Aggregate 
            //如果有實作 Aggregate 還需要繼續的迴展開
		r := FilterOut(err, fns...)
            //遞迴展開後結果不是空數就可以合併到 result error slice 
		if r != nil {
			result = append(result, r)
		}
	}
	return result
}

AggregateGoroutines

假設有很多 function 會併發執行且我們需要收集她的 error 做聚合組裡的時候這個 AggregateGoroutines function 就很好用~
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// AggregateGoroutines runs the provided functions in parallel, stuffing all
// non-nil errors into the returned Aggregate.
// Returns nil if all the functions complete successfully.
func AggregateGoroutines(funcs ...func() error) Aggregate {
    //建立有 n 個 buffer 的 channel  
	errChan := make(chan error, len(funcs))
    
    //遞迴執行function
	for _, f := range funcs {
        //透過 goroutine 在背景執行,把執行的結果丟在 buffer channel
		go func(f func() error) {
			errChan <- f()
		}(f)
	}
    // 建立 error slice 
	errs := make([]error, 0)
    //接收全部的 channel 結果
	for i := 0; i < cap(errChan); i++ {
            //如果 error 不是 nil 就加入到 error slice 內
		if err := <-errChan; err != nil {
			errs = append(errs, err)
		}
	}
    // 將error 轉換成 Aggregate 
	return NewAggregate(errs)
}

Reduce

我不確定 Reduce function 的用途….看不是很懂…
source code

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// Reduce will return err or, if err is an Aggregate and only has one item, the first item in the aggregate.
//輸入一個error 
func Reduce(err error) error {
    //如過error 有實作 Aggregate 的話 ,要接著判斷 errors slice 的長度
	if agg, ok := err.(Aggregate); ok && err != nil {
		switch len(agg.Errors()) {
            //如果長度是1回傳第一個錯誤
		case 1:
			return agg.Errors()[0]
            //不然就是nil
		case 0:
			return nil
		}
	}
	return err
}

小結

kubernetes 設計了很多很棒的架構,我們可以以 kubernetes 為借鏡設計模式出適合公司的框架,從中還可以學到不少神奇有趣的方法。
感恩開源讚嘆開源,讓我學習到更多!文中如果有見解錯誤的或是寫的不正確的還希望觀看此文的大大們可以不吝嗇地提出讓我修正與學習!感謝~


Meng Ze Li
Meng Ze Li
Kubernetes / DevOps / Backend