How to train a random forest with a sparse matrix in Spark?
Consider this simple example that uses sparklyr:
library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
# Source: table<mytext_spark> [?? x 3]
# Database: spark_connection
text book label
<chr> <chr> <int>
1 SENSE AND SENSIBILITY Sense & Sensibility 0
2 "" Sense & Sensibility 0
3 by Jane Austen Sense & Sensibility 0
4 "" Sense & Sensibility 0
5 (1811) Sense & Sensibility 0
6 "" Sense & Sensibility 0
7 "" Sense & Sensibility 0
8 "" Sense & Sensibility 0
9 "" Sense & Sensibility 0
10 CHAPTER 1 Sense & Sensibility 0
11 "" Sense & Sensibility 0
12 "" Sense & Sensibility 0
13 The family of Dashwood had long been settled in Sussex. Their estate Sense & Sensibility 0
14 was large, and their residence was at Norland Park, in the centre of Sense & Sensibility 0
15 their property, where, for many generations, they had lived in so Sense & Sensibility 0
16 respectable a manner as to engage the general good opinion of their Sense & Sensibility 0
The dataframe is reasonably tiny in size (about 70k rows and 14k unique words).
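For reference, a small sketch of how those figures can be checked, reusing the mytext and mytext_spark objects defined above (the counts are rough approximations, not exact values from the original post):

sdf_nrow(mytext_spark)       # number of rows in the Spark DataFrame (about 70k)
mytext %>%
  pull(text) %>%
  str_split("\\s+") %>%      # crude whitespace tokenization, same pattern as the pipeline below
  unlist() %>%
  unique() %>%
  length()                   # rough count of distinct tokens (about 14k)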
Now, training a naive bayes model only takes a few seconds on my cluster. First, I define the pipeline:
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
Then I train the naive bayes model:
> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
expr min lq mean median uq max neval
model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832 3
Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc.) on the same (actually tiny!!) dataset will not work.
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240,
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
# won't work :(
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 69.0 failed 4 times, most recent failure: Lost task 0.3 in stage 69.0 (TID 1580, 1.1.1.1.1, executor 5): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
I think this is due to the sparseness of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A Spark problem? Is my code inefficient?
Thanks!
You are getting this error because you are actually hitting the famous 2G limit that we have in Spark: https://issues.apache.org/jira/browse/SPARK-6235
The solution is to repartition your data before feeding it to the algorithm.
There are actually two gotchas in this post:
- Working with local data.
- Tree-based models in Spark are memory hungry.
So, let's review your code, which seems harmless:
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)
So what does the last line do?
copy_to (not designed for big data sets) actually just copies the local R data frame to a 1-partition Spark DataFrame.
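You can see this directly; a minimal sketch, reusing the mytext_spark object created above:

# sdf_num_partitions() reports how many partitions back a Spark DataFrame.
# Right after copy_to(), this is expected to report a single partition.
sdf_num_partitions(mytext_spark)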
So you'll just need to repartition your data to make sure that, once the pipeline prepares your data before feeding it into gbt, the partition size is smaller than 2GB.
So you can just do the following to repartition your data:
# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
sdf_repartition(partitions = 20)
PS1: max_memory_in_mb is the amount of memory you are giving gbt to compute its statistics. It is not directly related to the amount of input data.
PS2: If you didn't set up enough memory for your executors, you might run into java.lang.OutOfMemoryError: GC overhead limit exceeded.
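A minimal sketch of raising that memory through the sparklyr config before connecting; the 4G values are just the ones used in the full code below and should be tuned for your cluster:

config <- spark_config()
config$`sparklyr.shell.driver-memory`   <- "4G"   # memory for the driver
config$`sparklyr.shell.executor-memory` <- "4G"   # memory per executor
sc <- spark_connect(master = "local", config = config)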
EDIT: What's the meaning of repartitioning data?
We can always refer to the definition of what a partition is before talking about repartitioning. I'll try to be short.

A partition is a logical chunk of a large distributed data set.

Spark manages data using partitions, which helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.

Increasing the number of partitions will make each partition hold less data (or none at all!).

Source: excerpted from @JacekLaskowski's Mastering Apache Spark book.
But data partitioning isn't always right, like in this case, so repartitioning is needed (sdf_repartition for sparklyr).
sdf_repartition will scatter and shuffle your data across your nodes, i.e. sdf_repartition(20) will create 20 partitions of your data instead of the 1 you originally had in this case.
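As a quick sanity check, a small sketch reusing the repartitioned mytext_spark from above:

# After sdf_repartition(partitions = 20), the DataFrame should now report 20 partitions.
sdf_num_partitions(mytext_spark)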
I hope this helps.
The whole code:
library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)
library(janeaustenr) # to get some text data
library(stringr)
mytext <- austen_books() %>%
mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable
mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)
pipeline <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_naive_bayes( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
model_type = "multinomial",
smoothing = 0,
thresholds = c(1, 1))
library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
pipeline2 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
ml_gbt_classifier( label_col = "label",
features_col = "finaltoken",
prediction_col = "pcol",
probability_col = "prcol",
raw_prediction_col = "rpcol",
max_memory_in_mb = 10240, # memory (in MB) the algorithm can use to compute its statistics
cache_node_ids = TRUE)
model2 <- ml_fit(pipeline2, mytext_spark)
pipeline3 <- ml_pipeline(sc) %>%
ft_regex_tokenizer(input.col='text',
output.col = 'mytoken',
pattern = "\\s+",
gaps =TRUE) %>%
ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')
model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
# Stages
# |--1 RegexTokenizer (Transformer)
# | <regex_tokenizer_1ce4342b543b>
# | (Parameters -- Column Names)
# | input_col: text
# | output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# | <count_vectorizer_1ce4e0e6489>
# | (Parameters -- Column Names)
# | input_col: mytoken
# | output_col: finaltoken
# | (Transformer Info)
# | vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# | <gbt_classifier_1ce41ab30213>
# | (Parameters -- Column Names)
# | features_col: finaltoken
# | label_col: label
# | prediction_col: pcol
# | probability_col: prcol
# | raw_prediction_col: rpcol
# | (Transformer Info)
# | feature_importances: num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# | num_classes: int 2
# | num_features: int 39158
# | total_num_nodes: int 540
# | tree_weights: num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# | trees: <list>
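Finally, a small hypothetical sketch of using the fitted pipeline: ml_transform() applies every stage of the fitted PipelineModel to a Spark DataFrame, so you can inspect the predictions it produces (the column names pcol and prcol come from the pipeline definition above).

predictions <- ml_transform(model2, mytext_spark)
predictions %>%
  select(label, pcol) %>%   # true label vs. predicted label
  head(10)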