1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| /**
   * Hands every map output collected so far to {@link #finalMerge} and returns
   * the resulting iterator.
   *
   * <p>The three tracking collections ({@code inMemoryMergedMapOutputs},
   * {@code inMemoryMapOutputs}, {@code onDiskMapOutputs}) are copied into local
   * lists and then cleared, so after this call they are empty and the final
   * merge works on the local copies.
   *
   * <p>NOTE(review): the {@code ... ...} marker is an elision in this excerpt;
   * whatever runs before the snapshot is not visible here.
   *
   * @return the iterator produced by {@code finalMerge}
   * @throws Throwable as declared; the visible code only calls
   *         {@code finalMerge}, which declares {@code IOException} - the wider
   *         type presumably comes from the omitted statements (confirm)
   */
  public RawKeyValueIterator close() throws Throwable {
    // Statements omitted from this excerpt.
    ... ...
    // Start from the already-merged in-memory outputs, then append the
    // not-yet-merged in-memory outputs; each source list is emptied right
    // after it has been copied.
    List<InMemoryMapOutput<K, V>> memory =
        new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    // Same copy-then-clear step for the on-disk outputs.
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
    return finalMerge(jobConf, rfs, memory, disk);
  }
/**
 * Produces the final iterator over the supplied in-memory and on-disk map
 * outputs.
 *
 * <p>Visible flow (several parts are elided with {@code ... ...} in this
 * excerpt):
 * <ol>
 *   <li>If there are in-memory outputs, they may be merged via
 *       {@code Merger.merge} when {@code numMemDiskSegments > 0} and
 *       {@code ioSortFactor} exceeds the number of on-disk outputs; otherwise
 *       a separate branch runs when {@code inMemToDiskBytes != 0}.</li>
 *   <li>If {@code onDiskBytes} is non-zero, the disk segments are merged. When
 *       {@code finalSegments} is empty that merge result is returned directly;
 *       otherwise it is wrapped in a {@code Segment} and appended to
 *       {@code finalSegments}.</li>
 *   <li>Otherwise (or after appending) a last {@code Merger.merge} over
 *       {@code finalSegments} is returned.</li>
 * </ol>
 *
 * @param job configuration passed to every {@code Merger.merge} call and used
 *            to obtain the output key comparator
 * @param fs file system passed to every {@code Merger.merge} call
 * @param inMemoryMapOutputs in-memory map outputs; the first phase is skipped
 *            when this list is empty
 * @param onDiskMapOutputs on-disk map outputs; its size is compared against
 *            {@code ioSortFactor}
 * @return the disk-merge iterator when {@code finalSegments} is empty after the
 *         disk phase, otherwise the iterator from merging {@code finalSegments}
 * @throws IOException as declared by the signature
 */
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
    List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
    List<CompressAwarePath> onDiskMapOutputs ) throws IOException {
  // Statements omitted from this excerpt.
  ... ...
  // Directory argument handed to every Merger.merge call below; built from
  // the string form of reduceId.
  final Path tmpDir = new Path(reduceId.toString());
  // Unchecked cast of the job's output key comparator to RawComparator<K>.
  final RawComparator<K> comparator = (RawComparator<K>)job.getOutputKeyComparator();
  // Phase 1: only entered when at least one in-memory output was supplied.
  if (inMemoryMapOutputs.size() > 0) {
    // Omitted here; numMemDiskSegments, memDiskSegments and inMemToDiskBytes
    // are presumably set up in this elided part (confirm against full source).
    ... ...
    // Taken only when there are such segments AND ioSortFactor is strictly
    // greater than the number of on-disk outputs.
    if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) {
      ... ...
      // Path derived from mapOutputFile for mapId / inMemToDiskBytes, with
      // Task.MERGED_OUTPUT_PREFIX appended as a suffix.
      final Path outputPath = mapOutputFile.getInputFileForWrite(mapId, inMemToDiskBytes).suffix(
          Task.MERGED_OUTPUT_PREFIX);
      // Merge of memDiskSegments. How rIter and outputPath are consumed is
      // elided below - presumably rIter is written to outputPath (confirm).
      final RawKeyValueIterator rIter = Merger.merge(job, fs, keyClass, valueClass,
          memDiskSegments, numMemDiskSegments, tmpDir, comparator, reporter,
          spilledRecordsCounter, null, mergePhase);
      ... ...
    } else if (inMemToDiskBytes != 0) {
      // Body omitted from this excerpt.
      ... ...
    }
  }
  // Omitted here; onDiskBytes, diskSegments, finalSegments, numInMemSegments,
  // rawBytes and thisPhase are not defined in the visible code.
  ... ...
  // Phase 2: only entered when onDiskBytes is non-zero.
  if (0 != onDiskBytes) {
    ... ...
    RawKeyValueIterator diskMerge = Merger.merge(
        job, fs, keyClass, valueClass, codec, diskSegments, ioSortFactor,
        numInMemSegments, tmpDir, comparator, reporter, false,
        spilledRecordsCounter, null, thisPhase);
    // The segment list is emptied once the merge iterator has been created.
    diskSegments.clear();
    // Nothing else to combine with: the disk merge is already the result.
    if (0 == finalSegments.size()) {
      return diskMerge;
    }
    // Otherwise expose the disk merge as one more segment of the final merge.
    finalSegments.add(new Segment<K,V>(
        new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
  }
  // Final merge over all of finalSegments; the last two arguments are null
  // here, whereas the disk merge above passes thisPhase in the last position.
  return Merger.merge(job, fs, keyClass, valueClass, finalSegments, finalSegments.size(),
      tmpDir, comparator, reporter, spilledRecordsCounter, null, null);
}
|