This article covers how to check the number of partitions of a Spark DataFrame without incurring the cost of .rdd; the question and the recommended answer below should be a useful reference for anyone facing the same problem.

Problem description

There are a number of questions about how to obtain the number of partitions of an RDD and/or a DataFrame; the answers invariably are:

 rdd.getNumPartitions

 df.rdd.getNumPartitions

Unfortunately, that is an expensive operation on a DataFrame, because

 df.rdd

requires conversion from the DataFrame to an RDD. This is on the order of the time it takes to run

 df.count

I am writing logic that optionally repartitions or coalesces a DataFrame, based on whether the current number of partitions is within a range of acceptable values or instead below or above that range.

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions = inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap { minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse(maxPartitions.map { maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }
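
A hypothetical call site might look like this (a sketch only: the DataFrame name, the bounds, and the output path are made-up illustrative values, and `info` is assumed to be the project's logging helper):

  // Keep the partition count between 200 and 2000, passing the DataFrame
  // through unchanged when it is already inside that range.
  val balancedDf = repartition(inputDf, minPartitions = Some(200), maxPartitions = Some(2000))
  balancedDf.write.parquet("/tmp/balanced-output")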

But we cannot afford to incur the cost of rdd.getNumPartitions for every DataFrame in this manner.

Is there not any way to obtain this information, e.g. by querying the online/temporary catalog for the registered table?

Update: the Spark GUI showed the DataFrame.rdd operation taking as long as the longest SQL in the job. I will re-run the job and attach a screenshot here shortly.

The following is just a test case: it uses a small fraction of the production data size. The longest SQL takes only five minutes, and the .rdd step is on its way to spending that amount of time as well (note that the SQL is not helped out here: it still has to execute afterwards, effectively doubling the cumulative execution time).

We can see that the .rdd operation at DataFrameUtils line 30 (shown in the snippet above) takes 5.1 minutes, and yet the save operation still took 5.2 minutes afterwards; i.e. we did not save any time on the subsequent save by doing the .rdd first.

Recommended answer

There is no inherent cost of the rdd component in rdd.getNumPartitions, because the returned RDD is never evaluated.

While you can easily determine this empirically, either by using a debugger (I'll leave this as an exercise for the reader) or by establishing that no jobs are triggered in the base case scenario

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

it might not be enough to convince you. So let's approach this in a more systematic way:

  • rdd returns a MapPartitionsRDD (with ds as defined above):

scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD

  • RDD.getNumPartitions invokes RDD.partitions.

    There are only MapPartitionsRDDs between rdd and the source (a paraphrased sketch of the relevant Spark definitions follows the lineage output below):

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
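
    To make the delegation concrete, the fragments below paraphrase the relevant definitions from the Spark 2.4 sources (lightly simplified excerpts, not guaranteed verbatim, and subject to change between versions). Resolving the partition count only walks this chain of getPartitions calls; no job is submitted:

    // org.apache.spark.rdd.RDD -- length of the lazily resolved, cached partition array:
    final def getNumPartitions: Int = partitions.length

    // org.apache.spark.rdd.MapPartitionsRDD -- simply delegates to its parent:
    override def getPartitions: Array[Partition] = firstParent[T].partitions

    // org.apache.spark.sql.execution.datasources.FileScanRDD -- the file partitions were
    // already computed at planning time from the file listing, so they are just returned:
    override protected def getPartitions: Array[RDDPartition] = filePartitions.toArray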
    

    Similarly, if the Dataset contained an exchange, we would follow the parents up to the nearest shuffle:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    Note that this case is particularly interesting, because we actually triggered a job:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    That's because we've encountered a scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).

    In such a scenario, getNumPartitions will also trigger a job:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    However, this doesn't mean that the observed cost is somehow related to the .rdd call. Instead, it is an intrinsic cost of finding the partitions when there is no static formula (for example with some Hadoop input formats, where a full scan of the data is required).

    Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example, ds.rdd.count would indeed be expensive and wasteful.
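
    As a rough illustration of that last point, reusing the ds from the shell session above (a sketch, not a benchmark): the Dataset-level count is answered on the optimized SQL execution path, whereas going through .rdd first forces every row to be deserialized out of Spark's internal binary format into an org.apache.spark.sql.Row before counting.

    // Preferred: answered on the optimized Dataset/SQL execution path.
    ds.count()

    // Expensive and wasteful by comparison: materializes an RDD[Row] first
    // (deserializing every row from the internal format) and only then counts it.
    ds.rdd.count()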

