大數據有道之spark選擇去重
一.spark簡介
spark是基於內存運算的大數據分散式並行計算框架,本身具有豐富的API,可實現與HDFS、HBase、Hive、Kafka、Elasticsearch、Druid等組件的交互,同時也是優秀的MapReduce替代方案。
spark卓越的計算性能得意於其核心的分散式數據架構:RDD和DataFrame。
1、RDD
RDD(Resilient Distributes Dataset), 是spark中最基礎、最常用的數據結構。其本身封裝了作業中input data數據,並以分區方式分布在內存或者磁碟上的Block中。但實質上RDD對象是一個元數據結構,存儲著Block、Node映射關係等元數據信息。
RDD常規去重運算元:
2、DataFrame
DataFrame是一種以RDD為基礎的分散式數據集,具有schema元數據信息,即標註了DataFrame中每一列名稱和類型,能夠大幅提升Transform、Action的計算效率。
DataFrame常規去重運算元:
3、RDD與DataFrame對比
二.選擇去重
接下來,大數據有道將和大家一起學習一下spark RDD和DataFrame選擇去重的技巧。
1、原始數據
江南皮革廠訂單數據(input),需要指出「original_price」和real_pay對應double類型、「create_time」和「modify_time」為long類型。
源數據預處理:
為了方便對每條訂單進行提取和計算,作業中封裝了訂單對象RiveSouthOrder:
2、RDD選擇去重
a.選擇去重代碼(scala):
b.執行日誌:
c.計算結果:
d.邏輯解析:
第一部分,載入源數據並封裝到RiveSouthOrder樣例類中,生成RDD;
第二部分,首先通過groupBy對order_id數據做分組後生成RDD[(String, Iterable[RiveSouthOrder])]對象([K,V]結構),隨即使用map對每個Key(order_id)下多組記錄(Iterable[RiveSouthOrder])進行reduce操作(maxBy),最後在maxBy運算元傳入一個字面量函數(也可寫為x=>x.modify_time),即提取該order_id下每條記錄中的modify_time進行比對,然後選出最新時間記錄(maxBy為高階函數,依賴reduceLeft實現);
第三部分,toDebugString方法列印RDD轉換過程,最後值得注意collect才是真正觸發一系列運算的源頭。
3、DataFrame選擇去重
a.選擇去重代碼(scala):
b.執行日誌:
第一部分,引入依賴和隱式轉換,分別對應DataFrame類型識別、使用sql格式的$"modify_time"和row_number+Window函數的使用;
第二部分,載入源數據,由於源數據由RiveSouthOrder封裝,可直接toDF;
第三部分,首先使用withColumn方法添加Num欄位,Num是由row_number+Window+orderBy實現(原理同Hive sql),原則是根據modify_time對每個order_id分區下的訂單進行降序排序,接著使用where做過濾(也可使用filter),最後drop掉不再使用的Num欄位;
第四部分,通過explain列印dataFrame的物理執行過程,show方法作為action運算元觸發了以上的系列運算。
三.歸納總結
spark RDD和DataFrame均提供了豐富的API介面,極大的提升了開發效率和計算性能;
RDD的計算更傾向於map和reduce方式,而DataFrame含有schema元信息更容易與sql計算方式相結合;
RDD選擇去重使用了groupBy+maxBy方法,一氣呵成;DataFrame則使用row_number+window+orderBy方法,邏輯清晰;兩者處理方式所展現的spark函數式編程的精妙之處都值得探索和學習。
※數據存儲或供應增長是否增加了存儲容量的需求?
※AMD,要把嵌入式處理器市場進行到底!
TAG:中國存儲 |