當前位置:
首頁 > 知識 > Netty解決TCP粘包和拆包問題的四種方案

Netty解決TCP粘包和拆包問題的四種方案

在RPC框架中,TCP粘包和拆包問題是必須解決一個問題,因為RPC框架中,各個微服務相互之間都是維繫了一個TCP長連接,比如dubbo就是一個全雙工的長連接。由於微服務往對方發送信息的時候,所有的請求都是使用的同一個連接,這樣就會產生粘包和拆包的問題。本文首先會對TCP粘包和拆包問題進行描述,然後介紹其常用的解決方案,最後會對Netty提供的幾種解決方案進行講解。這裡說明一下,由於oschina將「jie ma qi」認定為敏感文字,因而本文統一使用「解碼一器」表示該含義

1. 粘包和拆包

產生TCP粘包和拆包問題的主要原因是,操作系統在發送TCP數據的時候,底層會有一個緩衝區,例如1024個位元組大小,如果一次請求發送的數據量比較小,沒達到緩衝區大小,TCP則會將多個請求合併為同一個請求進行發送,這就形成了粘包問題;如果一次請求發送的數據量比較大,超過了緩衝區大小,TCP就會將其拆分為多次發送,這就是拆包,也就是將一個大的包拆分為多個小包進行發送。如下圖展示了TCP粘包和拆包的一個示意圖:

Netty解決TCP粘包和拆包問題的四種方案

打開今日頭條,查看更多圖片

上圖中演示了TCP粘包和拆包的三種情況:

  • A和B兩個包都剛好滿足TCP緩衝區的大小,或者說其等待時間已經達到TCP等待時長,從而還是使用兩個獨立的包進行發送;
  • A和B兩次請求間隔時間內較短,並且數據包較小,因而合併為同一個包發送給服務端;
  • B包比較大,因而將其拆分為兩個包B_1和B_2進行發送,而這裡由於拆分後的B_2比較小,其又與A包合併在一起發送。

2. 常見解決方案

對於粘包和拆包問題,常見的解決方案有四種:

  • 客戶端在發送數據包的時候,每個包都固定長度,比如1024個位元組大小,如果客戶端發送的數據長度不足1024個位元組,則通過補充空格的方式補全到指定長度;
  • 客戶端在每個包的末尾使用固定的分隔符,例如
    ,如果一個包被拆分了,則等待下一個包發送過來之後找到其中的
    ,然後對其拆分後的頭部部分與前一個包的剩餘部分進行合併,這樣就得到了一個完整的包;
  • 將消息分為頭部和消息體,在頭部中保存有當前整個消息的長度,只有在讀取到足夠長度的消息之後才算是讀到了一個完整的消息;
  • 通過自定義協議進行粘包和拆包的處理。

3. Netty提供的粘包拆包解決方案

3.1 FixedLengthFrameDecoder

對於使用固定長度的粘包和拆包場景,可以使用FixedLengthFrameDecoder,該解碼一器會每次讀取固定長度的消息,如果當前讀取到的消息不足指定長度,那麼就會等待下一個消息到達後進行補足。其使用也比較簡單,只需要在構造函數中指定每個消息的長度即可。這裡需要注意的是,FixedLengthFrameDecoder只是一個解碼一器,Netty也只提供了一個解碼一器,這是因為對於解碼是需要等待下一個包的進行補全的,代碼相對複雜,而對於編碼器,用戶可以自行編寫,因為編碼時只需要將不足指定長度的部分進行補全即可。下面的示例中展示了如何使用FixedLengthFrameDecoder來進行粘包和拆包處理:

public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這裡將FixedLengthFrameDecoder添加到pipeline中,指定長度為20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 將前一步解碼得到的數據轉碼為字元串
ch.pipeline().addLast(new StringDecoder());
// 這裡FixedLengthFrameEncoder是我們自定義的,用於將長度不足20的消息進行補全空格
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 最終的數據處理
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}

