This article covers how to run 3000+ random forest models by group using the Spark MLlib Scala API; the question and recommended answer below may be a useful reference for anyone facing the same problem.

Problem Description


I am trying to build random forest models by group (School_ID, more than 3 thousand of them) on a large model input CSV file using the Spark Scala API. Each group contains about 3000-4000 records. The resources I have at my disposal are 20-30 AWS m3.2xlarge instances.


In R, I can construct models by group and save them to a list like this:

library(dplyr); library(randomForest)

Rf_model <- train %>%
  group_by(School_ID) %>%
  do(school = randomForest(formula = Rf_formula, data = ., importance = TRUE))


The list can be stored somewhere, and I can load the models when I need to use them, like below:

save(Rf_model.school, file = paste0(Modelpath, "Rf_model.dat"))
load(file = paste0(Modelpath, "Rf_model.dat"))
pred <- predict(Rf_model.school$school[school_index][[1]], newdata = test)


I was wondering how to do that in Spark: whether or not I need to split the data by group first, and how to do it efficiently if it's necessary.


I was able to split up the file by School_ID using the code below, but it seems to create an individual job to subset the data for each iteration, which takes a long time to finish. Is there a way to do it in one pass?

model_input.cache()

// Collect the distinct School_IDs to the driver
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
// Build one filtered DataFrame per school
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

// Write each subset out as a separate CSV -- this launches one job per school
for (i <- bySchoolArray.indices) {
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_" + schools(i))
}
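
On the one-pass question: in Spark 2.x and later, the built-in CSV writer supports partitionBy, which writes every subset in a single job. A minimal sketch (not part of the original post, and it assumes the newer built-in CSV source rather than the spark-csv package):

// One-pass alternative (assumes Spark 2.x+ built-in CSV writer):
// a single job writes one sub-directory per School_ID value.
model_input.write.
  partitionBy("School_ID").
  option("header", "true").
  csv("model_input_bySchool")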

Source:
How can I split a dataframe into dataframes with same column values in Scala and Spark

Edit 8/24/2015
I'm trying to convert my dataframe into the format accepted by the random forest model. I'm following the instructions in this thread:
How to create correct data frame for classification in Spark ML


Basically, I create a new variable "label" and store my class as a Double. Then I combine all my features using the VectorAssembler function and transform my input data as follows:

// Combine the feature columns into a single "features" vector column
val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")


Partial error message (let me know if you need the complete log message):

scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$)
        at org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:57)


This is resolved after converting all the variables to numeric types.
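
For example, a cast along these lines fixes the MatchError (a sketch using the illustrative column names from above):

// Cast string-typed feature columns to numeric before assembling
val model_input_numeric = model_input_raw.
  withColumn("COL1", $"COL1".cast("double")).
  withColumn("COL2", $"COL2".cast("double")).
  withColumn("COL3", $"COL3".cast("double"))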

Edit 8/25/2015
The ml model doesn't accept the label I coded manually, so I need to use StringIndexer to get around the problem, as indicated here. According to the official documentation (https://spark.apache.org/docs/latest/ml-features.html#stringindexer), the most frequent label gets 0. This causes inconsistent labels across School_ID. I was wondering if there's a way to create the labels without resetting the order of the values.

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")


Any suggestions or directions would be helpful and feel free to raise any questions. Thanks!

Recommended Answer


Since you already have a separate data frame for each school, there is not much left to be done here. Since you have data frames, I assume you want to use ml.classification.RandomForestClassifier. If so, you can try something like this:


  1. Extract the pipeline logic. Adjust the RandomForestClassifier parameters and transformers according to your requirements:

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Fit a single-stage pipeline (just the classifier) on one subset
def trainModel(df: DataFrame): PipelineModel = {
  val rf = new RandomForestClassifier()
  val pipeline = new Pipeline().setStages(Array(rf))
  pipeline.fit(df)
}
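
For illustration, the classifier's parameters could be adjusted before it goes into the pipeline; the values below are assumptions, not recommendations:

// Illustrative settings only -- tune for your own data
val rf = new RandomForestClassifier().
  setLabelCol("label").
  setFeaturesCol("features").
  setNumTrees(50).   // number of trees in each forest
  setMaxDepth(10)    // maximum depth of each tree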



  2. Train models on each subset:

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
    



  3. Save models:

    import java.io._

    // Serialize each fitted pipeline to disk with plain Java serialization
    def saveModel(name: String, model: PipelineModel) = {
      val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
      oos.writeObject(model)
      oos.close()
    }

    schools.zip(bySchoolArrayModels).foreach {
      // schools holds Any values, so convert the id to a String for the path
      case (name, model) => saveModel(name.toString, model)
    }
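
    To use a saved model later, the serialization can simply be reversed. A sketch mirroring the R predict workflow above (school_index is hypothetical, as in the R snippet):

    // Read a fitted pipeline back from disk (counterpart of saveModel)
    def loadModel(name: String): PipelineModel = {
      val ois = new ObjectInputStream(new FileInputStream(s"/some/path/$name"))
      val model = ois.readObject.asInstanceOf[PipelineModel]
      ois.close()
      model
    }

    // Score a test DataFrame with one school's model
    val model = loadModel(schools(school_index).toString)
    val pred = model.transform(test)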
    



  4. Optional: Since individual subsets are rather small, you can try an approach similar to the one I've described here to submit multiple tasks at the same time (see the sketch below).
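
    The linked approach isn't reproduced in this article; one common way to submit several fitting jobs concurrently from the driver is Scala's parallel collections, sketched here as an alternative to step 2's sequential map:

    // .par runs the map on a thread pool, so several pipeline.fit calls
    // (i.e., several Spark jobs) are in flight at the same time
    val bySchoolArrayModels = bySchoolArray.par.map(df => trainModel(df)).toArray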


    If you use mllib.tree.model.RandomForestModel, you can omit step 3 and use model.save directly. Since there seem to be some problems with deserializing a Spark ML pipeline model (as far as I can tell it works just fine, but better safe than sorry, I guess), this could be the preferred approach.
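
    For reference, the mllib model's native persistence looks roughly like this (a sketch: sc is the active SparkContext, the path is illustrative, and model is assumed to be an mllib RandomForestModel rather than a PipelineModel):

    import org.apache.spark.mllib.tree.model.RandomForestModel

    // Native save/load for the mllib model -- no Java serialization needed
    model.save(sc, "/some/path/rf_model")
    val sameModel = RandomForestModel.load(sc, "/some/path/rf_model")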

    Edit

    According to the VectorAssembler documentation:

    VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type.

    Since the error indicates your column is a String, you should transform it first, for example using StringIndexer.

