MapReduce整体执行流程详解

MapReduce整体执行流程详解

[TOC]

MR整体执行流程(Yarn模式)

Map端

Map端

  1. 在MapReduce程序读取文件的输入目录上存放的待处理的文本文件0

  2. 客户端程序在submit() 方法执行前,获取待处理的信息,然后根据集群中的参数的配置形成一个任务分配规划

  3. 客户端提交切片信息给Yarn,Yarn中的resourcemanager启动MRAppmaster

  4. MRAppmaster启动后根据本次的job描述信息,计算出需要的maptask的实例对象,然后向集群申请机器启动相应数量的maptask进程

    第一次I/O… … 开始

  5. Maptask利用客户端指定的inputformat来读取数据(默认为textinputformat方法),形成输出的KV键值对

  6. Maptask将输入的KV键值对传递给客户定义的map() 方法,做逻辑运算;Map() 方法运算完毕后将KV键值对收集到OutputCollector(新版本叫做Context)

    shuffle阶段… …start

  7. OutputCollector将KV键值对放到环形缓冲区中(起到削峰的作用)

  8. 环形缓冲区的数据不断溢写到本地磁盘文件的过程中,对KV键值对进行分区(默认分区根据key的hash值%reduce数量进行分区),分区内进行快排

  9. 数据溢写到文件(分区有序,分区内键有序)

    第一次I/O… … 结束

    第二次I/O… … 开始

  10. 使用merge归并排序,将同分区的数据聚合,多个文件会被合并成大的溢出文件

    第二次I/O… … 结束

  11. 调用combine方法,将同键的值合并(优化操作之一,详情下文会详解)

    第三次I/O… … 开始

  12. 当maptask执行完成后,默认启动相应数量的reducetask,并告知reducetask处理的数据范围(数据分区)

  13. reducetask根据分区号,去各个maptask机器上获取到相应的分区数据;第三次I/O… … 结束|第四次I/O… … 开始因为会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件进归并排序,再合并成一个大文件(内容按照K有序)

    第四次I/O… … 结束

    第五次I/O… … 开始

    shuffle阶段… …end

  14. 大文件数据进入reducetask的逻辑运算过程,首先调用GroupingComparator对大文件里面的数据进行分组)

  15. 从文件中每次取出一组(K,V)键值对,调用自定义的reduce() 方法进行逻辑处理

  16. reducetask运算完毕后,调用客户指定的outputformat(默认为TextOutputFormat)将结果数据输出到外部(part-r-000* 的结果文件)

    第五次I/O… … 结束

Reduce端

Reduce端

基于MR的wordcount案例(图解)

MapReduce_shuffle基于业务处理流程

Hadoop Shuffle原理

  1. map方法之后,reduce方法之前的这段处理过程称之为Shuffle

  2. Map端

  3. map方法之后,数据首先进入到分区方法,把数据标记好分区,然后把数据发送到环形缓冲区,记录起始偏移量

    • 环形缓冲区的默认大小为100M,环形缓冲区的本质就是一个数组,通过mapreducetask.io.sort.mb设置缓冲区大小
  4. 当环形缓冲区的容量达到80%的时候,记录终止偏移量,进行溢写

    • 将数据进行分区(默认分组根据key的hash值%reduce数量进行分区)

    • 溢写前对分区内数据进行排序,排序按照对key的索引进行字典顺序排序,排序方式为快排

      • 分区、排序结束后,将数据溢写到磁盘(这个过程中,maptask输出的数据写入到剩余的20%环形缓冲区,同样需要记录起始偏移量)
  5. 溢写产生大量的溢写文件,需要对溢写文件进行归并排序

    • 对溢写的文件也阔以进行Combiner操作,前提是汇总操作,求平均值不行
  6. 最后将文件按照分区储存到磁盘,等待Reduce端拉取

  7. Reduce端

    1. 每个Reducer拉取Map端对应分区的数据
    2. 拉取数据后,先储存到内存中,内存不够了,在存储到磁盘
    3. 拉取所有的数据后,采用归并排序将内存和磁盘中的数据都进行排序
    4. 在进入Reduce方法之前,阔以对数据进行分组操作
      • reducetask将拉取过来的数据 “分组”,每一组数据调用一次reduce() 方法

