千萬級WebSocket消息推送服務技術分析
拉模式和推模式區別
拉模式(定時輪詢訪問介面獲取數據)
數據更新頻率低,則大多數的數據請求時無效的
在線用戶數量多,則服務端的查詢負載很高
定時輪詢拉取,無法滿足時效性要求
推模式(向客戶端進行數據的推送)
僅在數據更新時,才有推送
需要維護大量的在線長連接
數據更新後,可以立即推送
基於WebSocket協議做推送
瀏覽器支持的socket編程,輕鬆維持服務端的長連接
基於TCP協議之上的高層協議,無需開發者關心通訊細節
提供了高度抽象的編程介面,業務開發成本較低
WebSocket協議的交互流程
客戶端首先發起一個Http請求到服務端,請求的特殊之處,在於在請求裡面帶了一個upgrade的欄位,告訴服務端,我想生成一個websocket的協議,服務端收到請求後,會給客戶端一個握手的確認,返回一個switching, 意思允許客戶端向websocket協議轉換,完成這個協商之後,客戶端與服務端之間的底層TCP協議是沒有中斷的,接下來,客戶端可以向服務端發起一個基於websocket協議的消息,服務端也可以主動向客戶端發起websocket協議的消息,websocket協議裡面通訊的單位就叫message。
傳輸協議原理
協議升級後,繼續復用Http協議的底層socket完成後續通訊
message底層會被切分成多個frame幀進行傳輸,從協議層面不能傳輸一個大包,只能切成一個個小包傳輸
編程時,只需操作message,無需關心frame(屬於協議和類庫自身去操作的)
框架底層完成TCP網路I/O,WebSocket協議的解析,開發者無需關心
服務端技術選型與考慮
NodeJs
單線程模型(儘管可以多進程),推送性能有限
C/C++
TCP通訊、WebSocket協議實現成本高
Go
多線程,基於協程模型並發
Go語言屬於編譯型語言,運行速度並不慢
成熟的WebSocket標準庫,無需造輪子
基於Go實現WebSocket服務端
用Go語言對WebSocket做一個簡單的服務端實現,以及HTML頁面進行調試,並對WebSocket封裝,這裡就直接給出代碼了。
WebSocket服務端
package main
import (
"net/http"
"github.com/gorilla/websocket"
"github.com/myproject/gowebsocket/impl"
"time"
)
var(
upgrader = websocket.Upgrader{
// 允許跨域
CheckOrigin:func(r *http.Request) bool{
return true
},
}
)
func wsHandler(w http.ResponseWriter , r *http.Request){
// w.Write([]byte("hello"))
var(
wsConn *websocket.Conn
err error
conn *impl.Connection
data []byte
)
// 完成ws協議的握手操作
// Upgrade:websocket
if wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{
return
}
if conn , err = impl.InitConnection(wsConn); err != nil{
goto ERR
}
// 啟動線程,不斷發消息
go func(){
var (err error)
for{
if err = conn.WriteMessage([]byte("heartbeat"));err != nil{
return
}
time.Sleep(1*time.Second)
}
}()
for {
if data , err = conn.ReadMessage();err != nil{
goto ERR
}
if err = conn.WriteMessage(data);err !=nil{
goto ERR
}
}
ERR:
conn.Close()
}
func main(){
http.HandleFunc("/ws",wsHandler)
http.ListenAndServe("0.0.0.0:7777",nil)
}
前端頁面
<!DOCTYPE html>
<html>
<head>
<title>go websocket</title>
<meta charset="utf-8" />
</head>
<body>
<script type="text/javascript">
var wsUri ="ws://127.0.0.1:7777/ws";
var output;
function init() {
output = document.getElementById("output");
testWebSocket();
}
function testWebSocket() {
websocket = new WebSocket(wsUri);
websocket.onopen = function(evt) {
onOpen(evt)
};
websocket.onclose = function(evt) {
onClose(evt)
};
websocket.onmessage = function(evt) {
onMessage(evt)
};
websocket.onerror = function(evt) {
onError(evt)
};
}
function onOpen(evt) {
writeToScreen("CONNECTED");
// doSend("WebSocket rocks");
}
function onClose(evt) {
writeToScreen("DISCONNECTED");
}
function onMessage(evt) {
writeToScreen("<span stylex="color: blue;">RESPONSE: "+ evt.data+"</span>");
// websocket.close();
}
function onError(evt) {
writeToScreen("<span stylex="color: red;">ERROR:</span> "+ evt.data);
}
function doSend(message) {
writeToScreen("SENT: " + message);
websocket.send(message);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
output.appendChild(pre);
}
window.addEventListener("load", init, false);
function sendBtnClick(){
var msg = document.getElementById("input").value;
doSend(msg);
document.getElementById("input").value = "";
}
function closeBtnClick(){
websocket.close();
}
</script>
<h2>WebSocket Test</h2>
<input type="text" id="input"></input>
<button onclick="sendBtnClick()" >send</button>
<button onclick="closeBtnClick()" >close</button>
<div id="output"></div>
</body>
</html>
封裝WebSocket
package impl
import (
"github.com/gorilla/websocket"
"sync"
"errors"
)
type Connection struct{
wsConnect *websocket.Conn
inChan chan []byte
outChan chan []byte
closeChan chan byte
mutex sync.Mutex // 對closeChan關閉上鎖
isClosed bool // 防止closeChan被關閉多次
}
func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){
conn = &Connection{
wsConnect:wsConn,
inChan: make(chan []byte,1000),
outChan: make(chan []byte,1000),
closeChan: make(chan byte,1),
}
// 啟動讀協程
go conn.readLoop();
// 啟動寫協程
go conn.writeLoop();
return
}
func (conn *Connection)ReadMessage()(data []byte , err error){
select{
case data = <- conn.inChan:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)WriteMessage(data []byte)(err error){
select{
case conn.outChan <- data:
case <- conn.closeChan:
err = errors.New("connection is closeed")
}
return
}
func (conn *Connection)Close(){
// 線程安全,可多次調用
conn.wsConnect.Close()
// 利用標記,讓closeChan只關閉一次
conn.mutex.Lock()
if !conn.isClosed {
close(conn.closeChan)
conn.isClosed = true
}
conn.mutex.Unlock()
}
// 內部實現
func (conn *Connection)readLoop(){
var(
data []byte
err error
)
for{
if _, data , err = conn.wsConnect.ReadMessage(); err != nil{
goto ERR
}
//阻塞在這裡,等待inChan有空閑位置
select{
case conn.inChan <- data:
case <- conn.closeChan: // closeChan 感知 conn斷開
goto ERR
}
}
ERR:
conn.Close()
}
func (conn *Connection)writeLoop(){
var(
data []byte
err error
)
for{
select{
case data= <- conn.outChan:
case <- conn.closeChan:
goto ERR
}
if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{
goto ERR
}
}
ERR:
conn.Close()
}
千萬級彈幕系統的架構設計
技術難點
內核瓶頸
推送量大:100W在線 * 10條/每秒 = 1000W條/秒
內核瓶頸:linux內核發送TCP的極限包頻 ≈ 100W/秒
鎖瓶頸
需要維護在線用戶集合(100W用戶在線),通常是一個字典結構
推送消息即遍歷整個集合,順序發送消息,耗時極長
推送期間,客戶端仍舊正常的上下線,集合面臨不停的修改,修改需要遍歷,所以集合需要上鎖
CPU瓶頸
瀏覽器與服務端之間一般採用的是JSon格式去通訊
Json編碼非常耗費CPU資源
向100W在線推送一次,則需100W次Json Encode
優化方案
內核瓶頸
減少網路小包的發送,我們將網路上幾百位元組定義成網路的小包了,小包的問題是對內核和網路的中間設備造成處理的壓力。方案是將一秒內N條消息合併成1條消息,合併後,每秒推送數等於在線連接數。
鎖瓶頸
大鎖拆小鎖,將長連接打散到多個集合中去,每個集合都有自己的鎖,多線程並發推送集合,線程之間推送的集合不同,所以沒有鎖的競爭關係,避免鎖競爭。
讀寫鎖取代互斥鎖,多個推送任務可以並發遍歷相同集合
CPU瓶頸
減少重複計算,Json編碼前置,1次消息編碼+100W次推送,消息合併前置,N條消息合併後,只需要編碼一次。
單機架構
最外層是在線的長連接,連接到服務端後,打散到多個集合裡面存儲,我們要發送的消息呢,通過打包後,經過json編碼,被多個線程或協程分發到多個集合中去,最終推給了所有的在線連接。
單機瓶頸
維護海量長連接,會花費不少內存
消息推送的瞬時,消耗大量的CPU
消息推送的瞬時帶寬高達400-600Mb(4-6Gbits),需要用到萬兆網卡,是主要瓶頸
集群
部署多個節點,通過負載均衡,把連接打散到多個 伺服器上,但推送消息的時候,不知道哪個直播間在哪個節點上,最常用的方式是將消息廣播給所有的網關節點,此時就需要做一個邏輯集群。
邏輯集群
基於Http2協議向gateway集群分發消息(Http2支持連接復用,用作RPC性能更佳,即在單個連接上可以做高吞吐的請求應答處理)
基於Http1協議對外提供推送API(Http1更加普及,對業務方更加友好)
整體分散式架構圖如下:
任何業務方通過Http介面調用到邏輯集群,邏輯集群把消息廣播給所有網關,各個網關各自將消息推送給在線的連接即可。
本文講解了開發消息推送服務的難點與解決方案的大體思路,按照整個理論流程下來,基本能實現一套彈幕消息推送的服務。
※通過url來設置log4j的記錄級別
※HTML字元實體與轉義字元串、使用CDATA區批量轉譯
TAG:程序員小新人學習 |