當前位置:
首頁 > 科技 > Spark Streaming 實時計算在甜橙金融監控系統中的應用及優化

Spark Streaming 實時計算在甜橙金融監控系統中的應用及優化

接收程序員的技術早餐

作者|張璐波

編輯|小智

1

寫在前面

天翼電子商務有限公司(簡稱「甜橙金融」)是中國電信的全資子公司,2011 年 3 月成立於北京,作為中國人民銀行核准的第三方支付機構,是兼具金融、電信、互聯網文化的跨界國家高新技術企業。目前公司對實時性計算的需要及應用越來越多,本文選取了其中之一的 Spark Streaming 來介紹如何實現高吞吐量並具備容錯機制的實時流應用。在甜橙金融監控系統項目中,需要對每天億萬級(10T)的日誌記錄進行實時的指標統計,在生產者一端,我們通過 Flume 將數據存入 Kafka 當中, 而在消費者一端,我們利用 Spark Streaming 從 Kafka 中不斷的拉取數據進行指標統計並存入外部存儲中。

本文將從以下幾個方面進行介紹,目的是帶領大家對實時流處理有個初步的認識,一起交流學習。

監控系統架構及存在的主要問題

Spark Streaming 流處理框架介紹

Streaming 相關的優化

Streaming 任務的監控

寫在最後

2

監控系統架構及存在的主要問題

系統架構介紹

整個實時監控系統的架構是先由 Flume 收集伺服器產生的日誌 Log 和前端埋點數據, 然後實時把這些信息發送到 Kafka 分散式發布訂閱消息系統,接著由 Spark Streaming 消費 Kafka 中的消息,同時消費記錄由 Zookeeper 集群統一管理,這樣即使 Kafka 宕機重啟後也能找到上次的消費記錄繼而進行消費。在這裡 Spark Streaming 首先從 MySQL 讀取規則然後進行 ETL 清洗並計算多個聚合指標,最後將結果的一部分存儲到 Hbase 中,另一部分重新發回到 Kafka 中再消費更新到 MySQL 中,監控前端實時獲取指標進行展示。

主要問題

在上面的框架介紹中,下游監控系統的指標數據來源於 Spark Streaming 的實時計算,可見 Streaming 計算處於極為重要的環節,而計算性能不足就會成為整個系統的瓶頸。大部分時候我們 Spark 指標計算都能應付過來,但是在節日流量翻倍的情況下就力不從心了,為應對這種情況之前採取的措施一般是關閉一些非關鍵性日誌介面把監控流量降下來。雖然此舉能暫時解決問題,但仍需要治標更治本的方法。

首先來看看優化前 Streaming 的計算能力。

圖一所示為每批次(30 秒)800W+ 日誌流量下,Spark Streaming 計算大概需要 50 多秒。雖無明顯延時,但計算能力很弱雞 14w/s

圖 1

隨著流量不斷的增大,如圖 2 所示為每批次(時間 30 秒)1000W+ 條日誌流量下,Spark 計算已嚴重超時,越來越多的 batch 加入到 queue 的隊列等待處理,此時監控系統基本失效。

圖 2

既然痛點已找到,那麼剩下要做的就是想辦法去優化。下文在講如何優化前,先帶大家認識下流式處理框架中的兩個經典好搭檔 Spark Streaming + Kafka。

3

Spark Streaming + Kafka 流處理框架

為什麼選擇 Spark Streaming 和 Kafka

Kafka 支持分散式及出色的吞吐量

Spark Streaming 流式處理框架已被各大公司廣泛應用且成熟度高,支持大部分的數據源和存儲,如下圖所示其豐富生態圈

Kafka 與 Spark Streaming 集成度高

Spark Streaming 初識

Spark Streaming 接受實時輸入數據並將數據切分成多個 batches, 然後由 Spark engine 進行計算並將結果輸出到外部存儲。

接下來看看 Spark Streaming 從 Kafka 中接受數據的兩種方式。

基於 Receiver 方式

這種方式使用 Receiver 方式接受數據,實現是以 Kafka 高階用戶 API 介面,收到的數據會存到 Spark executor,之後 Spark Streaming 提交 Job 處理這些數據。為了保證數據不會丟失,需要開啟 Write Ahead Logs,流程如下圖所示:

基於 Direct 方式

在 Spark 1.3 之後,引入了 Direct 方式以提供更強的端到端的保證。不同於 Receiver 方式,其會周期性的獲取 Kafka 每個 topic 中每個 Partition 最新的 offsets。之後 Spark job 會基於 Kafka simple API 讀取 Kafka 相應 Offset 數據並進行處理,流程如下圖所示:

該方式相對於 Receiver 方式具有以下優勢:

