spark的存储系统--BlockManager源码分析

根据之前的一系列分析,我们对spark作业从创建到调度分发,到执行,最后结果回传driver的过程有了一个大概的了解。但是在分析源码的过程中也留下了大量的问题,最主要的就是涉及到的spark中重要的几个基础模块,我们对这些基础设施的内部细节并不是很了解,之前走读源码时基本只是大概了解每个模块的作用以及对外的主要接口,这些重要的模块包括BlockMananger, MemoryMananger, ShuffleManager, MapOutputTracker, rpc模块NettyRPCEnv,以及BroadcastManager。 而对于调度系统涉及到的几个类包括DAGSchedulerManager, TaskSchedulerManager, CoarseGrainedSchedulerBackend, CoarseGrainedExecutorBackend, Executor, TaskRunner,我们之前已经做了较为详细的分析,因此这几个模块暂告一段落。
本篇,我们来看一下spark中最基础的一个的模块--存储系统BlockManager的内部实现。

BlockManager调用时机

首先,我们来整理一下在一个作业的运行过程中都有哪些地方使用到了BlockManager。

  • DAGScheduler.getCacheLocs。这个方法的调用是在提交一个stage时,需要获取分区的偏向位置时会调用该方法。我们知道rdd是可以缓存的,而rdd的缓存就是通过blockManager来管理的,有一个专门的RDDBlockId用来表示一个RDD缓存块的唯一标识。

      最终调用的方法是:blockManagerMaster.getLocations(blockIds)
  • 广播变量。在DAGscheduler中提交stage时需要把rdd和ShuffleDependency(对于ResultStage则是一个函数)对象序列化用于网络传输,实际上序列化后的字节数组是通过broadcastManager组件进行网络传输的,而broadcastManager实际又是通过BlockMananger来将要广播的数据存储成block,并在executor端发送rpc请求向BlockManangerMaster请求数据。每个广播变量会对应一个TorrentBroadcast对象,TorrentBroadcast对象内的writeBlocks和readBlocks是读写广播变量的方法,

      最终调用的方法是:blockManager.putSingle和blockManager.putBytes
  • Shuffle的map阶段输出。如果我们没有启动外部shuffle服务及ExternalShuffle,那么就会用spark自己的shuffle机制,在map阶段输出时通过blockManager对输出的文件进行管理。shuffle这部分主要使用的是DiskBlockManager组件。

      最终调用的是:DiskBlockManager相关方法包括createTempShuffleBlock,getDiskWriter,
      DiskBlockObjectWriter相关方法,包括write方法和commitAndGet方法
  • 任务运行结果序列化后传回driver。这里分为两种情况,如果结果序列化后体积较小,小于maxDirectResultSize,则直接通过rpc接口传回,如果体积较大,就需要先通过blockManager写入executor几点的内存和磁盘中,然后在driver端进行拉取。

      最终调用的是:blockManager.putBytes

此外,我们还注意到,以上几种情形中使用的BlockId都是不同的,具体可以看一下BlockId.scala文件中关于各种BlockId的定义。
所以,接下来,我们的思路就很清晰了,以上面提到的对BlockManager的方法调用为切入点进行分析。

BlockManagerMaster.getLocations

这个方法用于获取指定的blockId对应的块所在存储位置。

def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
driverEndpoint.askSync[IndexedSeq[Seq[BlockManagerId]]](
  GetLocationsMultipleBlockIds(blockIds))

}

这里向driverEndpoint发送了一个GetLocations消息,注意这里的driverEndpoint并不是DriverEndpoint的端点引用,在SparkEnv的构造过程我们可以看到,这是一个BlockManagerMasterEndpoint端点的引用。所以我们需要在BlockManagerMasterEndpoint中寻找对于该消息的处理。注意,由于这里调用了ask方法,所以在服务端是由receiveAndReply方法来处理并响应的。

BlockManagerMasterEndpoint.receiveAndReply

我们截取了对GetLocations处理的部分代码

case GetLocationsMultipleBlockIds(blockIds) =>
  context.reply(getLocationsMultipleBlockIds(blockIds))

调用的是getLocations方法:

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
  if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
}

这个方法很简单,就是直接从缓存中查找blockId对应的位置,位置信息用BlockManagerId封装。那么缓存中的信息什么时候加进去呢?当然是写入新的block并更新block位置信息的时候,后面的会分析到。

BlockManager.putSingle

这个方法写入一个有单个对象组成的块,

