當前位置:
首頁 > 知識 > 拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

作者 | 去哪兒網QMQ團隊

責編 | 伍杏玲

出品 | CSDN(ID:CSDNnews)

2012 年,隨著業務快速增長,公司內部開始了服務化進程,通過拆分單體服務加快開發部署效率,提高業務迭代速度。服務化情況下不同的服務分開部署,不同服務之間需要通過網路來通信。

當時公司內部選擇使用 Dubbo 作為主要的 RPC 框架。但是 RPC 並不適用於所有通信場景,RPC 主要表達同步的直接調用關係,但是實際上還有非同步通知類型的通信需求。

分散式消息隊列(MQ)是典型的非同步通知類型的通信實現,有很多好處:

  • 業務可以專註核心流程,通過消息解耦「通知類」業務,減少核心流程受影響的可能性。

  • 可以方便的實現最終一致性,提高服務可用性和吞吐能力。

  • 通過錯峰和流控可以提高服務可靠性,降低下游系統對上游的影響力。

QMQ 就是 Qunar 內部實現的分散式消息隊列。

實際上,在 Qunar 內部,QMQ 的服務端實現有新老兩個完全不同的版本。目前開源的是最新的版本,本文也會專註於介紹新版本的使用、設計和實現。

最初引入 QMQ 主要用於支付、訂單等場景,對服務的可靠性、一致性要求比較高,但是當時並沒有滿足要求的可靠開源實現,所以公司內部自己造了 QMQ 這個輪子。根據當時考慮支持的主要業務場景,初版 QMQ 利用業務庫實現了消息發送的可靠性和一致性,服務端存儲則選擇了MySQL+Redis。後來也基於 MySQL 開發了延時消息隊列。

隨著業務的發展和 QMQ 在公司內部的推廣,QMQ 支撐的業務種類越來越多,消息量越來越大。這種情況下,初版 QMQ 暴露了一些問題:

  • 吞吐能力不足,逐漸不能滿足公司內部的需求,和使用越來越多的Kafka相比差距很大。

  • 堆積能力弱,不能有效的實現錯峰。

  • 推模型在消息堆積時會增加消費者的壓力。

這些問題迫使我們重新設計了 QMQ 的存儲模型。此時直接基於本地順序文件的 Kafka 和 RocketMQ 等已經成為業內廣泛使用的方案,它們提供了非常好的寫入和堆積能力,QMQ 重新設計時也參考了他們的存儲模型。

但根據公司內部的實際使用場景,我們單獨設計了消費管理的模型,這點會在後面詳細說明。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

基本概念

首先,我們介紹一些使用時需要知道的基本概念。

producer 表示消息生產者,consumer 表示消息消費者,broker 一般用來表示 QMQ 的服務端。

subject 表示消息的主題,可以理解為消息的分類,每條消息發送時都需要指定一個主題,消費者訂閱消息時也需要指定需要訂閱的主題。

consumer group 表示消費者所在的消費組。在 QMQ 中,所有的消費者都屬於一個消費組,消費組內共享消費進度,不同消費組之間的消費進度相互獨立。消費時一條消息一般情況下只會派發給消費組中的一個消費者。

比如一個消費組中有 10 個消費者,那麼這 10 個消費者就會均分主題的消息,而不是每個消費者都消費全部消息。實際業務場景里,也有單個消費者需要消費一個主題全部消息的情況,這種消費方式在 QMQ 中稱為廣播消費,廣播消費時每個消費者都屬於一個單獨的消費組。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

發送消息

發送消息時首先需要創建一個producer,創建時需要提供兩個參數,一個是標識當前項目的 App Code,另一個是QMQ metaserver的地址。

MessageProducerProvider producer = new MessageProducerProvider;

producer.setAppCode("your app");

producer.setMetaServer("http://<meta server address>/meta/address");

producer.init;

發送實時消息時先使用producer創建消息,然後將需要發送的數據放入消息並發送。

//每次都需要使用generateMessage生成一個新消息

Message message = producer.generateMessage("qmq_subject");

//QMQ提供的Message是key/value的形式

message.setProperty("key", "value");

//發送消息

producer.sendMessage(message);

發送延時消息的過程和發送實時消息類似,只是需要指定延時時間。

Message message = producer.generateMessage("qmq_delay_subject");

message.setProperty("key", "value");

//設置延時時間

message.setDelayTime(15, TimeUnit.MINUTES);

producer.sendMessage(message);

