
Problem Description

Which row will be kept when one uses the dropDuplicates function on a Spark DataFrame? It is not stated in the Spark documentation.

  1. Keep First (according to row order)
  2. Keep Last (according to row order)
  3. Random?

P.S. Assume a distributed YARN environment (not local master).

Solution

TL;DR Keep First (according to row order)

The dropDuplicates operator in Spark SQL creates a logical plan with a Deduplicate operator.

That Deduplicate operator is translated to a First logical operator by Spark SQL's Catalyst Optimizer, which answers your question nicely (!)

You can see the Deduplicate operator in the logical plan below.

// create a dataset with duplicates
val dups = spark.range(9).map(_ % 3)

val q = dups.dropDuplicates

The following is the logical plan of the q dataset.

scala> println(q.queryExecution.logical.numberedTreeString)
00 Deduplicate [value#64L], false
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

The Deduplicate operator is then translated to a First logical operator (which shows up as an Aggregate operator after optimization).

scala> println(q.queryExecution.optimizedPlan.numberedTreeString)
00 Aggregate [value#64L], [value#64L]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))
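To check the keep-first behavior directly, here is a minimal sketch; the df, id and tag names are made up for illustration, and it assumes a spark-shell session where spark and its implicits are in scope:

// three rows, two of them sharing id = 1 but carrying different tags
val df = Seq((1, "first"), (1, "second"), (2, "only")).toDF("id", "tag")

// one surviving row per distinct id
df.dropDuplicates("id").show()
// per the keep-first claim above, this should print (1, first) and (2, only)

Note that in a distributed setting the "first" row within a group is only as well-defined as the row order feeding the aggregation, so treat this as an observation rather than a contract.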


After spending some time reviewing the Apache Spark code, I found that the dropDuplicates operator is equivalent to groupBy followed by the first function.

first(columnName: String, ignoreNulls: Boolean): Column
Aggregate function: returns the first value of a column in a group.

import org.apache.spark.sql.functions.first
val firsts = dups.groupBy("value").agg(first("value") as "value")
scala> println(firsts.queryExecution.logical.numberedTreeString)
00 'Aggregate [value#64L], [value#64L, first('value, false) AS value#139]
01 +- SerializeFromObject [input[0, bigint, false] AS value#64L]
02    +- MapElements <function1>, class java.lang.Long, [StructField(value,LongType,true)], obj#63: bigint
03       +- DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, cast(id#58L as bigint), true), obj#62: java.lang.Long
04          +- Range (0, 9, step=1, splits=Some(8))

scala> firsts.explain
== Physical Plan ==
*HashAggregate(keys=[value#64L], functions=[first(value#64L, false)])
+- Exchange hashpartitioning(value#64L, 200)
   +- *HashAggregate(keys=[value#64L], functions=[partial_first(value#64L, false)])
      +- *SerializeFromObject [input[0, bigint, false] AS value#64L]
         +- *MapElements <function1>, obj#63: bigint
            +- *DeserializeToObject staticinvoke(class java.lang.Long, ObjectType(class java.lang.Long), valueOf, id#58L, true), obj#62: java.lang.Long
               +- *Range (0, 9, step=1, splits=8)

I also think that the dropDuplicates operator may be more performant.
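As a quick way to sanity-check that remark, you can compare the two physical plans yourself; a minimal sketch, reusing the dups and firsts values defined above:

// dropDuplicates optimizes to a grouping-only Aggregate (no aggregate expressions),
// while groupBy + first has to evaluate first(value) per group
dups.dropDuplicates.explain()
firsts.explain()

The dropDuplicates plan groups on value without computing any aggregate function, which is one plausible reason it may be cheaper.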
