Hadoop MapReduce:编程

(未完)

认识最简单的MapReduce程序

以下MinimalMapReduce.java实现了一个简单的读-写功能,MinimalMapReduceWithDefaults.javaMinimalMapReduce.java是等价的,它是对MinimalMapReduce.java显式设置了默认代码而成。其中,参考了MinimalMapReduceMinimalMapReduceWithDefaults

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MinimalMapReduce extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MinimalMapReduce(), args);
System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}

Job job = Job.getInstance(getConf());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

return job.waitForCompletion(true) ? 0 : 1;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class MinimalMapReduceWithDefaults extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MinimalMapReduce(), args);
System.exit(res);
}

@Override
public int run(String[] args) throws Exception {
if (args.length != 2) {
System.err.printf("Usage: %s [generic options] <input> <output>\n",
getClass().getSimpleName());
ToolRunner.printGenericCommandUsage(System.err);
return -1;
}

Job job = Job.getInstance(getConf());
job.setJarByClass(getClass());
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

/*[*/
job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(Mapper.class);

job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);

job.setPartitionerClass(HashPartitioner.class);

job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);

job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
/*]*/

return job.waitForCompletion(true) ? 0 : 1;
}
}

MapReduce程序相关

表1:通过属性和方法设置对应的类型

Job设置方法 属性 输入类型 中间类型 输出类型
K1 V1 K2 V2 K3 V3
job.setInputFormatClass() mapreduce.job.inputformat.class Y Y
job.setMapOutputKeyClass() mapreduce.map.output.key.class Y
job.setMapOutputValueClass() mapreduce.map.output.value.class Y
job.setOutputKeyClass() mapreduce.job.output.key.class Y
job.setOutputValueClass() mapreduce.job.output.value.class Y
  • 输入类型:输入类型是由InputFormat设置的;
  • 中间类型:如果没有显式设置中间类型,那么中间类型默认是对应的输出类型;
  • 输出类型:输出类型在有Reduce任务时为Reduce任务的输出;

表2:该表中的使用Job设置方法设置的类中范型类型必须兼容表1中相应的类型

Job设置方法 属性 输入类型 中间类型 输出类型
K1 V1 K2 V2 K3 V3
job.setMapperClass() mapreduce.job.map.class Y Y Y Y
job.setCombinerClass() mapreduce.job.combine.class Y Y
job.setPartitionerClass() mapreduce.job.partitioner.class Y Y
job.setSortComparatorClass() mapreduce.job.output.key.comparator.class Y
job.setGroupingComparatorClass() mapreduce.job.output.group.comparator.class Y
job.setReducerClass() mapreduce.job.reduce.class Y Y Y Y
job.setOutputFormatClass() mapreduce.job.outputformat.class Y Y

注意:表1中设置的类型是运行时的数据类型;而表2中的设置的类型为编译时的类型,目的是编程时方便做编译检查。

Input Format

InputFormat<K, V>
FileInputFormat<K, V>
TextInputFormat
KeyValueTextInputFormat
FixedLengthInputFormat
NLineInputFormat
CombineFileInputFormat<K, V>
SequenceFileInputFormat<K, V>
SequenceFileAsBinaryInputFormat
SequenceFileAsTextInputFormat
SequenceFileInputFilter<K, V>
ComposableInputFormat<K, V>
CompositeInputFormat<K>
DBInputFormat<T>

文本输入

TextInputFormat

说明:TextInputFormat是MR中默认的InputFormat,它按行解释文件。其中,键是LongWritable类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text类型的,表示当前行的内容。

以下记录为文本文件中的一个分片内容:

1
2
3
4
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

如果使用TextInputFormat来解释,会得到如下键-值对:

1
2
3
4
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

KeyValueTextInputFormat

说明:KeyValueTextInputFormat按行解释文件,按照分隔符,将一行解释为一个键-值对。其中,键是Text类型的,表示当前行中分隔符前面的文本内容;值也是Text类型的,表示当前行中分隔符后面的文本内容。

