Spark数据本地化是在哪个阶段计算首选位置的?

先看一下DAGScheduler的注释,可以看到DAGScheduler除了Stage和Task的划分外,还做了缓存的跟踪和首选运行位置的计算。

DAGScheduler注释: 

DAGScheduler的运行时机

DAGScheduler运行时机:Driver端初始化SparkContext时。DAGScheduler是在整个Spark Application的入口即 SparkContext中声明并实例化的。在实例化DAGScheduler之前,巳经实例化了SchedulerBackend和底层调度器 TaskScheduler。

如果是SQL任务的话,SparkSQL通过Catalyst(Spark SQL的核心是Catalyst优化器)将SQL先翻译成逻辑计划再翻译成物理计划,再转换成RDD的操作。之后运行时再通过DAGScheduler做RDD任务的划分和调度。

【大数据面试知识点】Spark的DAGScheduler-LMLPHP

DAGScheduler如何划分Stage的?

用户提交的计算任务是一个由RDD依赖构成的DAG,Spark会把RDD的依赖以shuffle依赖为边界划分成多个Stage,这些Stage之间也相互依赖,形成了Stage的DAG。然后,DAGScheduler会按依赖关系顺序执行这些Stage。

要是把RDD依赖构成的DAG看成是逻辑执行计划(logic plan),那么,可以把Stage看成物理执行计划,为了更好的理解这个概念,我们来看一个例子。

下面的代码用来对README.md文件中包含整数值的单词进行计数,并打印RDD之间的依赖关系(Lineage):

scala> val counts = sc.textFile("README.md")
               .flatMap(x=>x.split("\\W+"))
               .filter(_.matches(".*\\d.*"))
               .map(x=>(x,1))
               .reduceByKey(_+_)
 // 调用一个action函数,用来触发任务的提交和执行
 scala> counts.collect()
 ​
 // 打印RDD的依赖关系(Lineage)
 scala> counts.toDebugString
 res7: String =
 (2) ShuffledRDD[17] at reduceByKey at <console>:24 []
  +-(2) MapPartitionsRDD[16] at map at <console>:24 []
     |  MapPartitionsRDD[15] at filter at <console>:24 []
     |  MapPartitionsRDD[14] at flatMap at <console>:24 []
     |  README.md MapPartitionsRDD[13] at textFile at <console>:24 []
     |  README.md HadoopRDD[12] at textFile at <console>:24 []

DAGScheduler会根据Shuffle划分前后两个Stage:即StageShuffleMapStage和ResultStage

ShuffleMapStage

先看下ShuffleMapStage的注释,核心就是再讲ShuffleMapStage是做ShuffleWrite的Stage,Stage中是算子的pipline。

ShuffleMapStages是在DAG执行过程中产生的Stage,用来为Shuffle产生数据。ShuffleMapStages发生在每个Shuffle操作之前,在Shuffle之前可能有多个窄转换操作,比如:map,filter,这些操作可以形成流水线(pipeline)。当执行ShuffleMapStages时,会产生Map的输出文件,这些文件会被随后的Reduce任务使用。

ShuffleMapStages也可以作为Jobs,通过DAGScheduler.submitMapStage函数单独进行提交。对于这样的Stages,会在变量mapStageJobs中跟踪提交它们的ActiveJobs。 要注意的是,可能有多个ActiveJob尝试计算相同的ShuffleMapStages。

它为一个shuffle过程产生map操作的输出文件。它也可能是自适应查询规划/自适应调度工作的最后阶段。

ResultStage

再看ResultStage的注释

ResultStage是Job的最后一个Stage,该Stage是基于执行action函数的rdd来创建的。该Stage用来计算一个action操作的结果。该类的声明如下:

 private[spark] class ResultStage(
     id: Int,
     rdd: RDD[_],
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
     parents: List[Stage],   //依赖的父Stage
     firstJobId: Int,
     callSite: CallSite)
   extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

为了计算action操作的结果,ResultStage会在目标RDD的一个或多个分区上使用函数:func。需要计算的分区id集合保存在成员变量:partitions中。但对于有些action操作,比如:first(),take()等,函数:func可能不会在所有分区上使用。

另外,在提交Job时,会先创建ResultStage。但在提交Stage时,会先递归找到该Stage依赖的父级Stage,并先提交父级Stage。如下图所示:

【大数据面试知识点】Spark的DAGScheduler-LMLPHP

举个例子:【大数据面试知识点】Spark的DAGScheduler-LMLPHP

思考题 

如下rdd运算,为什么最终只划分了3个Stage

scala> val rdd1 = sc.textFile("/root/tmp/a.txt",3).flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b)
val rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:1

scala> val rdd2 = sc.textFile("/root/tmp/a.txt",3).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_)
val rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:1

scala> val rdd3 = rdd1.join(rdd2)
val rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:1

scala> val rdd4 = rdd3.groupByKey()
val rdd4: org.apache.spark.rdd.RDD[(String, Iterable[(Int, Int)])] = MapPartitionsRDD[13] at groupByKey at <console>:1

scala> rdd4.collect().foreach(println)
(c,Seq((2,2)))                                                                  
(d,Seq((1,1)))
(a,Seq((2,2)))
(b,Seq((1,1)))

scala> rdd4.toDebugString
val res8: String =
(3) MapPartitionsRDD[13] at groupByKey at <console>:1 []
 |  MapPartitionsRDD[12] at join at <console>:1 []
 |  MapPartitionsRDD[11] at join at <console>:1 []
 |  CoGroupedRDD[10] at join at <console>:1 []
 |  ShuffledRDD[4] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[3] at map at <console>:1 []
    |  MapPartitionsRDD[2] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[1] at textFile at <console>:1 []
    |  /root/tmp/a.txt HadoopRDD[0] at textFile at <console>:1 []
 |  ShuffledRDD[9] at reduceByKey at <console>:1 []
 +-(3) MapPartitionsRDD[8] at map at <console>:1 []
    |  MapPartitionsRDD[7] at flatMap at <console>:1 []
    |  /root/tmp/a.txt MapPartitionsRDD[6] at textFile at <console>:1 []
    |  /root/t...

【大数据面试知识点】Spark的DAGScheduler-LMLPHP

01-01 19:20