本文介绍了Spark 2.3 with Java8 将行转换为列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是使用 Java 8 的 Spark 2.4 的新手.我需要帮助.以下是实例示例:

I am new to Spark 2.4 with Java 8. I need help. Here is example of instances:

源数据帧

+--------------+
| key | Value  |
+--------------+
| A   | John   |
| B   | Nick   |
| A   | Mary   |
| B   | Kathy  |
| C   | Sabrina|
| B   | George |
+--------------+

元数据帧

+-----+
| key |
+-----+
| A   |
| B   |
| C   |
| D   |
| E   |
| F   |
+-----+

我想将其转换为以下内容:元数据框和行中的列名将根据源数据框进行转换

I would like to transform it to the following: Column names from Meta Dataframe and Rows will be transformed based on Source Dataframe

+-----------------------------------------------+
| A    | B      | C       | D     | E    | F    |
+-----------------------------------------------+
| John | Nick   | Sabrina | null  | null | null |
| Mary | Kathy  | null    | null  | null | null |
| null | George | null    | null  | null | null |
+-----------------------------------------------+

需要用Java8编写代码Spark 2.3.感谢您的帮助.

Need to write a code Spark 2.3 with Java8. Appreciated your help.

推荐答案

为了使事情更清晰(并且易于重现),让我们定义数据框:

To make things clearer (and easily reproducible) let's define dataframes:

val df1 = Seq("A" -> "John", "B" -> "Nick", "A" -> "Mary", 
              "B" -> "Kathy", "C" -> "Sabrina", "B" -> "George")
          .toDF("key", "value")
val df2 = Seq("A", "B", "C", "D", "E", "F").toDF("key")

据我所知,您试图在 df2key 列中按值创建一列.这些列应包含与命名列的 key 关联的 value 列的所有值.如果我们举个例子,列 A 的第一个值应该是 A 第一次出现的值(如果存在,否则为 null):"John".它的第二个值应该是第二次出现 A 的值:"Mary".没有第三个值,所以列的第三个值应该是 null.

From what I see, you are trying to create one column by value in the key column of df2. These columns should contain all the values of the value column that are associated to the key naming the column. If we take an example, column A's first value should be the value of the first occurrence of A (if it exists, null otherwise): "John". Its second value should be the value of the second occurrence of A: "Mary". There is no third value so the third value of the column should be null.

我详细说明了我们需要每个键(窗口函数)的值的等级概念,并按该等级概念进行分组.它将如下所示:

I detailed it to show that we need a notion of rank of the values for each key (windowing function), and group by that notion of rank. It would go as follows:

import org.apache.spark.sql.expressions.Window
val df1_win = df1
    .withColumn("id", monotonically_increasing_id)
    .withColumn("rank", rank() over Window.partitionBy("key").orderBy("id"))
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
val keys = df2.collect.map(_.getAs[String](0)).sorted

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys) 
    .agg(first('value))
    .orderBy("rank")
    //.drop("rank") // I keep here it for clarity
    .show()
+----+----+------+-------+----+----+----+                                       
|rank|   A|     B|      C|   D|   E|   F|
+----+----+------+-------+----+----+----+
|   1|John|  Nick|Sabrina|null|null|null|
|   2|Mary| Kathy|   null|null|null|null|
|   3|null|George|   null|null|null|null|
+----+----+------+-------+----+----+----+

这是 Java 中完全相同的代码

Here is the very same code in Java

Dataset<Row> df1_win = df1
    .withColumn("id", functions.monotonically_increasing_id())
    .withColumn("rank", functions.rank().over(Window.partitionBy("key").orderBy("id")));
    // the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
// Note that it is a list of objects, to match the (strange) signature of pivot
List<Object> keys = df2.collectAsList().stream()
    .map(x -> x.getString(0))
    .sorted().collect(Collectors.toList());

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys)
    .agg(functions.first(functions.col("value")))
    .orderBy("rank")
    // .drop("rank") // I keep here it for clarity
    .show();

这篇关于Spark 2.3 with Java8 将行转换为列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-20 09:18