Contents

·Apache Hadoop

·The Combiner

·MapReduce aggregate computation

2018年10月5日
[参考链接:]https://www.cnblogs.com/riordon/p/4605022.html


1、什么是MapReduce

MapReduce是一种并行可扩展计算模型,并且有较好的容错性,主要解决海量离线数据的批处理。其拥有以下特性:

 (1)易于编程
 (2)良好的扩展性
 (3)高容错性

MapReduce由JobTracker和TaskTracker组成。其中:
JobTracker负责资源管理和作业控制
TaskTracker负责任务的运行

MapReduce的执行过程如下:

(1) 开发人员编写好MapReduce program,将程序打包运行。
(2) JobClient向JobTracker申请可用Job,JobTracker返回JobClient一个可用Job ID。
(3) JobClient得到Job ID后,将运行Job所需要的资源拷贝到共享文件系统HDFS中。
	<获取资源如下>
	● 程序jar包、作业配置文件xml
	● 输入划分信息,决定作业该启动多少个map任务
	● 本地文件,包含依赖的第三方jar包(-libjars)、依赖的归档文件(-archives)和普通文件(-files),如果已经上传,则不需上传
(4) 资源准备完备后,JobClient向JobTracker提交Job。
(5) JobTracker收到提交的Job后,初始化Job。
(6) 初始化完成后,JobTracker从HDFS中获取输入splits(作业可以该启动多少Mapper任务)。
(7) 与此同时,TaskTracker不断地向JobTracker汇报心跳信息,并且返回要执行的任务。
(8) TaskTracker得到JobTracker分配(尽量满足数据本地化)的任务后,向HDFS获取Job资源(若数据是本地的,不需拷贝数据)。
(9) 获取资源后,TaskTracker会开启JVM子进程运行任务。

2、MapReduce工作原理
2.1 Map task

【BDP】——week2-Lecture课堂笔记-LMLPHP
程序会根据InputFormat将输入文件分割成splits,每个split会作为一个Map task的输入,每个Map task会有一个内存缓冲区,输入数据经过Map阶段处理后的中间结果会写入内存缓冲区,并且决定数据写入到哪个划分器(Partitioner),当写入的数据到达内存缓冲区的的阈值(默认是0.8),会启动一个线程将内存中的数据溢写入磁盘,同时不影响Map中间结果继续写入缓冲区。在溢写过程中,MapReduce框架会对key进行排序(sort),如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件(最少有一个溢写文件),如果是多个溢写文件,则最后合并(merge)所有的溢写文件为一个文件。

2.2 Reduce task

当所有的Map task完成后,每个Map task会形成一个最终文件,并且该文件按区划分。Reduce任务启动之前,一个Map task完成后,就会启动线程来拉取Map结果数据到相应的Reduce task,不断地合并数据,为Reduce的数据输入做准备,当所有的Map tesk完成后,数据也拉取合并完毕后,Reduce task 启动,最终将输出输出结果存入HDFS上。


3、Shuffle

Shuffle是描述数据从Map输出到Reduce输入的这段过程。sheffle要能够:

完整地从map端拉取数据到reduce端
跨界点拉取数据时,尽量减少对带宽的不必要消耗
减小磁盘IO对task执行的影响
3.1 Shuffle在Map端

Split被送入Map端后,程序库决定数据结果数据属于哪个Partitioner,写入到内存缓冲区,到达阈值,开启溢写过程,进行key排序,如果有combiner步骤,则会对相同的key做归并处理,最终多个溢写文件合并为一个文件。
下图为shuffle在map端的工作流程
【BDP】——week2-Lecture课堂笔记-LMLPHP


3.1.1 关于Combiner

【概念】
1、Combiner是Hadoop允许用户针对map task的输出(即进入Reducer端之前)指定的一个合并函数。目的是为了避免map任务和reduce任务之间的数据传输,从而削减Mapper的输出从而减少网络带宽和Reducer之上的负载

2、Combiner最基本的功能是实现本地key的归并,其具有类似本地的reduce功能。如果不用Combiner,那么,所有的结果都是reduce完成,效率会相对低下。 而使用Combiner,先完成的map会在本地聚合,进而提升速度。

3、Combiner的输出是Reducer的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner只应该用于Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加(sum),最大值(max)等。

【误区与说明】
1)有很多人认为这个combiner和map输出的数据合并是一个过程,其实不然,map输出的数据合并只会产生在有数据split出的时候,即进行合并(merge)操作。

2)与mapper与reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用

3)并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。【combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。】

4)一般来说,combiner和reducer它们俩进行同样的操作。但一个combiner只是处理一个结点中的的输出,而不能享受像Reduce一样的输入(经过了shuffle阶段的数据),这点非常关键。


3.2 Shuffle在Reduce端

多个map task形成的最终文件的对应partitioner会被对应的reduce task拉取至内存缓冲区,对可能形成多个溢写文件合并,最终作为resuce task的数据输入 。
【BDP】——week2-Lecture课堂笔记-LMLPHP


3、MapReduce编程的主要组件

InputFormat类:分割成多个splits和每行怎么解析。
Mapper类:对输入的每对<key,value>生成中间结果。
Combiner类:在map端,对相同的key进行合并。
Partitioner类:在shuffle过程中,将按照key值将中间结果分为R份,每一份都由一个reduce去完成。
Reducer类:对所有的map中间结果,进行合并。
OutputFormat类:负责输出结果格式。


4、MapReduce中的任务处理
4.1 Map任务处理

Step1:读取HDFS中的文件。每一行解析成一个<k,v>。每一个键值对调用一次map函数。

例如:原始数据格式:
<k1, v1>  <k2, v2> =  <0,hello you>   <10,hello me>

Step2:对step1中解析的数据覆盖map()函数,接收step1产生的<k,v>,进行处理,转换为新的<k,v>输出。

map后的数据格式<k, v>
<hello,1>    <you,1>    <hello,1>    <me,1>

Step3:对step2输出的map后的<k,v>进行分区处理。默认分为一个区。此过程调用Partitioner

Step4:对不同分区中的数据进行排序sort分组(按照k的值)。其中,分组是指将相同key值的value放到一个集合中

对Step2中的数据排序后为:
<hello,1>    <hello,1>    <me,1>    <you,1>
进行分组后变为:
<hello,{1,1}>     <me,{1}>      <you,{1}>

Step5:(可选步骤)对分组后的数据进行Combiner归约处理。(当数据量较大时)


4.2 Reduce任务处理

Step1:多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。该过程就是shuffle的处理过程

Step2:对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑。经过处理后,产生新的<k,v>输出。

例如,	从map端接收到的数据为:
<hello,{1,1}>     <me,{1}>      <you,{1}>
经过reduce函数覆盖,例如count(yield),将会得到新的<k, v>:
<hello,2> 		  <me,1>		<you,1>

Step3:将reduce输出的<k,v>写到HDFS中。


下周实验内容

用HDFS读取数值型数据,计算max(output)
下周要求:key值记录location,value是pm2.5的值

10-05 18:58