Problem description
The purpose of this is to manipulate each data file and save a copy of it to a second location in HDFS. I will be using
RddName.coalesce(1).saveAsTextFile(pathName)
to save the result to HDFS.
This is why I want to process each file separately, even though I am sure the performance will be less efficient. However, I have yet to work out how to store the list of CSV file paths in an array of strings and then loop over it, creating a separate RDD for each path.
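Before looping over the files, each source path needs a corresponding destination path in the second HDFS location. A minimal sketch of that mapping as a pure string operation is below; the function name and the destination root are hypothetical, not part of the original question:

```scala
// Hypothetical helper: derive a per-file destination path by swapping
// the source root directory for a destination root, keeping the
// date=... subdirectory and file name intact.
def destinationFor(srcPathName: String, srcRoot: String, destRoot: String): String = {
  require(srcPathName.startsWith(srcRoot), s"$srcPathName is not under $srcRoot")
  destRoot + srcPathName.stripPrefix(srcRoot)
}

val out = destinationFor(
  "/data/email/click/date=2015-01-01/sent_20150101.csv",
  "/data/email/click",
  "/backup/email/click")   // "/backup/email/click" is a made-up second location
println(out)
```

Each path produced this way could then be handed to `saveAsTextFile` inside the per-file loop.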
Let us use the following anonymous example as the HDFS source locations:
/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv
I know how to list the file paths using Hadoop FS Shell:
hdfs dfs -ls /data/email/click/*/*.csv
I know how to create one RDD for all the data:
val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )
This is what ultimately worked for me:
import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil

// Build a Hadoop configuration from the Spark context's configuration
val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)

// Source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")

hdfs.globStatus(sourcePath).foreach { fileStatus =>
  val filePathName = fileStatus.getPath().toString()
  val fileName     = fileStatus.getPath().getName()
  // <DO STUFF HERE>
} // end foreach loop
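The glob-then-iterate shape of the snippet above can be illustrated without a Hadoop cluster by running the same pattern against the local filesystem with `java.nio`. This is only an analogue for demonstration; the temp directories and file names are fabricated, and in the real job `Files.copy` would be replaced by the per-file RDD work and `saveAsTextFile`:

```scala
// Local-filesystem analogue of hdfs.globStatus(...).foreach { fileStatus => ... }
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.jdk.CollectionConverters._

val srcDir  = Files.createTempDirectory("click_src")   // stand-in for the HDFS source
val destDir = Files.createTempDirectory("click_dest")  // stand-in for the second location

// Fabricate a few CSV files standing in for the daily extracts
Seq("sent_20150101.csv", "sent_20150102.csv").foreach { name =>
  Files.write(srcDir.resolve(name), "id,clicked\n1,true\n".getBytes("UTF-8"))
}

// Glob for *.csv, then handle each matched file separately
val matcher = srcDir.getFileSystem.getPathMatcher("glob:**/*.csv")
Files.list(srcDir).iterator().asScala
  .filter(matcher.matches)
  .foreach { p =>
    val fileName = p.getFileName.toString  // like fileStatus.getPath().getName()
    // <DO STUFF HERE> -- here we just copy the file to the second location
    Files.copy(p, destDir.resolve(fileName), StandardCopyOption.REPLACE_EXISTING)
  }
```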