本文介绍了如何在 Spark 中使用稀疏矩阵训练随机森林?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑这个使用 sparklyr 的简单示例:

库(sparklyr)library(janeaustenr) # 获取一些文本数据图书馆(字符串)图书馆(dplyr)mytext <- austen_books() %>%mutate(label = as.integer(str_detect(text, 'great'))) #创建一个假标签变量mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)# 来源:table<mytext_spark>[??× 3]# 数据库:spark_connection课本标签<chr><chr><int>1 SENSE AND 感性 Sense &感性 02""感&感性 03 由简·奥斯汀 Sense &感性 04""感&感性 05 (1811) 感觉与感性 06"" Sense &感性 07""感&感性 08"" Sense &感性 09""感&感性 010 第 1 章感性 011"" Sense &感性 012"" Sense &感性 013 达什伍德一家早已定居在苏塞克斯.他们的庄园 Sense &感性 014 号很大,他们的住所位于 Sense & 中心的诺兰德公园 (Norland Park).感性 015 他们的财产,在那里,他们几代人都生活在如此 Sense &感性 016 可敬的方式来参与他们的 Sense & 的普遍好评感性 0

数据框的大小相当小(大约 70k 行和 14k 个唯一字).

现在,在我的集群上训练一个 naive bayes 模型只需要几秒钟.首先,我定义了pipeline

pipeline %ft_regex_tokenizer(input.col='text',output.col = 'mytoken',模式 = "\\s+",间隙=真)%>%ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%ml_naive_bayes( label_col = "label",features_col = "finaltoken",predict_col = "pcol",概率_col = "prcol",raw_prediction_col = "rpcol",model_type = "多项式",平滑 = 0,阈值 = c(1, 1))

然后训练朴素贝叶斯模型

