Question
There are a number of questions about how to obtain the number of partitions of an RDD and/or a DataFrame: the answers invariably are:
rdd.getNumPartitions
or
df.rdd.getNumPartitions
Unfortunately, that is an expensive operation on a DataFrame, because df.rdd requires conversion from the DataFrame to an rdd. This is on the order of the time it takes to run
df.count
I am writing logic that optionally repartitions or coalesces a DataFrame, based on whether the current number of partitions is within a range of acceptable values or instead below or above them.
def repartition(inDf: DataFrame, minPartitions: Option[Int],
                maxPartitions: Option[Int]): DataFrame = {
  val inputPartitions = inDf.rdd.getNumPartitions // EXPENSIVE!
  val outDf = minPartitions.flatMap { minp =>
    if (inputPartitions < minp) {
      info(s"Repartition the input from $inputPartitions to $minp partitions..")
      Option(inDf.repartition(minp))
    } else {
      None
    }
  }.getOrElse(maxPartitions.map { maxp =>
    if (inputPartitions > maxp) {
      info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
      inDf.coalesce(maxp)
    } else inDf
  }.getOrElse(inDf))
  outDf
}
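The bounds-checking decision in the helper above can be isolated from Spark and tested on its own. A minimal sketch of that logic as a pure function (the names `Action`, `decide`, etc. are illustrative, not part of the original code):

```scala
// Pure decision logic: given the current partition count and optional
// lower/upper bounds, decide whether to repartition up, coalesce down,
// or keep the DataFrame as-is.
sealed trait Action
final case class RepartitionTo(n: Int) extends Action
final case class CoalesceTo(n: Int) extends Action
case object Keep extends Action

def decide(current: Int, min: Option[Int], max: Option[Int]): Action =
  min.filter(current < _).map(n => RepartitionTo(n): Action)
    .orElse(max.filter(current > _).map(n => CoalesceTo(n): Action))
    .getOrElse(Keep)
```

Separating the decision from the Spark calls makes the branch structure (and the precedence of the min bound over the max bound) easy to unit-test without touching a SparkSession.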
But we cannot afford to incur the cost of rdd.getNumPartitions for every DataFrame in this manner.
Is there not any way to obtain this information, e.g. by querying the online/temporary catalog for the registered table?
Update: The Spark GUI showed the DataFrame.rdd operation taking as long as the longest SQL in the job. I will re-run the job and attach the screenshot here in a bit.
The following is just a test case: it uses a small fraction of the data size of that in production. The longest sql takes only five minutes, and this operation is on its way to spending that amount of time as well (note that the sql is not helped out here: it also has to execute subsequently, effectively doubling the cumulative execution time).
We can see that the .rdd operation at DataFrameUtils line 30 (shown in the snippet above) takes 5.1 minutes, and yet the save operation still took 5.2 minutes afterwards, i.e. we did not save any time on the subsequent save by doing the .rdd first.
Accepted answer
There is no inherent cost of the rdd component in rdd.getNumPartitions, because the returned RDD is never evaluated.
While you can easily determine this empirically, using a debugger (I'll leave this as an exercise for the reader), or by establishing that no jobs are triggered in the base case scenario:
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
it might not be enough to convince you. So let's approach this in a more systematic way:
- rdd returns a MapPartitionsRDD (ds as defined above):
scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
- RDD.getNumPartitions invokes RDD.partitions.
- There are only MapPartitionsRDDs between the rdd and the source:
scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
| MapPartitionsRDD[2] at rdd at <console>:26 []
| MapPartitionsRDD[1] at rdd at <console>:26 []
| FileScanRDD[0] at rdd at <console>:26 []
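Why walking such a lineage is cheap can be sketched with a toy model (purely illustrative; these are not Spark's actual classes): for narrow dependencies the partition count is derived structurally from the parent, so no data is ever evaluated.

```scala
// Toy lineage model: the partition count is computed from the shape of
// the DAG alone, by following parent pointers.
sealed trait ToyRDD { def numPartitions: Int }

// A source whose partitioning is known statically (e.g. file splits).
final case class Source(numPartitions: Int) extends ToyRDD

// A narrow transformation: one-to-one with the parent's partitions,
// so the count is just delegated upward; no rows are touched.
final case class MapPartitions(parent: ToyRDD) extends ToyRDD {
  def numPartitions: Int = parent.numPartitions
}

// Mirrors the debug output above: three MapPartitionsRDDs over a
// single-partition file scan.
val lineage = MapPartitions(MapPartitions(MapPartitions(Source(1))))
```

Asking `lineage.numPartitions` only traverses three parent links; nothing resembling a job runs.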
Similarly, if the Dataset contained an exchange, we would follow the parents to the nearest shuffle:
scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
| MapPartitionsRDD[12] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| ShuffledRowRDD[10] at rdd at <console>:26 []
+-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| FileScanRDD[4] at rdd at <console>:26 []
Note that this case is particularly interesting, because we actually triggered a job:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
That's because we've encountered a scenario where the partitions cannot be determined statically (see Number of dataframe partitions after sorting? and Why does sortBy transformation trigger a Spark job?).
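The distinction can be caricatured in a few lines (purely illustrative; these names are not Spark APIs): a shuffle with a fixed target count, like repartition(n), is known up front, while a data-dependent one, like the range partitioning behind orderBy, needs a sampling pass over the data first, and that pass is a job.

```scala
// Tracks how many "jobs" our toy planner has had to run.
var jobsRun = 0

// Fixed target count (like repartition(n)): known without touching data.
def fixedShufflePartitions(n: Int): Int = n

// Data-dependent count (like range partitioning for orderBy): deciding
// the partition boundaries requires sampling the data, which is a job.
def rangeShufflePartitions(data: Seq[Int], target: Int): Int = {
  jobsRun += 1 // the sampling pass shows up as a Spark job
  math.min(target, data.distinct.size).max(1)
}
```

This is why the first toDebugString above (narrow lineage) triggered nothing, while the orderBy lineage registered a job id.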
In such a scenario, getNumPartitions will also trigger a job:
scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
However, it doesn't mean that the observed cost is somehow related to the .rdd call. Instead, it is an intrinsic cost of finding partitions in the case where there is no static formula (with some Hadoop input formats, for example, a full scan of the data is required).
Please note that the points made here shouldn't be extrapolated to other applications of Dataset.rdd. For example, ds.rdd.count would indeed be expensive and wasteful.
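The difference between a structural question and an evaluating one can be shown with a lazy toy collection (again illustrative, not Spark classes): asking how many partitions exist touches no data, while counting elements forces every partition to materialise.

```scala
// Counts how many partitions have actually been evaluated.
var evaluations = 0

// A lazily-evaluated "partition": its rows are produced only on demand.
class LazyPartition(produce: () => Seq[Int]) {
  lazy val data: Seq[Int] = { evaluations += 1; produce() }
}

val partitions = Vector.fill(4)(new LazyPartition(() => Seq(1, 2, 3)))

// Structural question (like getNumPartitions): cheap, touches no data.
val numPartitions = partitions.length

// Counting (like ds.rdd.count): forces every partition to materialise.
val total = partitions.map(_.data.size).sum
```

After computing `numPartitions` alone, `evaluations` would still be 0; it only climbs to 4 once `total` forces the data.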