本文介绍了使用 Spark MLlib Scala API 按组运行 3000 多个随机森林模型的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Spark Scala API 在大型模型输入 csv 文件上按组(School_ID,超过 3 千个)构建随机森林模型.每个组包含大约 3000-4000 条记录.我可以使用的资源是 20-30 个 aws m3.2xlarge 实例.

I am trying to build random forest models by group(School_ID, more than 3 thousands) on a large model input csv file using Spark Scala API. Each of the group contains about 3000-4000 records. The resources I have at disposal are 20-30 aws m3.2xlarge instances.

在 R 中,我可以按组构建模型并将它们保存到这样的列表中-

In R, I can construct models by group and save them to a list like this-

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>%
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

列表可以存储在某个地方,我可以在需要使用它们时调用它们,如下所示 -

The list can be stored somewhere and I can call them when I need to use them like below -

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

我想知道如何在 Spark 中做到这一点,是否需要先按组拆分数据,以及在必要时如何高效地进行拆分.

I was wondering how to do that in Spark, whether or not I need to split the data by group first and how to do it efficiently if it's necessary.

我能够根据以下代码按 School_ID 拆分文件,但似乎它为每次迭代创建了一个单独的作业以进行子集化,并且需要很长时间才能完成这些作业.有没有办法一次性完成?

I was able to split up the file by School_ID based on the below code but it seems it creates one individual job to subset for each iteration and takes a long time to finish the jobs. Is there a way to do it in one pass?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}

来源:如何才能我将数据帧拆分为 SCALA 和 SPARK 中具有相同列值的数据帧

编辑 8/24/2015我正在尝试将我的数据帧转换为随机森林模型接受的格式.我正在按照此线程上的说明进行操作如何在 Spark 中为分类创建正确的数据框机器学习

Edit 8/24/2015I'm trying to convert my dataframe into a format that is accepted by the random forest model. I'm following the instruction on this threadHow to create correct data frame for classification in Spark ML

基本上,我创建了一个新变量标签"并将我的类存储在 Double 中.然后我使用 VectorAssembler 函数组合我的所有特征并按如下方式转换我的输入数据-

Basically, I create a new variable "label" and store my class in Double. Then I combine all my features using VectorAssembler function and transform my input data as follows-

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")

部分错误信息(如果您需要完整的日志信息,请告诉我) -

Partial error message(let me know if you need the complete log message) -

scala.MatchError: StringType (of classorg.apache.spark.sql.types.StringType$)在 org.apache.spark.ml.feature.VectorAssembler$$anonfun$2.apply(VectorAssembler.scala:57)

这是在将所有变量转换为数字类型后解决的.

This is resolved after converting all the variables to numeric types.

编辑 8/25/2015ml 模型不接受我手动编码的标签,因此我需要使用 StringIndexer 解决该问题,如此处.根据官方文档,最常见的标签为 0. 它会导致 School_ID 中的标签不一致.我想知道是否有办法在不重置值的顺序的情况下创建标签.

Edit 8/25/2015The ml model doesn't accept the label I coded manually so I need to use StringIndexer to go around the problem as indicated here. According to the official documentation, the most frequent label gets 0. It causes inconsistent labels across School_ID. I was wondering if there's a way to create the labels without resetting the order of the values.

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

任何建议或指示都会有所帮助,您可以随时提出任何问题.谢谢!

Any suggestions or directions would be helpful and feel free to raise any questions. Thanks!

推荐答案

由于您已经为每所学校提供了单独的数据框,因此这里无需做太多工作.由于您使用数据框,我假设您想使用 ml.classification.RandomForestClassifier.如果是这样,你可以尝试这样的事情:

Since you already have separate data frame for each school there is not much to be done here. Since you data frames I assume you want to use ml.classification.RandomForestClassifier. If so you can try something like this:

  1. 提取流水线逻辑.根据您的要求调整 RandomForestClassifier 参数和转换器

import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}

def trainModel(df: DataFrame): PipelineModel = {
   val rf  = new RandomForestClassifier()
   val pipeline = new Pipeline().setStages(Array(rf))
   pipeline.fit(df)
}

  • 在每个子集上训练模型

  • Train models on each subset

    val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
    

  • 保存模型

  • Save models

    import java.io._
    
    def saveModel(name: String, model: PipelineModel) = {
      val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
      oos.writeObject(model)
      oos.close
    }
    
    schools.zip(bySchoolArrayModels).foreach{
      case (name, model) => saveModel(name, Model)
    }
    

  • 可选:由于单个子集相当小,您可以尝试一种类似于我描述的方法 此处 可同时提交多个任务.

  • Optional: Since individual subsets are rather small you can try an approach similar to the one I've describe here to submit multiple tasks at the same time.

    如果你使用mllib.tree.model.RandomForestModel,你可以省略3.并直接使用model.save.由于反序列化似乎存在一些问题(How to deserialize Pipeline model in spark.ml? - 尽我所能告诉它工作得很好,但比抱歉更安全,我猜)这可能是一种首选方法.

    If you use mllib.tree.model.RandomForestModel you can omit 3. and use model.save directly. Since there seem to be some problems with deserialization (How to deserialize Pipeline model in spark.ml? - as far as I can tell it works just fine but better safe than sorry, I guess) it could be a preferred approach.

    编辑

    根据官方文档:

    VectorAssembler 接受以下输入列类型:所有数字类型、布尔类型和向量类型.

    由于错误表明您的列是一个 String,您应该先对其进行转换,例如使用 StringIndexer.

    Since error indicates your column is a String you should transform it first, for example using StringIndexer.

    这篇关于使用 Spark MLlib Scala API 按组运行 3000 多个随机森林模型的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

  • 08-13 19:26