TaskSchedulerImpl

上一篇讲到DAGScheduler根据shuffle依赖对作业的整个计算链划分成多个stage之后,就开始提交最后一个ResultStage,而由于stage之间的依赖关系,实际上最终是循着计算链从上到下依次提交stage的。每提交一个stage,就会将这个stage分成多个Task,并且会计算每个Task的偏向位置,将RDD和ShuffleDependency,TaskMetrics等对象序列化用于远程传输,最后把一个stage的所有Task包装成一个任务集,提交给TaskSchedulerImpl运行。本节就来分析一下这个TaskSchedulerImpl。首先把TaskSchedulerImpl的说明翻译一下:

TaskSchedulerImpl的主要作用是调度Task,内部通过调度后端进行实际任务的传输。不同的集群类型对应不同的具体的调度后端的实现,例如本地模式的调度后端实现是LocalSchedulerBackend,而任务调度器的实现只有一种就是TaskSchedulerImpl。TaskSchedulerImpl主要处理一些通用的逻辑,例如在多个作业之间决定调度顺序,执行推测执行的逻辑等等。

TaskSchedulerImpl在使用之前应该先调用initialize() 和 start()方法,然后再提交任务集。

这里插一句,这两个方法分别在上面地方调用呢?都是在SparkContext初始化的时候调用的,具体可以看SparkContext初始化代码。

关于线程安全的一些提示:由于会存在多线程同时提交任务的情况,所以面向外部的public的方法必须加锁以保持内部状态量,簿记量的一致性。
此外,一些调度后端(SchedulerBackend)的方法会先获取自身的锁,然后获取TaskSchedulerImpl对象的锁,所有应该避免在持有TaskSchedulerImpl对象的锁的情况下再尝试获取调度后端的锁,这样会造成死锁。实际上这句话的意思就是因为有些操作需要同时持有调度后端的锁和TaskSchedulerImpl锁,对于这种需要同时持有多把锁的情况,应该保持获取锁的顺序是一致的,这样就能避免出现死锁的情况。
举个例子:有些操作要同时获取A锁和B锁,如果方法m1的获取顺序是先获取A锁然后获取B锁,而m2是先B后A,如果同时又两个线程t1,t2分别执行方法m1和m2,有可能在某个时刻形成这样的情况:t1获取了A在等待B,而t2获取了B在等待A,这样互相等待而又不释放锁就形成死锁。

好了,接下来我们接着任务提交的逻辑继续分析。

submitTasks

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
// 加锁,更新一些簿记量和状态量
this.synchronized {
  // 创建一个TaskSetManager,对Task集的进一步封装
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stage = taskSet.stageId
  // 更新stageId和TaskSetManager的映射关系,由于失败重试机制,一个stage可能会被多次尝试
  val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  // 将本次尝试的TaskSetManager添加到映射中
  stageTaskSets(taskSet.stageAttemptId) = manager
  // 检测是否有还在运行的stage尝试,如果有就是重复提交了,需要抛出一个异常
  val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    ts.taskSet != taskSet && !ts.isZombie
  }
  if (conflictingTaskSet) {
    throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
      s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
  }
  // 将新创建的TaskSetManager添加到调度池中,调度池决定了如果存在多个TaskSet在排队应该如何进行排序,
  // 可以通过taskSet.properties设置队列名称,taskSet.properties是通过SparkContext的一个ThreadLocal变量设置的
  schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  // 启动一个定时任务打印提示信息
  if (!isLocal && !hasReceivedTask) {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          logWarning("Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered " +
            "and have sufficient resources")
        } else {
          this.cancel()
        }
      }
    }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
  }
  hasReceivedTask = true
}
// 到这里,锁已经释放
// 通知调度后端有任务可以运行了,
backend.reviveOffers()
}
  • 获取锁,更新一些簿记量
  • 将新的任务集封装为TaskSetManager添加到调度池中
  • 调用SchedulerBackEnd,给任务分配可用资源

CoarseGrainedSchedulerBackend.reviveOffers