以下记录为文本文件中的一个分片内容,其中,箭号表示制表符:

1
2
3
4
line1→On the top of the Crumpetty Tree
line2→The Quangle Wangle sat,
line3→But his face you could not see,
line4→On account of his Beaver Hat.

如果使用KeyValueTextInputFormat来解释,会得到如下键-值对:

1
2
3
4
(line1, On the top of the Crumpetty Tree)
(line2, The Quangle Wangle sat,)
(line3, But his face you could not see,)
(line4, On account of his Beaver Hat.)

NLineInputFormat

说明:NLineInputFormat对输入文件按固定N行进行逻辑切分为一个分片,它达到的效果是每个Mapper处理单元处理相同数量的数据。其中,键是LongWritable类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text类型的,表示当前行的内容。

备注,其中N默认为1,使用mapreduce.input.lineinputformat.linespermap来控制N的值。

以下记录为全部文本文件中内容:

1
2
3
4
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

使用NLineInputFormat,设置N=2,那么每个分片都包含2行:

1
2
3
4
5
6
7
// Mapper-1接收到的键值对
(0, On the top of the Crumpetty Tree)
(33, The Quangle Wangle sat,)

// Mapper-2接收到的键值对
(57, But his face you could not see,)
(89, On account of his Beaver Hat.)

使用场景

参数扫描:创建一个输入文件,文件中每行表示不同的参数,根据数据行并行启动处理单元(即Mapper);

二进制输入

FixedLengthInputFormat

SequenceFileInputFormat

SequenceFileAsTextInputFormat

SequenceFileAsBinaryInputFormat

SequenceFileInputFilter

数据库输入

DBInputFormat

多路径输入

MultipleInputs

尽管,我们可以为MapReduce任务指定多个输入文件,但所有这些文件都将使用同样的InputFormatMapper进行解释。然而,使用MultipleInputs可以做到为每个路径指定不同的InputFormatMapper类。

以下是MultipleInputs类中提供的两个工具方法:

1
2
3
4
5
6
public static void addInputPath(Job job, Path path,
Class<? extends InputFormat> inputFormatClass)

public static void addInputPath(Job job, Path path,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass)

使用MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass)后我们不需要再调用FileInputFormat.addInputPath()Job.setMapperClass()了。

Output Format

OutputFormat<K, V>
FileOutputFormat<K, V>
TextOutputFormat<K, V>
MapFileOutputFormat
SequenceFileOutputFormat <K,V>
SequenceFileAsBinaryOutputFormat
NullOutputFormat<K, V>
DBOutputFormat<K extends DBWritable, V>
FilterOutputFormat <K,V>
LazyOutputFormat <K,V>

文本输出

TextOutputFormat

二进制输出

SequenceFileOutputFormat

SequenceFileAsBinaryOutputFormat

MapFileOutputFormat

数据库输出

多路径输出

MultipleOutputs

LazyOutputFormat

相关参数

Shuffle

失败

任务失败

参数 默认值 范围 描述
mapreduce.task.timeout 600000
mapreduce.map.maxattempts 4
mapreduce.reduce.maxattempts 4
mapreduce.map.failures.maxpercent
mapreduce.reduce.failures.maxpercent

Application Master失败

参数 默认值 范围 描述
mapreduce.am.max-attempts 2
yarn.resourcemanager.am.max-attempts 2
yarn.app.mapreduce.am.job.recovery.enable

Node Manager失败

参数 默认值 范围 描述
yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
mapreduce.job.maxtaskfailures.per.tracker

参考

  1. 《Hadoop权威指南 第4版》
  2. https://hadoop.apache.org/docs/r2.10.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
  3. https://hadoop.apache.org/docs/r2.10.1/hadoop-project-dist/hadoop-common/DeprecatedProperties.html