Chained Spark column expressions with distinct window specifications produce an inefficient DAG

Problem Description


Let's say you deal with time series data. Your desired outcome relies on multiple window functions with distinct window specifications. The result may resemble a single Spark column expression, like an identifier for intervals.


Usually, I don't store intermediate results with df.withColumn but rather chain/stack column expressions and trust Spark to find the most efficient DAG (when dealing with DataFrames).


However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with df.withColumn reduces the DAG complexity. Let's consider the following test setup:

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),        
    }
)

df = spark.createDataFrame(dfp)
df.show(5)



+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   4|   1|
|   0|   2|   3|   0|
|   2|   0|   1|   0|
|   4|   1|   1|   2|
|   1|   3|   0|   4|
+----+----+----+----+
only showing top 5 rows


The computation is arbitrary. Basically, we have two window specs and three computational steps. The three computational steps depend on each other and use alternating window specs:

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)


Inspecting the physical plan via df_result.explain() reveals 4 exchanges and sorts! However, only 3 should be necessary here because we change the window spec only twice.

df_result.explain()



== Physical Plan ==
*(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
+- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
   +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col3#2L, 200)
         +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
            +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
               +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col1#0L, 200)
                     +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
                        +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
                           +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col3#2L, 200)
                                 +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
                                    +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(col1#0L, 200)
                                          +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
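
As a side note, the shuffle count can also be checked programmatically instead of reading the plan by eye. A minimal sketch, relying on PySpark 2.4's _jdf bridge to the JVM QueryExecution (an internal, unofficial API that may change between versions):

# Render the executed physical plan to a string and count the
# Exchange (shuffle) operators it contains.
plan = df_result._jdf.queryExecution().executedPlan().toString()
print(plan.count("Exchange"))  # 4 for the chained version above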




Improvement

To get a better DAG, we slightly modify the code to store the column expression of step2 with withColumn and just pass the reference to this column. The new physical plan indeed requires only 3 shuffles!

w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# save temporary
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")
df_result.explain()



== Physical Plan ==
*(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
+- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
   +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#0L, 200)
         +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
            +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
               +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col3#2L, 200)
                     +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
                        +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(col1#0L, 200)
                              +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
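
If this pattern comes up repeatedly, the temporary-column trick can be wrapped in a small helper. The following is a hypothetical convenience function (not part of PySpark), sketched to show the idea; the caller still has to drop the temporary column at the end, as above:

def store_intermediate(df, expr, name="tmp_variable"):
    # Materialize the column expression under a temporary name and
    # return the new DataFrame plus a plain reference to that column.
    return df.withColumn(name, expr), F.col(name)

# usage: replaces the two lines after "save temporary" above
df, step2 = store_intermediate(df, F.lag(step1).over(w2))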




Relevance

My original example was even more complex and resulted in an even greater difference in the DAG (up to 10 times slower on real-world data).


Does anyone have an answer to this odd behavior? I thought that stacking/chaining column expressions was best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).

Answer


If we look at the Analyzed Logical Plan (via df_result.explain(True)), we can see that, even though tmp_variable has not actually been computed yet, the Analyzer, because of the lazy evaluation of datasets/dataframes/tables while the logical plan is being built, performs its analysis assuming that the column already exists. Because of this assumption, it now needs to build two fewer auxiliary windows than in the previous case to achieve the same result. Indeed, following the Parsed Logical Plan, we see that the analyzer builds fewer unevaluated windows (windowspecdefinition) when tmp_variable is created: instead of pushing window construction all the way down, it mostly performs simple projects (selects).
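
To inspect these plans yourself, request the extended explain output (this is the standard DataFrame.explain API in PySpark 2.4):

# Prints the Parsed Logical Plan, Analyzed Logical Plan,
# Optimized Logical Plan, and the Physical Plan in one go.
df_result.explain(True)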
