本文介绍了如何在Spark中训练具有稀疏矩阵的随机森林?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑使用sparklyr的简单示例:

library(sparklyr)
library(janeaustenr) # to get some text data
library(stringr)
library(dplyr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

# Source:   table<mytext_spark> [?? x 3]
# Database: spark_connection
   text                                                                    book                label
   <chr>                                                                   <chr>               <int>
 1 SENSE AND SENSIBILITY                                                   Sense & Sensibility     0
 2 ""                                                                      Sense & Sensibility     0
 3 by Jane Austen                                                          Sense & Sensibility     0
 4 ""                                                                      Sense & Sensibility     0
 5 (1811)                                                                  Sense & Sensibility     0
 6 ""                                                                      Sense & Sensibility     0
 7 ""                                                                      Sense & Sensibility     0
 8 ""                                                                      Sense & Sensibility     0
 9 ""                                                                      Sense & Sensibility     0
10 CHAPTER 1                                                               Sense & Sensibility     0
11 ""                                                                      Sense & Sensibility     0
12 ""                                                                      Sense & Sensibility     0
13 The family of Dashwood had long been settled in Sussex.  Their estate   Sense & Sensibility     0
14 was large, and their residence was at Norland Park, in the centre of    Sense & Sensibility     0
15 their property, where, for many generations, they had lived in so       Sense & Sensibility     0
16 respectable a manner as to engage the general good opinion of their     Sense & Sensibility     0

数据框的大小相当小(大约70k行和14k唯一字).

The dataframe is reasonably tiny in size (about 70k rows and 14k unique words).

现在,在我的集群上训练naive bayes模型只需要几秒钟.首先,我定义pipeline

Now, training a naive bayes model only takes a few seconds on my cluster.First, I define the pipeline

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes( label_col = "label",
                  features_col = "finaltoken",
                  prediction_col = "pcol",
                  probability_col = "prcol",
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial",
                  smoothing = 0,
                  thresholds = c(1, 1))

然后训练naive bayes模型

> library(microbenchmark)
> microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)
Unit: seconds
                                    expr      min       lq     mean   median       uq      max neval
 model <- ml_fit(pipeline, mytext_spark) 6.718354 6.996424 7.647227 7.274494 8.111663 8.948832     3

现在的问题是,尝试在相同(实际上很小!)的数据集上运行任何基于tree的模型(random forestboosted trees等)将不起作用.

Now the problem is that trying to run any tree-based model (random forest, boosted trees, etc) on the same (actually tiny!!) dataset will not work.

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier( label_col = "label",
                     features_col = "finaltoken",
                     prediction_col = "pcol",
                     probability_col = "prcol",
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240,
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)
# wont work :(

我认为这是由于令牌的矩阵表示的稀疏性引起的,但是在这里有什么可以做的吗?这是sparklyr问题吗? spark问题?我的代码效率不高吗?

I think this is due to the sparseness of the matrix representation of the tokens, but is there anything that can be done here? Is this a sparklyr problem? A spark problem? Is my code non-efficient?

谢谢!

推荐答案

您收到此错误,是因为您实际上达到了Spark中的著名2G限制 https://issues.apache.org/jira/browse/SPARK-6235

You are getting this error because you are actually hitting the famous 2G limit that we have in Spark https://issues.apache.org/jira/browse/SPARK-6235

解决方案是对数据进行重新分区,然后再将其提供给算法.

这实际上是这篇文章中的两个陷阱:

This is actually two gotchas in this post :

  • 使用本地数据.
  • Spark中基于树的模型需要大量内存.

因此,让我们回顾一下看起来无害的代码;

So, let’s review your code which seems harmless;

 library(janeaustenr) # to get some text data
 library(stringr)

 mytext <- austen_books() %>%
    mutate(label = as.integer(str_detect(text, 'great'))) # create a fake label variable

 mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE)

那么最后一行是做什么的?

So what does the last line do ?

copy_to(不适用于大数据集),实际上只是将本地R数据帧复制到1个分区Spark DataFrame

copy_to (not designed for big data sets), actually just copies the local R data frame to a 1 partition Spark DataFrame

因此,您只需要重新分区数据,以确保一旦管道在将数据输入到gbt中之前准备好数据,分区大小就会小于2GB.

So you’ll just need to repartition your data to make sure that once the pipeline prepares your data before feeding into gbt, the partition size is smaller than 2GB.

因此,您只需执行以下操作即可对数据进行重新分区:

So you can just do the following to repartition your data :

# 20 is an arbitrary number I chose to test and it seems to work well in this case,
# you might want to reconsider that if you have a bigger dataset.
mytext_spark <-
 copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>%
 sdf_repartition(partitions = 20)

PS1 :max_memory_in_mb是您为gbt计算其统计信息而提供的内存量.它与输入的数据量没有直接关系.

PS1: max_memory_in_mb is the amount of memory you are giving for gbt to computes it's statistics. It's not related directly to the amount of data as input.

