當前位置:
首頁 > 知識 > Spark 數據傾斜調優

Spark 數據傾斜調優

一、what is a shuffle?

1.1 shuffle簡介

一個stage執行完後,下一個stage開始執行的每個task會從上一個stage執行的task所在的節點,通過網路傳輸獲取task需要處理的所有key,然後每個task對相同的key進行運算元操作,這個過程就是shuffle過程。

我們常說的shuffle過程之所以慢是因為有大量的磁碟IO以及網路傳輸操作。spark中負責shuffle的組件主要是ShuffleManager,在spark1.1之前採用的都是HashShuffleManager,在1.1之後開始引入效果更優SortShuffleManager,並在1.2開始默認使用SortShuffleManager。

1.2 HashShuffleManager

我們來看下最初的ShuffleManager:HashShuffleManager中shuffle的讀寫過程

從上圖我們可以看出,Executor中每個core對應的task在shuffle寫的時候都會產生和下一個stage包含task數目一樣的磁碟文件,也就是說下一個stage包含多少個task(即Reducer),當前stage的task就會產生多少個磁碟文件。那麼100個單核的Executor,當前stage有200個task,每個Executor負責執行2個task,下一個stage有100個task,那麼一次shuffle write需要產生200*100=20000個磁碟文件。每個buffer(即圖中的bucket)的大小默認為32KB(Spark1.1中默認是100KB,可以通過spark.shuffle.file.buffer.kb來設置);在shuffle read階段每個task從上一個stage中的每一個task中通過網路傳輸拉取相同key的數據進行聚合等shuffle操作。所以產生的磁碟文件越多,shuffle read的IO操作就越頻繁,且大量的buffer將對Executor的存儲空間產生巨大的壓力。

Spark團隊對針對「磁碟文件多」這一弊端進行了優化,優化後的HashShuffleManager的shuffle的讀寫過程:

從上圖我們可以看出,下一個stage的每個task的入度變成了優化前的一半,主要是因為每個core都產生了和下一個stage的task相同數目的磁碟文件,同一core中的不同task復用一批磁碟文件,減少磁碟文件數據,提升shuffle write性能。那麼與上面相同環境下,優化後需要產生的磁碟文件數量為Executor數*Executor的core數*下一個stage的task數=100*1*100=10000。可以通過將spark.shuffle.consolidateFiles設置為true來開啟consolidate機制,即優化後的HashShuffleManager。

1.3 sortShuffleManager

Spark 1.2 後開始默認使用sortShuffleManager

SortShuffleManager主要改進點是在內存溢寫到磁碟文件之前,會根據Partition id以及key對內存數據進行sort排序,然後再分批寫入磁碟文件,分批的batch數量大小為1w條,最後將產生的多個磁碟文件merge成一個磁碟文件,併產生一個索引文件,用以標識下游stage中的各個task的數據在文件中的start offset 和 end offset,直觀來看,一個task僅產生一個磁碟文件和一個索引文件。產生的磁碟文件少了,但增加了排序的性能開銷,如果這部分在你的業務場景下開銷大,那麼可以選擇SortShuffleManager的bypass機制。

在ShuffleManager一路優化的過程中,一個重要優化思想其實就是在減少shuffle過程中產生磁碟文件數量,一個直觀的邏輯:磁碟文件少,上下游stage需要進行的磁碟IO操作就相對少了。而磁碟文件過多會帶來以下問題:

如果磁碟文件多,進行shuffle操作時需要同時打開的文件數多,大量的文件句柄和寫操作分配的臨時內存將對內存和GC帶來壓力,特別是在YARN的模式下,往往Executor分配的內存不足以支持這麼大的內存壓力;

如果磁碟文件多,那麼其所帶來的隨機讀寫需要多次磁碟尋道和旋轉延遲,比順序讀寫的時間多許多倍。

可以通過Spark.shuffle.manager參數來設置使用哪種shuffle manager。

