1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
   |
   /**
    * Snapshots the pending map outputs, empties the tracking collections and
    * hands the snapshots to {@code finalMerge}.
    *
    * <p>Visible steps: the in-memory merged outputs and the plain in-memory
    * outputs are gathered (in that order) into one list, the on-disk outputs
    * into another, and each source collection is cleared right after it has
    * been copied.
    *
    * @return the iterator produced by {@code finalMerge}
    * @throws Throwable as declared; the statements that can raise it are in
    *         the part of the body elided from this excerpt
    */
   public RawKeyValueIterator close() throws Throwable {
     // NOTE(review): the excerpt elides code here ("... ..."); whatever runs
     // before the snapshot is not visible from this view.
     ... ...
     // Start from the already-merged in-memory outputs ...
     List<InMemoryMapOutput<K, V>> memory =
        new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
     inMemoryMergedMapOutputs.clear();
     // ... then append the remaining in-memory outputs to the same list.
     memory.addAll(inMemoryMapOutputs);
     inMemoryMapOutputs.clear();
     // Copy the on-disk outputs before clearing the field, so finalMerge
     // works on private copies rather than on the shared collections.
     List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
     onDiskMapOutputs.clear();
     return finalMerge(jobConf, rfs, memory, disk);
   }
  /**
   * Produces the final {@code RawKeyValueIterator} from the given in-memory
   * and on-disk map outputs.
   *
   * <p>Visible structure (several sections are elided in this excerpt):
   * <ol>
   *   <li>If there are in-memory outputs, optionally run a
   *       {@code Merger.merge} over {@code memDiskSegments} with an output
   *       path carrying the {@code Task.MERGED_OUTPUT_PREFIX} suffix.</li>
   *   <li>If {@code onDiskBytes} is non-zero, run a {@code Merger.merge} over
   *       {@code diskSegments}; return it directly when {@code finalSegments}
   *       is empty, otherwise wrap it as one more final segment.</li>
   *   <li>Return a {@code Merger.merge} over {@code finalSegments}.</li>
   * </ol>
   *
   * @param job job configuration; supplies the output key comparator
   * @param fs file system handed to every {@code Merger.merge} call
   * @param inMemoryMapOutputs map outputs currently held in memory
   * @param onDiskMapOutputs paths of map outputs already on disk
   * @return the disk merge iterator when there are no final segments,
   *         otherwise the iterator of the merge over {@code finalSegments}
   * @throws IOException as declared by the signature
   */
  private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                     List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
                                     List<CompressAwarePath> onDiskMapOutputs
                                     ) throws IOException {
    // NOTE(review): elided in the excerpt; names used below without a visible
    // declaration (keyClass, valueClass, ioSortFactor, ...) are presumably
    // set up here or are fields - confirm against the full file.
    ... ...
    // Temporary directory for the merges, named after the reduce id.
    final Path tmpDir = new Path(reduceId.toString());
    // Unchecked cast: the job's output key comparator is used as the raw
    // comparator for K in every merge below.
    final RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();
    if (inMemoryMapOutputs.size() > 0) {
      // Elided: presumably computes mapId, memDiskSegments,
      // numMemDiskSegments and inMemToDiskBytes - TODO confirm.
      ... ...
      // Taken only when there is at least one mem-disk segment and
      // ioSortFactor is strictly greater than the number of on-disk outputs.
      if (numMemDiskSegments > 0 && ioSortFactor > onDiskMapOutputs.size()) {
        ... ...
        // Path obtained from mapOutputFile for mapId / inMemToDiskBytes,
        // with the merged-output suffix appended.
        final Path outputPath =
          mapOutputFile.getInputFileForWrite(mapId,
                                            inMemToDiskBytes).suffix(
                                                Task.MERGED_OUTPUT_PREFIX);
        // Merge the mem-disk segments; how rIter and outputPath are consumed
        // is in the elided code that follows.
        final RawKeyValueIterator rIter = Merger.merge(job, fs,
            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
            tmpDir, comparator, reporter, spilledRecordsCounter, null,
            mergePhase);
        ... ...
      } else if (inMemToDiskBytes != 0) {
        // Branch body elided in the excerpt.
        ... ...
      }
    }
    // Elided: presumably builds diskSegments / finalSegments and computes
    // onDiskBytes, rawBytes and numInMemSegments - TODO confirm.
    ... ...
    if (0 != onDiskBytes) {
      ... ...
      // Merge everything collected in diskSegments.
      RawKeyValueIterator diskMerge = Merger.merge(
          job, fs, keyClass, valueClass, codec, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          reporter, false, spilledRecordsCounter, null, thisPhase);
      diskSegments.clear();
      // Nothing else to combine with: the disk merge is the final result.
      if (0 == finalSegments.size()) {
        return diskMerge;
      }
      // Otherwise expose the disk merge as one more segment of the final merge.
      finalSegments.add(new Segment<K,V>(
            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
    }
    // Final merge over all remaining segments, with the segment count passed
    // as the merge factor argument.
    return Merger.merge(job, fs, keyClass, valueClass,
                 finalSegments, finalSegments.size(), tmpDir,
                 comparator, reporter, spilledRecordsCounter, null,
                 null);
  }
  |