MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个 Hadoop 集群上。
优点:
1)MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。用户只需要关心业务逻辑,实现一些简单的接口就可以了
2)良好的扩展性
可以动态增加服务器,解决计算资源不够的问题
3)高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
4)适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
缺点:
1)不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。(MapReduce基于磁盘)
2)不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。 这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。 (Spark Streaming、Flink擅长)
3)不擅长 DAG(有向无环图)计算
即多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。
在这种情况下,MapReduce 并不是不能做,而是使用后,因为每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。 (Spark擅长)
一个完整的MapReduce 程序在分布式运行时有三类实例进程:
(1)MrAppMaster:负责整个程序的过程调度及状态协调。 (MRAppMaster是MapReduce的ApplicationMaster实现)
(2)MapTask:负责 Map阶段的整个数据处理流程。
(3)ReduceTask:负责 Reduce 阶段的整个数据处理流程。
(MapTask和ReduceTesk属于Yarnchild)
定义:
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
为什么要序列化?
不同节点要通过网络进行传输,网络传输就必须是字节流!所以对象必须要通过序列化转化为字节流再进行传输,
为什么不用 Java 的序列化?
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
所以, Hadoop 自己开发了一套序列化机制(Writable)。
默认使用的实现类是:TextInputFormat;
TextInputFormat 的功能逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行内容作为 value 返回。

问:块大小blockSize能不能调整?
答:不能!块是实实在在存储在物理储存上的
框架默认的 TextInputFormat 切片机制是每个文件单独切片,都会交给一个 MapTask,(一个切片对应一个MapTask),这样如果有大量小文件,就会产生大量的MapTask,一个MapTask需要约1g内存,一个cpu,开销较大,处理效率极其低下。
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中;
通过设置切片最大值:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4mb
让多个小文件合并为少量的切片;

0)提交Job:
1.客户端提交job给ResourceManager,ResourceManager返回job id和资源提交路径;
2.Client提交jar包、切片信息和配置文件到指定的资源提交路径;
3.Client提交完资源后,向ResourceManager申请运行MrAppMaster(整个任务运行的资源节点);
1)Read阶段:
4.MrAppMaster开启;
5.MrappMaster会读取客户端提供的信息,主要是job.split 切片信息,会根据切片个数开启对应个数的MapTask;
6.MapTask启动后,使用TestInputFormat的recodReader读取文件;
2)Map阶段:
7.recordReader读完之后将K、V返回给Mapper (mapper里面是业务逻辑);
8.Map逻辑完了之后,通过context.write进行collect收集,默认再使用HashPartitioner进行分区处理;
3)collect + shuffle:
9.数据从Mapper输出,先标记分区,然后到【内存中】的outputCollector环形缓冲区。环形缓冲区的作用是为了批量收集Map的结果(环形缓冲区默认大小100M)
10.当缓冲区的数据达到80%的时候,将数据溢写到磁盘—【溢写之前】快速排序(shuffle第一次排序);
11.按照分区溢写,【溢写后】产生多个溢写文件,使用归并排序保证分区内有序(Shuffle阶段第二次排序),等待reduceTask来拉取;
1)拉取阶段+merge
1.ReduceTask会从MapTask对应的分区通过HTTP方式拉取分区数据 ,拉过来之后先放在内存,内存不够再溢写到磁盘上;
(切片对应MapTask ,分区对应ReduceTask)
2.数据拉取过来后进行归并排序,形成ReduceTask内的全局排序的文件;达到相同的key进入一个reducer的效果;
2)reduce + output
3.reducer进行业务逻辑处理;
4.使用outputformat将reducer的数据写到HDFS;
作用:
1.Map数量一般多于Reduce,而使用Combine可以减少数据量,Combiner对每一个MapTask的输出进行局部汇总,以减小网络传输量以提高效率;
2.可以减少数据倾斜!
哪里可以使用 Combiner ?
Combiner的输出是Reducer的输入,即对MapTask的输出结果进行汇总,要在ReduceTask拉取数据之前使用;
使用 Combiner 的前提?
Combiner 绝不能改变最终的计算结果!
Combiner 只应该用于那种 Reduce 的输入 key/value与输出 key/value 类型完全一致,且不影响最终结果的场景
当数据量非常庞大时,如果都在Reduce端进行处理,那么资源不够用!则可以在Map阶段进行表的合并。
1)使用场景
Map Join 适用于一张表十分小、一张表很大的场景。 (将小表缓存到内存当中)
2)优点
思考:在 Reduce 端处理过多的表,非常容易产生数据倾斜。怎么办?
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用 DistributedCache
①在 Mapper 的 setup 阶段,将小表文件读取到缓存集合中。
②在 Driver 驱动类中加载缓存:
//缓存普通文件到 Task 运行节点。
job.addCacheFile(new URI(“file:///e:/cache/pd.txt”));
//如果是集群运行,需要设置 HDFS 路径
job.addCacheFile(new URI(“hdfs://hadoop102:8020/cache/pd.txt”));
写多读少—优先考虑压缩比,用Gzip
写少读多—优先考虑速度,用Snappy、Lzo
Snappy
不支持切片;
速度快
Gzip
不支持切片;
压缩比高,
速度慢
Lzo
支持切片;
速度快;
压缩率一般;
Bzip2
支持切片;
压缩率高;
速度慢;