MapReduce学习笔记

MapReduce的思想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。

Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce负责“合”,即对map阶段的结果进行全局汇总。

这两个阶段合起来正是MapReduce思想的体现。

Hadoop MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。

Hadoop MapReduce构思体现在如下的三个方面:

  • 对于大数据处理:分而治之

  • 构建抽象模型:MapReduce

    MapReduce中定义了如下的MapReduce两个抽象的编程接口,由用户去编程实现:

    map: (k1;v1) → [(k2;v2)]

    reduce: (k2;[v2]) → [(k3;v3)]

  • 统一架构,隐藏系统层细节

MapReduce框架结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster:负责整个程序的过程调度及状态协调
  2. MapTask:负责map阶段的整个数据处理流程
  3. ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce编程规范

MapReduce的开发一共有八个 步骤:map阶段2个,shuffle阶段4个,reduce阶段2个

Map阶段

  1. 设置inputFormat类,将数据切分成key,value对,输入到第二步;
  2. 自定义Map逻辑,处理第一步的输入数据,然后转换成新的key,value对进行输出;

Shuffle阶段

  1. 对输出key-value对进行分区
  2. 对不同分区的数据按照相同的key进行排序
  3. (可选)对分组过的数据初步规约,降低数据的网络拷贝;
  4. 对数据进行分组,相同key的value放到一个集合当中;

Reduce阶段

  1. 对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key-value对进行从处理,转换成新的key-value对进行输出;
  2. 设置outputFormat将输出的key,value对数据进行保存到文件中。

MapReduce编程示例

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数。

数据格式准备如下:

1
$ vim wordcount.txt
1
2
3
4
5
6
7
8
9
hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop,hive,sqoop,flume,hello
kitty,tom,jerry,world
hadoop,hello,world,hadoop
hive,sqoop,hello,world,hadoop
hive,sqoop,flume,hello
kitty,tom,jerry,world,hadoop

上传到hdfs

1
2
$ hdfs dfs -mkdir /wordcount/
$ hdfs dfs -put wordcount.txt /wordcount/

定义mapper类

1
2
3
4
5
6
7
8
9
10
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWirtable>{
@Override
public void map(LongWritable key,Text value,Context context) throws Exception{
String line = value.toString();
String[] split = line.split(",");
for(String word : split){
context.write(new Text(word),new LongWritable(1));
}
}
}

定义reducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
/**
* @param key 单词
* @param values 单词出现的次数
* @param context
* @throws Exception
*/
@Override
protected void reduce(Text key,Iterable<LongWirtable> values,Context context) throws Exception{
long count = 0;
for(LongWirtable value : values){
count += value.get();
}
context.write(key,new LongWritable(count));
}
}

