當前位置:
首頁 > 知識 > 千萬級WebSocket消息推送服務技術分析

千萬級WebSocket消息推送服務技術分析

拉模式和推模式區別

拉模式(定時輪詢訪問介面獲取數據)

數據更新頻率低,則大多數的數據請求時無效的

在線用戶數量多,則服務端的查詢負載很高

定時輪詢拉取,無法滿足時效性要求

推模式(向客戶端進行數據的推送)

僅在數據更新時,才有推送

需要維護大量的在線長連接

數據更新後,可以立即推送

基於WebSocket協議做推送

瀏覽器支持的socket編程,輕鬆維持服務端的長連接

基於TCP協議之上的高層協議,無需開發者關心通訊細節

提供了高度抽象的編程介面,業務開發成本較低

WebSocket協議的交互流程

千萬級WebSocket消息推送服務技術分析

客戶端首先發起一個Http請求到服務端,請求的特殊之處,在於在請求裡面帶了一個upgrade的欄位,告訴服務端,我想生成一個websocket的協議,服務端收到請求後,會給客戶端一個握手的確認,返回一個switching, 意思允許客戶端向websocket協議轉換,完成這個協商之後,客戶端與服務端之間的底層TCP協議是沒有中斷的,接下來,客戶端可以向服務端發起一個基於websocket協議的消息,服務端也可以主動向客戶端發起websocket協議的消息,websocket協議裡面通訊的單位就叫message。

傳輸協議原理

協議升級後,繼續復用Http協議的底層socket完成後續通訊

message底層會被切分成多個frame幀進行傳輸,從協議層面不能傳輸一個大包,只能切成一個個小包傳輸

編程時,只需操作message,無需關心frame(屬於協議和類庫自身去操作的)

框架底層完成TCP網路I/O,WebSocket協議的解析,開發者無需關心

服務端技術選型與考慮

NodeJs

單線程模型(儘管可以多進程),推送性能有限

C/C++

TCP通訊、WebSocket協議實現成本高

Go

多線程,基於協程模型並發

Go語言屬於編譯型語言,運行速度並不慢

成熟的WebSocket標準庫,無需造輪子

基於Go實現WebSocket服務端

用Go語言對WebSocket做一個簡單的服務端實現,以及HTML頁面進行調試,並對WebSocket封裝,這裡就直接給出代碼了。

WebSocket服務端

package main

import (

"net/http"

"github.com/gorilla/websocket"

"github.com/myproject/gowebsocket/impl"

"time"

)

var(

upgrader = websocket.Upgrader{

// 允許跨域

CheckOrigin:func(r *http.Request) bool{

return true

},

}

)

func wsHandler(w http.ResponseWriter , r *http.Request){

// w.Write([]byte("hello"))

var(

wsConn *websocket.Conn

err error

conn *impl.Connection

data []byte

)

// 完成ws協議的握手操作

// Upgrade:websocket

if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{

return

}

if conn , err = impl.InitConnection(wsConn); err != nil{

goto ERR

}

// 啟動線程,不斷發消息

go func(){

var (err error)

for{

if err = conn.WriteMessage([]byte("heartbeat"));err != nil{

return

}

time.Sleep(1*time.Second)

}

}()

for {

if data , err = conn.ReadMessage();err != nil{

goto ERR

}

if err = conn.WriteMessage(data);err !=nil{

goto ERR

}

}

ERR:

conn.Close()

}

func main(){

http.HandleFunc("/ws",wsHandler)

http.ListenAndServe("0.0.0.0:7777",nil)

}

前端頁面

<!DOCTYPE html>

<html>

<head>

<title>go websocket</title>

<meta charset="utf-8" />

</head>

<body>

<script type="text/javascript">

var wsUri ="ws://127.0.0.1:7777/ws";

var output;

function init() {

output = document.getElementById("output");

testWebSocket();

}

function testWebSocket() {

websocket = new WebSocket(wsUri);

websocket.onopen = function(evt) {

onOpen(evt)

};

websocket.onclose = function(evt) {

onClose(evt)

};

websocket.onmessage = function(evt) {

onMessage(evt)

};

websocket.onerror = function(evt) {

onError(evt)

};

}

function onOpen(evt) {

writeToScreen("CONNECTED");

// doSend("WebSocket rocks");

}

function onClose(evt) {

writeToScreen("DISCONNECTED");

}

function onMessage(evt) {

writeToScreen("<span stylex="color: blue;">RESPONSE: "+ evt.data+"</span>");

// websocket.close();

}

function onError(evt) {

writeToScreen("<span stylex="color: red;">ERROR:</span> "+ evt.data);

}

function doSend(message) {

writeToScreen("SENT: " + message);

websocket.send(message);

}

function writeToScreen(message) {

var pre = document.createElement("p");

pre.style.wordWrap = "break-word";

pre.innerHTML = message;

output.appendChild(pre);

}

window.addEventListener("load", init, false);

function sendBtnClick(){

var msg = document.getElementById("input").value;

doSend(msg);

document.getElementById("input").value = "";

}

function closeBtnClick(){

websocket.close();

}

</script>

<h2>WebSocket Test</h2>

<input type="text" id="input"></input>