簡化的並行度:基於 Receiver 的方式中要提高數據傳輸並行度我們需要創建多個 Receiver 實例之後再 Union 起來合併成一個 Dstream。而 Direct 方式中提供了更為簡單的映射關係,Kafka 中的 partition 與 Spark RDD 中的 partition 是一一映射的,因而可以並行讀取數據。

高效性:在 Receiver 的方式中,為了達到零數據丟失需要將數據備份到 Write Ahead Log 中,這樣系統中就保存了兩份數據浪費資源。而 Direct 方式只要知道當前消費的 Offsets 就能恢復出相應的數據。

精確一次的語義保證:基於 Receiver 的方式中,通過 Kafka 的高階 API 介面從 Zookeeper 中獲取 offset 值,這也是傳統的從 Kafka 中讀取數據的方式,但由於 Spark Streaming 消費的數據和 Zookeeper 中記錄的 offset 不同步,這種方式偶爾會造成數據重複消費。而第二種方式,直接使用了簡單的低階 Kafka API,Offsets 可以利用 Spark Streaming 的 checkpoints 進行記錄來消除這種不一致性。

以上翻譯自官方文檔。既然 Direct 方式有這麼多優點,那麼在我們的監控系統中理所當然也用了這種方式,同時為了能使基於 Zookeeper 的 Kafka monitor 工具生效,我們也實現了 Offset 的管理,具體流程如下:

Spark Streaming 任務啟動後首先去 Zookeeper 中去讀取特定 topic 中每個 Partition 的 offset 並組裝 fromOffsets 變數;

Spark Streaming 獲取到 fromOffsets 後通過 KafkaUtils.createDirectStream 去消費 Kafka 的數據;

讀取 Kafka 數據然後進行批的邏輯處理,如下圖所示為該 Job 的 DAG,包括一些基本的 RDD 運算元操作 (flatMap, reduceByKey, Map 等), 並將計算結果存儲到 Hbase 和回吐到 Kafka 中,最後更新 offsets 到 Zookeeper 中。

4

Spark Streaming 性能優化及任務監控

重點來了,那麼說起優化,我們首先想到的就是最大限度利用集群資源,將硬體性能壓榨到極致,先看看如何在用 spark-submit 提交命令的時候進行資源調優。

資源參數調優

增加 Driver 和 Executor 的內存(driver-memory、executor-memory)

通過增加 Driver 和 Executor 的內存數量,可以減小程序 Out of memory 和 意外崩潰 產生的概率,當然也不能無限制增加以免造成資源的浪費或者導致其它任務申請資源失敗。

設置合理的 CPU 個數

--num-executors 和 --executor-cores 兩個參數配合使用來調節計算資源佔有情況。通常對於集群中一定量的 CPU Core,設置較多的 Executor 個數和較少的 Executor core 個數來達到資源最大使用率。

結合內存和 CPU 參數,我們來舉個例子,看看怎麼設置會比較合理。

假設在擁有 6 個節點,每個節點有 16 個 Core 和 64G 內存集群中提交 Job, 一種可能的配置參數如下:

--num-executors 6 –executor-cores 15 –executor-memory 63G

這種方式其實不太合理,原因如下:

由於我們的 OS 以及 Hadoop daemons 要佔用一定內存,因此 yarn.nodemanager.resources.memory-mb 和 yarn.nodemanager.resources.cpu-vcores 不可能佔用 100% 資源,一般是 63 * 1024 和 15Core.

Application master 也會佔用一個 core, 因此在 master 節點上也不可能設置為 15 個 core

每個 executor 設置 15Core 會造成低效的 HDFS I/O 吞吐量

鑒於上面的原因,一種更為合理的的設置是:

--num-executors 17 –executor-cores 5 –executor-memory 19G

增加 parallelism:增加 Spark Partition 數量

Partition 即 Spark 中的數據分區,每個 task 在同一時間只能處理一個 Partition 的數據,這個值不能設置的太小也不能設置的太大。

設置的太大,每個分區中的數據很少,因此會需要更多的 task 來處理這些數據,增加任務調度器的負擔

設置的太小,每個分區中的數據很多,也會對內存造成壓力,executor 無法最大程度利用集群計算資源。

此外在 Spark Streaming + Kafka 的案例中,我們採用 Direct 方式從 Kafka 中獲取數據,此時 Kafka partition 的數量和 Spark RDD 的分區數量是 1:1 映射的關係,而調優之前該 topic 創建時的分區數量是 64,並發度太小導致集群資源利用不夠。我們一開始採取的優化方式是創建 InputDstream 之後先 Repartition 到一個更大的並行度,然後進行邏輯計算,結果證明該方式較之前性能上有一定提升但還是沒有達到我們想要的理想結果,這是由於 repartition 會造成 Shuffle 操作,而 Shuffle 比較耗時,會引起大量的磁碟 IO, 序列化、反序列化、網路數據傳輸等操作,因此要盡量避免。之後我們直接從數據源頭 Kafka 那邊增加 Topic 分區數(240),從而極大的提升了處理效率。如圖所示:

