消息隊列NetMQ 原理分析5-StreamEngine、Encord和Decord
- 消息隊列NetMQ 原理分析5-StreamEngine,Encord和Decord
- 前言
- 介紹
- 目的
- StreamEngine
- 發送數據
- 接收數據
- 流程分析
- Encoder
- V2Encoder
- V1Encoder
- RawEncoder
- Decoder
- V2Decoder
- V1Decoder
- RawDecoder
- 總結
- 前言
前言介紹
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket介面的擴展。它提供了一種非同步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。
目的
對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計劃如下:
- 消息隊列NetMQ 原理分析1-Context和ZObject
- 消息隊列NetMQ 原理分析2-IO線程和完成埠
- 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
- 消息隊列NetMQ 原理分析4-Socket、Session、Option和Pipe
- 消息隊列NetMQ 原理分析5-StreamEngine,Encord和Decord
- 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
- 消息隊列NetMQ 原理分析7-Device
- 消息隊列NetMQ 原理分析8-不同類型的Socket
- 消息隊列NetMQ 原理分析9-實戰
友情提示: 看本系列文章時最好獲取源碼,更有助於理解。
StreamEngine
SocketBase
將Msg
發送給SessionBase
之後需要將Msg
轉化為byte
進行傳輸,Engine
就是做轉換的工作,轉換完成之後就會和實際的底層Socket
進行消息傳輸。
NetMQ
在Tcp
協議消息轉換使用的是StreamEngine
。
internal sealed class StreamEngine : IEngine, IProactorEvents, IMsgSink
{
}
上一章介紹到管道事件。
發送數據當出管道有數據可讀時,會調用SessionBase
的ReadActivated
事件
public void ReadActivated(Pipe pipe)
{
...
if (m_engine != null)
m_engine.ActivateOut;
else
m_pipe.CheckRead;
}
然後會調用對應m_engine的ActivateOut事件
public void ActivateOut
{
FeedAction(Action.ActivateOut, SocketError.Success, 0);
}
public void FeedAction{
...
case State.Active:
switch (action)
{
case Action.OutCompleted:
int bytesSent = EndWrite(socketError, bytesTransferred);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incoming messages.
if (bytesSent == -1)
{
m_sendingState = SendState.Error;
}
else
{
m_outpos.AdvanceOffset(bytesSent);
m_outsize -= bytesSent;
BeginSending;
}
break;
...
}
...
}
當TCPConnect
客戶端發送請求完成時,會調用OutCompleted
事件
private void Loop
{
...
switch (completion.OperationType)
{
...
case OperationType.Connect:
case OperationType.Disconnect:
case OperationType.Send:
item.ProactorEvents.OutCompleted(
completion.SocketError,
completion.BytesTransferred);
}
}
...
public void OutCompleted(SocketError socketError, int bytesTransferred)
{
...
// Create the engine object for this connection.
var engine = new StreamEngine(m_s, m_options, m_endpoint);
...
// Attach the engine to the corresponding session object.
SendAttach(m_session, engine);
...
}
此時會創建一個StreamEngine
和請求的SessionBase
對象進行關聯。
protected override void ProcessAttach(IEngine engine)
{
Debug.Assert(engine != null);
// Create the pipe if it does not exist yet.
if (m_pipe == null && !IsTerminating)
{
ZObject parents = { this, m_socket };
int highWaterMarks = { m_options.ReceiveHighWatermark, m_options.SendHighWatermark };
int lowWaterMarks = { m_options.ReceiveLowWatermark, m_options.SendLowWatermark };
bool delays = { m_options.DelayOnClose, m_options.DelayOnDisconnect };
Pipe pipes = Pipe.PipePair(parents, highWaterMarks, lowWaterMarks, delays);
// Plug the local end of the pipe.
pipes[0].SetEventSink(this);
// Remember the local end of the pipe.
Debug.Assert(m_pipe == null);
m_pipe = pipes[0];
// Ask socket to plug into the remote end of the pipe.
SendBind(m_socket, pipes[1]);
}
// Plug in the engine.
Debug.Assert(m_engine == null);
m_engine = engine;
m_engine.Plug(m_ioThread, this);
}
接收數據
當完成埠通知數據接收完成時,會調用Proactor
的InCompleted
事件,實際就是調用的對應的StreamEngine
的InCompleted
事件
public void InCompleted(SocketError socketError, int bytesTransferred)
{
FeedAction(Action.InCompleted, socketError, bytesTransferred);
}
public void FeedAction{
...
case State.Active:
switch (action)
{
case Action.InCompleted:
m_insize = EndRead(socketError, bytesTransferred);
ProcessInput;
break;
...
}
...
}
接收完成後會對接收到的數據進行處理
private void ProcessInput
{
...
if (m_options.RawSocket)
{
if (m_insize == 0 || !m_decoder.MessageReadySize(m_insize))
{
processed = 0;
}
else
{
processed = m_decoder.ProcessBuffer(m_inpos, m_insize);
}
}
else
{
// Push the data to the decoder.
processed = m_decoder.ProcessBuffer(m_inpos, m_insize);
}
...
// Flush all messages the decoder may have produced.
m_session.Flush;
...
}
public override bool MessageReadySize(int msgSize)
{
m_inProgress = new Msg;
m_inProgress.InitPool(msgSize);
NextStep(new ByteArraySegment(m_inProgress.Data, m_inProgress.Offset),
m_inProgress.Size, RawMessageReadyState);
return true;
}
讀取數據到Msg
後會調用Decoder
的ProcessBuffer
方法
PS:由於
NetMQ
有自己的傳輸協議格式,因此當使用NetMQ
和其他程序進行Socket
傳輸時,必須使用StreamSocket
。
public int ProcessBuffer(ByteArraySegment data, int size)
{
...
while (m_toRead == 0)
{
if (!Next)
{
if (State < 0)
{
return -1;
}
return size;
}
}
return size;
...
}
protected override bool Next
{
if (State == RawMessageReadyState)
{
return RawMessageReady;
}
return false;
}
private bool RawMessageReady
{
...
bool isMessagedPushed = m_msgSink.PushMsg(ref m_inProgress);
if (isMessagedPushed)
{
// NOTE: This is just to break out of process_buffer
// raw_message_ready should never get called in state machine w/o
// message_ready_size from stream_engine.
NextStep(new ByteArraySegment(m_inProgress.Data, m_inProgress.Offset),
1, RawMessageReadyState);
}
return isMessagedPushed;
...
}
對讀到的數據進行處理調用RawDecoder
的Next
的方法,將獲取到的Msg
放入到SeesionBase
的管道中。
讀寫數據流程圖如下圖所示:
我們使用WireShark進行驗證。
我們監聽15557地址,然後創建一個客戶端連接15557地址
前面3條是三次握手。第四條是客戶端向伺服器發送了10位元組長度的請求頭部,以0xff
開頭,0x7f
結尾。中間是8位元組是Identitysize
長度
...
switch (m_handshakeState)
{
case HandshakeState.Closed:
switch (action)
{
case Action.Start:
// Send the "length" and "flags" fields of the identity message.
// The "length" field is encoded in the long format.
m_greetingOutputBuffer[m_outsize++] = 0xff;
m_greetingOutputBuffer.PutLong(m_options.Endian, (long)m_options.IdentitySize + 1, 1);
m_outsize += 8;
m_greetingOutputBuffer[m_outsize++] = 0x7f;
...
}
...
}
...
第6條是伺服器向客戶端發送的10位元組長度的請求頭部,以0xff
開頭,0x7f
結尾。中間是8位元組是identitysize
的信息
第8條是伺服器向客戶端發送的版本號和Socket
類型,01表示版本號1,06表示當前是RouterSocket
...
case HandshakeState.ReceivingGreeting:
switch (action)
{
case Action.InCompleted:
...
if (m_greeting[0] != 0xff || (m_greetingBytesRead == 10 && (m_greeting[9] & 0x01) == 0)){
...
}
else if (m_greetingBytesRead < 10)
{
var greetingSegment = new ByteArraySegment(m_greeting, m_greetingBytesRead);
BeginRead(greetingSegment, PreambleSize - m_greetingBytesRead);
}
else
{
...
m_outpos[m_outsize++] = 1; // Protocol version
m_outpos[m_outsize++] = (byte)m_options.SocketType;
...
}
...
}
...
第10條是客戶端向伺服器發送的版本號和socket類型,05表示當前是DealSocket
...
case HandshakeState.ReceivingRestOfGreeting:
switch (action)
{
case Action.InCompleted:
...
if (m_greeting[VersionPos] == 0)
{
// ZMTP/1.0 framing.
m_encoder = new V1Encoder(Config.OutBatchSize, m_options.Endian);
m_encoder.SetMsgSource(m_session);
m_decoder = new V1Decoder(Config.InBatchSize, m_options.MaxMessageSize, m_options.Endian);
m_decoder.SetMsgSink(m_session);
}
else
{
// v1 framing protocol.
m_encoder = new V2Encoder(Config.OutBatchSize, m_session, m_options.Endian);
m_decoder = new V2Decoder(Config.InBatchSize, m_options.MaxMessageSize, m_session, m_options.Endian);
}
Activate;
...
}
...
EncoderV2Encoder
接下來就是數據傳輸。
public V2Encoder(int bufferSize, IMsgSource session, Endianness endian)
: base(bufferSize, endian)
{
m_inProgress = new Msg;
m_inProgress.InitEmpty;
m_msgSource = session;
// Write 0 bytes to the batch and go to message_ready state.
NextStep(m_tmpbuf, 0, MessageReadyState, true);
}
由於NetMQ
使用的是版本1,用的是V2Encoder
和V2Decoder
進行編碼和解碼。
在初始化Encoder
的時候會向報文寫入2個0位元組數據,暫時不明白為何要這樣做。
int protocolFlags = 0;
if (m_inProgress.HasMore)
protocolFlags |= V2Protocol.MoreFlag;
if (m_inProgress.Size > 255)
protocolFlags |= V2Protocol.LargeFlag;
m_tmpbuf[0] = (byte)protocolFlags;
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
int size = m_inProgress.Size;
if (size > 255)
{
m_tmpbuf.PutLong(Endian, size, 1);
NextStep(m_tmpbuf, 9, SizeReadyState, false);
}
else
{
m_tmpbuf[1] = (byte)(size);
NextStep(m_tmpbuf, 2, SizeReadyState, false);
}
第一個位元組是Flags
用於標記該報文是否為大報文,超過過255個位元組就會標記為大包標記,是否還有更多報文。若報文長度小於256,則第二個位元組用於存儲報文長度。但是若是大報文,則會8個位元組保存報文長度。
下面就開始發送數據
我們用客戶端發一個字元串test1
,然後服務端原樣返回該字元串
可以看到如我們上面分析的一樣,第一個位元組為0,第二個位元組為大小test1
為5個位元組長度。由於CMD命令單行輸入最長字元限制長度為255,因此我們沒辦法在CMD命令下輸入更長數據進行測試。暫時就不做驗證。
V1Encoder編碼如下所示
if (size < 255)
{
m_tmpbuf[0] = (byte)size;
m_tmpbuf[1] = (byte)(m_inProgress.Flags & MsgFlags.More);
NextStep(m_tmpbuf, 2, SizeReadyState, false);
}
else
{
m_tmpbuf[0] = 0xff;
m_tmpbuf.PutLong(Endian, size, 1);
m_tmpbuf[9] = (byte)(m_inProgress.Flags & MsgFlags.More);
NextStep(m_tmpbuf, 10, SizeReadyState, false);
}
當小於255字元,首字元是長度,第二個字元是Flags
,超過255字元,首字元為0xff
,然後跟著8個字元長度的長度值,接下來是Flags
使用RawEncoder
會將原始數據原樣發送不會增加任何其他字元。
接收到數據會先接收第一個位元組Flags
判斷是否有後續包以及是小包還是打包,若是小包,則解析第一個位元組長度位,否則讀取8個位元組長度位。
接收到數據收先會判斷第一個位元組是不是Oxff
,若為Oxff
則表示為打包,獲取8位位元組長度,否則獲取1位位元組長度處理。
使用RawDecoder
會讀取數據保存到管道中。
本片介紹了NetMQ的報文格式並闡述了底層Msg如何轉換為流進行發送和接收。
※python函數(5):迭代器和生成器
※MySQL優化-所需了解的基礎知識
※數組排序之冒泡排序
TAG:科技優家 |
※被視為代替Kafka的消息隊列:Apache Pulsar設計簡介
※React、頁面渲染、任務隊列、Node.js
※消息隊列CKafka
※簡析Python中的四種隊列
※RabbitMQ 高級篇八 消費端ACK與重回隊列
※分散式隊列神器 Celery
※RabbitMQ消息中間件技術精講17 高級篇十 死信隊列
※kafka消息隊列學習整理
※RabbitMQ高級篇九TTL設置隊列或消息有效期隊列及消息
※Rocketmq之消息隊列分配策略演算法實現的源碼分析
※日常生活中的膳食蛋白質來源和肌肉質量 「Lifelines」 隊列研究
※進程間的通信 IPC——實現消息隊列(msg)
※用於腹側疝修復的改良Chevrel技術:單個中心隊列的長期結果
※linux內核對網卡驅動多隊列的支持
※Linux 下的進程間通信:使用管道和消息隊列
※Nature:氣勢磅礴!中國最大出生隊列研究碩果累累,引國際「圍觀」
※List順序表,鏈表隊列,棧,字典
※糞鈣衛蛋白和內鏡下UCEIS評分預測急性重症UC的短期結局:前瞻性隊列研究
※消息隊列 MQ 專欄
※日航空自衛隊列裝 首架F-35A戰機