Which row will be kept when one uses the dropDuplicates function on a Spark DataFrame? This is not stated in the Spark documentation.
- Keep First (according to row order)
- Keep Last (according to row order)
- Random?
P.S. Assume a distributed YARN environment (not local master).
TL;DR: Keep First (according to row order)
The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.
That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!). You can see the Deduplicate operator in the logical plan below.
// create a dataset with duplicates
val dups = spark.range(9).map(_ % 3)
val q = dups.dropDuplicates
The following is the logical plan of the q dataset.
scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
The Deduplicate operator is then translated to a First logical operator (which shows up as an Aggregate operator after optimizations).
scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
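For intuition, the keep-first semantics that this optimized plan implies can be mimicked on a plain, ordered Scala collection (a sketch only — this is not Spark's code path, and on a local ordered collection "first" is unambiguous in a way it may not be across partitions):

```scala
// Sketch: emulate keep-first deduplication on a local collection.
// Spark's Deduplicate/First aggregation keeps the first value seen per key;
// Scala's distinct on an ordered collection does the same for whole values.
val dups = (0L until 9L).map(_ % 3)   // 0,1,2,0,1,2,0,1,2
val deduped = dups.distinct           // keeps the first occurrence of each value
println(deduped.mkString(","))        // 0,1,2
```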
After spending some time reviewing the Apache Spark code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function:

first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.
import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02 +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03 +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04 +- Range (0, 9, step=1, splits=Some(8))
scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
+- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
+- *SerializeFromObject [input[0, bigint, false] AS value#64L]
+- *MapElements <function1>, obj#63: bigint
+- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
+- *Range (0, 9, step=1, splits=8)
I also think that the dropDuplicates operator may be more performant.
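To see which full row survives when deduplicating on a key column, the same keep-first behavior can be sketched in plain Scala (a local illustration under the assumption of a single ordered sequence; the Row case class and field names here are hypothetical, not Spark's Row):

```scala
// Sketch: deduplicating by a key keeps the first full row seen per key.
// foldLeft preserves encounter order, unlike Scala's unordered groupBy.
case class Row(id: Long, value: Long)  // hypothetical record type for illustration
val rows = (0L until 9L).map(i => Row(i, i % 3))
val firstRowPerKey = rows.foldLeft(Vector.empty[Row]) { (acc, r) =>
  if (acc.exists(_.value == r.value)) acc else acc :+ r  // keep first occurrence only
}
println(firstRowPerKey.map(_.id).mkString(","))  // 0,1,2 — the first ids seen per value
```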