微紀錄Kafka參數調整

 ·  ☕ 6 

紀錄使用Kafka開發隊友與我的採坑之旅並如何解決問題的,如果有錯的地方希望大家能夠指出,謝謝。

JVM

GC調整

Kafka 是用 Scala 語言撰寫的最後在 JVM 上運行,因此 JVM 參數的設定對於 Kafka 的重要性可想而知。

Broker 所在節點的 CPU 資源還不錯的話,建議可以使用 CMS 垃圾回收方法。
UseCurrentMarkSweepGC,另外 kafka runtime 是 java 8 的話可以使用 G1 垃圾回收方法,會有較好的 GC 效能。

1
-XX:+UseCurrentMarkSweepGC

反之,CPU 資源匱乏請使用UseParallelGC

1
-XX:+UseParallelGC

HEAP調整

預設 Kafka 啟動時只有用 1G 的 HEAP 對於大多數情境非常不友好...會造成吞吐量的問題。
如果環境如果環境的記憶體空間許可,我建議可以設定到6G以上

1
KAFKA_HEAP_OPTS=--Xms6g --Xmx6g

Topic調整

  • log.retention.<hours|minutes|ms>

    • 基本上這個數值要依照kafka的單日傳輸量來決定,保存太久會引響io速度,以及硬碟的大小
  • message.max.bytes

    • kafka預設的數值為1000012,我定睛一看連1MB都不到...大多數的 kafka 使用場景都會超過 1MB 的,這個需要根據自身使用情境做調整!
  • unclean.leader.election.enable

    • 每個版本預設的數值都不同,至於有什麼影響如果設置為 true 的話,當跑得慢的 partition 變成 leader ,那所有中間那一段資料就遺失了。設值為flase的話這些跑得慢的partition就不讓他來選幾囉!
      • leader 掛掉了 Kafka 會從 ISR 剩下的副本中選擇一個當 leader ,但如果 ISR 也沒有副本了, leader 就選不出來了。如果設置 unclean.leader.election.enable=true,則允許 Kafka 從那些不在 ISR 但依然存活的副本中選擇一個出來當 leader 。此時是有資料丟失的風險。
  • auto.leader.rebalance.enable

    • 堅決設定為flase沒什麼好說的,重新選舉的代價很大!
  • auto.create.topics.enable

    • 預設為 true ,需要調整為 false ,不然 cluster 會有一堆不知名的 topic ,可能你上 code 不小心打錯字… topic 就建立了,滿浪費資源的。

OS 調整

  • Disk-Flush
    • 根據爬了將近三週kafka官方說明以及許多關於kafka雜談,replicas 如何判定資料確實收到?
      • 只要資料被寫入 system 的 cache 就算了完成了(buffer / cache 差異請自行google)
      • 系統會根據 LRU 定期地(預設是五秒鐘)將 cache 的資料寫入硬碟(stoarge device)
      • 需要依照情況調整 LRU 定期寫入硬碟的時間
        • 可能會有人說在這五秒鐘或是更長的時間資料還沒落到硬碟就死機了,那資料不就遺失了??
        • 確實是遺失了,但一般來說副本會複製三份(一份掉了,會被同步回來)。
        • 加長這個時間是避免io頻繁的操作,用效能換可用性
  • File-system
  • Swap
    • 不建議關閉Swap(裸機情況)
      • 之前用裸機跑的時候在可用的記憶體空間變少時會發生 oomkill 造成 brocker 直接死掉
    • 在k8s上運行
      • 沒什麼好說的 k8s 不給用swap
  • FD
    • Too many open files 的錯誤
      • 在k8s上我沒有遇到這個錯誤,還沒去深究原因
      • 在裸機的情況下,我會調整成ulimit -n 1000000

Producer

最多一次(at most once):訊息在傳送的過程中可能會遺失,但是絕對不會重送
至少一次(at least once):訊息在傳送的過程中不會遺失,但可能會重送
精确一次(exactly once):訊息在傳送的過程中不會遺失,也不會重送