整体Shuffle流程总结

map端:

map() –> 分区 –> 环形缓冲区 **–> ** 排序 –> 溢写 –> 归并排序 –> 写入磁盘

等待reduce端拉取

reduce端拉取对应分区的数据 –> 存在内存中(内存不足,写入磁盘) –> 拉取完数据,归并排序 –> 对数据进行分组 –> 每组数据调用一次reduce() 方法

combine方法的作用及目的

原理:combine分为map端和reduce端,把一个map方法产生的<K,V>对(多个Key,value),合并成一个新的<K2,V2>,将新的 <K2,V2>,作为输入传入到reduce方法中。这里的V2也阔以称之为Vlues;

作用:把同一个key的键值对合并在一起

目的:减少网络传输

MapReduce的优化方法

​ MapReduce优化方法主要从六个方面考虑:输入阶段、Map阶段、Reduce阶段、IO阶段、数据倾斜问题、常用的调优参数

1、输入阶段

  1. 合并小文件
    • 在执行MR任务之前合并小文件,大量的小文件会产生大量的map任务,增大map任务的装载次数,而任务的装载比较耗时,从而导致MR运行慢
  2. 采用combineTextInputFormat作为输入,来解决输入端大量小文件的场景

2、Map阶段

  1. 减少溢写(Spill)次数
    • 调整 mapreduce.task.io.sort.mbmapreduce.map.sort.spill.percent ,增大出发Spill的内存上限,减少Spill次数,从而减少磁盘IO
  2. 减少合并(merge)次数
    • 调整 mapreduce.task.io.sort.factor ,增加Merge的文件数,以此来减少Merge的次数,从而缩短MR的处理时间
  3. 优先 combine
    • 在Map之后,不影响业务的逻辑的前提下,阔以先进行combine,以此减少IO

3、Reduce阶段

  1. 设置合理的map/reduce数量
    • 二者皆不可设置的太多或者太少,太多:会导致Map与Reduce之间相互竞争资源,造成处理时间超时等错误;太短:会导致Task等待,延长处理时间;
  2. 设置map与reduce共存
    • 调整slow start completedmaps 参数,使得Map运行到一定程度后,Reduce也开始运行,从而减少Reduce的等待时间
  3. 规避使用reduce
    • 数据能在map端处理完成就直接处理完成,走reduce会产生大量的资源消耗
  4. 设置合理的reduce端的buffer
    • 默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffer和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的-一部分数据可以直接输送到Reduce,从而减少I/O开销
    • mapreduce.reduce.input.buffer.perent的默认为0.0。当值大于0时,会保留指定比例的内存以读取Bulfer中的数据直接拿给Reduce使用。

4、IO阶段

  1. 采用数据压缩的方式
    • 压缩数据,数据量减小,减少任务的IO时间
    • Snappy 和 LZO 格式需要安装压缩编码器
  2. 使用seq二进制文件

5、数据倾斜

  1. 数据倾斜现象
    1. 数据频率倾斜 — 某一区域的数据量要远远大于其他区域
    2. 数据大小倾斜 — 部分记录的大小远远大于平均值
  2. 减少数据倾斜的方法
    1. 抽样和范围分区
      • 可以通过对原始数据进行抽样得到的结果集来预设分区边界值
    2. 自定义分区
      • 基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业词汇较多。那么就可以自定义分区将这这些专业词汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。
    3. Combine
      • 使用Combine可以大量地减小数据倾斜。在可能的情兄下,Combine的目的就是聚合并精简数据。
    4. 采用 Map Join,尽量避免 Reduce Join

6、计算机性能

  • CPU、内存、磁盘管理、网络