PS2::如果没有为执行程序设置足够的内存,则可能会遇到java.lang.OutOfMemoryError : GC overhead limit exceeded

PS2: If you didn't set up enough memory to your executors, you might run into a java.lang.OutOfMemoryError : GC overhead limit exceeded

编辑:重新分区数据是什么意思?

What's the meaning of repartitioning data ?

在谈论重新分区之前,我们总是可以参考分区的定义.我会尽量简短.

We can always refer to the definition of what a partition is before talking about repartitioning. I'll try to be short.

Spark使用分区管理数据,该分区有助于以最少的网络流量并行化分布式数据处理,以便在执行程序之间发送数据. 默认情况下,Spark尝试将数据从RDD附近的节点读取到RDD中.由于Spark通常会访问分布式分区数据,因此为了优化转换操作,它会创建分区来保存数据块.

Spark manages data using partitions that helps parallelize distributed data processing with minimal network traffic for sending data between executors. By default, Spark tries to read data into an RDD from the nodes that are close to it. Since Spark usually accesses distributed partitioned data, to optimize transformation operations it creates partitions to hold the data chunks.

增加分区数将使每个分区的数据更少(或根本没有!)

Increasing partitions count will make each partition to have less data (or not at all!)

来源:@JacekLaskowski的摘录精通Apache Spark书.

source: excerpt from @JacekLaskowski Mastering Apache Spark book.

但是,在这种情况下,数据分区并不总是正确的.因此需要重新分区. (sdf_repartition表示sparklyr)

But data partitions isn't always right, like in this case. So repartition is needed. (sdf_repartition for sparklyr)

sdf_repartition将在节点上分散和重新排列您的数据.即sdf_repartition(20)将创建20个数据分区,而不是本例中的原始分区.

sdf_repartition will scatter and shuffle your data across your nodes. i.e sdf_repartition(20) will create of 20 partitions of your data instead of the 1 you originally have in this case.

我希望这会有所帮助.

整个代码:

library(sparklyr)
library(dplyr)
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "4G"
config$`sparklyr.shell.executor-memory` <- "4G"
Sys.setenv(SPARK_HOME = "/Users/eliasah/server/spark-2.3.1-SNAPSHOT-bin-2.7.3")
sc <- spark_connect(master = "local", config = config)

library(janeaustenr) # to get some text data
library(stringr)

mytext <- austen_books() %>%
  mutate(label = as.integer(str_detect(text, 'great'))) #create a fake label variable

mytext_spark <- copy_to(sc, mytext, name = 'mytext_spark', overwrite = TRUE) %>% sdf_repartition(partitions = 20)

pipeline <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_naive_bayes( label_col = "label",
                  features_col = "finaltoken",
                  prediction_col = "pcol",
                  probability_col = "prcol",
                  raw_prediction_col = "rpcol",
                  model_type = "multinomial",
                  smoothing = 0,
                  thresholds = c(1, 1))

library(microbenchmark)
microbenchmark(model <- ml_fit(pipeline, mytext_spark),times = 3)

pipeline2 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken') %>%
  ml_gbt_classifier( label_col = "label",
                     features_col = "finaltoken",
                     prediction_col = "pcol",
                     probability_col = "prcol",
                     raw_prediction_col = "rpcol",
                     max_memory_in_mb = 10240, # this is amount of data that can be use for
                     cache_node_ids = TRUE)

model2 <- ml_fit(pipeline2, mytext_spark)

pipeline3 <- ml_pipeline(sc) %>%
  ft_regex_tokenizer(input.col='text',
                     output.col = 'mytoken',
                     pattern = "\\s+",
                     gaps =TRUE) %>%
  ft_count_vectorizer(input_col = 'mytoken', output_col = 'finaltoken')

model2
# PipelineModel (Transformer) with 3 stages
# <pipeline_1ce45bb8b7a7>
#   Stages
# |--1 RegexTokenizer (Transformer)
# |    <regex_tokenizer_1ce4342b543b>
# |     (Parameters -- Column Names)
# |      input_col: text
# |      output_col: mytoken
# |--2 CountVectorizerModel (Transformer)
# |    <count_vectorizer_1ce4e0e6489>
# |     (Parameters -- Column Names)
# |      input_col: mytoken
# |      output_col: finaltoken
# |     (Transformer Info)
# |      vocabulary: <list>
# |--3 GBTClassificationModel (Transformer)
# |    <gbt_classifier_1ce41ab30213>
# |     (Parameters -- Column Names)
# |      features_col: finaltoken
# |      label_col: label
# |      prediction_col: pcol
# |      probability_col: prcol
# |      raw_prediction_col: rpcol
# |     (Transformer Info)
# |      feature_importances:  num [1:39158] 6.73e-04 7.20e-04 1.01e-15 1.97e-03 0.00 ...
# |      num_classes:  int 2
# |      num_features:  int 39158
# |      total_num_nodes:  int 540
# |      tree_weights:  num [1:20] 1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 0.1 ...
# |      trees: <list>

这篇关于如何在Spark中训练具有稀疏矩阵的随机森林?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-13 18:51