override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

这个方法通过rpc模块给DriverEndPoint发送一个消息,本地进程内调用rpc方法,主要是为了代码模块的统一。 在DriverEndpoint.receive方法中,我们可以看到,在接收到ReviveOffers消息后,就会调用makeOffers方法,

DriverEndpoint.makeOffers

private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  // 获取锁,同步
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    // 过滤掉正在被杀死的executor
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // 把所有可用的executor封装成资源对象
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // 把这些可用的资源交给TaskSchedulerImpl进行调度
    // TaskSchedulerImpl会综合考虑任务本地性,黑名单,调度池的调度顺序等因素,返回TaskDescription集合
    // TaskDescription对象是对一个Task的完整描述,
    // 包括序列化的任务数据,任务在哪个executor上运行,依赖文件和jar包等信息
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

这个方法主要是将当前所有可用的资源(executor)封装成资源对象(WorkerOffer)交给TaskSchedulerImpl,TaskSchedulerImpl会综合考虑任务本地性,黑名单,调度池的调度顺序等因素,返回TaskDescription集合,TaskDescription对象是对一个Task的完整描述,包括序列化的任务数据,任务在哪个executor上运行,依赖文件和jar包等信息。

从这里也可以看出,调度后端的职责其实相对比较少主要是对executor的管理,以及调用rpc远端服务的引用发送任务数据,大部分的调度工作还是由TaskSchedulerImpl来完成。
接下来我们分析一下Task调度最重要的一个方法,TaskSchedulerImpl.resourceOffers

TaskSchedulerImpl.resourceOffers

// 这个方法由调度后端调用,调度后端会将可用的executor资源告诉TaskSchedulerImpl,
// TaskSchedulerImpl根据TaskSet优先级(调度池),黑名单,本地性等因素给出要实际运行的任务。
// 我们使用round-robin的方式将任务分配到各个executor上,以使得计算资源的 使用更均衡。
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// 标记是否有新的可用executor加入
var newExecAvail = false
// 这个循环主要目的是两个:
// 1. 更新一些簿记量,如物理节点和executor的相互映射关系,机架和host的映射关系,host和executor上运行的任务信息等等
// 2. 检查是否有新的可用executor加入
for (o <- offers) {
  if (!hostToExecutors.contains(o.host)) {
    hostToExecutors(o.host) = new HashSet[String]()
  }
  if (!executorIdToRunningTaskIds.contains(o.executorId)) {
    hostToExecutors(o.host) += o.executorId
    executorAdded(o.executorId, o.host)
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
    newExecAvail = true
  }
  for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  }
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
// 触发黑名单的超时检查,被加入黑明单的节点或executor是由一定超时时间的,
// 在超时时间内不能像他们提交任务,而过了超时时间,这些资源将被重新投入使用
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

// 根据最新的黑名单过滤掉在黑名单中的计算资源,包括host和executor
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
  offers.filter { offer =>
    !blacklistTracker.isNodeBlacklisted(offer.host) &&
      !blacklistTracker.isExecutorBlacklisted(offer.executorId)
  }
}.getOrElse(offers)

// 对资源进行混洗,使得分配更加均匀,使用scala库的Random进行混洗
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// 每个executor能分配多少个任务,cores / CPUS_PER_TASK
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
// 每个executor提供的cpu核数
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// 通过调度池对所有的任务集按优先级进行排序,获取排序后的任务集
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// 如果有新的executor加入,需要通知每个TaskSetManager
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// 对于每一个任务集,为其分配资源
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  // 本地性从低到高的顺序
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    // 每个本地性级别会进行多轮分配,
    // 每一轮依次轮询每个executor,每个executor分配一个任务,
    // 这样一轮下来每个executor都会分配到一个任务,显然大多数情况下,executor的资源是不会被占满的
    // 没关系,我们会接着进行第二轮分配,知道没有资源或者在当前的本地性级别下任务被分配完了,就跳出循环
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}

