當前位置:
首頁 > 知識 > 大規模系統的消息隊列技術方案!

大規模系統的消息隊列技術方案!

之前文章我們分析了如何利用消息中間件對兩系統進行解耦處理。

同時,我們也提到了,使用消息中間件還有利於一份數據被多個系統同時訂閱,供多個系統用於不同目的。

目前的一個架構如下圖所示。

在這個圖裡,我們可以清晰的看到,實時計算平台發布的一份數據到消息中間件里,然後接下來:

數據查詢平台會訂閱這份數據,並落入自己本地的資料庫集群和緩存集群里,接著對外提供數據查詢的服務

數據質量監控系統會對計算結果按照一定的業務規則進行監控,如果發現有數據計算錯誤,則會立馬進行報警

數據鏈路追蹤系統會採集計算結果作為一個鏈路節點,同時對一條數據的整個計算鏈路都進行採集並組裝出來一系列的數據計算鏈路落地存儲,最後如果某個數據計算錯誤了,就可以立馬通過計算鏈路進行回溯排查問題

通過以上回顧,我們已經清楚,在上述場景中,使用消息中間件一來可以解耦,二來可以實現消息「Pub/Sub」模型,實現消息的發布與訂閱。

這篇文章,咱們就來落地實踐一把,基於RabbitMQ消息中間件,如何實現一份數據被多個系統同時訂閱的「Pub/Sub」模型?

2. 基於消息中間件的隊列消費模型

上圖其實就是採用的RabbitMQ最基本的隊列消費模型的支持,你可以理解為RabbitMQ內部有一個隊列,生產者不斷的發送數據到隊列里,消息按照先後順序進入隊列中排隊。

現在假設隊列里有4條數據,我們有2個消費者一起消費這個隊列的數據。

此時每個消費會均勻的分配到2條數據,也就是說4條數據會均勻的分配給各個消費者,每個消費者只不過是處理一部分數據罷了,這個就是典型的隊列消費模型。

這幾篇文章給出了上述那個最基本的隊列消費模型的RabbitMQ代碼實現,以及如何保證消費者宕機時數據不丟失,如何讓RabbitMQ集群對queue和message都進行持久化,整體代碼實現比較完整,可以參考一下。

3. 基於消息中間件的「Pub/Sub」模型

除了上述的基本模型外,消息中間件還可以實現一種「Pub/Sub」模型,也就是「發布/訂閱」模型,Pub就是Publish,Sub就是Subscribe。

這種模型可以支持多個系統同時消費一份數據,也就是說你發布出去的每條數據,都會廣播給每個系統,看下圖:

也就是說,我們想要實現的上圖的效果:實時計算平台發布一系列的數據到消息中間件里,然後數據查詢平台、數據質量監控系統、數據鏈路追蹤系統,都會訂閱數據,都會消費到同一份完整的數據,每個系統都可以根據自己的需要使用數據。

那麼這個所謂的「Pub/Sub」模型,基於RabbitMQ應該怎麼來處理呢?

4. RabbitMQ中的exchange到底是個什麼東西?

實際上,在RabbitMQ裡面是不允許生產者直接投遞消息到某個queue(隊列)里的,而是只能讓生產者投遞消息給RabbitMQ內部的一個特殊組件,叫做「exchange」,你大概可以理解為一種消息路由組件。

也就是說,實時計算平台發送出去的message到RabbitMQ中都是由一個exchange來接收的。

然後這個exchange會根據一定的規則決定要將這個message路由轉發到哪個queue里去,這實際上就是RabbitMQ中的一個核心的消息模型。

大家看下面的圖來理解一下。

5. 默認的exchange

你也許會說,我投遞消息到RabbitMQ的時候,也沒有用什麼exchange,但是為什麼還是把消息投遞到了queue里去呢?

那是因為你使用了默認的exchange,他會直接把消息路由到你指定的那個queue里去,所以如果簡單用隊列消費模型,就省去了exchange的概念。

上面這段就是之前給大家展示的,讓消息持久化的一種投遞消息的方式。

大家注意裡面的第一個參數,是一個空的字元串,這個空字元串的意思,就是說投遞消息到默認的exchange里去,然後他就會路由消息到我們指定的queue里去。

6. 將消息投遞到fanout exchange

在RabbitMQ里,exchange這種組件有很多種類型,比如說:direct、topic、headers以及fanout,本文我們來看最後一種fanout。

這種exchange組件其實非常的簡單,你可以創建一個fanout類型的exchange,然後給這個exchange綁定多個queue,接著只要你投遞一條消息到這個exchange,他就會把消息路由給他綁定的所有queue。

使用下面的代碼就可以創建一個exchange,比如說在實時計算平台(生產者)的代碼里,可以加入下面的一段,創建一個fanout類型的exchange。

第一個參數我們叫做「rt_compute_data」,這個就是exchange的名字,rt就是「RealTime」的縮寫,意思就是實時計算系統的計算結果數據。

第二個參數就是定義了這個exchange的類型是「fanout」。

channel.exchangeDeclare(rt_compute_data, fanout);

接著我們就採用下面的代碼來投遞消息到我們創建好的exchange組件里去:

大家會注意到,此時消息就是投遞到指定的exchange里去了,但是路由到哪個queue里去呢?此時我們暫時還沒確定,要讓消費者自己把自己的queue綁定到這個exchange上去才可以。

7. 綁定自己的隊列到exchange上

對消費者的代碼也進行修改,之前我們在這裡關閉了autoAck機制,然後每次都是自己手動ack。

上面的代碼里,每個消費者系統,都會有一些不一樣,就是每個消費者都需要定義自己的隊列,然後綁定到exchange上去。

比如說數據查詢平台的隊列是「rt_compute_data_query」,數據質量監控平台的隊列是「rt_compute_data_monitor」,數據鏈路追蹤系統的隊列是「rt_compute_data_link」。

這樣每個訂閱這份數據的系統其實都有一個屬於自己的隊列,然後隊列里被會被exchange路由進去實時計算平台生產的所有數據。

而且因為是多個隊列的模式,每個系統都可以部署消費者集群來進行數據的消費和處理,非常的方便。

8. 整體架構圖

如上圖所示,實時計算平台會投遞消息到「rt_compute_data」這個「exchange」里去,但是他沒指定這個exchange要路由消息到哪個隊列,因為這個他本身是不知道的。

接著數據查詢平台、數據質量監控系統、數據鏈路追蹤系統,就可以聲明自己的隊列,都綁定到exchange上去。

因為queue和exchange的綁定,在這裡是由要訂閱數據的平台自己指定的。而且因為這個exchange是fanout類型的,他只要接收到了數據,就會路由數據到所有綁定到他的隊列里去,這樣每個隊列里都有同樣的一份數據,供對應的平台來消費。

而且針對每個平台自己的隊列,自己還可以部署消費服務集群來消費自己的一個隊列,自己的隊列里的數據還是會均勻分發給各個消費服務實例來處理,每個消費服務實例會獲取到一部分的數據。

這樣是不是就實現了不同的系統訂閱一份數據的「Pub/Sub」的模型?

當然,RabbitMQ還支持各種不同類型的exchange,可以實現各種複雜的功能,後續我們再來給大家通過實際的線上系統架構案例,來闡述消息中間件技術的用法。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 千鋒JAVA開發學院 的精彩文章:

Android 進程和線程
一致性模型之Sequential Consistency

TAG:千鋒JAVA開發學院 |