7、调优参数

  1. 资源相关参数

    1. mapred-default.xml

      配置参数 参数说明 备注
      mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
      mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
      mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值: 1
      mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值: 1
      mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数。默认值是5
      mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘。默认值0.66
      mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例。默认值0.7
      mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0
    2. yarn-default.xml

      配置参数 参数说明 备注
      yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024
      yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192
      yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1
      yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32
      yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192
    3. Shuffle性能优化的关键参数 — mapred-default.xml

      配置参数 参数说明 备注
      mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100M
      mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%
  2. 容错相关参数(MR性能优化

    配置参数 参数说明 备注
    mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
    mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
    mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。 如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

基于JAVA实现的MR

关键类

  1. GenericOptionsParser :是为Hadoop 框架解析命令行参数的工具类
  2. InputFotmat接口 :它的实现类包括; Fileinputformat、Composableinputformat、textinputformat 等,主要用于文件的输入与切割
  3. **Mapper **:将输入的KV键值对转化为中间数据KV键值对集合,Maps将输入记录转变为中间记录
  4. Reducer:根据 key 将中间数据集合处理合并为更小的数据结果集
  5. Partitioner:将数据根据key进行分区
  6. OutputCollector:文件的输出
  7. Combiner:本地聚合,本地化的reduce

mapper的方法

  1. setup
    • 用于管理mapper生命周期中的资源,家在一些初始化的工作,每个job执行一次,setup在完成后mapper构造,即将开始执行map方法前执行
  2. map
    • 主要逻辑的编写
  3. cleanup
    • 主要做一些收尾的工作,如关闭文件、释放资源或者执行map() 后的键值分发等,每个job执行一次,比较适合用来计算全局最大值之类的任务
  4. run
    • 按顺序执行上述的过程:setup —> map —> cleanup

wordcount案例代码附上

Mapper.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.fs.FileSystem;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class Mapper extends Mapper<LongWritable, Text,Text, IntWritable> {

Map<String, String> mapIdName;
Text keyOut = new Text();
IntWritable valueOut = new IntWritable();


@Override
// 执行一次
protected void setup(Context context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
URI[] caches = context.getCacheFiles();
// 依据缓存文件的列表, 根据下标获取目标的文件路径
FSDataInputStream fsDis = fs.open(new Path(caches[0]));
BufferedReader br = new BufferedReader(new InputStreamReader(fsDis));
// 开始=读取
String line = null;
mapIdName = new HashMap<>();
while (null != (line = br.readLine())) {
String[] ps = line.trim().split(",");
mapIdName.put(ps[0], ps[1]);
}
// 读取 hdfs 数据至缓存, fs结束
//fs.close();

}

@Override
// 按行检索
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] ps = value.toString().trim().split(",");
keyOut.set(mapIdName.get(ps[0]));
valueOut.set(Integer.parseInt(ps[1]));
context.write(keyOut, valueOut);
}

@Override
// 执行一次
protected void cleanup(Context context) throws IOException, InterruptedException {
mapIdName.clear();
mapIdName = null;
}
}

Reducer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.Text;
import java.io.IOException;

public class Reducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

FloatWritable valueOut = new FloatWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
float score = 0;
int count = 0;

for (IntWritable value : values) {
score += value.get();
count++;
}
valueOut.set(score/count);
context.write(key, valueOut);
}
}

MapjoinJob.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.io.Text;
import java.net.URI;

public class MapJoinJob {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("mapreduce.app-submission.cross-platform","true");
try {
Path pathOut = new Path("/test/avgScore");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(pathOut)) {
fs.deleteOnExit(pathOut);
System.out.println(pathOut.getName() + "已删除!!!");
}
fs.close();


Job job = Job.getInstance(conf, "mapJoinJob");
job.setJarByClass(MapJoinJob.class);
job.setJar("target/hadoop_first-1.0-jar-with-dependencies.jar");

job.setMapperClass(ClaAvgMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(ClaAVgReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);

job.addCacheFile(URI.create("/test/class/class.log"));
FileInputFormat.addInputPath(job, new Path("/test/score/score.log"));
FileOutputFormat.setOutputPath(job,pathOut);

job.waitForCompletion(true);

} catch (Exception e) {
e.printStackTrace();
}
}
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!