消息隊列的對比調研
我們發現Redis的作者出了一個新的消息隊列系統Disque,我做了一點調研來決定我們使用哪種消息隊列,主要對比了Disque、Kafka和RocketMQ。
Disque的特性:
消息發送可以選擇至少一次或者最多一次。
消息需要消費者確認。
如果沒有確認,會一直重發,直至到期。確認信息會廣播給擁有消息副本的所有結點,然後消息會被垃圾收集或者刪除。
隊列是持久的。(需要開啟aof)
Disque默認只運行在內存里,持久性是通過同步備份實現的。
隊列為了保證最大吞吐量,不是全局一致的,但會儘力提供排序。
在壓力大的時候,消息不會丟棄,但會拒絕新的消息。(kafka在消息大量積壓時,會直接丟棄新消息)
消費者和生產者可以通過命令查看隊列中的消息。
隊列儘力提供FIFO。(kafka可以保證順序)
一組master作為中介,客戶端可以與任一結點通信。
中介有命名的隊列,無需消費者和生產者干預。
消息發送是事務性的,保證集群中會有所需數量的副本。
消息接收不是事務性的。
消費者默認是接收時是阻塞的,但也可以選擇查看新消息。
生產者在隊列滿時發新消息可以得到錯誤信息,也可以讓集群非同步地複製消息。
支持延遲作業,粒度是秒,最久可以長達數年。但需要消耗內存。(kafka不支持,這個功能用來做分布式定時任務系統很不錯)
消費者和生產者可以連接不同的結點。
Disque的一些不足
最近一次提交時間:Apr 29, 2016(調研時間2017-2-21)
社區並不活躍,網上可以查到的資料較少
C編寫,我們的技術棧為go和c#,我花了一些時間閱讀了資料,並做了大量測試,也閱讀了一些源碼,在一定程度上彌補了上述問題
由於運行在內存,獲得大吞吐量的同時,失去了保存大量消息的能力
Disque的消息存儲
一條消息稱為一個job,job會根據配置的副本數量分布在多個節點的內存中,隊列信息是每個節點單獨存儲的
存儲job使用的跳錶
Kafka的特性和優勢
以時間複雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數據也能保證常數時間複雜度的訪問性能。
高吞吐率。即使在非常廉價的商用機器上也能做到單機支持每秒100K條以上消息的傳輸。
支持Kafka Server間的消息分區,及分布式消費,同時保證每個Partition內的消息順序傳輸。(這裡是個很討巧的做法,topic下所有消息順序傳輸幾乎不能實現;相比之下Disque更加簡單暴力,直接宣稱不保證順序)
同時支持離線數據處理和實時數據處理。
Scale out:支持在線水平擴展。
最近一次提交時間:Feb 20, 2017(調研時間2017-2-21)
社區非常活躍,網上資料很多,眾多公司在使用
Kafka的消息存儲
partition簡單圖示.png
物理存儲簡單圖示.png
RocketMQ與Kafka的一些對比
RocketMQ使用Java實現,kafka使用Scala實現
性能上kafka略高,可用性和數據可靠性都差不多(因為存儲模型差不多)
RocketMQ支持事務消息(這裡的事務消息存在缺陷,而且在分布式事務中)
與 主要在這幾個方面:消息的存儲、Prdocuer端的服務發現、消費offset的存儲、consumer負載均衡、Name Server和ZooKeeper
總體來說, 與 比較類似,RocketMQ數據安全性稍好,kafka性能稍好;RocketMQ不需要zookeeper,但是同樣需要額外的機器來部署他的Name Server
另外, 有延時消費功能
RocketMQ和Disque的對比
在消息量大時, RocketMQ在成本上具有優勢;消息量小時,Disque具有優勢
性能: Disque > RocketMQ
數據安全性: RocketMQ > Disque
都有延時消費功能
RocketMQ運維更加複雜,需要額外部署Name Server集群
Disque客戶端對go的支持更好
一些匯總
數據安全性: RocketMQ > Kafka > Disque
吞吐量:Disque > Kafka > RocketMQ
機器成本(數據量大時):Disque > RocketMQ ≈ Kafka
機器成本(數據量小時): RocketMQ ≈ Kafka > Disque
Kafka和RocketMQ支持1對多廣播,Disque不支持
Disque和RocketMQ支持延時消費, Kafka不支持
我們的現狀:
數據量不大
需要一個定時任務系統,但是並沒有太多開發力量自己開發
沒有部署Zookeeper,使用的consul做服務註冊和發現等
我們的選擇:Disque
數據量不大,所以Disque消息積壓能力弱,並不會引發問題
超高的並發能力在大促時可以發揮力量
有延時消費的功能,可以用來做定時任務
運維比較簡單
Disque集群時需要注意的點
只要集群中有一個節點失效,則 ADDJOB topic message timeout 這種默認命令會返回失敗,必須使用 ADDJOB topic message timeout REPLICATE n RETRY m 這種帶有 REPLICATE 和 RETRY 的命令
在使用 REPLICATE 參數時,必須同時有 RETRY 命令,否則 REPLICATE 參數將失效,並被固定為1
如果生產者和消費者連接不同的節點,如果消費者在 GETJOB 後不立即 ACKJOB,則會產生重複消費
如果生產者和消費者連接不同的節點,消費者會產生比較大的延遲,經常發生生產者發送消息幾秒之後消費者才收到消息
這些命令將可能會返回錯誤的值:QLEN、QPEEK、DEQUEUE
測試數據
測試用的我的開發機mac book air,處理器:1.6 GHz Intel Core i5,內存:4 GB 1600 MHz DDR3
Replicate=2,3節點集群(各自佔用一個核),20個topic,每個topic 2w消息,每條消息12B(因為我的機器內存比較小...)
讀寫同時進行讀寫比1:1,QPS:1.8w 對應單機時QPS:2.3w,每條消息1KB
讀寫同時進行讀寫比1:1,QPS:2w (開啟aof:1.7w) 對應單機時QPS:2.7w
只發消息:QPS:1w3 (開啟aof:1w) 對應單機是QPS:3.2w
只收消息:QPS:0.8w 對應單機QPS:1.8w
EZMqClient是一組介面,用於操作遠程消息隊列,目前完成了Disque的實現版本。這個實現版本在disque-go的基礎上封裝而成。解決了超時後error和message同時為nil的bug,解決了有時會panic的問題,封裝了使用細節,增加消費監聽介面。
封裝了Disque的配置細節,結合 ,可使Disque集群擁有高可用、可熱擴展的能力
同時提供消息廣播和定時任務介面(這裡在我有空時會再更新一篇博客來說明我的實現方式)
下面是 的使用示例:
// EZMqClient mqClient = ezlib.NewDisqueClientWithDefaultConf(conf.MqServer, conf.MqClient) // 添加消息監聽 mqClient.AddConsumeListener("testTopic", func(job *ezlib.EZJob) { fmt.Printf("consume message: %v", job) }) // 發送消息 mqClient.Push("testTopic", "bbbbbb") // 發送延時消息,發送後,消息隊列會在delay之後將消息推送給消費者 mqClient.PushDelay("testDelayTopic", "aaaaaaaaaaaaaaa", 15*time.Second) // 定時任務,你可以在多個節點啟動相同的定時任務,系統會保證只有一個節點去執行任務,所以只需要有一個節點存活,這個定時任務就可以執行 mqClient.AddCrontab("testCrontab", "0/10 * * * * *", func(taskTime time.Time) { fmt.Printf("crontab taskTime:%v now:%v", taskTime, time.Now()) }) /////////////// 消息廣播 ///////////////////////// // 增加廣播消息監聽,同一個topic下的不同的group都會收到廣播消息,同一個group中只有一個節點會收到消息 mqClient.AddBroadcastListener("broadcastNameTest", "broadcastNameTestGroup1", func(msg string) { fmt.Printf("broadcast1:%v", msg) }) // 發送廣播消息 mqClient.PushBroadcast("broadcastNameTest", "hahahahaha") // 發送廣播延時消息 mqClient.PushBroadcastDelay("broadcastNameTest", "ooooooooooo", 10*time.Second)
Disque熱擴展
熱擴展:Disque在一個節點內存不足時,收到新的Job,會將這個Job轉發給其他內存足夠的節點;熱擴展只需增加節點,並且通過Disque提供的客戶端將新啟動的節點加進集群即可
Disque如何實現高可用
高可用: EZMqClient 會為每個addjob操作增加參數指定副本數量,並且在getjob成功之後調用ackjob,以此保證「至少一次」的消息消費;當有節點失效時,只要一個job的多個副本不是都在那些失效節點上時,則job不會丟失,整個集群正常工作
目前打算設置副本數量為2,集群物理機3台,由於Disque單線程,一台物理機可以啟動多個Disque實例,但需要注意job的2個副本不可處於同一台物理機,否則這台物理機失效時將導致job丟失,考慮到Disque的吞吐量完全足夠,而且Disque無法保證job的2個副本所在的節點一定會分布在不同的物理機上,所以單機啟動一個Disque實例就可以了,可以容忍集群中1台物理機的掛機
閱讀Disque源碼的一些建議
Disque 大量重用了 Redis 的底層代碼, 比如數據結構部分、事件部分、網路通信部分、伺服器主循環部分等等。
Job會根據命令指定的副本數量存放在多個節點中,Queue底層為跳錶,客戶端addjob時連接哪個節點則在哪個節點建立Queue(這個節點沒有這個topic的Queue時),副本傳播到的節點只會存儲Job副本不會建立Queue
只有同一個topic的所有job都在同一個節點的同一個Queue中時,才能保證順序(但由於網路延遲,消費者也不一定會按順序接收到消息),其他情況都無法保證順序(這裡比較複雜,想要詳細了解的同學請自行閱讀源碼並實際操作嘗試)
如果對Redis源碼有閱讀過的同學,可以只需要閱讀Disque的job、queue、cluster、disque* 這幾部分的源碼
※React Native 入坑筆記
※光速React–Vixlet
※上海共享雨傘上線當天就不翼而飛
※陽光沙灘上的小米手機 6 拍妹兒體驗
※13種應用推廣方法與渠道
TAG:推酷 |
※消息隊列篇—常用消息隊列MQ產品介紹及對比
※消息隊列 MQ 專欄
※大規模系統的消息隊列技術方案!
※進行隊列條令訓練
※面對美國將革命衛隊列為恐怖組織的威脅,伊朗怎麼應對?
※油炸食品消費與全因死亡率、心血管死亡率和癌症死亡率的關聯:前瞻性隊列研究
※中軟國際哈爾濱ETC:消息隊列消息代理和消息中間件的區別聯繫
※印度閱兵隊列中居然有手拿掃帚的清潔工,只因為前面隊列大牲口多
※Linux 下的進程間通信:使用管道和消息隊列
※軍樂隊列隊行進
※金融級消息隊列的演進—螞蟻金服的實踐之路
※B測藍帖 大型戰場將會列入單獨隨機隊列
※當過兵的人都進行過隊列訓練,但是能說出隊列訓練作用恐怕沒幾個
※印軍隊列行進
※糞鈣衛蛋白和內鏡下UCEIS評分預測急性重症UC的短期結局:前瞻性隊列研究
※消息隊列CKafka
※RabbitMQ高級篇九TTL設置隊列或消息有效期隊列及消息
※進程間的通信 IPC——實現消息隊列(msg)
※JCO:利妥昔單抗時代濾泡性淋巴瘤死亡原因:隊列匯總分析
※設計師分享:守望先鋒的組隊與匹配隊列