Hadoop MapReduce: Secondary Sort

In the earlier article 《Hadoop MapReduce原理》 we walked through the MapReduce processing pipeline. For the principles behind secondary sort, the article "Hadoop Basics III: Secondary Sort in MapReduce" is highly recommended. This post focuses on the key configuration points of a custom Key, Partitioner, SortComparator, and GroupingComparator from the perspective of the source code and examples; it does not aim to explain the secondary sort algorithm itself in detail.

Defining the Partitioner

NewOutputCollector is a wrapper around the output collector. When Mapper.map() calls Context.write(key, value), the call ultimately ends up in NewOutputCollector.write(). Besides wrapping a MapOutputCollector instance, NewOutputCollector is also responsible for resolving the partitioner, as the following code shows:

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

Looking further at JobContextImpl.getPartitionerClass(), we can see that if no partitioner class is explicitly configured, HashPartitioner is used by default:

public Class<? extends Partitioner<?,?>> getPartitionerClass()
    throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
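
For reference, HashPartitioner simply maps a key to a partition by its hashCode modulo the number of reduce tasks, roughly as follows:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  // Partition purely by the key's hashCode, masked to stay non-negative.
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}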

How to configure it

Accordingly, a custom partitioner can be set via Job.setPartitionerClass():

public void setPartitionerClass(Class<? extends Partitioner> cls
                                ) throws IllegalStateException {
  ensureState(JobState.DEFINE);
  conf.setClass(PARTITIONER_CLASS_ATTR, cls,
                Partitioner.class);
}
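
As a minimal sketch (not Hadoop source code), a custom partitioner for a hypothetical composite key TextPair could partition by the natural (first) key only, so that all records sharing the same natural key reach the same reducer while the secondary field is left to the sort comparator. TextPair and its getFirst() accessor are assumed names used only for illustration:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch only: TextPair is a hypothetical composite key whose
// getFirst() returns the natural key and getSecond() the secondary field.
public class NaturalKeyPartitioner extends Partitioner<TextPair, Text> {
  @Override
  public int getPartition(TextPair key, Text value, int numPartitions) {
    // Partition on the natural key only, ignoring the secondary field,
    // so that all records with the same natural key go to one reducer.
    return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

It would then be registered in the driver with job.setPartitionerClass(NaturalKeyPartitioner.class).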

Defining the Sort Order

As described in 《Hadoop MapReduce原理》, sorting happens in two places: once in the sort phase on the map side, and again in the sort phase on the reduce side.

  • Shuffle.run()
public RawKeyValueIterator run() throws IOException, InterruptedException {
  ... ...
  // Start the map-output fetcher threads
  boolean isLocal = localMapFiles != null;
  final int numFetchers = isLocal ? 1 :
    jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
  Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];
  if (isLocal) {
    fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
        merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
        localMapFiles);
    fetchers[0].start();
  } else {
    for (int i=0; i < numFetchers; ++i) {
      fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,
                                     reporter, metrics, this,
                                     reduceTask.getShuffleSecret());
      fetchers[i].start();
    }
  }
  ... ...
  copyPhase.complete(); // copy is already complete
  taskStatus.setPhase(TaskStatus.Phase.SORT);

  // Finish the on-going merges...
  RawKeyValueIterator kvIter = null;
  try {
    kvIter = merger.close();
  } catch (Throwable e) {
    throw new ShuffleError("Error while doing final merge " , e);
  }
  ... ...
  return kvIter;
}
public RawKeyValueIterator close() throws Throwable {
  ... ...
  List<InMemoryMapOutput<K, V>> memory =
    new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
  inMemoryMergedMapOutputs.clear();
  memory.addAll(inMemoryMapOutputs);
  inMemoryMapOutputs.clear();
  List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
  onDiskMapOutputs.clear();
  return finalMerge(jobConf, rfs, memory, disk);
}

private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
                                       List<CompressAwarePath> onDiskMapOutputs
                                       ) throws IOException {
  ... ...
  final Path tmpDir = new Path(reduceId.toString());
  final RawComparator<K> comparator =
    (RawComparator<K>)job.getOutputKeyComparator();

  // Merge the in-memory map outputs
  if (inMemoryMapOutputs.size() > 0) {
    ... ...
    if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) {
      ... ...
      final Path outputPath =
        mapOutputFile.getInputFileForWrite(mapId,
          inMemToDiskBytes).suffix(
            Task.MERGED_OUTPUT_PREFIX);
      final RawKeyValueIterator rIter = Merger.merge(job, fs,
          keyClass, valueClass, memDiskSegments, numMemDiskSegments,
          tmpDir, comparator, reporter, spilledRecordsCounter, null,
          mergePhase);
      ... ...
    } else if (inMemToDiskBytes != 0) {
      ... ...
    }
  }

  // Wrap the on-disk map outputs as Segment objects
  ... ...

  // Merge into the final file
  if (0 != onDiskBytes) {
    ... ...
    RawKeyValueIterator diskMerge = Merger.merge(
        job, fs, keyClass, valueClass, codec, diskSegments,
        ioSortFactor, numInMemSegments, tmpDir, comparator,
        reporter, false, spilledRecordsCounter, null, thisPhase);
    diskSegments.clear();
    if (0 == finalSegments.size()) {
      return diskMerge;
    }
    finalSegments.add(new Segment<K,V>(
        new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
  }
  return Merger.merge(job, fs, keyClass, valueClass,
                      finalSegments, finalSegments.size(), tmpDir,
                      comparator, reporter, spilledRecordsCounter, null,
                      null);
}
  • Merger.merge():
RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                          Class<K> keyClass, Class<V> valueClass,
                          List<Segment<K, V>> segments,
                          int mergeFactor, Path tmpDir,
                          RawComparator<K> comparator, Progressable reporter,
                          boolean sortSegments,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  return new MergeQueue<K, V>(conf, fs, segments, comparator, reporter,
                              sortSegments,
                              TaskType.REDUCE).merge(keyClass, valueClass,
                                                     mergeFactor, tmpDir,
                                                     readsCounter, writesCounter,
                                                     mergePhase);
}
  • MergeQueue.merge(): MergeQueue is itself a priority queue and implements RawKeyValueIterator
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                          int factor, int inMem, Path tmpDir,
                          Counters.Counter readsCounter,
                          Counters.Counter writesCounter,
                          Progress mergePhase)
    throws IOException {
  ... ...
  //feed the streams to the priority queue
  initialize(segmentsToMerge.size());
  clear();
  for (Segment<K, V> segment : segmentsToMerge) {
    put(segment);
  }
  ... ...
}

protected boolean lessThan(Object a, Object b) {
  DataInputBuffer key1 = ((Segment<K, V>)a).getKey();
  DataInputBuffer key2 = ((Segment<K, V>)b).getKey();
  int s1 = key1.getPosition();
  int l1 = key1.getLength() - s1;
  int s2 = key2.getPosition();
  int l2 = key2.getLength() - s2;

  return comparator.compare(key1.getData(), s1, l1, key2.getData(), s2, l2) < 0;
}
  • JobConf.getOutputKeyComparator()
public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

public Class<?> getMapOutputKeyClass() {
  Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
  if (retv == null) {
    retv = getOutputKeyClass();
  }
  return retv;
}

public Class<?> getOutputKeyClass() {
  return getClass(JobContext.OUTPUT_KEY_CLASS,
                  LongWritable.class, Object.class);
}

How to configure it

  1. It can be set explicitly with Job.setSortComparatorClass();
  2. As JobConf.getOutputKeyComparator() shows, if Job.setSortComparatorClass() is not called, the default is the WritableComparator obtained for the map output key class, which delegates to the key's WritableComparable.compareTo() method (e.g. WritableComparable<TextPair>.compareTo()). For a secondary sort we can therefore implement the required ordering directly in compareTo(), without defining a separate comparator class and registering it via Job.setSortComparatorClass(). However, if the same key class needs to support several different secondary-sort orderings, it is better to define one comparator class per ordering and set it with Job.setSortComparatorClass(). A sketch of both approaches follows below.

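The two options can be sketched as follows; TextPair, getFirst() and getSecond() are hypothetical names used only for illustration. The composite key implements the secondary-sort ordering directly in compareTo() (natural key ascending, secondary field descending); alternatively, the same ordering can live in a dedicated WritableComparator registered with Job.setSortComparatorClass():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch only: composite key with a natural key and a secondary field.
public class TextPair implements WritableComparable<TextPair> {
  private Text first = new Text();   // natural key
  private Text second = new Text();  // secondary field

  public Text getFirst()  { return first; }
  public Text getSecond() { return second; }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int compareTo(TextPair o) {
    int cmp = first.compareTo(o.first);   // natural key, ascending
    if (cmp != 0) {
      return cmp;
    }
    return o.second.compareTo(second);    // secondary field, descending
  }

  @Override
  public int hashCode() {
    return first.hashCode() * 163 + second.hashCode();
  }

  @Override
  public boolean equals(Object obj) {
    return obj instanceof TextPair && compareTo((TextPair) obj) == 0;
  }
}

// Alternative: keep the ordering out of the key and register one
// comparator class per required ordering via Job.setSortComparatorClass().
class TextPairSortComparator extends WritableComparator {
  protected TextPairSortComparator() {
    super(TextPair.class, true);  // true: instantiate keys so compare() gets objects
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    TextPair p1 = (TextPair) a;
    TextPair p2 = (TextPair) b;
    int cmp = p1.getFirst().compareTo(p2.getFirst());
    return cmp != 0 ? cmp : p2.getSecond().compareTo(p1.getSecond());
  }
}

With the second approach the driver calls job.setSortComparatorClass(TextPairSortComparator.class); with the first approach no extra call is needed, because getOutputKeyComparator() falls back to WritableComparator.get(), which uses the key's compareTo().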

Defining the Grouping

  • ReduceTask.run() calls job.getOutputValueGroupingComparator() to obtain the comparator used for grouping, as shown below:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {

  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();
  ... ...
  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator,
                  keyClass, valueClass);
  } else {
    ... ...
  }
  ... ...
}

private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException,
                            ClassNotFoundException {
  ... ...
  // make a reducer
  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
    (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  ... ...
  org.apache.hadoop.mapreduce.Reducer.Context
    reducerContext = createReduceContext(reducer, job, getTaskID(),
                                         rIter, reduceInputKeyCounter,
                                         reduceInputValueCounter,
                                         trackedRW,
                                         committer,
                                         reporter, comparator, keyClass,
                                         valueClass);
  try {
    reducer.run(reducerContext);
  } finally {
    ... ...
  }
}
  • Reducer.run() calls ReduceContextImpl.nextKey() to advance to the next key group on the reduce side, as shown below:
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
      }
    }
  } finally {
    cleanup(context);
  }
}
  • ReduceContextImpl.nextKeyValue() uses the grouping comparator to decide whether the next record's key belongs to the same group, as shown below:
public boolean nextKey() throws IOException,InterruptedException {
  while (hasMore && nextKeyIsSame) {
    nextKeyValue();
  }
  if (hasMore) {
    if (inputKeyCounter != null) {
      inputKeyCounter.increment(1);
    }
    return nextKeyValue();
  } else {
    return false;
  }
}

public boolean nextKeyValue() throws IOException, InterruptedException {
  ... ...
  key = keyDeserializer.deserialize(key);
  value = valueDeserializer.deserialize(value);
  ... ...
  hasMore = input.next();
  if (hasMore) {
    nextKey = input.getKey();
    nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                       currentRawKey.getLength(),
                                       nextKey.getData(),
                                       nextKey.getPosition(),
                                       nextKey.getLength() - nextKey.getPosition()
                                       ) == 0;
  } else {
    nextKeyIsSame = false;
  }
  inputValueCounter.increment(1);
  return true;
}
  • JobConf.getOutputValueGroupingComparator()
public RawComparator getOutputValueGroupingComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);
  if (theClass == null) {
    return getOutputKeyComparator();
  }

  return ReflectionUtils.newInstance(theClass, this);
}

public RawComparator getOutputKeyComparator() {
  Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
  if (theClass != null)
    return ReflectionUtils.newInstance(theClass, this);
  return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
}

public Class<?> getMapOutputKeyClass() {
  Class<?> retv = getClass(JobContext.MAP_OUTPUT_KEY_CLASS, null, Object.class);
  if (retv == null) {
    retv = getOutputKeyClass();
  }
  return retv;
}

public Class<?> getOutputKeyClass() {
  return getClass(JobContext.OUTPUT_KEY_CLASS,
                  LongWritable.class, Object.class);
}

How to configure it

  1. It can be set explicitly with Job.setGroupingComparatorClass();
  2. As JobConf.getOutputValueGroupingComparator() shows, if Job.setGroupingComparatorClass() is not called, grouping falls back to the sort comparator returned by getOutputKeyComparator(), i.e. the same comparator used to order the intermediate map output keys. A sketch of a grouping comparator follows below.

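Continuing the hypothetical TextPair example from the sort section, a grouping comparator that compares only the natural key makes all values sharing that natural key arrive in a single reduce() call, while the sort comparator keeps them ordered by the secondary field within that call. Again a sketch, not Hadoop source:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch only: group reduce input by the natural key, ignoring the
// secondary field, so one reduce() call sees all values of a natural key.
public class NaturalKeyGroupingComparator extends WritableComparator {
  protected NaturalKeyGroupingComparator() {
    super(TextPair.class, true);
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    TextPair p1 = (TextPair) a;
    TextPair p2 = (TextPair) b;
    return p1.getFirst().compareTo(p2.getFirst());
  }
}

Putting the hypothetical pieces together in the driver:

job.setMapOutputKeyClass(TextPair.class);
job.setPartitionerClass(NaturalKeyPartitioner.class);
job.setSortComparatorClass(TextPairSortComparator.class);
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);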

References

  1. http://blog.ditullio.fr/2015/12/28/hadoop-basics-secondary-sort-in-mapreduce/
  2. https://medium.com/@sudarshan_sreenivasan/what-is-secondary-sort-in-hadoop-and-how-does-it-work-fe35609b5319