當前位置:
首頁 > 最新 > RocketMQ學習-消息發布和訂閱

RocketMQ學習-消息發布和訂閱

前面一篇文章分析了broker的啟動過程,瀏覽了broker的基本功能。接下來的幾篇文章,準備按照十分鐘入門RocketMQ一文中提到的一系列特性,依次進行學習。這篇文章準備分析RocketMQ作為MQ的最基本的功能:消息的發布(publish)和訂閱(subscribe)。首先,我參考Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控這篇文章完成了一個簡單的例子。


在部署RocketMQ的時候,先啟動name server,再啟動broker,這時候broker會將自己註冊到name server。應用程序中的producer啟動的時候,首先連接一台name server,獲取broker的地址列表;然後再和broker建立連接,接下來就可以發送消息了。其中:一個producer只與一個name server連接,一個producer會跟所有broker建立連接,每個連接都會有心跳檢測機制。

producer會輪詢向指定topic的mq集合發送消息。

consumer有兩種消費模式:集群消費和廣播消費。集群消費:多個consumer平均消費該topic下所有mq的消息,即某個消息在某個message queue中被一個consumer消費後,其他消費者就不會消費到它;廣播消費:所有consumer可以消費到發到這個topic下的所有消息。

consumer有兩種獲取消息的模式:推模式和拉模式,在RocketMQ中,從技術實現角度看,推模式也是在拉模式上做了一層封裝。


首先給出代碼,

生產者中有兩個屬性:

name server的地址,用於獲得broker的相關信息

生產者集合producerGroup,在同一個producer group中有不同的producer實例,如果最早一個producer奔潰,則broker會通知該組內的其他producer實例進行事務提交或回滾。

RocketMQ中的消息,使用Message表示,代碼定義如下:

topic:該消息將要往哪個topic發

flag:可以用作消息過濾

properties:暫時沒理解【TODO】

body:消息內容

每個消息發送完後,會得到一個SendResult對象,看下該對象的結構:

在這個demo中,我們是將消息內容和消息狀態一併列印到控制台。


在RocketMQ中的client模塊的包結構如下,可以看出,作者並沒有將介面的定義和實現放在一個包下(這在我們的業務應用中是常見的做法,不一定合理)。producer和consumer包下分別定義了生產者和消費者的介面,將具體的實現放在impl包中。

首先關注producer包里的內容,幾個主要的類如下:DefaultMQProducer是生產者的默認實現、MQAdmin用於定義一些管理介面、MQProducer用於定義一些生產者特有的介面。

在ProducerDemo中,通過`defaultMQProducer.start();啟動生產者,接下來看下start()方法的過程:

根據服務狀態決定接下來的動作

對於CREATE_JUST狀態

設置服務狀態

檢查配置

獲取或創建MQClientInstance實例

將生產者註冊到指定的producerGroup,即producerTable這個數據結構中,是一個map

填充topicPublishInfoTable數據結構

啟動生產者

對於RUNNING、STARTFAILED和SHUTDOWNALREADY,拋出異常

順著 往下跟,可以進一步了解生產者的細節,主要步驟有:

建立請求響應通道

啟動各種定時任務,例如:每隔2分鐘向name server拉取一次broker集群的地址,這意味著如果某個broker宕機了,生產者在這兩分鐘之內的消息是投遞失敗的;定期從name server拉取topic等路由信息;定期清理失效的broker以及向broker發送心跳消息等。

啟動拉服務、負載均衡服務、推服務等服務,這三個服務跟消費者有關。這裡設計上不太明了,將消費者和生產者的啟動邏輯放在一起了。看pullMessageService和rebalanceService和初始化,它們是根據MQClientInstance初始化的,而MQClientInstance又是根據ClientConfig來配置的。

生產者啟動後,接下來看下消息的發送過程,如下圖所示,DefaultMQProducer提供了很多發送消息的方法,可以實現同步發消息、非同步發消息、指定消息隊列、OneWay消息、事務消息等。

這裡我們只看最簡單的 方法,最終在DefaultMQProducerImpl中實現:

發送消息的主要過程如下:

首先檢查生產者和消息的合法性

然後獲取消息發送的信息,該信息存放在TopicPublishInfo對象中:

選擇要發送給該topic下的那個MessageQueue,選擇的邏輯分兩種情況:(1)默認情況,在上次投遞的broker節點上,輪詢到下一個message queue來發送;(2)sendLatencyFaultEnable這個值設置為true的時候,這塊沒太看懂。

投遞消息

根據消息隊列運行模式,針對投遞結果做不同的處理。


消費者里有個屬性需要看下:

consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;consumerGroup中的實例還可以實現負載均衡和容災。PS:處於同一個consumerGroup里的consumer實例一定是訂閱了同一個topic。

nameServer的地址:name server地址,用於獲取broker、topic信息

消費者Demo里做了以下幾個事情:

設置配置屬性

設置訂閱的topic,可以指定tag

設置第一次啟動的時候,從message queue的哪裡開始消費

設置消息處理器

啟動消費者


前面分析過了,RocketMQ中的client模塊統一提供了生產者和消費者客戶端,這塊我們看下消費者的幾個主要的類。前面提到過,RocketMQ實際上都是拉模式,這裡的DefaultMQPushConsumer實現了推模式,也只是對拉消息服務做了一層封裝,即拉到消息的時候觸發業務消費者註冊到這裡的callback,而具體拉消息的服務是由PullMessageService實現的,這個細節後續再研究。

在ConsumerDemo中,設置好配置信息後,會進行topic訂閱,調用了DefaultMQPushConsumer的subscribe方法,源碼如下:

第一個參數是topic信息,第二個參數用於用於消息過濾tag欄位。真正的訂閱發生在DefaultMQPushConsumerImpl中,代碼如下:

在ConsumerDemo中,接下里會設置消費者首次啟動時消費消息的起始位置,這涉及到DefaultMQPushConsumer中的一個屬性——consumeFromWhere,這個值有三個可能的值

CONSUMEFROMLAST_OFFSET,默認值,表示從上次停止時的地方開始消費

CONSUMEFROMFIRST_OFFSET,從隊列的頭部開始消費

CONSUMEFROMTIMESTAMP,從指定的時間點開始消費

ConsumerDemo接下來會註冊一個callback,當消息到達的時候就處理消息(最新的消息監聽者支持並發消費):

最後,我們看下ConsumerDemo的啟動過程,即DefaultMQPushConsumerImpl的start方法,主要做了下面幾件事:

檢查配置

將訂閱信息拷貝到負載均衡組件(rebalanceImpl)中;

負載均衡組件的幾個屬性的設置

處理不同消息模式(集群模式或廣播模式)的配置

處理順序消費和並發消費的不同配置

將消費者信息和consumer group註冊到MQ客戶端實例的consumerTable中

啟動消費者客戶端


分散式開放消息系統(RocketMQ)的原理與實踐

買好車提供的rocketmq-spring-boot-starter

Spring Boot系列文章(六):SpringBoot RocketMQ 整合使用和監控


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 dqVoice 的精彩文章:

TAG:dqVoice |