上面的pipeline中,對於入棧數據,這裡主要添加了FixedLengthFrameDecoder和StringDecoder,前面一個用於處理固定長度的消息的粘包和拆包問題,第二個則是將處理之後的消息轉換為字元串。最後由EchoServerHandler處理最終得到的數據,處理完成後,將處理得到的數據交由FixedLengthFrameEncoder處理,該編碼器是我們自定義的實現,主要作用是將長度不足20的消息進行空格補全。下面是FixedLengthFrameEncoder的實現代碼:

public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
private int length;
public FixedLengthFrameEncoder(int length) {
this.length = length;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 對於超過指定長度的消息,這裡直接拋出異常
if (msg.length() > length) {
throw new UnsupportedOperationException(
"message length is too large, it"s limited " + length);
}
// 如果長度不足,則進行補全
if (msg.length() < length) {
msg = addSpace(msg);
}
ctx.writeAndFlush(Unpooled.wrappedBuffer(msg.getBytes()));
}
// 進行空格補全
private String addSpace(String msg) {
StringBuilder builder = new StringBuilder(msg);
for (int i = 0; i < length - msg.length(); i++) {
builder.append(" ");
}
return builder.toString();
}
}

這裡FixedLengthFrameEncoder實現了decode()方法,在該方法中,主要是將消息長度不足20的消息進行空格補全。EchoServerHandler的作用主要是列印接收到的消息,然後發送響應給客戶端:

public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server receives message: " + msg.trim());
ctx.writeAndFlush("hello client!");
}
}

對於客戶端,其實現方式基本與服務端的使用方式類似,只是在最後進行消息發送的時候與服務端的處理方式不同。如下是客戶端EchoClient的代碼:

public class EchoClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 對服務端發送的消息進行粘包和拆包處理,由於服務端發送的消息已經進行了空格補全,
// 並且長度為20,因而這裡指定的長度也為20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 將粘包和拆包處理得到的消息轉換為字元串
ch.pipeline().addLast(new StringDecoder());
// 對客戶端發送的消息進行空格補全,保證其長度為20
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 客戶端發送消息給服務端,並且處理服務端響應的消息
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8080);
}
}

對於客戶端而言,其消息的處理流程其實與服務端是相似的,對於入站消息,需要對其進行粘包和拆包處理,然後將其轉碼為字元串,對於出站消息,則需要將長度不足20的消息進行空格補全。客戶端與服務端處理的主要區別在於最後的消息處理handler不一樣,也即這裡的EchoClientHandler,如下是該handler的源碼:

public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client receives message: " + msg.trim());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("hello server!");
}
}

這裡客戶端的處理主要是重寫了channelActive()和channelRead0()兩個方法,這兩個方法的主要作用在於,channelActive()會在客戶端連接上伺服器時執行,也就是說,其連上伺服器之後就會往伺服器發送消息。而channelRead0()主要是在伺服器發送響應給客戶端時執行,這裡主要是列印伺服器的響應消息。對於服務端而言,前面我們我們可以看到,EchoServerHandler只重寫了channelRead0()方法,這是因為伺服器只需要等待客戶端發送消息過來,然後在該方法中進行處理,處理完成後直接將響應發送給客戶端。如下是分別啟動服務端和客戶端之後控制台列印的數據:

// server
server receives message: hello server!
// client
client receives message: hello client!

3.2 LineBasedFrameDecoder與DelimiterBasedFrameDecoder

