Hadoop MapReduce:编程
(未完)
认识最简单的MapReduce程序
以下MinimalMapReduce.java实现了一个简单的读-写功能,MinimalMapReduceWithDefaults.java与MinimalMapReduce.java是等价的,它是对MinimalMapReduce.java显式设置了默认代码而成。其中,参考了MinimalMapReduce和MinimalMapReduceWithDefaults。
1 | public class MinimalMapReduce extends Configured implements Tool { |
1 | public class MinimalMapReduceWithDefaults extends Configured implements Tool { |
MapReduce程序相关
表1:通过属性和方法设置对应的类型
| Job设置方法 | 属性 | 输入类型 | 中间类型 | 输出类型 | |||
|---|---|---|---|---|---|---|---|
| K1 | V1 | K2 | V2 | K3 | V3 | ||
| job.setInputFormatClass() | mapreduce.job.inputformat.class | Y | Y | ||||
| job.setMapOutputKeyClass() | mapreduce.map.output.key.class | Y | |||||
| job.setMapOutputValueClass() | mapreduce.map.output.value.class | Y | |||||
| job.setOutputKeyClass() | mapreduce.job.output.key.class | Y | |||||
| job.setOutputValueClass() | mapreduce.job.output.value.class | Y | |||||
- 输入类型:输入类型是由
InputFormat设置的; - 中间类型:如果没有显式设置中间类型,那么中间类型默认是对应的输出类型;
- 输出类型:输出类型在有Reduce任务时为Reduce任务的输出;
表2:该表中的使用Job设置方法设置的类中范型类型必须兼容表1中相应的类型
| Job设置方法 | 属性 | 输入类型 | 中间类型 | 输出类型 | |||
|---|---|---|---|---|---|---|---|
| K1 | V1 | K2 | V2 | K3 | V3 | ||
| job.setMapperClass() | mapreduce.job.map.class | Y | Y | Y | Y | ||
| job.setCombinerClass() | mapreduce.job.combine.class | Y | Y | ||||
| job.setPartitionerClass() | mapreduce.job.partitioner.class | Y | Y | ||||
| job.setSortComparatorClass() | mapreduce.job.output.key.comparator.class | Y | |||||
| job.setGroupingComparatorClass() | mapreduce.job.output.group.comparator.class | Y | |||||
| job.setReducerClass() | mapreduce.job.reduce.class | Y | Y | Y | Y | ||
| job.setOutputFormatClass() | mapreduce.job.outputformat.class | Y | Y | ||||
注意:表1中设置的类型是运行时的数据类型;而表2中的设置的类型为编译时的类型,目的是编程时方便做编译检查。
Input Format
| InputFormat<K, V> | |||
| FileInputFormat<K, V> | |||
| TextInputFormat | |||
| KeyValueTextInputFormat | |||
| FixedLengthInputFormat | |||
| NLineInputFormat | |||
| CombineFileInputFormat<K, V> | |||
| SequenceFileInputFormat<K, V> | |||
| SequenceFileAsBinaryInputFormat | |||
| SequenceFileAsTextInputFormat | |||
| SequenceFileInputFilter<K, V> | |||
| ComposableInputFormat<K, V> | |||
| CompositeInputFormat<K> | |||
| DBInputFormat<T> |
文本输入
TextInputFormat
说明:TextInputFormat是MR中默认的InputFormat,它按行解释文件。其中,键是LongWritable类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text类型的,表示当前行的内容。
以下记录为文本文件中的一个分片内容:
1 | On the top of the Crumpetty Tree |
如果使用TextInputFormat来解释,会得到如下键-值对:
1 | (0, On the top of the Crumpetty Tree) |
KeyValueTextInputFormat
说明:KeyValueTextInputFormat按行解释文件,按照分隔符,将一行解释为一个键-值对。其中,键是Text类型的,表示当前行中分隔符前面的文本内容;值也是Text类型的,表示当前行中分隔符后面的文本内容。
以下记录为文本文件中的一个分片内容,其中,箭号→表示制表符:
1 | line1→On the top of the Crumpetty Tree |
如果使用KeyValueTextInputFormat来解释,会得到如下键-值对:
1 | (line1, On the top of the Crumpetty Tree) |
NLineInputFormat
说明:NLineInputFormat对输入文件按固定N行进行逻辑切分为一个分片,它达到的效果是每个Mapper处理单元处理相同数量的数据。其中,键是LongWritable类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text类型的,表示当前行的内容。
备注,其中N默认为1,使用mapreduce.input.lineinputformat.linespermap来控制N的值。
以下记录为全部文本文件中内容:
1 | On the top of the Crumpetty Tree |
使用NLineInputFormat,设置N=2,那么每个分片都包含2行:
1 | // Mapper-1接收到的键值对 |
使用场景
参数扫描:创建一个输入文件,文件中每行表示不同的参数,根据数据行并行启动处理单元(即Mapper);
二进制输入
FixedLengthInputFormat
SequenceFileInputFormat
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat
SequenceFileInputFilter
数据库输入
DBInputFormat
多路径输入
MultipleInputs
尽管,我们可以为MapReduce任务指定多个输入文件,但所有这些文件都将使用同样的InputFormat和Mapper进行解释。然而,使用MultipleInputs可以做到为每个路径指定不同的InputFormat和Mapper类。
以下是MultipleInputs类中提供的两个工具方法:
1 | public static void addInputPath(Job job, Path path, |
使用MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass)后我们不需要再调用FileInputFormat.addInputPath()和 Job.setMapperClass()了。
Output Format
| OutputFormat<K, V> | |||
| FileOutputFormat<K, V> | |||
| TextOutputFormat<K, V> | |||
| MapFileOutputFormat | |||
| SequenceFileOutputFormat <K,V> | |||
| SequenceFileAsBinaryOutputFormat | |||
| NullOutputFormat<K, V> | |||
| DBOutputFormat<K extends DBWritable, V> | |||
| FilterOutputFormat <K,V> | |||
| LazyOutputFormat <K,V> |
文本输出
TextOutputFormat
二进制输出
SequenceFileOutputFormat
SequenceFileAsBinaryOutputFormat
MapFileOutputFormat
数据库输出
多路径输出
MultipleOutputs
LazyOutputFormat
相关参数
Shuffle
失败
任务失败
| 参数 | 默认值 | 范围 | 描述 |
|---|---|---|---|
| mapreduce.task.timeout | 600000 | ||
| mapreduce.map.maxattempts | 4 | ||
| mapreduce.reduce.maxattempts | 4 | ||
| mapreduce.map.failures.maxpercent | |||
| mapreduce.reduce.failures.maxpercent |
Application Master失败
| 参数 | 默认值 | 范围 | 描述 |
|---|---|---|---|
| mapreduce.am.max-attempts | 2 | ||
| yarn.resourcemanager.am.max-attempts | 2 | ||
| yarn.app.mapreduce.am.job.recovery.enable | |||
Node Manager失败
| 参数 | 默认值 | 范围 | 描述 |
|---|---|---|---|
| yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms | |||
| mapreduce.job.maxtaskfailures.per.tracker | |||
参考