我認為精確性提交比較符合場景,以下我將針對exactly once模式說明與探討相關設定。

  • 設定 acks = all

    • 有副本 replicas 都要接收到消息,該訊息才算是commited。這是符合exactly once定義
      • ISR 中只有 1 replicas 時,acks=all 也就相當於 acks=1,注意ISR會變動
      • 當 replicas=3 時,ack=all 則需要 3 個副本都同步後才算commited,當有一個副本掛掉的時候,此時 ISR 會調整為 2 ,由於 ack=all 的設定這時候只需要 2 個副本同步後就認為“commited”
  • 設定 retries 為一個較大的數值

    • 網路的瞬間抖動可能會導致訊息發送失敗,若是設定 retries > 0 的 Producer 能夠自動重新發送訊息。
  • 設定 min.insync.replicas > 1

    • 除了 ISR 需要全部都寫入外,還要保證寫入的 ISR 數量要大於等於 min.insync.replicas ,注意ISR會變動

    • 一旦指定 min.insync.replicas=xxx,不管什麼情況下 ISR 同步的數量最少都要跟設定xxx數量一樣,如果 ISR 當前數量小於 xxxx 那 commited 一定失敗。

Consumer

確保訊息消費完成再提交。 Consumer 有個參數 enable.auto.commit,最好把它設置成 false,並採用手動提交位移的方式。

手動提交又分成 sync 跟 async 兩種方法,需要這兩種方法結合使用才能確保 consumer 端不掉訊息,這裡採用一個小技巧。

  1. poll 資料後先使用 async 異步提交 commit
  2. 若有異常可以使用處理異常的方法,可以透過 Kafka 重送的機制
  3. 等 poll 所有資料做完可以使用 sync 同步提交 commit 確保本次 poll 的東西都有提交上去

另外本來要去嘗試更細粒度的提交方式,例如每次poll下來是1000筆資料,但我想每100筆去提交一次。
看到Kafka有提供兩種方法去使用(我這裡沒去驗證,趕時間QQ)。

  1. commitSync(Map<TopicPartition, OffsetAndMetadata>)
  2. commitAsync(Map<TopicPartition, OffsetAndMetadata>)

這個也非常簡單,看 samplecode 應該是先建立一個 map 放入訊息從哪個 Partition 來目前處理到第幾個。
假設目前處理到第100個,那就應該提交給 kafka ,以此達到較細粒度的提交。

  1. 增加拉取時間

比如消費 100 筆資料我們要花費 400ms 的時間,那麼設定max.poll.interval.ms為300就不符合當前場景的需求。

顯示的增加這個數值很直接的能解這個錯誤的產生,相對的 TPS 就會下降。

  1. 減少 poll 消息數量

max.poll.records 是設定 cunsumer 向 kafka brocker 一次拉下來的量是多少,假設是1000筆資料,那我們在 max.poll.interval.ms 300 ms 單位時間內最多只能處理 800 筆資料,那這個數值設定為 1000 似乎也不符合我們的應用場景,酌量下降設定為 750 可能會是一個比較好的做法,但是也會造成 TPS 下降。

kafka架設在私有機房經驗

注意本章節使用的單位
另外本章節推估 kafka server 數量不是非常嚴謹,但符合我們的業務需求與場景。需要針對不同的場景做微調與優化並且小心服用。

機房架設 kafka cluster switch 以及 baremetal 提供的頻寬為 1Gbps,我們業務的目標( SLA )是一個小時內處理 1TB 的資料,我們要設計多少台 kafka Server 來滿足這個業務需求呢?

  1. 每台機器只有安裝 Kafka Server (排除固定會安裝的 ssh-server 等),換句話說每台機器沒有混合部署其他服務。

  2. 假設 Kafka 滿載的情況下會使用到 80 % 頻寬(為其他常駐的服務保留一點資源),另外過去的實驗經驗告訴我超過 80 % 的使用頻寬 kafka 就會有網路調包的可能,因此可以說單台 Kafka Server 最多大改能使用 800 Mb 的頻寬。(同時避免高峰直流量的情況作些微的保留)

  3. 要記得 Kafka Server 之間有互相備援的機制,這大概會佔用掉 1/3 的頻寬(800Mb的3分之1),那我們可以計算實際上 Kafka 提供給使用者的頻寬大概是多少 ≈ 533Mbps。

  4. 業務上 SLA 一個小時要求我們要處理 1TB 的資料,所以表示每秒的資料量為 10001000/(6060)≈2222Mb(注意!這邊有Byte換成bits)。

  5. 計算需要多少台 Kafka Server ,每秒需要處理 2222Mb 的業務量一台 kafka 一秒頻寬最大只能使用 533 Mb ,用 2222 除與 533 就可以換換算出 4 台左右的 kafka server 。

  6. Kafka 一般來說不太吃 CPU ,從過去監控的紀錄來看 CPU 很少吃滿常態的情況之下 CPU 只用到一半不到(producer 與 consumer 需要用同一個壓縮算法喔!)


Meng Ze Li
Meng Ze Li
Kubernetes / DevOps / Backend