本文介绍了如何充分利用集群中的所有Spark节点?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经在 Spark 的独立模式下使用 ec2-script 启动了一个 10 节点集群.我正在从 PySpark shell 中访问 s3 存储桶中的数据,但是当我在 RDD 上执行转换时,只使用了一个节点.例如,下面将从 CommonCorpus 中读取数据:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10""-180-212-248.ec2.internal.warc.gz")数据 = sc.textFile(bucket)数据计数()

当我运行它时,我的 10 个从站中只有一个处理数据.我知道这一点,因为从 Spark Web 控制台查看时,只有一个从站 (213) 有任何活动日志.当我查看 Ganglia 中的活动时,该节点 (213) 是唯一一个在活动运行时内存使用量激增的从站.

此外,当我使用只有一个从站的 ec2 集群运行相同的脚本时,我的性能完全相同.我正在使用 Spark 1.1.0,非常感谢任何帮助或建议.

解决方案

我认为您遇到了 gzipped 文件的一个相当典型的问题,因为它们无法并行加载.更具体地说,单个 gzipped 文件不能被多个任务并行加载,因此 Spark 将加载 1 个任务,从而为您提供一个带有 1 个分区的 RDD.

(但是请注意,Spark 可以并行加载 10 个 gzipped 文件就好了;只是这 10 个文件中的每一个只能由 1 个任务加载.您仍然可以 个文件获得并行性,只是不在文件中.)

您可以通过明确检查您的 RDD 中的分区数来确认您只有 1 个分区:

data.getNumPartitions()

可以在 RDD 上并行运行的任务数量的上限是 RDD 中的分区数量或集群中的从内核数量,以较低者为准.>

在您的情况下,它是 RDD 分区的数量.您可以通过按如下方式重新分区 RDD 来增加它:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

为什么 sc.defaultParallelism * 3?

Spark Tuning 指南建议 每个内核 2-3 个任务sc.defaultParalellism 为您提供集群中的核心数.

I have launched a 10 node cluster with the ec2-script in standalone mode for Spark. I am accessing data in s3 buckets from within the PySpark shell but when I perform transormations on the RDD, only one node is ever used. For example the below will read in data from the CommonCorpus:

bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
          "/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
          "-180-212-248.ec2.internal.warc.gz")

data = sc.textFile(bucket)
data.count()

When I run this, only one of my 10 slaves processes the data. I know this because only one slave (213) has any logs of the activity when viewed from the Spark web console. When I view the the activity in Ganglia, this same node (213) is the only slave with a spike in mem usage when the activity was run.

Furthermore I have the exact same performance when I run the same script with an ec2 cluster of only one slave. I am using Spark 1.1.0 and any help or advice is greatly appreciated.

解决方案

I think you've hit a fairly typical problem with gzipped files in that they cannot be loaded in parallel. More specifically, a single gzipped file cannot be loaded in parallel by multiple tasks, so Spark will load it with 1 task and thus give you an RDD with 1 partition.

(Note, however, that Spark can load 10 gzipped files in parallel just fine; it's just that each of those 10 files can only be loaded by 1 task. You can still get parallelism across files, just not within a file.)

You can confirm that you only have 1 partition by checking the number of partitions in your RDD explicitly:

data.getNumPartitions()

The upper bound on the number of tasks that can run in parallel on an RDD is the number of partitions in the RDD or the number of slave cores in your cluster, whichever is lower.

In your case, it's the number of RDD partitions. You can increase that by repartitioning your RDD as follows:

data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)

Why sc.defaultParallelism * 3?

The Spark Tuning guide recommends having 2-3 tasks per core, and sc.defaultParalellism gives you the number of cores in your cluster.

这篇关于如何充分利用集群中的所有Spark节点?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

06-29 16:13