美團點評基於Storm的實時數據處理實踐
背景
目前美團點評已累計了豐富的線上交易與用戶行為數據,為商家賦能需要我們有更強大的專業化數據加工能力,來幫助商家做出正確的決策從而提高用戶體驗。目前商家端產品在數據應用上主要基於離線數據加工,數據生產調度以「T+1」為主,伴隨著越來越深入的精細化運營,實時數據應用訴求逾加強烈。本文將從目前主流實時數據處理引擎的特點和我們面臨的問題出發,簡單的介紹一下我們是如何搭建實時數據處理系統。
設計框架
目前比較流行的實時處理引擎有 Storm,Spark Streaming,Flink。每個引擎都有各自的特點和應用場景。 下表是對這三個引擎的簡單對比:
打開今日頭條,查看更多精彩圖片考慮到每個引擎的特點、商家端應用的特點和系統的高可用性,我們最終選擇了 Storm 作為本系統的實時處理引擎。
面臨的問題
- 數據量的不穩定性,導致對機器需求的不確定性。用戶的行為數據會受到時間的影響,比如半夜時刻和用餐高峰時段每分鐘產生的數據量有兩個數量級的差異。
- 上游數據質量的不確定性。
- 數據計算時,數據的落地點應該放到哪裡來保證計算的高效性。
- 如何保證數據在多線程處理時數據計算的正確性。
- 計算好的數據以什麼樣的方式提供給應用方。
具體的實施方案
實時攝入數據完整性保障
數據完整性保證層:如何保證數據攝入到計算引擎的完整性呢?正如表格中比較的那樣,Storm 框架的語義為 At Least Once,至少攝入一次。這個語義的存在正好保證了數據的完整性,所以只需要根據自己的需求編寫 Spout 即可。好消息是我們的技術團隊已經開發好了一個滿足大多數需求的 Spout,可以直接拿來使用。特別需要注意的一點,在數據處理的過程中需要我們自己來剔除已經處理過的數據,因為 Storm 的語義會可能導致同一條數據攝入兩次。灰度發布期間(一周)對數據完整性進行驗證,數據完整性為100%。
實時數據平滑處理
數據預測層:實時的數據預測可以幫助我們對到達的數據進行有效的平滑,從而可以減少在某一時刻對集群的壓力。 在數據預測方面,我們採用了在數學上比較簡單的多元線性回歸模型(如果此模型不滿足業務需求,可以選用一些更高級別的預測模型),預測下一分鐘可能到來的數據的量。在數據延遲可接受的範圍內,對數據進行平滑,並完成對數據的計算。通過對該方案的使用,減輕了對集群約33%的壓力。具體步驟如下:
- 步驟一:將多個業務的實時數據進行抽象化,轉換為(Y_i,X_1,X_2,X_3i,... ,X_ni),其中Y_i為在(X_1i...X_ni)屬性下的數據量,(X_1i...X_ni)為n個不同的屬性,比如時間、業務、用戶的性別等等。
- 步驟二:因為考慮到實時數據的特殊性,不同業務的數據量隨時間變數基本呈現為M走勢,所以為了將非線性走勢轉換為線性走勢,可以將時間段分為4部分,保證在每個時間段內數據的走勢為線性走勢。同理,如果其他的屬性使得走勢變為非線性,也可以分段分析。
- 步驟三:將抽象好的數據代入到多元線性回歸模型中,其方程組形式為:
- 即
- 通過對該模型的求解方式求得估計參數,最後得多元線性回歸方程。
- 步驟四:數據預測完之後通過控制對數據的處理速度,保證在規定的時間內完成對規定數據的計算,減輕對集群的壓力。
實時數據計算策略
策略層:Key/Value 模式更適應於實時數據模型,不管是在存儲還是計算方面。Cellar(我們內部基於阿里開源的Tair研發的公共KV存儲)作為一個分散式的 Key/Value 結構數據的解決方案,可以做到幾乎無延遲的進行 IO 操作,並且可以支持高達千萬級別的 QPS,更重要的是 Cellar 支持很多原子操作,運用在實時數據計算上是一個不錯的選擇。所以作為數據的落腳點,本系統選擇了Cellar。
但是在數據計算的過程中會遇到一些問題,比如說統計截止到當前時刻入住旅館的男女比例是多少?很容易就會想到,從 Cellar 中取出截止到當前時刻入住的男生是多少,女生是多少,然後做一個比值就 OK 了。但是本系統是在多線程的環境運行的,如果該時刻有兩對夫婦入住了,產生了兩筆訂單,恰好這兩筆訂單被兩個線程所處理,當線程A將該男士計算到結果中,正要打算將該女士計算到結果中的時候,線程B已經計算完結果了,那麼線程B計算出的結果就是2/1,那就出錯啦。
所以為了保證數據在多線程處理時數據計算的正確性,我們需要用到分散式鎖。實現分散式鎖的方式有很多,本文就不贅述了。這裡給大家介紹一種更簡單快捷的方法。Cellar 中有個 setNx 函數,該函數是原子的,並且是(Set If Not Exists),所以用該函數鎖住關鍵的欄位就可以。就上面的例子而言,我們可以鎖住該旅館的唯一 ID 欄位,計算完之後 delete 該鎖,這樣就可以保證了計算的正確性。
另外一個重要的問題是 Cellar 不支持事務,就會導致該計算系統在升級或者重啟時會造成少量數據的不準確。為了解決該問題,運用到一種 getset 原子思想的方法。如下:
public void doSomeWork(String input) {
cellar.mapPut("uniq_ID");
cellar.add("uniq_ID_1","some data");
cellar.add("uniq_ID_2","some data again");
....
cellar.mapRemove("uniq_ID");
}
如果上述代碼執行到[2..5]某一行時系統重啟了,導致後續的操作並沒有完成,如何將沒有完成的操作添加上去呢?如下:
public void remedySomething() {
map = cellar.mapGetAll();
version = cellar.mapGet("uniq_ID").getVersion();
for (string str : map) {
if (cellar.get(str + "_1").getVersion()!= version) {
cellar.add(str + "_1", "some data");
cellar.mapRemove(str);
}
.......
}
}
正如代碼里那樣,會有一個容器記錄了哪些數據正在被操作,當系統重啟的時候,從該容器取出上次未執行完的數據,用 Version(版本號)來記錄哪些操作還沒有完成,將沒有完成的操作補上,這樣就可以保證了計算結果的準確性。起初 Version(版本號)被設計出來解決的問題是防止由於數據的並發更新導致的問題。
比如,系統有一個 value 為「a,b,c」,A和B同時get到這個 value。A執行操作,在後面添加一個d,value 為 「a,b,c,d」。B執行操作添加一個e,value為」a,b,c,e」。如果不加控制,無論A和B誰先更新成功,它的更新都會被後到的更新覆蓋。Tair 無法解決這個問題,但是引入了version 機制避免這樣的問題。還是拿剛才的例子,A和B取到數據,假設版本號為10,A先更新,更新成 功後,value 為」a,b,c,d」,與此同時,版本號會變為11。當B更新時,由於其基於的版本號是10,伺服器會拒絕更新,從而避免A的更新被覆蓋。B可以選擇 get 新版本的 value,然後在其基礎上修改,也可以選擇強行更新。
將 Version 運用到事務的解決上也算是一種新型的使用。為驗證該功能的正確性,灰度發布期間每天不同時段對項目進行殺死並重啟,並對數據正確性進行校驗,數據的正確性為100%。
實時數據存儲
為了契合更多的需求,將數據分為三部分存儲。
Kafka:存儲稍加工之後的明細數據,方便做更多的擴展。
MySQL:存儲中間的計算結果數據,方便計算過程的可視化。
Cellar:存儲最終的結果數據,供應用層直接查詢使用。
應用案例
美團開店寶的實時經營數據卡片
。- 美團開店寶作為美團商家的客戶端,支持著眾多餐飲商家的輔助經營,而經營數據的實時性對影響商家決策尤為重要。該功能上線之後受到了商家的熱烈歡迎。卡片展示如下圖:
美團點評金融合作門店的實時熱度標籤
。- 該功能用於與美團點評金融合作商家增加支付標籤,用以突出這些商家,增加營銷點。另一方面為優質商家吸引更多流量,為平台帶來更多收益。展示如下圖:
總結與展望
以上就是該系統的設計框架與思路,並且部分功能已應用到系統中。為了商家更好的決策,用戶更好的體驗,在業務不斷增長的情況下,對實時數據的分析就需要做到更全面。所以實時數據分析還有很多東西可以去做。
老生常談的大數據 4V+1O 特徵,即數據量大(Volume)、類型繁多(Variety)、價值密度低(Value)、速度快時效性高(Velocity)、數據在線(Online),相比離線數據系統,對實時數據的計算和應用挑戰尤其艱巨。在技術框架演進層面,對流式數據進行高度抽象,簡化開發流程;在應用端,我們後續希望在數據大屏、用戶行為分析產品、營銷效果跟蹤等 DW/BI 產品進行持續應用,通過加快數據流轉的速度,更好的發揮數據價值。
※callback Promise async await 非同步回調 案例
TAG:程序員小新人學習 |