本文介绍了mapreduce复合关键样本 - 不显示所需的输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 新加入mapreduce& hadoop世界,在尝试了基本的mapreduce程序之后,我想尝试组合键示例代码。 输入数据集如下: 国家,州,县,人口百万美元 美国,CA,alameda,100美元 美国,CA,洛杉矶,200美元b $ b美国,加州,萨克拉门托,100美元 美国,FL,xxx,10 b $ b美国,FL,yyy,12 b $ b 如下所示: 美国,加州500美元 美国佛罗里达州22美元, p> 在这里,Country + State字段构成组合键。 我得到以下输出。由于某种原因,人口没有增加。有人可以指出我所犯的错误吗?另外请看看实现了WriteableComparable接口的Country.java类。可能是该实现有些问题。 美国,加州,100美元 美国,CA,200 美国,加州,100美元 美国,佛罗里达州10美元 美国,佛罗里达州,12美元b $ b 每个国家+州没有增加人口。 这是实现WritableComparable接口的Country类。 import java.io.DataInput中; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.Iterator; import org.apache.commons.io.FileUtils; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; * Country类实现了WritabelComparator来实现自定义排序以通过操作来执行组。它 *排序国家,然后状态。 * * / public class Country implements WritableComparable< Country> { 文本国家; 文本状态; 公共国家(文本国家,文本状态){ this.country = country; this.state = state; } public Country(){ this.country = new Text(); this.state = new Text(); } / * *(非Javadoc) * * @ org.apache.hadoop.io.Writable #write(java.io.DataOutput) * / public void write(DataOutput out)throws IOException { this.country.write(out); this.state.write(out); } / * *(非Javadoc) * * @ org.apache.hadoop.io.Writable #readFields(java.io.DataInput) * / public void readFields(DataInput in)throws IOException { this.country.readFields(in); this.state.readFields(in); ; $ b *(非Javadoc) * * @see java.lang.Comparable#compareTo(java如果(pop == null) return 0; * / public int compareTo(Country pop){ if(pop == null) int intcnt 
= country.compareTo(pop.country); if(intcnt!= 0){ return intcnt; } else { return state.compareTo(pop.state); $ b *(非Javadoc) * * @see java.lang。 Object#toString() * / @Override public String toString(){ return country.toString()+:+ state.toString( ); } } 驱动程序: import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.Iterator; 导入org.apache.commons.io.FileUtils; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 公共类CompositeKeyDriver { 公共静态无效的主要(字串[] args)抛出IOException异常,ClassNotFoundException的,InterruptedException的{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf,CompositeKeyDriver); //第一个参数是作业本身 //第二个参数是输入数据集的位置 FileInputFormat.addInputPath(job,new Path(args [0])); //第一个参数是作业本身 //第二个参数是输出路径的位置 FileOutputFormat.setOutputPath(job,new Path(args [1])) ; job.setJarByClass(CompositeKeyDriver.class); job.setMapperClass(CompositeKeyMapper.class); job.setReducerClass(CompositeKeyReducer.class); job.setOutputKeyClass(Country.class); job.setOutputValueClass(IntWritable.class); //将第二个参数设置为路径变量中的路径 Path outputPath = new Path(args [1]); //从hdfs中自动删除输出路径,以便我们没有显式删除 outputPath.getFileSystem(conf).delete(outputPath); System.exit(job.waitForCompletion(true)?0:1); } } Mapper程序: import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.Iterator; 导入org.apache.commons.io.FileUtils; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import 
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //前两个参数是输入键和输入值。输入键=每行的偏移量(记住每行是一条记录)。输入值=行本身 //其次两个参数是映射器的输出键和输出值。顺便说一句,映射器的输出存储在本地文件系统中,而不是HDFS上。 //输出键=发送国家对象。输出值=该国家的人口数以百万计+州组合 公共类CompositeKeyMapper扩展映射器< LongWritable,Text,Country,IntWritable> { / ** cntry。 * / 国家cntry =新国家(); / ** cnt文本。 * / Text cntText = new Text(); / **国家文字。 * / Text stateText = new Text(); //国家+州的人口 IntWritable populat = new IntWritable(); / ** * *如果在程序中没有定义Reducer,那么Reducer在Map-Reduce中是可选的,然后Mapper *的输出直接写入磁盘没有排序。 * * / public void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException { // Reader will give Mapper中的每条记录都在一行中。 //该行与解除限制器分离, String line = value.toString(); String [] keyvalue = line.split(,); //国家是每个记录行中的第一个项目 cntText.set(new Text(keyvalue [0])); // State是每条记录行中的第二项 stateText.set(keyvalue [1]); //这是人口。顺便说一句,我们不能将Java原始数据类型发送到Context对象中。 Java原始数据类型在序列化和反序列化中无效。 //所以我们必须使用由mapreduce框架 $ b populat.set(Integer.parseInt(keyvalue [3]))提供的等效Writable数据类型; //在这里你要创建一个Country类的对象,并在构造器中分配国家名和州 Country cntry = new Country(cntText,stateText); //在这里,您将国家对象及其人口传递给上下文对象。 //请记住,country对象已经实现了与Java中的Comparable接口等价的WritableComparable接口。该实现位于Country.java类中//由于它实现了WritableComparable接口,Country对象可以在shuffle阶段进行排序。如果WritableComparable接口未实现,那么 //不能对对象进行排序。 context.write(cntry,populat); $ b Reducer程序: $ / code> import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.Iterator; 导入org.apache.commons.io.FileUtils; 导入org.apache.hadoop.conf.Configuration; 导入org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import 
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //记住Mapper类的两个输出参数将成为reducer类的前两个输入参数。 公共类CompositeKeyReducer扩展了Reducer< Country,IntWritable,Country,IntWritable> { //减少方法的第一个参数是Country。国家对象有国家名称和国家名称(详情请查看Country.java类) //第二个参数values是Country + State(这是一个组合键)的人口集合 公共无效减少(国家键,迭代< IntWritable>值,语境上下文)抛出IOException异常,InterruptedException的{ INT numberofelements = 0; INT cnt = 0; while(values.hasNext()){ cnt = cnt + values.next()。get(); ) context.write(key,new IntWritable(cnt)); } } HashPartitioner 所以 您的 Country 类需要实现 hashCode()方法。 目前它将在 Object 上使用默认的 hashCode()实现,这将导致您的 以下是一个示例 hashCode()方法: @Override public int hashCode(){ final int prime = 31; int result = 1; result = prime * result +((country == null)?0:country.hashCode()); result = prime * result +((state == null)?0:state.hashCode()); 返回结果; } 其他信息: 为了安全起见,您应该设置 set 文本对象。目前,您在 Country 构造函数中执行此操作。 public国家/地区(Text country,Text state){ this.country = country; this.state = state; } 您应该将其更改为: public Country(Text country,Text state){ this.country.set(country); this.state.set(state); } Being new to mapreduce & hadoop world, after trying out basic mapreduce programs, I wanted to try compositekey sample code.Input dataset is as follows:Country,State,County,populationinmillionsUSA,CA,alameda,100USA,CA,losangels,200USA,CA,Sacramento,100USA,FL,xxx, 10USA,FL,yyy,12Desired output data should be like this:USA,CA,500USA,FL,22 Here instead Country+State fields form the composite key.I am getting the following output. The population is not getting added for some reason. Can someone point me the mistake I am doing. Also kindly take a look at the Country.java class which implements the WriteableComparable interface. 
Maybe something is wrong with that implementation. USA,CA,100 USA,CA,200 USA,CA,100 USA,FL,10 USA,FL,12 The population is not getting added per Country+State. This is the Country class that implements the WritableComparable interface: import java.io.DataInput; import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.Iterator; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * The Country class implements WritableComparable to implement custom sorting to perform a group-by operation. It * sorts by country and then by state. 
* */public class Country implements WritableComparable<Country> { Text country; Text state; public Country(Text country, Text state) { this.country = country; this.state = state; } public Country() { this.country = new Text(); this.state = new Text(); } /* * (non-Javadoc) * * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput) */ public void write(DataOutput out) throws IOException { this.country.write(out); this.state.write(out); } /* * (non-Javadoc) * * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput) */ public void readFields(DataInput in) throws IOException { this.country.readFields(in); this.state.readFields(in); ; } /* * (non-Javadoc) * * @see java.lang.Comparable#compareTo(java.lang.Object) */ public int compareTo(Country pop) { if (pop == null) return 0; int intcnt = country.compareTo(pop.country); if (intcnt != 0) { return intcnt; } else { return state.compareTo(pop.state); } } /* * (non-Javadoc) * * @see java.lang.Object#toString() */ @Override public String toString() { return country.toString() + ":" + state.toString(); }}Driver Program:import java.io.DataInput;import java.io.DataOutput;import java.io.File;import java.io.IOException;import java.util.Iterator;import org.apache.commons.io.FileUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class CompositeKeyDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "CompositeKeyDriver"); //first argument is 
job itself //second argument is location of the input dataset FileInputFormat.addInputPath(job, new Path(args[0])); //first argument is the job itself //second argument is the location of the output path FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setJarByClass(CompositeKeyDriver.class); job.setMapperClass(CompositeKeyMapper.class); job.setReducerClass(CompositeKeyReducer.class); job.setOutputKeyClass(Country.class); job.setOutputValueClass(IntWritable.class); //setting the second argument as a path in a path variable Path outputPath = new Path(args[1]); //deleting the output path automatically from hdfs so that we don't have delete it explicitly outputPath.getFileSystem(conf).delete(outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1);}}Mapper program:import java.io.DataInput;import java.io.DataOutput;import java.io.File;import java.io.IOException;import java.util.Iterator;import org.apache.commons.io.FileUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; // First two parameters are Input Key and Input Value. Input Key = offset of each line (remember each line is a record). Input value = Line itself // Second two parameters are Output Key and Output value of the Mapper. BTW, the outputs of the mapper are stored in the local file system and not on HDFS. // Output Key = Country object is sent. Output Value = population in millions in that country + state combination public class CompositeKeyMapper extends Mapper<LongWritable, Text, Country, IntWritable> { /** The cntry. 
*/ Country cntry = new Country(); /** The cnt text. */ Text cntText = new Text(); /** The state text. */ Text stateText = new Text(); //population in a Country + State IntWritable populat = new IntWritable(); /** * * Reducer are optional in Map-Reduce if there is no Reducer defined in program then the output of the Mapper * directly write to disk without sorting. * */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //Reader will give each record in a line to the Mapper. //That line is split with the de-limiter "," String line = value.toString(); String[] keyvalue = line.split(","); //Country is the first item in the line in each record cntText.set(new Text(keyvalue[0])); //State is the second item in the line in each record stateText.set(keyvalue[1]); //This is the population. BTW, we can't send Java primitive datatypes into Context object. Java primitive data types are not effective in Serialization and De-serialization. //So we have to use the equivalent Writable datatypes provided by mapreduce framework populat.set(Integer.parseInt(keyvalue[3])); //Here you are creating an object of Country class and in the constructor assigning the country name and state Country cntry = new Country(cntText, stateText); //Here you are passing the country object and their population to the context object. //Remember that country object already implements "WritableComparable" interface which is equivalient to "Comparable" interface in Java. That implementation is in Country.java class //Because it implements the WritableComparable interface, the Country objects can be sorted in the shuffle phase. If WritableComparable interface is not implemented, we //can't sort the objects. 
context.write(cntry, populat); }}Reducer program:import java.io.DataInput;import java.io.DataOutput;import java.io.File;import java.io.IOException;import java.util.Iterator;import org.apache.commons.io.FileUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.io.WritableComparable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //Remember the two output parameters of the Mapper class will become the first two input parameters to the reducer class. public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> { // The first parameter to reduce method is "Country". The country object has country name and state name (look at the Country.java class for more details. // The second parameter "values" is the collection of population for Country+State (this is a composite Key) public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException { int numberofelements = 0; int cnt = 0; while (values.hasNext()) { cnt = cnt + values.next().get(); } context.write(key, new IntWritable(cnt)); }} 解决方案 You're using the HashPartitioner so your Country class needs to implement the hashCode() method.At the moment it will be using the default hashCode() implementation on Object which will result in your keys not grouping correctly.Here's an example hashCode() method:@Overridepublic int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((country == null) ? 0 : country.hashCode()); result = prime * result + ((state == null) ? 
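0 : state.hashCode()); return result; } Additional information: To be on the safe side, you should copy the incoming values into the key's own Text objects with set() instead of storing the caller's references. At the moment your Country constructor stores the references: public Country(Text country, Text state) { this.country = country; this.state = state; } You should change this to: public Country(Text country, Text state) { this.country.set(country); this.state.set(state); } For this to work the fields must already hold Text instances (for example, declare them as Text country = new Text(); Text state = new Text();), because the two-argument constructor does not create them and set() would otherwise be called on null. Also note that the reducer declares reduce(Country, Iterator<IntWritable>, Context), whereas Reducer.reduce takes an Iterable<IntWritable>; the method shown therefore overloads rather than overrides reduce, and the default reduce, which writes every value through unchanged, runs instead. Change the parameter to Iterable<IntWritable> and add @Override. 这篇关于mapreduce复合关键样本 - 不显示所需的输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!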
10-30 01:51