當前位置:
首頁 > 知識 > Spark的ShuffleManager

Spark的ShuffleManager

ShuffleManager的主要職責是shuffle過程的執行、計算和處理。包括HashShuffleManager和SortShuffleManager。1.2版本以前的Spark使用HashShuffleManager,1.2版本以後使用SortShuffleManager。

1.未經優化的HashShuffleManager

在shuffle write階段,也就是一個stage結束之後,每個task對自己處理的數據進行哈希,根據哈希結果,將相同key的數據寫入同一個磁碟文件,每個磁碟文件屬於下一個stage的一個task。

寫入數據時,首先將數據寫入內存緩衝中,緩衝填滿後,溢寫到磁碟中。

假設下一個stage的task的數量為100,當前stage的每個task都將創建100個磁碟文件,假設當前stage的task數量為50,則將創建5000個磁碟文件。這樣數量巨大的磁碟文件會導致大量IO影響性能。

接下來介紹shuffle read的過程,當一個stage開始時,這個stage的每個task會去上游stage的所有task所在節點通過網路拉取屬於自己的磁碟文件,然後進行聚合或連接等操作。

shuffle read是一邊拉取一邊聚合的,每個shuffle read task有自己的buffer,每次拉取和buffer同樣大數據,在內存中使用Map進行聚合。聚合完一批數據後再拉取下一批數據。

2.優化的HashShuffleManager

通過設置spark.shuffle.consolidateFiles參數HashShuffleManager的優化。

在shuffle write階段,第一批並行執行的task會創建shuffleFileGroup,並將數據寫入。當這批rask執行完後,下一批task復用這些shuffleFileGroup。

也就是說,假設每個Executor只有一個CPU core,每個Executor分配5個task,但由於CPU core的數量每次只能並行執行一個task,假設有10個Executor,並發執行的task數量為10,創建10個shuffleFileGroup,下一批task復用這些shuffleFileGroup。假設下游stage有100個task,則創建磁碟文件數量為1000個。

3.SortShuffleManager(普通運行機制)

在shuffle write階段,首先將數據寫入一個內存數據結構,如果達到某個臨界閾值,就溢寫到磁碟中。

溢寫到磁碟之前,先對key進行排序。排序過後,分批將數據寫到磁碟。這樣可以減少磁碟IO數量,提高性能。

上游的每個task再將數據寫入內存數據結構時會發生多次磁碟溢寫操作,所以會產生多個臨時磁碟文件。接下來進行merge,對所有磁碟文件合併。最終,每個task對應一個磁碟文件。還需要寫一份索引文件,標識下游各個task的數據在文件中的start offset 和end offset。

假設上游stage有50個task,不管下游有多少task,磁碟文件數量為50。

4.SortShuffleManager(bypass運行機制)

bypass機制出發條件:

(1)shuffle write task數量小於spark.shuffle.sort.bypassMergeThreshold。

(2)不是聚合類shuffle運算元。

上游task為每個下游task創建一個臨時磁碟文件,然後將所有磁碟文件合併,並創建索引文件。

Spark的ShuffleManager

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

常用cookie處理方法工具類
tomcat集群和session共享

TAG:程序員小新人學習 |