if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
}
  • 更新一些簿记量,如物理节点和executor的相互映射关系,机架和host的映射关系,host和executor上运行的任务信息等等; 检查是否有新的可用executor加入
  • 触发黑名单的超时检查,被加入黑明单的节点或executor是由一定超时时间的,在超时时间内不能像他们提交任务,而过了超时时间,这些资源将被重新投入使用; 根据最新的黑名单过滤掉在黑名单中的计算资源,包括host和executor
  • 通过调度池对所有的任务集按优先级进行排序,获取排序后的任务集
  • 对于每一个任务集,按照对executor进行round-robin的方式分配任务,会进行多轮分配,每一轮依次轮询所有的executor,为每一个executor分配一个符合本地性要求的任务

TaskSchedulerImpl.resourceOfferSingleTaskSet

private def resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
// 轮询每一个executor,分别为其分配一个Task
for (i <- 0 until shuffledOffers.size) {
  val execId = shuffledOffers(i).executorId
  val host = shuffledOffers(i).host
  // 检查这个executor上的cpu资源是否够用
  if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
      // 根据最大允许的本地性级别取出能够在这个executor上执行的任务,
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        // 如果能够找出一个可以在这个executor上运行的符合本地性要求的任务,
        // 将这个任务加入传进来的集合中
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager(tid) = taskSet
        taskIdToExecutorId(tid) = execId
        executorIdToRunningTaskIds(execId).add(tid)
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    } catch {
      case e: TaskNotSerializableException =>
        logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
        // Do not offer resources for this task, but don't throw an error to allow other
        // task sets to be submitted.
        return launchedTask
    }
  }
}
return launchedTask
}

这个方法就是对所有可用的executor进行一轮round-robin方式的分配,一轮分配中,每个executor最多只能得到一个任务,这样做是为了尽量将任务“打散”,均匀第“撒到”所有executor上。

TaskSetManager.resourceOffer

@throws[TaskNotSerializableException]
def resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
// 首先检查黑名单
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
  blacklist.isNodeBlacklistedForTaskSet(host) ||
    blacklist.isExecutorBlacklistedForTaskSet(execId)
}

if (!isZombie && !offerBlacklisted) {
  val curTime = clock.getTimeMillis()

  var allowedLocality = maxLocality

  // 根据本地性等待时间重新计算本地性级别
  if (maxLocality != TaskLocality.NO_PREF) {
    allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) {
      // We're not allowed to search for farther-away tasks
      allowedLocality = maxLocality
    }
  }

  // 找出一个在指定的本地性级别下,能够在这个executor上运行的任务
  dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
    // Found a task; do some bookkeeping and return a task description
    val task = tasks(index)
    // 分配一个taskId
    val taskId = sched.newTaskId()
    // Do various bookkeeping
    // 更新一些簿记量
    copiesRunning(index) += 1
    // task的尝试次数
    val attemptNum = taskAttempts(index).size
    val info = new TaskInfo(taskId, index, attemptNum, curTime,
      execId, host, taskLocality, speculative)
    taskInfos(taskId) = info
    // 记录每次尝试的任务信息
    taskAttempts(index) = info :: taskAttempts(index)
    // Update our locality level for delay scheduling
    // NO_PREF will not affect the variables related to delay scheduling
    // 更新本地性信息和事件信息用于计算本地性等待时间
    // 而对于没有本地性偏好的任务则不会影响这些簿记量
    if (maxLocality != TaskLocality.NO_PREF) {
      currentLocalityIndex = getLocalityIndex(taskLocality)
      lastLaunchTime = curTime
    }
    // Serialize and return the task
    // 对task进行序列化
    val serializedTask: ByteBuffer = try {
      ser.serialize(task)
    } catch {
      // If the task cannot be serialized, then there's no point to re-attempt the task,
      // as it will always fail. So just abort the whole task-set.
      case NonFatal(e) =>
        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
        logError(msg, e)
        abort(s"$msg Exception during serialization: $e")
        throw new TaskNotSerializableException(e)
    }
    // 如果序列化后的体积超过指定阈值,那么会打印一条警告信息
    if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
      !emittedTaskSizeWarning) {
      emittedTaskSizeWarning = true
      logWarning(s"Stage ${task.stageId} contains a task of very large size " +
        s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
        s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    }
    // 更新调度池中的运行任务统计的簿记量
    addRunningTask(taskId)

    // We used to log the time it takes to serialize the task, but task size is already
    // a good proxy to task serialization time.
    // val timeTaken = clock.getTime() - startTime
    val taskName = s"task ${info.id} in stage ${taskSet.id}"
    logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
      s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")

    // 通过dagScheduler给事件总线头第一个任务开始的事件
    sched.dagScheduler.taskStarted(task, info)
    // 封装成一个TaskDescription对象,并返回给上层调用
    new TaskDescription(
      taskId,
      attemptNum,
      execId,
      taskName,
      index,
      addedFiles,
      addedJars,
      task.localProperties,
      serializedTask)
  }
} else {
  None
}
}

