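Reading the Spark source code, the storage system: overview of the storage system and the shuffle service
I. Overview
This series follows the book 深入理解Spark:核心思想與源碼分析 (Understanding Spark in Depth: Core Ideas and Source Code Analysis) and reads the corresponding code on the latest Spark master branch, adding my own understanding of the newer code. If you find mistakes, please point them out.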
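1. Implementation of the block manager BlockManager
The block manager BlockManager is the core component of Spark's storage system. Both the driver application and every executor create a BlockManager. The source code is located in core/org.apache.spark.storage; part of it is shown below.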
private[spark] val externalShuffleServiceEnabled =
  conf.getBoolean("spark.shuffle.service.enabled", false)

val diskBlockManager = {
  // Only perform cleanup if an external service is not serving our shuffle files.
  val deleteFilesOnStop =
    !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
  new DiskBlockManager(conf, deleteFilesOnStop)
}

// Visible for testing
private[storage] val blockInfoManager = new BlockInfoManager

private val futureExecutionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

// Actual storage of where blocks are kept
private[spark] val memoryStore =
  new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
memoryManager.setMemoryStore(memoryStore)

// Note: depending on the memory manager, `maxMemory` may actually vary over time.
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
  val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
  if (tmpPort == 0) {
    // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
    // an open port. But we still need to tell our spark apps the right port to use. So
    // only if the yarn config has the port set to 0, we prefer the value in the spark config
    conf.get("spark.shuffle.service.port").toInt
  } else {
    tmpPort
  }
}

var blockManagerId: BlockManagerId = _

// Address of the server that serves this executor's shuffle files. This is either an external
// service, or just our own Executor's BlockManager.
private[spark] var shuffleServerId: BlockManagerId = _

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
  val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
  new ExternalShuffleClient(transConf, securityManager,
    securityManager.isAuthenticationEnabled, conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
  blockTransferService
}

// Max number of failures before this block manager refreshes the block locations from the driver
private val maxFailuresBeforeLocationRefresh =
  conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

private val slaveEndpoint = rpcEnv.setupEndpoint(
  "BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
  new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

// Pending re-registration action being executed asynchronously or null if none is pending.
// Accesses should synchronize on asyncReregisterLock.
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

private var blockReplicationPolicy: BlockReplicationPolicy = _
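Two of the fields above encode small decisions that are easy to miss: whether DiskBlockManager deletes local files when it stops, and how the external shuffle service port is resolved when the YARN configuration says 0. The Java sketch below restates that logic with plain maps standing in for SparkConf and the YARN configuration. ShuffleServiceSettings and its methods are hypothetical, and the port logic assumes YARN mode, where a value in the YARN configuration takes precedence over the Spark one.

```java
import java.util.Map;

// Hypothetical helper mirroring two field initializers of BlockManager.
// Plain maps stand in for SparkConf and the YARN (Hadoop) configuration.
final class ShuffleServiceSettings {
    static final String DRIVER_ID = "driver"; // value of SparkContext.DRIVER_IDENTIFIER
    static final String PORT_KEY = "spark.shuffle.service.port";

    // Local shuffle files are deleted on stop unless an external shuffle
    // service may still serve them; the driver always cleans up.
    static boolean deleteFilesOnStop(boolean externalShuffleEnabled, String executorId) {
        return !externalShuffleEnabled || DRIVER_ID.equals(executorId);
    }

    // Assumes YARN mode: a value in the YARN config wins over the Spark value,
    // and the Spark value (or "7337") is the fallback.
    static int externalShuffleServicePort(Map<String, String> yarnConf,
                                          Map<String, String> sparkConf) {
        String sparkValue = sparkConf.getOrDefault(PORT_KEY, "7337");
        int tmpPort = Integer.parseInt(yarnConf.getOrDefault(PORT_KEY, sparkValue));
        if (tmpPort == 0) {
            // YARN was told to pick any free port (as in tests); the application
            // still needs the real port, so read it from the Spark config.
            return Integer.parseInt(sparkConf.get(PORT_KEY));
        }
        return tmpPort;
    }
}
```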
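The BlockInfoManager declared above keeps track of the BlockIds cached by this BlockManager together with their BlockInfo, and provides a set of synchronized read/write locking operations on blocks. BlockManager consists of the following parts.
1) the shuffle client shuffleClient;
2) BlockManagerMaster, which centrally manages the BlockManagers on all executors;
3) the disk block manager DiskBlockManager;
4) the memory store MemoryStore;
5) the disk store DiskStore.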
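BlockInfoManager's locking can be pictured as a per-block readers/writer lock: many readers or a single writer. The following is a simplified Java model built on that assumption, not Spark's implementation. SimpleBlockLocks is hypothetical; its method names only echo BlockInfoManager's lockForReading, lockForWriting and unlock, and task ownership and BlockInfo metadata are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of per-block read/write locking: for each block, either
// any number of readers or exactly one writer. Not Spark's BlockInfoManager.
final class SimpleBlockLocks {
    private static final class LockState {
        int readers = 0;
        boolean writer = false;
    }

    private final Map<String, LockState> states = new HashMap<>();

    private LockState state(String blockId) {
        return states.computeIfAbsent(blockId, id -> new LockState());
    }

    // Shared lock. With blocking == false, returns false instead of waiting.
    synchronized boolean lockForReading(String blockId, boolean blocking) {
        LockState s = state(blockId);
        while (s.writer) {
            if (!blocking || !awaitChange()) return false;
        }
        s.readers++;
        return true;
    }

    // Exclusive lock: no reader and no other writer may hold the block.
    synchronized boolean lockForWriting(String blockId, boolean blocking) {
        LockState s = state(blockId);
        while (s.writer || s.readers > 0) {
            if (!blocking || !awaitChange()) return false;
        }
        s.writer = true;
        return true;
    }

    // Releases one lock on the block and wakes up threads waiting for it.
    synchronized void unlock(String blockId) {
        LockState s = states.get(blockId);
        if (s == null || (!s.writer && s.readers == 0)) {
            throw new IllegalStateException("block is not locked: " + blockId);
        }
        if (s.writer) {
            s.writer = false;
        } else {
            s.readers--;
        }
        notifyAll();
    }

    // Waits on this monitor (callers hold it); false if interrupted.
    private boolean awaitChange() {
        try {
            wait();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```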
A BlockManager must be initialized before it can be used. The initialization code is as follows.
def initialize(appId: String): Unit = {
  blockTransferService.init(this)
  shuffleClient.init(appId)

  blockReplicationPolicy = {
    val priorityClass = conf.get(
      "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
    val clazz = Utils.classForName(priorityClass)
    val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
    logInfo(s"Using $priorityClass for block replication policy")
    ret
  }

  val id =
    BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

  val idFromMaster = master.registerBlockManager(
    id,
    maxOnHeapMemory,
    maxOffHeapMemory,
    slaveEndpoint)

  blockManagerId = if (idFromMaster != null) idFromMaster else id

  shuffleServerId = if (externalShuffleServiceEnabled) {
    logInfo(s"external shuffle service port = $externalShuffleServicePort")
    BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
  } else {
    blockManagerId
  }

  // Register Executors' configuration with the local shuffle service, if one should exist.
  if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
    registerWithExternalShuffleServer()
  }

  logInfo(s"Initialized BlockManager: $blockManagerId")
}
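1) Initialize BlockTransferService and the shuffle client shuffleClient. By default shuffleClient is the BlockTransferService itself; when an external shuffle service is enabled, an ExternalShuffleClient is used instead.
2) Create the local BlockManagerId id and register it with BlockManagerMaster, which returns idFromMaster. If idFromMaster is null, blockManagerId is the locally created id; otherwise the idFromMaster returned by BlockManagerMaster is used.
3) Create shuffleServerId. When an external shuffle service is enabled, a new BlockManagerId pointing at the external shuffle service port becomes shuffleServerId; otherwise shuffleServerId is simply blockManagerId.
4) When an external shuffle service is enabled and the current BlockManager is not on the driver, registerWithExternalShuffleServer uses shuffleClient to register this executor's shuffle configuration with the shuffle server identified by shuffleServerId.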
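Steps 2) to 4) come down to three choices. The sketch below restates them in Java with hypothetical types: ManagerId stands in for BlockManagerId, and the idFromMaster argument replaces the actual registration RPC to BlockManagerMaster.

```java
// Hypothetical value type standing in for BlockManagerId.
final class ManagerId {
    final String executorId;
    final String host;
    final int port;

    ManagerId(String executorId, String host, int port) {
        this.executorId = executorId;
        this.host = host;
        this.port = port;
    }

    boolean isDriver() {
        return "driver".equals(executorId);
    }
}

// Sketch of the decisions at the end of BlockManager.initialize.
final class InitPlan {
    final ManagerId blockManagerId;
    final ManagerId shuffleServerId;
    final boolean registerWithExternalService;

    private InitPlan(ManagerId bm, ManagerId shuffle, boolean register) {
        this.blockManagerId = bm;
        this.shuffleServerId = shuffle;
        this.registerWithExternalService = register;
    }

    static InitPlan plan(String executorId, String host, int transferPort,
                         ManagerId idFromMaster, boolean externalEnabled, int externalPort) {
        ManagerId localId = new ManagerId(executorId, host, transferPort);
        // Prefer the id returned by the master when it is not null.
        ManagerId bm = idFromMaster != null ? idFromMaster : localId;
        // With an external service, shuffle files are served from its port.
        ManagerId shuffle = externalEnabled ? new ManagerId(executorId, host, externalPort) : bm;
        // Only executors, never the driver, register with the external service.
        boolean register = externalEnabled && !bm.isDriver();
        return new InitPlan(bm, shuffle, register);
    }
}
```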
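2. Architecture of the Spark storage system
In the architecture diagram of the storage system, the numbered arrows mean the following.
1) Arrow 1: the executor's BlockManager exchanges messages with the driver's BlockManager, for example to register a BlockManager, update block information, look up which BlockManagers hold a block, or remove an executor.
2) Arrow 2: read operations on the BlockManager such as get and doGetLocal, and write operations such as doPut and putSingle.
3) Arrow 3: when the MemoryStore runs out of memory, blocks are written to the DiskStore, which in turn relies on the DiskBlockManager.
4) Arrow 4: blocks are downloaded from or uploaded to a remote node through the RPC service provided by the TransportServer in that remote executor's BlockManager.
5) Arrow 5: an executor's BlockManager on a remote node downloads or uploads blocks through the RPC service provided by the TransportServer in the local executor's BlockManager.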
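Arrow 3 describes a memory-first, disk-fallback policy. The toy Java store below illustrates only that policy, under loudly simplified assumptions: a fixed byte budget, no eviction, and each block id written once. TieredBlockStore is a hypothetical name; it has none of MemoryStore's unrolling logic or DiskStore's file layout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

// Toy memory-then-disk store: keep a block in memory while the budget
// allows, otherwise write it to a file. Assumes each block id is put once.
final class TieredBlockStore {
    private final long memoryBudget;
    private long memoryUsed = 0;
    private final Map<String, byte[]> memory = new HashMap<>();
    private final Map<String, Path> disk = new HashMap<>();
    private final Path dir = newTempDir();

    TieredBlockStore(long memoryBudget) {
        this.memoryBudget = memoryBudget;
    }

    private static Path newTempDir() {
        try {
            return Files.createTempDirectory("toy-blocks");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns true if the block stayed in memory, false if it went to disk.
    boolean put(String blockId, byte[] data) {
        if (memoryUsed + data.length <= memoryBudget) {
            memory.put(blockId, data);
            memoryUsed += data.length;
            return true;
        }
        Path file = dir.resolve(blockId);
        try {
            Files.write(file, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        disk.put(blockId, file);
        return false;
    }

    // Looks in memory first, then on disk; returns null for unknown blocks.
    byte[] get(String blockId) {
        byte[] cached = memory.get(blockId);
        if (cached != null) return cached;
        Path file = disk.get(blockId);
        if (file == null) return null;
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```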
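II. Shuffle service and client
1. Block RPC service
When a map task and a reduce task run on different nodes, the reduce task must download the map task's intermediate output from the remote node, so NettyBlockRpcServer provides an RPC for opening, that is, downloading, Block files. In some cases block data is replicated to other nodes for fault tolerance, so NettyBlockRpcServer also provides an RPC for uploading Block files. The implementation is shown below; the code is located in core/org.apache.spark.network.netty.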
class NettyBlockRpcServer(
    appId: String,
    serializer: Serializer,
    blockManager: BlockDataManager)
  extends RpcHandler with Logging {

  private val streamManager = new OneForOneStreamManager()

  override def receive(
      client: TransportClient,
      rpcMessage: ByteBuffer,
      responseContext: RpcResponseCallback): Unit = {
    val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
    logTrace(s"Received request: $message")

    message match {
      case openBlocks: OpenBlocks =>
        val blocksNum = openBlocks.blockIds.length
        val blocks = for (i <- (0 until blocksNum).view)
          yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
        logTrace(s"Registered streamId $streamId with $blocksNum buffers")
        responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

      case uploadBlock: UploadBlock =>
        // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
        val (level: StorageLevel, classTag: ClassTag[_]) = {
          serializer
            .newInstance()
            .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
            .asInstanceOf[(StorageLevel, ClassTag[_])]
        }
        val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
        val blockId = BlockId(uploadBlock.blockId)
        blockManager.putBlockData(blockId, data, level, classTag)
        responseContext.onSuccess(ByteBuffer.allocate(0))
    }
  }

  override def getStreamManager(): StreamManager = streamManager
}
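To make the two RPC branches concrete, here is a plain-Java model with no Netty involved. All class names are hypothetical stand-ins: ToyOpenBlocks and ToyUploadBlockMsg for OpenBlocks and UploadBlock, ToyStreamManager for OneForOneStreamManager. The OpenBlocks response is modelled as a {streamId, numChunks} pair, in the spirit of StreamHandle, and block lookup is deferred until chunks are requested, similar to the `.view` in the Scala code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-ins for the OpenBlocks and UploadBlock messages.
interface ToyBlockMessage {}

final class ToyOpenBlocks implements ToyBlockMessage {
    final String[] blockIds;
    ToyOpenBlocks(String... blockIds) { this.blockIds = blockIds; }
}

final class ToyUploadBlockMsg implements ToyBlockMessage {
    final String blockId;
    final byte[] data;
    ToyUploadBlockMsg(String blockId, byte[] data) { this.blockId = blockId; this.data = data; }
}

// Minimal stream registry: each registered iterator becomes a stream whose
// chunks are handed out one by one, in order.
final class ToyStreamManager {
    private long nextStreamId = 0;
    private final Map<Long, Iterator<byte[]>> streams = new HashMap<>();

    synchronized long registerStream(Iterator<byte[]> buffers) {
        long id = nextStreamId++;
        streams.put(id, buffers);
        return id;
    }

    synchronized byte[] nextChunk(long streamId) {
        Iterator<byte[]> it = streams.get(streamId);
        if (it == null || !it.hasNext()) {
            throw new IllegalStateException("no more chunks in stream " + streamId);
        }
        byte[] chunk = it.next();
        if (!it.hasNext()) streams.remove(streamId); // stream fully consumed
        return chunk;
    }
}

// Dispatch logic modelled on NettyBlockRpcServer.receive.
final class ToyBlockRpcServer {
    final Map<String, byte[]> blocks = new HashMap<>();
    final ToyStreamManager streamManager = new ToyStreamManager();

    // OpenBlocks -> {streamId, numChunks}; UploadBlock -> empty array.
    long[] receive(ToyBlockMessage message) {
        if (message instanceof ToyOpenBlocks) {
            String[] ids = ((ToyOpenBlocks) message).blockIds;
            // Stream iterators are lazy: each lookup runs when the chunk is pulled.
            Iterator<byte[]> lazy = Arrays.stream(ids).map(blocks::get).iterator();
            return new long[] {streamManager.registerStream(lazy), ids.length};
        }
        if (message instanceof ToyUploadBlockMsg) {
            ToyUploadBlockMsg upload = (ToyUploadBlockMsg) message;
            blocks.put(upload.blockId, upload.data);
            return new long[0];
        }
        throw new IllegalArgumentException("unexpected message: " + message);
    }
}
```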
2. Constructing the transport context TransportContext (code location: common/network-common/org.apache.spark.network)
public TransportContext(
    TransportConf conf,
    RpcHandler rpcHandler,
    boolean closeIdleConnections) {
  this.conf = conf;
  this.rpcHandler = rpcHandler;
  this.closeIdleConnections = closeIdleConnections;
}
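TransportContext can create both a Netty server and a Netty client. It consists of the following parts.
1) TransportConf: controls settings such as the number of client and server threads that the Netty framework uses for shuffle I/O.
2) RpcHandler: when the shuffle I/O server receives an RPC request from a client, the RpcHandler processes it by opening or uploading blocks; here the implementation is NettyBlockRpcServer.
3) closeIdleConnections: whether to close idle connections.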
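TransportConf reads its settings from keys derived from a module name, which is why the shuffle module's options look like spark.shuffle.io.mode. The Java sketch below illustrates that key scheme. ToyTransportConf is hypothetical, only four settings are modelled, and the defaults are assumptions based on Spark 2.x.

```java
import java.util.Locale;
import java.util.Map;

// Simplified sketch of per-module keys: for the module "shuffle", the
// settings live under "spark.shuffle.io.*". Not Spark's TransportConf.
final class ToyTransportConf {
    private final String module;
    private final Map<String, String> conf;

    ToyTransportConf(String module, Map<String, String> conf) {
        this.module = module;
        this.conf = conf;
    }

    private String get(String suffix, String defaultValue) {
        return conf.getOrDefault("spark." + module + "." + suffix, defaultValue);
    }

    String getModuleName() { return module; }

    // "NIO" or "EPOLL"; the configured value is matched case-insensitively.
    String ioMode() { return get("io.mode", "NIO").toUpperCase(Locale.ROOT); }

    int numConnectionsPerPeer() { return Integer.parseInt(get("io.numConnectionsPerPeer", "1")); }

    // 0 means: let Netty choose its default thread count (2 x available cores).
    int serverThreads() { return Integer.parseInt(get("io.serverThreads", "0")); }

    int clientThreads() { return Integer.parseInt(get("io.clientThreads", "0")); }
}
```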
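3. RPC client factory TransportClientFactory
TransportContext creates the client factory as follows.
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
  return new TransportClientFactory(this, bootstraps);
}
The TransportClientFactory constructor is analyzed below.
public TransportClientFactory(
    TransportContext context,
    List<TransportClientBootstrap> clientBootstraps) {
  this.context = Preconditions.checkNotNull(context);
  this.conf = context.getConf();
  this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
  this.connectionPool = new ConcurrentHashMap<>();
  this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
  this.rand = new Random();

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
  this.workerGroup = NettyUtils.createEventLoop(
      ioMode,
      conf.clientThreads(),
      conf.getModuleName() + "-client");
  this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}
1) clientBootstraps: the list of TransportClientBootstrap instances that are run on every newly created client, for example to perform authentication.
2) connectionPool: caches client connections, keyed by remote address.
3) numConnectionsPerPeer: the number of connections used between two nodes when fetching data; it can be configured with spark.shuffle.io.numConnectionsPerPeer and defaults to 1.
4) socketChannelClass: the class used when client channels are created; it follows the I/O mode, which is configured with spark.shuffle.io.mode.
5) workerGroup: by Netty's conventions a client only has a worker group, so only a workerGroup is created here; in NIO mode it is a NioEventLoopGroup.
6) pooledAllocator: a pooled ByteBuf allocator with thread-local caching disabled.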
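connectionPool, numConnectionsPerPeer and rand work together when a client is requested: each remote address owns numConnectionsPerPeer slots, one slot is picked at random, and its client is reused while still active. The Java sketch below shows that pooling idea under stated simplifications. ToyClient and ToyClientPool are hypothetical, the connect function stands in for opening a Netty channel, and a single lock per address replaces Spark's per-slot locks.

```java
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical client handle standing in for a TransportClient.
final class ToyClient {
    final String address;
    volatile boolean active = true;
    ToyClient(String address) { this.address = address; }
}

// Sketch of per-peer connection pooling with numConnectionsPerPeer slots.
final class ToyClientPool {
    private final ConcurrentHashMap<String, ToyClient[]> connectionPool = new ConcurrentHashMap<>();
    private final int numConnectionsPerPeer;
    private final Random rand = new Random();
    private final Function<String, ToyClient> connect; // opens a new connection

    ToyClientPool(int numConnectionsPerPeer, Function<String, ToyClient> connect) {
        this.numConnectionsPerPeer = numConnectionsPerPeer;
        this.connect = connect;
    }

    ToyClient createClient(String host, int port) {
        String address = host + ":" + port;
        ToyClient[] slots = connectionPool.computeIfAbsent(
            address, a -> new ToyClient[numConnectionsPerPeer]);
        int index = rand.nextInt(numConnectionsPerPeer);
        // Simplification: one lock per address instead of one per slot.
        synchronized (slots) {
            ToyClient cached = slots[index];
            if (cached != null && cached.active) {
                return cached; // reuse a live connection
            }
            slots[index] = connect.apply(address); // replace a missing or dead one
            return slots[index];
        }
    }
}
```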
4. Netty server TransportServer
TransportServer provides the Netty-based server side, which serves RPCs such as uploading and downloading blocks. TransportContext creates it as follows.
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
The TransportServer constructor is shown below.
public TransportServer(
    TransportContext context,
    String hostToBind,
    int portToBind,
    RpcHandler appRpcHandler,
    List<TransportServerBootstrap> bootstraps) {
  this.context = context;
  this.conf = context.getConf();
  this.appRpcHandler = appRpcHandler;
  this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

  try {
    init(hostToBind, portToBind);
  } catch (RuntimeException e) {
    JavaUtils.closeQuietly(this);
    throw e;
  }
}
The init method initializes TransportServer. It uses Netty APIs such as EventLoopGroup and ServerBootstrap to create the server side of shuffle I/O; the main code is listed below.
private void init(String hostToBind, int portToBind) {
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}
1) ioMode: NIO or EPOLL.
2) ChannelOption.ALLOCATOR: Netty 4 introduced a new ByteBuf memory pool, a pure-Java implementation of jemalloc (the allocator also used by Facebook). With it, Netty no longer wastes memory bandwidth zero-filling buffers. However, because pooled buffers are not reclaimed by the GC, developers must watch out for memory leaks: if a handler forgets to release a buffer, memory usage grows without bound. Depending on the Netty version, the pool may not be the default allocator, which is why Spark specifies it explicitly when creating the client and the server. Once the pool is in use, every buffer must eventually be released and every retain must be matched by a release; otherwise memory leaks.
3) RpcHandler: handles the logic for received data. In initChannel, each TransportServerBootstrap may wrap the RpcHandler before context.initializePipeline installs it on the channel.
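The retain/release rule in item 2) is easier to see with a toy reference-counted pool. The Java model below is a hypothetical illustration of the idea behind Netty's reference-counted ByteBuf, not Netty's implementation: a buffer goes back to the pool only when its count drops to zero, a reused buffer is not zero-filled, and a missing release shows up as an outstanding buffer that never returns.

```java
import java.util.ArrayDeque;

// Toy pool of reference-counted buffers (not Netty's PooledByteBufAllocator).
final class ToyBufPool {
    private final ArrayDeque<ToyBuf> free = new ArrayDeque<>();
    int outstanding = 0; // handed out but not yet returned; a leak if it never drops

    ToyBuf allocate(int size) {
        ToyBuf buf = free.poll();
        if (buf == null || buf.data.length < size) {
            buf = new ToyBuf(new byte[size], this);
        }
        // A reused buffer keeps its old bytes: no zero-filling on reuse.
        buf.refCnt = 1;
        outstanding++;
        return buf;
    }

    void recycle(ToyBuf buf) {
        outstanding--;
        free.push(buf);
    }
}

final class ToyBuf {
    final byte[] data;
    private final ToyBufPool pool;
    int refCnt;

    ToyBuf(byte[] data, ToyBufPool pool) {
        this.data = data;
        this.pool = pool;
    }

    ToyBuf retain() {
        if (refCnt <= 0) throw new IllegalStateException("buffer already released");
        refCnt++;
        return this;
    }

    // Returns true when this call dropped the count to 0 and recycled the buffer.
    boolean release() {
        if (refCnt <= 0) throw new IllegalStateException("buffer already released");
        if (--refCnt == 0) {
            pool.recycle(this);
            return true;
        }
        return false;
    }
}
```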
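5. Fetching remote shuffle files
The fetchBlocks method of NettyBlockTransferService fetches shuffle files from a remote node. It relies on the Netty client factory (clientFactory) created in NettyBlockTransferService.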
override def fetchBlocks(
    host: String,
    port: Int,
    execId: String,
    blockIds: Array[String],
    listener: BlockFetchingListener,
    tempShuffleFileManager: TempShuffleFileManager): Unit = {
  logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
  try {
    val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
      override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener): Unit = {
        val client = clientFactory.createClient(host, port)
        new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
          transportConf, tempShuffleFileManager).start()
      }
    }

    val maxRetries = transportConf.maxIORetries()
    if (maxRetries > 0) {
      // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
      // a bug in this code. We should remove the if statement once we're sure of the stability.
      new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
    } else {
      blockFetchStarter.createAndStart(blockIds, listener)
    }
  } catch {
    case e: Exception =>
      logError("Exception while beginning fetchBlocks", e)
      blockIds.foreach(listener.onBlockFetchFailure(_, e))
  }
}
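When maxIORetries is positive, fetchBlocks hands the work to RetryingBlockFetcher. The Java sketch below shows the core idea in a synchronous form: after a failed attempt, only the still-outstanding blocks are requested again, at most maxRetries times. ToyRetryingFetcher is hypothetical and the fetchOne predicate (true means fetched) stands in for a OneForOneBlockFetcher request; the real fetcher is asynchronous, retries only on IOExceptions and waits between retries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Synchronous sketch of "retry only the outstanding blocks".
final class ToyRetryingFetcher {
    static List<String> fetch(String[] blockIds, int maxRetries, Predicate<String> fetchOne) {
        Set<String> outstanding = new LinkedHashSet<>(Arrays.asList(blockIds));
        outstanding.removeIf(fetchOne); // first attempt; successes leave the set
        int retryCount = 0;
        while (!outstanding.isEmpty() && retryCount < maxRetries) {
            retryCount++;
            outstanding.removeIf(fetchOne); // retry only what is still missing
        }
        return new ArrayList<>(outstanding); // blocks that ultimately failed
    }
}
```

Keeping an ordered set of outstanding ids means a partially successful attempt never re-downloads blocks that already arrived.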
6. Uploading shuffle files
The uploadBlock method of NettyBlockTransferService uploads a block to a remote executor. It also relies on the Netty client factory created in NettyBlockTransferService. The steps are as follows.
override def uploadBlock(
    hostname: String,
    port: Int,
    execId: String,
    blockId: BlockId,
    blockData: ManagedBuffer,
    level: StorageLevel,
    classTag: ClassTag[_]): Future[Unit] = {
  val result = Promise[Unit]()
  val client = clientFactory.createClient(hostname, port)

  // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
  // Everything else is encoded using our binary protocol.
  val metadata = JavaUtils.bufferToArray(serializer.newInstance().serialize((level, classTag)))

  // Convert or copy nio buffer into array in order to serialize it.
  val array = JavaUtils.bufferToArray(blockData.nioByteBuffer())

  client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
    new RpcResponseCallback {
      override def onSuccess(response: ByteBuffer): Unit = {
        logTrace(s"Successfully uploaded block $blockId")
        result.success((): Unit)
      }

      override def onFailure(e: Throwable): Unit = {
        logError(s"Error while uploading block $blockId", e)
        result.failure(e)
      }
    })

  result.future
}
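1) Create a Netty client; the hostname and port it connects to are those of the target BlockManager.
2) Serialize the block's storage level StorageLevel and its class tag ClassTag into the metadata byte array.
3) Convert the block's ByteBuffer into a byte array so that it can be serialized.
4) Wrap appId, execId, blockId, metadata and the block's byte array into an UploadBlock message and serialize it into a byte array.
5) Finally, call the Netty client's sendRpc method to upload the byte array; the callback RpcResponseCallback completes or fails the Promise, and thus sets the upload status, according to the RPC result.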
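Steps 2) to 4) produce a single byte array that the receiving NettyBlockRpcServer decodes back into its fields. The Java round trip below illustrates that packing and unpacking. The layout (one type byte followed by int-length-prefixed fields) and the type tag are assumptions for illustration, similar in spirit to Spark's BlockTransferMessage encoding but not byte-compatible with it; ToyUploadBlock is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of packing an upload request into bytes and reading it back.
final class ToyUploadBlock {
    static final byte TYPE_TAG = 1; // hypothetical message type tag

    final String appId;
    final String execId;
    final String blockId;
    final byte[] metadata;  // e.g. serialized (StorageLevel, ClassTag)
    final byte[] blockData; // block contents copied out of the ByteBuffer

    ToyUploadBlock(String appId, String execId, String blockId, byte[] metadata, byte[] blockData) {
        this.appId = appId;
        this.execId = execId;
        this.blockId = blockId;
        this.metadata = metadata;
        this.blockData = blockData;
    }

    byte[] toByteArray() {
        byte[][] fields = {utf8(appId), utf8(execId), utf8(blockId), metadata, blockData};
        int size = 1;
        for (byte[] f : fields) size += 4 + f.length; // length prefix + payload
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put(TYPE_TAG);
        for (byte[] f : fields) {
            buf.putInt(f.length);
            buf.put(f);
        }
        return buf.array();
    }

    static ToyUploadBlock fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.get() != TYPE_TAG) throw new IllegalArgumentException("not an upload message");
        // Java evaluates arguments left to right, so fields are read in order.
        return new ToyUploadBlock(readString(buf), readString(buf), readString(buf),
                readBytes(buf), readBytes(buf));
    }

    private static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static byte[] readBytes(ByteBuffer buf) {
        byte[] field = new byte[buf.getInt()];
        buf.get(field);
        return field;
    }

    private static String readString(ByteBuffer buf) {
        return new String(readBytes(buf), StandardCharsets.UTF_8);
    }
}
```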