在《深入理解Spark RDD——RDD依赖(构建DAG的关键)》一文,详细描述了RDD的宽窄依赖。RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependencypartitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下:

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

PartitionernumPartitions方法用于获取分区数量。PartitionergetPartition方法用于将输入的key映射到下游RDD的从0numPartitions-1这一范围中的某一个分区。

Partitioner有很多具体的实现类,它们的继承体系如图1所示。

Spark除图1中列出的Partitioner子类,还有很多Partitioner的匿名实现类,这里就不一一介绍了。本书以HashPartitioner(哈希分区计算器)为例,详细介绍Partitioner的实现。之所以选择对HashPartitioner的实现进行分析,一方面是由于其实现简洁明了,读者更容易理解;另一方面通过介绍HashPartitioner已经足够达到本书的目的。

HashPartitioner的实现见代码清单1

代码清单1        哈希分区计算器的实现

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match { // 计算出下游RDD的各个分区将具体处理哪些key
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

根据代码清单1HashPartitioner增加了一个名为partitions的构造器参数作为分区数,重写的numPartitions方法只是返回了partitions。重写的getPartition方法实际以keyhashCodenumPartitions作为参数调用了Utils工具类的nonNegativeMod方法(具体实现可以参阅附录A)。nonNegativeMod方法将对keyhashCodenumPartitions进行取模运算,得到key对应的分区索引。使用哈希和取模的方式,可以方便的计算出下游RDD的各个分区将具体处理哪些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。

根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系不再是一对一的,而是取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependencyRDD的任何一个分区。经过以上分析,ShuffleDependency采用HashPartitioner后的分区依赖可以用图2来表示。

 

请继续阅读《深入理解Spark RDD——RDD信息对象

 

深入理解Spark RDD系列文章:

《深入理解Spark RDD——为什么需要RDD?》

深入理解Spark RDD——RDD实现的初次分析》

深入理解Spark RDD——RDD依赖(构建DAG的关键)》

深入理解Spark RDD——RDD分区计算器Partitioner

深入理解Spark RDD——RDD信息对象

08-21 05:58