前言

MapReduce是一种分布式计算框架,最初由Google设计和实现,用于处理大规模数据集的并行计算。它的核心思想是将大规模数据集分解成多个小的子任务,并在分布式计算环境中并行地进行处理和计算。MapReduce框架的设计目标是简化并行计算的编程模型,使开发人员能够更轻松地编写并行计算任务,而不用关心底层的分布式细节。

一、InputFormat数据输入

1.1 切片与MapTask并行度决定机制

1.1.1 问题引出

MapTask的并行度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。
思考:1G的数据,启动8个MapTask,可以提高集群的并发处理能力。那么1K的数据,也启动8个MapTask,会提高集群性能吗?MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
对于1K的数据,启动8个MapTask并不会明显提高集群性能,反而可能增加了任务调度和通信开销。因为数据量较小,每个MapTask的处理时间会很短,而任务调度和通信的开销可能会占据更大的比例。因此,在这种情况下,启动较少的MapTask可能更为合适

1.1.2 MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS存储数据单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask。

1.1.3 数据切片与MapTask并行度决定机制

  • 一个Job的Map阶段并行度是由客户端在提交Job时的切片数决定的
  • 每一个split切片分配一个并行实例处理
  • 默认情况下,切片大小 = 块大小(BlockSize)
  • 切片时不考虑数据集整体,而是针对每一个文件单独切片

1.2 FileInputFormat切片机制

1.2.1 切片大小参数配置

  • 计算切片大小的公式:Math.max(minSize,Math.min(maxSize,blockSize));
  • 切片大小中设置
    • mapreduce.input.fileinputformat.split.minsize=1
    • mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue
  • 获取切片信息API
    • 获取切片的文件名称:String name = inputSplit.getPath().getName();
    • 根据文件类型获取切片信息:FileSplit inputSplit = (FileSplit)context.getInputSplit();

1.2.2 切片机制

(1)简单的按照文件的内容长度进行切片
(2)切片大小,默认等于Block大小
(3)切片时不考虑数据集整体,而是针对每一个文件单独切片

案例

(1)两个数据文件

(2)经过FileInputFormat的切片机制运算之后,形成的切片信息如下:

1.3 TextInputFormat

1.3.1 FileInputFormat实现类

思考:在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

对于不同的数据类型,MapReduce提供了相应的InputFormat来读取数据。

  1. 基于行的日志文件:

    MapReduce使用TextInputFormat来读取基于行的日志文件,它将文件逐行分割成键值对,其中键是行的偏移量,值是文本行的内容。

  2. 二进制格式文件:

    如果需要处理二进制格式的文件,可以使用SequenceFileInputFormat。这个InputFormat能够读取Hadoop序列文件(Sequence File),它提供了将多个键值对按顺序组织到一个文件中的功能。

  3. 数据库表:

    如果要读取数据库表中的数据,可以使用DBInputFormat。这个InputFormat需要配置数据库连接信息,并提供输入查询语句,通过将数据库表中的每一行作为键值对,进行MapReduce处理。

  4. 其他格式文件:

    对于其他类型的文件,你可以实现自定义的InputFormat来读取特定格式的数据。你需要继承InputFormat类,并实现一些必要的方法,如getSplitscreateRecordReader等,以便MapReduce框架能够正确读取你的数据。

MapReduce提供了多种InputFormat来支持不同类型的数据输入。

FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。

1.3.2 TextInputFormat

TextInputFormat是MapReduce中用于读取基于行的文本文件的输入格式,是默认的FileInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量(从文件开头到该行的字节数), LongWritable类型。值是该行的文本,不包括任何行终止符(换行符和回车符),Text类型。