<button onclick="sendBtnClick()" >send</button>

<button onclick="closeBtnClick()" >close</button>

<div id="output"></div>

</body>

</html>

封裝WebSocket

package impl

import (

"github.com/gorilla/websocket"

"sync"

"errors"

)

type Connection struct{

wsConnect *websocket.Conn

inChan chan []byte

outChan chan []byte

closeChan chan byte

mutex sync.Mutex // 對closeChan關閉上鎖

isClosed bool // 防止closeChan被關閉多次

}

func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){

conn = &Connection{

wsConnect:wsConn,

inChan: make(chan []byte,1000),

outChan: make(chan []byte,1000),

closeChan: make(chan byte,1),

}

// 啟動讀協程

go conn.readLoop();

// 啟動寫協程

go conn.writeLoop();

return

}

func (conn *Connection)ReadMessage()(data []byte , err error){

select{

case data = <- conn.inChan:

case <- conn.closeChan:

err = errors.New("connection is closeed")

}

return

}

func (conn *Connection)WriteMessage(data []byte)(err error){

select{

case conn.outChan <- data:

case <- conn.closeChan:

err = errors.New("connection is closeed")

}

return

}

func (conn *Connection)Close(){

// 線程安全,可多次調用

conn.wsConnect.Close()

// 利用標記,讓closeChan只關閉一次

conn.mutex.Lock()

if !conn.isClosed {

close(conn.closeChan)

conn.isClosed = true

}

conn.mutex.Unlock()

}

// 內部實現

func (conn *Connection)readLoop(){

var(

data []byte

err error

)

for{

if _, data , err = conn.wsConnect.ReadMessage(); err != nil{

goto ERR

}

//阻塞在這裡,等待inChan有空閑位置

select{

case conn.inChan <- data:

case <- conn.closeChan: // closeChan 感知 conn斷開

goto ERR

}

}

ERR:

conn.Close()

}

func (conn *Connection)writeLoop(){

var(

data []byte

err error

)

for{

select{

case data= <- conn.outChan:

case <- conn.closeChan:

goto ERR

}

if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{

goto ERR

}

}

ERR:

conn.Close()

}

千萬級彈幕系統的架構設計

技術難點

內核瓶頸

推送量大:100W在線 * 10條/每秒 = 1000W條/秒

內核瓶頸:linux內核發送TCP的極限包頻 ≈ 100W/秒

鎖瓶頸

需要維護在線用戶集合(100W用戶在線),通常是一個字典結構

推送消息即遍歷整個集合,順序發送消息,耗時極長

推送期間,客戶端仍舊正常的上下線,集合面臨不停的修改,修改需要遍歷,所以集合需要上鎖

CPU瓶頸

瀏覽器與服務端之間一般採用的是JSon格式去通訊

Json編碼非常耗費CPU資源

向100W在線推送一次,則需100W次Json Encode

優化方案

內核瓶頸

減少網路小包的發送,我們將網路上幾百位元組定義成網路的小包了,小包的問題是對內核和網路的中間設備造成處理的壓力。方案是將一秒內N條消息合併成1條消息,合併後,每秒推送數等於在線連接數。

鎖瓶頸

大鎖拆小鎖,將長連接打散到多個集合中去,每個集合都有自己的鎖,多線程並發推送集合,線程之間推送的集合不同,所以沒有鎖的競爭關係,避免鎖競爭。

讀寫鎖取代互斥鎖,多個推送任務可以並發遍歷相同集合

CPU瓶頸

減少重複計算,Json編碼前置,1次消息編碼+100W次推送,消息合併前置,N條消息合併後,只需要編碼一次。

單機架構

千萬級WebSocket消息推送服務技術分析

最外層是在線的長連接,連接到服務端後,打散到多個集合裡面存儲,我們要發送的消息呢,通過打包後,經過json編碼,被多個線程或協程分發到多個集合中去,最終推給了所有的在線連接。

單機瓶頸

維護海量長連接,會花費不少內存

消息推送的瞬時,消耗大量的CPU

消息推送的瞬時帶寬高達400-600Mb(4-6Gbits),需要用到萬兆網卡,是主要瓶頸

集群

部署多個節點,通過負載均衡,把連接打散到多個 伺服器上,但推送消息的時候,不知道哪個直播間在哪個節點上,最常用的方式是將消息廣播給所有的網關節點,此時就需要做一個邏輯集群。

邏輯集群

基於Http2協議向gateway集群分發消息(Http2支持連接復用,用作RPC性能更佳,即在單個連接上可以做高吞吐的請求應答處理)

基於Http1協議對外提供推送API(Http1更加普及,對業務方更加友好)

整體分散式架構圖如下:

千萬級WebSocket消息推送服務技術分析

任何業務方通過Http介面調用到邏輯集群,邏輯集群把消息廣播給所有網關,各個網關各自將消息推送給在線的連接即可。

本文講解了開發消息推送服務的難點與解決方案的大體思路,按照整個理論流程下來,基本能實現一套彈幕消息推送的服務。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

通過url來設置log4j的記錄級別
HTML字元實體與轉義字元串、使用CDATA區批量轉譯

TAG:程序員小新人學習 |