當前位置:
首頁 > 最新 > 消息隊列的對比調研

消息隊列的對比調研

我們發現Redis的作者出了一個新的消息隊列系統Disque,我做了一點調研來決定我們使用哪種消息隊列,主要對比了Disque、Kafka和RocketMQ。

Disque的特性

消息發送可以選擇至少一次或者最多一次。

消息需要消費者確認。

如果沒有確認,會一直重發,直至到期。確認信息會廣播給擁有消息副本的所有結點,然後消息會被垃圾收集或者刪除。

隊列是持久的。(需要開啟aof)

Disque默認只運行在內存里,持久性是通過同步備份實現的。

隊列為了保證最大吞吐量,不是全局一致的,但會儘力提供排序。

在壓力大的時候,消息不會丟棄,但會拒絕新的消息。(kafka在消息大量積壓時,會直接丟棄新消息)

消費者和生產者可以通過命令查看隊列中的消息。

隊列儘力提供FIFO。(kafka可以保證順序)

一組master作為中介,客戶端可以與任一結點通信。

中介有命名的隊列,無需消費者和生產者干預。

消息發送是事務性的,保證集群中會有所需數量的副本。

消息接收不是事務性的。

消費者默認是接收時是阻塞的,但也可以選擇查看新消息。

生產者在隊列滿時發新消息可以得到錯誤信息,也可以讓集群非同步地複製消息。

支持延遲作業,粒度是秒,最久可以長達數年。但需要消耗內存。(kafka不支持,這個功能用來做分布式定時任務系統很不錯)

消費者和生產者可以連接不同的結點。

Disque的一些不足

最近一次提交時間:Apr 29, 2016(調研時間2017-2-21)

社區並不活躍,網上可以查到的資料較少

C編寫,我們的技術棧為go和c#,我花了一些時間閱讀了資料,並做了大量測試,也閱讀了一些源碼,在一定程度上彌補了上述問題

由於運行在內存,獲得大吞吐量的同時,失去了保存大量消息的能力

Disque的消息存儲

一條消息稱為一個job,job會根據配置的副本數量分布在多個節點的內存中,隊列信息是每個節點單獨存儲的

存儲job使用的跳錶

Kafka的特性和優勢

以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間複雜度的訪問性能。

高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。

支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸。(這裡是個很討巧的做法,topic下所有消息順序傳輸幾乎不能實現;相比之下Disque更加簡單暴力,直接宣稱不保證順序)

同時支持離線數據處理和實時數據處理。

Scale out:支持在線水平擴展。

最近一次提交時間:Feb 20, 2017(調研時間2017-2-21)

社區非常活躍,網上資料很多,眾多公司在使用

Kafka的消息存儲

partition簡單圖示.png

物理存儲簡單圖示.png

RocketMQ與Kafka的一些對比

RocketMQ使用Java實現,kafka使用Scala實現

性能上kafka略高,可用性和數據可靠性都差不多(因為存儲模型差不多)

RocketMQ支持事務消息(這裡的事務消息存在缺陷,而且在分布式事務中)

與 主要在這幾個方面:消息的存儲、Prdocuer端的服務發現、消費offset的存儲、consumer負載均衡、Name Server和ZooKeeper

總體來說, 與 比較類似,RocketMQ數據安全性稍好,kafka性能稍好;RocketMQ不需要zookeeper,但是同樣需要額外的機器來部署他的Name Server

另外, 有延時消費功能

RocketMQ和Disque的對比

在消息量大時, RocketMQ在成本上具有優勢;消息量小時,Disque具有優勢

性能: Disque > RocketMQ

數據安全性: RocketMQ > Disque

都有延時消費功能

RocketMQ運維更加複雜,需要額外部署Name Server集群

Disque客戶端對go的支持更好

一些匯總

數據安全性: RocketMQ > Kafka > Disque

吞吐量:Disque > Kafka > RocketMQ

機器成本(數據量大時):Disque > RocketMQ ≈ Kafka

機器成本(數據量小時): RocketMQ ≈ Kafka > Disque

Kafka和RocketMQ支持1對多廣播,Disque不支持

Disque和RocketMQ支持延時消費, Kafka不支持

我們的現狀:

數據量不大

需要一個定時任務系統,但是並沒有太多開發力量自己開發

沒有部署Zookeeper,使用的consul做服務註冊和發現等

我們的選擇:Disque

數據量不大,所以Disque消息積壓能力弱,並不會引發問題

超高的並發能力在大促時可以發揮力量

有延時消費的功能,可以用來做定時任務

運維比較簡單

Disque集群時需要注意的點

只要集群中有一個節點失效,則 ADDJOB topic message timeout 這種默認命令會返回失敗,必須使用 ADDJOB topic message timeout REPLICATE n RETRY m 這種帶有 REPLICATE 和 RETRY 的命令

在使用 REPLICATE 參數時,必須同時有 RETRY 命令,否則 REPLICATE 參數將失效,並被固定為1

如果生產者和消費者連接不同的節點,如果消費者在 GETJOB 後不立即 ACKJOB,則會產生重複消費

如果生產者和消費者連接不同的節點,消費者會產生比較大的延遲,經常發生生產者發送消息幾秒之後消費者才收到消息

這些命令將可能會返回錯誤的值:QLEN、QPEEK、DEQUEUE

測試數據

測試用的我的開發機mac book air,處理器:1.6 GHz Intel Core i5,內存:4 GB 1600 MHz DDR3

Replicate=2,3節點集群(各自佔用一個核),20個topic,每個topic 2w消息,每條消息12B(因為我的機器內存比較小...)

讀寫同時進行讀寫比1:1,QPS:1.8w 對應單機時QPS:2.3w,每條消息1KB

