我們會在 consumer 設定 max.poll.interval.ms ,例如這個數值設定為 300 表示300 毫秒要去向 kafka brcoker 拉取資料,若是 consumer 消費訊息花了太多時間就會噴出 CommitFailedException 這個錯誤訊息。
社群解決方案
- 增加拉取時間
比如消費 100 筆資料我們要花費 400ms 的時間,那麼設定max.poll.interval.ms為300就不符合當前場景的需求。
顯示的增加這個數值很直接的能解這個錯誤的產生,相對的 TPS 就會下降。(花的時間一樣,我們只是跟Kafka說我們拉資料的間隔時間拉長而已,有種掩耳盜鈴的感覺)
- 減少 poll 消息數量
max.poll.records 是設定 cunsumer 向 kafka brocker 一次拉下來的量是多少,假設是1000筆資料,那我們在 max.poll.interval.ms 300 ms 單位時間內最多只能處理 800 筆資料,那這個數值設定為 1000 似乎也不符合我們的應用場景,酌量下降設定為 750 可能會是一個比較好的做法,但是也會造成 TPS 下降。(花的時間一樣,我們只是跟 Kafka 說我們一次垃少一點的資料,一樣有種掩耳盜鈴的感覺)
還可以怎麼做
社群解決方案需要對自身系統足夠了解,這裡我舉一個例子。
我們需要搞清楚你的 consumer 系統消費每條訊息的平均花費時間是多少。
比如我們的 consumer 邏輯是從 Kafka brocker 拉取消息後,進行一些業務處理最後再寫入下游的永久儲存 MySQL 中。我們可以觀測在最後一哩路存到 MySQL 的平均花費時間不超過 1 秒,我們大致上可以認為這筆訊息只要 1 秒的時間被消費。
如果 consumer 的 max.poll.records 設定為 400,那麼消費這些訊息要大概花上 1000 秒的時間,因此我們 Consumer 的 max.poll.interval.ms 數值就不能設定 400 秒。了解了自身系統後可以針對這兩個數值去調整減少CommitFailedException的錯誤發生,另外社群給出調整這兩項數值是傷害TPS的。
我推薦這麼做
- 縮短資料處理時間
很直觀的若是一條若是一條訊息原始處理時間為 50ms , 現在經過優化過後處理時間變為 40 ms 速度上直接提升了1.25倍,但是這也是最難做的部分需要優化處理流程(演算法、資料結構)。 - 使用Muti Thread 來處理訊息
Tips: 不能在多個 Thread 中共享同一個 KafkaConsumer instance ,否則會產生 ConcurrentModificationException Error
我們向kafka brocker 說拉取了資料,這時候要啟動多條Thread 來處理這些訊息。
我們假假設每條訊息都開一條Thread來處理(先不考慮thread pool),如果有1000條訊息就會有1000條thread,如下圖所示。
|
|
我認為困難點是我要怎麼知道Thread全部做完了,什麼時候要commit 要怎麼commit 用async還是用sync開開Thread很簡單,但commit才是最麻煩的地方。
缺點
實作起來非常複雜且麻煩,在 Thread 中維護資料確定哪些沒有 commit 哪些 commit 了 非常的費心思,另外 如果用等冪的 topic 資料在 partaction 會是有順序的,例如 partation 的資料順序為 1 2 3 4 5 6 7 8 9 10 ,如果採用 MutiThread的方案會在造成 處理的順序是隨機的,可能處理的順序變為 1 4 3 2 10 9 7 8 之類的。