定义主类,描述job并提交job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class JobMain extends Configured implements Tool{
//该方法用于指定一个job任务
@Override
public int run(String[] args) throws Exception{
Job job = Job.getInstance(super.getConf(),JobMain.class.getSimpleName());
//打包到集群,需要添加以下配置,指定程序的main函数
job.setJarByClass(JobMain.class);
//1. 读取输入文件解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://192.168.17.100:8020/wordcount"));
//本地模式运行
//TextInputFormat.addInputPath(job,new Path("file:///D:\\mapreduce\\wordcount.txt"));
//2. 设置我们的mapper类
job.setMapperClass(WordCountMapper.class);
//设置map阶段完成之后的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWirtable.class);
//3.分区 4.排序 5.规约 6.分组 shuffle阶段采用默认方式
//7. 设置reduce类
job.setReducerClass(WordCountReducer.class);
//设置reduce阶段完成之后的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8. 设置输出类型以及输出路径
job.setOutputFormatClass(TextOutputFormat.class);
Path path = new Path("hdfs://192.168.17.100:8020/wordcount_out");
TextOutputFormat.setOutputPath(job,path);
//TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.17.100:8020/wordcount_out"));
//本地模式运行
//TextOutputFormat.setOutputPath(job,new Path("file:///D:\\mapreduce\\output"));

//获取fileSystem
FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.17.100:8020"),new Configuration());
//判断目录是否存在
boolean bl2 = fileSystem.exists(path);
if(bl2){
//删除目标目录
fileSystem.delete(path,true);
}

//等待任务结束
boolean b = job.waitForCompletion(true);
return b?0:1;
}

/**
* 程序main函数的入口类
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Tool tool = new JobMain();
int run = ToolRunner.run(configuration,tool,args);
System.exit(run);
}
}

运行程序

本地运行

  1. mapreduce程序时再本地以单进程的形式运行
  2. 处理的数据及输出结果在本地文件系统

JobMain类中修改以下两行代码:

1
2
TextInputFormat.addInputPath(job,new Path("file:///D:\\mapreduce\\wordcount.txt"));
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\mapreduce\\output"));

然后再IDEA中右键点击run即可。

集群运行模式

  1. 将mapreduce程序提交给yarn集群,分发到很多节点上并发执行;

  2. 处理的数据和输出结果位于hdfs文件系统;

  3. 提交集群的实现步骤:将程序打包成jar包,然后再集群的任意一个节点上用hadoop命令启动。

    1
    $ hadoop jar original-mapReduce-1.0-SNAPSHOT.jar cn.wjqixige.demo01.JobMain

    执行结果如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    flume	3
    hadoop 6
    hello 6
    hive 4
    jerry 3
    kitty 3
    sqoop 4
    tom 3
    world 6

MapReduce分区

在MapReduce中,通过指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理。简单说就是相同类型的数据,送到一起去处理,在reduce当中默认分区只有1个。

MapReduce当中的的分区类图:

MapReduce排序以及序列化

序列化(Serialization)是指结构化对象转换为字节流。

反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。

当要在进程间传递对象或持久化对象的时候,就需要序列化对象为字节流;反之当要接受到或从磁盘读取的字节流转换为对象,就要进行反序列化。

hadoop自己开发了一套序列化机制(Writable)。Writable是hadoop的序列化格式,hadoop定义了这样一个Wirtable接口,一个类要支持可序列化只需要实现这个接口即可。

Writable有一个子接口是WritableComparable, 该接口既可以实现序列化,也可以对key进行比较。

MapReduce的规约(combiner)

每一个map都会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少map和reduce节点之间的数据传输量,提高网络IO性能,是MapReduce的一种优化手段之一。

  • combiner是MR程序中Mapper和Reducer之外的一种组件;
  • combiner组件的父类就是Reducer;
  • combiner和reducer的取别在于运行的位置:
    • combiner是在每一个maptask所在的节点运行;
    • Reduer是接受全局所有Mapper的输出结果
  • combiner的意义就是对每一个maptask的输出进行局部汇总,以减少网络传输量

实现步骤

  1. 自定义一个combiner继承Reducer,重写reduce方法;
  2. 在job中设置:job.setCombinerClass(CustomCominer.class)

combiner能够应用的前提是不能影响最终的业务逻辑,而且combiner的输出key-value跟reducer的输入key-value类型要对应起来。

MapReduce中的计数器

计数器是手机作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器可辅助诊断系统故障。如果需要将日志信息传输到map或reduce任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

hadoop内置计数器列表(具体参考《hadoop权威指南第四版》P244页)

组别 名称/类别
MapReduce任务计数器 org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器 org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器 org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器 org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器 org.apache.hadoop.mapreduce.JobCounter

每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图:

MapReduce运行机制

MapTask工作机制

整个Map阶段流程大体如上图所示。简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

详细步骤

  1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

  2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

  3. 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

  4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

    MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

  5. 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

  6. 当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

    如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

    那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

  7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

    至此map整个阶段结束。

mapTask的一些基础设置配置(mapred-site.xml)

设置一:设置环型缓冲区的内存值大小

1
2
3
4
<property>
<name>mapreduce.tash.io.sort.mb</name>
<value>100</value>
</property>

设置二:设置溢写百分比

1
2
3
4
<property>
<name>mapreduce.map.sort.spill.percent</name>
<value>0.80</value>
</property>

设置三:设置溢写数据目录

1
2
3
4
<property>
<name>mapreduce.cluster.local.dir</name>
<value>${hadoop.tmp.dir}/mapred/local</value>
</property>

设置四:设置一次最多合并多少个溢写文件

1
2
3
4
<property>
<name>mapreduce.task.io.sort.factor</name>
<value>10</value>
</property>

ReduceTask工作机制

Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

详细步骤:

  1. Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

  2. Merge阶段,这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

  3. 合并排序,把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

  4. 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

Shuffle过程

map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。

shuffle: 洗牌、发牌——(核心机制:数据分区,排序,分组,规约,合并等过程)。

shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。

  1. Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。

  2. Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。

  3. Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。

  4. Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。

  5. Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。

  6. Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整, 参数:mapreduce.task.io.sort.mb 默认100M

shuffle阶段数据的压缩机制

在shuffle阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。

hadoop当中支持的压缩算法

文件压缩有两大好处,节约磁盘空间,加速数据在网络和磁盘上的传输

使用bin/hadoop checknative 来查看我们编译之后的hadoop支持的各种压缩,如果出现openssl为false,那么就在线安装一下依赖包:

1
2
$ bin/hadoop checknative
$ yum install openssl-devel

hadoop支持的压缩算法

压缩格式 工具 算法 文件扩展名 是否可切分
DEFLATE DEFLATE .deflate
Gzip gzip DEFLATE .gz
bzip2 bzip2 bzip2 bz2
LZO lzop LZO .lzo
LZ4 LZ4 .lz4
Snappy Snappy .snappy

各种压缩算法对应使用的java类

压缩格式 对应使用的java类
DEFLATE org.apache.hadoop.io.compress.DeFaultCodec
gzip org.apache.hadoop.io.compress.GZipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

常见的压缩速率比较

压缩算法 原始文件大小 压缩后的文件大小 压缩速度 解压缩速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO-bset 8.3GB 2GB 4MB/s 60.6MB/s
LZO 8.3GB 2.9GB 49.3MB/S 74.6MB/s

如何开启压缩

方式一:代码中进行设置压缩

设置map阶段的压缩

1
2
3
Configuration configuration = new Configuration();
configuration.set("mapreduce.map.output.compress","true");
configuration.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

设置reduce阶段的压缩

1
2
3
configuration.set("mapreduce.output.fileoutputformat.compress","true");
configuration.set("mapreduce.output.fileoutputformat.compress.type","RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec","org.apache.hadoop.io.compress.SnappyCodec");

方式二:配置全局的MR压缩

修改mapred-site.xml配置文件,然后重启集群,以便对所有的mapreduce任务进行压缩。

map输出数据进行压缩

1
2
3
4
5
6
7
8
<property>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

reduce输出数据进行压缩

1
2
3
4
5
6
7
8
9
10
11
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>RECORD</value>
</property>
<property><name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</property>

所有节点都要修改mapred-site.xml,修改完成之后记得重启集群

Mapreduce的其他补充

多job串联

一个稍复杂点的处理逻辑往往需要多个mapreduce程序串联处理,多job的串联可以借助mapreduce框架的JobControl实现

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ControlledJob cJob1 = new ControlledJob(job1.getConfiguration());             ControlledJob cJob2 = new ControlledJob(job2.getConfiguration());             ControlledJob cJob3 = new ControlledJob(job3.getConfiguration());             cJob1.setJob(job1);
cJob2.setJob(job2);
cJob3.setJob(job3);
// 设置作业依赖关系
cJob2.addDependingJob(cJob1);
cJob3.addDependingJob(cJob2);
JobControl jobControl = new JobControl("RecommendationJob"); jobControl.addJob(cJob1);
jobControl.addJob(cJob2);
jobControl.addJob(cJob3);
// 新建一个线程来运行已加入JobControl中的作业,开始进程并等待结束
Thread jobControlThread = new Thread(jobControl);
jobControlThread.start();
while (!jobControl.allFinished()) {
Thread.sleep(500);
}
jobControl.stop();
return 0;

mapreduce参数优化

MapReduce重要配置参数

资源相关参数

以下调整参数都在mapred-site.xml这个配置文件当中,在用户自己的mr应用程序中配置就可以生效:

配置项 描述 默认值
mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB) 1024MB,如果Map Task实际使用的资源量超过该值,则会被强制杀死
mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限 1024MB,如果Reduce Task实际使用的资源量超过该值,则会被强制杀死
mapred.child.java.opts 配置每个map或者reduce使用的内存的大小 200M
mapreduce.map.cpu.vcores 每个Map task可使用的最多cpu core数目 1
mapreduce.reduce.cpu.vcores 每个Reduce task可使用的最多cpu core数目 1

shuffle性能优化的关键参数,应在yarn启动之前就配置好

配置项 描述 默认值
mapreduce.task.io.sort.mb shuffle的环形缓冲区大小 100MB
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值 0.8

应该在yarn启动之前就配置在服务器的配置文件中才能生效,都在yarn-site.xml配置文件当中配置

配置项 描述 默认值
yarn.scheduler.minimum-allocation-mb 给应用程序container分配的最小内存 1024
yarn.scheduler.maximum-allocation-mb 给应用程序container分配的最大内存 8192
yarn.scheduler.minimum-allocation-vcores 1
yarn.scheduler.maximum-allocation-vcores 32
yarn.nodemanager.resource.memory-mb 8192

容错相关参数

  1. mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

  2. mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

  3. mapreduce.job.maxtaskfailures.per.tracker: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业仍认为成功。

  4. mapreduce.task.timeout: Task超时时间,默认值为600000毫秒,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒)。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

本地运行mapreduce 作业

设置以下几个参数:

1
2
3
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local

效率和稳定性相关参数

  1. mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为true,如果为true,如果Map执行时间比较长,那么集群就会推测这个Map已经卡住了,会重新启动同样的Map进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,一般直接关闭推测执行

  2. mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为true,如果reduce执行时间比较长,那么集群就会推测这个reduce已经卡住了,会重新启动同样的reduce进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,一般直接关闭推测执行

  3. mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小,默认为0

-------------本文结束感谢您的阅读-------------

本文标题:MapReduce学习笔记

文章作者:Mr.wj

发布时间:2019年12月14日 - 12:42

最后更新:2019年12月14日 - 12:55

原始链接:https://www.wjqixige.cn/2019/12/14/MapReduce学习笔记/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

Mr.wj wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!