这个方法的作用是对给定的executor和本地性级别,分配一个符合要求的任务给这个executor。 最终任务被封装成TaskDescription对象。

小结

对任务分配做一个小结。在给定的计算资源上分配合适的任务,这个工作主要是由TaskScheduler和TaskSetManager两个类协同完成的。而任务本地性的维护与分配时的检查工作是在TaskSetManager中完成的。
接下来,我们分析一下获取到可以实际运行的任务后,调度后端是怎么把这些任务发送到制定的executor上执行的。

DriverEndpoint.makeOffers

首先还得接着回到DriverEndpoint.makeOffers方法,makeOffers方法中通过调用TaskSchedulerImpl.resourceOffers方法切入TaskSchedulerImpl,然后就是TaskSchedulerImpl在做任务分配的工作,最终TaskSchedulerImpl将分配好的任务以TaskDescription的封装形式返回给DriverEndpoint(DriverEndpoint是调度后端的一个内部类),然后紧接着调用DriverEndpoint.launchTasks方法将这些任务传给相应的executor执行。

DriverEndpoint.launchTasks

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          // 注意,这里只要有一个task体积超过阈值,就将该task所属的taskSet取消掉
          // 为什么这么做呢?因为其实一个taskset中的所有task的体积都是一样的,只是partition序号不同而已,
          // 所以一个task体积超过阈值,taskset中的其他task也必然超过阈值,
          // 所以没有必要再尝试其他的task, 直接把taskset取消掉更高效
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // 维护cpu资源信息
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // 通过rpc发送任务到指定的executor上
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

这个方法跟简单,主要就是对TaskDescription进行序列化,然后检查体积是否超过阈值,如果没超过阈值就调用rpc服务引用,将任务发送到指定的executor上。

总结

好了,经过漫长的调用,终于我们的任务要离开driver,驶向executor了,回顾一下任务在driver中从诞生到最终发送的过程,主要有一下几个步骤:

  • DAGScheduler对作业计算链按照shuffle依赖划分多个stage,提交一个stage根据个stage的一些信息创建多个Task,包括ShuffleMapTask和ResultTask, 并封装成一个任务集(TaskSet),把这个任务集交给TaskScheduler
  • TaskSchedulerImpl将接收到的任务集加入调度池中,然后通知调度后端SchedulerBackend
  • CoarseGrainedSchedulerBackend收到新任务提交的通知后,检查下现在可用 executor有哪些,并把这些可用的executor交给TaskSchedulerImpl
  • TaskSchedulerImpl根据获取到的计算资源,根据任务本地性级别的要求以及考虑到黑名单因素,按照round-robin的方式对可用的executor进行轮询分配任务,经过多个本地性级别分配,多轮分配后最终得出任务与executor之间的分配关系,并封装成TaskDescription形式返回给SchedulerBackend
  • SchedulerBackend拿到这些分配关系后,就知道哪些任务该发往哪个executor了,通过调用rpc接口将任务通过网络发送即可。
06-04 04:18