當前位置:
首頁 > 知識 > Spark如何在一個SparkContext中提交多個任務

Spark如何在一個SparkContext中提交多個任務

在使用spark處理數據的時候,大多數都是提交一個job執行,然後job內部會根據具體的任務,生成task任務,運行在多個進程中,比如讀取的HDFS文件的數據,spark會載入所有的數據,然後根據block個數生成task數目,多個task運行中不同的進程中,是並行的,如果在同一個進程中一個JVM裡面有多個task,那麼多個task也可以並行,這是常見的使用方式。

考慮下面一種場景,在HDFS上某個目錄下面有10個文件,我想要同時並行的去統計每個文件的數量,應該怎麼做? 其實spark是支持在一個spark context中可以通過多線程同時提交多個任務運行,然後spark context接到這所有的任務之後,通過中央調度,在來分配執行各個task,最終任務完成程序退出。

下面就來看下如何使用多線程提交任務,可以直接使用new Thread來創建線程提交,但是不建議這麼做,推薦的做法是通過Executors線程池來非同步管理線程,尤其是在提交的任務比較多的時候用這個會更加方便。

核心代碼如下:

def main(args: Array[String]): Unit = {

val sparkConf=new SparkConf
//實例化spark context
val sc=new SparkContext(sparkConf)
sparkConf.setAppName("multi task submit ")
//保存任務返回值
val list=new util.ArrayList[Future[String]]
//並行任務讀取的path
val task_paths=new util.ArrayList[String]
task_paths.add("/tmp/data/path1/")
task_paths.add("/tmp/data/path2/")
task_paths.add("/tmp/data/path3/")

//線程數等於path的數量
val nums_threads=task_paths.size
//構建線程池
val executors=Executors.newFixedThreadPool(nums_threads)
for(i<-0 until nums_threads){ val task= executors.submit(new Callable[String] { override def call: String ={ val count=sc.textFile(task_paths.get(i)).count//獲取統計文件數量 return task_paths.get(i)+" 文件數量: "+count } }) list.add(task)//添加集合裡面 } //遍歷獲取結果 list.asScala.foreach(result=>{
log.info(result.get)
})
//停止spark
sc.stop

}

可以看到使用scala寫的代碼比較精簡,這樣就完成了一個並行task提交的spark任務,最後我們打包完畢後,上傳到linux上進行提交,命令如下:

/opt/bigdata/spark/bin/spark-submit
--class MultiTaskSubmit
--master yarn
--deploy-mode cluster
--executor-cores 3
--driver-memory 1g
--executor-memory 1g
--num-executors 10
--jars $jars task.jar

最後需要注意一點,在線程裡面調用的方法如果包含一些全局載入的屬性,最好放在線程的成員變數裡面進行初始化,否則多個線程去更改全局屬性,有可能會造成一些未知的問題。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

獲取自定義的當前時間格式
Docker for windows on VMware
開源 .net license tool, EasyLicense !

TAG:達人科技 |

您可能感興趣

Windows 10 和 Edge 全面支持PWA,PWA可提交至Microsoft Store
蘋果:4月起App Store中提交的新應用必須適配iPhone X
又有新VR設備?三星提交Perfect Reality和Odyssey
Lyft或將本周內提交IPO申請,Slack、Pinterest等明星公司都在籌備上市
iOS 12.2封堵的幾個漏洞由谷歌Project Zero團隊提交
谷歌Project Zero團隊提交iOS 12.4新漏洞
Kanye West要進軍電影界了!現已提交「Half Beast,LLC」商標申請!
Ups提交BlockChain專利以提高交付效率
RNDA已發但Vega還要繼續:AMD提交新Vega GPU的Linux驅動
蘋果提交新專利申請,iPhone或徹底淘汰Lightning介面
Pinterest 提交 IPO 申請,計劃 4月IPO
Pinterest 提交 IPO 申請,計劃 4 月 IPO
form表單提交時,action url中參數無效的解決方法
WiseWear將提交破產申請歸咎於蘋果停用Apple Watch診斷埠的決定
研發實戰:提交前如何用VRC Validator審查Rift程序
Oculus推VRC Validator審查工具,提交應用前可自檢
Salesforce斥巨資收購數據分析平台Tableau;阿里或在未來幾周提交香港上市申請
Emoji-Log:編寫 Git 提交信息的新方法
Bitwise向SEC提交比特幣ETF申請
Oculus推出VRC Validator,可簡化開發商內容提交流程!