當前位置:
首頁 > 最新 > Kafka解惑之Old Producer(2)——Sync Analysis

Kafka解惑之Old Producer(2)——Sync Analysis

上接:[Kafka解惑之Old Producer(1)—— Beginning]

上篇結尾一下子擴展的有點多,我們還是先回到DefaultEventHandler上來,當調用producer.send方法發送消息的時候,緊接著就是調用DefaultEventHandler的handle方法。下面是handle方法的主要內容,雖然行數有點多,但是這是Producer中最最核心的一塊,需要反覆研磨,方能一探究竟:

注意handle方法的參數是個Seq[KeyedMessage]類型的,而不是KeyedMessage。雖然Demo中用的只是單個KeyedMessage,最後調用底層的handle方法都是轉換為Seq類型,你可以把Seq看成是java中的List,在Scala中表示序列,指的是一類具有一定長度的可迭代訪問的對象,其中每個元素均帶有一個從0開始計數的固定索引位置。

這個handle方法中首先是調用serialize(events)方法對消息進行序列化操作,這個容易理解,就是通過serializer.class參數指定的序列化類進行序列化。

其次獲取所發送消息對應的元數據信息,然後將一坨消息(也有可能是一條)轉換為HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]格式,其中key:Int表示broker的id,value是TopicAndPartition與消息集的Map,對應的方法為dispatchSerializedData()。因為客戶端發消息是發到對應的broker上,所以要對每個消息找出對應的leader副本所在的broker的位置,然後將要發送的消息集分類,每個broker對應其各自所要接收的消息。而TopicAndPartition是針對broker上的存儲層的,每個TopicAndPartition對應特定的當前的存儲文件(Segment文件),將消息寫入到存儲文件中。

This is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.

接下去所要做的工作就是查看是否需要壓縮,如果客戶端設置了壓縮,則根據compression.type參數配置的壓縮方式對消息進行壓縮處理。0.8.2.x版本支持gzip和snappy的壓縮方式,1.0.0版本還支持lz4的壓縮方式。compression.type參數的默認值值none,即不需要壓縮。

最後根據brokerId分組發送消息。這個分組發送的過程就與ProducerPool有關了,我們前面提到在實例化Producer的時候引入了DefaultEventHandler和ProducerPool。這個ProducerPool保存的是生產者和broker的連接,每個連接對應一個SyncProducer對象。SyncProducer包裝了NIO網路層的操作,每個SyncProducer都是一個與對應broker的socket連接,是真正發送消息至broker中的執行者。

當調用最上層的send方法發送消息的時候,下面的執行順序為DefaultEventHandler.handle()->DefaultEventHandler.dispatchSerializedData()->DefaultEventHandler.send()。在底層的DefaultEventHandler.send方法定義為:

會Java的讀者看這段代碼的時候應該能看出來個90%以上,解釋下這段代碼:首先是找到更新的元數據中所有的brorker(更具體的來說是broker的id、主機地址host和埠號port三元組信息);之後在查到原有的ProducerPool中是否有相應的SyncProducer,如果有則關閉之後再重新建立;如果沒有則新建。SyncProducer底層是阻塞式的NIO,所以關閉再建立會有一定程度上的開銷,相關細節如下:

玩過NIO的讀者對這段代碼相比很是熟絡,雖然是scala版的。如果沒有接觸過NIO,那麼可以先看看這一篇:攻破JAVA NIO技術壁壘(百度「攻破JAVA NIO技術壁壘」即可,認準:朱小廝)。

說道這裡我們用一副結構圖來說明下Old Producer的大致脈絡(註:圖中的所有操作都是在一個線程中執行的):

篇幅限制,更多內容將在下一篇中進行介紹,關注本微信公眾號,了解更多細節。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 全球大搜羅 的精彩文章:

出行,你可能也需要帶一罐燙飯

TAG:全球大搜羅 |