mr 程序自定义分组的实现-LMLPHP

mr 程序自定义分组的实现-LMLPHP

AreaPartitioner

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

	private static HashMap<String, Integer> areaMap = new HashMap<>();

	static {
		areaMap.put("135", 0);
		areaMap.put("136", 1);
		areaMap.put("137", 2);
		areaMap.put("138", 3);
		areaMap.put("139", 4);
	}

	@Override
	public int getPartition(KEY key, VALUE value, int numPartitions) {
		// 从 key 中拿到手机号,查询手机归属字典,不同省份返回不同的组号

		int areaCoder = areaMap.get(key.toString().substring(0, 3)) == null ? 5
				: areaMap.get(key.toString().substring(0, 3));

		return areaCoder;
	}

}

FlowSumArea

package cn.itcast.hadoop.mr.areapartition;

import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import cn.itcast.hadoop.mr.flowsum.FlowBean;


/**
 * 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件
 * 需要自定义改造两个机制:
 * 1.改造分区的逻辑,自定义一个parttioner
 * 2.自定义 reduce task 的并发任务数量
 *
 * @author duanhaitao@itcast.cn
 *
 */
public class FlowSumArea {

	public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {

			//拿一行数据
			String line = value.toString();
			//切分各个字段
			String[] fields = StringUtils.split(line, "\t");

			//拿到我们需要的字段
			String phoneNB = fields[1];
			long u_flow = Long.parseLong(fields[7]);
			long d_flow = Long.parseLong(fields[8]);

			//封装成 kv 并输出
			context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));

		}


	}


	public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

		@Override
		protected void reduce(Text key, Iterable<FlowBean> values,Context context)
				throws IOException, InterruptedException {

			long up_flow_counter = 0;
			long d_flow_counter = 0;

			for(FlowBean bean: values){

				up_flow_counter += bean.getUp_flow();
				d_flow_counter += bean.getD_flow();


			}

			context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));



		}

	}

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(FlowSumArea.class);

		job.setMapperClass(FlowSumAreaMapper.class);
		job.setReducerClass(FlowSumAreaReducer.class);

		//设置我们自定义的逻辑定义
		job.setPartitionerClass(AreaPartitioner.class);


		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);

		//设置reduce的任务并发数,应该跟分组的数量保持一致
		job.setNumReduceTasks(6);


		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));


		System.exit(job.waitForCompletion(true)?0:1);


	}


}

打包 jar 包,上传:

mr 程序自定义分组的实现-LMLPHP

上面最后一个加个 2 。。

mr 程序自定义分组的实现-LMLPHP

mr 程序自定义分组的实现-LMLPHP

reduce 并发数量如果 < 分组数,会报错;但是改成 1 不会报错。。。

reduce 并发数量如果 < 分组数,多的分组没有数据

map 不会涉及到业务逻辑,,如果有 10 个map ,每个就处理 1/10 的数据,map 的并发量是可以任意去设置的。

10-04 12:29