我已经以独立模式为Spark启动了带有ec2-script的10节点集群。我正在从PySpark Shell中访问s3存储桶中的数据,但是当我在RDD上执行转换时,只会使用一个节点。例如,以下内容将从CommonCorpus中读取数据:
bucket = ("s3n://@aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2014-23/"
"/segments/1404776400583.60/warc/CC-MAIN-20140707234000-00000-ip-10"
"-180-212-248.ec2.internal.warc.gz")
data = sc.textFile(bucket)
data.count()
运行此命令时,我的10个从属中只有一个处理数据。我知道这是因为从Spark Web控制台查看时,只有一个从属(213)具有活动的任何日志。当我在Ganglia中查看活动时,运行活动时,同一节点(213)是唯一一个内存使用量激增的从属服务器。
此外,当我在只有一个从属的ec2集群上运行相同的脚本时,我具有完全相同的性能。我正在使用Spark 1.1.0,非常感谢任何帮助或建议。
...ec2.internal.warc.gz
我认为您在gzip压缩文件中遇到了一个相当典型的问题,因为它们无法并行加载。更具体地说,单个gzip压缩文件不能由多个任务并行加载,因此Spark将为它加载1个任务,从而为您提供1个分区的RDD。
(不过,请注意,Spark可以并行加载10个压缩文件,这很好;只是这10个文件中的每个文件只能通过1个任务加载。您仍然可以跨文件获得并行性,而不能在文件内)。
通过显式检查RDD中的分区数,可以确认只有1个分区:
data.getNumPartitions()
可在RDD上并行运行的任务数的上限是RDD中的分区数或群集中的从属内核数,以较低者为准。
您的情况就是RDD分区的数量。您可以通过如下方式对RDD进行重新分区来增加它:
data = sc.textFile(bucket).repartition(sc.defaultParallelism * 3)
为什么sc.defaultParallelism * 3
呢
《 Spark Tuning指南》建议每个内核有2-3个任务,并sc.defaultParalellism
为您提供集群中内核的数量。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句