def putSingle[T: ClassTag](
  blockId: BlockId,
  value: T,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean = {
putIterator(blockId, Iterator(value), level, tellMaster)
}

可以看到,把对象包装成了一个只有一个元素的迭代器,然后调用putIterator方法,最后调用doPutIterator方法

BlockManager.doPutIterator

上面的方法,最终调用了doPutIterator方法。

private def doPutIterator[T](
  blockId: BlockId,
  iterator: () => Iterator[T],
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Option[PartiallyUnrolledIterator[T]] = {
//
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
  val startTimeMs = System.currentTimeMillis
  var iteratorFromFailedMemoryStorePut: Option[PartiallyUnrolledIterator[T]] = None
  // Size of the block in bytes
  var size = 0L
  // 如果存储等级中包含内存级别,那么我们优先写入内存中
  if (level.useMemory) {
    // Put it in memory first, even if it also has useDisk set to true;
    // We will drop it to disk later if the memory store can't hold it.
    // 对于不进行序列化的情况,只能存储内存中
    if (level.deserialized) {
      memoryStore.putIteratorAsValues(blockId, iterator(), classTag) match {
        case Right(s) =>
          size = s
        case Left(iter) =>
          // Not enough space to unroll this block; drop to disk if applicable
          // 内存空间不够时,如果存储等级允许磁盘,则存储到磁盘中
          if (level.useDisk) {
            logWarning(s"Persisting block $blockId to disk instead.")
            diskStore.put(blockId) { channel =>
              val out = Channels.newOutputStream(channel)
              // 注意对于存储到磁盘的情况一定是要序列化的
              serializerManager.dataSerializeStream(blockId, out, iter)(classTag)
            }
            size = diskStore.getSize(blockId)
          } else {
            iteratorFromFailedMemoryStorePut = Some(iter)
          }
      }
    } else { // !level.deserialized
      // 以序列化的形式进行存储
      memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
        case Right(s) =>
          size = s
        case Left(partiallySerializedValues) =>
          // Not enough space to unroll this block; drop to disk if applicable
          if (level.useDisk) {
            logWarning(s"Persisting block $blockId to disk instead.")
            diskStore.put(blockId) { channel =>
              val out = Channels.newOutputStream(channel)
              partiallySerializedValues.finishWritingToStream(out)
            }
            size = diskStore.getSize(blockId)
          } else {
            iteratorFromFailedMemoryStorePut = Some(partiallySerializedValues.valuesIterator)
          }
      }
    }
  } else if (level.useDisk) {// 对于存储级别不允许存入内存的情况,我们只能选择存入磁盘
    diskStore.put(blockId) { channel =>
      val out = Channels.newOutputStream(channel)
      // 存储到磁盘是一定要序列化的
      serializerManager.dataSerializeStream(blockId, out, iterator())(classTag)
    }
    size = diskStore.getSize(blockId)
  }

  // 获取刚刚刚刚写入的块的状态信息
  val putBlockStatus = getCurrentBlockStatus(blockId, info)
  val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
  // 如果块存储成功,那么进行接下来的动作
  if (blockWasSuccessfullyStored) {
    // Now that the block is in either the memory or disk store, tell the master about it.
    info.size = size
    // 向driver汇报块信息
    if (tellMaster && info.tellMaster) {
      reportBlockStatus(blockId, putBlockStatus)
    }
    // 更新任务度量系统中关于块信息的相关统计值
    addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
    logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
    // 如果副本数大于1,那么需要进行额外的复制
    if (level.replication > 1) {
      val remoteStartTime = System.currentTimeMillis
      val bytesToReplicate = doGetLocalBytes(blockId, info)
      // [SPARK-16550] Erase the typed classTag when using default serialization, since
      // NettyBlockRpcServer crashes when deserializing repl-defined classes.
      // TODO(ekl) remove this once the classloader issue on the remote end is fixed.
      val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) {
        scala.reflect.classTag[Any]
      } else {
        classTag
      }
      try {
        replicate(blockId, bytesToReplicate, level, remoteClassTag)
      } finally {
        bytesToReplicate.dispose()
      }
      logDebug("Put block %s remotely took %s"
        .format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
    }
  }
  assert(blockWasSuccessfullyStored == iteratorFromFailedMemoryStorePut.isEmpty)
  iteratorFromFailedMemoryStorePut
}
}

总结一下这段代码的主要逻辑:

  • 如果存储等级允许存入内存,那么优先存入内存中。根据存储的数据是否需要序列化分别选择调用memoryStore的不同方法。
  • 如果存储等级不允许内存,那么只能存入磁盘中,存入磁盘中的数据一定是经过序列化的,这点要注意。
  • 向BlockManagerMaster汇报刚写入的块的位置信息
  • 更新任务度量系统中关于块信息的相关统计值
  • 如果副本数大于1,那么需要进行额外的复制

