DAGScheduler最终创建了task set,并提交给了taskScheduler。那先得看看task是怎么定义和执行的。
Task是一个执行单元(a unit of execution)。

Task: executor执行的基本单元,也是spark操作的最小单位。和java executor的task基本上是相同含义的。
/**
* A unit of execution. We have two kinds of Task's in Spark:
* - [[org.apache.spark.scheduler.ShuffleMapTask]]
* - [[org.apache.spark.scheduler.ResultTask]]
*
* A Spark job consists of one or more stages. The very last stage in a job consists of multiple
* ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
* and sends the task output back to the driver application. A ShuffleMapTask executes the task
* and divides the task output to multiple buckets (based on the task's partitioner).
*
* @param stageId id of the stage this task belongs to
* @param partitionId index of the partition in the RDD
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {
主要属性:
final def run(attemptId: Long): T = {
context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
context.taskMetrics.hostname = Utils.localHostName()
taskThread = Thread.currentThread()
if (_killed) {
kill(interruptThread = false)
}
runTask(context)
/**
 * Task-specific work, implemented by each concrete task type. Called from `run` with the
 * TaskContext that `run` creates for the current attempt.
 *
 * @param context the context created in `run` for this attempt
 * @return the task's result, of type T
 */
def runTask(context: TaskContext): T
// Map output tracker epoch. Will be set by TaskScheduler.
var epoch: Long = -1

// Metrics for this task; starts as None. The runTask implementations shown in this file
// assign it Some(context.taskMetrics) before doing their work.
var metrics: Option[TaskMetrics] = None

// Task context, to be initialized in run().
@transient protected var context: TaskContext = _
// The actual Thread on which the task is running, if any. Initialized in run().
@volatile @transient private var taskThread: Thread = _

/**
* Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
* need to send the list of JARs and files added to the SparkContext with each task to ensure that
* worker nodes find out about it, but we can't make it part of the Task because the user's code in
* the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
* first writing out its dependencies.
*/
private[spark] object Task {
/**
* Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
*/
def serializeWithDependencies(
/**
* Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
* and return the task itself as a serialized ByteBuffer. The caller can then update its
* ClassLoaders and deserialize the task.
*
* @return (taskFiles, taskJars, taskBytes)
*/
def deserializeWithDependencies(serializedTask: ByteBuffer)
ShuffleMapTask: 它是对应于transformation操作的task,主要功能是提供action操作所需要的数据。也就是说,它是被action依赖的task,需要提前执行。
/**
* A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
* specified in the ShuffleDependency).
*
* See [[org.apache.spark.scheduler.Task]] for more information.
*
* @param stageId id of the stage this task belongs to
* @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
* the type should be (RDD[_], ShuffleDependency[_, _, _]).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
*/
private[spark] class ShuffleMapTask(
stageId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient private var locs: Seq[TaskLocation])
extends Task[MapStatus](stageId, partition.index) with Logging {
/**
 * Runs the map side of a shuffle for this task's partition.
 *
 * Deserializes the (RDD, ShuffleDependency) pair from the broadcast task binary, obtains a
 * shuffle writer from the shuffle manager, and writes the partition's records through it.
 * The task is always marked completed on the context, whether the write succeeds or fails.
 *
 * @param context the context of the running task attempt
 * @return the MapStatus obtained by stopping the writer with success = true
 */
override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  metrics = Some(context.taskMetrics)
  // Stays null until the shuffle manager hands out a writer, so the failure path below can
  // tell whether there is anything to stop.
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    // Annotation carried over from the notes: the writer in use here is a HashShuffleWriter.
    // NOTE(review): .get assumes a successful stop always yields a status; confirm.
    // The try block is an expression, so its last value is the method result (no `return`).
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      // Tell the writer the write failed, then propagate the original exception.
      if (writer != null) {
        writer.stop(success = false)
      }
      throw e
  } finally {
    context.markTaskCompleted()
  }
}
ResultTask: 它与action操作相对应,也就是位于依赖树的叶子节点上。
/**
* A task that sends back the output to the driver application.
*
* See [[Task]] for more information.
*
* @param stageId id of the stage this task belongs to
* @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
* partition of the given RDD. Once deserialized, the type should be
* (RDD[T], (TaskContext, Iterator[T]) => U).
* @param partition partition of the RDD this task is associated with
* @param locs preferred task execution locations for locality scheduling
* @param outputId index of the task in this job (a job can launch tasks on only a subset of the
* input RDD's partitions).
*/
private[spark] class ResultTask[T, U](
stageId: Int,
taskBinary: Broadcast[Array[Byte]],
partition: Partition,
@transient locs: Seq[TaskLocation],
val outputId: Int)
extends Task[U](stageId, partition.index) with Serializable {
/**
 * Applies the broadcast function to this task's partition of the broadcast RDD and returns
 * the function's result. The task is marked completed on the context afterwards, whether or
 * not the function throws.
 *
 * @param context the context of the running task attempt
 * @return the value produced by the deserialized function for this partition
 */
override def runTask(context: TaskContext): U = {
  // The broadcast payload carries the serialized (RDD, function) pair.
  val closureSer = SparkEnv.get.closureSerializer.newInstance()
  val payload = ByteBuffer.wrap(taskBinary.value)
  val loader = Thread.currentThread.getContextClassLoader
  val (rdd, func) =
    closureSer.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](payload, loader)

  metrics = Some(context.taskMetrics)
  try {
    val records = rdd.iterator(partition, context)
    func(context, records)
  } finally {
    // Always signal completion, even when func fails.
    context.markTaskCompleted()
  }
}







05-28 01:36