MapReduce實現之Reduce端重分區Join操作優化!
在前一篇文章中(鏈接參加文末),我們介紹了map端Join操作的幾大方法。一般情況下,我會推薦企業選擇map端的Join操作,這可以節省不小的成本。但是,如果數據集過於龐大以至於沒有合適的map端連接方法適用,則需要使用MapReduce中的shuffle對數據進行排序和連接,並考慮選擇Reduce端的Join操作。
一、重分區Join操作(Reduce端)
本文介紹的第一種方法是最基本的重分區Join操作,該方法允許執行內部和外部Join。開始之前,我們先搞清楚要解決的問題是將大型數據集Join在一起,我們選用的解決方案是Reduce端重分區Join。該方法是一種Reduce端Join實現,利用MapReduce的sortmerge將記錄組合在一起,作為單個MapReduce作業實現,可支持N路連接,其中N是要連接的數據集數量。
Map端負責從數據集中讀取數據,確定每個Join操作的value,並將該value的key輸出,輸出key包含在reducer中並將數據集組合在一起以生成最終結果。
單個reducer調用接收map函數Join操作發出的Key對應的所有值,並將數據分N個分區,其中N是要連接的數據集數量。reducer讀取連接value的所有輸入並將它們分區到內存中,然後跨所有分區執行笛卡爾積,並發出每個Join操作的結果。
圖6.10 重分區Join操作的基本MapReduce實現
MapReduce代碼要支持這種技術,需要滿足以下條件:
- 支持多個map類,每個map類處理不同的輸入數據集,這是通過使用MultipleInputs類完成的。
- 需要一種方法來標記mapper發出的記錄,以便可以與其原點的數據集相關聯,本文將使用htuple項目處理MapReduce中的數據。
重分區Join操作的代碼如下:
可以使用以下命令運行作業並查看輸出:
總結
Hadoop捆綁了一個hadoop-datajoin模塊,這是一個重分區Join操作框架,包括用於處理多個輸入數據集和執行Join操作的管道。上述操作示例及hadoop-datajoin代碼是重分區Join的最基本形式,兩者都要求在執行笛卡爾積之前將連接key的所有數據載入到內存中,但如果連接key的基數大於可用內存,那麼,這種方法就不太適用。下一個技術將著眼解決此問題。
二、優化重分區Join操作
舊版重分區Join操作實現會浪費大量空間,需要將給定key的所有value載入到內存中才能執行多路連接,將較小的數據集載入到內存中才能迭代更大的數據集,沿途執行Join更有效。
我們希望在MapReduce中執行重分區Join,且無需緩存reducer中的所有記錄。優化後的重分區Join框架將僅緩存要連接的其中一個數據集,以減少reducer中緩存的數據量。此優化僅緩存來自兩個數據集中較小者的記錄,以減少緩存所有記錄的內存開銷,圖6.11顯示了改進的重分區Join實現。
圖6.11 重分區Join操作優化MapReduce實現
該技術與舊版相比存在一定差異,此處使用輔助排序確保來自較小數據集的所有記錄在較大數據集的記錄之前到達reducer,以此來儘可能減少reducer中要緩存的數據量。此外,mapper會發出需要進行Join操作的用戶名元組的key以及標識原始數據集的欄位。
以下代碼顯示了一個新的枚舉,顯示了用戶mapper如何填充元組欄位:
需要更新MapReduce驅動程序代碼以指示元組中的哪些欄位應用於排序、分區和分組:
- 分區程序應僅基於用戶名進行分區,以便用戶的所有記錄都到達同一個reducer。
- 排序應使用用戶名和數據集指示符,以便首先排序較小的數據集(由於USERS常量小於USER_LOGS常量,導致用戶記錄在用戶登錄之前排序)。
- 分組應對用戶進行分組,以便將兩個數據集都流式傳輸到同一個reducer調用:
最後,我們要修改reducer以緩存傳入的用戶記錄,然後將其與用戶日誌Join:
可以使用以下命令來運行作業並查看輸出:
Hive
在執行重分區Join操作時,Hive可支持類似優化。Hive可緩存Join鍵的所有數據集,然後流式傳輸大型數據集,使其不需要存儲在內存中。假定在查詢時,Hive最後指定的數據集最大。想像一下,你有兩個名為users和user_logs的表,而user_logs要大得多。要連接這些表,我們需要確保user_logs表被引用為查詢中的最後一個:
如果不想重新查詢,可以使用STREAMTABLE提示告訴Hive哪個表更大:
總結
此操作實現通過僅緩衝較小數據集的value來改進早期技術,但它仍然存在數據在map和reducer之間的傳輸問題,這是一個昂貴的網路成本。此外,舊版可以支持N路連接,但是這種實現僅支持雙向連接。
三、使用Bloom過濾器來減少混洗數據
如果希望根據某些謂詞對數據子集執行Join操作,例如「僅限居住在加利福尼亞地區的用戶」。到目前為止,我們還必須在reducer中執行過濾器才可以實現這一目的 ,因為只有一個數據集存放了有關狀態的詳細信息——用戶日誌沒有該信息。接下來,我將介紹如何在map端使用Bloom過濾器,這會對作業執行時間產生很大影響。我要解決的問題是在重分區Join操作中過濾數據,但要將該過濾器推送到mapper。一個可行的解決方案是使用預處理作業創建Bloom過濾器,然後在重分區作業中載入Bloom過濾器以過濾mapper中的記錄。
Bloom過濾器是一種非常有用的隨機數據結構,它利用位數組簡潔表明集合,並能判斷一個元素是否屬於該集合。然而,與Java中的HashSet相比,Bloom需要的內存要少得多,因此它們非常適合處理大型數據集。此解決方案有兩個步驟,一是運行作業來生成Bloom過濾器,該過濾器將對用戶數據進行操作,並由居住在加利福尼亞地區的用戶填充;二是在重分區Join操作中使用此Bloom過濾器丟棄不需要的用戶,該過程需要Bloom過濾器的原因是用戶日誌的mapper沒有狀態的詳細信息。
圖6.12 在重分區Join中使用Bloom過濾器的兩步過程
第1步:創建Bloom過濾器
第一個作業是創建Bloom過濾器,其中包含加利福尼亞州的用戶名。mapper生成中間Bloom過濾器,reducer將其組合成一個Bloom過濾器,作業輸出是包含序列化Bloom過濾器的Avro文件:
第2步:重分區Join
重分區Join與上文提到的唯一區別是mapper載入第一步中生成的Bloom過濾器,並且在處理map記錄時,執行針對Bloom過濾器的元素審查以確定是否應將記錄發送給reducer。以下代碼顯示了兩件事:一般化Bloom過濾器載入、抽象mapper以及支持兩個Join數據集的子類:
以下命令運行兩個作業並轉儲Join輸出:
總結
該技術提出了一種在兩個數據集上執行map端過濾的有效方法,以最小化mapper和reducer之間的網路I/O。作為shuffle的一部分,它還減少了mapper和reducer的磁碟溢出數據量。過濾器通常是加速和優化作業最簡單有效的方法,重分區Join也同樣適用於其他MapReduce作業。
四、reducer端Join操作可能發生數據傾斜
數據傾斜是實際操作中很容易碰到的問題,可能存在兩種類型的數據傾斜:
- 高Join-key基數,其中有一些連接key在一個或兩個數據集中具有大量記錄,我把這種稱之為join-product偏差。
- 糟糕的散列分區,少數reducer在總記錄數中占很大比例,我將此稱為散列分區傾斜。
五、加入具有高連接密鑰基數的大型數據集
這種技術解決了join-product的傾斜問題,下一個技術檢查了散列分區偏差。現在面臨的問題是某些連接key是高基數的,這會導致某些reducer在嘗試緩存這些key時耗盡內存。我們可以過濾掉這些key並將它們單獨連接或將其溢出到reducer中並安排後續作業Join。
如果提前知道了哪些Key是高基數的,則可以將其分成單獨的Join作業,如果不確定高基數Key是哪些,則可能需要在reducer中構建智能檢測並將其寫入副本文件,該文件由後續作業Join,如圖6.14所示。
圖6.13 提前知道高基數密鑰時處理傾斜
圖6.14 提前知道高基數密鑰處理時的偏差
Hive
Hive支持類似於第二種方法的偏斜緩解策略,運行作業之前可指定以下配置啟用:
可以選擇設置一些其他配置來控制在高基數key上運行的map端連接:
最後,如果在SQL中使用GROUP BY,可能還需要考慮啟用以下配置來處理分組數據中的偏差:
總結
此技術假設給定的Join鍵,只有一個數據集具有高基數出現,因此可緩存較小數據集的map端連接。如果兩個數據集都是高基數的,那麼將面臨一個昂貴的笛卡爾積運算,執行起來會很慢,因為它不適合MapReduce的工作方式(這意味著它本身不可拆分和可並行化)。在這種情況下,我們應該重新檢查是否有任何技術(如過濾或投影)可幫助減少執行join所需的時間。
六、處理由散列分區生成的偏差
MapReduce的默認分區程序是一個散列分區程序,接受每個map輸出key的散列,並對reducer數量建模,以確定key被發送到哪個reducer。散列分區程序可以很好地用作通用分區程序,但是有些數據集可能會導致散列分區程序因一些不成比例的密鑰散列到同一個reducer而使其重載。與大多數reducer相比,這些reducer需要更長時間才能完成。此外,當檢查straggler reducer計數器時,會注意到發送給落後者的組數遠遠高於已完成的其他組。
區分高基數key與散列分區引起的偏差可以使用MapReduce reducer來識別數據傾斜類型。由性能較差的哈希分區器引入的偏差將具有更多的組(唯一密鑰)發送到這些reducer,而導致傾斜的高基數密鑰可以通過所有reducer中大致相等數量的組來證明,傾斜越多,reducer的記錄數量越多。
我們要解決的問題是reducer端連接需要很長時間才能完成,而落後的組需要比大多數reducer更長時間。使用範圍分區程序或編寫自定義分區程序,將偏移的key集中到一組reducer。此解決方案的目標是省去默認的散列分區程序,並將其替換為可以更好處理數據傾斜的內容,本文提供兩個選項可供探索:
- 使用與Hadoop捆綁在一起的sampler和TotalOrderPartitioner,將散列分區程序替換為範圍分區程序。
- 編寫自定義分區程序,將具有數據傾斜的key路由到為傾斜key保留的Reducer。
範圍分區法
範圍分區根據預定義值分配map輸出,其中每個map接收該範圍內的所有reducer,這正是TotalOrderPartitioner的工作原理。實際上,TeraSort使用TotalOrderPartitioner在所有Reducer之間均勻分布,以最大限度減少數據傾斜。TotalOrderPartitioner附帶採樣器,可對輸入數據進行採樣並將其寫入HDFS,然後在分區時由TotalOrderPartitioner使用。
自定義分區法
如果已經知道哪些Key顯示數據傾斜,並且該組Key是靜態的,則可以編寫自定義分區程序以將這些高基數key推送到一組reducer。
總結
在上述兩種方法中,範圍分區可能是最佳解決方案,因為大多數情況下,我們可能不知道哪些Key是傾斜的,並且表現出傾斜的key也可能隨時間而變化。MapReduce中可能有reducer端連接,因為它們將map輸出Key排序並關聯在一起。在之後的文章中,我們將介紹MapReduce相關的排序技術。
※無線列印便捷不止是說說 聯想小新印表機初體驗
※WiNet智慧網路解決方案解讀 讓網路運維更簡單
TAG:IT168企業級 |