讀寫同時進行讀寫比1:1,QPS:2w (開啟aof:1.7w) 對應單機時QPS:2.7w

只發消息:QPS:1w3 (開啟aof:1w) 對應單機是QPS:3.2w

只收消息:QPS:0.8w 對應單機QPS:1.8w

EZMqClient是一組介面,用於操作遠程消息隊列,目前完成了Disque的實現版本。這個實現版本在disque-go的基礎上封裝而成。解決了超時後error和message同時為nil的bug,解決了有時會panic的問題,封裝了使用細節,增加消費監聽介面。

封裝了Disque的配置細節,結合 ,可使Disque集群擁有高可用、可熱擴展的能力

同時提供消息廣播和定時任務介面(這裡在我有空時會再更新一篇博客來說明我的實現方式)

下面是 的使用示例:

// EZMqClient mqClient = ezlib.NewDisqueClientWithDefaultConf(conf.MqServer, conf.MqClient) // 添加消息監聽 mqClient.AddConsumeListener("testTopic", func(job *ezlib.EZJob) { fmt.Printf("consume message: %v", job) }) // 發送消息 mqClient.Push("testTopic", "bbbbbb") // 發送延時消息,發送後,消息隊列會在delay之後將消息推送給消費者 mqClient.PushDelay("testDelayTopic", "aaaaaaaaaaaaaaa", 15*time.Second) // 定時任務,你可以在多個節點啟動相同的定時任務,系統會保證只有一個節點去執行任務,所以只需要有一個節點存活,這個定時任務就可以執行 mqClient.AddCrontab("testCrontab", "0/10 * * * * *", func(taskTime time.Time) { fmt.Printf("crontab taskTime:%v now:%v", taskTime, time.Now()) }) /////////////// 消息廣播 ///////////////////////// // 增加廣播消息監聽,同一個topic下的不同的group都會收到廣播消息,同一個group中只有一個節點會收到消息 mqClient.AddBroadcastListener("broadcastNameTest", "broadcastNameTestGroup1", func(msg string) { fmt.Printf("broadcast1:%v", msg) }) // 發送廣播消息 mqClient.PushBroadcast("broadcastNameTest", "hahahahaha") // 發送廣播延時消息 mqClient.PushBroadcastDelay("broadcastNameTest", "ooooooooooo", 10*time.Second)

Disque熱擴展

熱擴展:Disque在一個節點內存不足時,收到新的Job,會將這個Job轉發給其他內存足夠的節點;熱擴展只需增加節點,並且通過Disque提供的客戶端將新啟動的節點加進集群即可

Disque如何實現高可用

高可用: EZMqClient 會為每個addjob操作增加參數指定副本數量,並且在getjob成功之後調用ackjob,以此保證「至少一次」的消息消費;當有節點失效時,只要一個job的多個副本不是都在那些失效節點上時,則job不會丟失,整個集群正常工作

目前打算設置副本數量為2,集群物理機3台,由於Disque單線程,一台物理機可以啟動多個Disque實例,但需要注意job的2個副本不可處於同一台物理機,否則這台物理機失效時將導致job丟失,考慮到Disque的吞吐量完全足夠,而且Disque無法保證job的2個副本所在的節點一定會分布在不同的物理機上,所以單機啟動一個Disque實例就可以了,可以容忍集群中1台物理機的掛機

閱讀Disque源碼的一些建議

Disque 大量重用了 Redis 的底層代碼, 比如數據結構部分、事件部分、網路通信部分、伺服器主循環部分等等。

Job會根據命令指定的副本數量存放在多個節點中,Queue底層為跳錶,客戶端addjob時連接哪個節點則在哪個節點建立Queue(這個節點沒有這個topic的Queue時),副本傳播到的節點只會存儲Job副本不會建立Queue

只有同一個topic的所有job都在同一個節點的同一個Queue中時,才能保證順序(但由於網路延遲,消費者也不一定會按順序接收到消息),其他情況都無法保證順序(這裡比較複雜,想要詳細了解的同學請自行閱讀源碼並實際操作嘗試)

如果對Redis源碼有閱讀過的同學,可以只需要閱讀Disque的job、queue、cluster、disque* 這幾部分的源碼

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 推酷 的精彩文章:

React Native 入坑筆記
光速React–Vixlet
上海共享雨傘上線當天就不翼而飛
陽光沙灘上的小米手機 6 拍妹兒體驗
13種應用推廣方法與渠道

TAG:推酷 |

您可能感興趣

消息隊列篇—常用消息隊列MQ產品介紹及對比
消息隊列 MQ 專欄
大規模系統的消息隊列技術方案!
進行隊列條令訓練
面對美國將革命衛隊列為恐怖組織的威脅,伊朗怎麼應對?
油炸食品消費與全因死亡率、心血管死亡率和癌症死亡率的關聯:前瞻性隊列研究
中軟國際哈爾濱ETC:消息隊列消息代理和消息中間件的區別聯繫
印度閱兵隊列中居然有手拿掃帚的清潔工,只因為前面隊列大牲口多
Linux 下的進程間通信:使用管道和消息隊列
軍樂隊列隊行進
金融級消息隊列的演進—螞蟻金服的實踐之路
B測藍帖 大型戰場將會列入單獨隨機隊列
當過兵的人都進行過隊列訓練,但是能說出隊列訓練作用恐怕沒幾個
印軍隊列行進
糞鈣衛蛋白和內鏡下UCEIS評分預測急性重症UC的短期結局:前瞻性隊列研究
消息隊列CKafka
RabbitMQ高級篇九TTL設置隊列或消息有效期隊列及消息
進程間的通信 IPC——實現消息隊列(msg)
JCO:利妥昔單抗時代濾泡性淋巴瘤死亡原因:隊列匯總分析
設計師分享:守望先鋒的組隊與匹配隊列