RocketMQ學習-消息發布和訂閱
前面一篇文章分析了broker的啟動過程,瀏覽了broker的基本功能。接下來的幾篇文章,準備按照十分鐘入門RocketMQ一文中提到的一系列特性,依次進行學習。這篇文章準備分析RocketMQ作為MQ的最基本的功能:消息的發布(publish)和訂閱(subscribe)。首先,我參考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控這篇文章完成了一個簡單的例子。
一、RocketMQ消息模型
在部署RocketMQ的時候,先啟動name server,再啟動broker,這時候broker會將自己註冊到name server。應用程序中的producer啟動的時候,首先連接一台name server,獲取broker的地址列表;然後再和broker建立連接,接下來就可以發送消息了。其中:一個producer只與一個name server連接,一個producer會跟所有broker建立連接,每個連接都會有心跳檢測機制。
producer會輪詢向指定topic的mq集合發送消息。
consumer有兩種消費模式:集群消費和廣播消費。集群消費:多個consumer平均消費該topic下所有mq的消息,即某個消息在某個message queue中被一個consumer消費後,其他消費者就不會消費到它;廣播消費:所有consumer可以消費到發到這個topic下的所有消息。
consumer有兩種獲取消息的模式:推模式和拉模式,在RocketMQ中,從技術實現角度看,推模式也是在拉模式上做了一層封裝。
二、消息發送
生產者Demo
首先給出代碼,
生產者中有兩個屬性:
name server的地址,用於獲得broker的相關信息
生產者集合producerGroup,在同一個producer group中有不同的producer實例,如果最早一個producer奔潰,則broker會通知該組內的其他producer實例進行事務提交或回滾。
RocketMQ中的消息,使用Message表示,代碼定義如下:
topic:該消息將要往哪個topic發
flag:可以用作消息過濾
properties:暫時沒理解【TODO】
body:消息內容
每個消息發送完後,會得到一個SendResult對象,看下該對象的結構:
在這個demo中,我們是將消息內容和消息狀態一併列印到控制台。
消息發送源碼分析
在RocketMQ中的client模塊的包結構如下,可以看出,作者並沒有將介面的定義和實現放在一個包下(這在我們的業務應用中是常見的做法,不一定合理)。producer和consumer包下分別定義了生產者和消費者的介面,將具體的實現放在impl包中。
首先關注producer包里的內容,幾個主要的類如下:DefaultMQProducer是生產者的默認實現、MQAdmin用於定義一些管理介面、MQProducer用於定義一些生產者特有的介面。
在ProducerDemo中,通過`defaultMQProducer.start();啟動生產者,接下來看下start()方法的過程:
根據服務狀態決定接下來的動作
對於CREATE_JUST狀態
設置服務狀態
檢查配置
獲取或創建MQClientInstance實例
將生產者註冊到指定的producerGroup,即producerTable這個數據結構中,是一個map
填充topicPublishInfoTable數據結構
啟動生產者
對於RUNNING、STARTFAILED和SHUTDOWNALREADY,拋出異常
順著 往下跟,可以進一步了解生產者的細節,主要步驟有:
建立請求響應通道
啟動各種定時任務,例如:每隔2分鐘向name server拉取一次broker集群的地址,這意味著如果某個broker宕機了,生產者在這兩分鐘之內的消息是投遞失敗的;定期從name server拉取topic等路由信息;定期清理失效的broker以及向broker發送心跳消息等。
啟動拉服務、負載均衡服務、推服務等服務,這三個服務跟消費者有關。這裡設計上不太明了,將消費者和生產者的啟動邏輯放在一起了。看pullMessageService和rebalanceService和初始化,它們是根據MQClientInstance初始化的,而MQClientInstance又是根據ClientConfig來配置的。
生產者啟動後,接下來看下消息的發送過程,如下圖所示,DefaultMQProducer提供了很多發送消息的方法,可以實現同步發消息、非同步發消息、指定消息隊列、OneWay消息、事務消息等。
這裡我們只看最簡單的 方法,最終在DefaultMQProducerImpl中實現:
發送消息的主要過程如下:
首先檢查生產者和消息的合法性
然後獲取消息發送的信息,該信息存放在TopicPublishInfo對象中:
選擇要發送給該topic下的那個MessageQueue,選擇的邏輯分兩種情況:(1)默認情況,在上次投遞的broker節點上,輪詢到下一個message queue來發送;(2)sendLatencyFaultEnable這個值設置為true的時候,這塊沒太看懂。
投遞消息
根據消息隊列運行模式,針對投遞結果做不同的處理。
二、消息消費
消費者Demo
消費者里有個屬性需要看下:
consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;consumerGroup中的實例還可以實現負載均衡和容災。PS:處於同一個consumerGroup里的consumer實例一定是訂閱了同一個topic。
nameServer的地址:name server地址,用於獲取broker、topic信息
消費者Demo里做了以下幾個事情:
設置配置屬性
設置訂閱的topic,可以指定tag
設置第一次啟動的時候,從message queue的哪裡開始消費
設置消息處理器
啟動消費者
消費者源碼分析
前面分析過了,RocketMQ中的client模塊統一提供了生產者和消費者客戶端,這塊我們看下消費者的幾個主要的類。前面提到過,RocketMQ實際上都是拉模式,這裡的DefaultMQPushConsumer實現了推模式,也只是對拉消息服務做了一層封裝,即拉到消息的時候觸發業務消費者註冊到這裡的callback,而具體拉消息的服務是由PullMessageService實現的,這個細節後續再研究。
在ConsumerDemo中,設置好配置信息後,會進行topic訂閱,調用了DefaultMQPushConsumer的subscribe方法,源碼如下:
第一個參數是topic信息,第二個參數用於用於消息過濾tag欄位。真正的訂閱發生在DefaultMQPushConsumerImpl中,代碼如下:
在ConsumerDemo中,接下里會設置消費者首次啟動時消費消息的起始位置,這涉及到DefaultMQPushConsumer中的一個屬性——consumeFromWhere,這個值有三個可能的值
CONSUMEFROMLAST_OFFSET,默認值,表示從上次停止時的地方開始消費
CONSUMEFROMFIRST_OFFSET,從隊列的頭部開始消費
CONSUMEFROMTIMESTAMP,從指定的時間點開始消費
ConsumerDemo接下來會註冊一個callback,當消息到達的時候就處理消息(最新的消息監聽者支持並發消費):
最後,我們看下ConsumerDemo的啟動過程,即DefaultMQPushConsumerImpl的start方法,主要做了下面幾件事:
檢查配置
將訂閱信息拷貝到負載均衡組件(rebalanceImpl)中;
負載均衡組件的幾個屬性的設置
處理不同消息模式(集群模式或廣播模式)的配置
處理順序消費和並發消費的不同配置
將消費者信息和consumer group註冊到MQ客戶端實例的consumerTable中
啟動消費者客戶端
參考資料
分散式開放消息系統(RocketMQ)的原理與實踐
買好車提供的rocketmq-spring-boot-starter
Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控
TAG:dqVoice |