今天开始记录spark中机器学习的相关应用。

spark某种意义上讲就是为机器学习准备的,其一,spark是一种内存计算框架,速度快,其二,spark更擅长处理迭代式的数据计算,而迭代运算这是机器学习模型经常遇到的。延申一点,目前大数据还有一种是流式运算,也就是处理的是实时数据,虽然这种spark也可以,但是毕竟是一种伪造的流式。

所以更多时候spark是处理离线的、迭代式的运算。spark里面目前已经集成了专门的机器学习框架mllib,目前为止已经非常丰富成熟了,实乃大数据机器学习领域的必须框架。

机器学习任务第一步要做的一般都是数据准备,包括数据类型以及数据格式,通常情况下特征是以向量或者矩阵的形式进行训练的,spark的基础数据格式是RDD,那么结合机器学习的数据格式是什么样的呢?

  • mllib的基本数据类型

机器学习的特征数据无非两种:向量类型的,矩阵类型的。mllib也是如此,再将数据分为本地的和分布式的,就是四种基本数据类型:
(1)本地的向量
(2)本地的矩阵
(3)分布式的向量(是一种向量标签)
(4)分布式的矩阵

  • 本地的向量

向量应该说是spark机器学习里面最重要的一种结构,在spark里面,有专门根据数组生成可以用的向量的方法,只需要调用一下对应的包即可:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

注意机器学习的所有的数据结构相关的都在org.apache.spark.mllib.linalg这个包里面可以找到。

举个例子假设我们想将array数组变成机器学习的向量,可以如下:

scala> var arr=Array(1.0, 2.0, 3.0, 4.0)
arr: Array[Double] = Array(1.0, 2.0, 3.0, 4.0)
scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
scala> var vec=Vectors.dense(arr)
vec: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0,4.0]

说起向量,我们自然会想到有两种向量存在:稠密向量与稀疏向量。mllib中也都提供。稠密向量就是挨着将所有数据都列出来,也就是我们一般看到的向量。稀疏向量是采用索引的格式表示的向量,这种通常是针对大维度向量同时里面大多数元素为0的向量。比如你有一个10万维的特征,但是多数情况下一个向量里面有9千多维都是0,如果你完整存的话,就是10万维的长度,里面一堆0,这样对内存是一种极大的浪费,这个时候你只需要存哪些不是0的数值就可以大幅度缩小内存,这也就出现了采用索引方式存储的稀疏向量。上面的例子是一种稠密向量,下面看一个稀疏向量的表示:

scala> var arr=Array(1.0, 2.0, 3.0, 4.0)
scala> import org.apache.spark.mllib.linalg.{Vector, Vectors}
scala> var vec=Vectors.sparse(8,Array(0,2,4,6), arr)
vec: org.apache.spark.mllib.linalg.Vector = (8,[0,2,4,6],[1.0,2.0,3.0,4.0])

scala> vec(0)
res1: Double = 1.0

scala> vec(1)
res2: Double = 0.0

解释一下,Vectors.sparse需要三个参数,第一个是整个向量的长度,好比上面的10万一样,第二个是不等于0的特征的索引位置(注意必须递增),是一个向量,第三个是这些位置对应的特征值,很容易理解,一个稀疏向量必须包含这几个才行。我给的例子就是一个长度为8的向量,但是里面只有4个元素有值的情况,打印出来可以发现没有赋值的位置为0.

  • 向量标签

说完了向量表示,来看看和机器学习最相关的一种数据结构:向量标签。

所谓向量标签就是将向量和标签叠在一起组成一个数据结构就是向量标签。我们知道,有监督模型训练下,训练数据除了特征外,还必须有标签,代表它所属的类别,这样才能训练模型。

mllib里面的这种向量标签格式的数据需要倒入对应的LabeledPoint包:

import org.apache.spark.mllib.regression.LabeledPoint

这样,一个带标签的向量标签数据就可以使用了,首先必须有一个向量,然后添加一个label一起生成:

scala> val vec=Vectors.dense(Array(1.0,2,3,4))
scala> val vd = LabeledPoint(1,vec)

vd: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[1.0,2.0,3.0,4.0])

LabeledPoint接受两个参数,第一个是特征的label,第二个是特征向量。当然LabeledPoint也接受稀疏向量。

那么后续的很多模型训练的时候,直接接受这样的LabeledPoint数据就可以训练了。

当然,对于训练模型需要的数据,人为构造是一种形式,就好比上面的这个过程,自己一步步构造LabeledPoint。还可以用带的api直接读取固定格式的数据知道到达LabeledPoint这一步,典型的函数就是:MLUtils.loadLibSVMFile,这个函数所读取的数据是一种label与特征索引的格式,如下:

1 0:2 1:3 2:4
0 0:1 1:1 2:3
...

第一个是label,然后是特征,特征里面第一个是位置,冒号后面是值,很好理解。但是要注意,这种带索引的读进来的可就是稀疏向量了。

  • 本地矩阵

很多时候,为了提高运算速度,将一堆向量进行矩阵化表示可以明显提高计算速度。熟悉Matlab的可以知道,Matlab号称矩阵实验室,操作的数据多是矩阵,而且其对矩阵的操作速度可谓无人能及了吧(想一想自己的matlab技能已经荒废很久了)。