从上面的步骤可以看到,在完成数据写入后,会通过rpc调用向BlockManagerMaster汇报块的信息,这也解答了blockManagerMaster.getLocations方法从内存的map结构中查询块的位置信息的来源。

单纯就存储数据来说,最重要的无疑是内存管理器MemoryStore和磁盘管理器DiskStore。
对于MemoryStore和DiskStore调用的存储方法有:

memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)

blockManager.putBytes

我们再来接着看另一个写入方法,putBytes,即写入字节数组数据。它的实际写入的逻辑在doPutBytes方法中,我们看一下这个方法:

blockManager.doPutBytes

这个方法的主要步骤与doPutIterator方法差不多。只不过doPutIterator方法插入的是java对象,如果存储级别要求序列化或者存储到磁盘时,需要将对象序列化。

private def doPutBytes[T](
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Boolean = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
  val startTimeMs = System.currentTimeMillis
  // Since we're storing bytes, initiate the replication before storing them locally.
  // This is faster as data is already serialized and ready to send.
  // 启动副本复制
  val replicationFuture = if (level.replication > 1) {
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
      // buffers that are owned by the caller.
      replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
    }(futureExecutionContext)
  } else {
    null
  }

  val size = bytes.size

  // 如果缓存级别中包含内存,优先写入内存中
  if (level.useMemory) {
    // Put it in memory first, even if it also has useDisk set to true;
    // We will drop it to disk later if the memory store can't hold it.
    // 是否以序列化形式存储
    val putSucceeded = if (level.deserialized) {
      val values =
        serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
      memoryStore.putIteratorAsValues(blockId, values, classTag) match {
        case Right(_) => true
        case Left(iter) =>
          // If putting deserialized values in memory failed, we will put the bytes directly to
          // disk, so we don't need this iterator and can close it to free resources earlier.
          iter.close()
          false
      }
    } else {
      // 如果以序列化格式存储,则不需要反序列化
      val memoryMode = level.memoryMode
      memoryStore.putBytes(blockId, size, memoryMode, () => {
        // 如果存在非直接内存,那么需要将数据拷贝一份到直接内存中
        if (memoryMode == MemoryMode.OFF_HEAP &&
            bytes.chunks.exists(buffer => !buffer.isDirect)) {
          bytes.copy(Platform.allocateDirectBuffer)
        } else {
          bytes
        }
      })
    }
    // 如果插入内存失败,并且允许写入磁盘的话,就将数据写入磁盘
    // 插入内存失败一般是因为内存不够引起
    if (!putSucceeded && level.useDisk) {
      logWarning(s"Persisting block $blockId to disk instead.")
      diskStore.putBytes(blockId, bytes)
    }
  } else if (level.useDisk) {// 如果只允许存储到磁盘,那就只能存到磁盘了
    // 存储到磁盘的数据一定是序列化的
    diskStore.putBytes(blockId, bytes)
  }

  // 刚刚插入的块的信息
  val putBlockStatus = getCurrentBlockStatus(blockId, info)
  val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
  if (blockWasSuccessfullyStored) {
    // Now that the block is in either the memory or disk store,
    // tell the master about it.
    info.size = size
    // 向driver端的BlockManagerMaster组件汇报块信息
    if (tellMaster && info.tellMaster) {
      reportBlockStatus(blockId, putBlockStatus)
    }
    // 更新任务度量值
    addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
  }
  logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))
  if (level.replication > 1) {
    // Wait for asynchronous replication to finish
    // 等待之前启动的副本复制线程完成
    // 注意这里的超时被设成了无穷大
    try {
      ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
    } catch {
      case NonFatal(t) =>
        throw new Exception("Error occurred while waiting for replication to finish", t)
    }
  }
  if (blockWasSuccessfullyStored) {
    None
  } else {
    Some(bytes)
  }
}.isEmpty
}

对于MemoryStore和DiskStore调用的方法有:

memoryStore.putBytes
diskStore.putBytes(blockId, bytes)

总结

综上,我们把一个spark作业运行过程中需要调用到BlockManager的时机以及调用的BlockManager的一些写入数据的方法大致整理了一下。BlockManager主要是通过内部的两个组件MemoryStore和DiskStore来管理数据向内存或磁盘写入的。此外DiskBlockManager组件主要是用来管理Block和磁盘文件之间的对应关系,分配文件路径,管理本地文件系统路径等作用。对于MemoryStore和DiskStore的调用主要有如下几个方法:

memoryStore.putIteratorAsValues
memoryStore.putIteratorAsBytes
diskStore.put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit
diskStore.getSize(blockId)
memoryStore.putBytes
diskStore.putBytes(blockId, bytes)
06-10 21:39