RocketMQ 源碼學習 2 : Namesrv
(點擊
上方公眾號
,可快速關注)
來源:謝晞鳴 ,
fdx321.github.io/2017/08/17/【RocketMQ源碼學習】2-Namesrv/
1. Namesrv 簡介
Namesrv 可以理解為一個註冊中心, 整個Namesrv的代碼非常簡單,主要包含兩塊功能:
管理一些 KV 的配置
管理一些 Topic、Broker的註冊信息
2. Namesrv 啟動過程
啟動過程主要涉及 NamesrvStartup/NamesrvController 兩個類, NamesrvStartup 負責解析命令行的一些參數到各種 Config 對象中(NamesrvConfig/NettyServerConfig等),如果命令行參數中帶有配置文件的路徑,也會從配置文件中讀取配置到各種 Config 對象中,然後初始化 NamesrvController,配置shutdownHook, 啟動 NamesrvController。 NamesrvController 會去初始化和啟動各個組件,主要是:
創建NettyServer,註冊 requestProcessor,用於處理不同的網路請求
啟動 NettyServer
啟動各種 scheduled task.
不僅僅 Namesrv 是這樣,其他模塊在啟動過程中也都是 startup/controller/config 一起完成這樣的套路。
3. Namesrv 主要組件
Processor 線程池,nettyServer 接收到請求後,封裝成任務提交到該線程池。
remoting 模塊維護了這樣一個 processorTable:
HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable
一個 processor 可以處理多個 request code, 多個 processor 也可以共用一個線程池。對於 Namesrv, 只有一個 processor 線程池,給兩個 Processor 共享。
DefaultRequestProcessor(Namesrv 還有一個 ClusterTestRequestProcessor 繼承了該 Processor,在 clusterTest enable的情* 況下使用它來 getRouteInfoByTopic),用來處理 namesrv 接收到的所有 RequestCode, Processor 內部會根據不同的RequestCode 調用不同的方法。
KVConfigManager, 維護了一些KV方式的配置數據,可以根據請求,執行添加、刪除、查詢等操作
RouteInfoManager, 維護了topic/broker/cluster/filter這些東西的路由信息,同樣支持增刪改查的操作
schedued 線程,按一定的頻率做兩個事情,掃描不活躍的broker;列印所有KV配置信息
4. 以broker註冊為例看下Namesrv的工作過程
1. DefaultRequestProcessor 處理來自 NettyServer的 [RemotingCommand] request, 如果 request.getCode 是 RequestCode.REGISTER_BROKER, 就去註冊。這裡會根據request.version來判斷,從V3_0_11 開始支持了FilterServer。
2. 從 request 解碼得到 RegisterBrokerRequestHeader, 包含以下欄位:
brokerName, // 默認是BrokerConfig里的獲得的locakHostName
brokerAddr, //brokerConfig.getBrokerIP1() + 「:」 + nettyServerConfig.getListenPort()
clusterName, //默認是BrokerConfig的」DefaultCluster」
haServerAddr, //brokerConfig.getBrokerIP2() + 「:」 + messageStoreConfig.getHaListenPort()
brokerId, //如果是MASTER,就是MixAll.MASTER_ID(也就0),否則就是其他
3. 從 request.body 解碼得到 RegisterBrokerBody, RegisterBrokerBody 包含以下內容,用JSON的方式來描述吧:
{
"topicConfigSerializeWrapper": {
"topicConfigTable":{
"topic_xxx":{
"defaultReadQueueNums":"16",
"defaultWriteQueueNums":"16",
"topicName":"xxx",
"readQueueNums":"",
"writeQueueNums":"",
"perm":"",
"topicFilterType":"",
"topicSysFlag":"",
"order":""
},
},
"dataVersion":{
"timestamp":"xxxx",
"counter":"xxxx"
}
},
"filterServerList":[
"",//filterServerAddr
]
}
4. 在 clusterAddrTable 中新增一條記錄
5. 在 brokerAddrTable 中新增一條記錄,這裡會構建一個BrokerData
{
"cluster":"xxx",
"brokerName":"xxx",
"brokerAddrs":{
"brokerId_xx":"broker address xxx"
}
}
6. 如果是第一次註冊或者topicConfig發生了變更,會去更新topicQueueTable
7. 在brokerLiveTable新增該broker
8. 在filterServerTable新增這些filterServer的地址列表
5.其他
以上內容看下來,namesrv 是一個無狀態的應用,可以水平任意擴展。每一個 broker 都會和所有的 namesrv 保持長連接(有個scheduled task會按一定頻率給所有namesrv做register broker的操作),所以 namesrv 之間沒有主從關係,也不需要複製數據。client(producer/consumer) 隨機選一個 namesrv 連接。client 中的 namesrv 地址列表是怎麼來的呢,有兩種方式:
通過命令行或配置文件在啟動的時候獲得的
通過 Scheduled task,按一定的頻率從一個 web 服務 fetch的(web服務可以自建),如果有變更,就更新這個 namesrv 地址列表。
client 選擇 namesrv的過程如下, index遞增取模,然並不是每次都這麼干,取到後會緩存起來。
if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);
this.namesrvAddrChoosed.set(newAddr);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null)
return channelNew;
}
}
看到這裡我產生了疑問,那豈不是每個 client 啟動的時候都取的是第一個 namesrv,它不會壓力很大嗎,後來發現 namesrvIndex 的初始值是隨機的。
以上所有扯淡都是基於源碼 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating)所貼代碼有所刪減。
看完本文有收穫?請轉發分享給更多人
關注「ImportNew」,提升Java技能
![](https://pic.pimg.tw/zzuyanan/1488615166-1259157397.png)
![](https://pic.pimg.tw/zzuyanan/1482887990-2595557020.jpg)
※10 個有關 String 的面試問題
※spring-cloud 服務網關中的 Timeout 設置
TAG:ImportNew |