1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
//簡單來說就是一開始把 lsit 的過濾條件 resource version 設定成 0
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
// 承裝 lister watcher 列出的物件
var list runtime.Object
// 確定 list 結果是否有分頁
var paginatedResult bool
var err error
//監聽 list 事件是否完成
listCh := make(chan struct{}, 1)
//監聽 list 事件是否有 error
panicCh := make(chan interface{}, 1)
//啟動 thread 執行 list 動作
go func() {
//捕捉錯誤 ,發給監聽錯誤的 channel
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
//主要是建立一個 ListWatcher 的分頁處理器
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
//設定pager相關的參數
switch {
//設定chunk size
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
//若是 list 的結果有分頁的話
case r.paginatedResult:
// 當有同時設定ResourceVersion且ResourceVersion!=0的時候
case options.ResourceVersion != "" && options.ResourceVersion != "0":
//不啟用分頁
pager.PageSize = 0
}
//透過 pager.List 檢索 (list) 出指定的資源,並透過 options 過濾<過程很複雜...有機會再來看>
//我們會得到 list 結果型態是 runtime.Object
//並且拿到回傳的資料是否有做分頁以及相關錯誤
list, paginatedResult, err = pager.List(context.Background(), options)
//處理一些已知的錯誤 例如 StatusReasonExpired , StatusReasonGone 等等,緣由可以看一下原 source code 的註解(歷史因素)
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
//標記有出現過 StatusReasonExpired , StatusReasonGone 等等的錯誤
r.setIsLastSyncResourceVersionUnavailable(true)
//簡單來說就是退回第零版再重新list一次
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
//表示資源檢索(list)完成,透過 channel 發送訊號
close(listCh)
}()
//阻塞操作,等 list 完成或是觸發 panic error ,或者接收到 stop channel 的訊號終止
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
//如果 resource 為 0 並且 list 結果的 paginatedResult 也表示資料有分頁
//就要標記 Reflector 的結果是有分頁的
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
//標記 list 成功
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
//把 list 出的結果轉換成實作 List 的物件(這邊很繞牽扯到apimachinery),先了解意思就好
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
//取得 list 資料內 metadata 的 resourceVersion ,得知當前版本
resourceVersion = listMetaInterface.GetResourceVersion()
//把檢索出來的物件取出 items 的欄位會得到[]runtime.Object,例如裡面就是存 [podA{},podB{},e.t.c]
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
//同步到DeltaFIFO內,下面會看到如何同步的不急
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
//設定從etcd同步過來的最新的版本
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
//////////////////以上為 lister watcher lister 的過程,也就是說 reflector 會先完成 list 的工作!
//處理 delta fifo queue 同步錯誤用的channel,非阻塞
resyncerrc := make(chan error, 1)
//watcher 處理結束用的channel
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
//建立同步用的channel,時間到會從 channel 發出訊號
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() //關閉同步用的channel
}()
for {
//等待同步訊號,stop channel 或是 cancel channel 都是用來監聽關閉的訊號
// resyncCh 則是會被定時觸發
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// ShouldResync 是一個 function
//用來用來確定 Delta FIFO Queue 是否已經同步
if r.ShouldResync == nil || r.ShouldResync() {
//執行 Delta FIFO Queue 的 resync
//不了解的小夥伴可以到之前的章節複習
if err := r.store.Resync(); err != nil {
//若是 Delta FIFO Queue 的 resync 執行 就丟到外面 channel 這裡不處理
resyncerrc <- err
return
}
}
//關閉同步用的channel
cleanup()
//綁定新的同步用的channel
resyncCh, cleanup = r.resyncChan()
}
}()
////////////////以上這一小段是定時確認 Delta FIFO QUEUE 同步過程是否有問題
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
//stop channel 收到訊號表示外部要關閉 reflactor 直接退出
select {
case <-stopCh:
return nil
// 讓 for 迴圈不會卡住
default:
}
//watch timeout 時間 minWatchTimeout 為 300 秒
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
//設定 watch 過濾條件
options = metav1.ListOptions{
ResourceVersion: resourceVersion, //watch 某一個版本以上的 resource
TimeoutSeconds: &timeoutSeconds, //設定watch timeout 時間
AllowWatchBookmarks: true, //用以降低 api server 附載用的...有空再展開來看為什麼可以降低附載
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
//透過 watcher 監控指定的資源,並且透過指定過濾條件進行過濾
w, err := r.listerWatcher.Watch(options)
if err != nil {
//簡單來說當遇到ConnectionRefused時候會透過initConnBackoffManager,來停等一下
//停等之後再重新 watch 試試看
if utilnet.IsConnectionRefused(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
return err
}
//處理 watcher 監控到的資源,下面會看到實作的方法
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
//判斷一下錯誤...但沒有特別處理,
if err != errorStopRequested {
switch {
case isExpiredError(err):
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
// 回傳一個定時器的 channel ,以及關閉定時器的方法
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
if r.resyncPeriod == 0 {
return neverExitWatch, func() bool { return false }
}
t := r.clock.NewTimer(r.resyncPeriod)
return t.C(), t.Stop
}
// 透過給定的物件與版本號替換掉 delta fifo queue的資料
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
//複製一份 list 列出來的所有物件 到 found
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
// delta fifo queue 進行替換
return r.store.Replace(found, resourceVersion)
}
|