當前位置:
首頁 > 知識 > spark入門框架+python

spark入門框架+python

簡介:

不可否認,spark是一種大數據框架,它的出現往往會有Hadoop的身影,其實Hadoop更多的可以看做是大數據的基礎設施,它本身提供了HDFS文件系統用於大數據的存儲,當然還提供了MR用於大數據處理,但是MR有很多自身的缺點,針對這些缺點也已經有很多其他的方法,類如針對MR編寫的複雜性有了Hive,針對MR的實時性差有了流處理Strom等等,spark設計也是針對MR功能的,它並沒有大數據的存儲功能,只是改進了大數據的處理部分,它的最大優勢就是快,因為它是基於內存的,不像MR每一個job都要和磁碟打交道,所以大大節省了時間,它的核心是RDD,裡面體現了一個彈性概念意思就是說,在內存存儲不下數據的時候,spark會自動的將部分數據轉存到磁碟,而這個過程是對用戶透明的。

spark入門框架+python

spark入門框架+python

spark編寫框架:

1 sparkconf:

sparkconf對象是是spark應用的配置信息:

SparkConf conf=new SparkConf()

.setAppName("HelloWorld")

.setMaster("local");

2 sparkcontext:

是調用spark一切功能的一個介面,使用不同的開發語言對應不同的介面,類如java就是javasparkcontext,SQL就是SQLspark,Python,Scala等等都是如此

JavaSparkContext sc=new JavaSparkContext(conf)

3 RDD(核心):

創建初始RDD有三種方法:

使用並行化集合方式創建

List<Integer> number = Arrays.asList(1,2,3,4,5);

JavaRDD<Integer> numberRDD=sc.parallelize(number,5) //5個partitions

使用本地文件創建:

JavaRDD<String> lines = sc.textFile("c://data.txt");

注意:在linux上面要使用本地文件時,需要將data.txt拷貝到所有worker。

使用一些其他文件儲存系統類如Hdsf:

JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/data.txt");

注意:使用Hdfs時,在配置Spark時,將setMaster設置的local模式去掉即:

SparkConf conf=new SparkConf()

.setAppName("HelloWorld");

4 transformation(核心):

spark中的一些運算元都可以看做是transformation,類如map,flatmap,reduceByKey等等,通過transformation使一種GDD轉化為一種新的RDD。

一些運算元介紹:

map:就是對每一條輸入進行指定操作,為每一條返回一個對象

//假設此時numberrdd是一個list(1,2,3,4)

JavaRDD<Integer> multiplenumberrdd = numberrdd.map(

new multipleFunction<Integer,Integer>(){

private static final long serialVersionUID=1L;

@Override

public Integer call(Integer t) throws Exception(

return t*3;

)

}

)

flatmap: map+flatten即map+扁平化.第一步map,然後將map結果的所有對象合併為一個對象返回

注意map和flatmap的區別:https://blog.csdn.net/sicofield/article/details/50914050

類如切分單詞,用map的話會返回多條記錄,每條記錄就是一行的單詞,

而用flatmap則會整體返回一個對象即全文的單詞這也是我們想要的。

//假設此時lines就是由多行組成的文本

//注意flatMap返回的不是規定的類型,類如這裡的String,而是Iterable<String>

//所以這裡要將結果轉化為list

JavaRDD<String> wordsrdd = linesrdd.flatMap(

new wordsFunction<String,String>(){

private static final long serialVersionUID=1L;

@Override

public Integer call(Integer t) throws Exception(

return Arrays.asList(t.split(" "));

)

}

)

filter:篩選符合一定條件的數據,返回值為Bool類型

//假設此時numberrdd是一個list(1,2,3,4)

JavaRDD<Integer> evenumberrdd = numberrdd.filter(

new evenFunction<Integer,Boolean>(){

private static final long serialVersionUID=1L;

@Override

public Boolean call(Integer t) throws Exception(

return t%2==0;

)

}

)

groupbykey:類似mysql中的groupby返回類型還是一個JavaPairRDD,第一個類型是key,第二個是Iterable裡面放了所有相同key的values值

