美團點評基於 Flink 的實時數倉建設實踐
引言
近些年,企業對數據服務實時化服務的需求日益增多。本文整理了常見實時數據組件的性能特點和適用場景,介紹了美團如何通過 Flink 引擎構建實時數據倉庫,從而提供高效、穩健的實時數據服務。此前我們美團技術博客發布過一篇文章《流計算框架 Flink 與 Storm 的性能對比》,對 Flink 和 Storm 倆個引擎的計算性能進行了比較。本文主要闡述使用 Flink 在實際數據生產上的經驗。
實時平台初期架構
在實時數據系統建設初期,由於對實時數據的需求較少,形成不了完整的數據體系。我們採用的是「一路到底」的開發模式:通過在實時計算平台上部署 Storm 作業處理實時數據隊列來提取數據指標,直接推送到實時應用服務中。
圖1 初期實時數據架構
但是,隨著產品和業務人員對實時數據需求的不斷增多,新的挑戰也隨之發生。
數據指標越來越多,「煙囪式」的開發導致代碼耦合問題嚴重。
需求越來越多,有的需要明細數據,有的需要 OLAP 分析。單一的開發模式難以應付多種需求。
缺少完善的監控系統,無法在對業務產生影響之前發現並修復問題。
實時數據倉庫的構建
為解決以上問題,我們根據生產離線數據的經驗,選擇使用分層設計方案來建設實時數據倉庫,其分層架構如下圖所示:
圖2 實時數倉數據分層架構
該方案由以下四層構成:
ODS 層:Binlog 和流量日誌以及各業務實時隊列。
數據明細層:業務領域整合提取事實數據,離線全量和實時變化數據構建實時維度數據。
數據匯總層:使用寬表模型對明細數據補充維度數據,對共性指標進行匯總。
App 層:為了具體需求而構建的應用層,通過 RPC 框架對外提供服務。
通過多層設計我們可以將處理數據的流程沉澱在各層完成。比如在數據明細層統一完成數據的過濾、清洗、規範、脫敏流程;在數據匯總層加工共性的多維指標匯總數據。提高了代碼的復用率和整體生產效率。同時各層級處理的任務類型相似,可以採用統一的技術方案優化性能,使數倉技術架構更簡潔。
技術選型
1.存儲引擎的調研
實時數倉在設計中不同於離線數倉在各層級使用同種儲存方案,比如都存儲在 Hive 、DB 中的策略。首先對中間過程的表,採用將結構化的數據通過消息隊列存儲和高速 KV 存儲混合的方案。實時計算引擎可以通過監聽消息消費消息隊列內的數據,進行實時計算。而在高速 KV 存儲上的數據則可以用於快速關聯計算,比如維度數據。 其次在應用層上,針對數據使用特點配置存儲方案直接寫入。避免了離線數倉應用層同步數據流程帶來的處理延遲。 為了解決不同類型的實時數據需求,合理的設計各層級存儲方案,我們調研了美團內部使用比較廣泛的幾種存儲方案。
表1 存儲方案列表
方案 優勢 劣勢
MySQL 1. 具有完備的事務功能,可以對數據進行更新。2. 支持 SQL,開發成本低。 1. 橫向擴展成本大,存儲容易成為瓶頸; 2. 實時數據的更新和查詢頻率都很高,線上單個實時應用請求就有 1000+ QPS;使用 MySQL 成本太高。
Elasticsearch 1. 吞吐量大,單個機器可以支持 2500+ QPS,並且集群可以快速橫向擴展。2. Term 查詢時響應速度很快,單個機器在 2000+ QPS時,查詢延遲在 20 ms以內。 1. 沒有原生的 SQL 支持,查詢 DSL 有一定的學習門檻;2. 進行聚合運算時性能下降明顯。
Druid 1. 支持超大數據量,通過 Kafka 獲取實時數據時,單個作業可支持 6W+ QPS;2. 可以在數據導入時通過預計算對數據進行匯總,減少的數據存儲。提高了實際處理數據的效率;3. 有很多開源 OLAP 分析框架。實現如 Superset。 1. 預聚合導致無法支持明細的查詢;2. 無法支持 Join 操作;3. Append-only 不支持數據的修改。只能以 Segment 為單位進行替換。
Cellar 1. 支持超大數據量,採用內存加分散式存儲的架構,存儲性價比很高;2. 吞吐性能好,經測試處理 3W+ QPS 讀寫請求時,平均延遲在 1ms左右;通過非同步讀寫線上最高支持 10W+ QPS。 1. 介面僅支持 KV,Map,List 以及原子加減等;2. 單個 Key 值不得超過 1KB ,而 Value 的值超過 100KB 時則性能下降明顯。
根據不同業務場景,實時數倉各個模型層次使用的存儲方案大致如下:
圖3 實時數倉存儲分層架構
數據明細層 對於維度數據部分場景下關聯的頻率可達 10w+ TPS,我們選擇 Cellar(美團內部存儲系統) 作為存儲,封裝維度服務為實時數倉提供維度數據。
數據匯總層 對於通用的匯總指標,需要進行歷史數據關聯的數據,採用和維度數據一樣的方案通過 Cellar 作為存儲,用服務的方式進行關聯操作。
數據應用層 應用層設計相對複雜,再對比了幾種不同存儲方案後。我們制定了以數據讀寫頻率 1000 QPS 為分界的判斷依據。對於讀寫平均頻率高於 1000 QPS 但查詢不太複雜的實時應用,比如商戶實時的經營數據。採用 Cellar 為存儲,提供實時數據服務。對於一些查詢複雜的和需要明細列表的應用,使用 Elasticsearch 作為存儲則更為合適。而一些查詢頻率低,比如一些內部運營的數據。 Druid 通過實時處理消息構建索引,並通過預聚合可以快速的提供實時數據 OLAP 分析功能。對於一些歷史版本的數據產品進行實時化改造時,也可以使用 MySQL 存儲便於產品迭代。
2.計算引擎的調研
在實時平台建設初期我們使用 Storm 引擎來進行實時數據處理。Storm 引擎雖然在靈活性和性能上都表現不錯。但是由於 API 過於底層,在數據開發過程中需要對一些常用的數據操作進行功能實現。比如表關聯、聚合等,產生了很多額外的開發工作,不僅引入了很多外部依賴比如緩存,而且實際使用時性能也不是很理想。同時 Storm 內的數據對象 Tuple 支持的功能也很簡單,通常需要將其轉換為 Java 對象來處理。對於這種基於代碼定義的數據模型,通常我們只能通過文檔來進行維護。不僅需要額外的維護工作,同時在增改欄位時也很麻煩。綜合來看使用 Storm 引擎構建實時數倉難度較大。我們需要一個新的實時處理方案,要能夠實現:
提供高級 API,支持常見的數據操作比如關聯聚合,最好是能支持 SQL。
具有狀態管理和自動支持久化方案,減少對存儲的依賴。
便於接入元數據服務,避免通過代碼管理數據結構。
處理性能至少要和 Storm 一致。
我們對主要的實時計算引擎進行了技術調研。總結了各類引擎特性如下表所示:
表2 實時計算方案列表
項目/引擎 Storm Flink spark-treaming
API 靈活的底層 API 和具有事務保證的 Trident API 流 API 和更加適合數據開發的 Table API 和 Flink SQL 支持 流 API 和 Structured-Streaming API 同時也可以使用更適合數據開發的 Spark SQL
容錯機制 ACK 機制 State 分散式快照保存點 RDD 保存點
狀態管理 Trident State狀態管理 Key State 和 Operator State兩種 State 可以使用,支持多種持久化方案 有 UpdateStateByKey 等 API 進行帶狀態的變更,支持多種持久化方案
處理模式 單條流式處理 單條流式處理 Mic batch處理
延遲 毫秒級 毫秒級 秒級
語義保障 At Least Once,Exactly Once Exactly Once,At Least Once At Least Once
從調研結果來看,Flink 和 Spark Streaming 的 API 、容錯機制與狀態持久化機制都可以解決一部分我們目前使用 Storm 中遇到的問題。但 Flink 在數據延遲上和 Storm 更接近,對現有應用影響最小。而且在公司內部的測試中 Flink 的吞吐性能對比 Storm 有十倍左右提升。綜合考量我們選定 Flink 引擎作為實時數倉的開發引擎。
更加引起我們注意的是,Flink 的 Table 抽象和 SQL 支持。雖然使用 Strom 引擎也可以處理結構化數據。但畢竟依舊是基於消息的處理 API ,在代碼層層面上不能完全享受操作結構化數據的便利。而 Flink 不僅支持了大量常用的 SQL 語句,基本覆蓋了我們的開發場景。而且 Flink 的 Table 可以通過 TableSchema 進行管理,支持豐富的數據類型和數據結構以及數據源。可以很容易的和現有的元數據管理系統或配置管理系統結合。通過下圖我們可以清晰的看出 Storm 和 Flink 在開發統過程中的區別。
圖4 Flink - Storm 對比圖
在使用 Storm 開發時處理邏輯與實現需要固化在 Bolt 的代碼。Flink 則可以通過 SQL 進行開發,代碼可讀性更高,邏輯的實現由開源框架來保證可靠高效,對特定場景的優化只要修改 Flink SQL 優化器功能實現即可,而不影響邏輯代碼。使我們可以把更多的精力放到到數據開發中,而不是邏輯的實現。當需要離線數據和實時數據口徑統一的場景時,我們只需對離線口徑的 SQL 腳本稍加改造即可,極大地提高了開發效率。同時對比圖中 Flink 和 Storm 使用的數據模型,Storm 需要通過一個 Java 的 Class 去定義數據結構,Flink Table 則可以通過元數據來定義。可以很好的和數據開發中的元數據,數據治理等系統結合,提高開發效率。
Flink使用心得
在利用 Flink-Table 構建實時數據倉庫過程中。我們針對一些構建數據倉庫的常用操作,比如數據指標的維度擴充,數據按主題關聯,以及數據的聚合運算通過 Flink 來實現總結了一些使用心得。
1.維度擴充
數據指標的維度擴充,我們採用的是通過維度服務獲取維度信息。雖然基於 Cellar 的維度服務通常的響應延遲可以在 1ms 以下。但是為了進一步優化 Flink 的吞吐,我們對維度數據的關聯全部採用了非同步介面訪問的方式,避免了使用 RPC 調用影響數據吞吐。
對於一些數據量很大的流,比如流量日誌數據量在 10W 條/秒這個量級。在關聯 UDF 的時候內置了緩存機制,可以根據命中率和時間對緩存進行淘汰,配合用關聯的 Key 值進行分區,顯著減少了對外部服務的請求次數,有效的減少了處理延遲和對外部系統的壓力。
2.數據關聯
數據主題合併,本質上就是多個數據源的關聯,簡單的來說就是 Join 操作。Flink 的 Table 是建立在無限流這個概念上的。在進行 Join 操作時並不能像離線數據一樣對兩個完整的表進行關聯。採用的是在窗口時間內對數據進行關聯的方案,相當於從兩個數據流中各自截取一段時間的數據進行 Join 操作。有點類似於離線數據通過限制分區來進行關聯。同時需要注意 Flink 關聯表時必須有至少一個「等於」關聯條件,因為等號兩邊的值會用來分組。
由於 Flink 會緩存窗口內的全部數據來進行關聯,緩存的數據量和關聯的窗口大小成正比。因此 Flink 的關聯查詢,更適合處理一些可以通過業務規則限制關聯數據時間範圍的場景。比如關聯下單用戶購買之前 30 分鐘內的瀏覽日誌。過大的窗口不僅會消耗更多的內存,同時會產生更大的 Checkpoint ,導致吞吐下降或 Checkpoint 超時。在實際生產中可以使用 RocksDB 和啟用增量保存點模式,減少 Checkpoint 過程對吞吐產生影響。對於一些需要關聯窗口期很長的場景,比如關聯的數據可能是幾天以前的數據。對於這些歷史數據,我們可以將其理解為是一種已經固定不變的"維度"。可以將需要被關聯的歷史數據採用和維度數據一致的處理方法:"緩存 + 離線"數據方式存儲,用介面的方式進行關聯。另外需要注意 Flink 對多表關聯是直接順序鏈接的,因此需要注意先進行結果集小的關聯。
3.聚合運算
使用聚合運算時,Flink 對常見的聚合運算如求和、極值、均值等都有支持。美中不足的是對於 Distinct 的支持,Flink-1.6 之前的採用的方案是通過先對去重欄位進行分組再聚合實現。對於需要對多個欄位去重聚合的場景,只能分別計算再進行關聯處理效率很低。為此我們開發了自定義的 UDAF,實現了 MapView 精確去重、BloomFilter 非精確去重、 HyperLogLog 超低內存去重方案應對各種實時去重場景。但是在使用自定義的 UDAF 時,需要注意 RocksDBStateBackend 模式對於較大的 Key 進行更新操作時序列化和反序列化耗時很多。可以考慮使用 FsStateBackend 模式替代。另外要注意的一點 Flink 框架在計算比如 Rank 這樣的分析函數時,需要緩存每個分組窗口下的全部數據才能進行排序,會消耗大量內存。建議在這種場景下優先轉換為 TopN 的邏輯,看是否可以解決需求。
下圖展示一個完整的使用 Flink 引擎生產一張實時數據表的過程:
圖5 實時計算流程圖
實時數倉成果
通過使用實時數倉代替原有流程,我們將數據生產中的各個流程抽象到實時數倉的各層當中。實現了全部實時數據應用的數據源統一,保證了應用數據指標、維度的口徑的一致。在幾次數據口徑發生修改的場景中,我們通過對倉庫明細和匯總進行改造,在完全不用修改應用代碼的情況下就完成全部應用的口徑切換。在開發過程中通過嚴格的把控數據分層、主題域劃分、內容組織標準規範和命名規則。使數據開發的鏈路更為清晰,減少了代碼的耦合。再配合上使用 Flink SQL 進行開發,代碼加簡潔。單個作業的代碼量從平均 300+ 行的 JAVA 代碼 ,縮減到幾十行的 SQL 腳本。項目的開發時長也大幅減短,一人日開發多個實時數據指標情況也不少見。
除此以外我們通過針對數倉各層級工作內容的不同特點,可以進行針對性的性能優化和參數配置。比如 ODS 層主要進行數據的解析、過濾等操作,不需要 RPC 調用和聚合運算。 我們針對數據解析過程進行優化,減少不必要的 JSON 欄位解析,並使用更高效的 JSON 包。在資源分配上,單個 CPU 只配置 1GB 的內存即可滿需求。而匯總層主要則主要進行聚合與關聯運算,可以通過優化聚合演算法、內外存共同運算來提高性能、減少成本。資源配置上也會分配更多的內存,避免內存溢出。通過這些優化手段,雖然相比原有流程實時數倉的生產鏈路更長,但數據延遲並沒有明顯增加。同時實時數據應用所使用的計算資源也有明顯減少。
展望
我們的目標是將實時倉庫建設成可以和離線倉庫數據準確性,一致性媲美的數據系統。為商家,業務人員以及美團用戶提供及時可靠的數據服務。同時作為到餐實時數據的統一出口,為集團其他業務部門助力。未來我們將更加關注在數據可靠性和實時數據指標管理。建立完善的數據監控,數據血緣檢測,交叉檢查機制。及時對異常數據或數據延遲進行監控和預警。同時優化開發流程,降低開發實時數據學習成本。讓更多有實時數據需求的人,可以自己動手解決問題。
※Rocketmq之消息隊列分配策略演算法實現的源碼分析
※跳轉語句與數組基礎
TAG:程序員小新人學習 |