本文介绍了带有Java8的Spark 2.3将行转换为列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是使用Java 8的Spark 2.4的新手.我需要帮助.这是实例的示例:

I am new to Spark 2.4 with Java 8. I need help. Here is example of instances:

源数据框架

+--------------+
| key | Value  |
+--------------+
| A   | John   |
| B   | Nick   |
| A   | Mary   |
| B   | Kathy  |
| C   | Sabrina|
| B   | George |
+--------------+

元数据框架

+-----+
| key |
+-----+
| A   |
| B   |
| C   |
| D   |
| E   |
| F   |
+-----+

我想将其转换为以下内容:元数据框和行中的列名将根据源数据框进行转换

I would like to transform it to the following: Column names from Meta Dataframe and Rows will be transformed based on Source Dataframe

+-----------------------------------------------+
| A    | B      | C       | D     | E    | F    |
+-----------------------------------------------+
| John | Nick   | Sabrina | null  | null | null |
| Mary | Kathy  | null    | null  | null | null |
| null | George | null    | null  | null | null |
+-----------------------------------------------+

需要使用Java8编写代码Spark 2.3.感谢您的帮助.

Need to write a code Spark 2.3 with Java8. Appreciated your help.

推荐答案

为了使事情更清晰(并且易于再现),让我们定义数据帧:

To make things clearer (and easily reproducible) let's define dataframes:

val df1 = Seq("A" -> "John", "B" -> "Nick", "A" -> "Mary", 
              "B" -> "Kathy", "C" -> "Sabrina", "B" -> "George")
          .toDF("key", "value")
val df2 = Seq("A", "B", "C", "D", "E", "F").toDF("key")

据我所见,您正在尝试按df2key列中的值创建一列.这些列应包含与key命名列相关联的value列的所有值.如果以一个示例为例,列A的第一个值应为A的首次出现的值(如果存在,否则为null):"John".它的第二个值应该是第二次出现的A:"Mary"的值.没有第三个值,因此列的第三个值应为null.

From what I see, you are trying to create one column by value in the key column of df2. These columns should contain all the values of the value column that are associated to the key naming the column. If we take an example, column A's first value should be the value of the first occurrence of A (if it exists, null otherwise): "John". Its second value should be the value of the second occurrence of A: "Mary". There is no third value so the third value of the column should be null.

我对其进行了详细说明,以表明我们需要每个键的值的排名概念(窗口函数),并按该排名概念进行分组.它将如下:

I detailed it to show that we need a notion of rank of the values for each key (windowing function), and group by that notion of rank. It would go as follows:

import org.apache.spark.sql.expressions.Window
val df1_win = df1
    .withColumn("id", monotonically_increasing_id)
    .withColumn("rank", rank() over Window.partitionBy("key").orderBy("id"))
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
val keys = df2.collect.map(_.getAs[String](0)).sorted

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys) 
    .agg(first('value))
    .orderBy("rank")
    //.drop("rank") // I keep here it for clarity
    .show()
+----+----+------+-------+----+----+----+                                       
|rank|   A|     B|      C|   D|   E|   F|
+----+----+------+-------+----+----+----+
|   1|John|  Nick|Sabrina|null|null|null|
|   2|Mary| Kathy|   null|null|null|null|
|   3|null|George|   null|null|null|null|
+----+----+------+-------+----+----+----+

这是Java中完全相同的代码

Here is the very same code in Java

Dataset<Row> df1_win = df1
    .withColumn("id", functions.monotonically_increasing_id())
    .withColumn("rank", functions.rank().over(Window.partitionBy("key").orderBy("id")));
    // the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
// Note that it is a list of objects, to match the (strange) signature of pivot
List<Object> keys = df2.collectAsList().stream()
    .map(x -> x.getString(0))
    .sorted().collect(Collectors.toList());

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys)
    .agg(functions.first(functions.col("value")))
    .orderBy("rank")
    // .drop("rank") // I keep here it for clarity
    .show();

这篇关于带有Java8的Spark 2.3将行转换为列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 09:18