以上我們介紹了what is a shuffle,shuffle write 與 shuffle read的過程,以及為什麼shuffle對spark任務性能消耗大,在整體上了解shuffle之後,我們來了解下如何handle shuffle。

二、判斷定位

spark web ui 上task的執行時間或分配的數據量,如果一般task執行時間只有幾秒,而某些task執行時間是幾分鐘甚至更久,那這部分task對於的stage就出現了數據傾斜,根據之前的stage的劃分方式即可定位哪段代碼中的運算元導致了數據傾斜。

常見的觸發shuffle操作的運算元:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

三、深究key分布

如果是數據傾斜的數據來源於hive表,那麼我們可以分析下spark sql中key的數據分布情況

如果數據來源於中間的RDD,那麼可以使用RDD.countByKey()來統計不同key出現的次數

如果數據量大,可以使用採樣來分析,比如:

val sampledRDD = shuffleRDD.sample(false, 0.1)

val sampledKeyCounts = sampledRDD.countByKey()

sampledKeyCounts.foreach(println(_))

四、How to fix it?

數據來源於hive表,將導致數據傾斜的shuffle運算元前置到**hive ETL(提取、轉換和載入)**中,之後的spark任務可反覆基於hive ETL後的中間表,保證了spark任務的性能。適用於多次數據計算,且對spark性能要求高的場景。

不是所有的數據都有用,如果filter少數幾個數據量大的key不影響數據結果,那在數據預處理的時候可以進行過濾,或者需要動態判定key是否有用,可以在數據計算前對RDD進行sample採樣,過濾數據量大的key,這樣不僅可以避免數據傾斜,也可以避免相同的代碼在某天突然OOM的情況,有可能這一天有某個平時表現正常的key暴增導致OOM。

shuffle運算元並行操作,我們知道在shuffle過程中,分布在不同task的相同key的數據會通過網路傳輸到同一個task進行shuffle計算,這時候一個task可能會處理多種key的數據,比如k1,k2,k3可能都被拉取到某一個task上進行reduce操作,如果k1,k2,k3的數量比較大,我們可以通過提高reduce的並行度來使得k1,k2,k3的數據分別拉取到t1,t2,t3三個task上計算,怎麼做呢?如果是RDD的shuffle操作,給shuffle運算元傳入一個參數即可,比如reduceByKey(600),如果是Spark SQL的shuffle操作,配置一個shuffle參數:spark.sql.shuffle.partitions,該參數表示shuffle read task的並行度,默認200,可根據業務場景進行修改。

key 散列設計再聚合,spark的shuffle操作導致的數據傾斜問題在一定意義上可以類比HBase的熱點問題,因此HBase的rowkey的散列設計思想可以套用在聚合類的shuffle操作導致的數據傾斜的場景,怎麼做呢?先對key進行hash散列,可以使用隨機數,也可以針對key的具體內容進行hash,目的是將原本數據量大的key先hash成k個的key,那麼原本必須拉取到一個task上進行shuffle計算的數據可以拉取到k個不同的task上計算,在一定程度上可以緩解單個task處理過多數據導致的數據傾斜,然後再對局部聚合後的key去除hash再聚合。這種key散列設計思想在解決join的shuffle操作廣泛使用。

」map join replace 「reduce join」,如果join操作是大小表的join,可以考慮將小表廣播,首先collect到driver的內存中,為其創建一個broadcase變數,這時候Driver和每個Executor都會保存一份小表的全量數據,再在map操作中自定義join的邏輯,在這個join邏輯里,使用已在內存中的全量的小表數據與大表的每一條數據進行key對比連接,用map join來代替普通的reduce join,可以避免數據傾斜。由於需要在內存中存放全量小表,所以小表數據量在一兩G是可取的。


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 千鋒JAVA開發學院 的精彩文章:

你不能不知道的14個大數據專業辭彙
社交網路大數據的應用有多大的價值

TAG:千鋒JAVA開發學院 |