Spark如何在一個SparkContext中提交多個任務
在使用spark處理數據的時候,大多數都是提交一個job執行,然後job內部會根據具體的任務,生成task任務,運行在多個進程中,比如讀取的HDFS文件的數據,spark會載入所有的數據,然後根據block個數生成task數目,多個task運行中不同的進程中,是並行的,如果在同一個進程中一個JVM裡面有多個task,那麼多個task也可以並行,這是常見的使用方式。
考慮下面一種場景,在HDFS上某個目錄下面有10個文件,我想要同時並行的去統計每個文件的數量,應該怎麼做? 其實spark是支持在一個spark context中可以通過多線程同時提交多個任務運行,然後spark context接到這所有的任務之後,通過中央調度,在來分配執行各個task,最終任務完成程序退出。
下面就來看下如何使用多線程提交任務,可以直接使用new Thread來創建線程提交,但是不建議這麼做,推薦的做法是通過Executors線程池來非同步管理線程,尤其是在提交的任務比較多的時候用這個會更加方便。
核心代碼如下:
def main(args: Array[String]): Unit = {
val sparkConf=new SparkConf
//實例化spark context
val sc=new SparkContext(sparkConf)
sparkConf.setAppName("multi task submit ")
//保存任務返回值
val list=new util.ArrayList[Future[String]]
//並行任務讀取的path
val task_paths=new util.ArrayList[String]
task_paths.add("/tmp/data/path1/")
task_paths.add("/tmp/data/path2/")
task_paths.add("/tmp/data/path3/")
//線程數等於path的數量
val nums_threads=task_paths.size
//構建線程池
val executors=Executors.newFixedThreadPool(nums_threads)
for(i<-0 until nums_threads){
val task= executors.submit(new Callable[String] {
override def call: String ={
val count=sc.textFile(task_paths.get(i)).count//獲取統計文件數量
return task_paths.get(i)+" 文件數量: "+count
}
})
list.add(task)//添加集合裡面
}
//遍歷獲取結果
list.asScala.foreach(result=>{
log.info(result.get)
})
//停止spark
sc.stop
}
可以看到使用scala寫的代碼比較精簡,這樣就完成了一個並行task提交的spark任務,最後我們打包完畢後,上傳到linux上進行提交,命令如下:
/opt/bigdata/spark/bin/spark-submit
--class MultiTaskSubmit
--master yarn
--deploy-mode cluster
--executor-cores 3
--driver-memory 1g
--executor-memory 1g
--num-executors 10
--jars $jars task.jar
最後需要注意一點,在線程裡面調用的方法如果包含一些全局載入的屬性,最好放在線程的成員變數裡面進行初始化,否則多個線程去更改全局屬性,有可能會造成一些未知的問題。
※獲取自定義的當前時間格式
※Docker for windows on VMware
※開源 .net license tool, EasyLicense !
TAG:達人科技 |
※Windows 10 和 Edge 全面支持PWA,PWA可提交至Microsoft Store
※蘋果:4月起App Store中提交的新應用必須適配iPhone X
※又有新VR設備?三星提交Perfect Reality和Odyssey
※Lyft或將本周內提交IPO申請,Slack、Pinterest等明星公司都在籌備上市
※iOS 12.2封堵的幾個漏洞由谷歌Project Zero團隊提交
※谷歌Project Zero團隊提交iOS 12.4新漏洞
※Kanye West要進軍電影界了!現已提交「Half Beast,LLC」商標申請!
※Ups提交BlockChain專利以提高交付效率
※RNDA已發但Vega還要繼續:AMD提交新Vega GPU的Linux驅動
※蘋果提交新專利申請,iPhone或徹底淘汰Lightning介面
※Pinterest 提交 IPO 申請,計劃 4月IPO
※Pinterest 提交 IPO 申請,計劃 4 月 IPO
※form表單提交時,action url中參數無效的解決方法
※WiseWear將提交破產申請歸咎於蘋果停用Apple Watch診斷埠的決定
※研發實戰:提交前如何用VRC Validator審查Rift程序
※Oculus推VRC Validator審查工具,提交應用前可自檢
※Salesforce斥巨資收購數據分析平台Tableau;阿里或在未來幾周提交香港上市申請
※Emoji-Log:編寫 Git 提交信息的新方法
※Bitwise向SEC提交比特幣ETF申請
※Oculus推出VRC Validator,可簡化開發商內容提交流程!