//假設此時scoresrdd是一個list(("id1",35),("id2",45),("id3",23),("id2",31),("id1",45))

JavaPairRDD<String,Integer<Integer>> groupscorerdd = scoresrdd.groupByKey();

列印部分:

groupscorerdd.foreach(new VoidFunction<Tuple2<String,Interator<Integer>>>){

private static final long serialVersionUID=1L;

@Override

public void call(Tuple2<String,Interator<Integer>> t) throws Exception{

System.out.println(t._1);

Interator<Integer> ite=t._2.iterator();

while(ite.hasNext()){

System.out.println(ite.next());

}

System.out.println("---------------------");

}

}

mapToPair:生成key-value形式的Tuple元祖:

JavaPairRDD<String,Integer> pairs = lines.mapToPair(

new PairFunction<String,String,Integer>(){

private static final long serialVersionUID=1L;

@Override

public Tuple2<String,Integer> call(String t) throws Exception(

return new Tuple2<String,Integer>(t,1)

)

}

)

reduceByKey:傳入是function,有三個參數,第一個和第二個分別是key,value,第三個是每次reduce操作後返回的類型,默認與原始RDD的value類型相同,其返回還是JavaPairRDD<key,value>

JavaPairRDD<String,Integer> LineCounts = paits.reduceByKey(

new ByKeyFunction<Integer,Integer,Integer>(){

private static final long serialVersionUID=1L;

@Override

public Integer call(Integer n1,Integer n2) throws Exception(

return n1+n2;

)

}

)

sortByKey:排序

//假設此時scoresrdd是一個list((35,"Tom"),(45,"jack"),(23,"leans"),("id2",31))

JavaPairRDD<Integer,String> sortscorerdd = scoresrdd.sortByKey(false);

join:就是mysal裡面的join,連接兩個原始RDD,返回類型還是JavaPairRDD,第一個參數還是相同的key,第二個參數是一個Tuple2<v1,v2> v1和v2分別是兩個原始RDD的value值,舉例:

一個nameRDD是((1,"Tom"),(2,"jack"),(3,"jen"))

一個scoreRDD是((1,67),(2,87),(3,56))

那麼通過nameRDD.join(scoreRDD)那麼返回的就是:

((1,"Tom",67),(2,"jack",87),(3,"jen",56))

cogroup:和join類似,只不過返回的RDD兩個都是Iterable,

一個nameRDD是((1,"Tom"),(2,"jack"),(3,"jen"))

一個scoreRDD是((1,67),(2,87),(3,56),(1,54),(2,45),(3,43))

那麼通過nameRDD.cogroup(scoreRDD)那麼返回的就是:

((1,Iterable<"Tom","Tom">,Iterable<67,54>),.........)

transformation 的一個重要特性就是Lazy,就是說雖然定義了各種transformation,但是都不會執行,只有在執行了一個action動作後才會觸發所有的transformation,這是spark的一種優化,避免產生過多的中間結果,所以下面看一下什麼是action

5 action(核心):

例如foreach,reduce就是一種action操作,後者是將RDD中多有元素進行聚合,獲取最終結果,返回給Drive程序,action 的特性就是觸發一個spark job,進一步觸發上面的transformation。即在執行action後,Driver才會提交task到之前註冊的worker上的executor一步步執行整個spark任務(定義的那些transformation啥的)

action 也有很多:

reduce:即將RDD所有元素聚合,第一個和第二個元素聚合產生的值再和第三個元素聚合,以此類推

collect:將RDD中所有元素獲取到本地客戶端

count:獲取RDD元素總數

take(n):獲取RDD中前n個元素

saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法

countBykey:對每個key對應的值進行count計算

foreach:遍歷RDD中的每個元素

最後來看一下SparkSQL:

SparkSQL的核心是一個特殊類型的Spark RDD:SchemaRDD。

它的本質也是一張表:

Schema: 數據行模式:列名、列數據類型等

Rows:數據行對象

SparkSQL讀取的數據既來源可以是已有RDD,也可以來自Hive、HDFS等第三方數據源(數據格式可以是行式存儲結構JSON,列式存儲結構parquet等)