對於通過分隔符進行粘包和拆包問題的處理,Netty提供了兩個編解碼的類,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。這裡LineBasedFrameDecoder的作用主要是通過換行符,即
或者
對數據進行處理;而DelimiterBasedFrameDecoder的作用則是通過用戶指定的分隔符對數據進行粘包和拆包處理。同樣的,這兩個類都是解碼一器類,而對於數據的編碼,也即在每個數據包最後添加換行符或者指定分割符的部分需要用戶自行進行處理。這裡以DelimiterBasedFrameDecoder為例進行講解,如下是EchoServer中使用該類的代碼片段,其餘部分與前面的例子中的完全一致:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 將delimiter設置到DelimiterBasedFrameDecoder中,經過該解碼一器進行處理之後,源數據將會
// 被按照_$進行分隔,這裡1024指的是分隔的最大長度,即當讀取到1024個位元組的數據之後,若還是未
// 讀取到分隔符,則捨棄當前數據段,因為其很有可能是由於碼流紊亂造成的
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 將分隔之後的位元組數據轉換為字元串數據
ch.pipeline().addLast(new StringDecoder());
// 這是我們自定義的一個編碼器,主要作用是在返回的響應數據最後添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 最終處理數據並且返迴響應的handler
ch.pipeline().addLast(new EchoServerHandler());
}

上面pipeline的設置中,添加的解碼一器主要有DelimiterBasedFrameDecoder和StringDecoder,經過這兩個處理器處理之後,接收到的位元組流就會被分隔,並且轉換為字元串數據,最終交由EchoServerHandler處理。這裡DelimiterBasedFrameEncoder是我們自定義的編碼器,其主要作用是在返回的響應數據之後添加分隔符。如下是該編碼器的源碼:

public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {
private String delimiter;
public DelimiterBasedFrameEncoder(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 在響應的數據後面添加分隔符
ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
}
}

對於客戶端而言,這裡的處理方式與服務端類似,其pipeline的添加方式如下:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 對服務端返回的消息通過_$進行分隔,並且每次查找的最大大小為1024位元組
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.wrappedBuffer(delimiter.getBytes())));
// 將分隔之後的位元組數據轉換為字元串
ch.pipeline().addLast(new StringDecoder());
// 對客戶端發送的數據進行編碼,這裡主要是在客戶端發送的數據最後添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 客戶端發送數據給服務端,並且處理從服務端響應的數據
ch.pipeline().addLast(new EchoClientHandler());
}

這裡客戶端的處理方式與服務端基本一致,關於這裡沒展示的代碼,其與示例一中的代碼完全一致,這裡則不予展示。

3.3 LengthFieldBasedFrameDecoder與LengthFieldPrepender

這裡LengthFieldBasedFrameDecoder與LengthFieldPrepender需要配合起來使用,其實本質上來講,這兩者一個是解碼,一個是編碼的關係。它們處理粘拆包的主要思想是在生成的數據包中添加一個長度欄位,用於記錄當前數據包的長度。LengthFieldBasedFrameDecoder會按照參數指定的包長度偏移量數據對接收到的數據進行解碼,從而得到目標消息體數據;而LengthFieldPrepender則會在響應的數據前面添加指定的位元組數據,這個位元組數據中保存了當前消息體的整體位元組數據長度。LengthFieldBasedFrameDecoder的解碼過程如下圖所示:

Netty解決TCP粘包和拆包問題的四種方案

LengthFieldPrepender的編碼過程如下圖所示:

Netty解決TCP粘包和拆包問題的四種方案

關於LengthFieldBasedFrameDecoder,這裡需要對其構造函數參數進行介紹:

  • maxFrameLength:指定了每個包所能傳遞的最大數據包大小;
  • lengthFieldOffset:指定了長度欄位在位元組碼中的偏移量;
  • lengthFieldLength:指定了長度欄位所佔用的位元組長度;
  • lengthAdjustment:對一些不僅包含有消息頭和消息體的數據進行消息頭的長度的調整,這樣就可以只得到消息體的數據,這裡的lengthAdjustment指定的就是消息頭的長度;
  • initialBytesToStrip:對於長度欄位在消息頭中間的情況,可以通過initialBytesToStrip忽略掉消息頭以及長度欄位佔用的位元組。

這裡我們以json序列化為例對LengthFieldBasedFrameDecoder和LengthFieldPrepender的使用方式進行講解。如下是EchoServer的源碼:

