嗨,我是Hadoop Mapreduce编程的新手。实际上,我有如下要求:
较大的文件,即输入文件input.txt
101 Vince 12000
102 James 33
103 Tony 32
104 John 25
105 Nataliya 19
106 Anna 20
107 Harold 29
这是较小的文件lookupfile.txt
101 Vince 12000
102 James 10000
103 Tony 20000
104 John 25000
105 Nataliya 15000
现在,我们想要得到的结果具有相同的ID号。因此,为了实现此目的,请使用较小的文件作为查找文件,使用较大的文件作为输入文件。下面给出了完整的Java代码和每个组件的说明:
这是我们在运行上述代码后所得到的结果。
102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000
码:
public class Join extends Configured implements Tool
{
public static class JoinMapper extends Mapper
{
Path[] cachefiles = new Path[0]; //To store the path of lookup files
List exEmployees = new ArrayList();//To store the data of lookup files
/********************Setup Method******************************************/
@Override
public void setup(Context context)
{
Configuration conf = context.getConfiguration();
try
{
cachefiles = DistributedCache.getLocalCacheFiles(conf);
BufferedReader reader = new BufferedReader(new FileReader(cachefiles[0].toString()));
String line;
while ((line = reader.readLine())!= null)
{
exEmployees.add(line); //Data of lookup files get stored in list object
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
/************************setup method ends***********************************************/
/********************Map Method******************************************/
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException
{
String[] line = value.toString().split("\t");
for (String e : exEmployees)
{
String[] listLine = e.toString().split("\t");
if(line[0].equals(listLine[0]))
{
context.write(new Text(line[0]), new Text(line[1]+"\t"+line[2]+"\t"+listLine[2]));
}
}
} //map method ends
/***********************************************************************/
}
/********************run Method******************************************/
public int run(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "aggprog");
job.setJarByClass(Join.class);
DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(args [1]));
FileOutputFormat.setOutputPath(job, new Path(args [2]));
job.setMapperClass(JoinMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
return (job.waitForCompletion(true) ? 0 : 1);
}
public static void main (String[] args) throws Exception
{
int ecode = ToolRunner.run(new Join(), args);
System.exit(ecode);
}
}
执行命令:
情况1:
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join My_Job/MultiInput_1/Input/input.txt My_Job/MultiInput_1/Input/smallerinput.txt My_Job/MultiInput_1/My_Output
情况2:
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join My_Job/MultiInput_1/Input/input.txt My_Job/MultiInput_1/Input/smallerinput.txt My_Job/MultiInput_1/My_Output
我已经尝试了以上两个命令,但是它不起作用。我不知道问题出在哪里,问题也出在哪里。我无法执行以上代码。
最后我尝试了下面的代码
hadoop jar '/home/cloudera/Desktop/DistributedCache.jar' Join hdfs/Input/smallerfile.txt hdfs/Input/input.txt My_Job/MultiInput_1/MyOutput
我发现了我的错误。我正在用小文件检查大文件。但是,当我尝试以相反的方式为我工作时,但是输出却没有达到预期。
预期输出为:
101 Vince 12000
102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000
106 Anna 20
107 Harold 29
但是我的输出是:
101 Vince 12000
102 James 33 10000
103 Tony 32 20000
104 John 25 25000
105 Nataliya 19 15000
106 Anna 20
107 Harold 29
有人可以帮我吗?
最佳答案
是的user3475485。您的文件应放在hdfs中以运行此代码,或者因为您正在使用Genericoptionsparse,所以可以使用此格式
hadoop jar jarname.jar驱动程序名称-filesfile1,file2应该适合您。