設置合理的批處理時間和 Kafka 數據拉取速率

使用 Kryo 序列化

Spark Streaming 在傳輸、使用對象的時候要用到序列化和反序列化,而 Kryo 序列化方式比 Java 序列化機制性能高 10 倍,因此我們可在使用的時候註冊自定義類型,如下函數所示:

設置 Streaming job 的並行度

這裡的 job 主要由兩個參數決定:

在每個 batch 內,可能有一批 Streaming job, 默認是 1,這些 job 由 jobExecutor 執行並提交,而 JobExecutor 是一個默認池子大小為 1 的線程池,大小由參數 Spark.streaming 。concurrentjobs 控制。如果 concurrentjobs 設置為 2,那麼只要資源允許,那麼會同時提交執行兩個 job,否則仍順序執行。

開發調優

Hbase 輸出操作

在我們的項目中,需要將 Spark Streaming 計算完的結果存入到 Hbase 中,這裡我們採用的是批量 Put 數據到 Hbase 中,而非每次插入單條數據,參考如下事例:

輸出到 Kafka

此外我們還會將計算結果回吐到 Kafka 中。通常你可能會 Google 「Spark Streaming to kafka」來尋找案例,而大多數情況你會找到下面這樣的例子,當然很大程度上你也會這麼寫。針對 Partition 中的每條數據建立一個 Kafka Producer, 然後再發送數據,這種做法不靈活且低效。

比較高效的做法有兩種:

定義 Kafka producer 為 lazy 並廣播到每個 executor 上,之後就可以用這個 producer 發送數據,事例如下:

使用也比較方便:

或者使用單例模式:

遇到的坑

經過上述調優方案後,Spark Streaming 實時處理能力較之前有了質的提高,但是我們也經常會發現一些異常現象。在流量逐步升高的情況下,會出現丟包的情況,Streaming 的計算性能也受到了很大的影響。通過使用 Zabbix 工具查看網卡流量,發現有時候 eth3 網卡出口流量能達到 638Mbps, 如下圖所示,而我們的網卡是千兆網,並且在存在多個 kafka Consumer 的情況下就不難解釋之前的丟包現象了,同樣 spark 計算過程中需要傳輸數據,因為受到帶寬的限制也會導致計算性能的下降。

隨後我們將 Kafka 集群中的網卡換到萬兆,重新提交 Spark Streaming 任務後發現計算性能提升數倍:上圖為調優前約 15w/s 的處理量,下圖為調優後每秒 50w/s 的處理量。

VS

當在一個 Batch 時間內輸入數據達到 1000W 以上事件時,Streaming 仍能很好的 handle,計算性能仍是 50W+/s 的處理速率,相比調優前基本失效的狀態也大大提高了穩定性。

任務監控

對於 Spark Streaming 任務的監控可以直觀的通過 Spark Web UI ,該頁面包括 Input Rate, Scheduling Delay、Processing Time 等,但是這種方法運維成本較高,需要人工不間斷的巡視。

另一種推薦的方式可以通過 StreamingListener 介面獲取 Scheduling Delay 和 Processing Time,事例如下:

除此之外你還可以自己寫 Python 腳本在 yarn 管理界面解析該應用的 ApplicationMaster 的地址,之後再通過 Spark Streaming 的 UI 去獲取相關參數。

5

寫在最後

目前我們在做 Structured Streaming 的測試,相關文檔參見:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

該實時流框架端到端的延遲為 100ms,而且在 Spark 最新版本 2.3 中支持 Continuous Processing 模式,延遲能降到更低 1ms,對比 Spark Streaming 就要好很多。

總之性能優化的路還很長,這就需要我們不斷的嘗試新的技術新的框架,最後希望本文能給正在做 spark streaming 實時流優化的同學帶來一些幫助,歡迎大家一起交流。

6

參考文獻:

1.http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

2.https://ngorchakova.github.io/jvmwarstories/spark-kafka-sink/

作者介紹

張璐波,甜橙金融大數據實時計算專家,高級架構師,對海量數據實時處理有深入的研究,涉及 Kafka, Spark Streaming, Structured Streaming, Flink, Strom 等開源流計算框架。目前主要工作集中在實時風控,智能風控以及流計算平台的搭建。


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 InfoQ 的精彩文章:

你被大數據「殺熟」過嗎?怎麼解決的?
有這幾樣神器,還怕人工智慧搞不定?

TAG:InfoQ |