I am reducing the dimensionality of a Spark DataFrame via PCA with pyspark (using the spark ml library), as follows:

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)
where data is a Spark DataFrame with one column labeled features, which is a DenseVector of 3 dimensions:
data.take(1)
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1')
After fitting, I transform the data:
transformed = model.transform(data)
transformed.first()
Row(features=DenseVector([0.4536,-0.43218, 0.9876]), label=u'class1', pca_features=DenseVector([-0.33256, 0.8668, 0.625]))
My question is: how can I extract the eigenvectors of this PCA? How can I calculate how much variance they explain?

Best Answer

[UPDATE: As of Spark 2.2, both PCA and SVD are available in PySpark - see JIRA ticket SPARK-6227 and PCA & PCAModel for Spark ML 2.2; the original answer below is still applicable for older Spark versions.]
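For completeness, here is a minimal sketch of the newer API (assuming Spark 2.2+, where the fitted PCAModel exposes the principal components and explained variance directly):

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features", outputCol="pca_features")
model = pca.fit(data)

print(model.pc)                 # DenseMatrix: the principal components (eigenvectors), one per column
print(model.explainedVariance)  # DenseVector: proportion of variance explained by each component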

Well, it seems incredible, but indeed there is no way to extract such information from a PCA decomposition (at least as of Spark 1.5). But again, there have been many similar "complaints" - see here, for example, about not being able to extract the best parameters from a CrossValidatorModel.

Fortunately, some months ago I attended the 'Scalable Machine Learning' MOOC by AMPLab (Berkeley) & Databricks, i.e. the creators of Spark, where we implemented a full PCA pipeline "by hand" as part of the homework assignments. I have since modified my functions (rest assured, I got full credit :-) so as to work with dataframes as inputs (instead of RDD's), of the same format as yours (i.e. Rows of DenseVectors containing the numerical features).

We first need to define an intermediate function, estimateCovariance, as follows:

import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.  Don't
        forget to normalize the data by first subtracting the mean.

    Args:
        df:  A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x:   x[0]).map(lambda x: x-m)  # subtract the mean

    return dfZeroMean.map(lambda x: np.outer(x,x)).sum()/df.count()
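As a quick, purely illustrative check (using the 5-dimensional sample dataframe df built in the Testing section below), the result is an ordinary NumPy covariance matrix over the feature dimensions:

cov = estimateCovariance(df)
print(cov.shape)   # (5, 5) for the 5-dimensional sample data: one row & column per feature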

Then, we can write a main pca function, as follows:

from numpy.linalg import eigh

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvector as a column.  This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, RDD of np.ndarray, np.ndarray): A tuple of (eigenvectors, `RDD` of
        scores, eigenvalues).  Eigenvectors is a multi-dimensional array where the number of
        rows equals the length of the arrays in the input `RDD` and the number of columns equals
        `k`.  The `RDD` of scores has the same number of rows as `data` and consists of arrays
        of length `k`.  Eigenvalues is an array of length d (the number of features).
     """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvals
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T) )
    # Return the `k` principal components, `k` scores, and all eigenvalues

    return components.T, score, eigVals

Testing

Let's first see the results with the existing method, using the example data from the Spark ML PCA documentation (modified so as to be all DenseVectors):

 from pyspark.ml.feature import *
 from pyspark.mllib.linalg import Vectors
 data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
         (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
         (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
 df = sqlContext.createDataFrame(data,["features"])
 pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
 model = pca_extracted.fit(df)
 model.transform(df).collect()

 [Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
  Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
  Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]

Then, with our method:

 comp, score, eigVals = pca(df)
 score.collect()

 [array([ 1.64857282,  4.0132827 ]),
  array([-4.64510433,  1.11679727]),
  array([-6.42888054,  5.33795143])]
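The other two returned objects are plain NumPy arrays and can be inspected locally (illustrative only, output not shown):

 print(comp)     # 5 x 2 array: the top-2 eigenvectors, one per column
 print(eigVals)  # all 5 eigenvalues, sorted from largest to smallest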

Let me stress that we don't use any collect() methods in the functions we have defined - score is an RDD, as it should be.

Notice that the signs of our second column are all opposite to the ones derived by the existing method; but this is not an issue: according to the (freely downloadable) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382, each principal component loading vector is unique only up to a sign flip, so scores produced by two different implementations may differ in sign without the result changing.
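If you want to convince yourself that the sign is immaterial, here is a tiny (hypothetical) check with NumPy: a score column and its sign-flipped counterpart carry exactly the same variance:

 import numpy as np

 z = np.array([4.0132827, 1.11679727, 5.33795143])   # our second score column from above
 print(np.var(z) == np.var(-z))                       # True: flipping the sign changes nothing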



Finally, now that we have the eigenvalues available, it is trivial to write a function for the percentage of variance explained:

 def varianceExplained(df, k=1):
     """Calculate the fraction of variance explained by the top `k` eigenvectors.

     Args:
         df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
         k: The number of principal components to consider.

     Returns:
         float: A number between 0 and 1 representing the fraction of variance explained
             by the top `k` eigenvectors.
     """
     components, scores, eigenvalues = pca(df, k)
     return sum(eigenvalues[0:k])/sum(eigenvalues)


 varianceExplained(df,1)
 # 0.79439325322305299

As a test, we also check that the variance explained in our example data is 1.0 for k=5 (since the original data are 5-dimensional):

 varianceExplained(df,5)
 # 1.0
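Similarly, since pca returns all the eigenvalues, you can compute the per-component explained-variance ratios yourself (a small sketch, not part of the original functions):

 comp, score, eigVals = pca(df, k=5)
 ratios = eigVals / sum(eigVals)   # one explained-variance ratio per component, largest first
 print(ratios.sum())               # 1.0 -- the ratios of all 5 components add up to one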

This should do your job efficiently; feel free to ask for any clarifications you may need.

[Developed & tested with Spark 1.5.0 & 1.5.1]

Regarding apache-spark - Pyspark and PCA: how can I extract the eigenvectors of this PCA, and how can I calculate how much variance they explain? - a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/33428589/
