问题描述
我想按组来构建随机森林模型(学校ID,超过3千人)使用星火斯卡拉API一个大模型的输入csv文件。各组含有约3000-4000的记录。我有支配的资源是20-30 AWS m3.2xlarge实例。
I am trying to build random forest models by group(School_ID, more than 3 thousands) on a large model input csv file using Spark Scala API. Each of the group contains about 3000-4000 records. The resources I have at disposal are 20-30 aws m3.2xlarge instances.
在R,我可以按组构建模型并将其保存到像这个 -
In R, I can construct models by group and save them to a list like this-
library(dplyr);library(randomForest);
Rf_model <- train %>% group_by(School_ID) %>%
do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))
该列表可以存储在某个地方,我可以给他们打电话时,我需要使用它们像下面 -
The list can be stored somewhere and I can call them when I need to use them like below -
save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <- predict(Rf_model.school$school[school_index][[1]], newdata=test)
我不知道如何做,在星火,不管是不是我需要首先分成由组数据,以及如何有效地做到这一点,如果它是必要的。
I was wondering how to do that in Spark, whether or not I need to split the data by group first and how to do it efficiently if it's necessary.
我能够通过学校ID根据以下code分裂的文件,但现在看来,这将创建一个单独的作业子集每次迭代,需要很长的时间来完成作业。有没有办法做到这一点在一通?
I was able to split up the file by School_ID based on the below code but it seems it creates one individual job to subset for each iteration and takes a long time to finish the jobs. Is there a way to do it in one pass?
model_input.cache()
val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))
for( i <- 0 to programs.length - 1 ){
bySchoolArray(i).
write.format("com.databricks.spark.csv").
option("header", "true").
save("model_input_bySchool/model_input_"+ schools(i))
}
来源:
How我可以拆分数据帧与在Scala和同列的值dataframes SPARK
修改2015年8月24日
我想我的数据帧转换成由随机林模型接受的格式。我下面就这个线程指令
How创建于火花ML 分类正确的数据帧
基本上,我创建了一个新的变量标签,并存储在我的Double类。然后,我用结合功能VectorAssembler我所有的功能和改变我输入的数据如下: -
Basically, I create a new variable "label" and store my class in Double. Then I combine all my features using VectorAssembler function and transform my input data as follows-
val assembler = new VectorAssembler().
setInputCols(Array("COL1", "COL2", "COL3")).
setOutputCol("features")
val model_input = assembler.transform(model_input_raw).
select("SCHOOL_ID", "label", "features")
部分的错误消息(让我知道如果你需要完整的日志消息) -
Partial error message(let me know if you need the complete log message) -
scala.MatchError:StringType(类
org.apache.spark.sql.types.StringType $)
在org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ 2.适用(VectorAssembler.scala:57)
这是所有的变量转换为数字类型之后解决了。
This is resolved after converting all the variables to numeric types.
修改2015年8月25日
该模型毫升不接受我的标签codeD手工,所以我需要使用StringIndexer去解决问题指示here.据href=\"https://spark.apache.org/docs/latest/ml-features.html#stringindexer\" rel=\"nofollow\">官方文档的
Edit 8/25/2015The ml model doesn't accept the label I coded manually so I need to use StringIndexer to go around the problem as indicated here. According to the official documentation, the most frequent label gets 0. It causes inconsistent labels across School_ID. I was wondering if there's a way to create the labels without resetting the order of the values.
val indexer = new StringIndexer().
setInputCol("label_orig").
setOutputCol("label")
任何建议或方向将是有益的,并随时提出任何问题。谢谢!
Any suggestions or directions would be helpful and feel free to raise any questions. Thanks!
推荐答案
既然你已经为每所学校单独的数据帧没有太多要在这里完成。既然你的数据帧我假设你想使用 ml.classification.RandomForestClassifier
。如果是的话,你可以尝试这样的事:
Since you already have separate data frame for each school there is not much to be done here. Since you data frames I assume you want to use ml.classification.RandomForestClassifier
. If so you can try something like this:
-
提取管道的逻辑。根据您的要求调整
RandomForestClassifier
参数和变压器
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel}
def trainModel(df: DataFrame): PipelineModel = {
val rf = new RandomForestClassifier()
val pipeline = new Pipeline().setStages(Array(rf))
pipeline.fit(df)
}
在每个子集火车模型
Train models on each subset
val bySchoolArrayModels = bySchoolArray.map(df => trainModel(df))
保存模型
Save models
import java.io._
def saveModel(name: String, model: PipelineModel) = {
val oos = new ObjectOutputStream(new FileOutputStream(s"/some/path/$name"))
oos.writeObject(model)
oos.close
}
schools.zip(bySchoolArrayModels).foreach{
case (name, model) => saveModel(name, Model)
}
可选:由于各个子集是相当小的,你可以尝试一个类似于我已经描述的同时提交多个任务。
Optional: Since individual subsets are rather small you can try an approach similar to the one I've describe here to submit multiple tasks at the same time.
如果您使用 mllib.tree.model.RandomForestModel
则可以省略3,并使用 model.save
直接。因为似乎有一些问题与反序列化( - 据我可以告诉它工作得很好但有备无患,我猜)这可能是一个preferred方法。
If you use mllib.tree.model.RandomForestModel
you can omit 3. and use model.save
directly. Since there seem to be some problems with deserialization (deserializing spark ml pipeline model - as far as I can tell it works just fine but better safe than sorry, I guess) it could be a preferred approach.
修改
根据:
VectorAssembler
接受以下输入列类型:所有数值类型,布尔类型和载体类型。
由于错误表明你的列是字符串
你应该使用的。
Since error indicates your column is a String
you should transform it first, for example using StringIndexer
.
这篇关于运行3000+随机森林模型按组使用星火MLlib斯卡拉API的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!