
Reading the Spark Source: The Storage System -- Storage System Overview and the Shuffle Service


I. Overview

Following the book 《深入理解Spark:核心思想與源碼分析》 and reading the latest master branch of the Spark source code, this post adds my own understanding of the newer code; if you spot mistakes, please point them out.

1. Implementing the block manager BlockManager

The block manager BlockManager is the core component of Spark's storage system. Both the Driver application and every Executor create a BlockManager. The source code lives under core/org.apache.spark.storage; part of it is shown below.

private[spark] val externalShuffleServiceEnabled =
conf.getBoolean("spark.shuffle.service.enabled", false)

val diskBlockManager = {
// Only perform cleanup if an external service is not serving our shuffle files.
val deleteFilesOnStop =
!externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
new DiskBlockManager(conf, deleteFilesOnStop)
}

// Visible for testing
private[storage] val blockInfoManager = new BlockInfoManager

private val futureExecutionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))

// Actual storage of where blocks are kept
private[spark] val memoryStore =
new MemoryStore(conf, blockInfoManager, serializerManager, memoryManager, this)
private[spark] val diskStore = new DiskStore(conf, diskBlockManager, securityManager)
memoryManager.setMemoryStore(memoryStore)

// Note: depending on the memory manager, `maxMemory` may actually vary over time.
// However, since we use this only for reporting and logging, what we actually want here is
// the absolute maximum value that `maxMemory` can ever possibly reach. We may need
// to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxOnHeapMemory = memoryManager.maxOnHeapStorageMemory
private val maxOffHeapMemory = memoryManager.maxOffHeapStorageMemory

// Port used by the external shuffle service. In Yarn mode, this may already be
// set through the Hadoop configuration as the server is launched in the Yarn NM.
private val externalShuffleServicePort = {
val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
if (tmpPort == 0) {
// for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
// an open port. But we still need to tell our spark apps the right port to use. So
// only if the yarn config has the port set to 0, we prefer the value in the spark config
conf.get("spark.shuffle.service.port").toInt
} else {
tmpPort
}
}

var blockManagerId: BlockManagerId = _

// Address of the server that serves this executor's shuffle files. This is either an external
// service, or just our own Executor's BlockManager.
private[spark] var shuffleServerId: BlockManagerId = _

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager,
securityManager.isAuthenticationEnabled, conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}

// Max number of failures before this block manager refreshes the block locations from the driver
private val maxFailuresBeforeLocationRefresh =
conf.getInt("spark.block.failures.beforeLocationRefresh", 5)

private val slaveEndpoint = rpcEnv.setupEndpoint(
"BlockManagerEndpoint" + BlockManager.ID_GENERATOR.next,
new BlockManagerSlaveEndpoint(rpcEnv, this, mapOutputTracker))

// Pending re-registration action being executed asynchronously or null if none is pending.
// Accesses should synchronize on asyncReregisterLock.
private var asyncReregisterTask: Future[Unit] = null
private val asyncReregisterLock = new Object

// Field related to peer block managers that are necessary for block replication
@volatile private var cachedPeers: Seq[BlockManagerId] = _
private val peerFetchLock = new Object
private var lastPeerFetchTime = 0L

private var blockReplicationPolicy: BlockReplicationPolicy = _


The BlockInfoManager declared in the code above manages the BlockIds cached by the BlockManager and their corresponding BlockInfo, and it provides a series of synchronized read/write policies (a small locking sketch follows the component list below). The BlockManager is made up of the following parts:

1) the shuffle client shuffleClient;

2) BlockManagerMaster, which centrally manages the BlockManagers living on all Executors;

3) the disk block manager DiskBlockManager;

4) the memory store MemoryStore;

5) the disk store DiskStore.
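As a minimal illustration of the synchronized read/write policies mentioned above, here is a hedged sketch of how a reader might acquire and release a block's read lock through BlockInfoManager; lockForReading/unlock follow the BlockInfoManager API, while the readBlockSize helper is hypothetical.

import org.apache.spark.storage.{BlockId, BlockInfoManager}

// Hypothetical helper, for illustration only: read a block's size while holding its read lock.
def readBlockSize(blockInfoManager: BlockInfoManager, blockId: BlockId): Option[Long] = {
  blockInfoManager.lockForReading(blockId) match {
    case Some(info) =>
      try {
        Some(info.size) // safe to read while the read lock is held
      } finally {
        blockInfoManager.unlock(blockId) // always pair the lock with an unlock
      }
    case None => None // the block is not known to this BlockManager
  }
}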

A BlockManager must be initialized before it takes effect. The initialization code is as follows.

def initialize(appId: String): Unit = {
blockTransferService.init(this)
shuffleClient.init(appId)

blockReplicationPolicy = {
val priorityClass = conf.get(
"spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName)
val clazz = Utils.classForName(priorityClass)
val ret = clazz.newInstance.asInstanceOf[BlockReplicationPolicy]
logInfo(s"Using $priorityClass for block replication policy")
ret
}

val id =
BlockManagerId(executorId, blockTransferService.hostName, blockTransferService.port, None)

val idFromMaster = master.registerBlockManager(
id,
maxOnHeapMemory,
maxOffHeapMemory,
slaveEndpoint)

blockManagerId = if (idFromMaster != null) idFromMaster else id

shuffleServerId = if (externalShuffleServiceEnabled) {
logInfo(s"external shuffle service port = $externalShuffleServicePort")
BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
} else {
blockManagerId
}

// Register Executors' configuration with the local shuffle service, if one should exist.
if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
registerWithExternalShuffleServer()
}

logInfo(s"Initialized BlockManager: $blockManagerId")
}


1) BlockTransferService and the shuffle client shuffleClient are initialized. shuffleClient defaults to the BlockTransferService; when an external shuffle service is present, an ExternalShuffleClient pointing at that external service is used instead.

2) A local BlockManagerId named id is created and registered with the BlockManagerMaster, which returns idFromMaster. If idFromMaster is null, blockManagerId is the id just created; otherwise the idFromMaster obtained from the BlockManagerMaster is used.

3) shuffleServerId is created. When an external shuffle service is present, a new BlockManagerId is created to serve as shuffleServerId.

4) When an external shuffle service is present and the current BlockManagerId is not on the Driver side, the executor registers itself with the external shuffle server (registerWithExternalShuffleServer).
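For reference, initialize is not called from the BlockManager constructor; it runs once the application id is known. A minimal sketch of the executor-side call, assuming isLocal, env and conf come from the surrounding Executor/SparkEnv context:

// Sketch: the BlockManager created by SparkEnv only becomes usable after initialize(appId).
if (!isLocal) {
  env.blockManager.initialize(conf.getAppId)
}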

2. Architecture of the Spark storage system

[Figure: Spark storage system architecture, with interactions numbered 1-5 as described below]

1) Arrow 1: the Executor's BlockManager exchanges messages with the Driver's BlockManager, for example registering a BlockManager, updating Block information, querying which BlockManagers hold a Block, and removing an Executor (a messaging sketch follows this list)

2) Arrow 2: read operations on the BlockManager such as get and doGetLocal, and write operations such as doPut and putSingle

3) Arrow 3: when the MemoryStore runs short of memory, data is written to the DiskStore, which in turn relies on the DiskBlockManager

4) Arrow 4: downloading or uploading Blocks through the RPC service exposed by the TransportServer inside a remote Executor's BlockManager

5) Arrow 5: a remote Executor's BlockManager downloads or uploads Blocks through the RPC service exposed by the TransportServer inside the local Executor's BlockManager.
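To make arrow 1 concrete, here is a hedged sketch of the executor-to-driver calls involved; the reportAndLocate helper and its arguments are illustrative, while getLocations and updateBlockInfo are BlockManagerMaster methods.

import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster, StorageLevel}

// Illustrative helper: report a block to the driver, then ask which peers hold copies of it.
def reportAndLocate(master: BlockManagerMaster,
                    bmId: BlockManagerId,
                    blockId: BlockId): Seq[BlockManagerId] = {
  // Tell the driver this block is now stored in memory on this BlockManager (sizes are made up).
  master.updateBlockInfo(bmId, blockId, StorageLevel.MEMORY_ONLY, 1024L, 0L)
  // Ask the driver which BlockManagers currently hold the block.
  master.getLocations(blockId)
}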


II. The Shuffle Service and Client

1. The Block RPC service

When a map task and a reduce task run on different nodes, the reduce task needs to download the map task's intermediate output from the remote node, so NettyBlockRpcServer provides the ability to open, i.e. download, Block files. In some situations, for fault tolerance, Block data also needs to be replicated to other nodes, so NettyBlockRpcServer additionally provides an RPC service for uploading Block files. The implementation is shown in the code below, located at core/org.apache.spark.network.netty.

class NettyBlockRpcServer(
appId: String,
serializer: Serializer,
blockManager: BlockDataManager)
extends RpcHandler with Logging {

private val streamManager = new OneForOneStreamManager

override def receive(
client: TransportClient,
rpcMessage: ByteBuffer,
responseContext: RpcResponseCallback): Unit = {
val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
logTrace(s"Received request: $message")

message match {
case openBlocks: OpenBlocks =>
val blocksNum = openBlocks.blockIds.length
val blocks = for (i <- (0 until blocksNum).view)
  yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
logTrace(s"Registered streamId $streamId with $blocksNum buffers")
responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)

case uploadBlock: UploadBlock =>
// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
val (level: StorageLevel, classTag: ClassTag[_]) = {
serializer
.newInstance
.deserialize(ByteBuffer.wrap(uploadBlock.metadata))
.asInstanceOf[(StorageLevel, ClassTag[_])]
}
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
blockManager.putBlockData(blockId, data, level, classTag)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}

override def getStreamManager: StreamManager = streamManager
}


2. Constructing the transport context TransportContext

Code location: common/network-common/org.apache.spark.network

public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
}


TransportContext can create both the Netty server and the Netty client. Its constituent parts are as follows (a wiring sketch follows the list).

1) TransportConf: mainly controls the number of client and server threads, among other settings, for the shuffle I/O interaction provided by the Netty framework

2) RpcHandler: after the shuffle I/O server receives a client RPC request, it performs the RPC handling for opening or uploading a Block; here the implementation is NettyBlockRpcServer

3) closeIdleConnections: whether idle connections should be closed
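To see how these parts fit together, here is a hedged sketch modeled on NettyBlockTransferService.init: one TransportContext is built from a TransportConf and an RpcHandler, and from it both the client factory and the server are created. appId, serializer, blockManager, numCores, hostName and port are assumed to come from the surrounding service.

import scala.collection.JavaConverters._
import org.apache.spark.network.TransportContext
import org.apache.spark.network.client.{TransportClientBootstrap, TransportClientFactory}
import org.apache.spark.network.netty.{NettyBlockRpcServer, SparkTransportConf}
import org.apache.spark.network.server.{TransportServer, TransportServerBootstrap}

// Sketch: wire up the shuffle transport layer (no bootstraps and no auth, for brevity).
val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
val rpcHandler = new NettyBlockRpcServer(appId, serializer, blockManager)
val transportContext = new TransportContext(transportConf, rpcHandler)

val clientFactory: TransportClientFactory =
  transportContext.createClientFactory(List.empty[TransportClientBootstrap].asJava)
val server: TransportServer =
  transportContext.createServer(hostName, port, List.empty[TransportServerBootstrap].asJava)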

3. The RPC client factory TransportClientFactory

public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
  return new TransportClientFactory(this, bootstraps);
}


The TransportClientFactory constructor is analyzed below.

public TransportClientFactory(
    TransportContext context,
    List<TransportClientBootstrap> clientBootstraps) {
  this.context = Preconditions.checkNotNull(context);
  this.conf = context.getConf();
  this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
  this.connectionPool = new ConcurrentHashMap<>();
  this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
  this.rand = new Random();

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
  this.workerGroup = NettyUtils.createEventLoop(
      ioMode,
      conf.clientThreads(),
      conf.getModuleName() + "-client");
  this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads());
}


1) clientBootstraps: caches the list of client bootstraps

2) connectionPool: caches client connections (see the usage sketch after this list)

3) numConnectionsPerPeer: the number of connections used when fetching data between nodes, configurable via the property spark.shuffle.io.numConnectionsPerPeer, default 1

4) socketChannelClass: the class used when a client channel is created, configurable via the property spark.shuffle.io.mode

5) workerGroup: following Netty's conventions, a client only has a worker group, so a workerGroup is created here; it is actually a NioEventLoopGroup

6) pooledAllocator: a pooled ByteBuf allocator with thread-local caching disabled.
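A brief hedged usage sketch of the factory: because of connectionPool and numConnectionsPerPeer, repeated createClient calls for the same host and port may return a cached, still-active connection instead of opening a new socket. The host and port below are illustrative.

import org.apache.spark.network.client.{TransportClient, TransportClientFactory}

// Illustration: the factory hands out (possibly pooled) clients for a given peer.
def clientFor(factory: TransportClientFactory): TransportClient = {
  factory.createClient("shuffle-host.example", 7337) // reused from connectionPool when possible
}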

4. The Netty server TransportServer

TransportServer is the Netty-based server implementation, providing RPC services such as upload and download. The code is as follows.

public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}


The TransportServer constructor is shown below.

public TransportServer(
    TransportContext context,
    String hostToBind,
    int portToBind,
    RpcHandler appRpcHandler,
    List<TransportServerBootstrap> bootstraps) {
  this.context = context;
  this.conf = context.getConf();
  this.appRpcHandler = appRpcHandler;
  this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

  try {
    init(hostToBind, portToBind);
  } catch (RuntimeException e) {
    JavaUtils.closeQuietly(this);
    throw e;
  }
}


The init method initializes the TransportServer. It uses Netty framework APIs such as EventLoopGroup and ServerBootstrap to create the server side of the shuffle I/O interaction; the main code is listed below.

private void init(String hostToBind, int portToBind) {

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;

  PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
    conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, allocator)
    .childOption(ChannelOption.ALLOCATOR, allocator);

  if (conf.backLog() > 0) {
    bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
  }

  if (conf.receiveBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
  }

  if (conf.sendBuf() > 0) {
    bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
  }

  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      context.initializePipeline(ch, rpcHandler);
    }
  });

  InetSocketAddress address = hostToBind == null ?
    new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
  channelFuture = bootstrap.bind(address);
  channelFuture.syncUninterruptibly();

  port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
  logger.debug("Shuffle server started on port: {}", port);
}


1) ioMode: NIO or EPOLL

2) ChannelOption.ALLOCATOR: Netty 4 introduced a new pooled ByteBuf allocator, a pure-Java implementation of jemalloc (also used by Facebook), so Netty no longer wastes memory bandwidth zero-filling buffers. However, since the pool does not rely on GC, developers must watch out for memory leaks: if a buffer is never released in a handler, memory usage grows without bound. Netty does not use the memory pool by default; it must be specified when the client or server is created. Once the pool is in use, allocation and release must come in pairs, i.e. retain and release must be paired, otherwise memory leaks result (see the sketch after this list).

3) RpcHandler: the logic for handling the received data
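A minimal illustration of the retain/release pairing described in point 2, using plain Netty rather than Spark code; the buffer contents are arbitrary.

import io.netty.buffer.{ByteBuf, PooledByteBufAllocator}

// Pooled buffers are reference-counted: every allocation (or retain) must be matched by a release,
// otherwise the pooled memory is never returned and usage grows without bound.
val allocator = PooledByteBufAllocator.DEFAULT
val buf: ByteBuf = allocator.buffer(1024) // refCnt == 1
try {
  buf.writeBytes("block data".getBytes("UTF-8"))
  // ... hand the buffer to a handler; if that handler calls retain(), it must call release() too
} finally {
  buf.release() // refCnt back to 0, the memory returns to the pool
}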

5. Fetching remote shuffle files

NettyBlockTransferService's fetchBlocks method fetches remote shuffle files; it actually relies on the Netty service created inside NettyBlockTransferService.

override def fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempShuffleFileManager: TempShuffleFileManager): Unit = {
logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
try {
val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
val client = clientFactory.createClient(host, port)
new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
transportConf, tempShuffleFileManager).start
}
}

val maxRetries = transportConf.maxIORetries
if (maxRetries > 0) {
// Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
// a bug in this code. We should remove the if statement once we're sure of the stability.
new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start
} else {
blockFetchStarter.createAndStart(blockIds, listener)
}
} catch {
case e: Exception =>
logError("Exception while beginning fetchBlocks", e)
blockIds.foreach(listener.onBlockFetchFailure(_, e))
}
}


6. Uploading shuffle files

NettyBlockTransferService's uploadBlock method uploads a shuffle file to a remote Executor; it also relies on the Netty service created inside NettyBlockTransferService. The steps are as follows.

override def uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit] = {
val result = Promise[Unit]
val client = clientFactory.createClient(hostname, port)

// StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
// Everything else is encoded using our binary protocol.
val metadata = JavaUtils.bufferToArray(serializer.newInstance.serialize((level, classTag)))

// Convert or copy nio buffer into array in order to serialize it.
val array = JavaUtils.bufferToArray(blockData.nioByteBuffer)

client.sendRpc(new UploadBlock(appId, execId, blockId.toString, metadata, array).toByteBuffer,
new RpcResponseCallback {
override def onSuccess(response: ByteBuffer): Unit = {
logTrace(s"Successfully uploaded block $blockId")
result.success((): Unit)
}
override def onFailure(e: Throwable): Unit = {
logError(s"Error while uploading block $blockId", e)
result.failure(e)
}
})

result.future
}


1) Create the Netty client; the hostname and port the client connects to are exactly the BlockManager's hostname and port

2) Serialize the Block's storage level StorageLevel and its class tag

3) Convert the Block's ByteBuffer into an array so that it can be serialized

4) Wrap the appId, execId, blockId, metadata, and the Block converted to an array into an UploadBlock message and serialize it into a byte array

5) Finally call the Netty client's sendRpc method to upload the byte array; the callback RpcResponseCallback updates the upload status according to the RPC result (a caller-side sketch follows).
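As a hedged caller-side sketch of this flow (the actual replication code in BlockManager is more involved), a peer upload simply waits on the Future returned by uploadBlock; transferService, peer and buffer are assumed to come from the surrounding replication logic.

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import org.apache.spark.storage.{BlockId, StorageLevel}

// Illustration: push one block to a peer and wait until the remote BlockManager has stored it.
val uploadFuture = transferService.uploadBlock(
  peer.host, peer.port, peer.executorId,
  BlockId("rdd_0_0"), buffer, StorageLevel.MEMORY_AND_DISK, ClassTag.AnyRef)
Await.result(uploadFuture, Duration.Inf)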
