本文介绍了阅读递归火花的Java API从HDFS中的所有文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的火花读取HDFS在一个RDD从目录中的所有文件的数据和它的子目录以及。我找不到任何有效的方法来做到这一点。于是,我试着写一些定制code,如下所示:

I 'm using spark to read data of all files from HDFS in a single RDD from a directory and it's sub directories as well. I could not find any efficient method to do that. So I tried to write some customized code as shown below:

public Object fetch(String source,String sink) {

    //reading data
    boolean isDir=new File(source).isDirectory();
    System.out.println("isDir="+isDir);
    JavaRDD<String> lines;
    if(isDir)
    {

        lines=readFiles(new File(source).listFiles(), null);
    }
    else
        lines= sc.textFile(source);

    lines.saveAsTextFile(sink);
    return true;
}

public static JavaRDD<String> readFiles(File[] files,JavaRDD<String> lines) {
    for (File file : files) {
        if (file.isDirectory()) {
            readFiles(file.listFiles(),lines); // Calls same method again.
        } 
        else {
            if(lines==null)
                lines=sc.textFile(file.getPath());
            else
            {
                JavaRDD<String> r=sc.textFile(file.getPath());
                lines.union(r);
            }
        }
    }
    return lines;
}

但这不是我做的工作预期为ISDIR包含虚假说明问题,它不是一个目录。
请ü可以指导我一下有什么不对?而且有一些有效的方法来做好这项工作?
非常感谢

but this is not doing my expected job as isDir contains false telling that it's not a directory.Please can u guide me about what's wrong? and is there some efficient way to do this job?Thanks alot

推荐答案

由于火花可以阅读基础上的Hadoop作业的配置数据,您可以使用 FileInputFormat#setInputDirRecursive 方法

As spark can read data based on a Hadoop Job configuration, you can use the FileInputFormat#setInputDirRecursive method.

JavaSparkContext context = new JavaSparkContext();

Job job;

try {
  job = Job.getInstance();
  FileInputFormat.setInputPaths(job, new Path("/path/to/input/directory));
  FileInputFormat.setInputDirRecursive(job, true);
} catch (IOException e1) {
  e1.printStackTrace();
  System.exit(1);
}

JavaRDD<Text> sourceData = context.newAPIHadoopRDD(job.getConfiguration(), TextInputFormat.class, LongWritable.class, Text.class)
  .values();

显然,你会最终有一个文本数据类型而不是字符串。

Obviously you will end up with a Text data type instead of a String.

这篇关于阅读递归火花的Java API从HDFS中的所有文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-23 22:26