當前位置:
首頁 > 知識 > RocketMQ 源碼學習 2 : Namesrv

RocketMQ 源碼學習 2 : Namesrv

(點擊

上方公眾號

,可快速關注)




來源:謝晞鳴 ,


fdx321.github.io/2017/08/17/【RocketMQ源碼學習】2-Namesrv/




1. Namesrv 簡介



Namesrv 可以理解為一個註冊中心, 整個Namesrv的代碼非常簡單,主要包含兩塊功能:






  • 管理一些 KV 的配置



  • 管理一些 Topic、Broker的註冊信息







2. Namesrv 啟動過程




啟動過程主要涉及 NamesrvStartup/NamesrvController 兩個類, NamesrvStartup 負責解析命令行的一些參數到各種 Config 對象中(NamesrvConfig/NettyServerConfig等),如果命令行參數中帶有配置文件的路徑,也會從配置文件中讀取配置到各種 Config 對象中,然後初始化 NamesrvController,配置shutdownHook, 啟動 NamesrvController。 NamesrvController 會去初始化和啟動各個組件,主要是:






  • 創建NettyServer,註冊 requestProcessor,用於處理不同的網路請求



  • 啟動 NettyServer



  • 啟動各種 scheduled task.




不僅僅 Namesrv 是這樣,其他模塊在啟動過程中也都是 startup/controller/config 一起完成這樣的套路。



3. Namesrv 主要組件




Processor 線程池,nettyServer 接收到請求後,封裝成任務提交到該線程池。


remoting 模塊維護了這樣一個 processorTable:





HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable



一個 processor 可以處理多個 request code, 多個 processor 也可以共用一個線程池。對於 Namesrv, 只有一個 processor 線程池,給兩個 Processor 共享。




DefaultRequestProcessor(Namesrv 還有一個 ClusterTestRequestProcessor 繼承了該 Processor,在 clusterTest enable的情* 況下使用它來 getRouteInfoByTopic),用來處理 namesrv 接收到的所有 RequestCode, Processor 內部會根據不同的RequestCode 調用不同的方法。




KVConfigManager, 維護了一些KV方式的配置數據,可以根據請求,執行添加、刪除、查詢等操作




RouteInfoManager, 維護了topic/broker/cluster/filter這些東西的路由信息,同樣支持增刪改查的操作




schedued 線程,按一定的頻率做兩個事情,掃描不活躍的broker;列印所有KV配置信息



4. 以broker註冊為例看下Namesrv的工作過程




1. DefaultRequestProcessor 處理來自 NettyServer的 [RemotingCommand] request, 如果 request.getCode 是 RequestCode.REGISTER_BROKER, 就去註冊。這裡會根據request.version來判斷,從V3_0_11 開始支持了FilterServer。




2. 從 request 解碼得到 RegisterBrokerRequestHeader, 包含以下欄位:






  • brokerName, // 默認是BrokerConfig里的獲得的locakHostName



  • brokerAddr, //brokerConfig.getBrokerIP1() + 「:」 + nettyServerConfig.getListenPort()



  • clusterName, //默認是BrokerConfig的」DefaultCluster」



  • haServerAddr, //brokerConfig.getBrokerIP2() + 「:」 + messageStoreConfig.getHaListenPort()



  • brokerId, //如果是MASTER,就是MixAll.MASTER_ID(也就0),否則就是其他




3. 從 request.body 解碼得到 RegisterBrokerBody, RegisterBrokerBody 包含以下內容,用JSON的方式來描述吧:





{


  "topicConfigSerializeWrapper": {


      "topicConfigTable":{


         "topic_xxx":{


           "defaultReadQueueNums":"16",


          "defaultWriteQueueNums":"16",


          "topicName":"xxx",


          "readQueueNums":"",


          "writeQueueNums":"",


          "perm":"",


          "topicFilterType":"",


          "topicSysFlag":"",


          "order":""


         },


      },


      "dataVersion":{


         "timestamp":"xxxx",


         "counter":"xxxx"


      }


   },


  "filterServerList":[


     "",//filterServerAddr


  ]


}




4. 在 clusterAddrTable 中新增一條記錄


5. 在 brokerAddrTable 中新增一條記錄,這裡會構建一個BrokerData





{


  "cluster":"xxx",


  "brokerName":"xxx",


  "brokerAddrs":{


     "brokerId_xx":"broker address xxx"


   }


}




6. 如果是第一次註冊或者topicConfig發生了變更,會去更新topicQueueTable


7. 在brokerLiveTable新增該broker


8. 在filterServerTable新增這些filterServer的地址列表




5.其他




以上內容看下來,namesrv 是一個無狀態的應用,可以水平任意擴展。每一個 broker 都會和所有的 namesrv 保持長連接(有個scheduled task會按一定頻率給所有namesrv做register broker的操作),所以 namesrv 之間沒有主從關係,也不需要複製數據。client(producer/consumer) 隨機選一個 namesrv 連接。client 中的 namesrv 地址列表是怎麼來的呢,有兩種方式:






  1. 通過命令行或配置文件在啟動的時候獲得的



  2. 通過 Scheduled task,按一定的頻率從一個 web 服務 fetch的(web服務可以自建),如果有變更,就更新這個 namesrv 地址列表。




client 選擇 namesrv的過程如下, index遞增取模,然並不是每次都這麼干,取到後會緩存起來。





if (addrList != null && !addrList.isEmpty()) {


    for (int i = 0; i < addrList.size(); i++) {


        int index = this.namesrvIndex.incrementAndGet();


        index = Math.abs(index);


        index = index % addrList.size();


        String newAddr = addrList.get(index);


        this.namesrvAddrChoosed.set(newAddr);


        Channel channelNew = this.createChannel(newAddr);


        if (channelNew != null)


            return channelNew;


    }


}




看到這裡我產生了疑問,那豈不是每個 client 啟動的時候都取的是第一個 namesrv,它不會壓力很大嗎,後來發現 namesrvIndex 的初始值是隨機的。




以上所有扯淡都是基於源碼 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating)所貼代碼有所刪減。




看完本文有收穫?請轉發分享給更多人


關注「ImportNew」,提升Java技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 ImportNew 的精彩文章:

10 個有關 String 的面試問題
spring-cloud 服務網關中的 Timeout 設置

TAG:ImportNew |