既然矩阵这么叼,那么spark自然也会集成。本地矩阵就是存储在一台机器上的数据矩阵,和向量一样,同样需要导入包才能使用,操作的对象依然很灵活,可以是向量也可以是数组。比如:

scala> import org.apache.spark.mllib.linalg.{Matrix, Matrices}
scala> val mx = Matrices.dense(2,3, Array(1,2,3,4,5,6))

mx: org.apache.spark.mllib.linalg.Matrix =
1.0  3.0  5.0
2.0  4.0  6.0

可以看到将一个数组变成2行3列的矩阵。

  • 分布式矩阵

把一个大矩阵存储在多个机器上的就是分布式矩阵了。对于小矩阵,就完全没有必要搞分布式矩阵了,浪费内存。

mllib有四种分布式矩阵的存储格式,分别是:行矩阵,带索引的行矩阵,坐标矩阵,块矩阵。

(1)行矩阵

行矩阵比较简单也最基础。就是将向量一行一行叠起来组成一个矩阵就是行矩阵了,说白了就是整了一个编号串起来所有向量,感觉内部还是向量并不是什么矩阵。当然前提是,矩阵每一行元素大小要一致,这和矩阵类似,不能一行长一行短的。行矩阵每一行都可以取出来单独操作。

可能有人会问既然就是向量加个编号,为什么还要多此一举呢?其实就是为了方便,行矩阵的数据格式是RowMatrix,当数据格式变成矩阵以后,既可以执行矩阵对应的操作了,比如RowMatrix里面就有一些方法,比如求所有向量的均值向量,只需要一行代码,这比你自己去求均值还是要快不少的。

向量变成行矩阵的形式也很容易,看个例子:

val rdd = sc.textFile("file/data/MatrixRow.txt") //创建RDD文件路径
      .map(_.split(' ') // 分割
      .map(_.toDouble)) //转成Double类型
      .map(line => Vectors.dense(line)) //转成Vector格式

val rm = new RowMatrix(rdd) //读入行矩阵
println(rm.numRows()) //打印列数
println(rm.numCols()) //打印行数
rm.rows.foreach(println)

>
2
3
[1.0,2.0,3.0]
[4.0,5.0,6.0]

MatrixRow.txt的文件:

1 2 3
4 5 6

(2)带有行索引的行矩阵
所谓行索引矩阵就是在上面的行矩阵基础上加一个东西当成索引值在最前面,感觉有点鸡肋有没有。至于这个加的索引值是什么就很灵活了,感觉某些时候还是很有用的。

用法也很简单,在上面基础上多了一个map:

val rdd = sc.textFile("file/data/MatrixRow.txt") //创建RDD文件路径
      .map(_.split(' ') // 分割
      .map(_.toDouble)) //转成Double类型
      .map(line => Vectors.dense(line)) //转成Vector格式
      .map(vd => new IndexedRow(vd.size,vd))

val rm = new IndexedMatrix(rdd) //读入行矩阵

这就生成了一个索引矩阵。

(3)坐标矩阵

坐标矩阵听名字也可以大概猜出来,每个元素都有一个坐标形成的矩阵。每一个具体的特征值都有三个东西组成(x,y,value)。说实话没用过,感觉也很鸡肋的样子,但是设计者既然设计了,肯定在某些时候大有用途,比如在非常大的稀疏矩阵下,和稀疏向量类似,这时坐标矩阵可以大大减小内存。

生成方式和上面类似,包换一下,每个值多加一个x与y就好了。

val rdd = sc.textFile("file/data/MatrixRow.txt") //创建RDD文件路径
      .map(_.split(' ') // 分割
      .map(_.toDouble)) //转成Double类型
      .map(line => Vectors.dense(line)) //转成Vector格式
	  .map(vue => (vue(0).toLong, vue(1).toLong, vue(2))) //转化成坐标格式
      .map(vue2 => new MatrixEntry(vue2 _1, vue2 _2, vue2 _3)) //转化成坐标矩阵格式

val crm = new CoordinateMatrix(rdd3) //实例化坐标矩阵

(4)块矩阵

所谓块矩阵也是将一个超大矩阵进行分块存储起来的一种格式。块矩阵的包:org.apache.spark.mllib.linalg.distributed.BlockMatrix。当进行大规模的矩阵运算,加减乘除等的时候,块矩阵也许是加速计算的唯一办法。BlockMatrix本身集成矩阵的一些四则运算,

RDD的分布式矩阵块MatrixBlock,其中每一个矩阵块MatrixBlock都是一个元组((Int, Int), Matrix),其中(Int, Int)是块的索引,而Matrix则是在对应位置的子矩阵(sub-matrix),其尺寸由rowsPerBlock和colsPerBlock决定,默认值均为1024。

总结,上述四种矩阵形式是层层递进的,越往后理论上越高级,高级的通常可以往低级转换。某几个也可以相互转换,需要测试。使用中,当到了计算速度瓶颈的时候就需要考虑一下数据的矩阵化操作。

12-02 07:59