1.4 CombineTextInputFormat切片机制

  • 应用场景:CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
  • 虚拟存储切片最大值设置:CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
    注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
  • 切片机制:生成切片过程包括:虚拟存储过程和切片过程二部分。
  1. 虚拟存储过程:
    将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
    例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
  2. 切片过程:
    (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
    (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。

CombineTextInputFormat通过设置最大输入切片大小,将多个小文件合并到一个切片中,从而减少了切片的数量,提高了作业的执行效率。这样可以减少Map任务的启动次数,减少任务间的通信开销,提高整体处理速度。

CombineTextInputFormat的切片机制如下:

  1. 默认情况下,CombineTextInputFormat将输入切片的大小设置为64MB。
  2. 框架会根据设定的输入切片大小,将多个小文件按照文件大小和块的边界进行组合,生成更大的输入切片。
  3. 这些组合后的输入切片被分配给Map任务进行处理。

使用CombineTextInputFormat的方法如下:

  1. 在作业的驱动类中设置输入格式为CombineTextInputFormat
  2. 调用CombineTextInputFormat.setInputPaths()方法设置输入路径。
  3. 可以通过调用CombineTextInputFormat.setMaxInputSplitSize()方法自定义最大切片大小,单位为字节。

示例代码:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class CombineTextFileProcessing {
    
    public static class TextMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 处理每一行文本的逻辑
            // ...
        }
    }
    
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(CombineTextFileProcessing.class);
        
        // 设置输入格式为CombineTextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        
        // 设置输入路径
        CombineTextInputFormat.setInputPaths(job, new Path("input_path"));
        
        // 设置最大切片大小,默认为64MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024); // 128MB
        
        // 设置Mapper类和输出键值对类型
        job.setMapperClass(TextMapper.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        
        // 设置输出路径
        // job.setOuputFormatClass(XXXOutputFormat.class);
        // XXXOutputFormat.setOutputPath(job, new Path("output_path"));
        
        // 提交作业
        job.waitForCompletion(true);
    }
}

通过使用CombineTextInputFormat,就可以更有效地处理多个小文件,并提高MapReduce作业的执行效率。

二、MapReduce工作流程

MapReduce是一种用于处理大规模数据集的编程模型和算法。它主要用于分布式计算环境中,例如Hadoop集群。下面是MapReduce的工作流程:

MapReduce框架原理-LMLPHP

  1. 划分输入数据:将输入数据拆分为多个数据块,每个数据块称为一个Input Split。

  2. Map阶段:Map阶段是将输入数据转换为键值对(Key-Value pairs)的过程,也就是将输入数据映射为中间数据。

  3. Shuffle阶段:Shuffle阶段将Map阶段输出的中间数据按照键进行分组,然后将相同键的所有值收集到一起。

  4. Reduce阶段:Reduce阶段是将Shuffle阶段输出的中间数据进行聚合和计算的过程,最终输出最终结果。

三、OutputFormat数据输出

在MapReduce中,OutputFormat用于定义数据输出的格式。它负责将Reduce阶段的输出数据写入指定的输出目录或者其他存储介质。

在Hadoop中,你可以使用各种类型的OutputFormat来定义输出格式,包括:

  • TextOutputFormat:以文本格式输出键值对,每个键值对占据一行。
  • SequenceFileOutputFormat:以二进制序列文件格式输出键值对,可以用于高效地序列化和反序列化数据。
  • MultipleOutputs:用于将不同类型的输出数据写入不同的输出文件或目录。
  • DBOutputFormat:用于将输出数据写入关系型数据库。

除了以上几种常用的OutputFormat,还可以根据自己的需求实现自定义的OutputFormat。自定义OutputFormat需要实现OutputFormat接口,并重写相应的方法来实现数据的输出格式。

四、MapTask工作机制

MapReduce框架原理-LMLPHP