>图书馆(微基准)>微基准(模型 

现在的问题是试图在同一个 (实际上很小!!)数据集将不起作用.

pipeline2 %ft_regex_tokenizer(input.col='text',output.col = 'mytoken',模式 = "\\s+",间隙=真)%>%ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%ml_gbt_classifier( label_col = "label",features_col = "finaltoken",predict_col = "pcol",概率_col = "prcol",raw_prediction_col = "rpcol",max_memory_in_mb = 10240,cache_node_ids = TRUE)模型 2 <- ml_fit(pipeline2, mytext_spark)# 行不通 :(

错误:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 69.0 中的任务 0 失败 4 次,最近一次失败:在 69.0 阶段丢失任务 0.3(TID 1580,1.1.1.1.1,执行者 5):java.lang.IllegalArgumentException: 大小超过 Integer.MAX_VALUE

我认为这是由于标记的矩阵表示的稀疏性造成的,但是这里有什么可以做的吗?这是 sparklyr 问题吗?spark 问题?我的代码效率低下吗?

谢谢!

解决方案

您收到此错误是因为您实际上达到了我们在 Spark 中设置的著名的 2G 限制 https://issues.apache.org/jira/browse/SPARK-6235

解决方案是在将数据提供给算法之前重新分区.

这实际上是这篇文章中的两个陷阱:

  • 使用本地数据.
  • Spark 中基于树的模型需要大量内存.

那么,让我们检查一下您的代码,它看起来无害;

 library(janeaustenr) # 获取一些文本数据图书馆(字符串)mytext <- austen_books() %>%mutate(label = as.integer(str_detect(text, 'great'))) # 创建一个假标签变量mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

那么最后一行是做什么的?

copy_to(不是为大数据集设计的),实际上只是将本地R数据帧复制到一个1分区的Spark DataFrame

因此,您只需要重新分区您的数据,以确保一旦管道在输入 gbt 之前准备好您的数据,分区大小小于 2GB.

因此,您只需执行以下操作即可重新分区您的数据:

# 20 是我选择测试的任意数字,在这种情况下它似乎运行良好,# 如果您有更大的数据集,您可能需要重新考虑这一点.mytext_spark <-copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%sdf_repartition(partitions = 20)

PS1: max_memory_in_mb 是您为 gbt 计算其统计数据而提供的内存量.它与作为输入的数据量没有直接关系.

PS2:如果您没有为执行程序设置足够的内存,您可能会遇到 java.lang.OutOfMemoryError : GC 开销限制超出

重新分区数据的含义是什么?

在谈论重新分区之前,我们总是可以参考分区的定义.我会尽量简短.

分区是大型分布式数据集的逻辑块.

Spark 使用分区来管理数据,这些分区有助于以最少的网络流量并行化分布式数据处理,以便在执行器之间发送数据.默认情况下,Spark 尝试从靠近它的节点将数据读入 RDD.由于 Spark 通常访问分布式分区数据,为了优化转换操作,它会创建分区来保存数据块.

增加分区数将使每个分区的数据更少(或根本没有!)

来源:摘自@JacekLaskowski 掌握 Apache Spark 书籍.

但数据分区并不总是正确的,就像在这种情况下一样.所以需要重新分区.(sdf_repartition for sparklyr)

sdf_repartition 将在您的节点之间分散和混洗您的数据.即 sdf_repartition(20) 将创建 20 个数据分区,而不是在这种情况下您最初拥有的 1 个分区.

我希望这会有所帮助.

整个代码:

库(sparklyr)图书馆(dplyr)配置 <- spark_config()config$`sparklyr.shell.driver-memory` <-4G"config$`sparklyr.shell.executor-memory` <-4G"Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")sc <- spark_connect(master = "local", config = config)library(janeaustenr) # 获取一些文本数据图书馆(字符串)mytext <- austen_books() %>%mutate(label = as.integer(str_detect(text, 'great'))) #创建一个假标签变量mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)管道<- ml_pipeline(sc) %>%ft_regex_tokenizer(input.col='text',output.col = 'mytoken',模式 = "\\s+",间隙=真)%>%ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%ml_naive_bayes( label_col = "label",features_col = "finaltoken",predict_col = "pcol",概率_col = "prcol",raw_prediction_col = "rpcol",model_type = "多项式",平滑 = 0,阈值 = c(1, 1))图书馆(微基准)微基准(模型 %ml_gbt_classifier( label_col = "label",features_col = "finaltoken",predict_col = "pcol",概率_col = "prcol",raw_prediction_col = "rpcol",max_memory_in_mb = 10240, # 这是可用于的数据量cache_node_ids = TRUE)模型 2 <- ml_fit(pipeline2, mytext_spark)管道3<- ml_pipeline(sc) %>%ft_regex_tokenizer(input.col='text',output.col = 'mytoken',模式 = "\\s+",间隙=真)%>%ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')模型2# PipelineModel (Transformer) 具有 3 个阶段# <pipeline_1ce45bb8b7a7># 阶段# |--1 RegexTokenizer(转换器)# |<regex_tokenizer_1ce4342b543b># |(参数——列名)# |input_col:文本# |output_col: mytoken# |--2 CountVectorizerModel(变压器)# |<count_vectorizer_1ce4e0e6489># |(参数——列名)# |input_col: mytoken# |output_col:最终令牌# |(变压器信息)# |词汇:<list># |--3 GBTClassificationModel (Transformer)# |<gbt_classifier_1ce41ab30213># |(参数——列名)# |features_col:最终令牌# |label_col:标签# |预测列: pcol# |概率_col:prcol# |raw_prediction_col: rpcol# |(变压器信息)# |feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...# |num_classes: int 2# |num_features: int 39158# |total_num_nodes: int 540# |tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...# |树:<list>

Consider this simple example that uses sparklyr:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

The dataframe is reasonably tiny in size (about 70k rows and 14k unique words).

Now, training a naive bayes model only takes a few seconds on my cluster.First, I define the pipeline

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes( label_col = "label",
                  features_col = "finaltoken",
                  prediction_col = "pcol",
                  probability_col = "prcol",
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial",
                  smoothing = 0,
                  thresholds = c(1, 1))

then training the naive bayes model

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc) on the same (actually tiny!!) dataset will not work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier( label_col = "label",
                     features_col = "finaltoken",
                     prediction_col = "pcol",
                     probability_col = "prcol",
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

I think this is due to the sparseness of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A spark problem? Is my code non-efficient?

Thanks!

解决方案

You are getting this error because you are actually hitting the famous 2G limit that we have in Spark https://issues.apache.org/jira/browse/SPARK-6235

The solution is to repartition your data before feeding it to the algorithm.

This is actually two gotchas in this post :

  • Working with local data.
  • Tree based models in Spark are memory hungry.

So, let’s review your code which seems harmless;

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>%
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

So what does the last line do ?

copy_to (not designed for big data sets), actually just copies the local R data frame to a 1 partition Spark DataFrame

So you’ll just need to repartition your data to make sure that once the pipeline prepares your data before feeding into gbt, the partition size is smaller than 2GB.

So you can just do the following to repartition your data :

# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
 sdf_repartition(partitions = 20)

PS1: max_memory_in_mb is the amount of memory you are giving for gbt to computes it's statistics. It's not related directly to the amount of data as input.

PS2: If you didn't set up enough memory to your executors, you might run into a java.lang.OutOfMemoryError : GC overhead limit exceeded

EDIT: What's the meaning of repartitioning data ?

We can always refer to the definition of what a partition is before talking about repartitioning. I'll try to be short.

But data partitions isn't always right, like in this case. So repartition is needed. (sdf_repartition for sparklyr)

sdf_repartition will scatter and shuffle your data across your nodes. i.e sdf_repartition(20) will create of 20 partitions of your data instead of the 1 you originally have in this case.

I hope this helps.

The whole code :

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes( label_col = "label",
                  features_col = "finaltoken",
                  prediction_col = "pcol",
                  probability_col = "prcol",
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial",
                  smoothing = 0,
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier( label_col = "label",
                     features_col = "finaltoken",
                     prediction_col = "pcol",
                     probability_col = "prcol",
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
#   Stages
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b>
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489>
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213>
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# |      num_classes:  int 2
# |      num_features:  int 39158
# |      total_num_nodes:  int 540
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# |      trees: <list>

这篇关于如何在 Spark 中使用稀疏矩阵训练随机森林?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-13 18:52