This article looks at whether, when appending new rows in Spark, there is an alternative to the union() function, and walks through one approach.

Problem description

In my code, table_df has a number of columns on which I compute statistics such as min, max, and mean, and I want to create new_df with the specified schema new_df_schema. In my logic, I write Spark SQL for the calculations and append each newly generated row to an initially empty new_df; at the end, this produces new_df with all the calculated values for all columns.

But the problem is that when there are many columns, this leads to a performance issue. Can this be done without using union(), or is there another approach that improves performance?
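For a quick baseline, Spark's built-in Dataset.summary (available since Spark 2.3) computes several of these statistics in a single call, although it returns one row per statistic rather than one row per column, so it does not match new_df_schema directly. A minimal sketch against the table_df defined below:

    // One pass over the data; the output has a "summary" column plus one column per input column.
    table_df.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show(false)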

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.sqlContext.implicits._

    val table_df = Seq(
      (10, 20, 30, 40, 50),
      (100, 200, 300, 400, 500),
      (111, 222, 333, 444, 555),
      (1123, 2123, 3123, 4123, 5123),
      (1321, 2321, 3321, 4321, 5321)
    ).toDF("col_1", "col_2", "col_3", "col_4", "col_5")
    table_df.show(false)

    table_df.createOrReplaceTempView("table_df")

     val new_df_schema = StructType(
      StructField("Column_Name", StringType, false) ::
        StructField("number_of_values", LongType, false) ::
        StructField("number_of_distinct_values", LongType, false) ::
        StructField("distinct_count_with_nan", LongType, false) ::
        StructField("distinct_count_without_nan", LongType, false) ::
        StructField("is_unique", BooleanType, false) ::
        StructField("number_of_missing_values", LongType, false) ::
        StructField("percentage_of_missing_values", DoubleType, false) ::
        StructField("percentage_of_unique_values", DoubleType, false) ::
        StructField("05_PCT", DoubleType, false) ::
        StructField("25_PCT", DoubleType, false) ::
        StructField("50_PCT", DoubleType, false) ::
        StructField("75_PCT", DoubleType, false) ::
        StructField("95_PCT", DoubleType, false) ::
        StructField("max", DoubleType, false) ::
        StructField("min", DoubleType, false) ::
        StructField("mean", DoubleType, false) ::
        StructField("std", DoubleType, false) ::
        StructField("skewness", DoubleType, false) ::
        StructField("kurtosis", DoubleType, false) ::
        StructField("range", DoubleType, false) ::
        StructField("variance", DoubleType, false) :: Nil
    )
    var new_df = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], new_df_schema)

    for (c <- table_df.columns) {
      val num = sparkSession.sql(
        s"""SELECT
           | '$c' AS Column_Name,
           | COUNT(${c}) AS number_of_values,
           | COUNT(DISTINCT ${c}) AS number_of_distinct_values,
           | COUNT(DISTINCT ${c}) AS distinct_count_with_nan,
           | (COUNT(DISTINCT ${c}) - 1) AS distinct_count_without_nan,
           | (COUNT(${c}) == COUNT(DISTINCT ${c})) AS is_unique,
           | (COUNT(*) - COUNT(${c})) AS number_of_missing_values,
           | ((COUNT(*) - COUNT(${c}))/COUNT(*)) AS percentage_of_missing_values,
           | (COUNT(DISTINCT ${c})/COUNT(*)) AS percentage_of_unique_values,
           | APPROX_PERCENTILE($c,0.05) AS 05_PCT,
           | APPROX_PERCENTILE($c,0.25) AS 25_PCT,
           | APPROX_PERCENTILE($c,0.50) AS 50_PCT,
           | APPROX_PERCENTILE($c,0.75) AS 75_PCT,
           | APPROX_PERCENTILE($c,0.95) AS 95_PCT,
           | MAX($c) AS max,
           | MIN($c) AS min,
           | MEAN($c) AS mean,
           | STD($c) AS std,
           | SKEWNESS($c) AS skewness,
           | KURTOSIS($c) AS kurtosis,
           | (MAX($c) - MIN($c)) AS range,
           | VARIANCE($c) AS variance
           | FROM
           | table_df""".stripMargin)
        .toDF()
      new_df = new_df.union(num) // this causes a performance issue when the number of columns in table_df is large
    }
    new_df.show(false)

==================================================
table_df:
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|111  |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

new_df:

+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|Column_Name|number_of_values|number_of_distinct_values|distinct_count_with_nan|distinct_count_without_nan|is_unique|number_of_missing_values|percentage_of_missing_values|percentage_of_unique_values|05_PCT|25_PCT|50_PCT|75_PCT|95_PCT|max   |min |mean  |std               |skewness           |kurtosis           |range |variance         |
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+
|col_1      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |10.0  |100.0 |111.0 |1123.0|1321.0|1321.0|10.0|533.0 |634.0634826261484 |0.4334269738367067 |-1.7463346405299973|1311.0|402036.5         |
|col_2      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |20.0  |200.0 |222.0 |2123.0|2321.0|2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738682 |-1.799741951675132 |2301.0|1302313.7        |
|col_3      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |30.0  |300.0 |333.0 |3123.0|3321.0|3321.0|30.0|1421.4|1649.399072389699 |0.3979251063785061 |-1.8119558312496054|3291.0|2720517.3        |
|col_4      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |40.0  |400.0 |444.0 |4123.0|4321.0|4321.0|40.0|1865.6|2157.926620624529 |0.39502047381456235|-1.8165124206347685|4281.0|4656647.3        |
|col_5      |5               |5                        |5                      |4                         |true     |0                       |0.0                         |1.0                        |50.0  |500.0 |555.0 |5123.0|5321.0|5321.0|50.0|2309.8|2666.59027598917  |0.3935246673563026 |-1.8186685628112493|5271.0|7110703.699999999|
+-----------+----------------+-------------------------+-----------------------+--------------------------+---------+------------------------+----------------------------+---------------------------+------+------+------+------+------+------+----+------+------------------+-------------------+-------------------+------+-----------------+

Recommended answer

An alternative to union.

Check the following code.

scala> df.show(false)
+-----+-----+-----+-----+-----+
|col_1|col_2|col_3|col_4|col_5|
+-----+-----+-----+-----+-----+
|10   |20   |30   |40   |50   |
|100  |200  |300  |400  |500  |
|111  |222  |333  |444  |555  |
|1123 |2123 |3123 |4123 |5123 |
|1321 |2321 |3321 |4321 |5321 |
+-----+-----+-----+-----+-----+

Build the required expressions.

scala> val descExpr = array(
    df.columns
    .map(c => struct(
        lit(c).cast("string").as("column_name"),
        max(col(c)).cast("string").as("max"),
        min(col(c)).cast("string").as("min"),
        mean(col(c)).cast("string").as("mean"),
        stddev(col(c)).cast("string").as("std"),
        skewness(col(c)).cast("string").as("skewness"),
        kurtosis(col(c)).cast("string").as("kurtosis")
        )
    ):_*
).as("data")

Required columns.

scala> val columns = Seq("column_name","max","min","mean","std","skewness","kurtosis")
  .map(c => if(c != "column_name") col(c).cast("double").as(c) else col(c))

Final output

scala> df
 .select(descExpr)
 .selectExpr("explode(data) as data")
 .select("data.*")
 .select(columns:_*)
 .show(false)

+-----------+------+----+------+------------------+-------------------+-------------------+
|column_name|max   |min |mean  |std               |skewness           |kurtosis           |
+-----------+------+----+------+------------------+-------------------+-------------------+
|col_1      |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|
|col_2      |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|
|col_3      |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506  |-1.8119558312496056|
|col_4      |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|
|col_5      |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125  |
+-----------+------+----+------+------------------+-------------------+-------------------+
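Because every aggregate sits in a single select, Spark runs one aggregation job over the data instead of one job per column chained together with union. In principle the same struct pattern can also carry the counting metrics from the question's new_df_schema; the snippet below is only a sketch under that assumption (note that countDistinct across many columns triggers an Expand and can be expensive on its own; approx_count_distinct is the cheaper variant):

scala> val countExpr = array(
    df.columns
    .map(c => struct(
        lit(c).cast("string").as("column_name"),
        // count ignores nulls, count(lit(1)) counts all rows
        count(col(c)).cast("string").as("number_of_values"),
        countDistinct(col(c)).cast("string").as("number_of_distinct_values"),
        (count(lit(1)) - count(col(c))).cast("string").as("number_of_missing_values")
        )
    ):_*
).as("counts")

scala> df.select(countExpr).selectExpr("explode(counts) as counts").select("counts.*").show(false)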

Updated

scala> val finalDF = df.select(descExpr).selectExpr("explode(data) as data").select("data.*").select(columns:_*)

Create a new dataframe with approximate quantiles for all columns.

scala> val approxQuantileDF = df
.columns
.map(c => (c,df.stat.approxQuantile(c,Array(0.25,0.5,0.75),0.0)))
.toList
.toDF("column_name","approx_quantile")
scala> finalDF
        .join(approxQuantileDF,
              Seq("column_name"),
              "left"
    ).show(false)
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|column_name|max   |min |mean  |std               |skewness           |kurtosis           |approx_quantile       |
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
|col_1      |1321.0|10.0|533.0 |634.0634826261484 |0.43342697383670664|-1.7463346405299978|[100.0, 111.0, 1123.0]|
|col_2      |2321.0|20.0|977.2 |1141.1895986206673|0.4050513738738679 |-1.7997419516751327|[200.0, 222.0, 2123.0]|
|col_3      |3321.0|30.0|1421.4|1649.3990723896993|0.397925106378506  |-1.8119558312496056|[300.0, 333.0, 3123.0]|
|col_4      |4321.0|40.0|1865.6|2157.9266206245293|0.3950204738145622 |-1.8165124206347691|[400.0, 444.0, 4123.0]|
|col_5      |5321.0|50.0|2309.8|2666.5902759891706|0.3935246673563026 |-1.81866856281125  |[500.0, 555.0, 5123.0]|
+-----------+------+----+------+------------------+-------------------+-------------------+----------------------+
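If the per-column df.stat.approxQuantile calls on the driver become the bottleneck, the percentiles can also be folded into the same single-pass aggregation using the SQL function approx_percentile (the function the question already calls as APPROX_PERCENTILE). A rough sketch, where the cast to double and the quantile list are assumptions:

scala> val pctExpr = array(
    df.columns
    .map(c => struct(
        lit(c).as("column_name"),
        // approx_percentile takes an array of percentages and returns an array of values
        expr(s"approx_percentile(cast($c AS double), array(0.25, 0.5, 0.75))").as("approx_quantile")
        )
    ):_*
).as("pct")

scala> df.select(pctExpr).selectExpr("explode(pct) as pct").select("pct.*").show(false)

The result has one row per column and could be joined to finalDF on column_name exactly as above.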

This concludes the article on alternatives to the union() function when appending new rows in Spark; hopefully the recommended answer is helpful.
