在《深入理解Spark RDD——RDD依赖(构建DAG的关键)》一文,详细描述了RDD的宽窄依赖。RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个分区将具体依赖于上游RDD的哪些分区呢?Spark提供了分区计算器来解决这个问题。ShuffleDependency的partitioner属性的类型是Partitioner,抽象类Partitioner定义了分区计算器的接口规范,ShuffleDependency的分区取决于Partitioner的具体实现。Partitioner的定义如下:
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}
Partitioner的numPartitions方法用于获取分区数量。Partitioner的getPartition方法用于将输入的key映射到下游RDD的从0到numPartitions-1这一范围中的某一个分区。
Partitioner有很多具体的实现类,它们的继承体系如图1所示。
Spark除图1中列出的Partitioner子类,还有很多Partitioner的匿名实现类,这里就不一一介绍了。本书以HashPartitioner(哈希分区计算器)为例,详细介绍Partitioner的实现。之所以选择对HashPartitioner的实现进行分析,一方面是由于其实现简洁明了,读者更容易理解;另一方面通过介绍HashPartitioner已经足够达到本书的目的。
HashPartitioner的实现见代码清单1。
代码清单1 哈希分区计算器的实现
class HashPartitioner(partitions: Int) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match { // 计算出下游RDD的各个分区将具体处理哪些key
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}
override def hashCode: Int = numPartitions
}
根据代码清单1,HashPartitioner增加了一个名为partitions的构造器参数作为分区数,重写的numPartitions方法只是返回了partitions。重写的getPartition方法实际以key的hashCode和numPartitions作为参数调用了Utils工具类的nonNegativeMod方法(具体实现可以参阅附录A)。nonNegativeMod方法将对key的hashCode和numPartitions进行取模运算,得到key对应的分区索引。使用哈希和取模的方式,可以方便的计算出下游RDD的各个分区将具体处理哪些key。由于上游RDD所处理的key的哈希值在取模后很可能产生数据倾斜,所以HashPartitioner并不是一个均衡的分区计算器。
根据HashPartitioner的实现,我们知道ShuffleDependency中的分区依赖关系不再是一对一的,而是取决于key,并且当前RDD的某个分区将可能依赖于ShuffleDependency的RDD的任何一个分区。经过以上分析,ShuffleDependency采用HashPartitioner后的分区依赖可以用图2来表示。
请继续阅读《深入理解Spark RDD——RDD信息对象》
深入理解Spark RDD系列文章:
《深入理解Spark RDD——RDD依赖(构建DAG的关键)》