spark入門框架+python
簡介:
不可否認,spark是一種大數據框架,它的出現往往會有Hadoop的身影,其實Hadoop更多的可以看做是大數據的基礎設施,它本身提供了HDFS文件系統用於大數據的存儲,當然還提供了MR用於大數據處理,但是MR有很多自身的缺點,針對這些缺點也已經有很多其他的方法,類如針對MR編寫的複雜性有了Hive,針對MR的實時性差有了流處理Strom等等,spark設計也是針對MR功能的,它並沒有大數據的存儲功能,只是改進了大數據的處理部分,它的最大優勢就是快,因為它是基於內存的,不像MR每一個job都要和磁碟打交道,所以大大節省了時間,它的核心是RDD,裡面體現了一個彈性概念意思就是說,在內存存儲不下數據的時候,spark會自動的將部分數據轉存到磁碟,而這個過程是對用戶透明的。
spark編寫框架:
1 sparkconf:
sparkconf對象是是spark應用的配置信息:
SparkConf conf=new SparkConf()
.setAppName("HelloWorld")
.setMaster("local");
2 sparkcontext:
是調用spark一切功能的一個介面,使用不同的開發語言對應不同的介面,類如java就是javasparkcontext,SQL就是SQLspark,Python,Scala等等都是如此
JavaSparkContext sc=new JavaSparkContext(conf)
3 RDD(核心):
創建初始RDD有三種方法:
使用並行化集合方式創建
List<Integer> number = Arrays.asList(1,2,3,4,5);
JavaRDD<Integer> numberRDD=sc.parallelize(number,5) //5個partitions
使用本地文件創建:
JavaRDD<String> lines = sc.textFile("c://data.txt");
注意:在linux上面要使用本地文件時,需要將data.txt拷貝到所有worker。
使用一些其他文件儲存系統類如Hdsf:
JavaRDD<String> lines = sc.textFile("hdfs://spark1:9000/data.txt");
注意:使用Hdfs時,在配置Spark時,將setMaster設置的local模式去掉即:
SparkConf conf=new SparkConf()
.setAppName("HelloWorld");
4 transformation(核心):
spark中的一些運算元都可以看做是transformation,類如map,flatmap,reduceByKey等等,通過transformation使一種GDD轉化為一種新的RDD。
一些運算元介紹:
map:就是對每一條輸入進行指定操作,為每一條返回一個對象
//假設此時numberrdd是一個list(1,2,3,4)
JavaRDD<Integer> multiplenumberrdd = numberrdd.map(
new multipleFunction<Integer,Integer>(){
private static final long serialVersionUID=1L;
@Override
public Integer call(Integer t) throws Exception(
return t*3;
)
}
)
flatmap: map+flatten即map+扁平化.第一步map,然後將map結果的所有對象合併為一個對象返回
注意map和flatmap的區別:https://blog.csdn.net/sicofield/article/details/50914050
類如切分單詞,用map的話會返回多條記錄,每條記錄就是一行的單詞,
而用flatmap則會整體返回一個對象即全文的單詞這也是我們想要的。
//假設此時lines就是由多行組成的文本
//注意flatMap返回的不是規定的類型,類如這裡的String,而是Iterable<String>
//所以這裡要將結果轉化為list
JavaRDD<String> wordsrdd = linesrdd.flatMap(
new wordsFunction<String,String>(){
private static final long serialVersionUID=1L;
@Override
public Integer call(Integer t) throws Exception(
return Arrays.asList(t.split(" "));
)
}
)
filter:篩選符合一定條件的數據,返回值為Bool類型
//假設此時numberrdd是一個list(1,2,3,4)
JavaRDD<Integer> evenumberrdd = numberrdd.filter(
new evenFunction<Integer,Boolean>(){
private static final long serialVersionUID=1L;
@Override
public Boolean call(Integer t) throws Exception(
return t%2==0;
)
}
)
groupbykey:類似mysql中的groupby返回類型還是一個JavaPairRDD,第一個類型是key,第二個是Iterable裡面放了所有相同key的values值
//假設此時scoresrdd是一個list(("id1",35),("id2",45),("id3",23),("id2",31),("id1",45))
JavaPairRDD<String,Integer<Integer>> groupscorerdd = scoresrdd.groupByKey();
列印部分:
groupscorerdd.foreach(new VoidFunction<Tuple2<String,Interator<Integer>>>){
private static final long serialVersionUID=1L;
@Override
public void call(Tuple2<String,Interator<Integer>> t) throws Exception{
System.out.println(t._1);
Interator<Integer> ite=t._2.iterator();
while(ite.hasNext()){
System.out.println(ite.next());
}
System.out.println("---------------------");
}
}
mapToPair:生成key-value形式的Tuple元祖:
JavaPairRDD<String,Integer> pairs = lines.mapToPair(
new PairFunction<String,String,Integer>(){
private static final long serialVersionUID=1L;
@Override
public Tuple2<String,Integer> call(String t) throws Exception(
return new Tuple2<String,Integer>(t,1)
)
}
)
reduceByKey:傳入是function,有三個參數,第一個和第二個分別是key,value,第三個是每次reduce操作後返回的類型,默認與原始RDD的value類型相同,其返回還是JavaPairRDD<key,value>
JavaPairRDD<String,Integer> LineCounts = paits.reduceByKey(
new ByKeyFunction<Integer,Integer,Integer>(){
private static final long serialVersionUID=1L;
@Override
public Integer call(Integer n1,Integer n2) throws Exception(
return n1+n2;
)
}
)
sortByKey:排序
//假設此時scoresrdd是一個list((35,"Tom"),(45,"jack"),(23,"leans"),("id2",31))
JavaPairRDD<Integer,String> sortscorerdd = scoresrdd.sortByKey(false);
join:就是mysal裡面的join,連接兩個原始RDD,返回類型還是JavaPairRDD,第一個參數還是相同的key,第二個參數是一個Tuple2<v1,v2> v1和v2分別是兩個原始RDD的value值,舉例:
一個nameRDD是((1,"Tom"),(2,"jack"),(3,"jen"))
一個scoreRDD是((1,67),(2,87),(3,56))
那麼通過nameRDD.join(scoreRDD)那麼返回的就是:
((1,"Tom",67),(2,"jack",87),(3,"jen",56))
cogroup:和join類似,只不過返回的RDD兩個都是Iterable,
一個nameRDD是((1,"Tom"),(2,"jack"),(3,"jen"))
一個scoreRDD是((1,67),(2,87),(3,56),(1,54),(2,45),(3,43))
那麼通過nameRDD.cogroup(scoreRDD)那麼返回的就是:
((1,Iterable<"Tom","Tom">,Iterable<67,54>),.........)
transformation 的一個重要特性就是Lazy,就是說雖然定義了各種transformation,但是都不會執行,只有在執行了一個action動作後才會觸發所有的transformation,這是spark的一種優化,避免產生過多的中間結果,所以下面看一下什麼是action
5 action(核心):
例如foreach,reduce就是一種action操作,後者是將RDD中多有元素進行聚合,獲取最終結果,返回給Drive程序,action 的特性就是觸發一個spark job,進一步觸發上面的transformation。即在執行action後,Driver才會提交task到之前註冊的worker上的executor一步步執行整個spark任務(定義的那些transformation啥的)
action 也有很多:
reduce:即將RDD所有元素聚合,第一個和第二個元素聚合產生的值再和第三個元素聚合,以此類推
collect:將RDD中所有元素獲取到本地客戶端
count:獲取RDD元素總數
take(n):獲取RDD中前n個元素
saveAsTextFile:將RDD元素保存到文件中,對每個元素調用toString方法
countBykey:對每個key對應的值進行count計算
foreach:遍歷RDD中的每個元素
最後來看一下SparkSQL:
SparkSQL的核心是一個特殊類型的Spark RDD:SchemaRDD。
它的本質也是一張表:
Schema: 數據行模式:列名、列數據類型等
Rows:數據行對象
SparkSQL讀取的數據既來源可以是已有RDD,也可以來自Hive、HDFS等第三方數據源(數據格式可以是行式存儲結構JSON,列式存儲結構parquet等)
行式存儲結構就是一行一行存儲數據
列式存儲結構就是以列來存儲數據,它的好處就是降低I/O操作(要讀取某一個欄位,不必像行式存儲結構那樣把每一行數據讀入,再在每一行中篩選該欄位,而是直接讀取該列即可),支持向量運算等。
SparkSQL可以通過編寫Spark應用程序使用SQL語句進行數據查詢,還可以使用標準的資料庫連接器(比如JDBC或ODBC)連接Spark進行SQL查詢
總之簡單歸結起來:
下面主要介紹Spark 中DataFrame
RDD轉化為DataFrame:
(1)反射:
#Student.class本身就是反射的一個應用,底層通過調用Student class進行反射,獲取其中field
#students就是一個RDD,其是new student類。然後set id=...
#反射!!!!!!!!!!!!!!!!
#scala就是toDF(),並且不能用def main()方法,要用object .... extends App的方法
#否則就會報錯no typetag for ..... class
DataFrame studentDF = sqlContext.createDataFrame(students,Student.class);
#就可以註冊為一個臨時表,然後針對其中的數據執行SQL
studentDF.registerTempTable("students");
#針對錶查詢
DataFrame teenagerDF=sqlContext.sql("select * from students where age<=18");
#查詢結果為DataFrame,將其再映射回RDD
JavaRDD<Row> teenagerRDD = teenagerDF.javaRDD();
#對結果列印.這裡的順序在java中亂的,在scala中不是亂的
JavaRDD<Student> teenagerStudentRDD = teenagerRDD.map(new Fun)
(2)編程介面動態進行:
# studentRDD:JavaRDD<Row>,RowFactory.create([0]].toInt,[1],[2])
#scala中就是Row([0].toInt,)
#元數據構造StructField:stu (列名,類型,true)
#scala中就是StructType(
Array(
StructField("id",IntegerType,true),StructField("name",StringType,true),StructField("age",IntegerType,true)
)
)
DataFrame studentDF = sqlContext.createDataFrame(studentRDD,stu)
讀取JSON格式數據為DataFrame:
從第三方資料庫讀取為DataFrame:
val conf =new SparkConf().setAppName("DataFrame")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
DataFrame df = sqlContext.read().json("hdfs://localhost:9000/sparktest.txt");
DataFrame df = sqlContext.read().json("RDD");
df.show();
df.printSchema();
df.select("name").show();
df.select(df.col("name"),df.col("age").plus(1)).show();
df.filter(df.col("age").gt(18)).show();
df.roupBy(df.col("age")).count().show();
#DataFrame userDF = aqlContext.read().format("json").load("c://data.json");
DataFrame userDF = aqlContext.read().load("c://data.parqet");
DataFrame df = sqlContext.read().json("hdfs://localhost:9000/sparktest.txt");
DataFrame df = sqlContext.read().parquet("hdfs://localhost:9000/sparktest.txt");
#SaveMode.ErrorIfExists是默認,SaveMode.Append,SaveMode.Overwrite,SaveMode.Ignore
#userDF.select("name","age").write().format("parquet").save("test.parquet",SaveMode.ErrorIfExists)
userDF.select("name","age").write().save(("test","json",SaveMode.ErrorIfExists)
parquet:
parquet數據其還可以進行自動分區:
假設有以下目錄
/
/gender=male
/gender=male/country=us
/gender=male/country=us/text.parquet
那麼其會在text.parquet數據中增加兩列gender,country其values分別是male,us
Spark1.3以後,SparkSQL中的SchemaRDD改成了DataFrame,DataFrame比SchemaRDD提供了更多好用的API
parquet數據合併元數據:
假設原先text.parquet中是
name gender
tom male
jack female
在其後追加了
name score
tom 34
jack 56
那麼
DataFrame stu=sqlContext.read.option("mergeSchema","true").parquet("hdfs://text.parquet")
stu.printSchema()
stu.show()
待更新。
---------------------
作者:weixin_42001089
原文:https://blog.csdn.net/weixin_42001089/article/details/82377349
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!
※LeetCode中那些應該背下來的經典代碼
※Linux/CentOS安裝MySQL(RPM安裝、編譯安裝)
TAG:程序員小新人學習 |