This article describes how to make full use of all the nodes in a cluster with Spark. The approach below should be a useful reference for anyone running into the same problem.

Problem Description

I have launched a 10-node cluster with the ec2-script in standalone mode for Spark. I am accessing data in S3 buckets from within the PySpark shell, but when I perform transformations on the RDD, only one node is ever used. For example, the following reads data from the Common Crawl corpus:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

When I run this, only one of my 10 slaves processes the data. I know this because only one slave (213) shows any logs of the activity in the Spark web console, and when I view the activity in Ganglia, that same node (213) is the only slave with a spike in memory usage while the job was running.

Furthermore, I get exactly the same performance when I run the same script on an EC2 cluster with only one slave. I am using Spark 1.1.0, and any help or advice is greatly appreciated.

Solution

I think you've hit a fairly typical problem with gzipped files: they cannot be loaded in parallel. More specifically, a single gzipped file cannot be loaded in parallel by multiple tasks, so Spark will load it with one task and thus give you an RDD with one partition.

(Note, however, that Spark can load 10 gzipped files in parallel just fine; it's just that each of those 10 files can only be read by one task. You can still get parallelism across files, just not within a file.)
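To make the across-files vs. within-a-file distinction concrete, here is a minimal sketch for the PySpark shell; the glob pattern is a hypothetical stand-in for whatever set of WARC files you actually want to load:

# Hypothetical glob matching many gzipped WARC files: Spark schedules one
# task per matched file, so the files are read in parallel even though each
# individual .gz file is still read by a single task.
many = sc.textFile("s3n://@aws-publicdatasets/common-crawl/crawl-data/"
                   "CC-MAIN-2014-23/segments/*/warc/*.warc.gz")

single = sc.textFile(bucket)   # the single gzipped file from the question

many.getNumPartitions()        # one partition per matched file
single.getNumPartitions()      # 1 -- gzip is not splittable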

You can confirm that you only have 1 partition by checking the number of partitions in your RDD explicitly:

data.getNumPartitions()

The upper bound on the number of tasks that can run in parallel on an RDD is the number of partitions in the RDD or the number of slave cores in your cluster, whichever is lower.
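As a quick sanity check (a sketch that assumes sc.defaultParallelism reflects the total number of worker cores, as discussed further below), you can compute that bound yourself:

# Effective parallelism for this RDD: the smaller of the number of
# partitions and the number of cores available in the cluster.
min(data.getNumPartitions(), sc.defaultParallelism)   # 1 here, no matter how many cores you have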

In your case, it's the number of RDD partitions. You can increase that by repartitioning your RDD as follows:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

Why sc.defaultParallelism * 3?

The Spark Tuning Guide recommends having 2-3 tasks per core, and sc.defaultParallelism gives you the number of cores in your cluster.
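Putting it all together, a minimal sketch of the corrected job might look like this (the *3 multiplier just follows the 2-3 tasks-per-core guideline; treat it as a starting point rather than a fixed rule):

# Read the single gzipped file (1 partition), then spread its records across
# the cluster so that every core gets work in the stages that follow.
target_partitions = sc.defaultParallelism * 3   # roughly 2-3 tasks per core
data = sc.textFile(bucket).repartition(target_partitions)

data.getNumPartitions()   # should now equal target_partitions
data.count()              # the post-shuffle stage now runs tasks on all slaves

Note that the initial read and decompression of the gzipped file is still performed by a single task; repartition triggers a shuffle, so only the stages after it are spread across the cluster.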


That concludes this article on how to make full use of all the nodes in a cluster with Spark. We hope the answer above is helpful, and thank you for your support!
