RocketMQ底層通信機制
分散式系統各個角色間的通信效率很關鍵,通信效率的高低直接影響系統性能,基於Socket實現一個高效的Tcp通信協議是個很有挑戰的事情,本節說明RocketMQ是如何解決這個問題的
01
Remoting模塊
_____
RocketMQ的通信相關代碼在Remoting模塊里,先來看看主要類結構。
圖1-1 Remoting模塊的類繼承關係
RemotingService為最上層介面,定義了三個方法:
void start();
void shutdown();
void registerRPCHook(RPCHookrpcHook);
RemotingClient,RemotingServer繼承RemotingService介面, 並增加了自己特有的方法。
代碼清單1-1 RemotingClient主要函數定義
1void registerProcessor(final int requestCode, finalNettyRequestProcessor processor,final ExecutorService executor);
2RemotingCommand invokeSync(final String addr, final RemotingCommandrequest, final long timeoutMillis);
3void invokeAsync(final String addr, final RemotingCommand request,final long timeoutMillis,final InvokeCallback invokeCallback);
4void invokeOneway(final String addr, final RemotingCommand request,final long timeoutMillis);
5void updateNameServerAddressList(final List<String> addrs);
然後看看具體的實現類,NettyRemotingClient和NettyRemotingServer分別實現了RemotingClient和RemotingServer, 而且都繼承了NettyRemotingAbstract類.
通過上面的封裝,RocketMQ各個模塊間的通信,可以通過發送統一格式的自定義消息(RemotingCommand)來完成的,各個模塊間的通信實現簡潔明了。
比如NameServer模塊中,NameServerController有個remotingServer變數,NameServer在啟動時初始化好各個變數,然後啟動remotingServer即可,剩下NameServer要做的是專心實現好處理RemotingCommand的邏輯。
代碼清單1-2 NameServer處理主流程代碼
1@Override
2public RemotingCommand processRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {
3 if (log.isDebugEnabled()){
4 log.debug("receive request, {} {} {}",
5 request.getCode(),
6 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
7 request);
8 }
9 switch (request.getCode()){
10 caseRequestCode.PUT_KV_CONFIG:
11 returnthis.putKVConfig(ctx, request);
12 caseRequestCode.GET_KV_CONFIG:
13 returnthis.getKVConfig(ctx, request);
14 caseRequestCode.DELETE_KV_CONFIG:
15 returnthis.deleteKVConfig(ctx, request);
16 caseRequestCode.REGISTER_BROKER:
17 VersionbrokerVersion = MQVersion.value2Version(request.getVersion());
18 if (brokerVersion.ordinal()>= MQVersion.Version.V3_0_11.ordinal()) {
19 returnthis.registerBrokerWithFilterServer(ctx, request);
20 } else {
21 returnthis.registerBroker(ctx, request);
22 }
23 caseRequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
24 returnthis.getHasUnitSubUnUnitTopicList(ctx, request);
25 caseRequestCode.UPDATE_NAMESRV_CONFIG:
26 returnthis.updateConfig(ctx, request);
27 caseRequestCode.GET_NAMESRV_CONFIG:
28 returnthis.getConfig(ctx, request);
29 default:
30 break;
31 }
32 return null;
33}
在Consumer的源碼中,獲取消息的底層的通信部分也是發送一個RemotingComand 請求,返回的response也是個RemotingCommand類型。
代碼清單1-3 Consumer請求消息底層實現代碼
1private PullResult pullMessageSync(//
2 final String addr, // 1
3 final RemotingCommandrequest, // 2
4 final long timeoutMillis//3
5) throws RemotingException, InterruptedException, MQBrokerException{
6 RemotingCommand response =this.remotingClient.invokeSync(addr, request, timeoutMillis);
7 assert response != null;
8 returnthis.processPullResponse(response);
9}
從源碼中可以看出,RocketMQ中複雜的通信過程,被RemotingCommand統一起來,大部分的邏輯都是通過發送Command,接受並處理Command完成。
02
協議設計和編解碼
_____
RocketMQ自己定義了一個通信協議,使得模塊間傳輸的二進位消息和有意義的內容之間互相轉換。協議格式如圖1-2所示。
圖1-2 RocketMQ的通信協議
(1)第一部分是大端4個位元組整數,值等於第二,三,四部分長度總和
(2)第二部分是大端4個位元組整數,值等於第三部分的長度
(3)第三部分是通過json 序列化的數據
(4)第四部分是通過應用自定義二進位序列化的數據
消息的解碼過程在RomotingCommand的decode函數里。
代碼清單1-4 消息解碼函數
1public static RemotingCommand decode(final ByteBuffer byteBuffer) {
2 int length =byteBuffer.limit();
3 int oriHeaderLen =byteBuffer.getInt();
4 int headerLength =getHeaderLength(oriHeaderLen);
5 byte[] headerData = newbyte[headerLength];
6 byteBuffer.get(headerData);
7 RemotingCommand cmd =headerDecode(headerData, getProtocolType(oriHeaderLen));
8 int bodyLength = length - 4 - headerLength;
9 byte[] bodyData = null;
10 if (bodyLength > 0) {
11 bodyData = newbyte[bodyLength];
12 byteBuffer.get(bodyData);
13 }
14 cmd.body = bodyData;
15 return cmd;
16}
對應的消息編碼過程在RemotingCommand的encode函數中。
代碼清單1-5 消息編碼函數
1public ByteBuffer encode() {
2 // 1> header lengthsize
3 int length = 4;
4 // 2> header datalength
5 byte[] headerData =this.headerEncode();
6 length +=headerData.length;
7 // 3> body data length
8 if (this.body != null) {
9 length += body.length;
10 }
11 ByteBuffer result =ByteBuffer.allocate(4 + length);
12 // length
13 result.putInt(length);
14 // header length
15 result.put(markProtocolType(headerData.length,serializeTypeCurrentRPC));
16 // header data
17 result.put(headerData);
18 // body data;
19 if (this.body != null) {
20 result.put(this.body);
21 }
22 result.flip();
23 return result;
24}
03
Netty庫
_____
RocketMQ是基於Netty庫來完成RemotingServer和RemotingClient具體的通信實現的,Netty是個事件驅動的網路編程框架,它屏蔽了Java Socket,Nio等複雜細節,用戶只需用好Netty,就可以實現一個網路編程專家+並發編程專家水平的Server、Client網路程序。應用Netty有一定的門檻,需要了解它的EventLoopGroup,Channel,Handler模型以及各種具體的配置。RocketMQ利用Netty實現的通信類是NettyRemotingServer和NettyRemotingClient,用戶也可以參考這兩個類的實現來學習使用Netty。
※操作系統開發什麼是內核?
※Discuz全版本任意文件刪除漏洞
TAG:程序員小新人學習 |