行式存儲結構就是一行一行存儲數據

列式存儲結構就是以列來存儲數據,它的好處就是降低I/O操作(要讀取某一個欄位,不必像行式存儲結構那樣把每一行數據讀入,再在每一行中篩選該欄位,而是直接讀取該列即可),支持向量運算等。

SparkSQL可以通過編寫Spark應用程序使用SQL語句進行數據查詢,還可以使用標準的資料庫連接器(比如JDBC或ODBC)連接Spark進行SQL查詢

總之簡單歸結起來:

spark入門框架+python

下面主要介紹Spark 中DataFrame

RDD轉化為DataFrame:

(1)反射:

#Student.class本身就是反射的一個應用,底層通過調用Student class進行反射,獲取其中field

#students就是一個RDD,其是new student類。然後set id=...

#反射!!!!!!!!!!!!!!!!

#scala就是toDF(),並且不能用def main()方法,要用object .... extends App的方法

#否則就會報錯no typetag for ..... class

DataFrame studentDF = sqlContext.createDataFrame(students,Student.class);

#就可以註冊為一個臨時表,然後針對其中的數據執行SQL

studentDF.registerTempTable("students");

#針對錶查詢

DataFrame teenagerDF=sqlContext.sql("select * from students where age<=18");

#查詢結果為DataFrame,將其再映射回RDD

JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();

#對結果列印.這裡的順序在java中亂的,在scala中不是亂的

JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Fun)

(2)編程介面動態進行:

# studentRDD:JavaRDD<Row>,RowFactory.create([0]].toInt,[1],[2])

#scala中就是Row([0].toInt,)

#元數據構造StructField:stu (列名,類型,true)

#scala中就是StructType(

Array(

StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)

)

)

DataFrame studentDF = sqlContext.createDataFrame(studentRDD,stu)

讀取JSON格式數據為DataFrame:

從第三方資料庫讀取為DataFrame:

val conf =new SparkConf().setAppName("DataFrame")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

DataFrame df = sqlContext.read().json("hdfs://localhost:9000/sparktest.txt");

DataFrame df = sqlContext.read().json("RDD");

df.show();

df.printSchema();

df.select("name").show();

df.select(df.col("name"),df.col("age").plus(1)).show();

df.filter(df.col("age").gt(18)).show();

df.roupBy(df.col("age")).count().show();

#DataFrame userDF = aqlContext.read().format("json").load("c://data.json");

DataFrame userDF = aqlContext.read().load("c://data.parqet");

DataFrame df = sqlContext.read().json("hdfs://localhost:9000/sparktest.txt");

DataFrame df = sqlContext.read().parquet("hdfs://localhost:9000/sparktest.txt");

#SaveMode.ErrorIfExists是默認,SaveMode.Append,SaveMode.Overwrite,SaveMode.Ignore

#userDF.select("name","age").write().format("parquet").save("test.parquet",SaveMode.ErrorIfExists)

userDF.select("name","age").write().save(("test","json",SaveMode.ErrorIfExists)

parquet:

parquet數據其還可以進行自動分區:

假設有以下目錄

/

/gender=male

/gender=male/country=us

/gender=male/country=us/text.parquet

那麼其會在text.parquet數據中增加兩列gender,country其values分別是male,us

Spark1.3以後,SparkSQL中的SchemaRDD改成了DataFrame,DataFrame比SchemaRDD提供了更多好用的API

parquet數據合併元數據:

假設原先text.parquet中是

name gender

tom male

jack female

在其後追加了

name score

tom 34

jack 56

那麼

DataFrame stu=sqlContext.read.option("mergeSchema","true").parquet("hdfs://text.parquet")

stu.printSchema()

stu.show()

待更新。

---------------------

作者:weixin_42001089

原文:https://blog.csdn.net/weixin_42001089/article/details/82377349

版權聲明:本文為博主原創文章,轉載請附上博文鏈接!

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

LeetCode中那些應該背下來的經典代碼
Linux/CentOS安裝MySQL(RPM安裝、編譯安裝)

TAG:程序員小新人學習 |