嗨,我是Hadoop Mapreduce编程的新手。实际上,我有如下要求:

较大的文件,即输入文件input.txt

101 Vince 12000

102 James 33

103 Tony 32

104 John 25

105 Nataliya 19

106 Anna 20

107 Harold 29

这是较小的文件lookupfile.txt
101 Vince 12000

102 James 10000

103 Tony 20000

104 John 25000

105 Nataliya 15000

现在,我们想要得到的结果具有相同的ID号。因此,为了实现此目的,请使用较小的文件作为查找文件,使用较大的文件作为输入文件。下面给出了完整的Java代码和每个组件的说明:

这是我们在运行上述代码后所得到的结果。
102     James   33      10000

103     Tony    32      20000

104     John    25      25000

105     Nataliya        19      15000

码:
public class Join extends Configured implements Tool
{

public static class JoinMapper extends Mapper
{
   Path[] cachefiles = new Path[0]; //To store the path of lookup files
   List exEmployees = new ArrayList();//To store the data of lookup files

  /********************Setup Method******************************************/
  @Override
  public void setup(Context context)

   {
    Configuration conf = context.getConfiguration();

   try
   {

  cachefiles = DistributedCache.getLocalCacheFiles(conf);
  BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));

      String line;

 while ((line = reader.readLine())!= null)
  {
   exEmployees.add(line);  //Data of lookup files get stored in list object
  }

  }
 catch (IOException e)
 {
  e.printStackTrace();
 }

   }


    /************************setup method ends***********************************************/

   /********************Map Method******************************************/

     public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException
    {


   String[] line = value.toString().split("\t");


   for (String e : exEmployees)
     {

      String[] listLine = e.toString().split("\t");

        if(line[0].equals(listLine[0]))

      {
     context.write(new Text(line[0]), new Text(line[1]+"\t"+line[2]+"\t"+listLine[2]));
      }

    }


   }    //map method ends
 /***********************************************************************/


}

   /********************run Method******************************************/

    public int run(String[] args) throws Exception
    {

  Configuration conf = new Configuration();
  Job job = new Job(conf, "aggprog");
  job.setJarByClass(Join.class);
  DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());

  FileInputFormat.addInputPath(job, new Path(args [1]));
  FileOutputFormat.setOutputPath(job, new Path(args [2]));
  job.setMapperClass(JoinMapper.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  return (job.waitForCompletion(true) ? 0 : 1);
}

 public static void main (String[] args) throws Exception
 {
 int ecode = ToolRunner.run(new Join(), args);
   System.exit(ecode);
 }
 }

执行命令:

情况1:
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join My_Job/MultiInput_1/Input/input.txt My_Job/MultiInput_1/Input/smallerinput.txt My_Job/MultiInput_1/My_Output

情况2:
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join My_Job/MultiInput_1/Input/input.txt My_Job/MultiInput_1/Input/smallerinput.txt My_Job/MultiInput_1/My_Output

我已经尝试了以上两个命令,但是它不起作用。我不知道问题出在哪里,问题也出在哪里。我无法执行以上代码。

最后我尝试了下面的代码
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join hdfs/Input/smallerfile.txt hdfs/Input/input.txt My_Job/MultiInput_1/MyOutput

我发现了我的错误。我正在用小文件检查大文件。但是,当我尝试以相反的方式为我工作时,但是输出却没有达到预期。

预期输出为:
101 Vince 12000

102 James 33 10000

103 Tony 32 20000

104 John 25 25000

105 Nataliya 19 15000

106 Anna 20

107 Harold 29

但是我的输出是:
101 Vince 12000

102 James 33 10000

103 Tony 32 20000

104 John 25 25000

105 Nataliya 19 15000

106 Anna 20

107 Harold 29

有人可以帮我吗?

最佳答案

是的user3475485。您的文件应放在hdfs中以运行此代码,或者因为您正在使用Genericoptionsparse,所以可以使用此格式

hadoop jar jarname.jar驱动程序名称-filesfile1,file2应该适合您。

08-07 17:13