需要注意的是 sendMessage 實際是個非同步操作,如果想要知道一條消息的發送結果,需要額外提供回調。

producer.sendMessage(message, new MessageSendStateListener {

@Override

public void onSuccess(Message message) {

//send success

}

@Override

public void onFailed(Message message) {

//send failed

}

});

更多說明請參考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/producer.md

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

消費消息

消息消息時比較推薦的使用方式是直接在 Spring 中啟用 QMQ 消費者相關的註解。

首先是配置 Spring,引入 QMQ 相關的註解支持。

第一種是 Spring 的 XML 配置方式。

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:qmq="http://www.qunar.com/schema/qmq"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd

http://www.qunar.com/schema/qmq http://www.qunar.com/schema/qmq/qmq.xsd">

<qmq:consumer appCode="your app" metaServer="http://meta server/meta/address" />

<context:annotation-config />

<context:component-scan base-package="qunar.tc.qmq.demo.consumer.*" />

<!-- 處理消息時使用的線程池 -->

<bean id="qmqExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">

<property name="corePoolSize" value="2" />

<property name="maxPoolSize" value="2" />

<property name="queueCapacity" value="1000" />

<property name="threadNamePrefix" value="qmq-process" />

</bean>

</beans>

第二種是Spring Boot中的註解配置方式。

@Configuration

@EnableQmq(appCode="your app", metaServer="http://<meta server address>/meta/address")

public class Config {}

之後就可以使用QmqConsumer註解來訂閱消息。使用這種方式時,消息會自動被ACK。如果處理時拋出異常,那麼這條消息就會被標記為消費失敗,延後一段時間後這條消息會被重新派發。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", executor = "executor bean name")

public void onMessage(Message message){

//處理消息

String value = message.getStringProperty("key");

}

QmqConsumer註解可以支持最多派發一次、廣播消費等模式。

使用consumeMostOnce參數可以開啟最多派發一次。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", consumeMostOnce = true, executor = "executor bean name")

public void onMessage(Message message){

}

使用isBroadcast參數可以開啟廣播消費。

@QmqConsumer(subject = "qmq_subject", consumerGroup = "group", isBroadcast = true, executor = "executor bean name")

public void onMessage(Message message){

}

更多說明請參考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/consumer.md

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條


事務消息

在消息發送方面,QMQ藉助資料庫事務提供了一種強一致性的事務消息,這是QMQ的一個比較特別的功能。

有些使用場景下一致性是非常關鍵的需求,比如在很多交易場景中,不能出現業務操作成功但消息未發出或消息已發出但是業務操作失敗的情況。舉個具體的例子,支付服務使用消息通知出票服務,這時不能出現支付成功消息卻沒有發出,這會引起用戶投訴,也不能出現支付未成功消息卻發出導致成功出票,這會導致公司損失。簡單來說就是發消息和業務需要有事務保證。公司內部這種強一致性的業務基本都會使用資料庫的事務來實現,QMQ根據這種情況演化出了基於資料庫事務的事務消息。

在同一個資料庫實例上,我們可以在同一個事務中操作多個不同的表。利用這個特性可以透明的將業務操作和消息發送放在同一個事務中。首先在公司里所有MySQL實例里都創建一個message database,這個可以讓DBA放到自動化流程中,不需要使用方主動參與。然後我們在producer中添加事務消息的支持,在事務中不直接發送消息,而是先藉助事務保存消息到業務實例上,等事務提交之後再開始發送,發送成功再刪除消息,事務回滾時就不必發送這條消息。事務提交後消息發送失敗時,QMQ有一個watchdog服務會定時檢查各個業務庫的message database,重新發送其中發送失敗的消息。

實例使用方式可以參考下面的代碼。首先創建producer時需要指定一個TransactionProvider,目前在Spring中提供支持的是SpringTransactionProvider,創建時指定業務的資料庫datasource即可。

<bean id="transactionProvider" class="qunar.tc.qmq.producer.tx.spring.SpringTransactionProvider">

<constructor-arg name="bizDataSource" ref="dataSource" />

</bean>

<bean id="messageProducer" class="qunar.tc.qmq.producer.MessageProducerProvider">

<property name="appCode" value="your app"/>

<property name="metaServer" value="http://<metaserver address>/meta/address"/>

<property name="transactionProvider" ref="transactionProvider"/>

</bean>

利用Spring的Transactional註解引入事務支持。sendMessage只是將消息寫入業務庫,且寫入操作和payDao.append在同一個事務中,同時成功或失敗。