MapTask工作详细步骤:

  1. 读取数据组件InputFormat(默认 Textlnputformat) 会通过getsplit方法对输入目录中文件进行逻辑切片规划得到block,有多少block就对应启动多少个Maptask
  2. 将输入文件切分为block后,由RecordReader对象(默认为 LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回 <key,value>,key表示我们行首字符偏移量,value表示这一行文本内容
  3. 读取block返回<key,value>,进入用户自己集成Mapper类中,执行用户重写的map函数,RecordReader读取一行这里调用一次
  4. Mapper逻辑结束之后,将Mapper的每条结果通过context.write进行collection数据收集,再collect中,会先对其进行分区处理,默认使用HashPartitioner。
    MapReduce提供Partitioner接口,它的作用就是根据kev或value及Reducer的数量来决定对当前的这对输出教据最终应该交给哪个Reduce task外理,默认对key hash后再以reduce的数量取模,默认的取模方式只是为了平均Reduce的处理能力,如果用户自己对Partitioner有需求,可以定制并设置到ob上
  5. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲中区,缓冲中区的作用是批量收集Mapper结果,减少磁盘IO的影响,我们的key/value对 以及Partition的结果都会被写入缓冲区,当然key与value值都会被序列化成字节数组环形缓冲区其实是一个数组,数组中存放着key和value的序列化数据,与key,value的元数据信息,包括Partition,key的起始位置。value的起始位置,以及value的长度,环形结构是一个抽象概念。 缓冲区是有大小限制,默认为100M,当Mapper的输出结果很多时,很可能会撑爆内存,所以需要再一定条件下将缓冲区的数据临时下入磁盘,然后重新利用这块缓冲区,这个从内存往磁盘写数据的过程称为spill,中文翻译为溢写,这个溢写是有单独线程完成,不影响往缓冲中区写Mapper结果的线程。溢写线程启动时不应该明止Mapper结果输出,所以整个缓冲区中有个溢写比例spill.percent,这个比例默认为0.8,也就是当缓冲区数据已经达到國值buffer.sizespill.percent=100M0.8=80M时,溢写线程启动,锁定80M内存,执行溢写过程。Mapper的数据结果还以往剩下的20M中写,互不影响
  6. 当溢写线程启动后,需要对着80M空间内的Key做排序(Sort)排序是MapReduce模型默认行为,这里的排序是对序列化的字节做的排序如果job设置Combiner,那么现在就是使用Combiner的时候了,将有相同Key的Key/value对的value加起来,减少溢写到磁盘的数据量,Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用
    在哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输出,Combiner绝不能改变最终的计算结果,Combiner只应该用于那种Reduce的输入K/V与输出的K/V类型完全一致,且不影响最终结果的场景,例如累加、最大值等、Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响Reducer的最终结果
  7. 合并溢写文件,每次溢写会在磁盘上生成一个临时文件(写之前判断是否有Combiner),如果Mapper的暑促结果真的很大,有多次这样的溢写发生,磁盘上响应的就会有多个临时文件存在,当整个数据处理结束之后开始对磁盘中的临时文件进行Merae合并,因为最终的文件只有一个,写入磁盘,并目为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量

五、ReduceTask工作机制

MapReduce框架原理-LMLPHP

ReduceTask工作详细步骤:

  1. Copy阶段:ReduceTasK从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定國值,则写到磁盘上,否则直接放到内存中。
  2. Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
  3. Sort阶段:按照MapReduce语义,用户编写reduce0函数输入数据是按Key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
  4. Reduce阶段:reduce0函数将计算结果写到HDFS上。键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对

六、MapReduce的shuffle过程

因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,在写入的过程中进行分区(partition),也就是对于每个键值对来说,都增加了一个partition属性值,然后连同键值对一起序列化成字节数组写入到缓冲区(缓冲区采用的就是字节数组,默认大小为100M)。当写入的数据量达到预先设置的阙值后(mapreduce.map.io.sort.spill.percent,默认0.80,或者80%)便会启动溢写出线程将缓冲区中的那部分数据溢出写(spill)到磁盘的临时文件中,并在写入前根据key进行排序(sort)和合并(combine,可选操作)。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性指定的目录中。当整个map任务完成溢出写后,会对磁盘中这个map任务产生的所有临时文件(spill文件)进行归并(merge)操作生成最终的正式输出文件,此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。至此,map端shuffle过程结束,接下来等待reduce task来拉取数据。对于reduce端的shufle过程来说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge最后合并成一个分区相同的大文件,然后对这个文件中的键值对核照key进行sort排序,排好序之后紧接着进行分组,分组完成后才将整个文件交给reduce task处理。
MapReduce框架原理-LMLPHP
MapReduce框架原理-LMLPHP

MapReduce的shuffle过程:

  1. 分区Partiion:在将map()函数处理后得到的(key,value)对写入到缓冲中区之前,需要先进行分区操作,这样就能把map任务处理的结果发送给指定的reducer去执行,从而达到负载均衡,避免数据倾斜。
  2. 写入环形内存缓冲区:因为频繁的磁盘I/O操作会严重的降低效率,因此“中间结果”不会立马写入磁盘,而是优先存储到map节点的“环形内存缓冲区”,并做一些预排序以提高效率,当写入的数据量达到预先设置的阈值后便会执行一次IO操作将数据写入到磁盘。每个map任务都会分配一个环形内存缓冲区,用于存储map任务输出的键值对(默认大小100MB,mapreduce.task.io.sort.mb调整)以及对应的partition,被缓冲的(key,value)对已经被序列化(为了写入磁盘)。
  3. 执行溢写出:一旦缓冲区内容达到阈值(mapreduce.map.io.sor.spill.percent,默认0.80,或者80%),就会会锁定这80%的内存,并在每个分区中对其中的键值对按键进行sort排序,具体是将数据按照partition和kev两个关键字进行排序,排序结果为缓冲中区内的数据按照partition为单位聚集在一起,同一个partition内的数据按照key有序。排序完成后会创建一个溢出写文件(临时文件),然后开启一个后台线程把这部分数据以一个临时文件的方式溢出写(spill)到本地磁盘中(如果客户端自定义了Combiner(相当于map阶段的reduce),则会在分区排序后到溢写出前自动调用combiner,将相同的key的value相加,这样的好处就是减少溢写到磁盘的数据量。这个过程叫“合并”)。剩余的20%的内存在此期间可以继续写入map输出的键值对。溢出写过程技轮询方式将缓冲区中的内容写到mapreduce,cluster.local.dir属性指定的目录中。
  4. 归并merge:当一个map task处理的数据很大,以至于超过缓缓冲区内存时,就会生成多个spill文件。此时就需要对同一个map任务产生的多个spill文件进行归并生成最终的一个已分区且已排序的大文件。配置属性mapreduce.task.io.sort.factor控制着一次最多能合并多少流,默认值是10。这个过程包括排序和合并(可选),归并得到的文件内键值对有可能拥有相同的key,这个过程如果client设置过Combiner,也会合并相同的key值的键值对(根据上面提到的combine的调用时机可知)。
  5. 复制copy:Reduce进程启动一些数据copy线程,通过HTTP方式请求MapTask所在的NodeManager以获取输出文件。NodeManager需要为分区文件运行reduce任务。并月reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。而每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制其输出。
  6. 归并merge:Copy过来的数据会先放入内存缓冲区中,如果内存缓冲区中能放得下这次数据的话就直接把数据写到内存中,即内存到内存merge。Reduce要向每个Map去拖取数据,在内存中每个Map对应一块数据,当内存缓存区中存储的Map数据占用空间达到一定程度的时候,开始启动内存中merge,把大存中的数据merge输出到磁盘上一个文件中,即内存到磁盘merge。与map端的溢写类似,在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。Reduce的内存缓冲区可通过mapred.job.shufle.input.buffer.percent配置,默认是JVM的heap size的70%.内存到磁盘merge的启动门限可以通过mapred.job.shuffle.merge.percent配置,默认是66%。当属于该reducer的map输出全部拷贝完成,则会在reducer上生成多个文件(如果拖取的所有map数据总量都没有内存缓)中区,则数据就只存在于内存中),这时开始执行合并操作,即磁盘到磁盘merge,Map的输出数据已经是有序的,Merge进行一次合并排序,所调Reduce端的sort过程就是这个合并的过程,采取的排序方法跟map阶段不同,因为每个map端传过来的数据是排好序的,因此众多排好序的map输出文件在reduce端进行合并时采用的是归并排序,针对键进行归并排序。一般Reduce是一边copy一边sort,即copy和sort两个阶段是重而不是完全分开的。最终Reduce shufle过程会输出一个整体有序的数据块。
  7. reduce:当一个reduce任务完成全部的复制和排序后,就会针对已根据键排好序的Key构造对应的Value选代器。这时就要用到分组,默认的根据键分组自定义的可是使用 job.setGroupingComparatorClass()方法设置分组函数类。对于默认分组来说,只要这个比较器比较的两个Key相同,它们就属于同一组,它们的 Value就会放在一个Value选代器,而这个迭代器的Key使用属于同一个组的所有Key的第一个Key。在reduce阶段,reduce0方法的输入是所有的Key和它的Value选代器。此阶段的输出直接写到输出文件系统,一般为HDFS。如果采用HDFS,由于NodeManager也运行数据节点,所以第一个块副本将被写到本地磁盘。

