Find intervals for a DataFrame with the quantile function using Scala Spark

Question

I'm trying to port a code from R to Scala to perform Customer Analysis. I have already computed Recency, Frequency and Monetary factors on Spark into a DataFrame.

Here is the schema of the DataFrame:

df.printSchema
root
 |-- customerId: integer (nullable = false)
 |-- recency: long (nullable = false)
 |-- frequency: long (nullable = false)
 |-- monetary: double (nullable = false)

And here is a data sample as well :

df.orderBy($"customerId").show

+----------+-------+---------+------------------+
|customerId|recency|frequency|          monetary|
+----------+-------+---------+------------------+
|         1|    297|      114|            733.27|
|         2|    564|       11|            867.66|
|         3|   1304|        1|             35.89|
|         4|    287|       25|            153.08|
|         6|    290|       94|           316.772|
|         8|   1186|        3|            440.21|
|        11|    561|        5|            489.70|
|        14|    333|       57|            123.94|
+----------+-------+---------+------------------+

I'm trying to find the intervals on a quantile vector for each column, given a probability segment.

In other words, given a probability vector of non-decreasing breakpoints (in my case it will be the quantile vector), find the interval containing each element of x;

i.e. (pseudocode):

if i <- findInterval(x,v),
for each index j in x
    v[i[j]] ≤ x[j] < v[i[j] + 1] where v[0] := - Inf, v[N+1] := + Inf, and N <- length(v).
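As a plain-Scala sketch (not Spark code; the function name just mirrors R's findInterval), the contract above can be written as:

```scala
// For each x(j), return the number of breakpoints v(i) <= x(j), which is
// exactly the index i satisfying v(i) <= x(j) < v(i + 1) with the implied
// sentinels v(0) = -Inf and v(N + 1) = +Inf. `v` must be non-decreasing.
def findInterval(x: Seq[Double], v: Seq[Double]): Seq[Int] =
  x.map(xj => v.count(_ <= xj))
```

For example, `findInterval(Seq(0.5, 2.5), Seq(1.0, 2.0, 3.0))` returns `Seq(0, 2)`.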

In R, this translates to the following code:

probSegment <- c(0.0, 0.25, 0.50, 0.75, 1.0)

RFM_table$Rsegment <- findInterval(RFM_table$Recency, quantile(RFM_table$Recency, probSegment))
RFM_table$Fsegment <- findInterval(RFM_table$Frequency, quantile(RFM_table$Frequency, probSegment))
RFM_table$Msegment <- findInterval(RFM_table$Monetary, quantile(RFM_table$Monetary, probSegment))

I'm kind of stuck with the quantile function, though.

In an earlier discussion (http://stackoverflow.com/questions/31432843/how-to-find-median-using-pyspark/31437177?noredirect=1#comment53486994_31437177) with @zero323, he suggested that I use the percentRank window function, which can be used as a shortcut. I'm not sure that I can apply the percentRank function in this case.

How can I apply a quantile function on a DataFrame column with Scala Spark? If this is not possible, can I use the percentRank function instead?

Thanks.

Answer

Well, I still believe that percent_rank is good enough here. The percent_rank window function is computed as:

    percent_rank = (rank - 1) / (n - 1)

Let's define pr as:

    pr = 100 * percent_rank

Transforming as follows:

    rank = (pr / 100) * (n - 1) + 1

gives the definition of a percentile used, according to Wikipedia, by Microsoft Excel.
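For reference, that Excel-style percentile (linear interpolation on a zero-based fractional rank) can be sketched in plain Scala; the function name and the assumption of a pre-sorted input are mine:

```scala
// Percentile in the Microsoft Excel / Wikipedia sense: the fractional
// rank of probability p over n sorted values is p * (n - 1), and the
// result interpolates between the two neighbouring order statistics.
def percentileExcel(sorted: IndexedSeq[Double], p: Double): Double = {
  val n = sorted.length
  val rank = p * (n - 1)            // zero-based fractional rank
  val lo = rank.toInt
  val hi = math.min(lo + 1, n - 1)
  val frac = rank - lo
  sorted(lo) * (1 - frac) + sorted(hi) * frac
}
```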

So the only thing you really need is a findInterval UDF which will return a correct interval index. Alternatively you can use rank directly and match on rank ranges.

EDIT

OK, it looks like percent_rank is not a good idea after all:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

I am not exactly sure what is the point of moving data to a single partition to call non-aggregate function but it looks like we are back to square one. It is possible to use zipWithIndex on plain RDD:

import org.apache.spark.sql.{Row, DataFrame, Column}
import org.apache.spark.sql.types.{StructType, StructField, LongType}
import org.apache.spark.sql.functions.udf

val df = sc.parallelize(Seq(
  (1, 297, 114, 733.27),
  (2, 564, 11, 867.66),
  (3, 1304, 1,  35.89),
  (4, 287, 25, 153.08),
  (6, 290, 94, 316.772),
  (8, 1186, 3, 440.21),
  (11, 561, 5, 489.70),
  (14, 333, 57, 123.94)
)).toDF("customerId", "recency", "frequency", "monetary")


df.registerTempTable("df")
sqlContext.cacheTable("df")

A small helper:

def addRowNumber(df: DataFrame): DataFrame = {
  // Prepare new schema
  val schema = StructType(
    StructField("row_number", LongType, false) +: df.schema.fields)
  // Add row number
  val rowsWithIndex = df.rdd.zipWithIndex
    .map{case (row: Row, idx: Long) => Row.fromSeq(idx +: row.toSeq)}
  // Create DataFrame
  sqlContext.createDataFrame(rowsWithIndex, schema)
}
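At its core the helper relies on zipWithIndex pairing each row with its position; the same idea on a plain Scala collection (illustrative values):

```scala
// zipWithIndex yields (element, index) pairs; prepending the index as a
// Long mirrors the `idx +: row.toSeq` step in addRowNumber above.
val rows = Seq(Seq[Any]("a", 1), Seq[Any]("b", 2))
val withIndex = rows.zipWithIndex.map { case (row, idx) => idx.toLong +: row }
```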

And the actual function:

def findInterval(df: DataFrame, column: Column,
    probSegment: Array[Double], outname: String): DataFrame = {

  val n = df.count
  // Map quantiles to indices
  val breakIndices  = probSegment.map(p => (p * (n - 1)).toLong)

  // Add row number
  val dfWithRowNumber = addRowNumber(df.orderBy(column))

  // Map indices to values
  val breaks  = dfWithRowNumber
    .where($"row_number".isin(breakIndices:_*))
    .select(column.cast("double"))
    .map(_.getDouble(0))
    .collect

  // Get interval
  val f = udf((x: Double) =>
    scala.math.abs(java.util.Arrays.binarySearch(breaks, x) + 1))

  // Final result
  dfWithRowNumber
    .select($"*", f(column.cast("double")).alias(outname))
    .drop("row_number")
}
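The `abs(binarySearch(...) + 1)` trick deserves a note: `java.util.Arrays.binarySearch` returns the index `i` on an exact match and `-(insertionPoint) - 1` otherwise, so `abs(result + 1)` yields `i + 1` for exact hits and the insertion point for everything else. A plain-Scala check, using the recency quartile breaks implied by the sample data:

```scala
// Quartile breaks for the recency column of the sample: rows 0, 1, 3, 5, 7
// of the sorted column, per the p * (n - 1) index mapping above.
val breaks = Array(287.0, 290.0, 333.0, 564.0, 1304.0)

// Same expression as the udf in findInterval: exact matches map to
// index + 1, in-between values map to their insertion point.
def interval(x: Double): Int =
  math.abs(java.util.Arrays.binarySearch(breaks, x) + 1)
```

So `interval(287.0)` lands in bucket 1 (exact match at index 0), while `interval(297.0)` falls between the first two breaks and gets bucket 2.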

And a usage example:

scala> val probs = Array(0.0, 0.25, 0.50, 0.75, 1.0)
probs: Array[Double] = Array(0.0, 0.25, 0.5, 0.75, 1.0)

scala>  findInterval(df, $"recency", probs, "interval").show
+----------+-------+---------+--------+--------+
|customerId|recency|frequency|monetary|interval|
+----------+-------+---------+--------+--------+
|         4|    287|       25|  153.08|       1|
|         6|    290|       94| 316.772|       2|
|         1|    297|      114|  733.27|       2|
|        14|    333|       57|  123.94|       3|
|        11|    561|        5|   489.7|       3|
|         2|    564|       11|  867.66|       4|
|         8|   1186|        3|  440.21|       4|
|         3|   1304|        1|   35.89|       5|
+----------+-------+---------+--------+--------+

but I guess it is far from optimal.
