Kafka Controller Redesign 方案
(點擊
上方公眾號
,可快速關注)
來源:王蒙 ,
matt33.com/2018/07/14/kafka-controller-redesign/
Kafka Controller 是 Kafka 的核心組件,在前面的文章中,已經詳細講述過 Controller 部分的內容。在過去的幾年根據大家在生產環境中應用的反饋,Controller 也積累了一些比較大的問題,而針對這些問題的修復,代碼的改動量都是非常大的,無疑是一次重構,因此,社區準備在新版的系統里對 Controller 做一些相應的優化(0.11.0及以後的版本),相應的設計方案見:
Kafka Controller Redesign
,本文的內容就是結合這篇文章做一個簡單的總結。
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
Controller 功能
在一個 Kafka 中,Controller 要處理的事情總結如下表所示:
Controller 目前存在的問題
之所以要重新設計 Controller,是因為現在的 Controller 積累了一些比較難解決的問題,這些問題解決起來,代碼改動量都是巨大的,甚至需要改變 controller 部門的設計,基本就跟重構差不多了,下面我們先來了看一下 controller 之前(主要是 0.11.0 之前的版本)存在的一些問題。
目前遇到的比較大的問題有以下幾個:
Partition 級別同步 zk 寫;
sequential per-partition controller-to-broker requests;
Controller 複雜的並發語義;
代碼組織混亂;
控制類請求與數據類請求未分離;
Controller 給 broker 的請求中沒有 broker 的 generation信息;
ZkClient 阻礙 Client 的狀態管理。
Partition 級別同步 zk 寫
zookeeper 的同步寫意味著在下次寫之前需要等待前面整個過程的結束,而且由於它們都是 partition 粒度的(一個 Partition 一個 Partition 的去執行寫操作),對於 Partition 非常多的集群來說,需要等待的時間會更長,Controller 通常會在下面這兩個地方做 Partition 級別 zookeeper 同步寫操作:
PartitionStateMachine 在進行觸發 leader 選舉(partition 目的狀態是 OnlinePartition),將會觸發上面的操作;
ReplicaStateMachine 更新 LeaderAndIsr 信息到 zk(replica 狀態轉變為 OfflineReplica),這種情況也觸發這種情況,它既阻礙了 Controller 進程,也有可能會 zk 造成壓力。
sequential per-partition controller-to-broker requests
Controller 在向 Broker 發送請求,有些情況下也是 Partition 粒度去發送的,效率非常低,比如在 Controller 處理 broker shutdown 請求時,這裡是按 Partition 級別處理,每處理一個 Partition 都會執行 Partition、Replica 狀態變化以及 Metadata 更新,並且調用 sendRequestsToBrokers() 向 broker 發送請求,這樣的話,效率將變得非常低。
Controller 複雜的並發語義
Controller 需要在多個線程之間共享狀態信息,這些線程有:
IO threads handling controlled shutdown requests
The ZkClient org.I0Itec.zkclient.ZkEventThread processing zookeeper callbacks sequentially;
The TopicDeletionManager kafka.controller.DeleteTopicsThread;
Per-broker RequestSendThread within ControllerChannelManager.
所有這些線程都需要訪問或修改狀態信息(ControllerContext),現在它們是通過 ControllerContext 的 controllerLock(排它鎖)實現的,Controller 的並發變得虛弱無力。
代碼組織混亂
KafkaController 部分的代碼組織(KafkaController、PartitionStateMachine 和 ReplicaStateMachine)不是很清晰,比如,下面的問題就很難回答:
where and when does zookeeper get updated?
where and when does a controller-to-broker request get formed?
what impact does a failing zookeeper update or controller-to-broker request have on the cluster state?
這也導致了這部分很多開發者不敢輕易去改動。
控制類請求與數據類請求未分離
現在 broker 收到的請求,有來自 client、broker 和 controller 的請求,這些請求都會被放到同一個 requestQueue 中,它們有著同樣的優先順序,所以來自 client 的請求很可能會影響來自 controller 請求的處理(如果是 leader 變動的請求,ack 設置的不是 all,這種情況有可能會導致數據丟失)。
Controller 給 broker 的請求中沒有 broker 的 generation信息
這裡的 Broker generation 代表著一個標識,每當它重新加入集群時,這個標識都會變化。如果 Controller 的請求沒有這個信息的話,可能會導致一個重啟的 Broker 收到之前的請求,讓 Broker 進入到一個錯誤的狀態。
比如,Broker 收到之前的 StopReplica 請求,可能會導致副本同步線程退出。
ZkClient 阻礙 Client 的狀態管理
這裡的狀態管理指的是當 Client 發生重連或會話過期時,Client 可以監控這種狀態變化,並做出一些處理,因為開源版的 ZKClient 在處理 notification 時,是線性處理的,一些 notification 會被先放到 ZkEventThread』s queue 中,這樣會導致一些最新的 notification 不能及時被處理,特別是與 zk 連接斷開重連的情況。
Controller 改進方案
關於上述問題,Kafka 提出了一些改進方案,有些已經在最新版的系統中實現,有的還在規劃中。
使用非同步的 zk API
Zookeeper 的 client 提供三種執行請求的方式:
同步調用,意味著下次請求需要等待當前當前請求的完成;
非同步調用,意味著不需要等待當前請求的完成就可以開始下次請求的執行,並且我們可以通過回調機制去處理請求返回的結果;
單請求的 batch 調用,意味著 batch 內的所有請求都會在一次事務處理中完成,這裡需要關注的是 zookeeper 的 server 對單請求的大小是有限制的(jute.maxbuffer)。
文章中給出了三種請求的測試結果,Kafka 最後選取的是非同步處理機制,因為對於單請求處理,非同步處理更加簡潔,並且相比於同步處理還可以保持一個更好的寫性能。
improve controller-to-broker request batching
這個在設計文檔還是 TODO 狀態,具體的方案還沒確定,不過基本可以猜測一下,因為目的是提高 batch 發送能力,那麼只能是在調用對每個 broker 的 RequestSenderThread 線程發送請求之前,做一下檢測,而不是來一個請求立馬就發送,這是一個性能與時間的權衡,如果不是立馬發送請求,那麼可能會帶來 broker 短時 metadata 信息的不一致,這個不一致時間不同的應用場景要求是不一樣的。
單線程的事件處理模型
採用單線程的時間處理模型將極大簡化 Controller 的並發實現,只允許這個線程訪問和修改 Controller 的本地狀態信息,因此在 Controller 部分也就不需要到處加鎖來保證線程安全了。
目前 1.1.0 的實現中,Controller 使用了一個 ControllerEventThread 線程來處理所有的 event,目前可以支持13種不同類型事件:
Idle:代表當前 ControllerEventThread 處理空閑狀態;
ControllerChange:Controller 切換處理;
BrokerChange:Broker 變動處理,broker 可能有上線或掉線;
TopicChange:Topic 新增處理;
TopicDeletion:Topic 刪除處理;
PartitionReassignment:Partition 副本遷移處理;
AutoLeaderBalance:自動 rebalance 處理;
ManualLeaderBalance:最優 leader 選舉處理,這裡叫做手動 rebalance,手動去切流量;
ControlledShutdown:優雅關閉 broker;
IsrChange:Isr 變動處理;
LeaderAndIsrResponseReceived;
LogDirChange:Broker 某個目錄失敗後的處理(比如磁碟壞掉等);
ControllerShutdown:ControllerEventThread 處理這個事件時,會關閉當前線程。
重構集群狀態管理
這部分的改動,目前社區也沒有一個很好的解決思路,重構這部分的目的是希望 Partition、Replica 的狀態管理變得更清晰一些,讓我們從代碼中可以清楚地明白狀態是在什麼時間、什麼地方、什麼條件下被觸發的。這個優化其實是跟上面那個有很大關聯,採用單線程的事件處理模型,可以讓狀態管理也變得更清晰。
prioritize controller requests
我們想要把控制類請求與數據類請求分開,提高 controller 請求的優先順序,這樣的話即使 Broker 中請求有堆積,Broker 也會優先處理控制類的請求。
這部分的優化可以在網路層的 RequestChannel 中做,RequestChannel 可以根據請求的 id 信息把請求分為正常的和優先的,如果請求是 UpdateMetadataRequest、LeaderAndIsrRequest 或者 StopReplicaRequest,那麼這個請求的優先順序應該提高。實現方案有以下兩種:
在請求隊列中增加一個優先順序隊列,優先順序高的請求放到 the prioritized request queue 中,優先順序低的放到普通請求隊列中,但是無論使用一個定時拉取(poll)還是2個定時拉取,都會帶來其他的問題,要麼是增大普通請求的處理延遲,要麼是增大了優先順序高請求的延遲;
直接使用優先順序隊列代替現在的普通隊列,設計上更傾向與這一種。
目前這部分在1.1.0中還未實現。
Controller 發送請求中添加 broker 的 generation 信息
generation 信息是用來標識當前 broker 加入集群 epoch 信息,每當 broker 重新加入集群中,該 broker.id 對應的 generation 都應該變化(要求遞增),目前有兩種實現方案:
為 broker 分配的一個全局唯一的 id,由 controller 廣播給其他 broker;
直接使用 zookeeper 的 zxid 信息(broker.id 註冊時的 zxid)。
直接使用原生的 Zookeeper client
Client 端的狀態管理意味著當 Client 端發生狀態變化(像連接中斷或回話超時)時,我們有能力做一些操作。其中,zookeeper client 有效的狀態(目前的 client 比下面又多了幾種狀態,這裡先不深入)是:
NOT_CONNECTED: the initial state of the client;
CONNECTING: the client is establishing a connection to zookeeper;
CONNECTED: the client has established a connection and session to zookeeper;
CLOSED: the session has closed or expired。
有效的狀態轉移是:
NOT_CONNECTED > CONNECTING
CONNECTING > CONNECTED
CONNECTING > CLOSED
CONNECTED > CONNECTING
CONNECTED > CLOSED
最開始的設想是直接使用原生 Client 的非同步調用方式,這樣的話依然可以通過回調方法監控到狀態的變化(像連接中斷或回話超時),同樣,在每次事件處理時,可以通過檢查狀態信息來監控到 Client 狀態的變化,及時做一些處理。
當一個 Client 接收到連接中斷的 notification(Client 狀態變成了 CONNECTING 狀態),它意味著 Client 不能再從 zookeeper 接收到任何 notification 了。如果斷開連接,對於 Controller 而言,無論它現在正在做什麼它都應該先暫停,因為可能集群的 Controller 已經切換到其他機器上了,只是它還沒接收到通知,它如果還在工作,可能會導致集群狀態不一致。當連接斷開後,Client 可以重新建立連接(re-establish,狀態變為 CONNECTED)或者會話過期(狀態變為 CLOSED,會話過期是由 zookeeper Server 來決定的)。如果變成了 CONNECTED 狀態,Controller 應該重新開始這些暫停的操作,而如果狀態變成了 CLOSED 狀態,舊的 Controller 就會知道它不再是 controller,應該丟棄掉這些任務。
參考
Kafka Controller Redesign;
https://docs.google.com/document/d/1rLDmzDOGQQeSiMANP0rC2RYp_L7nUGHzFD9MQISgXYM/edit#heading=h.pxfjarumuhko
Kafka controller重設計。
https://www.cnblogs.com/huxi2b/p/6980045.html
【關於投稿】
如果大家有原創好文投稿,請直接給公號發送留言。
① 留言格式:
【投稿】+《 文章標題》+ 文章鏈接
② 示例:
【投稿】《不要自稱是程序員,我十多年的 IT 職場總結》:http://blog.jobbole.com/94148/
③ 最後請附上您的個人簡介哈~
看完本文有收穫?請轉發分享給更多人
關注「ImportNew」,提升Java技能
※學機器學習要走彎路?不存在的!
※Spring Boot 基礎教程 ( 二 ) :快速構建 Spring Boot/Cloud 工程
TAG:ImportNew |