总结

MapReduce框架的原理可以概括为以下几个关键步骤。

  • 分而治之:将大规模数据集划分为多个小的数据块,并将其分配给不同的Map任务进行并行处理。

  • 映射(Map):Map任务读取输入数据块,将其转换为一系列的键值对,并将其输出为中间结果。映射操作可以根据实际需求进行数据转换、过滤或其他计算。

  • 分区(Partition):Map任务根据键值对的键对数据进行分区,将相同键的键值对分配给相同的Reduce任务。分区的目的是将数据划分为更小的数据集,以便后续的Shuffle和Reduce操作。

  • Shuffle和排序:Shuffle过程将Map任务的输出中间结果按照键进行分组、分区和排序,以便后续的Reduce任务能够高效地处理相同键的所有值。

  • 聚合(Reduce):Reduce任务接收来自不同Map任务的中间结果,并对相同键的所有值进行聚合和计算。Reduce任务可以执行各种复杂的计算操作,生成最终的计算结果。

  • 输出:最后,Reduce任务将计算结果输出到指定的输出目录或存储介质中。

总的来说,MapReduce框架通过将大规模数据集划分为多个小任务,并在分布式计算环境中并行地进行处理和计算,实现了高效的并行计算。它的简单易用的编程模型使得开发人员能够更容易地编写并行计算任务,而不需要担心底层的分布式细节。通过充分利用分布式计算的优势,MapReduce框架能够处理大规模数据集,并提供高性能和可伸缩性。

03-20 14:23