1 Shuffle

  1. step1 (input)
  • InputFormat
- reads the raw data
- converts it into <key, value> pairs
  • FileInputFormat
- TextInputFormat
  2. step2 (map)
  • ModelMapper
- map(KEYIN, VALUEIN, KEYOUT, VALUEOUT)
- by default KEYIN is LongWritable (the byte offset of the line) and VALUEIN is Text (the line itself); see the mapper sketch after this outline
  3. step3 (shuffle)
    process
 the map task emits <key, value> pairs
* memory: the map output is first buffered in memory
* spill: when the buffer fills up, it is spilled to disk, possibly producing many files; each spill is (1) partitioned (partition) and (2) sorted (sort)
spilling leaves many small files on the map side
* merge: the spill files are merged
* sort: and kept sorted during the merge
the result is one large file per map task
* copy: each Reduce Task goes to the machines where the Map Tasks ran and copies the data it has to process
* merge and sort on the reduce side
* group: values that share the same key are put together

  4. reduce
  5. output
- OutputFormat
- FileOutputFormat, TextOutputFormat
- each key-value pair is written as one line, with a tab (\t) separating key and value; the separator is configurable (see the sketch below)
- by default the toString() methods of the key and the value are used
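
To make steps 1 and 2 concrete, here is a minimal word-count style mapper. It is a sketch, not code from this project: the class name WordCountMapper and the tokenizing logic are made up for illustration, but the type parameters match what the default TextInputFormat delivers.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key   = byte offset of the line (LongWritable, from TextInputFormat)
        // value = the line itself (Text)
        for (String token : value.toString().split("\\s+")) {
            if (!token.isEmpty()) {
                word.set(token);
                context.write(word, ONE); // map output: <word, 1>
            }
        }
    }
}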
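
The tab separator mentioned in the output step is configurable. A minimal sketch, assuming the Hadoop 2.x property name mapreduce.output.textoutputformat.separator; the class name SeparatorDemo is made up for illustration:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SeparatorDemo {

    public static Job newJob() throws IOException {
        Configuration conf = new Configuration();
        // TextOutputFormat writes key.toString() + separator + value.toString() per record;
        // the separator defaults to '\t' and can be overridden like this:
        conf.set("mapreduce.output.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf, "separator-demo");
        job.setOutputFormatClass(TextOutputFormat.class);
        return job;
    }
}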

1.1 Summary

  1. partitioning: Partitioner (a sketch follows this list)
  2. sorting
  3. copy: the user cannot intervene in this step
  4. grouping
  5. configurable:
* compression of the map output
* combiner
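
A minimal sketch of a custom partitioner, assuming the Text/IntWritable map output types used in the job template below; the class name AlphabetPartitioner and the two-way split are made up for illustration:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // a single reducer gets everything
        }
        String k = key.toString();
        char first = k.isEmpty() ? 'z' : Character.toLowerCase(k.charAt(0));
        // roughly: keys up to 'm' go to partition 0, the rest to partition 1
        return first <= 'm' ? 0 : 1;
    }
}

It would be wired in with job.setPartitionerClass(AlphabetPartitioner.class) and only takes effect when the job runs with more than one reduce task.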

2 How the five shuffle steps are configured on the Job

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


public class ModelMapReduce extends Configured implements Tool {


    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        // compress the intermediate map output before it is spilled and copied
        conf.set("mapreduce.map.output.compress", "true");
        // codec class name goes here; DefaultCodec is used as a placeholder
        // (SnappyCodec is another common choice when native Snappy libraries are available)
        conf.set("mapreduce.map.output.compress.codec",
                "org.apache.hadoop.io.compress.DefaultCodec");

        int status = ToolRunner.run(conf, new ModelMapReduce(), args);

        System.exit(status);
    }

    public static class ModelMapper extends Mapper<LongWritable, Text, Text, IntWritable> {


        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // TODO: per-record map logic, typically ending in context.write(...)
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // called once per map task, before the first map() call
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // called once per map task, after the last map() call
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            // same control flow as the default Mapper.run(): setup, map every record, cleanup
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    public static class ModelReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // TODO: per-key reduce logic over the grouped values, typically ending in context.write(...)
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // called once per reduce task, before the first reduce() call
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // called once per reduce task, after the last reduce() call
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            // same control flow as the default Reducer.run(): setup, reduce every key group, cleanup
            setup(context);
            try {
                while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                }
            } finally {
                cleanup(context);
            }
        }
    }

    public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = getConf();

        Job job = Job.getInstance(conf, this.getClass().getSimpleName());

        job.setJarByClass(this.getClass());

        Path inPath = new Path(args[0]);

        FileInputFormat.addInputPath(job, inPath);

        job.setMapperClass(ModelMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //********************* Shuffle*******************************************
        // 1. partitioner
        // job.setPartitionerClass();

        //2. sort
        //job.setSortComparatorClass();

        //3. combiner [optional]
        //job.setCombinerClass();

        //4.group
        //job.setGroupingComparatorClass();

        //************************ Shuffle ***********************************************
        job.setReducerClass(ModelReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }
}
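
The commented-out setters inside run() are where custom shuffle components would be plugged in. Two hedged sketches follow, matching the template's Text/IntWritable map output types; the class names SumCombiner and FirstFieldGroupingComparator, and the assumption that keys look like "field1,field2", are made up for illustration.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A combiner is just a Reducer that runs on the map side, pre-aggregating
// <key, count> pairs before they are spilled to disk and copied over the network.
public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable sum = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        sum.set(total);
        context.write(key, sum); // partial sum for this map task
    }
}

And a grouping comparator for the group step:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: two keys are treated as "the same" for reduce() when their
// first comma-separated field matches, so their values arrive in a single Iterable
// even though the full keys differ.
public class FirstFieldGroupingComparator extends WritableComparator {

    protected FirstFieldGroupingComparator() {
        super(Text.class, true); // true: instantiate keys so compare() receives real objects
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = a.toString().split(",")[0];
        String right = b.toString().split(",")[0];
        return left.compareTo(right);
    }
}

They would be wired in with job.setCombinerClass(SumCombiner.class) and job.setGroupingComparatorClass(FirstFieldGroupingComparator.class) in place of the commented lines above.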

3 MapReduce tuning

  • number of Reduce Tasks
  • Map Task output compression
  • shuffle parameters (see the configuration sketch below)
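
A minimal sketch of where these knobs are usually set, assuming Hadoop 2.x property names; the values and the class name TuningDemo are placeholders, not recommendations:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TuningDemo {

    public static Job newJob() throws IOException {
        Configuration conf = new Configuration();

        // map side: in-memory sort buffer size (MB) and spill threshold
        conf.set("mapreduce.task.io.sort.mb", "200");
        conf.set("mapreduce.map.sort.spill.percent", "0.80");

        // compress the map output before it is copied to the reducers
        conf.set("mapreduce.map.output.compress", "true");

        // reduce side: number of parallel copier threads
        conf.set("mapreduce.reduce.shuffle.parallelcopies", "10");

        Job job = Job.getInstance(conf, "tuning-demo");

        // number of reduce tasks, i.e. number of partitions and output files
        job.setNumReduceTasks(3);

        return job;
    }
}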