MapReduce整体执行流程详解
MapReduce整体执行流程详解
[TOC]
MR整体执行流程(Yarn模式)
Map端
在MapReduce程序读取文件的输入目录上存放的待处理的文本文件0
客户端程序在submit() 方法执行前,获取待处理的信息,然后根据集群中的参数的配置形成一个任务分配规划
客户端提交切片信息给Yarn,Yarn中的resourcemanager启动MRAppmaster
MRAppmaster启动后根据本次的job描述信息,计算出需要的maptask的实例对象,然后向集群申请机器启动相应数量的maptask进程
第一次I/O… … 开始
Maptask利用客户端指定的inputformat来读取数据(默认为textinputformat方法),形成输出的KV键值对
Maptask将输入的KV键值对传递给客户定义的map() 方法,做逻辑运算;Map() 方法运算完毕后将KV键值对收集到OutputCollector(新版本叫做Context)
shuffle阶段… …start
OutputCollector将KV键值对放到环形缓冲区中(起到削峰的作用)
环形缓冲区的数据不断溢写到本地磁盘文件的过程中,对KV键值对进行分区(默认分区根据key的hash值%reduce数量进行分区),分区内进行快排
数据溢写到文件(分区有序,分区内键有序)
第一次I/O… … 结束
第二次I/O… … 开始
使用merge归并排序,将同分区的数据聚合,多个文件会被合并成大的溢出文件
第二次I/O… … 结束
调用combine方法,将同键的值合并(优化操作之一,详情下文会详解)
第三次I/O… … 开始
当maptask执行完成后,默认启动相应数量的reducetask,并告知reducetask处理的数据范围(数据分区)
reducetask根据分区号,去各个maptask机器上获取到相应的分区数据;第三次I/O… … 结束|第四次I/O… … 开始因为会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件进归并排序,再合并成一个大文件(内容按照K有序)
第四次I/O… … 结束
第五次I/O… … 开始
shuffle阶段… …end
大文件数据进入reducetask的逻辑运算过程,首先调用GroupingComparator对大文件里面的数据进行分组)
从文件中每次取出一组(K,V)键值对,调用自定义的reduce() 方法进行逻辑处理
reducetask运算完毕后,调用客户指定的outputformat(默认为TextOutputFormat)将结果数据输出到外部(part-r-000* 的结果文件)
第五次I/O… … 结束
Reduce端
基于MR的wordcount案例(图解)
Hadoop Shuffle原理
map方法之后,reduce方法之前的这段处理过程称之为Shuffle
Map端
map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区,记录起始偏移量
- 环形缓冲区的默认大小为100M,环形缓冲区的本质就是一个数组,通过
mapreducetask.io.sort.mb
设置缓冲区大小
- 环形缓冲区的默认大小为100M,环形缓冲区的本质就是一个数组,通过
当环形缓冲区的容量达到80%的时候,记录终止偏移量,进行溢写
将数据进行分区(默认分组根据key的hash值%reduce数量进行分区)
溢写前对分区内数据进行排序,排序按照对
key
的索引进行字典顺序排序,排序方式为快排- 分区、排序结束后,将数据溢写到磁盘(这个过程中,maptask输出的数据写入到剩余的20%环形缓冲区,同样需要记录起始偏移量)
溢写产生大量的溢写文件,需要对溢写文件进行归并排序
- 对溢写的文件也阔以进行Combiner操作,前提是汇总操作,求平均值不行
最后将文件按照分区储存到磁盘,等待Reduce端拉取
Reduce端
- 每个Reducer拉取Map端对应分区的数据
- 拉取数据后,先储存到内存中,内存不够了,在存储到磁盘
- 拉取所有的数据后,采用归并排序将内存和磁盘中的数据都进行排序
- 在进入Reduce方法之前,阔以对数据进行分组操作
- reducetask将拉取过来的数据 “分组”,每一组数据调用一次reduce() 方法
整体Shuffle流程总结:
map端:
map() –> 分区 –> 环形缓冲区 **–> ** 排序 –> 溢写 –> 归并排序 –> 写入磁盘
等待reduce端拉取:
reduce端拉取对应分区的数据 –> 存在内存中(内存不足,写入磁盘) –> 拉取完数据,归并排序 –> 对数据进行分组 –> 每组数据调用一次reduce() 方法
combine方法的作用及目的
原理:combine分为map端和reduce端,把一个map方法产生的<K,V>对(多个Key,value),合并成一个新的<K2,V2>,将新的 <K2,V2>,作为输入传入到reduce方法中。这里的V2也阔以称之为Vlues;
作用:把同一个key的键值对合并在一起
目的:减少网络传输
MapReduce的优化方法
MapReduce优化方法主要从六个方面考虑:输入阶段、Map阶段、Reduce阶段、IO阶段、数据倾斜问题、常用的调优参数
1、输入阶段
- 合并小文件
- 在执行MR任务之前合并小文件,大量的小文件会产生大量的map任务,增大map任务的装载次数,而任务的装载比较耗时,从而导致MR运行慢
- 采用combineTextInputFormat作为输入,来解决输入端大量小文件的场景
2、Map阶段
- 减少溢写(Spill)次数
- 调整
mapreduce.task.io.sort.mb
和mapreduce.map.sort.spill.percent
,增大出发Spill的内存上限,减少Spill次数,从而减少磁盘IO
- 调整
- 减少合并(merge)次数
- 调整
mapreduce.task.io.sort.factor
,增加Merge的文件数,以此来减少Merge的次数,从而缩短MR的处理时间
- 调整
- 优先 combine
- 在Map之后,不影响业务的逻辑的前提下,阔以先进行combine,以此减少IO
3、Reduce阶段
- 设置合理的map/reduce数量
- 二者皆不可设置的太多或者太少,太多:会导致Map与Reduce之间相互竞争资源,造成处理时间超时等错误;太短:会导致Task等待,延长处理时间;
- 设置map与reduce共存
- 调整
slow start completedmaps
参数,使得Map运行到一定程度后,Reduce也开始运行,从而减少Reduce的等待时间
- 调整
- 规避使用reduce
- 数据能在map端处理完成就直接处理完成,走reduce会产生大量的资源消耗
- 设置合理的reduce端的buffer
- 默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的-一部分数据可以直接输送到Reduce,从而减少I/O开销
mapreduce.reduce.input.buffer.perent
的默认为0.0。当值大于0时,会保留指定比例的内存以读取Bulfer中的数据直接拿给Reduce使用。
4、IO阶段
- 采用数据压缩的方式
- 压缩数据,数据量减小,减少任务的IO时间
- Snappy 和 LZO 格式需要安装压缩编码器
- 使用seq二进制文件
5、数据倾斜
- 数据倾斜现象
- 数据频率倾斜 — 某一区域的数据量要远远大于其他区域
- 数据大小倾斜 — 部分记录的大小远远大于平均值
- 减少数据倾斜的方法
- 抽样和范围分区
- 可以通过对原始数据进行抽样得到的结果集来预设分区边界值
- 自定义分区
- 基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。
- Combine
- 使用Combine可以大量地减小数据倾斜。在可能的情兄下,Combine的目的就是聚合并精简数据。
- 采用 Map Join,尽量避免 Reduce Join
- 抽样和范围分区
6、计算机性能
- CPU、内存、磁盘管理、网络
7、调优参数
资源相关参数
mapred-default.xml
配置参数 参数说明 备注 mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。 mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。 mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值: 1 mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值: 1 mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数。默认值是5 mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘。默认值0.66 mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例。默认值0.7 mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0 yarn-default.xml
配置参数 参数说明 备注 yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024 yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192 yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1 yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32 yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192 Shuffle性能优化的关键参数 — mapred-default.xml
配置参数 参数说明 备注 mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100M mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%
容错相关参数(MR性能优化)
配置参数 参数说明 备注 mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。 mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。 如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
基于JAVA实现的MR
关键类
- GenericOptionsParser :是为Hadoop 框架解析命令行参数的工具类
- InputFotmat接口 :它的实现类包括; Fileinputformat、Composableinputformat、textinputformat 等,主要用于文件的输入与切割
- **Mapper **:将输入的KV键值对转化为中间数据KV键值对集合,Maps将输入记录转变为中间记录
- Reducer:根据 key 将中间数据集合处理合并为更小的数据结果集
- Partitioner:将数据根据key进行分区
- OutputCollector:文件的输出
- Combiner:本地聚合,本地化的reduce
mapper的方法
- setup
- 用于管理mapper生命周期中的资源,家在一些初始化的工作,每个job执行一次,setup在完成后mapper构造,即将开始执行map方法前执行
- map
- 主要逻辑的编写
- cleanup
- 主要做一些收尾的工作,如关闭文件、释放资源或者执行map() 后的键值分发等,每个job执行一次,比较适合用来计算全局最大值之类的任务
- run
- 按顺序执行上述的过程:setup —> map —> cleanup
wordcount案例代码附上
Mapper.java
1 |
|
Reducer.java
1 |
|
MapjoinJob.java
1 |
|
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!