本文介绍了在 PySpark 的文字列上检测到 INNER 连接的笛卡尔积的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

以下代码引发检测到内连接的笛卡尔积"异常:

The following code raises "Detected cartesian product for INNER join" exception:

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()

并向我展示逻辑计划如下所示:

and shows me that the logical plan is as shown below:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

当 RuleExecutor 应用名为 CheckCartesianProducts 的优化规则集时,这些逻辑计划的 JOIN 条件中不存在列是有原因的(请参阅 https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).

It looks like for a reason there is no a column existing in the JOIN condition for those logical plans when RuleExecutor applies optimization rule set called CheckCartesianProducts (see https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).

但是,如果我在 JOIN 之前使用persist"方法,它会起作用并且物理计划是:

But, if I use "persist" method before JOIN it works and the Physical Plan is:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

因此,可能有人可以解释导致此类结果的内部原因,因为持久化数据帧看起来不是解决方案.

So, may be someone can explain internal leading to such results, because persisting the data frame does not look as a solution.

推荐答案

问题是,一旦您持久保存数据,second_id 就会合并到缓存表中,不再被视为常量.因此,规划器不能再推断查询应该表示为笛卡尔积,而是在散列分区 second_id 上使用标准的 SortMergeJoin.

The problem is, that once you persist your data, second_id is incorporated into the cached table and no longer considered constant. As a result planner can no longer infer that the query should be expressed a Cartesian product, and uses standard SortMergeJoin on hash partitioned second_id.

使用udf

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

然而,SortMergeJoin 不是您应该在这里尝试实现的.使用常量键,它会导致极端的数据倾斜,并且可能会失败,除了玩具数据之外的任何东西.

However SortMergeJoin is not what you should try to achieve here. With constant key, it would result in an extreme data skew, and likely fail, on anything but toy data.

笛卡尔积虽然很贵,但不会受到这个问题的影响,在这里应该是首选.所以它会建议启用交叉联接或使用显式交叉联接语法(spark.sql.crossJoin.enabled for Spark 2.x) 然后继续.

Cartesian Product however, as expensive as it is, won't suffer from this issue, and should be preferred here. So it would recommend enabling cross joins or using explicit cross join syntax (spark.sql.crossJoin.enabled for Spark 2.x) and move on.

一个悬而未决的问题仍然是如何在缓存数据时防止不良行为.不幸的是,我没有为此准备好答案.我相当确定可以使用自定义优化器规则,但这不是单独使用 Python 可以完成的.

A pending question remains how to prevent undesired behavior when data is cached. Unfortunately I don't have an answer ready for that. I fairly sure it is possible to use custom optimizer rules, but this is not something that can be done with Python alone.

这篇关于在 PySpark 的文字列上检测到 INNER 连接的笛卡尔积的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-01 08:19