Hadoop MapReduce:编程
(未完)
认识最简单的MapReduce程序
以下MinimalMapReduce.java
实现了一个简单的读-写
功能,MinimalMapReduceWithDefaults.java
与MinimalMapReduce.java
是等价的,它是对MinimalMapReduce.java
显式设置了默认代码而成。其中,参考了MinimalMapReduce和MinimalMapReduceWithDefaults。
1 | public class MinimalMapReduce extends Configured implements Tool { |
1 | public class MinimalMapReduceWithDefaults extends Configured implements Tool { |
MapReduce程序相关
表1:通过属性和方法设置对应的类型
Job设置方法 | 属性 | 输入类型 | 中间类型 | 输出类型 | |||
---|---|---|---|---|---|---|---|
K1 | V1 | K2 | V2 | K3 | V3 | ||
job.setInputFormatClass() | mapreduce.job.inputformat.class | Y | Y | ||||
job.setMapOutputKeyClass() | mapreduce.map.output.key.class | Y | |||||
job.setMapOutputValueClass() | mapreduce.map.output.value.class | Y | |||||
job.setOutputKeyClass() | mapreduce.job.output.key.class | Y | |||||
job.setOutputValueClass() | mapreduce.job.output.value.class | Y |
- 输入类型:输入类型是由
InputFormat
设置的; - 中间类型:如果没有显式设置中间类型,那么中间类型默认是对应的输出类型;
- 输出类型:输出类型在有Reduce任务时为Reduce任务的输出;
表2:该表中的使用Job设置方法设置的类中范型类型必须兼容表1中相应的类型
Job设置方法 | 属性 | 输入类型 | 中间类型 | 输出类型 | |||
---|---|---|---|---|---|---|---|
K1 | V1 | K2 | V2 | K3 | V3 | ||
job.setMapperClass() | mapreduce.job.map.class | Y | Y | Y | Y | ||
job.setCombinerClass() | mapreduce.job.combine.class | Y | Y | ||||
job.setPartitionerClass() | mapreduce.job.partitioner.class | Y | Y | ||||
job.setSortComparatorClass() | mapreduce.job.output.key.comparator.class | Y | |||||
job.setGroupingComparatorClass() | mapreduce.job.output.group.comparator.class | Y | |||||
job.setReducerClass() | mapreduce.job.reduce.class | Y | Y | Y | Y | ||
job.setOutputFormatClass() | mapreduce.job.outputformat.class | Y | Y |
注意:表1中设置的类型是运行时的数据类型;而表2中的设置的类型为编译时的类型,目的是编程时方便做编译检查。
Input Format
InputFormat<K, V> | |||
FileInputFormat<K, V> | |||
TextInputFormat | |||
KeyValueTextInputFormat | |||
FixedLengthInputFormat | |||
NLineInputFormat | |||
CombineFileInputFormat<K, V> | |||
SequenceFileInputFormat<K, V> | |||
SequenceFileAsBinaryInputFormat | |||
SequenceFileAsTextInputFormat | |||
SequenceFileInputFilter<K, V> | |||
ComposableInputFormat<K, V> | |||
CompositeInputFormat<K> | |||
DBInputFormat<T> |
文本输入
TextInputFormat
说明:TextInputFormat
是MR中默认的InputFormat
,它按行解释文件。其中,键是LongWritable
类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text
类型的,表示当前行的内容。
以下记录为文本文件中的一个分片内容:
1 | On the top of the Crumpetty Tree |
如果使用TextInputFormat
来解释,会得到如下键-值对:
1 | (0, On the top of the Crumpetty Tree) |
KeyValueTextInputFormat
说明:KeyValueTextInputFormat
按行解释文件,按照分隔符,将一行解释为一个键-值对。其中,键是Text
类型的,表示当前行中分隔符前面的文本内容;值也是Text
类型的,表示当前行中分隔符后面的文本内容。
以下记录为文本文件中的一个分片内容,其中,箭号→
表示制表符:
1 | line1→On the top of the Crumpetty Tree |
如果使用KeyValueTextInputFormat
来解释,会得到如下键-值对:
1 | (line1, On the top of the Crumpetty Tree) |
NLineInputFormat
说明:NLineInputFormat
对输入文件按固定N
行进行逻辑切分为一个分片,它达到的效果是每个Mapper
处理单元处理相同数量的数据。其中,键是LongWritable
类型的,表示当前数据行起点在该文件中的字节偏移量;值是Text
类型的,表示当前行的内容。
备注,其中N
默认为1,使用mapreduce.input.lineinputformat.linespermap
来控制N
的值。
以下记录为全部文本文件中内容:
1 | On the top of the Crumpetty Tree |
使用NLineInputFormat
,设置N=2
,那么每个分片都包含2行:
1 | // Mapper-1接收到的键值对 |
使用场景
参数扫描:创建一个输入文件,文件中每行表示不同的参数,根据数据行并行启动处理单元(即Mapper
);
二进制输入
FixedLengthInputFormat
SequenceFileInputFormat
SequenceFileAsTextInputFormat
SequenceFileAsBinaryInputFormat
SequenceFileInputFilter
数据库输入
DBInputFormat
多路径输入
MultipleInputs
尽管,我们可以为MapReduce任务指定多个输入文件,但所有这些文件都将使用同样的InputFormat
和Mapper
进行解释。然而,使用MultipleInputs
可以做到为每个路径指定不同的InputFormat
和Mapper
类。
以下是MultipleInputs
类中提供的两个工具方法:
1 | public static void addInputPath(Job job, Path path, |
使用MultipleInputs.addInputPath(job, path, inputFormatClass, mapperClass)
后我们不需要再调用FileInputFormat.addInputPath()
和 Job.setMapperClass()
了。
Output Format
OutputFormat<K, V> | |||
FileOutputFormat<K, V> | |||
TextOutputFormat<K, V> | |||
MapFileOutputFormat | |||
SequenceFileOutputFormat <K,V> | |||
SequenceFileAsBinaryOutputFormat | |||
NullOutputFormat<K, V> | |||
DBOutputFormat<K extends DBWritable, V> | |||
FilterOutputFormat <K,V> | |||
LazyOutputFormat <K,V> |
文本输出
TextOutputFormat
二进制输出
SequenceFileOutputFormat
SequenceFileAsBinaryOutputFormat
MapFileOutputFormat
数据库输出
多路径输出
MultipleOutputs
LazyOutputFormat
相关参数
Shuffle
失败
任务失败
参数 | 默认值 | 范围 | 描述 |
---|---|---|---|
mapreduce.task.timeout | 600000 | ||
mapreduce.map.maxattempts | 4 | ||
mapreduce.reduce.maxattempts | 4 | ||
mapreduce.map.failures.maxpercent | |||
mapreduce.reduce.failures.maxpercent |
Application Master失败
参数 | 默认值 | 范围 | 描述 |
---|---|---|---|
mapreduce.am.max-attempts | 2 | ||
yarn.resourcemanager.am.max-attempts | 2 | ||
yarn.app.mapreduce.am.job.recovery.enable | |||
Node Manager失败
参数 | 默认值 | 范围 | 描述 |
---|---|---|---|
yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms | |||
mapreduce.job.maxtaskfailures.per.tracker | |||
参考