當前位置:
首頁 > 知識 > RocketMQ底層通信機制

RocketMQ底層通信機制

分散式系統各個角色間的通信效率很關鍵,通信效率的高低直接影響系統性能,基於Socket實現一個高效的Tcp通信協議是個很有挑戰的事情,本節說明RocketMQ是如何解決這個問題的

01

Remoting模塊

_____

RocketMQ的通信相關代碼在Remoting模塊里,先來看看主要類結構。

RocketMQ底層通信機制

圖1-1 Remoting模塊的類繼承關係

RemotingService為最上層介面,定義了三個方法:

void start();

void shutdown();

void registerRPCHook(RPCHookrpcHook);

RemotingClient,RemotingServer繼承RemotingService介面, 並增加了自己特有的方法。

代碼清單1-1 RemotingClient主要函數定義

1void registerProcessor(final int requestCode, finalNettyRequestProcessor processor,final ExecutorService executor);
2RemotingCommand invokeSync(final String addr, final RemotingCommandrequest, final long timeoutMillis);
3void invokeAsync(final String addr, final RemotingCommand request,final long timeoutMillis,final InvokeCallback invokeCallback);
4void invokeOneway(final String addr, final RemotingCommand request,final long timeoutMillis);
5void updateNameServerAddressList(final List<String> addrs);

然後看看具體的實現類,NettyRemotingClient和NettyRemotingServer分別實現了RemotingClient和RemotingServer, 而且都繼承了NettyRemotingAbstract類.

通過上面的封裝,RocketMQ各個模塊間的通信,可以通過發送統一格式的自定義消息(RemotingCommand)來完成的,各個模塊間的通信實現簡潔明了。

比如NameServer模塊中,NameServerController有個remotingServer變數,NameServer在啟動時初始化好各個變數,然後啟動remotingServer即可,剩下NameServer要做的是專心實現好處理RemotingCommand的邏輯。

代碼清單1-2 NameServer處理主流程代碼

1@Override
2public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
3 if (log.isDebugEnabled()){
4 log.debug("receive request, {} {} {}",
5 request.getCode(),
6 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
7 request);
8 }
9 switch (request.getCode()){
10 caseRequestCode.PUT_KV_CONFIG:
11 returnthis.putKVConfig(ctx, request);
12 caseRequestCode.GET_KV_CONFIG:
13 returnthis.getKVConfig(ctx, request);
14 caseRequestCode.DELETE_KV_CONFIG:
15 returnthis.deleteKVConfig(ctx, request);
16 caseRequestCode.REGISTER_BROKER:
17 VersionbrokerVersion = MQVersion.value2Version(request.getVersion());
18 if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
19 returnthis.registerBrokerWithFilterServer(ctx, request);
20 } else {
21 returnthis.registerBroker(ctx, request);
22 }
23 caseRequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
24 returnthis.getHasUnitSubUnUnitTopicList(ctx, request);
25 caseRequestCode.UPDATE_NAMESRV_CONFIG:
26 returnthis.updateConfig(ctx, request);
27 caseRequestCode.GET_NAMESRV_CONFIG:
28 returnthis.getConfig(ctx, request);
29 default:
30 break;
31 }
32 return null;
33}

在Consumer的源碼中,獲取消息的底層的通信部分也是發送一個RemotingComand 請求,返回的response也是個RemotingCommand類型。

代碼清單1-3 Consumer請求消息底層實現代碼

1private PullResult pullMessageSync(//
2 final String addr, // 1
3 final RemotingCommandrequest, // 2
4 final long timeoutMillis//3
5) throws RemotingException, InterruptedException, MQBrokerException{
6 RemotingCommand response =this.remotingClient.invokeSync(addr, request, timeoutMillis);
7 assert response != null;
8 returnthis.processPullResponse(response);
9}

從源碼中可以看出,RocketMQ中複雜的通信過程,被RemotingCommand統一起來,大部分的邏輯都是通過發送Command,接受並處理Command完成。

02

協議設計和編解碼

_____

RocketMQ自己定義了一個通信協議,使得模塊間傳輸的二進位消息和有意義的內容之間互相轉換。協議格式如圖1-2所示。

RocketMQ底層通信機制

圖1-2 RocketMQ的通信協議

(1)第一部分是大端4個位元組整數,值等於第二,三,四部分長度總和

(2)第二部分是大端4個位元組整數,值等於第三部分的長度

(3)第三部分是通過json 序列化的數據

(4)第四部分是通過應用自定義二進位序列化的數據

消息的解碼過程在RomotingCommand的decode函數里。

代碼清單1-4 消息解碼函數

1public static RemotingCommand decode(final ByteBuffer byteBuffer) {
2 int length =byteBuffer.limit();
3 int oriHeaderLen =byteBuffer.getInt();
4 int headerLength =getHeaderLength(oriHeaderLen);
5 byte[] headerData = newbyte[headerLength];
6 byteBuffer.get(headerData);
7 RemotingCommand cmd =headerDecode(headerData, getProtocolType(oriHeaderLen));
8 int bodyLength = length - 4 - headerLength;
9 byte[] bodyData = null;
10 if (bodyLength > 0) {
11 bodyData = newbyte[bodyLength];
12 byteBuffer.get(bodyData);
13 }
14 cmd.body = bodyData;
15 return cmd;
16}

對應的消息編碼過程在RemotingCommand的encode函數中。

代碼清單1-5 消息編碼函數

1public ByteBuffer encode() {
2 // 1> header lengthsize
3 int length = 4;
4 // 2> header datalength
5 byte[] headerData =this.headerEncode();
6 length +=headerData.length;
7 // 3> body data length
8 if (this.body != null) {
9 length += body.length;
10 }
11 ByteBuffer result =ByteBuffer.allocate(4 + length);
12 // length
13 result.putInt(length);
14 // header length
15 result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));
16 // header data
17 result.put(headerData);
18 // body data;
19 if (this.body != null) {
20 result.put(this.body);
21 }
22 result.flip();
23 return result;
24}

03

Netty庫

_____

RocketMQ是基於Netty庫來完成RemotingServer和RemotingClient具體的通信實現的,Netty是個事件驅動的網路編程框架,它屏蔽了Java Socket,Nio等複雜細節,用戶只需用好Netty,就可以實現一個網路編程專家+並發編程專家水平的Server、Client網路程序。應用Netty有一定的門檻,需要了解它的EventLoopGroup,Channel,Handler模型以及各種具體的配置。RocketMQ利用Netty實現的通信類是NettyRemotingServer和NettyRemotingClient,用戶也可以參考這兩個類的實現來學習使用Netty。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

操作系統開發什麼是內核?
Discuz全版本任意文件刪除漏洞

TAG:程序員小新人學習 |