@Transactional

public void pay(Order order){

PayTransaction t = buildPayTransaction(order);

payDao.append(t);

producer.sendMessage(buildMessage(t));

}

更多說明請參考:

https://github.com/qunarcorp/qmq/blob/master/docs/cn/transaction.md

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條


整體架構

下面簡單介紹下QMQ服務端的整體架構,整體架構圖如下所示。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

QMQ服務端主要由metaserver、broker、delay這三個核心組件組成。

metaserver主要是負責管理各種元數據。第一種元數據是主題的路由信息,producer和consumer需要通過metaserver獲得主題對應的broker、delay的地址,這樣才能正確發送、消費消息。第二種元數據是實時和延時集群的組成,且需要維護與broker和delay的心跳,及時下線宕機的服務。metaserver中的各項元數據是保存在資料庫中的。

實時集群和延時集群都是由多個group組成的,每個group內有主從2台機器。每個主題都會分配至少2個group,這樣才能保證可用性,即掛掉一個group也不會影響消息的發送和消費。

延遲消息首先會發送到延遲集群,等延遲時間到達,延遲集群會將消息發送到實時集群,消費者只從實時集群消費消息,不直接和延遲集群交互。

實時集群和延遲集群中的服務都會和metaserver保持心跳,一旦metaserver檢測到心跳過期,就會將對應的group標記為下線狀態。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條


實時隊列

下面簡單介紹一下QMQ實時隊列的主要設計,主要從消費模型和存儲設計兩方面介紹。消費模型是QMQ根據公司業務場景單獨設計的,和Kafka這種常見消息隊列有所區別。消費模型也決定了存儲設計上的一些取捨。

消費模型

之前在QMQ背景介紹中提到,重新設計存儲模型時我們參考了Kafka和RocketMQ的存儲設計,那QMQ和這些消息隊列在設計上有何不同呢?最大的不同就在消費模型上。QMQ的消費模型也算是它的一個特色,也是我們在過去幾年運維消息中間件期間覺得必須提供的和難以捨棄的,尤其是在業務場景下使用時。

Kafka和RocketMQ都是基於partition的存儲模型,每個主題分為一個或多個partition,server收到消息後將其分發到某個partition上,consumer消費消息時是與partition關聯的。比如,某個主題a分配了3個partition(p1, p2, p3),某個消費組內有3個消費者(c1, c2, c3)消費該主題,則會建立c1 - p1, c2 - p2, c3 - p3這樣的消費對應關係。如下圖所示。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

那麼如果我們的consumer個數比partition個數多呢?則有的consumer會是空閑的。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

而如果partition個數比consumer個數多呢?則可能存在有的consumer消費的partition個數會比其他的consumer多的情況。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

這就導致合理的分配策略最好是partition個數與consumer個數成倍數關係。以上都是基於partition的MQ所帶來的負載均衡問題。

這種靜態的綁定的關係還會導致consumer擴容縮容麻煩。使用Kafka或者RocketMQ這種基於partition的消息隊列時,如果處理速度跟不上,光簡單的增加consumer並不能馬上提高處理能力,需要對應的增加partition個數,特別在Kafka里partition是一個比較重的資源,增加partition還需要考慮整個集群的處理能力。高峰期過了之後,如果想縮容consumer也比較麻煩,因為partition只能增加,不能減少。

跟擴容相關的另外一個問題是,已經堆積的消息是不能通過擴容consumer快速消費的。比如開始時分配了2個partition,由2個consumer消費,但是突然發送方大量發送消息(這個在日常運維中經常遇到),導致消息快速堆積,這時我們如何能快速擴容消費這些消息呢?這時增加partition和consumer都沒有用,因為堆積的那2個partition只能由2個consumer來消費,這時你只能縱向擴展,而不能橫向擴展,而我們都知道縱向擴展很多時候是不現實的。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

基於這些考慮我們單獨設計了QMQ的消費模型,並制定了對應的存儲模型。我們的設計考慮的是消費和存儲模型是完全解耦,consumer可以很容易的擴容縮容,從現在來看這個選擇也是正確的。現在去哪兒網的系統架構基本上呈現為基於消息驅動的架構,在我們內部系統之間的交互大部分都是以消息這種非同步的方式來進行。比如我們酒店的訂單變更消息就有接近70個不同的消費組訂閱(可以將消費組理解為不同的應用),整個交易流程都是靠消息來驅動,那麼從上面對基於partition模型的描述來看,要在70個不同應用之間協調partition和consumer的均衡幾乎是不可能的。

