本文介绍了在PySpark的文字列中检测到用于INNER的笛卡尔积的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

以下代码引发检测到的用于INNER联接的笛卡尔积"异常:

The following code raises "Detected cartesian product for INNER join" exception:

first_df = spark.createDataFrame([{"first_id": "1"}, {"first_id": "1"}, {"first_id": "1"}, ])
second_df = spark.createDataFrame([{"some_value": "????"}, ])

second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()

并告诉我逻辑计划如下所示:

and shows me that the logical plan is as shown below:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

当RuleExecutor应用称为CheckCartesianProducts的优化规则集(请参阅 https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114 ).

It looks like for a reason there is no a column existing in the JOIN condition for those logical plans when RuleExecutor applies optimization rule set called CheckCartesianProducts (see https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114).

但是,如果我在加入之前使用"persist"方法,则该方法有效并且物理计划是:

But, if I use "persist" method before JOIN it works and the Physical Plan is:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

因此,也许有人可以解释导致这种结果的内部原因,因为持久保存数据帧并不是解决方案.

So, may be someone can explain internal leading to such results, because persisting the data frame does not look as a solution.

推荐答案

问题是,一旦持久化数据,second_id就被合并到缓存表中,不再被视为常量.结果,计划人员无法再推断该查询应表示为笛卡尔积,而在散列分区的second_id上使用标准的SortMergeJoin.

The problem is, that once you persist your data, second_id is incorporated into the cached table and no longer considered constant. As a result planner can no longer infer that the query should be expressed a Cartesian product, and uses standard SortMergeJoin on hash partitioned second_id.

使用udf

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

但是SortMergeJoin不是什么,您应该在这里尝试实现.如果使用恒定的键,则除了玩具数据以外的任何数据都将导致极端的数据歪斜,并且很可能会失败.

However SortMergeJoin is not what you should try to achieve here. With constant key, it would result in an extreme data skew, and likely fail, on anything but toy data.

笛卡尔积,尽管价格昂贵,但不会受到此问题的影响,因此应首选.因此,建议您启用交叉连接或使用显式交叉连接语法(针对Spark 2.x的 spark.sql.crossJoin.enabled )并继续前进.

Cartesian Product however, as expensive as it is, won't suffer from this issue, and should be preferred here. So it would recommend enabling cross joins or using explicit cross join syntax (spark.sql.crossJoin.enabled for Spark 2.x) and move on.

一个尚待解决的问题仍然是如何防止在缓存数据时出现不良行为.不幸的是,我还没有答案.我相当确定可以使用自定义优化程序规则,但这不是仅靠Python就能完成的事情.

A pending question remains how to prevent undesired behavior when data is cached. Unfortunately I don't have an answer ready for that. I fairly sure it is possible to use custom optimizer rules, but this is not something that can be done with Python alone.

这篇关于在PySpark的文字列中检测到用于INNER的笛卡尔积的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

11-01 08:19