業餘草教你解讀Spark源碼閱讀之HistoryServer
HistoryServer服務可以讓用戶通過SparkUI界面,查看歷史應用(已經執行完的應用)的執行細節,比如job信息、stage信息、task信息等,該功能是基於spark eventlogs日誌文件的,所以必須打開eventlogs日誌開關,關於日誌開關的打開和HistoryServer服務的啟動方法這裡不再講述,下面進入正題
下面使用的spark版本是2.0.2
類結構圖
Web相關
數據流相關
相關類及特質
WebUI
Web Server服務中UI層次結構的最頂層。每一個WebUI包含了一個tabs的集合,而每一個tab又包含了一個pages的集合。tabs頁是可選的,而且WebUI也可以直接添加page
繼承該特質的有SparkUI、MasterWebUI、WorkerWebUI和HistoryServer,在這裡我們主要介紹HistoryServer
WebUITab
一個tab包含了一個pages的集合。prefix通過追加到parent的url組成一個完整的url path,而且不能包含斜杠
繼承該特質有JobsTab、StagesTab、ExecutorsTab、StorageTab等(這裡沒有列全),對應於Spark UI界面上的Jobs、Stages、Executors、Storage等Tab頁
WebUIPage
一個page表示UI層次結構中的葉子節點。WebUIPage的直接父類即可以是WebUI,也可以是WebUITab。
如果父類是WebUI,prefix追加到parent的url形成完整的url path,如果父類是WebUITab,prefix追加到parent的prefix形成一個相對url path。Prefix中不能包含斜杠
繼承該特質的有JobPage、StagePage、ExecutionPage、StoragePage等,對應於Tab頁中具體的Page
HistoryPage
繼承至WebUIPage,通過render函數渲染生成history頁面
UIRoot
該特質被根容器(HistoryServer、SparkUI)繼承,用來為它們提供獲取application信息的統一介面
HistoryServer
def main(argStrings: Array[String]): Unit = {
……
val providerName = conf.getOption("spark.history.provider")
.getOrElse(classOf[FsHistoryProvider].getName)
val provider = Utils.classForName(providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]
val port = conf.getInt("spark.history.ui.port", 18080)
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind
ShutdownHookManager.addShutdownHook { => server.stop }
// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
}
HistoryServer繼承至WebUI,啟動的時候,會將環境配置以及provider作為成員變數來初始化HistoryServer實例,其中provider用來提供application的信息供web展示使用,HistoryServer實例化後執行bind函數,啟動jetty,將HTTP服務與web介面綁定,這時候historyserver web服務已經啟動了,之後添加了關閉server鉤子函數後進入無限循環等待
在HistoryServer實例化的過程中,會執行initialize函數,
def initialize {
attachPage(new HistoryPage(this))
attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
val contextHandler = new ServletContextHandler
contextHandler.setContextPath(HistoryServer.UI_PATH_PREFIX)
contextHandler.addServlet(new ServletHolder(loaderServlet), "/*")
attachHandler(contextHandler)
}
在該函數中,首先通過attachPage函數在UI中添加了HistoryPage實例,該實例負責渲染生成history page,然後通過attachHandler添加了不同的handler,可以訪問url路由獲取對應的信息,其中ApiRootResource提供了api/vi/開頭的路由,通過該路由,history page可以獲取後台解析出的eventlog信息用以呈現,數據通過UIRoot提供的介面獲取
到這裡,HistoryServer的Web端基本構建完成
HsitoryServer數據緩存及獲取
數據緩存主要通過使用google緩存機制LoadingCache實現,關於LoadingCache在Spark HistoryServer中的運用在另外一篇文章中分析
FsHistoryProvider
前面完成了web結構的構建,接下來就需要提供介面獲取歷史application的信息來呈現,而FsHistoryProvider就是這個介面,作為成員變數傳遞給HistoryServer。這個類在實例化的時候,執行了initialize函數,在該函數中,首先會檢查hdfs是否處於安全模式,如果處於安全模式,則會等待至退出安全模式,如果不處於安全模式,則走進startPolling函數,在該函數中會讀取配置的eventlog路徑(默認為file:/tmp/spark-events,通過spark.history.fs.logDirectory配置),然後啟動一個線程不斷掃描該路徑下的eventlog文件,將文件解析後載入到內存中供web查詢使用,相關函數如下:
private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder
.setNameFormat("spark-history-task-%d").setDaemon(true).build)
pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)
if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
}
另外如果配置了清理開關(默認一天清理一次),則會清理內存中超時的application信息,並刪除超時且已完成的文件,載入和清理這兩個動作由同一個線程完成,以防止衝突。
for (file <- logInfos) { tasks += replayExecutor.submit(new Runnable { override def run: Unit = mergeApplicationListing(file) }) }
在checkForLogs函數中,會首先檢查文件是否有更新,已經掃描過的文件保存在一個以文件名為key的映射中fileToAppInfo,如果文件不在這個映射中,或者存在這個映射中但是文件大小變大了,則將此文件加入到載入列表中,隨後進行解析。解析的過程是採用一個固定線程數的線程池replayExecutor對需要載入的文件進行解析,每解析完一個文件,會將此文件的信息更新至fileToAppInfo,這個過程在mergeApplicationListing函數中完成,另外pendingReplayTasksCount中保存了等待解析的文件數目,所有文件解析完成後,更新一下解析完成時間
private def replay(
eventLog: FileStatus,
appCompleted: Boolean,
bus: ReplayListenerBus,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
val logPath = eventLog.getPath
logInfo(s"Replaying log path: $logPath")
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
val appListener = new ApplicationEventListener
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
appListener
} finally {
logInput.close
}
}
在mergeApplicationListing函數中,主要通過執行reply函數將eventlog日誌文件解析出來,在該函數中,首先將ApplicationEventListener監聽器加入到ReplayListenerBus實例中,ReplayListenerBus主要通過調用該實例的replay函數從eventlog記錄中解析event事件,每解析一個event,都會發通知到各監聽器處理event,在這裡通過監聽者模式將日誌解析與結果處理兩個過程解耦開。執行完reply函數後,也就完成了一個eventlog文件的解析,如果解析成功,則將該eventlog的信息加入到fileToAppInfo,表明已經掃描過該文件
在cleanLogs函數中,會在log directory中刪除已經任務執行完成且超時的文件。歡迎關注業餘草:www.xttblog.com;CODE大全:www.codedq.net;愛分享:www.ndislwf.com
※「PHP」PHP面向對象編程——phpOOP入門
※將git版本號編譯進程序
※webgl自學筆記——幾何圖形
※「LeetCode」Wildcard Matching 題解
TAG:達人科技 |
※mybaits sqlSession 源碼解讀
※disruptor 源碼解讀
※親子閱讀/英語啟蒙:The Very Lonely Firefly解讀
※解讀目標檢測新範式:Segmentations is All You Need
※專業解讀 Business Analytics項目
※解讀葡萄牙人鑽石Portuguese Diamond
※Kaggle Carvana 圖像分割比賽冠軍模型 TernausNet 解讀
※全面解讀Liquidity.Network
※NIPS2018最佳論文解讀:Neural Ordinary Differential Equations
※深度解讀Chaumet Bee my Love愛·巢 & Liens 緣系?一生
※解讀區塊鏈瀏覽器Tokenview.com
※解讀 | 超級賬本 Brian Behlendorf 教你認識區塊鏈
※網路專家解讀YouTube,Twitter或Reddit的盈利模式
※靈感全揭秘-從A到Z解讀VirgilAbloh的LouisVuitton首秀
※深入解讀Google Lens
※《Nature Genetics》解讀腫瘤利器,預測癌症進化
※【大牌解讀】鑽石之王Harry Winston 到底吸引了多少位時尚Icon?
※Deep Forest 演算法解讀
※iPhone8/8Plus iPhone X 核心資訊全解讀
※英偉達官方解讀:Volta Tensor Core GPU實現AI性能新里程碑