本文介绍了在加入RDD随机分区程序行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想加入两个数据集。一种类型的(ID,salesRecord)另一个(ID,姓名)。
第一个数据集是由HashPartitioner分区,二是通过自定义分区程序分区。当我加入这些RDDS通过ID和尝试看看哪个分区-ER信息保留我随意看到一些次joinRDD显示自定义分区,有时HashPartitioner。我收到不同的效果partioner同时改变分区也数。

根据学习火花书,rdd1.join(RDD2)保留从RDD1集分区资讯

下面是code。

  VAL hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(新HashPartitioner(10))
的println(hashPartitionedRDD的分区+ hashPartitionedRDD.partitioner)// HashParitioner的实例见此VAL customPartitionedRDD = customerIdNamePair1.partitionBy(新CustomerPartitioner)
的println(customPartitionedRDD分区+ customPartitionedRDD.partitioner)// CustomPartitioner的实例见此VAL expectedHash = hashPartitionedRDD.join(customPartitionedRDD)
VAL expectedCustom = customPartitionedRDD.join(hashPartitionedRDD)的println(预期哈希+ expectedHash.partitioner)//自定义分区程序的实例见此
的println(预期自定义+ expectedCustom.partitioner)//自定义分区程序的实例见此//只是为了更增加它当两个数据集的分区数我等于我看到了相反的结果。即
// expectedHash显示CustomPartitioner和
// expectedCustom显示Hashpartitioner实例。


解决方案

加入 方法内部调用, Partitioner.defaultPartitioner() 方法。

根据定义的 defaultPartitioner

 高清defaultPartitioner(RDD:RDD [_],其他:RDD [_] *):分区程序= {
    VAL bySize =(SEQ(RDD)++等).sortBy(_。partitions.size).reverse
    为(为r - bySize如果r.partitioner.isDefined&放大器;&放大器; r.partitioner.get.numPartitions大于0){
      返回r.partitioner.get
    }
    如果(rdd.context.conf.contains(spark.default.parallelism)){
      新HashPartitioner(rdd.context.defaultParallelism)
    }其他{
      新HashPartitioner(bySize.head.partitions.size)
    }
  }
}

如果您在网上仔细一看:

  VAL bySize =(SEQ(RDD)++等).sortBy(_。partitions.size).reverse

它启动 for循环(或搜索)的基础上按降序排列分区的数量。所以,如果这两个 RDDS 有自己的partitioners,在一个更高的分区数将选择即可。

修改:您提出了关于看到逆转行为的问题是相当简单的。在这里,如果两者都具有相同的分区数,在其他会在 SEQ 的顶部。因此,争论的分区 RDD 将被选择。

 (SEQ(RDD)++等).sortBy(_。partitions.size).reverse

这行为是可以解释的,但也许并不直观。

I am trying to join two data sets. One of type (Id, salesRecord) another (Id,Name).First data-set is partitioned by HashPartitioner and Second is partitioned by Custom Partitioner. When I join these RDDs by id and try to see which partition-er information is retained I randomly see that some times joinRDD displays custom partitioner and sometimes HashPartitioner. I received different partioner results while changing the number of partitions also.

According to the Learning Spark book, rdd1.join(rdd2) retains the partition info from the rdd1.

Here is the code.

  val hashPartitionedRDD = cusotmerIDSalesRecord.partitionBy(new HashPartitioner(10))
println("hashPartitionedRDD's partitioner " + hashPartitionedRDD.partitioner) // Seeing Instance of HashParitioner

val customPartitionedRDD = customerIdNamePair1.partitionBy(new CustomerPartitioner)
println("customPartitionedRDD partitioner " + customPartitionedRDD.partitioner) // Seeing instance of CustomPartitioner

val expectedHash = hashPartitionedRDD.join(customPartitionedRDD)
val expectedCustom = customPartitionedRDD.join(hashPartitionedRDD)

println("Expected Hash " + expectedHash.partitioner) // Seeing instance of Custom Partitioner
println("Expected Custom " + expectedCustom.partitioner) //Seeing instance of Custom Partitioner

// Just to add more to it when number of partitions of both the data sets I made equal I am seeing the reverse results. i.e. 
// expectedHash shows CustomPartitioner and 
// expectedCustom shows Hashpartitioner Instance.
解决方案

The join method internally calls, Partitioner.defaultPartitioner() method.

Based on definition of defaultPartitioner:

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}

If you look closely in line:

val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse

It starts the for-loop (or search) based on number of partitions in descending order. So if both RDDs have their own partitioners, the one with higher number of partitions will be chosen.

EDIT: The issue you raised about seeing reverse behavior is quite simple. Here, if both have same number of partitions, the others will come at the top of the Seq. So, the partitioner of the argument RDD will be chosen.

(Seq(rdd) ++ others).sortBy(_.partitions.size).reverse

This behavior is explainable, but perhaps not intuitive.

这篇关于在加入RDD随机分区程序行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-28 21:38