public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 這裡將LengthFieldBasedFrameDecoder添加到pipeline的首位,因為其需要對接收到的數據
// 進行長度欄位解碼,這裡也會對數據進行粘包和拆包處理
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// LengthFieldPrepender是一個編碼器,主要是在響應位元組數據前面添加位元組長度欄位
ch.pipeline().addLast(new LengthFieldPrepender(2));
// 對經過粘包和拆包處理之後的數據進行json反序列化,從而得到User對象
ch.pipeline().addLast(new JsonDecoder());
// 對響應數據進行編碼,主要是將User對象序列化為json
ch.pipeline().addLast(new JsonEncoder());
// 處理客戶端的請求的數據,並且進行響應
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}

這裡EchoServer主要是在pipeline中添加了兩個編碼器和兩個解碼一器,編碼器主要是負責將響應的User對象序列化為json對象,然後在其位元組數組前面添加一個長度欄位的位元組數組;解碼一器主要是對接收到的數據進行長度欄位的解碼,然後將其反序列化為一個User對象。下面是JsonDecoder的源碼:

public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
throws Exception {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
out.add(user);
}
}

JsonDecoder首先從接收到的數據流中讀取位元組數組,然後將其反序列化為一個User對象。下面我們看看JsonEncoder的源碼:

public class JsonEncoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
throws Exception {
String json = JSON.toJSONString(user);
ctx.writeAndFlush(Unpooled.wrappedBuffer(json.getBytes()));
}
}

JsonEncoder將響應得到的User對象轉換為一個json對象,然後寫入響應中。對於EchoServerHandler,其主要作用就是接收客戶端數據,並且進行響應,如下是其源碼:

public class EchoServerHandler extends SimpleChannelInboundHandler<User> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive from client: " + user);
ctx.write(user);
}
}

對於客戶端,其主要邏輯與服務端的基本類似,這裡主要展示其pipeline的添加方式,以及最後發送請求,並且對伺服器響應進行處理的過程:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new JsonDecoder());
ch.pipeline().addLast(new JsonEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.write(getUser());
}
private User getUser() {
User user = new User();
user.setAge(27);
user.setName("zhangxufeng");
return user;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive message from server: " + user);
}
}

這裡客戶端首先會在連接上伺服器時,往伺服器發送一個User對象數據,然後在接收到伺服器響應之後,會列印伺服器響應的數據。

3.4 自定義粘包與拆包器

對於粘包與拆包問題,其實前面三種基本上已經能夠滿足大多數情形了,但是對於一些更加複雜的協議,可能有一些定製化的需求。對於這些場景,其實本質上,我們也不需要手動從頭開始寫一份粘包與拆包處理器,而是通過繼承LengthFieldBasedFrameDecoder和LengthFieldPrepender來實現粘包和拆包的處理。

如果用戶確實需要不通過繼承的方式實現自己的粘包和拆包處理器,這裡可以通過實現MessageToByteEncoder和ByteToMessageDecoder來實現。這裡MessageToByteEncoder的作用是將響應數據編碼為一個ByteBuf對象,而ByteToMessageDecoder則是將接收到的ByteBuf數據轉換為某個對象數據。通過實現這兩個抽象類,用戶就可以達到實現自定義粘包和拆包處理的目的。如下是這兩個類及其抽象方法的聲明:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception;
}
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out)
throws Exception;
}

4. 小結

本文首先對粘包和拆包的問題原理進行描述,幫助讀者理解粘包和拆包問題所在。然後對處理粘包和拆包的幾種常用解決方案進行講解。接著通過輔以示例的方式對Netty提供的幾種解決粘包和拆包問題的解決方案進行了詳細講解。

作者:愛寶貝丶

原文:https://my.oschina.net/zhangxufeng/blog/3023794

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

MySQL運維實戰之PHP訪問MySQL,你使用對了嗎?
Strangler重寫模式

TAG:程序員小新人學習 |