存儲模型

上面我們已經描述了QMQ沒有採用基於partition的存儲模型,但是Kafka和RocketMQ的存儲實現方式後是有很多地方是值得借鑒的:

  • 順序append文件,提供很好的寫入性能

  • 順序消費文件,使用offset表示消費進度,成本極低

  • 將所有subject的消息合併在一起,減少partition數量,可以提供更多的subject(RocketMQ)

在演化QMQ的存儲模型時,覺得這幾點是非常重要的。那如何在不使用partition的情況下能得到這些特性呢?

我們通過添加一層拉取的log(pull log)來動態映射consumer與partition的邏輯關係,這樣不僅解決了consumer的動態擴容縮容問題,還可以繼續使用一個offset表示消費進度。

下圖是QMQ的存儲模型:

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

先解釋一下上圖中的數字的意義。上圖中方框上方的數字,表示該方框在自己log中的偏移,而方框內的數字是該項的內容。比如message log方框上方的數字:3,6,9幾表示這幾條消息在message log中的偏移。而consume log中方框內的數字3,6,9,20正對應著message log的偏移,表示這幾個位置上的消息都是subject1的消息,consume log方框上方的1,2,3,4表示這幾個方框在consume log中的邏輯偏移。下面的pull log方框內的內容對應著consume log的邏輯偏移,而pull log方框外的數字表示pull log的邏輯偏移。

在實時broker存儲模型中有三種重要的log:

  • message log,所有subject的消息進入該log,消息的主存儲

  • consume log,存儲的是單個主題在message log的索引信息

  • pull log,每個consumer拉取消息的時候會產生pull log,pull log記錄的是拉取的消息在consumer log中的sequence,這樣消費者就可以使用pull log上的sequence來表示消費進度。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條


延時隊列

除了實時消息,QMQ還支持任意時間的延時消息,當時在開源版本的RocektMQ里提供了多種固定延遲level的延時消息支持,也就是可以發送幾種固定延時時間的延時消息,比如延時10s, 30s…,但是基於我們現有的業務特徵,這種不同延時level的延時消息並不能滿足我們的需求,我們需要任意時間延時。在OTA場景中,客人經常是預訂未來某個時刻的酒店或者機票,這個時間是不固定的,我們無法使用幾個固定的延時level來實現這個場景。

我們的延時消息是使用兩層hash wheel timer來實現的。第一層位於磁碟上,每個小時(默認一個小時,可配置)為一個刻度,每個刻度會生成一個數據日誌文件,根據業務特徵,我們覺得支持兩年(默認兩年,可配置)內任意時間延時就夠了,那麼最多會生成2 * 366 * 24 = 17568個文件。第二層在內存中,當消息的投遞時間即將到來的時候,會將這個小時的消息索引 (偏移量,投遞時間等) 從磁碟文件載入到內存中的 hash wheel timer上。

拿來就能用!去哪兒網消息中間件 QMQ 詳解 | 技術頭條

在延時消息里也存在三種 log:

  • message log,和實時消息里的message log類似,收到消息後append到該 log,append成功後立即返回。

  • schedule log,按照投遞時間組織,每小時一個。該log是回放message log後根據延時時間放置對應的log上,這是上面描述的兩層hash wheel timer的第一層,位於磁碟上。該log包含完整消息內容,所以message log里回放了之前的都可以刪除,可以大大的節約磁碟空間。

  • dispatch log,延時消息投遞後寫入,主要用於在應用重啟後能確定哪些消息已經投遞。

工程地址:https://github.com/qunarcorp/qmq

作者簡介:去哪兒網QMQ團隊隸屬於去哪兒網基礎研發部 - 基礎架構部,主要負責開發和維護QMQ消息中間件,滿足業務團隊需求,為業務團隊提供良好的消息使用體驗,同時保證系統的平穩運行。

負責人王克禮:2015年加入去哪兒網,資深Java開發工程師,具備多年企業中間件的開發實踐經驗,完整參與了QMQ新版本的設計與實現,希望能夠持續提升QMQ。

【END】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 CSDN 的精彩文章:

華為將發布鯤鵬 920 晶元數據;三星 S10 自燃;Mageia 7 正式發布 | 極客頭條
Github Trending被中文項目「佔領」,國外開發者不開心了

TAG:CSDN |